2 Copyright 2015 The Kubernetes Authors.
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
8 http://www.apache.org/licenses/LICENSE-2.0
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
23 "k8s.io/apimachinery/pkg/util/clock"
26 type Interface interface {
29 Get() (item interface{}, shutdown bool)
30 Done(item interface{})
35 // New constructs a new work queue (see the package comment).
40 func NewNamed(name string) *Type {
41 rc := clock.RealClock{}
44 globalMetricsFactory.newQueueMetrics(name, rc),
45 defaultUnfinishedWorkUpdatePeriod,
49 func newQueue(c clock.Clock, metrics queueMetrics, updatePeriod time.Duration) *Type {
54 cond: sync.NewCond(&sync.Mutex{}),
56 unfinishedWorkUpdatePeriod: updatePeriod,
58 go t.updateUnfinishedWorkLoop()
62 const defaultUnfinishedWorkUpdatePeriod = 500 * time.Millisecond
64 // Type is a work queue (see the package comment).
66 // queue defines the order in which we will work on items. Every
67 // element of queue should be in the dirty set and not in the processing set.
71 // dirty defines all of the items that need to be processed.
74 // Things that are currently being processed are in the processing set.
75 // These things may be simultaneously in the dirty set. When we finish
76 // processing something and remove it from this set, we'll check if
77 // it's in the dirty set, and if so, add it to the queue.
86 unfinishedWorkUpdatePeriod time.Duration
94 func (s set) has(item t) bool {
99 func (s set) insert(item t) {
103 func (s set) delete(item t) {
107 // Add marks item as needing processing.
108 func (q *Type) Add(item interface{}) {
110 defer q.cond.L.Unlock()
114 if q.dirty.has(item) {
121 if q.processing.has(item) {
125 q.queue = append(q.queue, item)
129 // Len returns the current queue length, for informational purposes only. You
130 // shouldn't, e.g., gate a call to Add() or Get() on Len() being a particular
131 // value; that can't be synchronized properly.
132 func (q *Type) Len() int {
134 defer q.cond.L.Unlock()
138 // Get blocks until it can return an item to be processed. If shutdown = true,
139 // the caller should end their goroutine. You must call Done with item when you
140 // have finished processing it.
141 func (q *Type) Get() (item interface{}, shutdown bool) {
143 defer q.cond.L.Unlock()
144 for len(q.queue) == 0 && !q.shuttingDown {
147 if len(q.queue) == 0 {
148 // We must be shutting down.
152 item, q.queue = q.queue[0], q.queue[1:]
156 q.processing.insert(item)
162 // Done marks item as done processing, and if it has been marked as dirty again
163 // while it was being processed, it will be re-added to the queue for re-processing.
165 func (q *Type) Done(item interface{}) {
167 defer q.cond.L.Unlock()
171 q.processing.delete(item)
172 if q.dirty.has(item) {
173 q.queue = append(q.queue, item)
178 // ShutDown will cause q to ignore all new items added to it. As soon as the
179 // worker goroutines have drained the existing items in the queue, they will be
180 // instructed to exit.
181 func (q *Type) ShutDown() {
183 defer q.cond.L.Unlock()
184 q.shuttingDown = true
188 func (q *Type) ShuttingDown() bool {
190 defer q.cond.L.Unlock()
192 return q.shuttingDown
195 func (q *Type) updateUnfinishedWorkLoop() {
196 t := q.clock.NewTicker(q.unfinishedWorkUpdatePeriod)
201 defer q.cond.L.Unlock()
203 q.metrics.updateUnfinishedWork()