/*
Copyright 2014 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
34 apierrs "k8s.io/apimachinery/pkg/api/errors"
35 "k8s.io/apimachinery/pkg/api/meta"
36 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
37 "k8s.io/apimachinery/pkg/runtime"
38 "k8s.io/apimachinery/pkg/util/clock"
39 "k8s.io/apimachinery/pkg/util/naming"
40 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
41 "k8s.io/apimachinery/pkg/util/wait"
42 "k8s.io/apimachinery/pkg/watch"
46 // Reflector watches a specified resource and causes all changes to be reflected in the given store.
47 type Reflector struct {
48 // name identifies this reflector. By default it will be a file:line if possible.
50 // metrics tracks basic metric information about the reflector
51 metrics *reflectorMetrics
53 // The type of object we expect to place in the store.
54 expectedType reflect.Type
55 // The destination to sync up with the watch source
57 // listerWatcher is used to perform lists and watches.
58 listerWatcher ListerWatcher
59 // period controls timing between one watch ending and
60 // the beginning of the next one.
62 resyncPeriod time.Duration
63 ShouldResync func() bool
64 // clock allows tests to manipulate time
66 // lastSyncResourceVersion is the resource version token last
67 // observed when doing a sync with the underlying store
68 // it is thread safe, but not synchronized with the underlying store
69 lastSyncResourceVersion string
70 // lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion
71 lastSyncResourceVersionMutex sync.RWMutex
// minWatchTimeout is the lower bound of the per-watch timeout.
// We try to spread the load on apiserver by setting timeouts for
// watch requests - it is random in [minWatchTimeout, 2*minWatchTimeout].
var minWatchTimeout = 5 * time.Minute
80 // NewNamespaceKeyedIndexerAndReflector creates an Indexer and a Reflector
81 // The indexer is configured to key on namespace
82 func NewNamespaceKeyedIndexerAndReflector(lw ListerWatcher, expectedType interface{}, resyncPeriod time.Duration) (indexer Indexer, reflector *Reflector) {
83 indexer = NewIndexer(MetaNamespaceKeyFunc, Indexers{"namespace": MetaNamespaceIndexFunc})
84 reflector = NewReflector(lw, expectedType, indexer, resyncPeriod)
85 return indexer, reflector
88 // NewReflector creates a new Reflector object which will keep the given store up to
89 // date with the server's contents for the given resource. Reflector promises to
90 // only put things in the store that have the type of expectedType, unless expectedType
91 // is nil. If resyncPeriod is non-zero, then lists will be executed after every
92 // resyncPeriod, so that you can use reflectors to periodically process everything as
93 // well as incrementally processing the things that change.
94 func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
95 return NewNamedReflector(naming.GetNameFromCallsite(internalPackages...), lw, expectedType, store, resyncPeriod)
// reflectorDisambiguator is used to disambiguate started reflectors.
// It is initialized to an unstable value to ensure meaning isn't attributed to the suffix.
// UnixNano already yields an int64, so no conversion is needed for the
// atomic.AddInt64 call that increments it.
var reflectorDisambiguator = time.Now().UnixNano() % 12345
102 // NewNamedReflector same as NewReflector, but with a specified name for logging
103 func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
104 reflectorSuffix := atomic.AddInt64(&reflectorDisambiguator, 1)
107 // we need this to be unique per process (some names are still the same) but obvious who it belongs to
108 metrics: newReflectorMetrics(makeValidPrometheusMetricLabel(fmt.Sprintf("reflector_"+name+"_%d", reflectorSuffix))),
111 expectedType: reflect.TypeOf(expectedType),
113 resyncPeriod: resyncPeriod,
114 clock: &clock.RealClock{},
// prometheusLabelReplacer maps the separators that commonly appear in
// reflector names ("/", ".", "-", ":") to underscores. It is built once at
// package scope; a strings.Replacer is safe for concurrent use.
var prometheusLabelReplacer = strings.NewReplacer("/", "_", ".", "_", "-", "_", ":", "_")

// makeValidPrometheusMetricLabel returns in with every "/", ".", "-" and ":"
// replaced by "_". All other characters are passed through unchanged.
func makeValidPrometheusMetricLabel(in string) string {
	// this isn't perfect, but it removes our common characters
	return prometheusLabelReplacer.Replace(in)
}
// internalPackages are packages that are ignored when creating a default reflector name. These packages are in the common
// call chains to NewReflector, so they'd be low entropy names for reflectors
var internalPackages = []string{"client-go/tools/cache/"}
128 // Run starts a watch and handles watch events. Will restart the watch if it is closed.
129 // Run will exit when stopCh is closed.
130 func (r *Reflector) Run(stopCh <-chan struct{}) {
131 klog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedType, r.resyncPeriod, r.name)
133 if err := r.ListAndWatch(stopCh); err != nil {
134 utilruntime.HandleError(err)
var (
	// nothing will ever be sent down this channel
	neverExitWatch <-chan time.Time = make(chan time.Time)

	// Used to indicate that watching stopped so that a resync could happen.
	errorResyncRequested = errors.New("resync channel fired")

	// Used to indicate that watching stopped because of a signal from the stop
	// channel passed in from a client of the reflector.
	errorStopRequested = errors.New("Stop requested")
)
151 // resyncChan returns a channel which will receive something when a resync is
152 // required, and a cleanup function.
153 func (r *Reflector) resyncChan() (<-chan time.Time, func() bool) {
154 if r.resyncPeriod == 0 {
155 return neverExitWatch, func() bool { return false }
157 // The cleanup function is required: imagine the scenario where watches
158 // always fail so we end up listing frequently. Then, if we don't
159 // manually stop the timer, we could end up with many timers active
161 t := r.clock.NewTimer(r.resyncPeriod)
165 // ListAndWatch first lists all items and get the resource version at the moment of call,
166 // and then use the resource version to watch.
167 // It returns error if ListAndWatch didn't even try to initialize watch.
168 func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
169 klog.V(3).Infof("Listing and watching %v from %s", r.expectedType, r.name)
170 var resourceVersion string
172 // Explicitly set "0" as resource version - it's fine for the List()
173 // to be served from cache and potentially be delayed relative to
174 // etcd contents. Reflector framework will catch up via Watch() eventually.
175 options := metav1.ListOptions{ResourceVersion: "0"}
176 r.metrics.numberOfLists.Inc()
177 start := r.clock.Now()
178 list, err := r.listerWatcher.List(options)
180 return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedType, err)
182 r.metrics.listDuration.Observe(time.Since(start).Seconds())
183 listMetaInterface, err := meta.ListAccessor(list)
185 return fmt.Errorf("%s: Unable to understand list result %#v: %v", r.name, list, err)
187 resourceVersion = listMetaInterface.GetResourceVersion()
188 items, err := meta.ExtractList(list)
190 return fmt.Errorf("%s: Unable to understand list result %#v (%v)", r.name, list, err)
192 r.metrics.numberOfItemsInList.Observe(float64(len(items)))
193 if err := r.syncWith(items, resourceVersion); err != nil {
194 return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)
196 r.setLastSyncResourceVersion(resourceVersion)
198 resyncerrc := make(chan error, 1)
199 cancelCh := make(chan struct{})
200 defer close(cancelCh)
202 resyncCh, cleanup := r.resyncChan()
204 cleanup() // Call the last one written into cleanup
214 if r.ShouldResync == nil || r.ShouldResync() {
215 klog.V(4).Infof("%s: forcing resync", r.name)
216 if err := r.store.Resync(); err != nil {
222 resyncCh, cleanup = r.resyncChan()
227 // give the stopCh a chance to stop the loop, even in case of continue statements further down on errors
234 timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
235 options = metav1.ListOptions{
236 ResourceVersion: resourceVersion,
237 // We want to avoid situations of hanging watchers. Stop any wachers that do not
238 // receive any events within the timeout window.
239 TimeoutSeconds: &timeoutSeconds,
242 r.metrics.numberOfWatches.Inc()
243 w, err := r.listerWatcher.Watch(options)
247 // watch closed normally
248 case io.ErrUnexpectedEOF:
249 klog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.expectedType, err)
251 utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedType, err))
253 // If this is "connection refused" error, it means that most likely apiserver is not responsive.
254 // It doesn't make sense to re-list all objects because most likely we will be able to restart
255 // watch where we ended.
256 // If that's the case wait and resend watch request.
257 if urlError, ok := err.(*url.Error); ok {
258 if opError, ok := urlError.Err.(*net.OpError); ok {
259 if errno, ok := opError.Err.(syscall.Errno); ok && errno == syscall.ECONNREFUSED {
260 time.Sleep(time.Second)
268 if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil {
269 if err != errorStopRequested {
270 klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedType, err)
277 // syncWith replaces the store's items with the given list.
278 func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
279 found := make([]interface{}, 0, len(items))
280 for _, item := range items {
281 found = append(found, item)
283 return r.store.Replace(found, resourceVersion)
286 // watchHandler watches w and keeps *resourceVersion up to date.
287 func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
288 start := r.clock.Now()
291 // Stopping the watcher should be idempotent and if we return from this function there's no way
292 // we're coming back in with the same watch interface.
296 r.metrics.numberOfItemsInWatch.Observe(float64(eventCount))
297 r.metrics.watchDuration.Observe(time.Since(start).Seconds())
304 return errorStopRequested
307 case event, ok := <-w.ResultChan():
311 if event.Type == watch.Error {
312 return apierrs.FromObject(event.Object)
314 if e, a := r.expectedType, reflect.TypeOf(event.Object); e != nil && e != a {
315 utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))
318 meta, err := meta.Accessor(event.Object)
320 utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
323 newResourceVersion := meta.GetResourceVersion()
326 err := r.store.Add(event.Object)
328 utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))
331 err := r.store.Update(event.Object)
333 utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))
336 // TODO: Will any consumers need access to the "last known
337 // state", which is passed in event.Object? If so, may need
339 err := r.store.Delete(event.Object)
341 utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err))
344 utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
346 *resourceVersion = newResourceVersion
347 r.setLastSyncResourceVersion(newResourceVersion)
352 watchDuration := r.clock.Now().Sub(start)
353 if watchDuration < 1*time.Second && eventCount == 0 {
354 r.metrics.numberOfShortWatches.Inc()
355 return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", r.name)
357 klog.V(4).Infof("%s: Watch close - %v total %v items received", r.name, r.expectedType, eventCount)
361 // LastSyncResourceVersion is the resource version observed when last sync with the underlying store
362 // The value returned is not synchronized with access to the underlying store and is not thread-safe
363 func (r *Reflector) LastSyncResourceVersion() string {
364 r.lastSyncResourceVersionMutex.RLock()
365 defer r.lastSyncResourceVersionMutex.RUnlock()
366 return r.lastSyncResourceVersion
369 func (r *Reflector) setLastSyncResourceVersion(v string) {
370 r.lastSyncResourceVersionMutex.Lock()
371 defer r.lastSyncResourceVersionMutex.Unlock()
372 r.lastSyncResourceVersion = v
374 rv, err := strconv.Atoi(v)
376 r.metrics.lastResourceVersion.Set(float64(rv))