2 Copyright 2015 The Kubernetes Authors.
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
8 http://www.apache.org/licenses/LICENSE-2.0
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
23 "k8s.io/apimachinery/pkg/runtime"
24 "k8s.io/apimachinery/pkg/util/clock"
25 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
26 "k8s.io/apimachinery/pkg/util/wait"
29 // Config contains all the settings for a Controller; New takes a *Config.
31 // The queue for your objects; either a FIFO or
32 // a DeltaFIFO. Your Process() function should accept
33 // the output of this Queue's Pop() method.
36 // Something that can list and watch your objects.
39 // Something that can process your objects.
42 // ObjectType is the type of your objects.
43 ObjectType runtime.Object
45 // FullResyncPeriod: reprocess everything at least this often.
46 // Note that if it takes longer for you to clear the queue than this
47 // period, you will end up processing items in the order determined
48 // by FIFO.Replace(). Currently, this is random. If this is a
49 // problem, we can change that replacement policy to append new
50 // things to the end of the queue instead of replacing the entire queue.
52 FullResyncPeriod time.Duration
54 // ShouldResync, if specified, is invoked when the controller's reflector determines the next
55 // periodic sync should occur. If this returns true, it means the reflector should proceed with
// the resync.
57 ShouldResync ShouldResyncFunc
59 // If true, when Process() returns an error, re-enqueue the object.
60 // TODO: add interface to let you inject a delay/backoff or drop
61 // the object completely if desired. Pass the object in
62 // question to this interface as a parameter.
66 // ShouldResyncFunc is a type of function that indicates if a reflector should perform a
67 // resync or not. It can be used by a shared informer to support multiple event handlers with custom
// resync periods. It takes no arguments and reports its decision as a bool.
69 type ShouldResyncFunc func() bool
71 // ProcessFunc processes a single object. The object arrives as an
// interface{} and failure is reported through the returned error.
72 type ProcessFunc func(obj interface{}) error
74 // controller is a generic controller framework. Run, HasSynced and
// LastSyncResourceVersion are implemented on *controller.
75 type controller struct {
// reflectorMutex is write-locked in Run.
// NOTE(review): presumably it guards the reflector field — confirm against
// every reader of that field.
78 reflectorMutex sync.RWMutex
// Controller is the interface returned by New, NewInformer and
// NewIndexerInformer.
82 type Controller interface {
// Run begins processing items; stopCh is the receive-only channel used to
// signal it to stop.
83 Run(stopCh <-chan struct{})
// LastSyncResourceVersion reports a resource version as a string.
85 LastSyncResourceVersion() string
88 // New makes a new Controller from the given Config.
// The value it builds has its clock field set to a clock.RealClock.
89 func New(c *Config) Controller {
92 clock: &clock.RealClock{},
97 // Run begins processing items, and will continue until a value is sent down stopCh.
98 // It's an error to call Run more than once.
99 // Run blocks; call via go.
// stopCh is handed both to r.Run (through wg.StartWithChannel) and to
// wait.Until, which drives processLoop with a one-second period.
100 func (c *controller) Run(stopCh <-chan struct{}) {
// Deferred crash handler from utilruntime.
101 defer utilruntime.HandleCrash()
// Close the configured queue.
104 c.config.Queue.Close()
// Config values passed on as call arguments.
107 c.config.ListerWatcher,
110 c.config.FullResyncPeriod,
// Let r use the configured ShouldResync callback.
112 r.ShouldResync = c.config.ShouldResync
// Take the write lock on reflectorMutex for a short critical section.
115 c.reflectorMutex.Lock()
117 c.reflectorMutex.Unlock()
// Start r.Run through wg, giving it stopCh.
122 wg.StartWithChannel(stopCh, r.Run)
// Run processLoop via wait.Until with a one-second period and stopCh.
124 wait.Until(c.processLoop, time.Second, stopCh)
127 // HasSynced returns true once this controller has completed an initial resource listing.
// The answer comes straight from the configured Queue's HasSynced.
128 func (c *controller) HasSynced() bool {
129 return c.config.Queue.HasSynced()
// LastSyncResourceVersion returns the value reported by the
// LastSyncResourceVersion method of the controller's reflector.
132 func (c *controller) LastSyncResourceVersion() string {
// A nil reflector gets its own branch instead of being dereferenced.
133 if c.reflector == nil {
136 return c.reflector.LastSyncResourceVersion()
139 // processLoop drains the work queue.
140 // TODO: Consider doing the processing in parallel. This will require a little thought
141 // to make sure that we don't end up processing the same object multiple times
// at the same time.
144 // TODO: Plumb through the stopCh here (and down to the queue) so that this can
145 // actually exit when the controller is stopped. Or just give up on this stuff
146 // ever being stoppable. Converting this whole package to use Context would
// also help.
148 func (c *controller) processLoop() {
// Pop an item from the queue, handing Pop the configured Process func
// wrapped as a PopProcessFunc.
150 obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
// FIFOClosedError is checked for explicitly.
152 if err == FIFOClosedError {
// Re-enqueueing is opt-in through Config.RetryOnError.
155 if c.config.RetryOnError {
156 // This is the safe way to re-enqueue.
157 c.config.Queue.AddIfNotPresent(obj)
163 // ResourceEventHandler can handle notifications for events that happen to a
164 // resource. The events are informational only, so you can't return an
// error.
166 // * OnAdd is called when an object is added.
167 // * OnUpdate is called when an object is modified. Note that oldObj is the
168 // last known state of the object-- it is possible that several changes
169 // were combined together, so you can't use this to see every single
170 // change. OnUpdate is also called when a re-list happens, and it will
171 // get called even if nothing changed. This is useful for periodically
172 // evaluating or syncing something.
173 // * OnDelete will get the final state of the item if it is known, otherwise
174 // it will get an object of type DeletedFinalStateUnknown. This can
175 // happen if the watch is closed and misses the delete event and we don't
176 // notice the deletion until the subsequent re-list.
177 type ResourceEventHandler interface {
178 OnAdd(obj interface{})
179 OnUpdate(oldObj, newObj interface{})
180 OnDelete(obj interface{})
183 // ResourceEventHandlerFuncs is an adaptor to let you easily specify as many or
184 // as few of the notification functions as you want while still implementing
185 // ResourceEventHandler.
186 type ResourceEventHandlerFuncs struct {
// AddFunc, when non-nil, is what OnAdd calls.
187 AddFunc func(obj interface{})
// UpdateFunc, when non-nil, is what OnUpdate calls with the old and new objects.
188 UpdateFunc func(oldObj, newObj interface{})
// DeleteFunc, when non-nil, is what OnDelete calls.
189 DeleteFunc func(obj interface{})
192 // OnAdd calls AddFunc if it's not nil.
// It is the OnAdd half of the ResourceEventHandler adaptor.
193 func (r ResourceEventHandlerFuncs) OnAdd(obj interface{}) {
// Guard against an unset AddFunc.
194 if r.AddFunc != nil {
199 // OnUpdate calls UpdateFunc if it's not nil, passing oldObj and newObj
// through unchanged.
200 func (r ResourceEventHandlerFuncs) OnUpdate(oldObj, newObj interface{}) {
// Guard against an unset UpdateFunc.
201 if r.UpdateFunc != nil {
202 r.UpdateFunc(oldObj, newObj)
206 // OnDelete calls DeleteFunc if it's not nil.
// It is the OnDelete half of the ResourceEventHandler adaptor.
207 func (r ResourceEventHandlerFuncs) OnDelete(obj interface{}) {
// Guard against an unset DeleteFunc.
208 if r.DeleteFunc != nil {
213 // FilteringResourceEventHandler applies the provided filter to all events coming
214 // in, ensuring the appropriate nested handler method is invoked. An object
215 // that starts passing the filter after an update is considered an add, and an
216 // object that stops passing the filter after an update is considered a delete.
217 type FilteringResourceEventHandler struct {
// FilterFunc reports whether obj passes the filter.
218 FilterFunc func(obj interface{}) bool
// Handler is the nested handler that the filtered events are forwarded to.
219 Handler ResourceEventHandler
222 // OnAdd calls the nested handler only if the filter succeeds.
223 func (r FilteringResourceEventHandler) OnAdd(obj interface{}) {
// Objects rejected by FilterFunc are caught by this guard.
224 if !r.FilterFunc(obj) {
230 // OnUpdate ensures the proper handler is called depending on whether the filter matches
// the old object, the new object, or both.
231 func (r FilteringResourceEventHandler) OnUpdate(oldObj, newObj interface{}) {
// newer and older record whether the new and the old object pass the filter.
232 newer := r.FilterFunc(newObj)
233 older := r.FilterFunc(oldObj)
236 r.Handler.OnUpdate(oldObj, newObj)
237 case newer && !older:
// Only the new object passes the filter: deliver it as an add.
238 r.Handler.OnAdd(newObj)
239 case !newer && older:
// Only the old object passed the filter: deliver the old object as a delete.
240 r.Handler.OnDelete(oldObj)
246 // OnDelete calls the nested handler only if the filter succeeds.
247 func (r FilteringResourceEventHandler) OnDelete(obj interface{}) {
// Objects rejected by FilterFunc are caught by this guard.
248 if !r.FilterFunc(obj) {
// Forward the delete to the nested handler.
251 r.Handler.OnDelete(obj)
254 // DeletionHandlingMetaNamespaceKeyFunc checks for
255 // DeletedFinalStateUnknown objects before calling
256 // MetaNamespaceKeyFunc. It returns the key as a string together with an error.
257 func DeletionHandlingMetaNamespaceKeyFunc(obj interface{}) (string, error) {
// DeletedFinalStateUnknown values get their own branch; d is the asserted value.
258 if d, ok := obj.(DeletedFinalStateUnknown); ok {
// Otherwise obj is keyed by MetaNamespaceKeyFunc.
261 return MetaNamespaceKeyFunc(obj)
264 // NewInformer returns a Store and a controller for populating the store
265 // while also providing event notifications. You should only use the returned
266 // Store for Get/List operations; Add/Modify/Deletes will cause the event
267 // notifications to be faulty.
270 // * lw is list and watch functions for the source of the resource you want to
// be informed of.
272 // * objType is an object of the type that you expect to receive.
273 // * resyncPeriod: if non-zero, will re-list this often (you will get OnUpdate
274 // calls, even if nothing changed). Otherwise, re-list will be delayed as
275 // long as possible (until the upstream source closes the watch or times out,
276 // or you stop the controller).
277 // * h is the object you want notifications sent to.
281 objType runtime.Object,
282 resyncPeriod time.Duration,
283 h ResourceEventHandler,
284 ) (Store, Controller) {
285 // This will hold the client state, as we know it.
286 clientState := NewStore(DeletionHandlingMetaNamespaceKeyFunc)
288 // This will hold incoming changes. Note how we pass clientState in as a
289 // KeyLister, that way resync operations will result in the correct set
290 // of update/delete deltas.
291 fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, clientState)
// resyncPeriod becomes the Config's FullResyncPeriod.
297 FullResyncPeriod: resyncPeriod,
// Process receives what the queue pops; it is asserted to be Deltas below.
300 Process: func(obj interface{}) error {
301 // from oldest to newest
302 for _, d := range obj.(Deltas) {
304 case Sync, Added, Updated:
// Object already present in clientState: update the stored copy, then
// report the change to h with the previously stored value as "old".
305 if old, exists, err := clientState.Get(d.Object); err == nil && exists {
306 if err := clientState.Update(d.Object); err != nil {
309 h.OnUpdate(old, d.Object)
// Store the object in clientState via Add.
311 if err := clientState.Add(d.Object); err != nil {
// Remove the object from clientState.
317 if err := clientState.Delete(d.Object); err != nil {
// Hand back the store together with a Controller built by New from cfg.
326 return clientState, New(cfg)
329 // NewIndexerInformer returns an Indexer and a controller for populating the index
330 // while also providing event notifications. You should only use the returned
331 // Indexer for Get/List operations; Add/Modify/Deletes will cause the event
332 // notifications to be faulty.
335 // * lw is list and watch functions for the source of the resource you want to
// be informed of.
337 // * objType is an object of the type that you expect to receive.
338 // * resyncPeriod: if non-zero, will re-list this often (you will get OnUpdate
339 // calls, even if nothing changed). Otherwise, re-list will be delayed as
340 // long as possible (until the upstream source closes the watch or times out,
341 // or you stop the controller).
342 // * h is the object you want notifications sent to.
343 // * indexers is the indexer for the received object type.
345 func NewIndexerInformer(
347 objType runtime.Object,
348 resyncPeriod time.Duration,
349 h ResourceEventHandler,
351 ) (Indexer, Controller) {
352 // This will hold the client state, as we know it.
353 clientState := NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers)
355 // This will hold incoming changes. Note how we pass clientState in as a
356 // KeyLister, that way resync operations will result in the correct set
357 // of update/delete deltas.
358 fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, clientState)
// resyncPeriod becomes the Config's FullResyncPeriod.
364 FullResyncPeriod: resyncPeriod,
// Process receives what the queue pops; it is asserted to be Deltas below.
367 Process: func(obj interface{}) error {
368 // from oldest to newest
369 for _, d := range obj.(Deltas) {
371 case Sync, Added, Updated:
// Object already present in clientState: update the stored copy, then
// report the change to h with the previously stored value as "old".
372 if old, exists, err := clientState.Get(d.Object); err == nil && exists {
373 if err := clientState.Update(d.Object); err != nil {
376 h.OnUpdate(old, d.Object)
// Store the object in clientState via Add.
378 if err := clientState.Add(d.Object); err != nil {
// Remove the object from clientState.
384 if err := clientState.Delete(d.Object); err != nil {
// Hand back the indexer together with a Controller built by New from cfg.
393 return clientState, New(cfg)