2 Copyright 2014 The Kubernetes Authors.
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
8 http://www.apache.org/licenses/LICENSE-2.0
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
24 "k8s.io/apimachinery/pkg/util/sets"
29 // NewDeltaFIFO returns a Store which can be used process changes to items.
31 // keyFunc is used to figure out what key an object should have. (It's
32 // exposed in the returned DeltaFIFO's KeyOf() method, with bonus features.)
34 // 'keyLister' is expected to return a list of keys that the consumer of
35 // this queue "knows about". It is used to decide which items are missing
36 // when Replace() is called; 'Deleted' deltas are produced for these items.
37 // It may be nil if you don't need to detect all deletions.
38 // TODO: consider merging keyLister with this object, tracking a list of
39 // "known" keys when Pop() is called. Have to think about how that
40 // affects error retrying.
41 // NOTE: It is possible to misuse this and cause a race when using an
42 // external known object source.
43 // Whether there is a potential race depends on how the comsumer
44 // modifies knownObjects. In Pop(), process function is called under
45 // lock, so it is safe to update data structures in it that need to be
46 // in sync with the queue (e.g. knownObjects).
49 // In case of sharedIndexInformer being a consumer
50 // (https://github.com/kubernetes/kubernetes/blob/0cdd940f/staging/
51 // src/k8s.io/client-go/tools/cache/shared_informer.go#L192),
52 // there is no race as knownObjects (s.indexer) is modified safely
53 // under DeltaFIFO's lock. The only exceptions are GetStore() and
54 // GetIndexer() methods, which expose ways to modify the underlying
55 // storage. Currently these two methods are used for creating Lister
56 // and internal tests.
58 // Also see the comment on DeltaFIFO.
59 func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO {
61 items: map[string]Deltas{},
64 knownObjects: knownObjects,
70 // DeltaFIFO is like FIFO, but allows you to process deletes.
72 // DeltaFIFO is a producer-consumer queue, where a Reflector is
73 // intended to be the producer, and the consumer is whatever calls
76 // DeltaFIFO solves this use case:
77 // * You want to process every object change (delta) at most once.
78 // * When you process an object, you want to see everything
79 // that's happened to it since you last processed it.
80 // * You want to process the deletion of objects.
81 // * You might want to periodically reprocess objects.
83 // DeltaFIFO's Pop(), Get(), and GetByKey() methods return
84 // interface{} to satisfy the Store/Queue interfaces, but it
85 // will always return an object of type Deltas.
87 // A note on threading: If you call Pop() in parallel from multiple
88 // threads, you could end up with multiple threads processing slightly
89 // different versions of the same object.
91 // A note on the KeyLister used by the DeltaFIFO: It's main purpose is
92 // to list keys that are "known", for the purpose of figuring out which
93 // items have been deleted when Replace() or Delete() are called. The deleted
94 // object will be included in the DeleteFinalStateUnknown markers. These objects
96 type DeltaFIFO struct {
97 // lock/cond protects access to 'items' and 'queue'.
101 // We depend on the property that items in the set are in
102 // the queue and vice versa, and that all Deltas in this
103 // map have at least one Delta.
104 items map[string]Deltas
107 // populated is true if the first batch of items inserted by Replace() has been populated
108 // or Delete/Add/Update was called first.
110 // initialPopulationCount is the number of items inserted by the first call of Replace()
111 initialPopulationCount int
113 // keyFunc is used to make the key used for queued item
114 // insertion and retrieval, and should be deterministic.
117 // knownObjects list keys that are "known", for the
118 // purpose of figuring out which items have been deleted
119 // when Replace() or Delete() is called.
120 knownObjects KeyListerGetter
122 // Indication the queue is closed.
123 // Used to indicate a queue is closed so a control loop can exit when a queue is empty.
124 // Currently, not used to gate any of CRED operations.
126 closedLock sync.Mutex
130 _ = Queue(&DeltaFIFO{}) // DeltaFIFO is a Queue
134 // ErrZeroLengthDeltasObject is returned in a KeyError if a Deltas
135 // object with zero length is encountered (should be impossible,
136 // but included for completeness).
137 ErrZeroLengthDeltasObject = errors.New("0 length Deltas object; can't get key")
141 func (f *DeltaFIFO) Close() {
143 defer f.closedLock.Unlock()
148 // KeyOf exposes f's keyFunc, but also detects the key of a Deltas object or
149 // DeletedFinalStateUnknown objects.
150 func (f *DeltaFIFO) KeyOf(obj interface{}) (string, error) {
151 if d, ok := obj.(Deltas); ok {
153 return "", KeyError{obj, ErrZeroLengthDeltasObject}
155 obj = d.Newest().Object
157 if d, ok := obj.(DeletedFinalStateUnknown); ok {
160 return f.keyFunc(obj)
163 // Return true if an Add/Update/Delete/AddIfNotPresent are called first,
164 // or an Update called first but the first batch of items inserted by Replace() has been popped
165 func (f *DeltaFIFO) HasSynced() bool {
167 defer f.lock.Unlock()
168 return f.populated && f.initialPopulationCount == 0
171 // Add inserts an item, and puts it in the queue. The item is only enqueued
172 // if it doesn't already exist in the set.
173 func (f *DeltaFIFO) Add(obj interface{}) error {
175 defer f.lock.Unlock()
177 return f.queueActionLocked(Added, obj)
180 // Update is just like Add, but makes an Updated Delta.
181 func (f *DeltaFIFO) Update(obj interface{}) error {
183 defer f.lock.Unlock()
185 return f.queueActionLocked(Updated, obj)
188 // Delete is just like Add, but makes an Deleted Delta. If the item does not
189 // already exist, it will be ignored. (It may have already been deleted by a
190 // Replace (re-list), for example.
191 func (f *DeltaFIFO) Delete(obj interface{}) error {
192 id, err := f.KeyOf(obj)
194 return KeyError{obj, err}
197 defer f.lock.Unlock()
199 if f.knownObjects == nil {
200 if _, exists := f.items[id]; !exists {
201 // Presumably, this was deleted when a relist happened.
202 // Don't provide a second report of the same deletion.
206 // We only want to skip the "deletion" action if the object doesn't
207 // exist in knownObjects and it doesn't have corresponding item in items.
208 // Note that even if there is a "deletion" action in items, we can ignore it,
209 // because it will be deduped automatically in "queueActionLocked"
210 _, exists, err := f.knownObjects.GetByKey(id)
211 _, itemsExist := f.items[id]
212 if err == nil && !exists && !itemsExist {
213 // Presumably, this was deleted when a relist happened.
214 // Don't provide a second report of the same deletion.
219 return f.queueActionLocked(Deleted, obj)
222 // AddIfNotPresent inserts an item, and puts it in the queue. If the item is already
223 // present in the set, it is neither enqueued nor added to the set.
225 // This is useful in a single producer/consumer scenario so that the consumer can
226 // safely retry items without contending with the producer and potentially enqueueing
229 // Important: obj must be a Deltas (the output of the Pop() function). Yes, this is
230 // different from the Add/Update/Delete functions.
231 func (f *DeltaFIFO) AddIfNotPresent(obj interface{}) error {
232 deltas, ok := obj.(Deltas)
234 return fmt.Errorf("object must be of type deltas, but got: %#v", obj)
236 id, err := f.KeyOf(deltas.Newest().Object)
238 return KeyError{obj, err}
241 defer f.lock.Unlock()
242 f.addIfNotPresent(id, deltas)
246 // addIfNotPresent inserts deltas under id if it does not exist, and assumes the caller
247 // already holds the fifo lock.
248 func (f *DeltaFIFO) addIfNotPresent(id string, deltas Deltas) {
250 if _, exists := f.items[id]; exists {
254 f.queue = append(f.queue, id)
259 // re-listing and watching can deliver the same update multiple times in any
260 // order. This will combine the most recent two deltas if they are the same.
261 func dedupDeltas(deltas Deltas) Deltas {
268 if out := isDup(a, b); out != nil {
269 d := append(Deltas{}, deltas[:n-2]...)
270 return append(d, *out)
275 // If a & b represent the same event, returns the delta that ought to be kept.
276 // Otherwise, returns nil.
277 // TODO: is there anything other than deletions that need deduping?
278 func isDup(a, b *Delta) *Delta {
279 if out := isDeletionDup(a, b); out != nil {
282 // TODO: Detect other duplicate situations? Are there any?
286 // keep the one with the most information if both are deletions.
287 func isDeletionDup(a, b *Delta) *Delta {
288 if b.Type != Deleted || a.Type != Deleted {
291 // Do more sophisticated checks, or is this sufficient?
292 if _, ok := b.Object.(DeletedFinalStateUnknown); ok {
298 // willObjectBeDeletedLocked returns true only if the last delta for the
299 // given object is Delete. Caller must lock first.
300 func (f *DeltaFIFO) willObjectBeDeletedLocked(id string) bool {
301 deltas := f.items[id]
302 return len(deltas) > 0 && deltas[len(deltas)-1].Type == Deleted
305 // queueActionLocked appends to the delta list for the object.
306 // Caller must lock first.
307 func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
308 id, err := f.KeyOf(obj)
310 return KeyError{obj, err}
313 // If object is supposed to be deleted (last event is Deleted),
314 // then we should ignore Sync events, because it would result in
315 // recreation of this object.
316 if actionType == Sync && f.willObjectBeDeletedLocked(id) {
320 newDeltas := append(f.items[id], Delta{actionType, obj})
321 newDeltas = dedupDeltas(newDeltas)
323 if len(newDeltas) > 0 {
324 if _, exists := f.items[id]; !exists {
325 f.queue = append(f.queue, id)
327 f.items[id] = newDeltas
330 // We need to remove this from our map (extra items in the queue are
331 // ignored if they are not in the map).
337 // List returns a list of all the items; it returns the object
338 // from the most recent Delta.
339 // You should treat the items returned inside the deltas as immutable.
340 func (f *DeltaFIFO) List() []interface{} {
342 defer f.lock.RUnlock()
343 return f.listLocked()
346 func (f *DeltaFIFO) listLocked() []interface{} {
347 list := make([]interface{}, 0, len(f.items))
348 for _, item := range f.items {
349 list = append(list, item.Newest().Object)
354 // ListKeys returns a list of all the keys of the objects currently
356 func (f *DeltaFIFO) ListKeys() []string {
358 defer f.lock.RUnlock()
359 list := make([]string, 0, len(f.items))
360 for key := range f.items {
361 list = append(list, key)
366 // Get returns the complete list of deltas for the requested item,
367 // or sets exists=false.
368 // You should treat the items returned inside the deltas as immutable.
369 func (f *DeltaFIFO) Get(obj interface{}) (item interface{}, exists bool, err error) {
370 key, err := f.KeyOf(obj)
372 return nil, false, KeyError{obj, err}
374 return f.GetByKey(key)
377 // GetByKey returns the complete list of deltas for the requested item,
378 // setting exists=false if that list is empty.
379 // You should treat the items returned inside the deltas as immutable.
380 func (f *DeltaFIFO) GetByKey(key string) (item interface{}, exists bool, err error) {
382 defer f.lock.RUnlock()
383 d, exists := f.items[key]
385 // Copy item's slice so operations on this slice
386 // won't interfere with the object we return.
389 return d, exists, nil
392 // Checks if the queue is closed
393 func (f *DeltaFIFO) IsClosed() bool {
395 defer f.closedLock.Unlock()
399 // Pop blocks until an item is added to the queue, and then returns it. If
400 // multiple items are ready, they are returned in the order in which they were
401 // added/updated. The item is removed from the queue (and the store) before it
402 // is returned, so if you don't successfully process it, you need to add it back
403 // with AddIfNotPresent().
404 // process function is called under lock, so it is safe update data structures
405 // in it that need to be in sync with the queue (e.g. knownKeys). The PopProcessFunc
406 // may return an instance of ErrRequeue with a nested error to indicate the current
407 // item should be requeued (equivalent to calling AddIfNotPresent under the lock).
409 // Pop returns a 'Deltas', which has a complete list of all the things
410 // that happened to the object (deltas) while it was sitting in the queue.
411 func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
413 defer f.lock.Unlock()
415 for len(f.queue) == 0 {
416 // When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
417 // When Close() is called, the f.closed is set and the condition is broadcasted.
418 // Which causes this loop to continue and return from the Pop().
420 return nil, FIFOClosedError
426 f.queue = f.queue[1:]
427 if f.initialPopulationCount > 0 {
428 f.initialPopulationCount--
430 item, ok := f.items[id]
432 // Item may have been deleted subsequently.
437 if e, ok := err.(ErrRequeue); ok {
438 f.addIfNotPresent(id, item)
441 // Don't need to copyDeltas here, because we're transferring
442 // ownership to the caller.
447 // Replace will delete the contents of 'f', using instead the given map.
448 // 'f' takes ownership of the map, you should not reference the map again
449 // after calling this function. f's queue is reset, too; upon return, it
450 // will contain the items in the map, in no particular order.
451 func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
453 defer f.lock.Unlock()
454 keys := make(sets.String, len(list))
456 for _, item := range list {
457 key, err := f.KeyOf(item)
459 return KeyError{item, err}
462 if err := f.queueActionLocked(Sync, item); err != nil {
463 return fmt.Errorf("couldn't enqueue object: %v", err)
467 if f.knownObjects == nil {
468 // Do deletion detection against our own list.
469 for k, oldItem := range f.items {
473 var deletedObj interface{}
474 if n := oldItem.Newest(); n != nil {
475 deletedObj = n.Object
477 if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
484 f.initialPopulationCount = len(list)
490 // Detect deletions not already in the queue.
491 knownKeys := f.knownObjects.ListKeys()
493 for _, k := range knownKeys {
498 deletedObj, exists, err := f.knownObjects.GetByKey(k)
501 klog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k)
504 klog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k)
507 if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
514 f.initialPopulationCount = len(list) + queuedDeletions
520 // Resync will send a sync event for each item
521 func (f *DeltaFIFO) Resync() error {
523 defer f.lock.Unlock()
525 if f.knownObjects == nil {
529 keys := f.knownObjects.ListKeys()
530 for _, k := range keys {
531 if err := f.syncKeyLocked(k); err != nil {
538 func (f *DeltaFIFO) syncKey(key string) error {
540 defer f.lock.Unlock()
542 return f.syncKeyLocked(key)
545 func (f *DeltaFIFO) syncKeyLocked(key string) error {
546 obj, exists, err := f.knownObjects.GetByKey(key)
548 klog.Errorf("Unexpected error %v during lookup of key %v, unable to queue object for sync", err, key)
551 klog.Infof("Key %v does not exist in known objects store, unable to queue object for sync", key)
555 // If we are doing Resync() and there is already an event queued for that object,
556 // we ignore the Resync for it. This is to avoid the race, in which the resync
557 // comes with the previous value of object (since queueing an event for the object
558 // doesn't trigger changing the underlying store <knownObjects>.
559 id, err := f.KeyOf(obj)
561 return KeyError{obj, err}
563 if len(f.items[id]) > 0 {
567 if err := f.queueActionLocked(Sync, obj); err != nil {
568 return fmt.Errorf("couldn't queue object: %v", err)
573 // A KeyListerGetter is anything that knows how to list its keys and look up by key.
574 type KeyListerGetter interface {
579 // A KeyLister is anything that knows how to list its keys.
580 type KeyLister interface {
584 // A KeyGetter is anything that knows how to get the value stored under a given key.
585 type KeyGetter interface {
586 GetByKey(key string) (interface{}, bool, error)
589 // DeltaType is the type of a change (addition, deletion, etc)
590 type DeltaType string
593 Added DeltaType = "Added"
594 Updated DeltaType = "Updated"
595 Deleted DeltaType = "Deleted"
596 // The other types are obvious. You'll get Sync deltas when:
597 // * A watch expires/errors out and a new list/watch cycle is started.
598 // * You've turned on periodic syncs.
599 // (Anything that trigger's DeltaFIFO's Replace() method.)
600 Sync DeltaType = "Sync"
603 // Delta is the type stored by a DeltaFIFO. It tells you what change
604 // happened, and the object's state after* that change.
606 // [*] Unless the change is a deletion, and then you'll get the final
607 // state of the object before it was deleted.
613 // Deltas is a list of one or more 'Delta's to an individual object.
614 // The oldest delta is at index 0, the newest delta is the last one.
617 // Oldest is a convenience function that returns the oldest delta, or
618 // nil if there are no deltas.
619 func (d Deltas) Oldest() *Delta {
626 // Newest is a convenience function that returns the newest delta, or
627 // nil if there are no deltas.
628 func (d Deltas) Newest() *Delta {
629 if n := len(d); n > 0 {
635 // copyDeltas returns a shallow copy of d; that is, it copies the slice but not
636 // the objects in the slice. This allows Get/List to return an object that we
637 // know won't be clobbered by a subsequent modifications.
638 func copyDeltas(d Deltas) Deltas {
639 d2 := make(Deltas, len(d))
644 // DeletedFinalStateUnknown is placed into a DeltaFIFO in the case where
645 // an object was deleted but the watch deletion event was missed. In this
646 // case we don't know the final "resting" state of the object, so there's
647 // a chance the included `Obj` is stale.
648 type DeletedFinalStateUnknown struct {