Code refactoring for bpa operator
[icn.git] / cmd / bpa-operator / vendor / sigs.k8s.io / controller-runtime / pkg / cache / internal / informers_map.go
1 /*
2 Copyright 2018 The Kubernetes Authors.
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8     http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16
17 package internal
18
19 import (
20         "fmt"
21         "sync"
22         "time"
23
24         "k8s.io/apimachinery/pkg/api/meta"
25         metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
26         "k8s.io/apimachinery/pkg/runtime"
27         "k8s.io/apimachinery/pkg/runtime/schema"
28         "k8s.io/apimachinery/pkg/runtime/serializer"
29         "k8s.io/apimachinery/pkg/watch"
30         "k8s.io/client-go/dynamic"
31         "k8s.io/client-go/rest"
32         "k8s.io/client-go/tools/cache"
33
34         "sigs.k8s.io/controller-runtime/pkg/client/apiutil"
35 )
36
37 // clientListWatcherFunc knows how to create a ListWatcher
38 type createListWatcherFunc func(gvk schema.GroupVersionKind, ip *specificInformersMap) (*cache.ListWatch, error)
39
40 // newSpecificInformersMap returns a new specificInformersMap (like
41 // the generical InformersMap, except that it doesn't implement WaitForCacheSync).
42 func newSpecificInformersMap(config *rest.Config,
43         scheme *runtime.Scheme,
44         mapper meta.RESTMapper,
45         resync time.Duration,
46         namespace string,
47         createListWatcher createListWatcherFunc) *specificInformersMap {
48         ip := &specificInformersMap{
49                 config:            config,
50                 Scheme:            scheme,
51                 mapper:            mapper,
52                 informersByGVK:    make(map[schema.GroupVersionKind]*MapEntry),
53                 codecs:            serializer.NewCodecFactory(scheme),
54                 paramCodec:        runtime.NewParameterCodec(scheme),
55                 resync:            resync,
56                 createListWatcher: createListWatcher,
57                 namespace:         namespace,
58         }
59         return ip
60 }
61
62 // MapEntry contains the cached data for an Informer
63 type MapEntry struct {
64         // Informer is the cached informer
65         Informer cache.SharedIndexInformer
66
67         // CacheReader wraps Informer and implements the CacheReader interface for a single type
68         Reader CacheReader
69 }
70
71 // specificInformersMap create and caches Informers for (runtime.Object, schema.GroupVersionKind) pairs.
72 // It uses a standard parameter codec constructed based on the given generated Scheme.
73 type specificInformersMap struct {
74         // Scheme maps runtime.Objects to GroupVersionKinds
75         Scheme *runtime.Scheme
76
77         // config is used to talk to the apiserver
78         config *rest.Config
79
80         // mapper maps GroupVersionKinds to Resources
81         mapper meta.RESTMapper
82
83         // informersByGVK is the cache of informers keyed by groupVersionKind
84         informersByGVK map[schema.GroupVersionKind]*MapEntry
85
86         // codecs is used to create a new REST client
87         codecs serializer.CodecFactory
88
89         // paramCodec is used by list and watch
90         paramCodec runtime.ParameterCodec
91
92         // stop is the stop channel to stop informers
93         stop <-chan struct{}
94
95         // resync is the frequency the informers are resynced
96         resync time.Duration
97
98         // mu guards access to the map
99         mu sync.RWMutex
100
101         // start is true if the informers have been started
102         started bool
103
104         // createClient knows how to create a client and a list object,
105         // and allows for abstracting over the particulars of structured vs
106         // unstructured objects.
107         createListWatcher createListWatcherFunc
108
109         // namespace is the namespace that all ListWatches are restricted to
110         // default or empty string means all namespaces
111         namespace string
112 }
113
114 // Start calls Run on each of the informers and sets started to true.  Blocks on the stop channel.
115 // It doesn't return start because it can't return an error, and it's not a runnable directly.
116 func (ip *specificInformersMap) Start(stop <-chan struct{}) {
117         func() {
118                 ip.mu.Lock()
119                 defer ip.mu.Unlock()
120
121                 // Set the stop channel so it can be passed to informers that are added later
122                 ip.stop = stop
123
124                 // Start each informer
125                 for _, informer := range ip.informersByGVK {
126                         go informer.Informer.Run(stop)
127                 }
128
129                 // Set started to true so we immediately start any informers added later.
130                 ip.started = true
131         }()
132         <-stop
133 }
134
135 // HasSyncedFuncs returns all the HasSynced functions for the informers in this map.
136 func (ip *specificInformersMap) HasSyncedFuncs() []cache.InformerSynced {
137         ip.mu.RLock()
138         defer ip.mu.RUnlock()
139         syncedFuncs := make([]cache.InformerSynced, 0, len(ip.informersByGVK))
140         for _, informer := range ip.informersByGVK {
141                 syncedFuncs = append(syncedFuncs, informer.Informer.HasSynced)
142         }
143         return syncedFuncs
144 }
145
146 // Get will create a new Informer and add it to the map of specificInformersMap if none exists.  Returns
147 // the Informer from the map.
148 func (ip *specificInformersMap) Get(gvk schema.GroupVersionKind, obj runtime.Object) (*MapEntry, error) {
149         // Return the informer if it is found
150         i, ok := func() (*MapEntry, bool) {
151                 ip.mu.RLock()
152                 defer ip.mu.RUnlock()
153                 i, ok := ip.informersByGVK[gvk]
154                 return i, ok
155         }()
156         if ok {
157                 return i, nil
158         }
159
160         // Do the mutex part in its own function so we can use defer without blocking pieces that don't
161         // need to be locked
162         var sync bool
163         i, err := func() (*MapEntry, error) {
164                 ip.mu.Lock()
165                 defer ip.mu.Unlock()
166
167                 // Check the cache to see if we already have an Informer.  If we do, return the Informer.
168                 // This is for the case where 2 routines tried to get the informer when it wasn't in the map
169                 // so neither returned early, but the first one created it.
170                 var ok bool
171                 i, ok := ip.informersByGVK[gvk]
172                 if ok {
173                         return i, nil
174                 }
175
176                 // Create a NewSharedIndexInformer and add it to the map.
177                 var lw *cache.ListWatch
178                 lw, err := ip.createListWatcher(gvk, ip)
179                 if err != nil {
180                         return nil, err
181                 }
182                 ni := cache.NewSharedIndexInformer(lw, obj, ip.resync, cache.Indexers{
183                         cache.NamespaceIndex: cache.MetaNamespaceIndexFunc,
184                 })
185                 i = &MapEntry{
186                         Informer: ni,
187                         Reader:   CacheReader{indexer: ni.GetIndexer(), groupVersionKind: gvk},
188                 }
189                 ip.informersByGVK[gvk] = i
190
191                 // Start the Informer if need by
192                 // TODO(seans): write thorough tests and document what happens here - can you add indexers?
193                 // can you add eventhandlers?
194                 if ip.started {
195                         sync = true
196                         go i.Informer.Run(ip.stop)
197                 }
198                 return i, nil
199         }()
200         if err != nil {
201                 return nil, err
202         }
203
204         if sync {
205                 // Wait for it to sync before returning the Informer so that folks don't read from a stale cache.
206                 if !cache.WaitForCacheSync(ip.stop, i.Informer.HasSynced) {
207                         return nil, fmt.Errorf("failed waiting for %T Informer to sync", obj)
208                 }
209         }
210
211         return i, err
212 }
213
214 // newListWatch returns a new ListWatch object that can be used to create a SharedIndexInformer.
215 func createStructuredListWatch(gvk schema.GroupVersionKind, ip *specificInformersMap) (*cache.ListWatch, error) {
216         // Kubernetes APIs work against Resources, not GroupVersionKinds.  Map the
217         // groupVersionKind to the Resource API we will use.
218         mapping, err := ip.mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
219         if err != nil {
220                 return nil, err
221         }
222
223         client, err := apiutil.RESTClientForGVK(gvk, ip.config, ip.codecs)
224         if err != nil {
225                 return nil, err
226         }
227         listGVK := gvk.GroupVersion().WithKind(gvk.Kind + "List")
228         listObj, err := ip.Scheme.New(listGVK)
229         if err != nil {
230                 return nil, err
231         }
232
233         // Create a new ListWatch for the obj
234         return &cache.ListWatch{
235                 ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
236                         res := listObj.DeepCopyObject()
237                         isNamespaceScoped := ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot
238                         err := client.Get().NamespaceIfScoped(ip.namespace, isNamespaceScoped).Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec).Do().Into(res)
239                         return res, err
240                 },
241                 // Setup the watch function
242                 WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
243                         // Watch needs to be set to true separately
244                         opts.Watch = true
245                         isNamespaceScoped := ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot
246                         return client.Get().NamespaceIfScoped(ip.namespace, isNamespaceScoped).Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec).Watch()
247                 },
248         }, nil
249 }
250
251 func createUnstructuredListWatch(gvk schema.GroupVersionKind, ip *specificInformersMap) (*cache.ListWatch, error) {
252         // Kubernetes APIs work against Resources, not GroupVersionKinds.  Map the
253         // groupVersionKind to the Resource API we will use.
254         mapping, err := ip.mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
255         if err != nil {
256                 return nil, err
257         }
258         dynamicClient, err := dynamic.NewForConfig(ip.config)
259         if err != nil {
260                 return nil, err
261         }
262
263         // Create a new ListWatch for the obj
264         return &cache.ListWatch{
265                 ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
266                         if ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot {
267                                 return dynamicClient.Resource(mapping.Resource).Namespace(ip.namespace).List(opts)
268                         }
269                         return dynamicClient.Resource(mapping.Resource).List(opts)
270                 },
271                 // Setup the watch function
272                 WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
273                         // Watch needs to be set to true separately
274                         opts.Watch = true
275                         if ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot {
276                                 return dynamicClient.Resource(mapping.Resource).Namespace(ip.namespace).Watch(opts)
277                         }
278                         return dynamicClient.Resource(mapping.Resource).Watch(opts)
279                 },
280         }, nil
281 }