/*
Copyright 2014 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
import (
	"fmt"
	"sync"

	"k8s.io/apimachinery/pkg/util/sets"
)
26 // ThreadSafeStore is an interface that allows concurrent access to a storage backend.
27 // TL;DR caveats: you must not modify anything returned by Get or List as it will break
28 // the indexing feature in addition to not being thread safe.
30 // The guarantees of thread safety provided by List/Get are only valid if the caller
31 // treats returned items as read-only. For example, a pointer inserted in the store
32 // through `Add` will be returned as is by `Get`. Multiple clients might invoke `Get`
33 // on the same key and modify the pointer in a non-thread-safe way. Also note that
34 // modifying objects stored by the indexers (if any) will *not* automatically lead
35 // to a re-index. So it's not a good idea to directly modify the objects returned by
36 // Get/List, in general.
37 type ThreadSafeStore interface {
38 Add(key string, obj interface{})
39 Update(key string, obj interface{})
41 Get(key string) (item interface{}, exists bool)
44 Replace(map[string]interface{}, string)
45 Index(indexName string, obj interface{}) ([]interface{}, error)
46 IndexKeys(indexName, indexKey string) ([]string, error)
47 ListIndexFuncValues(name string) []string
48 ByIndex(indexName, indexKey string) ([]interface{}, error)
49 GetIndexers() Indexers
51 // AddIndexers adds more indexers to this store. If you call this after you already have data
52 // in the store, the results are undefined.
53 AddIndexers(newIndexers Indexers) error
57 // threadSafeMap implements ThreadSafeStore
58 type threadSafeMap struct {
60 items map[string]interface{}
62 // indexers maps a name to an IndexFunc
64 // indices maps a name to an Index
68 func (c *threadSafeMap) Add(key string, obj interface{}) {
71 oldObject := c.items[key]
73 c.updateIndices(oldObject, obj, key)
76 func (c *threadSafeMap) Update(key string, obj interface{}) {
79 oldObject := c.items[key]
81 c.updateIndices(oldObject, obj, key)
84 func (c *threadSafeMap) Delete(key string) {
87 if obj, exists := c.items[key]; exists {
88 c.deleteFromIndices(obj, key)
93 func (c *threadSafeMap) Get(key string) (item interface{}, exists bool) {
95 defer c.lock.RUnlock()
96 item, exists = c.items[key]
100 func (c *threadSafeMap) List() []interface{} {
102 defer c.lock.RUnlock()
103 list := make([]interface{}, 0, len(c.items))
104 for _, item := range c.items {
105 list = append(list, item)
110 // ListKeys returns a list of all the keys of the objects currently
111 // in the threadSafeMap.
112 func (c *threadSafeMap) ListKeys() []string {
114 defer c.lock.RUnlock()
115 list := make([]string, 0, len(c.items))
116 for key := range c.items {
117 list = append(list, key)
122 func (c *threadSafeMap) Replace(items map[string]interface{}, resourceVersion string) {
124 defer c.lock.Unlock()
128 c.indices = Indices{}
129 for key, item := range c.items {
130 c.updateIndices(nil, item, key)
134 // Index returns a list of items that match on the index function
135 // Index is thread-safe so long as you treat all items as immutable
136 func (c *threadSafeMap) Index(indexName string, obj interface{}) ([]interface{}, error) {
138 defer c.lock.RUnlock()
140 indexFunc := c.indexers[indexName]
141 if indexFunc == nil {
142 return nil, fmt.Errorf("Index with name %s does not exist", indexName)
145 indexKeys, err := indexFunc(obj)
149 index := c.indices[indexName]
151 // need to de-dupe the return list. Since multiple keys are allowed, this can happen.
152 returnKeySet := sets.String{}
153 for _, indexKey := range indexKeys {
154 set := index[indexKey]
155 for _, key := range set.UnsortedList() {
156 returnKeySet.Insert(key)
160 list := make([]interface{}, 0, returnKeySet.Len())
161 for absoluteKey := range returnKeySet {
162 list = append(list, c.items[absoluteKey])
167 // ByIndex returns a list of items that match an exact value on the index function
168 func (c *threadSafeMap) ByIndex(indexName, indexKey string) ([]interface{}, error) {
170 defer c.lock.RUnlock()
172 indexFunc := c.indexers[indexName]
173 if indexFunc == nil {
174 return nil, fmt.Errorf("Index with name %s does not exist", indexName)
177 index := c.indices[indexName]
179 set := index[indexKey]
180 list := make([]interface{}, 0, set.Len())
181 for _, key := range set.List() {
182 list = append(list, c.items[key])
188 // IndexKeys returns a list of keys that match on the index function.
189 // IndexKeys is thread-safe so long as you treat all items as immutable.
190 func (c *threadSafeMap) IndexKeys(indexName, indexKey string) ([]string, error) {
192 defer c.lock.RUnlock()
194 indexFunc := c.indexers[indexName]
195 if indexFunc == nil {
196 return nil, fmt.Errorf("Index with name %s does not exist", indexName)
199 index := c.indices[indexName]
201 set := index[indexKey]
202 return set.List(), nil
205 func (c *threadSafeMap) ListIndexFuncValues(indexName string) []string {
207 defer c.lock.RUnlock()
209 index := c.indices[indexName]
210 names := make([]string, 0, len(index))
211 for key := range index {
212 names = append(names, key)
217 func (c *threadSafeMap) GetIndexers() Indexers {
221 func (c *threadSafeMap) AddIndexers(newIndexers Indexers) error {
223 defer c.lock.Unlock()
225 if len(c.items) > 0 {
226 return fmt.Errorf("cannot add indexers to running index")
229 oldKeys := sets.StringKeySet(c.indexers)
230 newKeys := sets.StringKeySet(newIndexers)
232 if oldKeys.HasAny(newKeys.List()...) {
233 return fmt.Errorf("indexer conflict: %v", oldKeys.Intersection(newKeys))
236 for k, v := range newIndexers {
242 // updateIndices modifies the objects location in the managed indexes, if this is an update, you must provide an oldObj
243 // updateIndices must be called from a function that already has a lock on the cache
244 func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, key string) {
245 // if we got an old object, we need to remove it before we add it again
247 c.deleteFromIndices(oldObj, key)
249 for name, indexFunc := range c.indexers {
250 indexValues, err := indexFunc(newObj)
252 panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
254 index := c.indices[name]
257 c.indices[name] = index
260 for _, indexValue := range indexValues {
261 set := index[indexValue]
264 index[indexValue] = set
271 // deleteFromIndices removes the object from each of the managed indexes
272 // it is intended to be called from a function that already has a lock on the cache
273 func (c *threadSafeMap) deleteFromIndices(obj interface{}, key string) {
274 for name, indexFunc := range c.indexers {
275 indexValues, err := indexFunc(obj)
277 panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
280 index := c.indices[name]
284 for _, indexValue := range indexValues {
285 set := index[indexValue]
293 func (c *threadSafeMap) Resync() error {
298 func NewThreadSafeStore(indexers Indexers, indices Indices) ThreadSafeStore {
299 return &threadSafeMap{
300 items: map[string]interface{}{},