1
16
17 package cache
18
19 import (
20 "fmt"
21 "sync"
22
23 "k8s.io/apimachinery/pkg/util/sets"
24 )
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// ThreadSafeStore is a keyed object store with secondary indices. The
// per-method notes below describe the behavior of threadSafeMap, the
// implementation in this file.
type ThreadSafeStore interface {
	// Add stores obj under key (threadSafeMap implements it by calling Update).
	Add(key string, obj interface{})
	// Update stores obj under key, replacing any previous object, and
	// refreshes every index for that key.
	Update(key string, obj interface{})
	// Delete removes the object stored under key and its index entries;
	// it does nothing when the key is absent.
	Delete(key string)
	// Get returns the object stored under key and whether it was present.
	Get(key string) (item interface{}, exists bool)
	// List returns all stored objects, in map-iteration (unspecified) order.
	List() []interface{}
	// ListKeys returns all keys, in map-iteration (unspecified) order.
	ListKeys() []string
	// Replace swaps the whole content for the given map and rebuilds the
	// indices. The string argument is a resource version; threadSafeMap
	// does not use it.
	Replace(map[string]interface{}, string)
	// Index returns the stored objects that share at least one value with
	// obj under the named index. It fails when the index name is unknown or
	// the index function returns an error.
	Index(indexName string, obj interface{}) ([]interface{}, error)
	// IndexKeys returns the keys recorded under indexedValue in the named
	// index. It fails when the index name is unknown.
	IndexKeys(indexName, indexedValue string) ([]string, error)
	// ListIndexFuncValues returns every value currently present in the
	// named index.
	ListIndexFuncValues(name string) []string
	// ByIndex returns the stored objects recorded under indexedValue in the
	// named index. It fails when the index name is unknown.
	ByIndex(indexName, indexedValue string) ([]interface{}, error)
	// GetIndexers returns the registered index functions.
	GetIndexers() Indexers

	// AddIndexers registers additional index functions and indexes the
	// objects already stored. It fails, changing nothing, when any new name
	// is already registered.
	AddIndexers(newIndexers Indexers) error

	// Resync is a no-op in threadSafeMap and always returns nil.
	Resync() error
}
60
61
// storeIndex bundles the index functions with the index data they produce.
type storeIndex struct {
	// indexers maps an index name to the function that computes the index
	// values of an object.
	indexers Indexers
	// indices maps an index name to that index's data: indexed value ->
	// set of store keys.
	indices Indices
}
68
69 func (i *storeIndex) reset() {
70 i.indices = Indices{}
71 }
72
73 func (i *storeIndex) getKeysFromIndex(indexName string, obj interface{}) (sets.String, error) {
74 indexFunc := i.indexers[indexName]
75 if indexFunc == nil {
76 return nil, fmt.Errorf("Index with name %s does not exist", indexName)
77 }
78
79 indexedValues, err := indexFunc(obj)
80 if err != nil {
81 return nil, err
82 }
83 index := i.indices[indexName]
84
85 var storeKeySet sets.String
86 if len(indexedValues) == 1 {
87
88
89 storeKeySet = index[indexedValues[0]]
90 } else {
91
92
93 storeKeySet = sets.String{}
94 for _, indexedValue := range indexedValues {
95 for key := range index[indexedValue] {
96 storeKeySet.Insert(key)
97 }
98 }
99 }
100
101 return storeKeySet, nil
102 }
103
104 func (i *storeIndex) getKeysByIndex(indexName, indexedValue string) (sets.String, error) {
105 indexFunc := i.indexers[indexName]
106 if indexFunc == nil {
107 return nil, fmt.Errorf("Index with name %s does not exist", indexName)
108 }
109
110 index := i.indices[indexName]
111 return index[indexedValue], nil
112 }
113
114 func (i *storeIndex) getIndexValues(indexName string) []string {
115 index := i.indices[indexName]
116 names := make([]string, 0, len(index))
117 for key := range index {
118 names = append(names, key)
119 }
120 return names
121 }
122
123 func (i *storeIndex) addIndexers(newIndexers Indexers) error {
124 oldKeys := sets.StringKeySet(i.indexers)
125 newKeys := sets.StringKeySet(newIndexers)
126
127 if oldKeys.HasAny(newKeys.List()...) {
128 return fmt.Errorf("indexer conflict: %v", oldKeys.Intersection(newKeys))
129 }
130
131 for k, v := range newIndexers {
132 i.indexers[k] = v
133 }
134 return nil
135 }
136
137
138
139
140
141
142 func (i *storeIndex) updateSingleIndex(name string, oldObj interface{}, newObj interface{}, key string) {
143 var oldIndexValues, indexValues []string
144 indexFunc, ok := i.indexers[name]
145 if !ok {
146
147
148 panic(fmt.Errorf("indexer %q does not exist", name))
149 }
150 if oldObj != nil {
151 var err error
152 oldIndexValues, err = indexFunc(oldObj)
153 if err != nil {
154 panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
155 }
156 } else {
157 oldIndexValues = oldIndexValues[:0]
158 }
159
160 if newObj != nil {
161 var err error
162 indexValues, err = indexFunc(newObj)
163 if err != nil {
164 panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
165 }
166 } else {
167 indexValues = indexValues[:0]
168 }
169
170 index := i.indices[name]
171 if index == nil {
172 index = Index{}
173 i.indices[name] = index
174 }
175
176 if len(indexValues) == 1 && len(oldIndexValues) == 1 && indexValues[0] == oldIndexValues[0] {
177
178 return
179 }
180
181 for _, value := range oldIndexValues {
182 i.deleteKeyFromIndex(key, value, index)
183 }
184 for _, value := range indexValues {
185 i.addKeyToIndex(key, value, index)
186 }
187 }
188
189
190
191
192
193
194 func (i *storeIndex) updateIndices(oldObj interface{}, newObj interface{}, key string) {
195 for name := range i.indexers {
196 i.updateSingleIndex(name, oldObj, newObj, key)
197 }
198 }
199
200 func (i *storeIndex) addKeyToIndex(key, indexValue string, index Index) {
201 set := index[indexValue]
202 if set == nil {
203 set = sets.String{}
204 index[indexValue] = set
205 }
206 set.Insert(key)
207 }
208
209 func (i *storeIndex) deleteKeyFromIndex(key, indexValue string, index Index) {
210 set := index[indexValue]
211 if set == nil {
212 return
213 }
214 set.Delete(key)
215
216
217
218 if len(set) == 0 {
219 delete(index, indexValue)
220 }
221 }
222
223
// threadSafeMap implements ThreadSafeStore with a plain map guarded by a
// read/write mutex.
type threadSafeMap struct {
	// lock is acquired by the store's methods around their access to items
	// and the index data (GetIndexers returns the indexers map without
	// locking).
	lock sync.RWMutex
	// items maps a store key to the stored object.
	items map[string]interface{}

	// index holds the index functions and the index data derived from items.
	index *storeIndex
}
231
// Add stores obj under key. It is identical to Update: an existing entry for
// key is replaced and the indices are refreshed.
func (c *threadSafeMap) Add(key string, obj interface{}) {
	c.Update(key, obj)
}
235
236 func (c *threadSafeMap) Update(key string, obj interface{}) {
237 c.lock.Lock()
238 defer c.lock.Unlock()
239 oldObject := c.items[key]
240 c.items[key] = obj
241 c.index.updateIndices(oldObject, obj, key)
242 }
243
244 func (c *threadSafeMap) Delete(key string) {
245 c.lock.Lock()
246 defer c.lock.Unlock()
247 if obj, exists := c.items[key]; exists {
248 c.index.updateIndices(obj, nil, key)
249 delete(c.items, key)
250 }
251 }
252
253 func (c *threadSafeMap) Get(key string) (item interface{}, exists bool) {
254 c.lock.RLock()
255 defer c.lock.RUnlock()
256 item, exists = c.items[key]
257 return item, exists
258 }
259
260 func (c *threadSafeMap) List() []interface{} {
261 c.lock.RLock()
262 defer c.lock.RUnlock()
263 list := make([]interface{}, 0, len(c.items))
264 for _, item := range c.items {
265 list = append(list, item)
266 }
267 return list
268 }
269
270
271
272 func (c *threadSafeMap) ListKeys() []string {
273 c.lock.RLock()
274 defer c.lock.RUnlock()
275 list := make([]string, 0, len(c.items))
276 for key := range c.items {
277 list = append(list, key)
278 }
279 return list
280 }
281
282 func (c *threadSafeMap) Replace(items map[string]interface{}, resourceVersion string) {
283 c.lock.Lock()
284 defer c.lock.Unlock()
285 c.items = items
286
287
288 c.index.reset()
289 for key, item := range c.items {
290 c.index.updateIndices(nil, item, key)
291 }
292 }
293
294
295
296 func (c *threadSafeMap) Index(indexName string, obj interface{}) ([]interface{}, error) {
297 c.lock.RLock()
298 defer c.lock.RUnlock()
299
300 storeKeySet, err := c.index.getKeysFromIndex(indexName, obj)
301 if err != nil {
302 return nil, err
303 }
304
305 list := make([]interface{}, 0, storeKeySet.Len())
306 for storeKey := range storeKeySet {
307 list = append(list, c.items[storeKey])
308 }
309 return list, nil
310 }
311
312
313 func (c *threadSafeMap) ByIndex(indexName, indexedValue string) ([]interface{}, error) {
314 c.lock.RLock()
315 defer c.lock.RUnlock()
316
317 set, err := c.index.getKeysByIndex(indexName, indexedValue)
318 if err != nil {
319 return nil, err
320 }
321 list := make([]interface{}, 0, set.Len())
322 for key := range set {
323 list = append(list, c.items[key])
324 }
325
326 return list, nil
327 }
328
329
330
331 func (c *threadSafeMap) IndexKeys(indexName, indexedValue string) ([]string, error) {
332 c.lock.RLock()
333 defer c.lock.RUnlock()
334
335 set, err := c.index.getKeysByIndex(indexName, indexedValue)
336 if err != nil {
337 return nil, err
338 }
339 return set.List(), nil
340 }
341
// ListIndexFuncValues returns every value that currently has an entry in the
// named index. It takes the read lock and delegates to the store index.
func (c *threadSafeMap) ListIndexFuncValues(indexName string) []string {
	c.lock.RLock()
	defer c.lock.RUnlock()

	return c.index.getIndexValues(indexName)
}
348
// GetIndexers returns the registered index functions.
//
// NOTE(review): this returns the live indexers map, not a copy, and does not
// take c.lock, while AddIndexers writes to that map under the lock.
func (c *threadSafeMap) GetIndexers() Indexers {
	return c.index.indexers
}
352
353 func (c *threadSafeMap) AddIndexers(newIndexers Indexers) error {
354 c.lock.Lock()
355 defer c.lock.Unlock()
356
357 if err := c.index.addIndexers(newIndexers); err != nil {
358 return err
359 }
360
361
362 for key, item := range c.items {
363 for name := range newIndexers {
364 c.index.updateSingleIndex(name, nil, item, key)
365 }
366 }
367
368 return nil
369 }
370
// Resync is a no-op for threadSafeMap and always returns nil.
func (c *threadSafeMap) Resync() error {
	// Nothing to do.
	return nil
}
375
376
377 func NewThreadSafeStore(indexers Indexers, indices Indices) ThreadSafeStore {
378 return &threadSafeMap{
379 items: map[string]interface{}{},
380 index: &storeIndex{
381 indexers: indexers,
382 indices: indices,
383 },
384 }
385 }
386
View as plain text