Code refactoring for bpa operator
[icn.git] / cmd / bpa-operator / vendor / k8s.io / client-go / util / workqueue / queue.go
1 /*
2 Copyright 2015 The Kubernetes Authors.
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8     http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16
17 package workqueue
18
19 import (
20         "sync"
21         "time"
22
23         "k8s.io/apimachinery/pkg/util/clock"
24 )
25
26 type Interface interface {
27         Add(item interface{})
28         Len() int
29         Get() (item interface{}, shutdown bool)
30         Done(item interface{})
31         ShutDown()
32         ShuttingDown() bool
33 }
34
35 // New constructs a new work queue (see the package comment).
36 func New() *Type {
37         return NewNamed("")
38 }
39
40 func NewNamed(name string) *Type {
41         rc := clock.RealClock{}
42         return newQueue(
43                 rc,
44                 globalMetricsFactory.newQueueMetrics(name, rc),
45                 defaultUnfinishedWorkUpdatePeriod,
46         )
47 }
48
49 func newQueue(c clock.Clock, metrics queueMetrics, updatePeriod time.Duration) *Type {
50         t := &Type{
51                 clock:                      c,
52                 dirty:                      set{},
53                 processing:                 set{},
54                 cond:                       sync.NewCond(&sync.Mutex{}),
55                 metrics:                    metrics,
56                 unfinishedWorkUpdatePeriod: updatePeriod,
57         }
58         go t.updateUnfinishedWorkLoop()
59         return t
60 }
61
62 const defaultUnfinishedWorkUpdatePeriod = 500 * time.Millisecond
63
64 // Type is a work queue (see the package comment).
65 type Type struct {
66         // queue defines the order in which we will work on items. Every
67         // element of queue should be in the dirty set and not in the
68         // processing set.
69         queue []t
70
71         // dirty defines all of the items that need to be processed.
72         dirty set
73
74         // Things that are currently being processed are in the processing set.
75         // These things may be simultaneously in the dirty set. When we finish
76         // processing something and remove it from this set, we'll check if
77         // it's in the dirty set, and if so, add it to the queue.
78         processing set
79
80         cond *sync.Cond
81
82         shuttingDown bool
83
84         metrics queueMetrics
85
86         unfinishedWorkUpdatePeriod time.Duration
87         clock                      clock.Clock
88 }
89
90 type empty struct{}
91 type t interface{}
92 type set map[t]empty
93
94 func (s set) has(item t) bool {
95         _, exists := s[item]
96         return exists
97 }
98
99 func (s set) insert(item t) {
100         s[item] = empty{}
101 }
102
103 func (s set) delete(item t) {
104         delete(s, item)
105 }
106
107 // Add marks item as needing processing.
108 func (q *Type) Add(item interface{}) {
109         q.cond.L.Lock()
110         defer q.cond.L.Unlock()
111         if q.shuttingDown {
112                 return
113         }
114         if q.dirty.has(item) {
115                 return
116         }
117
118         q.metrics.add(item)
119
120         q.dirty.insert(item)
121         if q.processing.has(item) {
122                 return
123         }
124
125         q.queue = append(q.queue, item)
126         q.cond.Signal()
127 }
128
129 // Len returns the current queue length, for informational purposes only. You
130 // shouldn't e.g. gate a call to Add() or Get() on Len() being a particular
131 // value, that can't be synchronized properly.
132 func (q *Type) Len() int {
133         q.cond.L.Lock()
134         defer q.cond.L.Unlock()
135         return len(q.queue)
136 }
137
138 // Get blocks until it can return an item to be processed. If shutdown = true,
139 // the caller should end their goroutine. You must call Done with item when you
140 // have finished processing it.
141 func (q *Type) Get() (item interface{}, shutdown bool) {
142         q.cond.L.Lock()
143         defer q.cond.L.Unlock()
144         for len(q.queue) == 0 && !q.shuttingDown {
145                 q.cond.Wait()
146         }
147         if len(q.queue) == 0 {
148                 // We must be shutting down.
149                 return nil, true
150         }
151
152         item, q.queue = q.queue[0], q.queue[1:]
153
154         q.metrics.get(item)
155
156         q.processing.insert(item)
157         q.dirty.delete(item)
158
159         return item, false
160 }
161
162 // Done marks item as done processing, and if it has been marked as dirty again
163 // while it was being processed, it will be re-added to the queue for
164 // re-processing.
165 func (q *Type) Done(item interface{}) {
166         q.cond.L.Lock()
167         defer q.cond.L.Unlock()
168
169         q.metrics.done(item)
170
171         q.processing.delete(item)
172         if q.dirty.has(item) {
173                 q.queue = append(q.queue, item)
174                 q.cond.Signal()
175         }
176 }
177
178 // ShutDown will cause q to ignore all new items added to it. As soon as the
179 // worker goroutines have drained the existing items in the queue, they will be
180 // instructed to exit.
181 func (q *Type) ShutDown() {
182         q.cond.L.Lock()
183         defer q.cond.L.Unlock()
184         q.shuttingDown = true
185         q.cond.Broadcast()
186 }
187
188 func (q *Type) ShuttingDown() bool {
189         q.cond.L.Lock()
190         defer q.cond.L.Unlock()
191
192         return q.shuttingDown
193 }
194
195 func (q *Type) updateUnfinishedWorkLoop() {
196         t := q.clock.NewTicker(q.unfinishedWorkUpdatePeriod)
197         defer t.Stop()
198         for range t.C() {
199                 if !func() bool {
200                         q.cond.L.Lock()
201                         defer q.cond.L.Unlock()
202                         if !q.shuttingDown {
203                                 q.metrics.updateUnfinishedWork()
204                                 return true
205                         }
206                         return false
207
208                 }() {
209                         return
210                 }
211         }
212 }