Code refactoring for bpa operator
[icn.git] / cmd / bpa-operator / vendor / k8s.io / client-go / tools / cache / thread_safe_store.go
1 /*
2 Copyright 2014 The Kubernetes Authors.
3
4 Licensed under the Apache License, Version 2.0 (the "License");
5 you may not use this file except in compliance with the License.
6 You may obtain a copy of the License at
7
8     http://www.apache.org/licenses/LICENSE-2.0
9
10 Unless required by applicable law or agreed to in writing, software
11 distributed under the License is distributed on an "AS IS" BASIS,
12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 See the License for the specific language governing permissions and
14 limitations under the License.
15 */
16
17 package cache
18
19 import (
20         "fmt"
21         "sync"
22
23         "k8s.io/apimachinery/pkg/util/sets"
24 )
25
26 // ThreadSafeStore is an interface that allows concurrent access to a storage backend.
27 // TL;DR caveats: you must not modify anything returned by Get or List as it will break
28 // the indexing feature in addition to not being thread safe.
29 //
30 // The guarantees of thread safety provided by List/Get are only valid if the caller
31 // treats returned items as read-only. For example, a pointer inserted in the store
32 // through `Add` will be returned as is by `Get`. Multiple clients might invoke `Get`
33 // on the same key and modify the pointer in a non-thread-safe way. Also note that
34 // modifying objects stored by the indexers (if any) will *not* automatically lead
35 // to a re-index. So it's not a good idea to directly modify the objects returned by
36 // Get/List, in general.
37 type ThreadSafeStore interface {
38         Add(key string, obj interface{})
39         Update(key string, obj interface{})
40         Delete(key string)
41         Get(key string) (item interface{}, exists bool)
42         List() []interface{}
43         ListKeys() []string
44         Replace(map[string]interface{}, string)
45         Index(indexName string, obj interface{}) ([]interface{}, error)
46         IndexKeys(indexName, indexKey string) ([]string, error)
47         ListIndexFuncValues(name string) []string
48         ByIndex(indexName, indexKey string) ([]interface{}, error)
49         GetIndexers() Indexers
50
51         // AddIndexers adds more indexers to this store.  If you call this after you already have data
52         // in the store, the results are undefined.
53         AddIndexers(newIndexers Indexers) error
54         Resync() error
55 }
56
57 // threadSafeMap implements ThreadSafeStore
58 type threadSafeMap struct {
59         lock  sync.RWMutex
60         items map[string]interface{}
61
62         // indexers maps a name to an IndexFunc
63         indexers Indexers
64         // indices maps a name to an Index
65         indices Indices
66 }
67
68 func (c *threadSafeMap) Add(key string, obj interface{}) {
69         c.lock.Lock()
70         defer c.lock.Unlock()
71         oldObject := c.items[key]
72         c.items[key] = obj
73         c.updateIndices(oldObject, obj, key)
74 }
75
76 func (c *threadSafeMap) Update(key string, obj interface{}) {
77         c.lock.Lock()
78         defer c.lock.Unlock()
79         oldObject := c.items[key]
80         c.items[key] = obj
81         c.updateIndices(oldObject, obj, key)
82 }
83
84 func (c *threadSafeMap) Delete(key string) {
85         c.lock.Lock()
86         defer c.lock.Unlock()
87         if obj, exists := c.items[key]; exists {
88                 c.deleteFromIndices(obj, key)
89                 delete(c.items, key)
90         }
91 }
92
93 func (c *threadSafeMap) Get(key string) (item interface{}, exists bool) {
94         c.lock.RLock()
95         defer c.lock.RUnlock()
96         item, exists = c.items[key]
97         return item, exists
98 }
99
100 func (c *threadSafeMap) List() []interface{} {
101         c.lock.RLock()
102         defer c.lock.RUnlock()
103         list := make([]interface{}, 0, len(c.items))
104         for _, item := range c.items {
105                 list = append(list, item)
106         }
107         return list
108 }
109
110 // ListKeys returns a list of all the keys of the objects currently
111 // in the threadSafeMap.
112 func (c *threadSafeMap) ListKeys() []string {
113         c.lock.RLock()
114         defer c.lock.RUnlock()
115         list := make([]string, 0, len(c.items))
116         for key := range c.items {
117                 list = append(list, key)
118         }
119         return list
120 }
121
122 func (c *threadSafeMap) Replace(items map[string]interface{}, resourceVersion string) {
123         c.lock.Lock()
124         defer c.lock.Unlock()
125         c.items = items
126
127         // rebuild any index
128         c.indices = Indices{}
129         for key, item := range c.items {
130                 c.updateIndices(nil, item, key)
131         }
132 }
133
134 // Index returns a list of items that match on the index function
135 // Index is thread-safe so long as you treat all items as immutable
136 func (c *threadSafeMap) Index(indexName string, obj interface{}) ([]interface{}, error) {
137         c.lock.RLock()
138         defer c.lock.RUnlock()
139
140         indexFunc := c.indexers[indexName]
141         if indexFunc == nil {
142                 return nil, fmt.Errorf("Index with name %s does not exist", indexName)
143         }
144
145         indexKeys, err := indexFunc(obj)
146         if err != nil {
147                 return nil, err
148         }
149         index := c.indices[indexName]
150
151         // need to de-dupe the return list.  Since multiple keys are allowed, this can happen.
152         returnKeySet := sets.String{}
153         for _, indexKey := range indexKeys {
154                 set := index[indexKey]
155                 for _, key := range set.UnsortedList() {
156                         returnKeySet.Insert(key)
157                 }
158         }
159
160         list := make([]interface{}, 0, returnKeySet.Len())
161         for absoluteKey := range returnKeySet {
162                 list = append(list, c.items[absoluteKey])
163         }
164         return list, nil
165 }
166
167 // ByIndex returns a list of items that match an exact value on the index function
168 func (c *threadSafeMap) ByIndex(indexName, indexKey string) ([]interface{}, error) {
169         c.lock.RLock()
170         defer c.lock.RUnlock()
171
172         indexFunc := c.indexers[indexName]
173         if indexFunc == nil {
174                 return nil, fmt.Errorf("Index with name %s does not exist", indexName)
175         }
176
177         index := c.indices[indexName]
178
179         set := index[indexKey]
180         list := make([]interface{}, 0, set.Len())
181         for _, key := range set.List() {
182                 list = append(list, c.items[key])
183         }
184
185         return list, nil
186 }
187
188 // IndexKeys returns a list of keys that match on the index function.
189 // IndexKeys is thread-safe so long as you treat all items as immutable.
190 func (c *threadSafeMap) IndexKeys(indexName, indexKey string) ([]string, error) {
191         c.lock.RLock()
192         defer c.lock.RUnlock()
193
194         indexFunc := c.indexers[indexName]
195         if indexFunc == nil {
196                 return nil, fmt.Errorf("Index with name %s does not exist", indexName)
197         }
198
199         index := c.indices[indexName]
200
201         set := index[indexKey]
202         return set.List(), nil
203 }
204
205 func (c *threadSafeMap) ListIndexFuncValues(indexName string) []string {
206         c.lock.RLock()
207         defer c.lock.RUnlock()
208
209         index := c.indices[indexName]
210         names := make([]string, 0, len(index))
211         for key := range index {
212                 names = append(names, key)
213         }
214         return names
215 }
216
217 func (c *threadSafeMap) GetIndexers() Indexers {
218         return c.indexers
219 }
220
221 func (c *threadSafeMap) AddIndexers(newIndexers Indexers) error {
222         c.lock.Lock()
223         defer c.lock.Unlock()
224
225         if len(c.items) > 0 {
226                 return fmt.Errorf("cannot add indexers to running index")
227         }
228
229         oldKeys := sets.StringKeySet(c.indexers)
230         newKeys := sets.StringKeySet(newIndexers)
231
232         if oldKeys.HasAny(newKeys.List()...) {
233                 return fmt.Errorf("indexer conflict: %v", oldKeys.Intersection(newKeys))
234         }
235
236         for k, v := range newIndexers {
237                 c.indexers[k] = v
238         }
239         return nil
240 }
241
242 // updateIndices modifies the objects location in the managed indexes, if this is an update, you must provide an oldObj
243 // updateIndices must be called from a function that already has a lock on the cache
244 func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, key string) {
245         // if we got an old object, we need to remove it before we add it again
246         if oldObj != nil {
247                 c.deleteFromIndices(oldObj, key)
248         }
249         for name, indexFunc := range c.indexers {
250                 indexValues, err := indexFunc(newObj)
251                 if err != nil {
252                         panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
253                 }
254                 index := c.indices[name]
255                 if index == nil {
256                         index = Index{}
257                         c.indices[name] = index
258                 }
259
260                 for _, indexValue := range indexValues {
261                         set := index[indexValue]
262                         if set == nil {
263                                 set = sets.String{}
264                                 index[indexValue] = set
265                         }
266                         set.Insert(key)
267                 }
268         }
269 }
270
271 // deleteFromIndices removes the object from each of the managed indexes
272 // it is intended to be called from a function that already has a lock on the cache
273 func (c *threadSafeMap) deleteFromIndices(obj interface{}, key string) {
274         for name, indexFunc := range c.indexers {
275                 indexValues, err := indexFunc(obj)
276                 if err != nil {
277                         panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
278                 }
279
280                 index := c.indices[name]
281                 if index == nil {
282                         continue
283                 }
284                 for _, indexValue := range indexValues {
285                         set := index[indexValue]
286                         if set != nil {
287                                 set.Delete(key)
288                         }
289                 }
290         }
291 }
292
293 func (c *threadSafeMap) Resync() error {
294         // Nothing to do
295         return nil
296 }
297
298 func NewThreadSafeStore(indexers Indexers, indices Indices) ThreadSafeStore {
299         return &threadSafeMap{
300                 items:    map[string]interface{}{},
301                 indexers: indexers,
302                 indices:  indices,
303         }
304 }