Remove BPA from Makefile
[icn.git] / cmd / bpa-operator / vendor / k8s.io / client-go / tools / cache / delta_fifo.go
1 /*
2 Copyright 2014 The Kubernetes Authors.
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8     http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16
17 package cache
18
19 import (
20         "errors"
21         "fmt"
22         "sync"
23
24         "k8s.io/apimachinery/pkg/util/sets"
25
26         "k8s.io/klog"
27 )
28
29 // NewDeltaFIFO returns a Store which can be used process changes to items.
30 //
31 // keyFunc is used to figure out what key an object should have. (It's
32 // exposed in the returned DeltaFIFO's KeyOf() method, with bonus features.)
33 //
34 // 'keyLister' is expected to return a list of keys that the consumer of
35 // this queue "knows about". It is used to decide which items are missing
36 // when Replace() is called; 'Deleted' deltas are produced for these items.
37 // It may be nil if you don't need to detect all deletions.
38 // TODO: consider merging keyLister with this object, tracking a list of
39 //       "known" keys when Pop() is called. Have to think about how that
40 //       affects error retrying.
41 // NOTE: It is possible to misuse this and cause a race when using an
42 //       external known object source.
43 //       Whether there is a potential race depends on how the comsumer
44 //       modifies knownObjects. In Pop(), process function is called under
45 //       lock, so it is safe to update data structures in it that need to be
46 //       in sync with the queue (e.g. knownObjects).
47 //
48 //       Example:
49 //       In case of sharedIndexInformer being a consumer
50 //       (https://github.com/kubernetes/kubernetes/blob/0cdd940f/staging/
51 //       src/k8s.io/client-go/tools/cache/shared_informer.go#L192),
52 //       there is no race as knownObjects (s.indexer) is modified safely
53 //       under DeltaFIFO's lock. The only exceptions are GetStore() and
54 //       GetIndexer() methods, which expose ways to modify the underlying
55 //       storage. Currently these two methods are used for creating Lister
56 //       and internal tests.
57 //
58 // Also see the comment on DeltaFIFO.
59 func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO {
60         f := &DeltaFIFO{
61                 items:        map[string]Deltas{},
62                 queue:        []string{},
63                 keyFunc:      keyFunc,
64                 knownObjects: knownObjects,
65         }
66         f.cond.L = &f.lock
67         return f
68 }
69
70 // DeltaFIFO is like FIFO, but allows you to process deletes.
71 //
72 // DeltaFIFO is a producer-consumer queue, where a Reflector is
73 // intended to be the producer, and the consumer is whatever calls
74 // the Pop() method.
75 //
76 // DeltaFIFO solves this use case:
77 //  * You want to process every object change (delta) at most once.
78 //  * When you process an object, you want to see everything
79 //    that's happened to it since you last processed it.
80 //  * You want to process the deletion of objects.
81 //  * You might want to periodically reprocess objects.
82 //
83 // DeltaFIFO's Pop(), Get(), and GetByKey() methods return
84 // interface{} to satisfy the Store/Queue interfaces, but it
85 // will always return an object of type Deltas.
86 //
87 // A note on threading: If you call Pop() in parallel from multiple
88 // threads, you could end up with multiple threads processing slightly
89 // different versions of the same object.
90 //
91 // A note on the KeyLister used by the DeltaFIFO: It's main purpose is
92 // to list keys that are "known", for the purpose of figuring out which
93 // items have been deleted when Replace() or Delete() are called. The deleted
94 // object will be included in the DeleteFinalStateUnknown markers. These objects
95 // could be stale.
96 type DeltaFIFO struct {
97         // lock/cond protects access to 'items' and 'queue'.
98         lock sync.RWMutex
99         cond sync.Cond
100
101         // We depend on the property that items in the set are in
102         // the queue and vice versa, and that all Deltas in this
103         // map have at least one Delta.
104         items map[string]Deltas
105         queue []string
106
107         // populated is true if the first batch of items inserted by Replace() has been populated
108         // or Delete/Add/Update was called first.
109         populated bool
110         // initialPopulationCount is the number of items inserted by the first call of Replace()
111         initialPopulationCount int
112
113         // keyFunc is used to make the key used for queued item
114         // insertion and retrieval, and should be deterministic.
115         keyFunc KeyFunc
116
117         // knownObjects list keys that are "known", for the
118         // purpose of figuring out which items have been deleted
119         // when Replace() or Delete() is called.
120         knownObjects KeyListerGetter
121
122         // Indication the queue is closed.
123         // Used to indicate a queue is closed so a control loop can exit when a queue is empty.
124         // Currently, not used to gate any of CRED operations.
125         closed     bool
126         closedLock sync.Mutex
127 }
128
129 var (
130         _ = Queue(&DeltaFIFO{}) // DeltaFIFO is a Queue
131 )
132
133 var (
134         // ErrZeroLengthDeltasObject is returned in a KeyError if a Deltas
135         // object with zero length is encountered (should be impossible,
136         // but included for completeness).
137         ErrZeroLengthDeltasObject = errors.New("0 length Deltas object; can't get key")
138 )
139
140 // Close the queue.
141 func (f *DeltaFIFO) Close() {
142         f.closedLock.Lock()
143         defer f.closedLock.Unlock()
144         f.closed = true
145         f.cond.Broadcast()
146 }
147
148 // KeyOf exposes f's keyFunc, but also detects the key of a Deltas object or
149 // DeletedFinalStateUnknown objects.
150 func (f *DeltaFIFO) KeyOf(obj interface{}) (string, error) {
151         if d, ok := obj.(Deltas); ok {
152                 if len(d) == 0 {
153                         return "", KeyError{obj, ErrZeroLengthDeltasObject}
154                 }
155                 obj = d.Newest().Object
156         }
157         if d, ok := obj.(DeletedFinalStateUnknown); ok {
158                 return d.Key, nil
159         }
160         return f.keyFunc(obj)
161 }
162
163 // Return true if an Add/Update/Delete/AddIfNotPresent are called first,
164 // or an Update called first but the first batch of items inserted by Replace() has been popped
165 func (f *DeltaFIFO) HasSynced() bool {
166         f.lock.Lock()
167         defer f.lock.Unlock()
168         return f.populated && f.initialPopulationCount == 0
169 }
170
171 // Add inserts an item, and puts it in the queue. The item is only enqueued
172 // if it doesn't already exist in the set.
173 func (f *DeltaFIFO) Add(obj interface{}) error {
174         f.lock.Lock()
175         defer f.lock.Unlock()
176         f.populated = true
177         return f.queueActionLocked(Added, obj)
178 }
179
180 // Update is just like Add, but makes an Updated Delta.
181 func (f *DeltaFIFO) Update(obj interface{}) error {
182         f.lock.Lock()
183         defer f.lock.Unlock()
184         f.populated = true
185         return f.queueActionLocked(Updated, obj)
186 }
187
188 // Delete is just like Add, but makes an Deleted Delta. If the item does not
189 // already exist, it will be ignored. (It may have already been deleted by a
190 // Replace (re-list), for example.
191 func (f *DeltaFIFO) Delete(obj interface{}) error {
192         id, err := f.KeyOf(obj)
193         if err != nil {
194                 return KeyError{obj, err}
195         }
196         f.lock.Lock()
197         defer f.lock.Unlock()
198         f.populated = true
199         if f.knownObjects == nil {
200                 if _, exists := f.items[id]; !exists {
201                         // Presumably, this was deleted when a relist happened.
202                         // Don't provide a second report of the same deletion.
203                         return nil
204                 }
205         } else {
206                 // We only want to skip the "deletion" action if the object doesn't
207                 // exist in knownObjects and it doesn't have corresponding item in items.
208                 // Note that even if there is a "deletion" action in items, we can ignore it,
209                 // because it will be deduped automatically in "queueActionLocked"
210                 _, exists, err := f.knownObjects.GetByKey(id)
211                 _, itemsExist := f.items[id]
212                 if err == nil && !exists && !itemsExist {
213                         // Presumably, this was deleted when a relist happened.
214                         // Don't provide a second report of the same deletion.
215                         return nil
216                 }
217         }
218
219         return f.queueActionLocked(Deleted, obj)
220 }
221
222 // AddIfNotPresent inserts an item, and puts it in the queue. If the item is already
223 // present in the set, it is neither enqueued nor added to the set.
224 //
225 // This is useful in a single producer/consumer scenario so that the consumer can
226 // safely retry items without contending with the producer and potentially enqueueing
227 // stale items.
228 //
229 // Important: obj must be a Deltas (the output of the Pop() function). Yes, this is
230 // different from the Add/Update/Delete functions.
231 func (f *DeltaFIFO) AddIfNotPresent(obj interface{}) error {
232         deltas, ok := obj.(Deltas)
233         if !ok {
234                 return fmt.Errorf("object must be of type deltas, but got: %#v", obj)
235         }
236         id, err := f.KeyOf(deltas.Newest().Object)
237         if err != nil {
238                 return KeyError{obj, err}
239         }
240         f.lock.Lock()
241         defer f.lock.Unlock()
242         f.addIfNotPresent(id, deltas)
243         return nil
244 }
245
246 // addIfNotPresent inserts deltas under id if it does not exist, and assumes the caller
247 // already holds the fifo lock.
248 func (f *DeltaFIFO) addIfNotPresent(id string, deltas Deltas) {
249         f.populated = true
250         if _, exists := f.items[id]; exists {
251                 return
252         }
253
254         f.queue = append(f.queue, id)
255         f.items[id] = deltas
256         f.cond.Broadcast()
257 }
258
259 // re-listing and watching can deliver the same update multiple times in any
260 // order. This will combine the most recent two deltas if they are the same.
261 func dedupDeltas(deltas Deltas) Deltas {
262         n := len(deltas)
263         if n < 2 {
264                 return deltas
265         }
266         a := &deltas[n-1]
267         b := &deltas[n-2]
268         if out := isDup(a, b); out != nil {
269                 d := append(Deltas{}, deltas[:n-2]...)
270                 return append(d, *out)
271         }
272         return deltas
273 }
274
275 // If a & b represent the same event, returns the delta that ought to be kept.
276 // Otherwise, returns nil.
277 // TODO: is there anything other than deletions that need deduping?
278 func isDup(a, b *Delta) *Delta {
279         if out := isDeletionDup(a, b); out != nil {
280                 return out
281         }
282         // TODO: Detect other duplicate situations? Are there any?
283         return nil
284 }
285
286 // keep the one with the most information if both are deletions.
287 func isDeletionDup(a, b *Delta) *Delta {
288         if b.Type != Deleted || a.Type != Deleted {
289                 return nil
290         }
291         // Do more sophisticated checks, or is this sufficient?
292         if _, ok := b.Object.(DeletedFinalStateUnknown); ok {
293                 return a
294         }
295         return b
296 }
297
298 // willObjectBeDeletedLocked returns true only if the last delta for the
299 // given object is Delete. Caller must lock first.
300 func (f *DeltaFIFO) willObjectBeDeletedLocked(id string) bool {
301         deltas := f.items[id]
302         return len(deltas) > 0 && deltas[len(deltas)-1].Type == Deleted
303 }
304
305 // queueActionLocked appends to the delta list for the object.
306 // Caller must lock first.
307 func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
308         id, err := f.KeyOf(obj)
309         if err != nil {
310                 return KeyError{obj, err}
311         }
312
313         // If object is supposed to be deleted (last event is Deleted),
314         // then we should ignore Sync events, because it would result in
315         // recreation of this object.
316         if actionType == Sync && f.willObjectBeDeletedLocked(id) {
317                 return nil
318         }
319
320         newDeltas := append(f.items[id], Delta{actionType, obj})
321         newDeltas = dedupDeltas(newDeltas)
322
323         if len(newDeltas) > 0 {
324                 if _, exists := f.items[id]; !exists {
325                         f.queue = append(f.queue, id)
326                 }
327                 f.items[id] = newDeltas
328                 f.cond.Broadcast()
329         } else {
330                 // We need to remove this from our map (extra items in the queue are
331                 // ignored if they are not in the map).
332                 delete(f.items, id)
333         }
334         return nil
335 }
336
337 // List returns a list of all the items; it returns the object
338 // from the most recent Delta.
339 // You should treat the items returned inside the deltas as immutable.
340 func (f *DeltaFIFO) List() []interface{} {
341         f.lock.RLock()
342         defer f.lock.RUnlock()
343         return f.listLocked()
344 }
345
346 func (f *DeltaFIFO) listLocked() []interface{} {
347         list := make([]interface{}, 0, len(f.items))
348         for _, item := range f.items {
349                 list = append(list, item.Newest().Object)
350         }
351         return list
352 }
353
354 // ListKeys returns a list of all the keys of the objects currently
355 // in the FIFO.
356 func (f *DeltaFIFO) ListKeys() []string {
357         f.lock.RLock()
358         defer f.lock.RUnlock()
359         list := make([]string, 0, len(f.items))
360         for key := range f.items {
361                 list = append(list, key)
362         }
363         return list
364 }
365
366 // Get returns the complete list of deltas for the requested item,
367 // or sets exists=false.
368 // You should treat the items returned inside the deltas as immutable.
369 func (f *DeltaFIFO) Get(obj interface{}) (item interface{}, exists bool, err error) {
370         key, err := f.KeyOf(obj)
371         if err != nil {
372                 return nil, false, KeyError{obj, err}
373         }
374         return f.GetByKey(key)
375 }
376
377 // GetByKey returns the complete list of deltas for the requested item,
378 // setting exists=false if that list is empty.
379 // You should treat the items returned inside the deltas as immutable.
380 func (f *DeltaFIFO) GetByKey(key string) (item interface{}, exists bool, err error) {
381         f.lock.RLock()
382         defer f.lock.RUnlock()
383         d, exists := f.items[key]
384         if exists {
385                 // Copy item's slice so operations on this slice
386                 // won't interfere with the object we return.
387                 d = copyDeltas(d)
388         }
389         return d, exists, nil
390 }
391
392 // Checks if the queue is closed
393 func (f *DeltaFIFO) IsClosed() bool {
394         f.closedLock.Lock()
395         defer f.closedLock.Unlock()
396         return f.closed
397 }
398
399 // Pop blocks until an item is added to the queue, and then returns it.  If
400 // multiple items are ready, they are returned in the order in which they were
401 // added/updated. The item is removed from the queue (and the store) before it
402 // is returned, so if you don't successfully process it, you need to add it back
403 // with AddIfNotPresent().
404 // process function is called under lock, so it is safe update data structures
405 // in it that need to be in sync with the queue (e.g. knownKeys). The PopProcessFunc
406 // may return an instance of ErrRequeue with a nested error to indicate the current
407 // item should be requeued (equivalent to calling AddIfNotPresent under the lock).
408 //
409 // Pop returns a 'Deltas', which has a complete list of all the things
410 // that happened to the object (deltas) while it was sitting in the queue.
411 func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
412         f.lock.Lock()
413         defer f.lock.Unlock()
414         for {
415                 for len(f.queue) == 0 {
416                         // When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
417                         // When Close() is called, the f.closed is set and the condition is broadcasted.
418                         // Which causes this loop to continue and return from the Pop().
419                         if f.IsClosed() {
420                                 return nil, FIFOClosedError
421                         }
422
423                         f.cond.Wait()
424                 }
425                 id := f.queue[0]
426                 f.queue = f.queue[1:]
427                 if f.initialPopulationCount > 0 {
428                         f.initialPopulationCount--
429                 }
430                 item, ok := f.items[id]
431                 if !ok {
432                         // Item may have been deleted subsequently.
433                         continue
434                 }
435                 delete(f.items, id)
436                 err := process(item)
437                 if e, ok := err.(ErrRequeue); ok {
438                         f.addIfNotPresent(id, item)
439                         err = e.Err
440                 }
441                 // Don't need to copyDeltas here, because we're transferring
442                 // ownership to the caller.
443                 return item, err
444         }
445 }
446
447 // Replace will delete the contents of 'f', using instead the given map.
448 // 'f' takes ownership of the map, you should not reference the map again
449 // after calling this function. f's queue is reset, too; upon return, it
450 // will contain the items in the map, in no particular order.
451 func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
452         f.lock.Lock()
453         defer f.lock.Unlock()
454         keys := make(sets.String, len(list))
455
456         for _, item := range list {
457                 key, err := f.KeyOf(item)
458                 if err != nil {
459                         return KeyError{item, err}
460                 }
461                 keys.Insert(key)
462                 if err := f.queueActionLocked(Sync, item); err != nil {
463                         return fmt.Errorf("couldn't enqueue object: %v", err)
464                 }
465         }
466
467         if f.knownObjects == nil {
468                 // Do deletion detection against our own list.
469                 for k, oldItem := range f.items {
470                         if keys.Has(k) {
471                                 continue
472                         }
473                         var deletedObj interface{}
474                         if n := oldItem.Newest(); n != nil {
475                                 deletedObj = n.Object
476                         }
477                         if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
478                                 return err
479                         }
480                 }
481
482                 if !f.populated {
483                         f.populated = true
484                         f.initialPopulationCount = len(list)
485                 }
486
487                 return nil
488         }
489
490         // Detect deletions not already in the queue.
491         knownKeys := f.knownObjects.ListKeys()
492         queuedDeletions := 0
493         for _, k := range knownKeys {
494                 if keys.Has(k) {
495                         continue
496                 }
497
498                 deletedObj, exists, err := f.knownObjects.GetByKey(k)
499                 if err != nil {
500                         deletedObj = nil
501                         klog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k)
502                 } else if !exists {
503                         deletedObj = nil
504                         klog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k)
505                 }
506                 queuedDeletions++
507                 if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
508                         return err
509                 }
510         }
511
512         if !f.populated {
513                 f.populated = true
514                 f.initialPopulationCount = len(list) + queuedDeletions
515         }
516
517         return nil
518 }
519
520 // Resync will send a sync event for each item
521 func (f *DeltaFIFO) Resync() error {
522         f.lock.Lock()
523         defer f.lock.Unlock()
524
525         if f.knownObjects == nil {
526                 return nil
527         }
528
529         keys := f.knownObjects.ListKeys()
530         for _, k := range keys {
531                 if err := f.syncKeyLocked(k); err != nil {
532                         return err
533                 }
534         }
535         return nil
536 }
537
538 func (f *DeltaFIFO) syncKey(key string) error {
539         f.lock.Lock()
540         defer f.lock.Unlock()
541
542         return f.syncKeyLocked(key)
543 }
544
545 func (f *DeltaFIFO) syncKeyLocked(key string) error {
546         obj, exists, err := f.knownObjects.GetByKey(key)
547         if err != nil {
548                 klog.Errorf("Unexpected error %v during lookup of key %v, unable to queue object for sync", err, key)
549                 return nil
550         } else if !exists {
551                 klog.Infof("Key %v does not exist in known objects store, unable to queue object for sync", key)
552                 return nil
553         }
554
555         // If we are doing Resync() and there is already an event queued for that object,
556         // we ignore the Resync for it. This is to avoid the race, in which the resync
557         // comes with the previous value of object (since queueing an event for the object
558         // doesn't trigger changing the underlying store <knownObjects>.
559         id, err := f.KeyOf(obj)
560         if err != nil {
561                 return KeyError{obj, err}
562         }
563         if len(f.items[id]) > 0 {
564                 return nil
565         }
566
567         if err := f.queueActionLocked(Sync, obj); err != nil {
568                 return fmt.Errorf("couldn't queue object: %v", err)
569         }
570         return nil
571 }
572
573 // A KeyListerGetter is anything that knows how to list its keys and look up by key.
574 type KeyListerGetter interface {
575         KeyLister
576         KeyGetter
577 }
578
579 // A KeyLister is anything that knows how to list its keys.
580 type KeyLister interface {
581         ListKeys() []string
582 }
583
584 // A KeyGetter is anything that knows how to get the value stored under a given key.
585 type KeyGetter interface {
586         GetByKey(key string) (interface{}, bool, error)
587 }
588
589 // DeltaType is the type of a change (addition, deletion, etc)
590 type DeltaType string
591
592 const (
593         Added   DeltaType = "Added"
594         Updated DeltaType = "Updated"
595         Deleted DeltaType = "Deleted"
596         // The other types are obvious. You'll get Sync deltas when:
597         //  * A watch expires/errors out and a new list/watch cycle is started.
598         //  * You've turned on periodic syncs.
599         // (Anything that trigger's DeltaFIFO's Replace() method.)
600         Sync DeltaType = "Sync"
601 )
602
603 // Delta is the type stored by a DeltaFIFO. It tells you what change
604 // happened, and the object's state after* that change.
605 //
606 // [*] Unless the change is a deletion, and then you'll get the final
607 //     state of the object before it was deleted.
608 type Delta struct {
609         Type   DeltaType
610         Object interface{}
611 }
612
613 // Deltas is a list of one or more 'Delta's to an individual object.
614 // The oldest delta is at index 0, the newest delta is the last one.
615 type Deltas []Delta
616
617 // Oldest is a convenience function that returns the oldest delta, or
618 // nil if there are no deltas.
619 func (d Deltas) Oldest() *Delta {
620         if len(d) > 0 {
621                 return &d[0]
622         }
623         return nil
624 }
625
626 // Newest is a convenience function that returns the newest delta, or
627 // nil if there are no deltas.
628 func (d Deltas) Newest() *Delta {
629         if n := len(d); n > 0 {
630                 return &d[n-1]
631         }
632         return nil
633 }
634
635 // copyDeltas returns a shallow copy of d; that is, it copies the slice but not
636 // the objects in the slice. This allows Get/List to return an object that we
637 // know won't be clobbered by a subsequent modifications.
638 func copyDeltas(d Deltas) Deltas {
639         d2 := make(Deltas, len(d))
640         copy(d2, d)
641         return d2
642 }
643
644 // DeletedFinalStateUnknown is placed into a DeltaFIFO in the case where
645 // an object was deleted but the watch deletion event was missed. In this
646 // case we don't know the final "resting" state of the object, so there's
647 // a chance the included `Obj` is stale.
648 type DeletedFinalStateUnknown struct {
649         Key string
650         Obj interface{}
651 }