2 Copyright 2018 The Kubernetes Authors.
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
8 http://www.apache.org/licenses/LICENSE-2.0
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
24 "k8s.io/apimachinery/pkg/api/meta"
25 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
26 "k8s.io/apimachinery/pkg/runtime"
27 "k8s.io/apimachinery/pkg/runtime/schema"
28 "k8s.io/apimachinery/pkg/runtime/serializer"
29 "k8s.io/apimachinery/pkg/watch"
30 "k8s.io/client-go/dynamic"
31 "k8s.io/client-go/rest"
32 "k8s.io/client-go/tools/cache"
34 "sigs.k8s.io/controller-runtime/pkg/client/apiutil"
37 // createListWatcherFunc knows how to create a ListWatch
38 type createListWatcherFunc func(gvk schema.GroupVersionKind, ip *specificInformersMap) (*cache.ListWatch, error)
40 // newSpecificInformersMap returns a new specificInformersMap (like
41 // the generic InformersMap, except that it doesn't implement WaitForCacheSync).
42 func newSpecificInformersMap(config *rest.Config,
43 scheme *runtime.Scheme,
44 mapper meta.RESTMapper,
47 createListWatcher createListWatcherFunc) *specificInformersMap {
48 ip := &specificInformersMap{
52 informersByGVK: make(map[schema.GroupVersionKind]*MapEntry),
53 codecs: serializer.NewCodecFactory(scheme),
54 paramCodec: runtime.NewParameterCodec(scheme),
56 createListWatcher: createListWatcher,
62 // MapEntry contains the cached data for an Informer
63 type MapEntry struct {
64 // Informer is the cached informer
65 Informer cache.SharedIndexInformer
67 // CacheReader wraps Informer and implements the CacheReader interface for a single type
71 // specificInformersMap creates and caches Informers for (runtime.Object, schema.GroupVersionKind) pairs.
72 // It uses a standard parameter codec constructed based on the given generated Scheme.
73 type specificInformersMap struct {
74 // Scheme maps runtime.Objects to GroupVersionKinds
75 Scheme *runtime.Scheme
77 // config is used to talk to the apiserver
80 // mapper maps GroupVersionKinds to Resources
81 mapper meta.RESTMapper
83 // informersByGVK is the cache of informers keyed by groupVersionKind
84 informersByGVK map[schema.GroupVersionKind]*MapEntry
86 // codecs is used to create a new REST client
87 codecs serializer.CodecFactory
89 // paramCodec is used by list and watch
90 paramCodec runtime.ParameterCodec
92 // stop is the stop channel to stop informers
95 // resync is the frequency the informers are resynced
98 // mu guards access to the map
101 // started is true if the informers have been started
104 // createListWatcher knows how to create a ListWatch for a GroupVersionKind,
105 // and allows for abstracting over the particulars of structured vs
106 // unstructured objects.
107 createListWatcher createListWatcherFunc
109 // namespace is the namespace that all ListWatches are restricted to.
110 // An empty string (the zero value) means all namespaces.
114 // Start calls Run on each of the informers and sets started to true. Blocks on the stop channel.
115 // It has no return value because it can't return an error, and it's not a runnable directly.
116 func (ip *specificInformersMap) Start(stop <-chan struct{}) {
121 // Set the stop channel so it can be passed to informers that are added later
124 // Start each informer
125 for _, informer := range ip.informersByGVK {
126 go informer.Informer.Run(stop)
129 // Set started to true so we immediately start any informers added later.
135 // HasSyncedFuncs returns all the HasSynced functions for the informers in this map.
136 func (ip *specificInformersMap) HasSyncedFuncs() []cache.InformerSynced {
138 defer ip.mu.RUnlock()
139 syncedFuncs := make([]cache.InformerSynced, 0, len(ip.informersByGVK))
140 for _, informer := range ip.informersByGVK {
141 syncedFuncs = append(syncedFuncs, informer.Informer.HasSynced)
146 // Get will create a new Informer and add it to the map of specificInformersMap if none exists. Returns
147 // the Informer from the map.
148 func (ip *specificInformersMap) Get(gvk schema.GroupVersionKind, obj runtime.Object) (*MapEntry, error) {
149 // Return the informer if it is found
150 i, ok := func() (*MapEntry, bool) {
152 defer ip.mu.RUnlock()
153 i, ok := ip.informersByGVK[gvk]
160 // Do the mutex part in its own function so we can use defer without blocking pieces that don't
163 i, err := func() (*MapEntry, error) {
167 // Check the cache to see if we already have an Informer. If we do, return the Informer.
168 // This is for the case where 2 routines tried to get the informer when it wasn't in the map
169 // so neither returned early, but the first one created it.
171 i, ok := ip.informersByGVK[gvk]
176 // Create a NewSharedIndexInformer and add it to the map.
177 var lw *cache.ListWatch
178 lw, err := ip.createListWatcher(gvk, ip)
182 ni := cache.NewSharedIndexInformer(lw, obj, ip.resync, cache.Indexers{
183 cache.NamespaceIndex: cache.MetaNamespaceIndexFunc,
187 Reader: CacheReader{indexer: ni.GetIndexer(), groupVersionKind: gvk},
189 ip.informersByGVK[gvk] = i
191 // Start the Informer if need be
192 // TODO(seans): write thorough tests and document what happens here - can you add indexers?
193 // can you add eventhandlers?
196 go i.Informer.Run(ip.stop)
205 // Wait for it to sync before returning the Informer so that folks don't read from a stale cache.
206 if !cache.WaitForCacheSync(ip.stop, i.Informer.HasSynced) {
207 return nil, fmt.Errorf("failed waiting for %T Informer to sync", obj)
214 // createStructuredListWatch returns a new ListWatch object that can be used to create a SharedIndexInformer.
215 func createStructuredListWatch(gvk schema.GroupVersionKind, ip *specificInformersMap) (*cache.ListWatch, error) {
216 // Kubernetes APIs work against Resources, not GroupVersionKinds. Map the
217 // groupVersionKind to the Resource API we will use.
218 mapping, err := ip.mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
223 client, err := apiutil.RESTClientForGVK(gvk, ip.config, ip.codecs)
227 listGVK := gvk.GroupVersion().WithKind(gvk.Kind + "List")
228 listObj, err := ip.Scheme.New(listGVK)
233 // Create a new ListWatch for the obj
234 return &cache.ListWatch{
235 ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
236 res := listObj.DeepCopyObject()
237 isNamespaceScoped := ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot
238 err := client.Get().NamespaceIfScoped(ip.namespace, isNamespaceScoped).Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec).Do().Into(res)
241 // Setup the watch function
242 WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
243 // Watch needs to be set to true separately
245 isNamespaceScoped := ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot
246 return client.Get().NamespaceIfScoped(ip.namespace, isNamespaceScoped).Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec).Watch()
251 func createUnstructuredListWatch(gvk schema.GroupVersionKind, ip *specificInformersMap) (*cache.ListWatch, error) {
252 // Kubernetes APIs work against Resources, not GroupVersionKinds. Map the
253 // groupVersionKind to the Resource API we will use.
254 mapping, err := ip.mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
258 dynamicClient, err := dynamic.NewForConfig(ip.config)
263 // Create a new ListWatch for the obj
264 return &cache.ListWatch{
265 ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
266 if ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot {
267 return dynamicClient.Resource(mapping.Resource).Namespace(ip.namespace).List(opts)
269 return dynamicClient.Resource(mapping.Resource).List(opts)
271 // Setup the watch function
272 WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
273 // Watch needs to be set to true separately
275 if ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot {
276 return dynamicClient.Resource(mapping.Resource).Namespace(ip.namespace).Watch(opts)
278 return dynamicClient.Resource(mapping.Resource).Watch(opts)