2 Copyright 2016 The Kubernetes Authors.
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
8 http://www.apache.org/licenses/LICENSE-2.0
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
23 "k8s.io/apimachinery/pkg/util/clock"
24 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
27 // DelayingInterface is an Interface that can Add an item at a later time. This makes it easier to
28 // requeue items after failures without ending up in a hot-loop.
29 type DelayingInterface interface {
31 // AddAfter adds an item to the workqueue after the indicated duration has passed
32 AddAfter(item interface{}, duration time.Duration)
35 // NewDelayingQueue constructs a new workqueue with delayed queuing ability
36 func NewDelayingQueue() DelayingInterface {
37 return newDelayingQueue(clock.RealClock{}, "")
40 func NewNamedDelayingQueue(name string) DelayingInterface {
41 return newDelayingQueue(clock.RealClock{}, name)
44 func newDelayingQueue(clock clock.Clock, name string) DelayingInterface {
46 Interface: NewNamed(name),
48 heartbeat: clock.NewTicker(maxWait),
49 stopCh: make(chan struct{}),
50 waitingForAddCh: make(chan *waitFor, 1000),
51 metrics: newRetryMetrics(name),
59 // delayingType wraps an Interface and provides delayed re-enquing
60 type delayingType struct {
63 // clock tracks time for delayed firing
66 // stopCh lets us signal a shutdown to the waiting loop
69 // heartbeat ensures we wait no more than maxWait before firing
70 heartbeat clock.Ticker
72 // waitingForAddCh is a buffered channel that feeds waitingForAdd
73 waitingForAddCh chan *waitFor
75 // metrics counts the number of retries
79 // waitFor holds the data to add and the time it should be added
83 // index in the priority queue (heap)
87 // waitForPriorityQueue implements a priority queue for waitFor items.
89 // waitForPriorityQueue implements heap.Interface. The item occurring next in
90 // time (i.e., the item with the smallest readyAt) is at the root (index 0).
91 // Peek returns this minimum item at index 0. Pop returns the minimum item after
92 // it has been removed from the queue and placed at index Len()-1 by
93 // container/heap. Push adds an item at index Len(), and container/heap
94 // percolates it into the correct location.
95 type waitForPriorityQueue []*waitFor
97 func (pq waitForPriorityQueue) Len() int {
100 func (pq waitForPriorityQueue) Less(i, j int) bool {
101 return pq[i].readyAt.Before(pq[j].readyAt)
103 func (pq waitForPriorityQueue) Swap(i, j int) {
104 pq[i], pq[j] = pq[j], pq[i]
109 // Push adds an item to the queue. Push should not be called directly; instead,
111 func (pq *waitForPriorityQueue) Push(x interface{}) {
115 *pq = append(*pq, item)
118 // Pop removes an item from the queue. Pop should not be called directly;
119 // instead, use `heap.Pop`.
120 func (pq *waitForPriorityQueue) Pop() interface{} {
124 *pq = (*pq)[0:(n - 1)]
128 // Peek returns the item at the beginning of the queue, without removing the
129 // item or otherwise mutating the queue. It is safe to call directly.
130 func (pq waitForPriorityQueue) Peek() interface{} {
134 // ShutDown gives a way to shut off this queue
135 func (q *delayingType) ShutDown() {
136 q.Interface.ShutDown()
141 // AddAfter adds the given item to the work queue after the given delay
142 func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {
143 // don't add if we're already shutting down
144 if q.ShuttingDown() {
150 // immediately add things with no delay
158 // unblock if ShutDown() is called
159 case q.waitingForAddCh <- &waitFor{data: item, readyAt: q.clock.Now().Add(duration)}:
163 // maxWait keeps a max bound on the wait time. It's just insurance against weird things happening.
164 // Checking the queue every 10 seconds isn't expensive and we know that we'll never end up with an
165 // expired item sitting for more than 10 seconds.
166 const maxWait = 10 * time.Second
168 // waitingLoop runs until the workqueue is shutdown and keeps a check on the list of items to be added.
169 func (q *delayingType) waitingLoop() {
170 defer utilruntime.HandleCrash()
172 // Make a placeholder channel to use when there are no items in our list
173 never := make(<-chan time.Time)
175 waitingForQueue := &waitForPriorityQueue{}
176 heap.Init(waitingForQueue)
178 waitingEntryByData := map[t]*waitFor{}
181 if q.Interface.ShuttingDown() {
188 for waitingForQueue.Len() > 0 {
189 entry := waitingForQueue.Peek().(*waitFor)
190 if entry.readyAt.After(now) {
194 entry = heap.Pop(waitingForQueue).(*waitFor)
196 delete(waitingEntryByData, entry.data)
199 // Set up a wait for the first item's readyAt (if one exists)
201 if waitingForQueue.Len() > 0 {
202 entry := waitingForQueue.Peek().(*waitFor)
203 nextReadyAt = q.clock.After(entry.readyAt.Sub(now))
210 case <-q.heartbeat.C():
211 // continue the loop, which will add ready items
214 // continue the loop, which will add ready items
216 case waitEntry := <-q.waitingForAddCh:
217 if waitEntry.readyAt.After(q.clock.Now()) {
218 insert(waitingForQueue, waitingEntryByData, waitEntry)
220 q.Add(waitEntry.data)
226 case waitEntry := <-q.waitingForAddCh:
227 if waitEntry.readyAt.After(q.clock.Now()) {
228 insert(waitingForQueue, waitingEntryByData, waitEntry)
230 q.Add(waitEntry.data)
240 // insert adds the entry to the priority queue, or updates the readyAt if it already exists in the queue
241 func insert(q *waitForPriorityQueue, knownEntries map[t]*waitFor, entry *waitFor) {
242 // if the entry already exists, update the time only if it would cause the item to be queued sooner
243 existing, exists := knownEntries[entry.data]
245 if existing.readyAt.After(entry.readyAt) {
246 existing.readyAt = entry.readyAt
247 heap.Fix(q, existing.index)
254 knownEntries[entry.data] = entry