2 Copyright 2017 The Kubernetes Authors.
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
8 http://www.apache.org/licenses/LICENSE-2.0
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
27 "k8s.io/apimachinery/pkg/api/meta"
28 "k8s.io/apimachinery/pkg/runtime"
29 utilcache "k8s.io/apimachinery/pkg/util/cache"
30 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
31 "k8s.io/apimachinery/pkg/util/sets"
34 // MutationCache takes the results of update operations and stores them in an LRU
35 // that can be used to provide a more current view of a requested object. It requires interpreting
36 // resourceVersions for comparisons.
37 // Implementations must be thread-safe.
38 // TODO find a way to layer this into an informer/lister
39 type MutationCache interface {
// GetByKey looks up the object stored under key. The bool result reports whether an
// object was found.
40 GetByKey(key string) (interface{}, bool, error)
// ByIndex returns the objects whose value in the index named indexName matches indexKey.
41 ByIndex(indexName, indexKey string) ([]interface{}, error)
// ResourceVersionComparator orders two objects by their resource versions.
45 type ResourceVersionComparator interface {
// CompareResourceVersion returns an int describing how lhs orders relative to rhs.
// Callers in this file treat a result >= 0 as "lhs is at least as new as rhs" and a
// negative result as "lhs is older than rhs".
46 CompareResourceVersion(lhs, rhs runtime.Object) int
49 // NewIntegerResourceVersionMutationCache returns a MutationCache that understands how to
50 // deal with objects that have a resource version that:
// - is an integer (the comparator parses it as a base-10 uint64)
53 // - increases when updated
54 // - is comparable across the same resource in a namespace
56 // Most backends will have these semantics. Indexer may be nil. ttl controls how long an item
57 // remains in the mutation cache before it is removed.
// When indexer is nil, ByIndex on the returned cache reports an error.
59 // If includeAdds is true, objects in the mutation cache will be returned even if they don't exist
60 // in the underlying store. This is only safe if your use of the cache can handle mutation entries
61 // remaining in the cache for up to ttl when mutations and deletes occur very closely in time.
62 func NewIntegerResourceVersionMutationCache(backingCache Store, indexer Indexer, ttl time.Duration, includeAdds bool) MutationCache {
63 return &mutationCache{
64 backingCache: backingCache,
// The LRU is always built with the fixed argument 100 (presumably its maximum
// number of entries — confirm against the utilcache package).
66 mutationCache: utilcache.NewLRUExpireCache(100),
// etcdObjectVersioner compares resource versions numerically, which is what makes
// this the "integer resource version" flavor of the cache.
67 comparator: etcdObjectVersioner{},
69 includeAdds: includeAdds,
73 // mutationCache doesn't guarantee that it returns values added via Mutation, since they can page out.
74 // Because you can't distinguish between "didn't observe create" and "was deleted after create",
75 // a key that is missing from the backing cache is reported as missing (includeAdds, documented on the constructor, is the opt-in exception).
76 type mutationCache struct {
// mutationCache is the expiring LRU that holds the objects recorded through Mutation.
80 mutationCache *utilcache.LRUExpireCache
// comparator decides which of two copies of an object carries the newer resource version.
84 comparator ResourceVersionComparator
87 // GetByKey is never guaranteed to return the value set in Mutation. It could be paged out, it could
88 // be older than another copy, the backingCache may be more recent, or you might have written twice into the same key.
89 // You get a value that was valid at some snapshot of time; it is always the newer of the backingCache and mutationCache copies.
90 func (c *mutationCache) GetByKey(key string) (interface{}, bool, error) {
// Start from the backing cache's view of the key.
94 obj, exists, err := c.backingCache.GetByKey(key)
// A lookup error from the backing cache is handed back to the caller unchanged.
96 return nil, false, err
100 // we can't distinguish between, "didn't observe create" and "was deleted after create", so
101 // if the key is missing, we always return it as missing
102 return nil, false, nil
// Fall back to the copy recorded in the mutation cache.
104 obj, exists = c.mutationCache.Get(key)
106 return nil, false, nil
// Only runtime.Objects can be compared by resource version; any other value is
// returned as found without consulting newerObject.
109 objRuntime, ok := obj.(runtime.Object)
111 return obj, true, nil
// Let newerObject choose between the current object and a cached mutation.
113 return c.newerObject(key, objRuntime), true, nil
116 // ByIndex returns the newer objects that match the provided index and indexer key.
117 // Will return an error if no indexer was provided.
118 func (c *mutationCache) ByIndex(name string, indexKey string) ([]interface{}, error) {
120 defer c.lock.Unlock()
// Index lookups are impossible without an indexer, so fail with a descriptive error.
121 if c.indexer == nil {
122 return nil, fmt.Errorf("no indexer has been provided to the mutation cache")
// Ask the indexer which keys are filed under indexKey in the index called name.
124 keys, err := c.indexer.IndexKeys(name, indexKey)
128 var items []interface{}
// keySet is consulted through keySet.Has in the mutation-cache pass further down.
129 keySet := sets.NewString()
130 for _, key := range keys {
132 obj, exists, err := c.indexer.GetByKey(key)
// runtime.Objects go through newerObject so that a newer cached mutation can stand in
// for the indexed copy; any other value is appended unchanged.
139 if objRuntime, ok := obj.(runtime.Object); ok {
140 items = append(items, c.newerObject(key, objRuntime))
142 items = append(items, obj)
// fn is the index function registered under name; it is applied to mutation-cache entries below.
147 fn := c.indexer.GetIndexers()[name]
148 // Keys() is returned oldest to newest, so full traversal does not alter the LRU behavior
149 for _, key := range c.mutationCache.Keys() {
150 updated, ok := c.mutationCache.Get(key)
154 if keySet.Has(key.(string)) {
// Compute the index values of the cached object; a failure is logged at verbosity 4.
157 elements, err := fn(updated)
159 klog.V(4).Infof("Unable to calculate an index entry for mutation cache entry %s: %v", key, err)
// The cached object is appended when one of its index values equals indexKey.
162 for _, inIndex := range elements {
163 if inIndex != indexKey {
166 items = append(items, updated)
175 // newerObject checks the mutation cache for a newer object and returns one if found. If the
176 // mutated object is older than the backing object, it is removed from the mutation cache. Must be
177 // called while the lock is held.
178 func (c *mutationCache) newerObject(key string, backing runtime.Object) runtime.Object {
179 mutatedObj, exists := c.mutationCache.Get(key)
// Only runtime.Objects can be compared by resource version.
183 mutatedObjRuntime, ok := mutatedObj.(runtime.Object)
// A result >= 0 means the backing copy is the same version or newer, so the cached
// mutation is stale and is evicted.
187 if c.comparator.CompareResourceVersion(backing, mutatedObjRuntime) >= 0 {
188 c.mutationCache.Remove(key)
191 return mutatedObjRuntime
194 // Mutation adds a change to the cache that can be returned in GetByKey if it is newer than the backingCache
195 // copy. If you call Mutation twice with the same object on different threads, one will win, but it's not defined
196 // which one. This doesn't affect correctness, since the GetByKey guarantee of "later of these two caches" is
197 // preserved, but you may not get the version of the object you want. The object you get is only guaranteed to be
198 // "one that was valid at some point in time", not "the one that I want".
199 func (c *mutationCache) Mutation(obj interface{}) {
201 defer c.lock.Unlock()
// Compute the key under which obj is stored in the mutation cache.
203 key, err := DeletionHandlingMetaNamespaceKeyFunc(obj)
205 // this is a "nice to have", so failures shouldn't do anything weird
206 utilruntime.HandleError(err)
// When a runtime.Object is already cached under this key, its resource version is
// compared with obj's before anything is written.
// NOTE(review): the statement guarded by the "< 0" check is not shown here; presumably an
// obj that compares lower than the cached copy is not written — confirm.
210 if objRuntime, ok := obj.(runtime.Object); ok {
211 if mutatedObj, exists := c.mutationCache.Get(key); exists {
212 if mutatedObjRuntime, ok := mutatedObj.(runtime.Object); ok {
213 if c.comparator.CompareResourceVersion(objRuntime, mutatedObjRuntime) < 0 {
// Record obj under key; the entry is added with the cache's ttl.
219 c.mutationCache.Add(key, obj, c.ttl)
222 // etcdObjectVersioner implements versioning and extracting etcd node information
223 // for objects that have an embedded ObjectMeta or ListMeta field.
// NewIntegerResourceVersionMutationCache installs it as the cache's comparator.
224 type etcdObjectVersioner struct{}
226 // ObjectResourceVersion implements Versioner. It reads obj's resource version through
// meta.Accessor and parses it as a base-10 unsigned 64-bit integer.
227 func (a etcdObjectVersioner) ObjectResourceVersion(obj runtime.Object) (uint64, error) {
228 accessor, err := meta.Accessor(obj)
232 version := accessor.GetResourceVersion()
// NOTE(review): the value returned for an empty resource version is not shown here — confirm.
233 if len(version) == 0 {
// The parse result and any parse error are returned to the caller as-is.
236 return strconv.ParseUint(version, 10, 64)
239 // CompareResourceVersion compares etcd resource versions. Outside this API they are all strings,
240 // but etcd resource versions are special, they're actually ints, so we can easily compare them.
241 func (a etcdObjectVersioner) CompareResourceVersion(lhs, rhs runtime.Object) int {
242 lhsVersion, err := a.ObjectResourceVersion(lhs)
247 rhsVersion, err := a.ObjectResourceVersion(rhs)
253 if lhsVersion == rhsVersion {
256 if lhsVersion < rhsVersion {