2 Copyright 2015 The Kubernetes Authors.
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
8 http://www.apache.org/licenses/LICENSE-2.0
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
24 "k8s.io/apimachinery/pkg/runtime"
25 "k8s.io/apimachinery/pkg/util/clock"
26 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
27 "k8s.io/apimachinery/pkg/util/wait"
28 "k8s.io/client-go/util/buffer"
29 "k8s.io/client-go/util/retry"
// NOTE(review): this extract is line-sampled; several method declarations
// (e.g. GetStore, HasSynced) and the interface's closing brace are not
// visible here — confirm against the full file before relying on this list.
34 // SharedInformer has a shared data cache and is capable of distributing notifications for changes
35 // to the cache to multiple listeners who registered via AddEventHandler. If you use this, there is
36 // one behavior change compared to a standard Informer. When you receive a notification, the cache
37 // will be AT LEAST as fresh as the notification, but it MAY be more fresh. You should NOT depend
38 // on the contents of the cache exactly matching the notification you've received in handler
39 // functions. If there was a create, followed by a delete, the cache may NOT have your item. This
40 // has advantages over the broadcaster since it allows us to share a common cache across many
41 // controllers. Extending the broadcaster would have required us keep duplicate caches for each
43 type SharedInformer interface {
44 // AddEventHandler adds an event handler to the shared informer using the shared informer's resync
45 // period. Events to a single handler are delivered sequentially, but there is no coordination
46 // between different handlers.
47 AddEventHandler(handler ResourceEventHandler)
48 // AddEventHandlerWithResyncPeriod adds an event handler to the shared informer using the
49 // specified resync period. Events to a single handler are delivered sequentially, but there is
50 // no coordination between different handlers.
51 AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration)
52 // GetStore returns the Store.
54 // GetController gives back a synthetic interface that "votes" to start the informer
55 GetController() Controller
56 // Run starts the shared informer, which will be stopped when stopCh is closed.
57 Run(stopCh <-chan struct{})
58 // HasSynced returns true if the shared informer's store has synced.
60 // LastSyncResourceVersion is the resource version observed when last synced with the underlying
61 // store. The value returned is not synchronized with access to the underlying store and is not
// thread-safe (per the sentence continuing on a line not visible in this extract).
63 LastSyncResourceVersion() string
// SharedIndexInformer extends SharedInformer with indexed-store access.
// NOTE(review): an embedded SharedInformer and a GetIndexer method (plus the
// closing brace) appear to be omitted from this extract — verify in the full file.
66 type SharedIndexInformer interface {
68 // AddIndexers add indexers to the informer before it starts.
69 AddIndexers(indexers Indexers) error
73 // NewSharedInformer creates a new instance for the listwatcher.
74 func NewSharedInformer(lw ListerWatcher, objType runtime.Object, resyncPeriod time.Duration) SharedInformer {
// Convenience wrapper: delegates to NewSharedIndexInformer with no indexers.
75 return NewSharedIndexInformer(lw, objType, resyncPeriod, Indexers{})
78 // NewSharedIndexInformer creates a new instance for the listwatcher.
// The single defaultEventHandlerResyncPeriod seeds both the reflector's resync
// check period and the default handler resync period.
79 func NewSharedIndexInformer(lw ListerWatcher, objType runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
80 realClock := &clock.RealClock{}
81 sharedIndexInformer := &sharedIndexInformer{
82 processor: &sharedProcessor{clock: realClock},
83 indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
// NOTE(review): assignments for listerWatcher/objectType (and clock) are not
// visible in this extract but the struct literal presumably sets them — confirm.
86 resyncCheckPeriod: defaultEventHandlerResyncPeriod,
87 defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
// The mutation detector is labeled with the object's Go type for diagnostics.
88 cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", objType)),
91 return sharedIndexInformer
94 // InformerSynced is a function that can be used to determine if an informer has synced. This is useful for determining if caches have synced.
95 type InformerSynced func() bool
// NOTE(review): the `const (` opener for the following constants is not
// visible in this extract.
98 // syncedPollPeriod controls how often you look at the status of your sync funcs
99 syncedPollPeriod = 100 * time.Millisecond
101 // initialBufferSize is the initial number of event notifications that can be buffered.
102 initialBufferSize = 1024
105 // WaitForCacheSync waits for caches to populate. It returns true if it was successful, false
106 // if the controller should shutdown
// Polls every syncedPollPeriod until all cacheSyncs report true or stopCh closes.
107 func WaitForCacheSync(stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool {
108 err := wait.PollUntil(syncedPollPeriod,
109 func() (bool, error) {
110 for _, syncFunc := range cacheSyncs {
// NOTE(review): the loop body, error handling, and return statements are
// missing from this extract; only the log lines below are visible.
119 klog.V(2).Infof("stop requested")
123 klog.V(4).Infof("caches populated")
// sharedIndexInformer is the concrete SharedIndexInformer implementation.
// NOTE(review): several fields (indexer, clock, and the closing brace) are not
// visible in this line-sampled extract.
127 type sharedIndexInformer struct {
129 controller Controller
131 processor *sharedProcessor
132 cacheMutationDetector CacheMutationDetector
134 // This block is tracked to handle late initialization of the controller
135 listerWatcher ListerWatcher
136 objectType runtime.Object
138 // resyncCheckPeriod is how often we want the reflector's resync timer to fire so it can call
139 // shouldResync to check if any of our listeners need a resync.
140 resyncCheckPeriod time.Duration
141 // defaultEventHandlerResyncPeriod is the default resync period for any handlers added via
142 // AddEventHandler (i.e. they don't specify one and just want to use the shared informer's default
// value; sentence continues on a line not visible here).
144 defaultEventHandlerResyncPeriod time.Duration
145 // clock allows for testability
// started/stopped record lifecycle state; both are guarded by startedLock.
148 started, stopped bool
149 startedLock sync.Mutex
151 // blockDeltas gives a way to stop all event distribution so that a late event handler
152 // can safely join the shared informer.
153 blockDeltas sync.Mutex
156 // dummyController hides the fact that a SharedInformer is different from a dedicated one
157 // where a caller can `Run`. The run method is disconnected in this case, because higher
158 // level logic will decide when to start the SharedInformer and related controller.
159 // Because returning information back is always asynchronous, the legacy callers shouldn't
160 // notice any change in behavior.
161 type dummyController struct {
162 informer *sharedIndexInformer
// Run is deliberately a no-op: the real informer lifecycle is driven elsewhere.
165 func (v *dummyController) Run(stopCh <-chan struct{}) {
// HasSynced delegates to the wrapped informer.
168 func (v *dummyController) HasSynced() bool {
169 return v.informer.HasSynced()
// NOTE(review): body not visible; presumably returns "" — confirm in full file.
// Receiver name differs (c vs v) from the other methods of this type.
172 func (c *dummyController) LastSyncResourceVersion() string {
// Internal notification payloads distributed to processorListeners.
// NOTE(review): field lists (oldObj/newObj) are not visible in this extract;
// field names are inferred from their use in HandleDeltas below.
176 type updateNotification struct {
181 type addNotification struct {
185 type deleteNotification struct {
// Run starts the controller and the shared processor, blocking until stopCh
// closes. The processor is stopped strictly after the controller (via defers).
189 func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
190 defer utilruntime.HandleCrash()
192 fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)
// NOTE(review): the Config literal opener and several fields (Queue,
// RetryOnError) are not visible in this extract.
196 ListerWatcher: s.listerWatcher,
197 ObjectType: s.objectType,
198 FullResyncPeriod: s.resyncCheckPeriod,
200 ShouldResync: s.processor.shouldResync,
202 Process: s.HandleDeltas,
// NOTE(review): the corresponding startedLock.Lock() (and func(){} scoping)
// is not visible here.
207 defer s.startedLock.Unlock()
209 s.controller = New(cfg)
// Inject the informer's clock into the concrete controller for testability.
210 s.controller.(*controller).clock = s.clock
214 // Separate stop channel because Processor should be stopped strictly after controller
215 processorStopCh := make(chan struct{})
217 defer wg.Wait() // Wait for Processor to stop
218 defer close(processorStopCh) // Tell Processor to stop
219 wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
220 wg.StartWithChannel(processorStopCh, s.processor.run)
224 defer s.startedLock.Unlock()
225 s.stopped = true // Don't want any new listeners
// Blocks here until stopCh is closed.
227 s.controller.Run(stopCh)
// HasSynced reports whether the underlying controller's store has synced.
// Returns false while the controller has not yet been created by Run.
230 func (s *sharedIndexInformer) HasSynced() bool {
232 defer s.startedLock.Unlock()
234 if s.controller == nil {
237 return s.controller.HasSynced()
// LastSyncResourceVersion delegates to the controller once it exists.
// NOTE(review): the nil-controller return value is on a line not visible here.
240 func (s *sharedIndexInformer) LastSyncResourceVersion() string {
242 defer s.startedLock.Unlock()
244 if s.controller == nil {
247 return s.controller.LastSyncResourceVersion()
// Accessors for the backing indexer; bodies (presumably `return s.indexer`)
// are not visible in this extract — confirm in the full file.
250 func (s *sharedIndexInformer) GetStore() Store {
254 func (s *sharedIndexInformer) GetIndexer() Indexer {
// AddIndexers adds indexers to the informer's store. It fails once the
// informer has started, since indexers cannot be added to a live store.
258 func (s *sharedIndexInformer) AddIndexers(indexers Indexers) error {
260 defer s.startedLock.Unlock()
263 return fmt.Errorf("informer has already started")
266 return s.indexer.AddIndexers(indexers)
// GetController returns a synthetic Controller whose Run is a no-op; see
// dummyController above.
269 func (s *sharedIndexInformer) GetController() Controller {
270 return &dummyController{informer: s}
// AddEventHandler registers handler with the informer's default resync period.
273 func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) {
274 s.AddEventHandlerWithResyncPeriod(handler, s.defaultEventHandlerResyncPeriod)
// determineResyncPeriod reconciles a handler's desired resync period with the
// informer's check period. NOTE(review): the branch conditions and return
// statements are missing from this extract; only the warning paths are visible.
277 func determineResyncPeriod(desired, check time.Duration) time.Duration {
282 klog.Warningf("The specified resyncPeriod %v is invalid because this shared informer doesn't support resyncing", desired)
286 klog.Warningf("The specified resyncPeriod %v is being increased to the minimum resyncCheckPeriod %v", desired, check)
// minimumResyncPeriod is the smallest resync period any handler may request.
292 const minimumResyncPeriod = 1 * time.Second
// AddEventHandlerWithResyncPeriod registers handler with its own resync
// period, clamping it to minimumResyncPeriod and — once started — to the
// informer's resyncCheckPeriod. A handler added after start receives
// synthetic add events for everything already in the store.
294 func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) {
296 defer s.startedLock.Unlock()
// Stopped informers accept no new handlers.
299 klog.V(2).Infof("Handler %v was not added to shared informer because it has stopped already", handler)
303 if resyncPeriod > 0 {
304 if resyncPeriod < minimumResyncPeriod {
305 klog.Warningf("resyncPeriod %d is too small. Changing it to the minimum allowed value of %d", resyncPeriod, minimumResyncPeriod)
306 resyncPeriod = minimumResyncPeriod
309 if resyncPeriod < s.resyncCheckPeriod {
311 klog.Warningf("resyncPeriod %d is smaller than resyncCheckPeriod %d and the informer has already started. Changing it to %d", resyncPeriod, s.resyncCheckPeriod, s.resyncCheckPeriod)
312 resyncPeriod = s.resyncCheckPeriod
314 // if the event handler's resyncPeriod is smaller than the current resyncCheckPeriod, update
315 // resyncCheckPeriod to match resyncPeriod and adjust the resync periods of all the listeners
// accordingly (this branch applies before the informer has started).
317 s.resyncCheckPeriod = resyncPeriod
318 s.processor.resyncCheckPeriodChanged(resyncPeriod)
323 listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize)
// Fast path: informer not started yet, just register the listener.
326 s.processor.addListener(listener)
330 // in order to safely join, we have to
331 // 1. stop sending add/update/delete notifications
332 // 2. do a list against the store
333 // 3. send synthetic "Add" events to the new handler
336 defer s.blockDeltas.Unlock()
338 s.processor.addListener(listener)
339 for _, item := range s.indexer.List() {
340 listener.add(addNotification{newObj: item})
// HandleDeltas applies a batch of Deltas to the indexer and distributes the
// corresponding notifications, oldest first, under blockDeltas so late
// handlers can join safely. NOTE(review): the switch statement opener, the
// Deleted case label, the Lock() call, and error returns are missing from
// this line-sampled extract.
344 func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
346 defer s.blockDeltas.Unlock()
348 // from oldest to newest
349 for _, d := range obj.(Deltas) {
351 case Sync, Added, Updated:
// Resyncs produce "sync" notifications so only syncing listeners see them.
352 isSync := d.Type == Sync
353 s.cacheMutationDetector.AddObject(d.Object)
// Update vs Add depends on whether the object already exists in the indexer.
354 if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
355 if err := s.indexer.Update(d.Object); err != nil {
358 s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
360 if err := s.indexer.Add(d.Object); err != nil {
363 s.processor.distribute(addNotification{newObj: d.Object}, isSync)
// Delete path (case label not visible above): deletions are never "sync".
366 if err := s.indexer.Delete(d.Object); err != nil {
369 s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
// sharedProcessor fans notifications out to a set of processorListeners.
// syncingListeners is the subset due for a resync, rebuilt by shouldResync.
// NOTE(review): clock and wg fields are not visible in this extract but are
// referenced by the methods below.
375 type sharedProcessor struct {
376 listenersStarted bool
377 listenersLock sync.RWMutex
378 listeners []*processorListener
379 syncingListeners []*processorListener
// addListener registers a listener; if the processor is already running,
// the listener's run/pop goroutines are started immediately.
384 func (p *sharedProcessor) addListener(listener *processorListener) {
385 p.listenersLock.Lock()
386 defer p.listenersLock.Unlock()
388 p.addListenerLocked(listener)
389 if p.listenersStarted {
390 p.wg.Start(listener.run)
391 p.wg.Start(listener.pop)
// addListenerLocked appends to both listener slices; callers must hold
// listenersLock. New listeners start in the syncing set.
395 func (p *sharedProcessor) addListenerLocked(listener *processorListener) {
396 p.listeners = append(p.listeners, listener)
397 p.syncingListeners = append(p.syncingListeners, listener)
// distribute delivers obj to syncingListeners when sync is true, otherwise to
// all listeners. NOTE(review): the if/else structure and the listener.add
// calls inside each loop are not visible in this extract.
400 func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
401 p.listenersLock.RLock()
402 defer p.listenersLock.RUnlock()
405 for _, listener := range p.syncingListeners {
409 for _, listener := range p.listeners {
// run starts every listener's goroutines, then on stop closes each addCh
// (which cascades pop -> run shutdown) and waits for all of them to finish.
// NOTE(review): the func(){} wrappers and the <-stopCh wait between the two
// locked sections are not visible in this extract.
415 func (p *sharedProcessor) run(stopCh <-chan struct{}) {
417 p.listenersLock.RLock()
418 defer p.listenersLock.RUnlock()
419 for _, listener := range p.listeners {
420 p.wg.Start(listener.run)
421 p.wg.Start(listener.pop)
423 p.listenersStarted = true
426 p.listenersLock.RLock()
427 defer p.listenersLock.RUnlock()
428 for _, listener := range p.listeners {
429 close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop
431 p.wg.Wait() // Wait for all .pop() and .run() to stop
434 // shouldResync queries every listener to determine if any of them need a resync, based on each
435 // listener's resyncPeriod.
// It also rebuilds syncingListeners as a side effect, so it takes the write lock.
436 func (p *sharedProcessor) shouldResync() bool {
437 p.listenersLock.Lock()
438 defer p.listenersLock.Unlock()
440 p.syncingListeners = []*processorListener{}
442 resyncNeeded := false
// NOTE(review): the `now := p.clock.Now()` line and the final return are not
// visible in this extract.
444 for _, listener := range p.listeners {
445 // need to loop through all the listeners to see if they need to resync so we can prepare any
446 // listeners that are going to be resyncing.
447 if listener.shouldResync(now) {
449 p.syncingListeners = append(p.syncingListeners, listener)
// Schedule the listener's next resync now that this one is being served.
450 listener.determineNextResync(now)
// resyncCheckPeriodChanged re-derives each listener's effective resync period
// after the informer's overall check period changes.
456 func (p *sharedProcessor) resyncCheckPeriodChanged(resyncCheckPeriod time.Duration) {
457 p.listenersLock.RLock()
458 defer p.listenersLock.RUnlock()
460 for _, listener := range p.listeners {
461 resyncPeriod := determineResyncPeriod(listener.requestedResyncPeriod, resyncCheckPeriod)
462 listener.setResyncPeriod(resyncPeriod)
// processorListener pairs one ResourceEventHandler with an unbounded buffer
// between the distributor (add/pop) and the handler goroutine (run).
466 type processorListener struct {
467 nextCh chan interface{}
468 addCh chan interface{}
470 handler ResourceEventHandler
472 // pendingNotifications is an unbounded ring buffer that holds all notifications not yet distributed.
473 // There is one per listener, but a failing/stalled listener will have infinite pendingNotifications
474 // added until we OOM.
475 // TODO: This is no worse than before, since reflectors were backed by unbounded DeltaFIFOs, but
476 // we should try to do something better.
477 pendingNotifications buffer.RingGrowing
479 // requestedResyncPeriod is how frequently the listener wants a full resync from the shared informer
480 requestedResyncPeriod time.Duration
481 // resyncPeriod is how frequently the listener wants a full resync from the shared informer. This
482 // value may differ from requestedResyncPeriod if the shared informer adjusts it to align with the
483 // informer's overall resync check period.
484 resyncPeriod time.Duration
485 // nextResync is the earliest time the listener should get a full resync
487 // resyncLock guards access to resyncPeriod and nextResync
488 resyncLock sync.Mutex
// newProcessListener builds a listener with an initial ring buffer of
// bufferSize and computes its first resync deadline from now.
491 func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int) *processorListener {
492 ret := &processorListener{
493 nextCh: make(chan interface{}),
494 addCh: make(chan interface{}),
// NOTE(review): the handler field assignment is not visible in this extract.
496 pendingNotifications: *buffer.NewRingGrowing(bufferSize),
497 requestedResyncPeriod: requestedResyncPeriod,
498 resyncPeriod: resyncPeriod,
501 ret.determineNextResync(now)
// add enqueues a notification; blocks only until pop() receives it.
506 func (p *processorListener) add(notification interface{}) {
507 p.addCh <- notification
// pop shuttles notifications from addCh to nextCh, spilling into the
// unbounded pendingNotifications buffer whenever run() is not keeping up.
// Closing addCh makes pop return, which closes nextCh and stops run().
// NOTE(review): the for/select opener and the closed-channel return path are
// not visible in this extract.
510 func (p *processorListener) pop() {
511 defer utilruntime.HandleCrash()
512 defer close(p.nextCh) // Tell .run() to stop
// nextCh stays nil (select case disabled) until there is something to send.
514 var nextCh chan<- interface{}
515 var notification interface{}
518 case nextCh <- notification:
519 // Notification dispatched
521 notification, ok = p.pendingNotifications.ReadOne()
522 if !ok { // Nothing to pop
523 nextCh = nil // Disable this select case
525 case notificationToAdd, ok := <-p.addCh:
529 if notification == nil { // No notification to pop (and pendingNotifications is empty)
530 // Optimize the case - skip adding to pendingNotifications
531 notification = notificationToAdd
533 } else { // There is already a notification waiting to be dispatched
534 p.pendingNotifications.WriteOne(notificationToAdd)
// run drains nextCh and invokes the handler callback matching each
// notification's type, retrying with backoff after a handler panic.
540 func (p *processorListener) run() {
541 // this call blocks until the channel is closed. When a panic happens during the notification
542 // we will catch it, **the offending item will be skipped!**, and after a short delay (one second)
543 // the next notification will be attempted. This is usually better than the alternative of never
// delivering again (comment continues on a line not visible in this extract).
545 stopCh := make(chan struct{})
547 // this gives us a few quick retries before a long pause and then a few more quick retries
548 err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
549 for next := range p.nextCh {
550 switch notification := next.(type) {
551 case updateNotification:
552 p.handler.OnUpdate(notification.oldObj, notification.newObj)
553 case addNotification:
554 p.handler.OnAdd(notification.newObj)
555 case deleteNotification:
556 p.handler.OnDelete(notification.oldObj)
// Default case (label not visible above): unknown notification types are
// reported but do not stop the loop.
558 utilruntime.HandleError(fmt.Errorf("unrecognized notification: %#v", next))
561 // the only way to get here is if the p.nextCh is empty and closed
565 // the only way to get here is if the p.nextCh is empty and closed
// NOTE(review): the wait.Until wrapper and the use of err are not visible here.
569 }, 1*time.Minute, stopCh)
572 // shouldResync determines if the listener needs a resync. If the listener's resyncPeriod is 0,
573 // this always returns false.
574 func (p *processorListener) shouldResync(now time.Time) bool {
576 defer p.resyncLock.Unlock()
578 if p.resyncPeriod == 0 {
// Due when the deadline has passed or is exactly now.
582 return now.After(p.nextResync) || now.Equal(p.nextResync)
// determineNextResync schedules the next resync deadline from now; guarded by
// resyncLock (the Lock() calls are on lines not visible in this extract).
585 func (p *processorListener) determineNextResync(now time.Time) {
587 defer p.resyncLock.Unlock()
589 p.nextResync = now.Add(p.resyncPeriod)
// setResyncPeriod updates the effective resync period under resyncLock.
592 func (p *processorListener) setResyncPeriod(resyncPeriod time.Duration) {
594 defer p.resyncLock.Unlock()
596 p.resyncPeriod = resyncPeriod