Code refactoring for bpa operator
[icn.git] / cmd / bpa-operator / vendor / k8s.io / client-go / tools / cache / shared_informer.go
1 /*
2 Copyright 2015 The Kubernetes Authors.
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8     http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16
17 package cache
18
19 import (
20         "fmt"
21         "sync"
22         "time"
23
24         "k8s.io/apimachinery/pkg/runtime"
25         "k8s.io/apimachinery/pkg/util/clock"
26         utilruntime "k8s.io/apimachinery/pkg/util/runtime"
27         "k8s.io/apimachinery/pkg/util/wait"
28         "k8s.io/client-go/util/buffer"
29         "k8s.io/client-go/util/retry"
30
31         "k8s.io/klog"
32 )
33
34 // SharedInformer has a shared data cache and is capable of distributing notifications for changes
35 // to the cache to multiple listeners who registered via AddEventHandler. If you use this, there is
36 // one behavior change compared to a standard Informer.  When you receive a notification, the cache
37 // will be AT LEAST as fresh as the notification, but it MAY be more fresh.  You should NOT depend
38 // on the contents of the cache exactly matching the notification you've received in handler
39 // functions.  If there was a create, followed by a delete, the cache may NOT have your item.  This
40 // has advantages over the broadcaster since it allows us to share a common cache across many
41 // controllers. Extending the broadcaster would have required us keep duplicate caches for each
42 // watch.
43 type SharedInformer interface {
44         // AddEventHandler adds an event handler to the shared informer using the shared informer's resync
45         // period.  Events to a single handler are delivered sequentially, but there is no coordination
46         // between different handlers.
47         AddEventHandler(handler ResourceEventHandler)
48         // AddEventHandlerWithResyncPeriod adds an event handler to the shared informer using the
49         // specified resync period.  Events to a single handler are delivered sequentially, but there is
50         // no coordination between different handlers.
51         AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration)
52         // GetStore returns the Store.
53         GetStore() Store
54         // GetController gives back a synthetic interface that "votes" to start the informer
55         GetController() Controller
56         // Run starts the shared informer, which will be stopped when stopCh is closed.
57         Run(stopCh <-chan struct{})
58         // HasSynced returns true if the shared informer's store has synced.
59         HasSynced() bool
60         // LastSyncResourceVersion is the resource version observed when last synced with the underlying
61         // store. The value returned is not synchronized with access to the underlying store and is not
62         // thread-safe.
63         LastSyncResourceVersion() string
64 }
65
66 type SharedIndexInformer interface {
67         SharedInformer
68         // AddIndexers add indexers to the informer before it starts.
69         AddIndexers(indexers Indexers) error
70         GetIndexer() Indexer
71 }
72
73 // NewSharedInformer creates a new instance for the listwatcher.
74 func NewSharedInformer(lw ListerWatcher, objType runtime.Object, resyncPeriod time.Duration) SharedInformer {
75         return NewSharedIndexInformer(lw, objType, resyncPeriod, Indexers{})
76 }
77
78 // NewSharedIndexInformer creates a new instance for the listwatcher.
79 func NewSharedIndexInformer(lw ListerWatcher, objType runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
80         realClock := &clock.RealClock{}
81         sharedIndexInformer := &sharedIndexInformer{
82                 processor:                       &sharedProcessor{clock: realClock},
83                 indexer:                         NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
84                 listerWatcher:                   lw,
85                 objectType:                      objType,
86                 resyncCheckPeriod:               defaultEventHandlerResyncPeriod,
87                 defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
88                 cacheMutationDetector:           NewCacheMutationDetector(fmt.Sprintf("%T", objType)),
89                 clock:                           realClock,
90         }
91         return sharedIndexInformer
92 }
93
94 // InformerSynced is a function that can be used to determine if an informer has synced.  This is useful for determining if caches have synced.
95 type InformerSynced func() bool
96
97 const (
98         // syncedPollPeriod controls how often you look at the status of your sync funcs
99         syncedPollPeriod = 100 * time.Millisecond
100
101         // initialBufferSize is the initial number of event notifications that can be buffered.
102         initialBufferSize = 1024
103 )
104
105 // WaitForCacheSync waits for caches to populate.  It returns true if it was successful, false
106 // if the controller should shutdown
107 func WaitForCacheSync(stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool {
108         err := wait.PollUntil(syncedPollPeriod,
109                 func() (bool, error) {
110                         for _, syncFunc := range cacheSyncs {
111                                 if !syncFunc() {
112                                         return false, nil
113                                 }
114                         }
115                         return true, nil
116                 },
117                 stopCh)
118         if err != nil {
119                 klog.V(2).Infof("stop requested")
120                 return false
121         }
122
123         klog.V(4).Infof("caches populated")
124         return true
125 }
126
127 type sharedIndexInformer struct {
128         indexer    Indexer
129         controller Controller
130
131         processor             *sharedProcessor
132         cacheMutationDetector CacheMutationDetector
133
134         // This block is tracked to handle late initialization of the controller
135         listerWatcher ListerWatcher
136         objectType    runtime.Object
137
138         // resyncCheckPeriod is how often we want the reflector's resync timer to fire so it can call
139         // shouldResync to check if any of our listeners need a resync.
140         resyncCheckPeriod time.Duration
141         // defaultEventHandlerResyncPeriod is the default resync period for any handlers added via
142         // AddEventHandler (i.e. they don't specify one and just want to use the shared informer's default
143         // value).
144         defaultEventHandlerResyncPeriod time.Duration
145         // clock allows for testability
146         clock clock.Clock
147
148         started, stopped bool
149         startedLock      sync.Mutex
150
151         // blockDeltas gives a way to stop all event distribution so that a late event handler
152         // can safely join the shared informer.
153         blockDeltas sync.Mutex
154 }
155
156 // dummyController hides the fact that a SharedInformer is different from a dedicated one
157 // where a caller can `Run`.  The run method is disconnected in this case, because higher
158 // level logic will decide when to start the SharedInformer and related controller.
159 // Because returning information back is always asynchronous, the legacy callers shouldn't
160 // notice any change in behavior.
161 type dummyController struct {
162         informer *sharedIndexInformer
163 }
164
165 func (v *dummyController) Run(stopCh <-chan struct{}) {
166 }
167
168 func (v *dummyController) HasSynced() bool {
169         return v.informer.HasSynced()
170 }
171
172 func (c *dummyController) LastSyncResourceVersion() string {
173         return ""
174 }
175
176 type updateNotification struct {
177         oldObj interface{}
178         newObj interface{}
179 }
180
181 type addNotification struct {
182         newObj interface{}
183 }
184
185 type deleteNotification struct {
186         oldObj interface{}
187 }
188
189 func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
190         defer utilruntime.HandleCrash()
191
192         fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)
193
194         cfg := &Config{
195                 Queue:            fifo,
196                 ListerWatcher:    s.listerWatcher,
197                 ObjectType:       s.objectType,
198                 FullResyncPeriod: s.resyncCheckPeriod,
199                 RetryOnError:     false,
200                 ShouldResync:     s.processor.shouldResync,
201
202                 Process: s.HandleDeltas,
203         }
204
205         func() {
206                 s.startedLock.Lock()
207                 defer s.startedLock.Unlock()
208
209                 s.controller = New(cfg)
210                 s.controller.(*controller).clock = s.clock
211                 s.started = true
212         }()
213
214         // Separate stop channel because Processor should be stopped strictly after controller
215         processorStopCh := make(chan struct{})
216         var wg wait.Group
217         defer wg.Wait()              // Wait for Processor to stop
218         defer close(processorStopCh) // Tell Processor to stop
219         wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
220         wg.StartWithChannel(processorStopCh, s.processor.run)
221
222         defer func() {
223                 s.startedLock.Lock()
224                 defer s.startedLock.Unlock()
225                 s.stopped = true // Don't want any new listeners
226         }()
227         s.controller.Run(stopCh)
228 }
229
230 func (s *sharedIndexInformer) HasSynced() bool {
231         s.startedLock.Lock()
232         defer s.startedLock.Unlock()
233
234         if s.controller == nil {
235                 return false
236         }
237         return s.controller.HasSynced()
238 }
239
240 func (s *sharedIndexInformer) LastSyncResourceVersion() string {
241         s.startedLock.Lock()
242         defer s.startedLock.Unlock()
243
244         if s.controller == nil {
245                 return ""
246         }
247         return s.controller.LastSyncResourceVersion()
248 }
249
250 func (s *sharedIndexInformer) GetStore() Store {
251         return s.indexer
252 }
253
254 func (s *sharedIndexInformer) GetIndexer() Indexer {
255         return s.indexer
256 }
257
258 func (s *sharedIndexInformer) AddIndexers(indexers Indexers) error {
259         s.startedLock.Lock()
260         defer s.startedLock.Unlock()
261
262         if s.started {
263                 return fmt.Errorf("informer has already started")
264         }
265
266         return s.indexer.AddIndexers(indexers)
267 }
268
269 func (s *sharedIndexInformer) GetController() Controller {
270         return &dummyController{informer: s}
271 }
272
273 func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) {
274         s.AddEventHandlerWithResyncPeriod(handler, s.defaultEventHandlerResyncPeriod)
275 }
276
277 func determineResyncPeriod(desired, check time.Duration) time.Duration {
278         if desired == 0 {
279                 return desired
280         }
281         if check == 0 {
282                 klog.Warningf("The specified resyncPeriod %v is invalid because this shared informer doesn't support resyncing", desired)
283                 return 0
284         }
285         if desired < check {
286                 klog.Warningf("The specified resyncPeriod %v is being increased to the minimum resyncCheckPeriod %v", desired, check)
287                 return check
288         }
289         return desired
290 }
291
292 const minimumResyncPeriod = 1 * time.Second
293
294 func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) {
295         s.startedLock.Lock()
296         defer s.startedLock.Unlock()
297
298         if s.stopped {
299                 klog.V(2).Infof("Handler %v was not added to shared informer because it has stopped already", handler)
300                 return
301         }
302
303         if resyncPeriod > 0 {
304                 if resyncPeriod < minimumResyncPeriod {
305                         klog.Warningf("resyncPeriod %d is too small. Changing it to the minimum allowed value of %d", resyncPeriod, minimumResyncPeriod)
306                         resyncPeriod = minimumResyncPeriod
307                 }
308
309                 if resyncPeriod < s.resyncCheckPeriod {
310                         if s.started {
311                                 klog.Warningf("resyncPeriod %d is smaller than resyncCheckPeriod %d and the informer has already started. Changing it to %d", resyncPeriod, s.resyncCheckPeriod, s.resyncCheckPeriod)
312                                 resyncPeriod = s.resyncCheckPeriod
313                         } else {
314                                 // if the event handler's resyncPeriod is smaller than the current resyncCheckPeriod, update
315                                 // resyncCheckPeriod to match resyncPeriod and adjust the resync periods of all the listeners
316                                 // accordingly
317                                 s.resyncCheckPeriod = resyncPeriod
318                                 s.processor.resyncCheckPeriodChanged(resyncPeriod)
319                         }
320                 }
321         }
322
323         listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize)
324
325         if !s.started {
326                 s.processor.addListener(listener)
327                 return
328         }
329
330         // in order to safely join, we have to
331         // 1. stop sending add/update/delete notifications
332         // 2. do a list against the store
333         // 3. send synthetic "Add" events to the new handler
334         // 4. unblock
335         s.blockDeltas.Lock()
336         defer s.blockDeltas.Unlock()
337
338         s.processor.addListener(listener)
339         for _, item := range s.indexer.List() {
340                 listener.add(addNotification{newObj: item})
341         }
342 }
343
344 func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
345         s.blockDeltas.Lock()
346         defer s.blockDeltas.Unlock()
347
348         // from oldest to newest
349         for _, d := range obj.(Deltas) {
350                 switch d.Type {
351                 case Sync, Added, Updated:
352                         isSync := d.Type == Sync
353                         s.cacheMutationDetector.AddObject(d.Object)
354                         if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
355                                 if err := s.indexer.Update(d.Object); err != nil {
356                                         return err
357                                 }
358                                 s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
359                         } else {
360                                 if err := s.indexer.Add(d.Object); err != nil {
361                                         return err
362                                 }
363                                 s.processor.distribute(addNotification{newObj: d.Object}, isSync)
364                         }
365                 case Deleted:
366                         if err := s.indexer.Delete(d.Object); err != nil {
367                                 return err
368                         }
369                         s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
370                 }
371         }
372         return nil
373 }
374
375 type sharedProcessor struct {
376         listenersStarted bool
377         listenersLock    sync.RWMutex
378         listeners        []*processorListener
379         syncingListeners []*processorListener
380         clock            clock.Clock
381         wg               wait.Group
382 }
383
384 func (p *sharedProcessor) addListener(listener *processorListener) {
385         p.listenersLock.Lock()
386         defer p.listenersLock.Unlock()
387
388         p.addListenerLocked(listener)
389         if p.listenersStarted {
390                 p.wg.Start(listener.run)
391                 p.wg.Start(listener.pop)
392         }
393 }
394
395 func (p *sharedProcessor) addListenerLocked(listener *processorListener) {
396         p.listeners = append(p.listeners, listener)
397         p.syncingListeners = append(p.syncingListeners, listener)
398 }
399
400 func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
401         p.listenersLock.RLock()
402         defer p.listenersLock.RUnlock()
403
404         if sync {
405                 for _, listener := range p.syncingListeners {
406                         listener.add(obj)
407                 }
408         } else {
409                 for _, listener := range p.listeners {
410                         listener.add(obj)
411                 }
412         }
413 }
414
415 func (p *sharedProcessor) run(stopCh <-chan struct{}) {
416         func() {
417                 p.listenersLock.RLock()
418                 defer p.listenersLock.RUnlock()
419                 for _, listener := range p.listeners {
420                         p.wg.Start(listener.run)
421                         p.wg.Start(listener.pop)
422                 }
423                 p.listenersStarted = true
424         }()
425         <-stopCh
426         p.listenersLock.RLock()
427         defer p.listenersLock.RUnlock()
428         for _, listener := range p.listeners {
429                 close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop
430         }
431         p.wg.Wait() // Wait for all .pop() and .run() to stop
432 }
433
434 // shouldResync queries every listener to determine if any of them need a resync, based on each
435 // listener's resyncPeriod.
436 func (p *sharedProcessor) shouldResync() bool {
437         p.listenersLock.Lock()
438         defer p.listenersLock.Unlock()
439
440         p.syncingListeners = []*processorListener{}
441
442         resyncNeeded := false
443         now := p.clock.Now()
444         for _, listener := range p.listeners {
445                 // need to loop through all the listeners to see if they need to resync so we can prepare any
446                 // listeners that are going to be resyncing.
447                 if listener.shouldResync(now) {
448                         resyncNeeded = true
449                         p.syncingListeners = append(p.syncingListeners, listener)
450                         listener.determineNextResync(now)
451                 }
452         }
453         return resyncNeeded
454 }
455
456 func (p *sharedProcessor) resyncCheckPeriodChanged(resyncCheckPeriod time.Duration) {
457         p.listenersLock.RLock()
458         defer p.listenersLock.RUnlock()
459
460         for _, listener := range p.listeners {
461                 resyncPeriod := determineResyncPeriod(listener.requestedResyncPeriod, resyncCheckPeriod)
462                 listener.setResyncPeriod(resyncPeriod)
463         }
464 }
465
466 type processorListener struct {
467         nextCh chan interface{}
468         addCh  chan interface{}
469
470         handler ResourceEventHandler
471
472         // pendingNotifications is an unbounded ring buffer that holds all notifications not yet distributed.
473         // There is one per listener, but a failing/stalled listener will have infinite pendingNotifications
474         // added until we OOM.
475         // TODO: This is no worse than before, since reflectors were backed by unbounded DeltaFIFOs, but
476         // we should try to do something better.
477         pendingNotifications buffer.RingGrowing
478
479         // requestedResyncPeriod is how frequently the listener wants a full resync from the shared informer
480         requestedResyncPeriod time.Duration
481         // resyncPeriod is how frequently the listener wants a full resync from the shared informer. This
482         // value may differ from requestedResyncPeriod if the shared informer adjusts it to align with the
483         // informer's overall resync check period.
484         resyncPeriod time.Duration
485         // nextResync is the earliest time the listener should get a full resync
486         nextResync time.Time
487         // resyncLock guards access to resyncPeriod and nextResync
488         resyncLock sync.Mutex
489 }
490
491 func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int) *processorListener {
492         ret := &processorListener{
493                 nextCh:                make(chan interface{}),
494                 addCh:                 make(chan interface{}),
495                 handler:               handler,
496                 pendingNotifications:  *buffer.NewRingGrowing(bufferSize),
497                 requestedResyncPeriod: requestedResyncPeriod,
498                 resyncPeriod:          resyncPeriod,
499         }
500
501         ret.determineNextResync(now)
502
503         return ret
504 }
505
506 func (p *processorListener) add(notification interface{}) {
507         p.addCh <- notification
508 }
509
510 func (p *processorListener) pop() {
511         defer utilruntime.HandleCrash()
512         defer close(p.nextCh) // Tell .run() to stop
513
514         var nextCh chan<- interface{}
515         var notification interface{}
516         for {
517                 select {
518                 case nextCh <- notification:
519                         // Notification dispatched
520                         var ok bool
521                         notification, ok = p.pendingNotifications.ReadOne()
522                         if !ok { // Nothing to pop
523                                 nextCh = nil // Disable this select case
524                         }
525                 case notificationToAdd, ok := <-p.addCh:
526                         if !ok {
527                                 return
528                         }
529                         if notification == nil { // No notification to pop (and pendingNotifications is empty)
530                                 // Optimize the case - skip adding to pendingNotifications
531                                 notification = notificationToAdd
532                                 nextCh = p.nextCh
533                         } else { // There is already a notification waiting to be dispatched
534                                 p.pendingNotifications.WriteOne(notificationToAdd)
535                         }
536                 }
537         }
538 }
539
540 func (p *processorListener) run() {
541         // this call blocks until the channel is closed.  When a panic happens during the notification
542         // we will catch it, **the offending item will be skipped!**, and after a short delay (one second)
543         // the next notification will be attempted.  This is usually better than the alternative of never
544         // delivering again.
545         stopCh := make(chan struct{})
546         wait.Until(func() {
547                 // this gives us a few quick retries before a long pause and then a few more quick retries
548                 err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
549                         for next := range p.nextCh {
550                                 switch notification := next.(type) {
551                                 case updateNotification:
552                                         p.handler.OnUpdate(notification.oldObj, notification.newObj)
553                                 case addNotification:
554                                         p.handler.OnAdd(notification.newObj)
555                                 case deleteNotification:
556                                         p.handler.OnDelete(notification.oldObj)
557                                 default:
558                                         utilruntime.HandleError(fmt.Errorf("unrecognized notification: %#v", next))
559                                 }
560                         }
561                         // the only way to get here is if the p.nextCh is empty and closed
562                         return true, nil
563                 })
564
565                 // the only way to get here is if the p.nextCh is empty and closed
566                 if err == nil {
567                         close(stopCh)
568                 }
569         }, 1*time.Minute, stopCh)
570 }
571
572 // shouldResync deterimines if the listener needs a resync. If the listener's resyncPeriod is 0,
573 // this always returns false.
574 func (p *processorListener) shouldResync(now time.Time) bool {
575         p.resyncLock.Lock()
576         defer p.resyncLock.Unlock()
577
578         if p.resyncPeriod == 0 {
579                 return false
580         }
581
582         return now.After(p.nextResync) || now.Equal(p.nextResync)
583 }
584
585 func (p *processorListener) determineNextResync(now time.Time) {
586         p.resyncLock.Lock()
587         defer p.resyncLock.Unlock()
588
589         p.nextResync = now.Add(p.resyncPeriod)
590 }
591
592 func (p *processorListener) setResyncPeriod(resyncPeriod time.Duration) {
593         p.resyncLock.Lock()
594         defer p.resyncLock.Unlock()
595
596         p.resyncPeriod = resyncPeriod
597 }