Remove BPA from Makefile
[icn.git] / cmd / bpa-operator / vendor / k8s.io / client-go / tools / cache / reflector.go
1 /*
2 Copyright 2014 The Kubernetes Authors.
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8     http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16
17 package cache
18
19 import (
20         "errors"
21         "fmt"
22         "io"
23         "math/rand"
24         "net"
25         "net/url"
26         "reflect"
27         "strconv"
28         "strings"
29         "sync"
30         "sync/atomic"
31         "syscall"
32         "time"
33
34         apierrs "k8s.io/apimachinery/pkg/api/errors"
35         "k8s.io/apimachinery/pkg/api/meta"
36         metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
37         "k8s.io/apimachinery/pkg/runtime"
38         "k8s.io/apimachinery/pkg/util/clock"
39         "k8s.io/apimachinery/pkg/util/naming"
40         utilruntime "k8s.io/apimachinery/pkg/util/runtime"
41         "k8s.io/apimachinery/pkg/util/wait"
42         "k8s.io/apimachinery/pkg/watch"
43         "k8s.io/klog"
44 )
45
46 // Reflector watches a specified resource and causes all changes to be reflected in the given store.
47 type Reflector struct {
48         // name identifies this reflector. By default it will be a file:line if possible.
49         name string
50         // metrics tracks basic metric information about the reflector
51         metrics *reflectorMetrics
52
53         // The type of object we expect to place in the store.
54         expectedType reflect.Type
55         // The destination to sync up with the watch source
56         store Store
57         // listerWatcher is used to perform lists and watches.
58         listerWatcher ListerWatcher
59         // period controls timing between one watch ending and
60         // the beginning of the next one.
61         period       time.Duration
62         resyncPeriod time.Duration
63         ShouldResync func() bool
64         // clock allows tests to manipulate time
65         clock clock.Clock
66         // lastSyncResourceVersion is the resource version token last
67         // observed when doing a sync with the underlying store
68         // it is thread safe, but not synchronized with the underlying store
69         lastSyncResourceVersion string
70         // lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion
71         lastSyncResourceVersionMutex sync.RWMutex
72 }
73
74 var (
75         // We try to spread the load on apiserver by setting timeouts for
76         // watch requests - it is random in [minWatchTimeout, 2*minWatchTimeout].
77         minWatchTimeout = 5 * time.Minute
78 )
79
80 // NewNamespaceKeyedIndexerAndReflector creates an Indexer and a Reflector
81 // The indexer is configured to key on namespace
82 func NewNamespaceKeyedIndexerAndReflector(lw ListerWatcher, expectedType interface{}, resyncPeriod time.Duration) (indexer Indexer, reflector *Reflector) {
83         indexer = NewIndexer(MetaNamespaceKeyFunc, Indexers{"namespace": MetaNamespaceIndexFunc})
84         reflector = NewReflector(lw, expectedType, indexer, resyncPeriod)
85         return indexer, reflector
86 }
87
88 // NewReflector creates a new Reflector object which will keep the given store up to
89 // date with the server's contents for the given resource. Reflector promises to
90 // only put things in the store that have the type of expectedType, unless expectedType
91 // is nil. If resyncPeriod is non-zero, then lists will be executed after every
92 // resyncPeriod, so that you can use reflectors to periodically process everything as
93 // well as incrementally processing the things that change.
94 func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
95         return NewNamedReflector(naming.GetNameFromCallsite(internalPackages...), lw, expectedType, store, resyncPeriod)
96 }
97
98 // reflectorDisambiguator is used to disambiguate started reflectors.
99 // initialized to an unstable value to ensure meaning isn't attributed to the suffix.
100 var reflectorDisambiguator = int64(time.Now().UnixNano() % 12345)
101
102 // NewNamedReflector same as NewReflector, but with a specified name for logging
103 func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
104         reflectorSuffix := atomic.AddInt64(&reflectorDisambiguator, 1)
105         r := &Reflector{
106                 name: name,
107                 // we need this to be unique per process (some names are still the same) but obvious who it belongs to
108                 metrics:       newReflectorMetrics(makeValidPrometheusMetricLabel(fmt.Sprintf("reflector_"+name+"_%d", reflectorSuffix))),
109                 listerWatcher: lw,
110                 store:         store,
111                 expectedType:  reflect.TypeOf(expectedType),
112                 period:        time.Second,
113                 resyncPeriod:  resyncPeriod,
114                 clock:         &clock.RealClock{},
115         }
116         return r
117 }
118
119 func makeValidPrometheusMetricLabel(in string) string {
120         // this isn't perfect, but it removes our common characters
121         return strings.NewReplacer("/", "_", ".", "_", "-", "_", ":", "_").Replace(in)
122 }
123
124 // internalPackages are packages that ignored when creating a default reflector name. These packages are in the common
125 // call chains to NewReflector, so they'd be low entropy names for reflectors
126 var internalPackages = []string{"client-go/tools/cache/"}
127
128 // Run starts a watch and handles watch events. Will restart the watch if it is closed.
129 // Run will exit when stopCh is closed.
130 func (r *Reflector) Run(stopCh <-chan struct{}) {
131         klog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedType, r.resyncPeriod, r.name)
132         wait.Until(func() {
133                 if err := r.ListAndWatch(stopCh); err != nil {
134                         utilruntime.HandleError(err)
135                 }
136         }, r.period, stopCh)
137 }
138
139 var (
140         // nothing will ever be sent down this channel
141         neverExitWatch <-chan time.Time = make(chan time.Time)
142
143         // Used to indicate that watching stopped so that a resync could happen.
144         errorResyncRequested = errors.New("resync channel fired")
145
146         // Used to indicate that watching stopped because of a signal from the stop
147         // channel passed in from a client of the reflector.
148         errorStopRequested = errors.New("Stop requested")
149 )
150
151 // resyncChan returns a channel which will receive something when a resync is
152 // required, and a cleanup function.
153 func (r *Reflector) resyncChan() (<-chan time.Time, func() bool) {
154         if r.resyncPeriod == 0 {
155                 return neverExitWatch, func() bool { return false }
156         }
157         // The cleanup function is required: imagine the scenario where watches
158         // always fail so we end up listing frequently. Then, if we don't
159         // manually stop the timer, we could end up with many timers active
160         // concurrently.
161         t := r.clock.NewTimer(r.resyncPeriod)
162         return t.C(), t.Stop
163 }
164
165 // ListAndWatch first lists all items and get the resource version at the moment of call,
166 // and then use the resource version to watch.
167 // It returns error if ListAndWatch didn't even try to initialize watch.
168 func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
169         klog.V(3).Infof("Listing and watching %v from %s", r.expectedType, r.name)
170         var resourceVersion string
171
172         // Explicitly set "0" as resource version - it's fine for the List()
173         // to be served from cache and potentially be delayed relative to
174         // etcd contents. Reflector framework will catch up via Watch() eventually.
175         options := metav1.ListOptions{ResourceVersion: "0"}
176         r.metrics.numberOfLists.Inc()
177         start := r.clock.Now()
178         list, err := r.listerWatcher.List(options)
179         if err != nil {
180                 return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedType, err)
181         }
182         r.metrics.listDuration.Observe(time.Since(start).Seconds())
183         listMetaInterface, err := meta.ListAccessor(list)
184         if err != nil {
185                 return fmt.Errorf("%s: Unable to understand list result %#v: %v", r.name, list, err)
186         }
187         resourceVersion = listMetaInterface.GetResourceVersion()
188         items, err := meta.ExtractList(list)
189         if err != nil {
190                 return fmt.Errorf("%s: Unable to understand list result %#v (%v)", r.name, list, err)
191         }
192         r.metrics.numberOfItemsInList.Observe(float64(len(items)))
193         if err := r.syncWith(items, resourceVersion); err != nil {
194                 return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)
195         }
196         r.setLastSyncResourceVersion(resourceVersion)
197
198         resyncerrc := make(chan error, 1)
199         cancelCh := make(chan struct{})
200         defer close(cancelCh)
201         go func() {
202                 resyncCh, cleanup := r.resyncChan()
203                 defer func() {
204                         cleanup() // Call the last one written into cleanup
205                 }()
206                 for {
207                         select {
208                         case <-resyncCh:
209                         case <-stopCh:
210                                 return
211                         case <-cancelCh:
212                                 return
213                         }
214                         if r.ShouldResync == nil || r.ShouldResync() {
215                                 klog.V(4).Infof("%s: forcing resync", r.name)
216                                 if err := r.store.Resync(); err != nil {
217                                         resyncerrc <- err
218                                         return
219                                 }
220                         }
221                         cleanup()
222                         resyncCh, cleanup = r.resyncChan()
223                 }
224         }()
225
226         for {
227                 // give the stopCh a chance to stop the loop, even in case of continue statements further down on errors
228                 select {
229                 case <-stopCh:
230                         return nil
231                 default:
232                 }
233
234                 timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
235                 options = metav1.ListOptions{
236                         ResourceVersion: resourceVersion,
237                         // We want to avoid situations of hanging watchers. Stop any wachers that do not
238                         // receive any events within the timeout window.
239                         TimeoutSeconds: &timeoutSeconds,
240                 }
241
242                 r.metrics.numberOfWatches.Inc()
243                 w, err := r.listerWatcher.Watch(options)
244                 if err != nil {
245                         switch err {
246                         case io.EOF:
247                                 // watch closed normally
248                         case io.ErrUnexpectedEOF:
249                                 klog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.expectedType, err)
250                         default:
251                                 utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedType, err))
252                         }
253                         // If this is "connection refused" error, it means that most likely apiserver is not responsive.
254                         // It doesn't make sense to re-list all objects because most likely we will be able to restart
255                         // watch where we ended.
256                         // If that's the case wait and resend watch request.
257                         if urlError, ok := err.(*url.Error); ok {
258                                 if opError, ok := urlError.Err.(*net.OpError); ok {
259                                         if errno, ok := opError.Err.(syscall.Errno); ok && errno == syscall.ECONNREFUSED {
260                                                 time.Sleep(time.Second)
261                                                 continue
262                                         }
263                                 }
264                         }
265                         return nil
266                 }
267
268                 if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil {
269                         if err != errorStopRequested {
270                                 klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedType, err)
271                         }
272                         return nil
273                 }
274         }
275 }
276
277 // syncWith replaces the store's items with the given list.
278 func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
279         found := make([]interface{}, 0, len(items))
280         for _, item := range items {
281                 found = append(found, item)
282         }
283         return r.store.Replace(found, resourceVersion)
284 }
285
286 // watchHandler watches w and keeps *resourceVersion up to date.
287 func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
288         start := r.clock.Now()
289         eventCount := 0
290
291         // Stopping the watcher should be idempotent and if we return from this function there's no way
292         // we're coming back in with the same watch interface.
293         defer w.Stop()
294         // update metrics
295         defer func() {
296                 r.metrics.numberOfItemsInWatch.Observe(float64(eventCount))
297                 r.metrics.watchDuration.Observe(time.Since(start).Seconds())
298         }()
299
300 loop:
301         for {
302                 select {
303                 case <-stopCh:
304                         return errorStopRequested
305                 case err := <-errc:
306                         return err
307                 case event, ok := <-w.ResultChan():
308                         if !ok {
309                                 break loop
310                         }
311                         if event.Type == watch.Error {
312                                 return apierrs.FromObject(event.Object)
313                         }
314                         if e, a := r.expectedType, reflect.TypeOf(event.Object); e != nil && e != a {
315                                 utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))
316                                 continue
317                         }
318                         meta, err := meta.Accessor(event.Object)
319                         if err != nil {
320                                 utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
321                                 continue
322                         }
323                         newResourceVersion := meta.GetResourceVersion()
324                         switch event.Type {
325                         case watch.Added:
326                                 err := r.store.Add(event.Object)
327                                 if err != nil {
328                                         utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))
329                                 }
330                         case watch.Modified:
331                                 err := r.store.Update(event.Object)
332                                 if err != nil {
333                                         utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))
334                                 }
335                         case watch.Deleted:
336                                 // TODO: Will any consumers need access to the "last known
337                                 // state", which is passed in event.Object? If so, may need
338                                 // to change this.
339                                 err := r.store.Delete(event.Object)
340                                 if err != nil {
341                                         utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err))
342                                 }
343                         default:
344                                 utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
345                         }
346                         *resourceVersion = newResourceVersion
347                         r.setLastSyncResourceVersion(newResourceVersion)
348                         eventCount++
349                 }
350         }
351
352         watchDuration := r.clock.Now().Sub(start)
353         if watchDuration < 1*time.Second && eventCount == 0 {
354                 r.metrics.numberOfShortWatches.Inc()
355                 return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", r.name)
356         }
357         klog.V(4).Infof("%s: Watch close - %v total %v items received", r.name, r.expectedType, eventCount)
358         return nil
359 }
360
361 // LastSyncResourceVersion is the resource version observed when last sync with the underlying store
362 // The value returned is not synchronized with access to the underlying store and is not thread-safe
363 func (r *Reflector) LastSyncResourceVersion() string {
364         r.lastSyncResourceVersionMutex.RLock()
365         defer r.lastSyncResourceVersionMutex.RUnlock()
366         return r.lastSyncResourceVersion
367 }
368
369 func (r *Reflector) setLastSyncResourceVersion(v string) {
370         r.lastSyncResourceVersionMutex.Lock()
371         defer r.lastSyncResourceVersionMutex.Unlock()
372         r.lastSyncResourceVersion = v
373
374         rv, err := strconv.Atoi(v)
375         if err == nil {
376                 r.metrics.lastResourceVersion.Set(float64(rv))
377         }
378 }