Code refactoring for bpa operator
[icn.git] / cmd / bpa-operator / vendor / k8s.io / client-go / util / workqueue / delaying_queue.go
1 /*
2 Copyright 2016 The Kubernetes Authors.
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8     http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16
17 package workqueue
18
19 import (
20         "container/heap"
21         "time"
22
23         "k8s.io/apimachinery/pkg/util/clock"
24         utilruntime "k8s.io/apimachinery/pkg/util/runtime"
25 )
26
27 // DelayingInterface is an Interface that can Add an item at a later time. This makes it easier to
28 // requeue items after failures without ending up in a hot-loop.
29 type DelayingInterface interface {
30         Interface
31         // AddAfter adds an item to the workqueue after the indicated duration has passed
32         AddAfter(item interface{}, duration time.Duration)
33 }
34
35 // NewDelayingQueue constructs a new workqueue with delayed queuing ability
36 func NewDelayingQueue() DelayingInterface {
37         return newDelayingQueue(clock.RealClock{}, "")
38 }
39
40 func NewNamedDelayingQueue(name string) DelayingInterface {
41         return newDelayingQueue(clock.RealClock{}, name)
42 }
43
44 func newDelayingQueue(clock clock.Clock, name string) DelayingInterface {
45         ret := &delayingType{
46                 Interface:       NewNamed(name),
47                 clock:           clock,
48                 heartbeat:       clock.NewTicker(maxWait),
49                 stopCh:          make(chan struct{}),
50                 waitingForAddCh: make(chan *waitFor, 1000),
51                 metrics:         newRetryMetrics(name),
52         }
53
54         go ret.waitingLoop()
55
56         return ret
57 }
58
59 // delayingType wraps an Interface and provides delayed re-enquing
60 type delayingType struct {
61         Interface
62
63         // clock tracks time for delayed firing
64         clock clock.Clock
65
66         // stopCh lets us signal a shutdown to the waiting loop
67         stopCh chan struct{}
68
69         // heartbeat ensures we wait no more than maxWait before firing
70         heartbeat clock.Ticker
71
72         // waitingForAddCh is a buffered channel that feeds waitingForAdd
73         waitingForAddCh chan *waitFor
74
75         // metrics counts the number of retries
76         metrics retryMetrics
77 }
78
79 // waitFor holds the data to add and the time it should be added
80 type waitFor struct {
81         data    t
82         readyAt time.Time
83         // index in the priority queue (heap)
84         index int
85 }
86
87 // waitForPriorityQueue implements a priority queue for waitFor items.
88 //
89 // waitForPriorityQueue implements heap.Interface. The item occurring next in
90 // time (i.e., the item with the smallest readyAt) is at the root (index 0).
91 // Peek returns this minimum item at index 0. Pop returns the minimum item after
92 // it has been removed from the queue and placed at index Len()-1 by
93 // container/heap. Push adds an item at index Len(), and container/heap
94 // percolates it into the correct location.
95 type waitForPriorityQueue []*waitFor
96
97 func (pq waitForPriorityQueue) Len() int {
98         return len(pq)
99 }
100 func (pq waitForPriorityQueue) Less(i, j int) bool {
101         return pq[i].readyAt.Before(pq[j].readyAt)
102 }
103 func (pq waitForPriorityQueue) Swap(i, j int) {
104         pq[i], pq[j] = pq[j], pq[i]
105         pq[i].index = i
106         pq[j].index = j
107 }
108
109 // Push adds an item to the queue. Push should not be called directly; instead,
110 // use `heap.Push`.
111 func (pq *waitForPriorityQueue) Push(x interface{}) {
112         n := len(*pq)
113         item := x.(*waitFor)
114         item.index = n
115         *pq = append(*pq, item)
116 }
117
118 // Pop removes an item from the queue. Pop should not be called directly;
119 // instead, use `heap.Pop`.
120 func (pq *waitForPriorityQueue) Pop() interface{} {
121         n := len(*pq)
122         item := (*pq)[n-1]
123         item.index = -1
124         *pq = (*pq)[0:(n - 1)]
125         return item
126 }
127
128 // Peek returns the item at the beginning of the queue, without removing the
129 // item or otherwise mutating the queue. It is safe to call directly.
130 func (pq waitForPriorityQueue) Peek() interface{} {
131         return pq[0]
132 }
133
134 // ShutDown gives a way to shut off this queue
135 func (q *delayingType) ShutDown() {
136         q.Interface.ShutDown()
137         close(q.stopCh)
138         q.heartbeat.Stop()
139 }
140
141 // AddAfter adds the given item to the work queue after the given delay
142 func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {
143         // don't add if we're already shutting down
144         if q.ShuttingDown() {
145                 return
146         }
147
148         q.metrics.retry()
149
150         // immediately add things with no delay
151         if duration <= 0 {
152                 q.Add(item)
153                 return
154         }
155
156         select {
157         case <-q.stopCh:
158                 // unblock if ShutDown() is called
159         case q.waitingForAddCh <- &waitFor{data: item, readyAt: q.clock.Now().Add(duration)}:
160         }
161 }
162
163 // maxWait keeps a max bound on the wait time. It's just insurance against weird things happening.
164 // Checking the queue every 10 seconds isn't expensive and we know that we'll never end up with an
165 // expired item sitting for more than 10 seconds.
166 const maxWait = 10 * time.Second
167
168 // waitingLoop runs until the workqueue is shutdown and keeps a check on the list of items to be added.
169 func (q *delayingType) waitingLoop() {
170         defer utilruntime.HandleCrash()
171
172         // Make a placeholder channel to use when there are no items in our list
173         never := make(<-chan time.Time)
174
175         waitingForQueue := &waitForPriorityQueue{}
176         heap.Init(waitingForQueue)
177
178         waitingEntryByData := map[t]*waitFor{}
179
180         for {
181                 if q.Interface.ShuttingDown() {
182                         return
183                 }
184
185                 now := q.clock.Now()
186
187                 // Add ready entries
188                 for waitingForQueue.Len() > 0 {
189                         entry := waitingForQueue.Peek().(*waitFor)
190                         if entry.readyAt.After(now) {
191                                 break
192                         }
193
194                         entry = heap.Pop(waitingForQueue).(*waitFor)
195                         q.Add(entry.data)
196                         delete(waitingEntryByData, entry.data)
197                 }
198
199                 // Set up a wait for the first item's readyAt (if one exists)
200                 nextReadyAt := never
201                 if waitingForQueue.Len() > 0 {
202                         entry := waitingForQueue.Peek().(*waitFor)
203                         nextReadyAt = q.clock.After(entry.readyAt.Sub(now))
204                 }
205
206                 select {
207                 case <-q.stopCh:
208                         return
209
210                 case <-q.heartbeat.C():
211                         // continue the loop, which will add ready items
212
213                 case <-nextReadyAt:
214                         // continue the loop, which will add ready items
215
216                 case waitEntry := <-q.waitingForAddCh:
217                         if waitEntry.readyAt.After(q.clock.Now()) {
218                                 insert(waitingForQueue, waitingEntryByData, waitEntry)
219                         } else {
220                                 q.Add(waitEntry.data)
221                         }
222
223                         drained := false
224                         for !drained {
225                                 select {
226                                 case waitEntry := <-q.waitingForAddCh:
227                                         if waitEntry.readyAt.After(q.clock.Now()) {
228                                                 insert(waitingForQueue, waitingEntryByData, waitEntry)
229                                         } else {
230                                                 q.Add(waitEntry.data)
231                                         }
232                                 default:
233                                         drained = true
234                                 }
235                         }
236                 }
237         }
238 }
239
240 // insert adds the entry to the priority queue, or updates the readyAt if it already exists in the queue
241 func insert(q *waitForPriorityQueue, knownEntries map[t]*waitFor, entry *waitFor) {
242         // if the entry already exists, update the time only if it would cause the item to be queued sooner
243         existing, exists := knownEntries[entry.data]
244         if exists {
245                 if existing.readyAt.After(entry.readyAt) {
246                         existing.readyAt = entry.readyAt
247                         heap.Fix(q, existing.index)
248                 }
249
250                 return
251         }
252
253         heap.Push(q, entry)
254         knownEntries[entry.data] = entry
255 }