2 Copyright 2014 The Kubernetes Authors.
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
8 http://www.apache.org/licenses/LICENSE-2.0
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
23 "k8s.io/apimachinery/pkg/util/sets"
26 // PopProcessFunc is the callback passed to the Pop() method of the Queue interface.
27 // It processes the element popped from the queue and reports the outcome as an error.
28 type PopProcessFunc func(interface{}) error
30 // ErrRequeue may be returned by a PopProcessFunc to safely requeue
31 // the current item. The value of Err will be returned from Pop.
32 type ErrRequeue struct {
33 // Err is returned by the Pop function
// FIFOClosedError is the error FIFO.Pop returns from its empty-queue wait loop
// once the queue has been closed.
37 var FIFOClosedError error = errors.New("DeltaFIFO: manipulating with closed queue")
39 func (e ErrRequeue) Error() string {
41 return "the popped item should be requeued without returning an error"
46 // Queue is exactly like a Store, but has a Pop() method too.
47 type Queue interface {
50 // Pop blocks until it has something to process.
51 // It returns the object that was processed and the result of processing.
52 // The PopProcessFunc may return an ErrRequeue{...} to indicate the item
53 // should be requeued before releasing the lock on the queue.
54 Pop(PopProcessFunc) (interface{}, error)
56 // AddIfNotPresent adds a value previously
57 // returned by Pop back into the queue as long
58 // as nothing else (presumably more recent)
59 // has since been added.
60 AddIfNotPresent(interface{}) error
62 // HasSynced returns true if the first batch of items has been popped
69 // Pop is a helper function for popping from Queue.
70 // WARNING: Do NOT use this function in non-test code to avoid races
71 // unless you really really really really know what you are doing.
72 func Pop(queue Queue) interface{} {
73 var result interface{}
74 queue.Pop(func(obj interface{}) error {
81 // FIFO receives adds and updates from a Reflector, and puts them in a queue for
82 // FIFO order processing. If multiple adds/updates of a single item happen while
83 // an item is in the queue before it has been processed, it will only be
84 // processed once, and when it is processed, the most recent version will be
85 // processed. This can't be done with a channel.
87 // FIFO solves this use case:
88 // * You want to process every object (exactly) once.
89 // * You want to process the most recent version of the object when you process it.
90 // * You do not want to process deleted objects, they should be removed from the queue.
91 // * You do not want to periodically reprocess objects.
92 // Compare with DeltaFIFO for other use cases.
96 // We depend on the property that items in the set are in the queue and vice versa.
97 items map[string]interface{}
100 // populated is true if the first batch of items inserted by Replace() has been populated
101 // or Delete/Add/Update was called first.
103 // initialPopulationCount is the number of items inserted by the first call of Replace()
104 initialPopulationCount int
106 // keyFunc is used to make the key used for queued item insertion and retrieval, and
107 // should be deterministic.
110 // Indication the queue is closed.
111 // Used to indicate a queue is closed so a control loop can exit when a queue is empty.
112 // Currently, not used to gate any of CRUD operations.
114 closedLock sync.Mutex
118 _ = Queue(&FIFO{}) // FIFO is a Queue
122 func (f *FIFO) Close() {
124 defer f.closedLock.Unlock()
129 // HasSynced returns true if an Add/Update/Delete/AddIfNotPresent is called first,
130 // or if the first batch of items inserted by Replace() has been popped.
131 func (f *FIFO) HasSynced() bool {
133 defer f.lock.Unlock()
134 return f.populated && f.initialPopulationCount == 0
137 // Add inserts an item, and puts it in the queue. The item is only enqueued
138 // if it doesn't already exist in the set.
139 func (f *FIFO) Add(obj interface{}) error {
140 id, err := f.keyFunc(obj)
142 return KeyError{obj, err}
145 defer f.lock.Unlock()
147 if _, exists := f.items[id]; !exists {
148 f.queue = append(f.queue, id)
155 // AddIfNotPresent inserts an item, and puts it in the queue. If the item is already
156 // present in the set, it is neither enqueued nor added to the set.
158 // This is useful in a single producer/consumer scenario so that the consumer can
159 // safely retry items without contending with the producer and potentially enqueueing
161 func (f *FIFO) AddIfNotPresent(obj interface{}) error {
162 id, err := f.keyFunc(obj)
164 return KeyError{obj, err}
167 defer f.lock.Unlock()
168 f.addIfNotPresent(id, obj)
172 // addIfNotPresent assumes the fifo lock is already held and adds the provided
173 // item to the queue under id if it does not already exist.
174 func (f *FIFO) addIfNotPresent(id string, obj interface{}) {
176 if _, exists := f.items[id]; exists {
180 f.queue = append(f.queue, id)
185 // Update is the same as Add in this implementation.
186 func (f *FIFO) Update(obj interface{}) error {
190 // Delete removes an item. It doesn't add it to the queue, because
191 // this implementation assumes the consumer only cares about the objects,
192 // not the order in which they were created/added.
193 func (f *FIFO) Delete(obj interface{}) error {
194 id, err := f.keyFunc(obj)
196 return KeyError{obj, err}
199 defer f.lock.Unlock()
205 // List returns a list of all the items.
206 func (f *FIFO) List() []interface{} {
208 defer f.lock.RUnlock()
209 list := make([]interface{}, 0, len(f.items))
210 for _, item := range f.items {
211 list = append(list, item)
216 // ListKeys returns a list of all the keys of the objects currently
218 func (f *FIFO) ListKeys() []string {
220 defer f.lock.RUnlock()
221 list := make([]string, 0, len(f.items))
222 for key := range f.items {
223 list = append(list, key)
228 // Get returns the requested item, or sets exists=false.
229 func (f *FIFO) Get(obj interface{}) (item interface{}, exists bool, err error) {
230 key, err := f.keyFunc(obj)
232 return nil, false, KeyError{obj, err}
234 return f.GetByKey(key)
237 // GetByKey returns the requested item, or sets exists=false.
238 func (f *FIFO) GetByKey(key string) (item interface{}, exists bool, err error) {
240 defer f.lock.RUnlock()
241 item, exists = f.items[key]
242 return item, exists, nil
245 // IsClosed checks if the queue is closed.
246 func (f *FIFO) IsClosed() bool {
248 defer f.closedLock.Unlock()
255 // Pop waits until an item is ready and processes it. If multiple items are
256 // ready, they are returned in the order in which they were added/updated.
257 // The item is removed from the queue (and the store) before it is processed,
258 // so if you don't successfully process it, it should be added back with
259 // AddIfNotPresent(). The process function is called under lock, so it is safe
260 // to update data structures in it that need to be in sync with the queue.
261 func (f *FIFO) Pop(process PopProcessFunc) (interface{}, error) {
263 defer f.lock.Unlock()
265 for len(f.queue) == 0 {
266 // When the queue is empty, invocation of Pop() is blocked until a new item is enqueued.
267 // When Close() is called, f.closed is set and the condition is broadcast,
268 // which causes this loop to continue and return from Pop().
270 return nil, FIFOClosedError
276 f.queue = f.queue[1:]
277 if f.initialPopulationCount > 0 {
278 f.initialPopulationCount--
280 item, ok := f.items[id]
282 // Item may have been deleted subsequently.
287 if e, ok := err.(ErrRequeue); ok {
288 f.addIfNotPresent(id, item)
295 // Replace will delete the contents of 'f', using instead the given list.
296 // The list is converted into a new map keyed by keyFunc before it is
297 // installed. f's queue is reset, too; upon return, it
298 // will contain the items in the list, in no particular order.
299 func (f *FIFO) Replace(list []interface{}, resourceVersion string) error {
300 items := make(map[string]interface{}, len(list))
301 for _, item := range list {
302 key, err := f.keyFunc(item)
304 return KeyError{item, err}
310 defer f.lock.Unlock()
314 f.initialPopulationCount = len(items)
318 f.queue = f.queue[:0]
319 for id := range items {
320 f.queue = append(f.queue, id)
322 if len(f.queue) > 0 {
328 // Resync will touch all objects to put them into the processing queue
329 func (f *FIFO) Resync() error {
331 defer f.lock.Unlock()
333 inQueue := sets.NewString()
334 for _, id := range f.queue {
337 for id := range f.items {
338 if !inQueue.Has(id) {
339 f.queue = append(f.queue, id)
342 if len(f.queue) > 0 {
348 // NewFIFO returns a Store which can be used to queue up items to
350 func NewFIFO(keyFunc KeyFunc) *FIFO {
352 items: map[string]interface{}{},