Code refactoring for bpa operator
[icn.git] / cmd / bpa-operator / vendor / k8s.io / client-go / tools / cache / mutation_cache.go
1 /*
2 Copyright 2017 The Kubernetes Authors.
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8     http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16
17 package cache
18
19 import (
20         "fmt"
21         "strconv"
22         "sync"
23         "time"
24
25         "k8s.io/klog"
26
27         "k8s.io/apimachinery/pkg/api/meta"
28         "k8s.io/apimachinery/pkg/runtime"
29         utilcache "k8s.io/apimachinery/pkg/util/cache"
30         utilruntime "k8s.io/apimachinery/pkg/util/runtime"
31         "k8s.io/apimachinery/pkg/util/sets"
32 )
33
34 // MutationCache is able to take the result of update operations and stores them in an LRU
35 // that can be used to provide a more current view of a requested object.  It requires interpreting
36 // resourceVersions for comparisons.
37 // Implementations must be thread-safe.
38 // TODO find a way to layer this into an informer/lister
39 type MutationCache interface {
40         GetByKey(key string) (interface{}, bool, error)
41         ByIndex(indexName, indexKey string) ([]interface{}, error)
42         Mutation(interface{})
43 }
44
45 type ResourceVersionComparator interface {
46         CompareResourceVersion(lhs, rhs runtime.Object) int
47 }
48
49 // NewIntegerResourceVersionMutationCache returns a MutationCache that understands how to
50 // deal with objects that have a resource version that:
51 //
52 //   - is an integer
53 //   - increases when updated
54 //   - is comparable across the same resource in a namespace
55 //
56 // Most backends will have these semantics. Indexer may be nil. ttl controls how long an item
57 // remains in the mutation cache before it is removed.
58 //
59 // If includeAdds is true, objects in the mutation cache will be returned even if they don't exist
60 // in the underlying store. This is only safe if your use of the cache can handle mutation entries
61 // remaining in the cache for up to ttl when mutations and deletes occur very closely in time.
62 func NewIntegerResourceVersionMutationCache(backingCache Store, indexer Indexer, ttl time.Duration, includeAdds bool) MutationCache {
63         return &mutationCache{
64                 backingCache:  backingCache,
65                 indexer:       indexer,
66                 mutationCache: utilcache.NewLRUExpireCache(100),
67                 comparator:    etcdObjectVersioner{},
68                 ttl:           ttl,
69                 includeAdds:   includeAdds,
70         }
71 }
72
73 // mutationCache doesn't guarantee that it returns values added via Mutation since they can page out and
74 // since you can't distinguish between, "didn't observe create" and "was deleted after create",
75 // if the key is missing from the backing cache, we always return it as missing
76 type mutationCache struct {
77         lock          sync.Mutex
78         backingCache  Store
79         indexer       Indexer
80         mutationCache *utilcache.LRUExpireCache
81         includeAdds   bool
82         ttl           time.Duration
83
84         comparator ResourceVersionComparator
85 }
86
87 // GetByKey is never guaranteed to return back the value set in Mutation.  It could be paged out, it could
88 // be older than another copy, the backingCache may be more recent or, you might have written twice into the same key.
89 // You get a value that was valid at some snapshot of time and will always return the newer of backingCache and mutationCache.
90 func (c *mutationCache) GetByKey(key string) (interface{}, bool, error) {
91         c.lock.Lock()
92         defer c.lock.Unlock()
93
94         obj, exists, err := c.backingCache.GetByKey(key)
95         if err != nil {
96                 return nil, false, err
97         }
98         if !exists {
99                 if !c.includeAdds {
100                         // we can't distinguish between, "didn't observe create" and "was deleted after create", so
101                         // if the key is missing, we always return it as missing
102                         return nil, false, nil
103                 }
104                 obj, exists = c.mutationCache.Get(key)
105                 if !exists {
106                         return nil, false, nil
107                 }
108         }
109         objRuntime, ok := obj.(runtime.Object)
110         if !ok {
111                 return obj, true, nil
112         }
113         return c.newerObject(key, objRuntime), true, nil
114 }
115
116 // ByIndex returns the newer objects that match the provided index and indexer key.
117 // Will return an error if no indexer was provided.
118 func (c *mutationCache) ByIndex(name string, indexKey string) ([]interface{}, error) {
119         c.lock.Lock()
120         defer c.lock.Unlock()
121         if c.indexer == nil {
122                 return nil, fmt.Errorf("no indexer has been provided to the mutation cache")
123         }
124         keys, err := c.indexer.IndexKeys(name, indexKey)
125         if err != nil {
126                 return nil, err
127         }
128         var items []interface{}
129         keySet := sets.NewString()
130         for _, key := range keys {
131                 keySet.Insert(key)
132                 obj, exists, err := c.indexer.GetByKey(key)
133                 if err != nil {
134                         return nil, err
135                 }
136                 if !exists {
137                         continue
138                 }
139                 if objRuntime, ok := obj.(runtime.Object); ok {
140                         items = append(items, c.newerObject(key, objRuntime))
141                 } else {
142                         items = append(items, obj)
143                 }
144         }
145
146         if c.includeAdds {
147                 fn := c.indexer.GetIndexers()[name]
148                 // Keys() is returned oldest to newest, so full traversal does not alter the LRU behavior
149                 for _, key := range c.mutationCache.Keys() {
150                         updated, ok := c.mutationCache.Get(key)
151                         if !ok {
152                                 continue
153                         }
154                         if keySet.Has(key.(string)) {
155                                 continue
156                         }
157                         elements, err := fn(updated)
158                         if err != nil {
159                                 klog.V(4).Infof("Unable to calculate an index entry for mutation cache entry %s: %v", key, err)
160                                 continue
161                         }
162                         for _, inIndex := range elements {
163                                 if inIndex != indexKey {
164                                         continue
165                                 }
166                                 items = append(items, updated)
167                                 break
168                         }
169                 }
170         }
171
172         return items, nil
173 }
174
175 // newerObject checks the mutation cache for a newer object and returns one if found. If the
176 // mutated object is older than the backing object, it is removed from the  Must be
177 // called while the lock is held.
178 func (c *mutationCache) newerObject(key string, backing runtime.Object) runtime.Object {
179         mutatedObj, exists := c.mutationCache.Get(key)
180         if !exists {
181                 return backing
182         }
183         mutatedObjRuntime, ok := mutatedObj.(runtime.Object)
184         if !ok {
185                 return backing
186         }
187         if c.comparator.CompareResourceVersion(backing, mutatedObjRuntime) >= 0 {
188                 c.mutationCache.Remove(key)
189                 return backing
190         }
191         return mutatedObjRuntime
192 }
193
194 // Mutation adds a change to the cache that can be returned in GetByKey if it is newer than the backingCache
195 // copy.  If you call Mutation twice with the same object on different threads, one will win, but its not defined
196 // which one.  This doesn't affect correctness, since the GetByKey guaranteed of "later of these two caches" is
197 // preserved, but you may not get the version of the object you want.  The object you get is only guaranteed to
198 // "one that was valid at some point in time", not "the one that I want".
199 func (c *mutationCache) Mutation(obj interface{}) {
200         c.lock.Lock()
201         defer c.lock.Unlock()
202
203         key, err := DeletionHandlingMetaNamespaceKeyFunc(obj)
204         if err != nil {
205                 // this is a "nice to have", so failures shouldn't do anything weird
206                 utilruntime.HandleError(err)
207                 return
208         }
209
210         if objRuntime, ok := obj.(runtime.Object); ok {
211                 if mutatedObj, exists := c.mutationCache.Get(key); exists {
212                         if mutatedObjRuntime, ok := mutatedObj.(runtime.Object); ok {
213                                 if c.comparator.CompareResourceVersion(objRuntime, mutatedObjRuntime) < 0 {
214                                         return
215                                 }
216                         }
217                 }
218         }
219         c.mutationCache.Add(key, obj, c.ttl)
220 }
221
222 // etcdObjectVersioner implements versioning and extracting etcd node information
223 // for objects that have an embedded ObjectMeta or ListMeta field.
224 type etcdObjectVersioner struct{}
225
226 // ObjectResourceVersion implements Versioner
227 func (a etcdObjectVersioner) ObjectResourceVersion(obj runtime.Object) (uint64, error) {
228         accessor, err := meta.Accessor(obj)
229         if err != nil {
230                 return 0, err
231         }
232         version := accessor.GetResourceVersion()
233         if len(version) == 0 {
234                 return 0, nil
235         }
236         return strconv.ParseUint(version, 10, 64)
237 }
238
239 // CompareResourceVersion compares etcd resource versions.  Outside this API they are all strings,
240 // but etcd resource versions are special, they're actually ints, so we can easily compare them.
241 func (a etcdObjectVersioner) CompareResourceVersion(lhs, rhs runtime.Object) int {
242         lhsVersion, err := a.ObjectResourceVersion(lhs)
243         if err != nil {
244                 // coder error
245                 panic(err)
246         }
247         rhsVersion, err := a.ObjectResourceVersion(rhs)
248         if err != nil {
249                 // coder error
250                 panic(err)
251         }
252
253         if lhsVersion == rhsVersion {
254                 return 0
255         }
256         if lhsVersion < rhsVersion {
257                 return -1
258         }
259
260         return 1
261 }