Code refactoring for bpa operator
[icn.git] / cmd / bpa-operator / vendor / k8s.io / client-go / tools / cache / fifo.go
1 /*
2 Copyright 2014 The Kubernetes Authors.
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8     http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16
17 package cache
18
19 import (
20         "errors"
21         "sync"
22
23         "k8s.io/apimachinery/pkg/util/sets"
24 )
25
26 // PopProcessFunc is passed to Pop() method of Queue interface.
27 // It is supposed to process the element popped from the queue.
28 type PopProcessFunc func(interface{}) error
29
30 // ErrRequeue may be returned by a PopProcessFunc to safely requeue
31 // the current item. The value of Err will be returned from Pop.
32 type ErrRequeue struct {
33         // Err is returned by the Pop function
34         Err error
35 }
36
37 var FIFOClosedError error = errors.New("DeltaFIFO: manipulating with closed queue")
38
39 func (e ErrRequeue) Error() string {
40         if e.Err == nil {
41                 return "the popped item should be requeued without returning an error"
42         }
43         return e.Err.Error()
44 }
45
46 // Queue is exactly like a Store, but has a Pop() method too.
47 type Queue interface {
48         Store
49
50         // Pop blocks until it has something to process.
51         // It returns the object that was process and the result of processing.
52         // The PopProcessFunc may return an ErrRequeue{...} to indicate the item
53         // should be requeued before releasing the lock on the queue.
54         Pop(PopProcessFunc) (interface{}, error)
55
56         // AddIfNotPresent adds a value previously
57         // returned by Pop back into the queue as long
58         // as nothing else (presumably more recent)
59         // has since been added.
60         AddIfNotPresent(interface{}) error
61
62         // HasSynced returns true if the first batch of items has been popped
63         HasSynced() bool
64
65         // Close queue
66         Close()
67 }
68
69 // Helper function for popping from Queue.
70 // WARNING: Do NOT use this function in non-test code to avoid races
71 // unless you really really really really know what you are doing.
72 func Pop(queue Queue) interface{} {
73         var result interface{}
74         queue.Pop(func(obj interface{}) error {
75                 result = obj
76                 return nil
77         })
78         return result
79 }
80
81 // FIFO receives adds and updates from a Reflector, and puts them in a queue for
82 // FIFO order processing. If multiple adds/updates of a single item happen while
83 // an item is in the queue before it has been processed, it will only be
84 // processed once, and when it is processed, the most recent version will be
85 // processed. This can't be done with a channel.
86 //
87 // FIFO solves this use case:
88 //  * You want to process every object (exactly) once.
89 //  * You want to process the most recent version of the object when you process it.
90 //  * You do not want to process deleted objects, they should be removed from the queue.
91 //  * You do not want to periodically reprocess objects.
92 // Compare with DeltaFIFO for other use cases.
93 type FIFO struct {
94         lock sync.RWMutex
95         cond sync.Cond
96         // We depend on the property that items in the set are in the queue and vice versa.
97         items map[string]interface{}
98         queue []string
99
100         // populated is true if the first batch of items inserted by Replace() has been populated
101         // or Delete/Add/Update was called first.
102         populated bool
103         // initialPopulationCount is the number of items inserted by the first call of Replace()
104         initialPopulationCount int
105
106         // keyFunc is used to make the key used for queued item insertion and retrieval, and
107         // should be deterministic.
108         keyFunc KeyFunc
109
110         // Indication the queue is closed.
111         // Used to indicate a queue is closed so a control loop can exit when a queue is empty.
112         // Currently, not used to gate any of CRED operations.
113         closed     bool
114         closedLock sync.Mutex
115 }
116
117 var (
118         _ = Queue(&FIFO{}) // FIFO is a Queue
119 )
120
121 // Close the queue.
122 func (f *FIFO) Close() {
123         f.closedLock.Lock()
124         defer f.closedLock.Unlock()
125         f.closed = true
126         f.cond.Broadcast()
127 }
128
129 // Return true if an Add/Update/Delete/AddIfNotPresent are called first,
130 // or an Update called first but the first batch of items inserted by Replace() has been popped
131 func (f *FIFO) HasSynced() bool {
132         f.lock.Lock()
133         defer f.lock.Unlock()
134         return f.populated && f.initialPopulationCount == 0
135 }
136
137 // Add inserts an item, and puts it in the queue. The item is only enqueued
138 // if it doesn't already exist in the set.
139 func (f *FIFO) Add(obj interface{}) error {
140         id, err := f.keyFunc(obj)
141         if err != nil {
142                 return KeyError{obj, err}
143         }
144         f.lock.Lock()
145         defer f.lock.Unlock()
146         f.populated = true
147         if _, exists := f.items[id]; !exists {
148                 f.queue = append(f.queue, id)
149         }
150         f.items[id] = obj
151         f.cond.Broadcast()
152         return nil
153 }
154
155 // AddIfNotPresent inserts an item, and puts it in the queue. If the item is already
156 // present in the set, it is neither enqueued nor added to the set.
157 //
158 // This is useful in a single producer/consumer scenario so that the consumer can
159 // safely retry items without contending with the producer and potentially enqueueing
160 // stale items.
161 func (f *FIFO) AddIfNotPresent(obj interface{}) error {
162         id, err := f.keyFunc(obj)
163         if err != nil {
164                 return KeyError{obj, err}
165         }
166         f.lock.Lock()
167         defer f.lock.Unlock()
168         f.addIfNotPresent(id, obj)
169         return nil
170 }
171
172 // addIfNotPresent assumes the fifo lock is already held and adds the provided
173 // item to the queue under id if it does not already exist.
174 func (f *FIFO) addIfNotPresent(id string, obj interface{}) {
175         f.populated = true
176         if _, exists := f.items[id]; exists {
177                 return
178         }
179
180         f.queue = append(f.queue, id)
181         f.items[id] = obj
182         f.cond.Broadcast()
183 }
184
185 // Update is the same as Add in this implementation.
186 func (f *FIFO) Update(obj interface{}) error {
187         return f.Add(obj)
188 }
189
190 // Delete removes an item. It doesn't add it to the queue, because
191 // this implementation assumes the consumer only cares about the objects,
192 // not the order in which they were created/added.
193 func (f *FIFO) Delete(obj interface{}) error {
194         id, err := f.keyFunc(obj)
195         if err != nil {
196                 return KeyError{obj, err}
197         }
198         f.lock.Lock()
199         defer f.lock.Unlock()
200         f.populated = true
201         delete(f.items, id)
202         return err
203 }
204
205 // List returns a list of all the items.
206 func (f *FIFO) List() []interface{} {
207         f.lock.RLock()
208         defer f.lock.RUnlock()
209         list := make([]interface{}, 0, len(f.items))
210         for _, item := range f.items {
211                 list = append(list, item)
212         }
213         return list
214 }
215
216 // ListKeys returns a list of all the keys of the objects currently
217 // in the FIFO.
218 func (f *FIFO) ListKeys() []string {
219         f.lock.RLock()
220         defer f.lock.RUnlock()
221         list := make([]string, 0, len(f.items))
222         for key := range f.items {
223                 list = append(list, key)
224         }
225         return list
226 }
227
228 // Get returns the requested item, or sets exists=false.
229 func (f *FIFO) Get(obj interface{}) (item interface{}, exists bool, err error) {
230         key, err := f.keyFunc(obj)
231         if err != nil {
232                 return nil, false, KeyError{obj, err}
233         }
234         return f.GetByKey(key)
235 }
236
237 // GetByKey returns the requested item, or sets exists=false.
238 func (f *FIFO) GetByKey(key string) (item interface{}, exists bool, err error) {
239         f.lock.RLock()
240         defer f.lock.RUnlock()
241         item, exists = f.items[key]
242         return item, exists, nil
243 }
244
245 // Checks if the queue is closed
246 func (f *FIFO) IsClosed() bool {
247         f.closedLock.Lock()
248         defer f.closedLock.Unlock()
249         if f.closed {
250                 return true
251         }
252         return false
253 }
254
255 // Pop waits until an item is ready and processes it. If multiple items are
256 // ready, they are returned in the order in which they were added/updated.
257 // The item is removed from the queue (and the store) before it is processed,
258 // so if you don't successfully process it, it should be added back with
259 // AddIfNotPresent(). process function is called under lock, so it is safe
260 // update data structures in it that need to be in sync with the queue.
261 func (f *FIFO) Pop(process PopProcessFunc) (interface{}, error) {
262         f.lock.Lock()
263         defer f.lock.Unlock()
264         for {
265                 for len(f.queue) == 0 {
266                         // When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
267                         // When Close() is called, the f.closed is set and the condition is broadcasted.
268                         // Which causes this loop to continue and return from the Pop().
269                         if f.IsClosed() {
270                                 return nil, FIFOClosedError
271                         }
272
273                         f.cond.Wait()
274                 }
275                 id := f.queue[0]
276                 f.queue = f.queue[1:]
277                 if f.initialPopulationCount > 0 {
278                         f.initialPopulationCount--
279                 }
280                 item, ok := f.items[id]
281                 if !ok {
282                         // Item may have been deleted subsequently.
283                         continue
284                 }
285                 delete(f.items, id)
286                 err := process(item)
287                 if e, ok := err.(ErrRequeue); ok {
288                         f.addIfNotPresent(id, item)
289                         err = e.Err
290                 }
291                 return item, err
292         }
293 }
294
295 // Replace will delete the contents of 'f', using instead the given map.
296 // 'f' takes ownership of the map, you should not reference the map again
297 // after calling this function. f's queue is reset, too; upon return, it
298 // will contain the items in the map, in no particular order.
299 func (f *FIFO) Replace(list []interface{}, resourceVersion string) error {
300         items := make(map[string]interface{}, len(list))
301         for _, item := range list {
302                 key, err := f.keyFunc(item)
303                 if err != nil {
304                         return KeyError{item, err}
305                 }
306                 items[key] = item
307         }
308
309         f.lock.Lock()
310         defer f.lock.Unlock()
311
312         if !f.populated {
313                 f.populated = true
314                 f.initialPopulationCount = len(items)
315         }
316
317         f.items = items
318         f.queue = f.queue[:0]
319         for id := range items {
320                 f.queue = append(f.queue, id)
321         }
322         if len(f.queue) > 0 {
323                 f.cond.Broadcast()
324         }
325         return nil
326 }
327
328 // Resync will touch all objects to put them into the processing queue
329 func (f *FIFO) Resync() error {
330         f.lock.Lock()
331         defer f.lock.Unlock()
332
333         inQueue := sets.NewString()
334         for _, id := range f.queue {
335                 inQueue.Insert(id)
336         }
337         for id := range f.items {
338                 if !inQueue.Has(id) {
339                         f.queue = append(f.queue, id)
340                 }
341         }
342         if len(f.queue) > 0 {
343                 f.cond.Broadcast()
344         }
345         return nil
346 }
347
348 // NewFIFO returns a Store which can be used to queue up items to
349 // process.
350 func NewFIFO(keyFunc KeyFunc) *FIFO {
351         f := &FIFO{
352                 items:   map[string]interface{}{},
353                 queue:   []string{},
354                 keyFunc: keyFunc,
355         }
356         f.cond.L = &f.lock
357         return f
358 }