1
16
17 package volumebinding
18
19 import (
20 "fmt"
21 "strconv"
22 "sync"
23
24 "k8s.io/klog/v2"
25
26 v1 "k8s.io/api/core/v1"
27 "k8s.io/apimachinery/pkg/api/meta"
28 "k8s.io/client-go/tools/cache"
29 storagehelpers "k8s.io/component-helpers/storage/volume"
30 )
31
32
33
34
35
// AssumeCache is a cache of objects keyed by name. For every object it tracks
// two versions: the one most recently delivered by the informer event
// handlers (the "API object") and the "latest" one, which a caller may
// temporarily override via Assume and revert via Restore.
//
// The per-method notes below describe the assumeCache implementation in this
// file.
type AssumeCache interface {
	// Assume overrides the latest version of the cached object that has the
	// same key as obj. It returns an error if the key cannot be computed, if
	// the object is not in the cache, if a resource version cannot be parsed,
	// or if obj's resource version is older than the cached latest version.
	Assume(obj interface{}) error

	// Restore resets the latest version of the named object back to its API
	// object. An object that is not in the cache is only logged.
	Restore(objName string)

	// Get returns the latest version of the named object, which is the
	// assumed one if Assume was called for it.
	Get(objName string) (interface{}, error)

	// GetAPIObj returns the API object of the named object, ignoring any
	// assumed override.
	GetAPIObj(objName string) (interface{}, error)

	// List returns the latest version of the cached objects. When the cache
	// was created with an index name, only the objects sharing indexObj's
	// index value are returned.
	List(indexObj interface{}) []interface{}
}
52
// errWrongType reports that a value did not have the dynamic type the caller
// expected.
type errWrongType struct {
	// typeName is the name of the type the value was expected to have.
	typeName string

	// object is the value that failed the type assertion.
	object interface{}
}

// Error implements the error interface.
func (e *errWrongType) Error() string {
	const format = "could not convert object to type %v: %+v"
	return fmt.Sprintf(format, e.typeName, e.object)
}
61
// errNotFound reports that no object with the requested name is cached.
type errNotFound struct {
	// typeName describes the kind of object that was looked up.
	typeName string

	// objectName is the cache key that was not found.
	objectName string
}

// Error implements the error interface.
func (e *errNotFound) Error() string {
	kind, name := e.typeName, e.objectName
	return fmt.Sprintf("could not find %v %q", kind, name)
}
70
// errObjectName reports that the cache key (name) of an object could not be
// computed.
type errObjectName struct {
	// detailedErr is the underlying error returned while computing the key.
	detailedErr error
}

// Error implements the error interface.
func (e *errObjectName) Error() string {
	return fmt.Sprintf("failed to get object name: %v", e.detailedErr)
}

// Unwrap returns the underlying error so that callers can inspect the cause
// with errors.Is and errors.As.
func (e *errObjectName) Unwrap() error {
	return e.detailedErr
}
78
79
80
81
82
83
84
85
86
87
88
// assumeCache is the AssumeCache implementation in this file. It keeps one
// *objInfo per object in an indexed store, populated by the informer event
// handlers registered in NewAssumeCache.
type assumeCache struct {
	// logger receives the cache's log output.
	logger klog.Logger

	// rwMutex is held by the methods of this type while they read or modify
	// store and the objInfo entries kept in it.
	rwMutex sync.RWMutex

	// description names the kind of object stored; it appears in log and
	// error messages.
	description string

	// store holds *objInfo entries keyed by objInfoKeyFunc.
	store cache.Indexer

	// indexFunc and indexName describe the optional index. The index is
	// registered on store only when both are set; indexFunc is applied to the
	// latest version of each object (see objInfoIndexFunc).
	indexFunc cache.IndexFunc
	indexName string
}
107
// objInfo is the entry kept in the store for each cached object.
type objInfo struct {
	// name is the key of the object in the store.
	name string

	// latestObj is the version returned by Get and List. It equals apiObj
	// unless it was overridden by Assume.
	latestObj interface{}

	// apiObj is the version most recently stored by the informer event
	// handlers; it is returned by GetAPIObj and reinstated by Restore.
	apiObj interface{}
}
118
119 func objInfoKeyFunc(obj interface{}) (string, error) {
120 objInfo, ok := obj.(*objInfo)
121 if !ok {
122 return "", &errWrongType{"objInfo", obj}
123 }
124 return objInfo.name, nil
125 }
126
127 func (c *assumeCache) objInfoIndexFunc(obj interface{}) ([]string, error) {
128 objInfo, ok := obj.(*objInfo)
129 if !ok {
130 return []string{""}, &errWrongType{"objInfo", obj}
131 }
132 return c.indexFunc(objInfo.latestObj)
133 }
134
135
136 func NewAssumeCache(logger klog.Logger, informer cache.SharedIndexInformer, description, indexName string, indexFunc cache.IndexFunc) AssumeCache {
137 c := &assumeCache{
138 logger: logger,
139 description: description,
140 indexFunc: indexFunc,
141 indexName: indexName,
142 }
143 indexers := cache.Indexers{}
144 if indexName != "" && indexFunc != nil {
145 indexers[indexName] = c.objInfoIndexFunc
146 }
147 c.store = cache.NewIndexer(objInfoKeyFunc, indexers)
148
149
150 if informer != nil {
151 informer.AddEventHandler(
152 cache.ResourceEventHandlerFuncs{
153 AddFunc: c.add,
154 UpdateFunc: c.update,
155 DeleteFunc: c.delete,
156 },
157 )
158 }
159 return c
160 }
161
162 func (c *assumeCache) add(obj interface{}) {
163 if obj == nil {
164 return
165 }
166
167 name, err := cache.MetaNamespaceKeyFunc(obj)
168 if err != nil {
169 c.logger.Error(&errObjectName{err}, "Add failed")
170 return
171 }
172
173 c.rwMutex.Lock()
174 defer c.rwMutex.Unlock()
175
176 if objInfo, _ := c.getObjInfo(name); objInfo != nil {
177 newVersion, err := c.getObjVersion(name, obj)
178 if err != nil {
179 c.logger.Error(err, "Add failed: couldn't get object version")
180 return
181 }
182
183 storedVersion, err := c.getObjVersion(name, objInfo.latestObj)
184 if err != nil {
185 c.logger.Error(err, "Add failed: couldn't get stored object version")
186 return
187 }
188
189
190
191 if newVersion <= storedVersion {
192 c.logger.V(10).Info("Skip adding object to assume cache because version is not newer than storedVersion", "description", c.description, "cacheKey", name, "newVersion", newVersion, "storedVersion", storedVersion)
193 return
194 }
195 }
196
197 objInfo := &objInfo{name: name, latestObj: obj, apiObj: obj}
198 if err = c.store.Update(objInfo); err != nil {
199 c.logger.Info("Error occurred while updating stored object", "err", err)
200 } else {
201 c.logger.V(10).Info("Adding object to assume cache", "description", c.description, "cacheKey", name, "assumeCache", obj)
202 }
203 }
204
205 func (c *assumeCache) update(oldObj interface{}, newObj interface{}) {
206 c.add(newObj)
207 }
208
209 func (c *assumeCache) delete(obj interface{}) {
210 if obj == nil {
211 return
212 }
213
214 name, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
215 if err != nil {
216 c.logger.Error(&errObjectName{err}, "Failed to delete")
217 return
218 }
219
220 c.rwMutex.Lock()
221 defer c.rwMutex.Unlock()
222
223 objInfo := &objInfo{name: name}
224 err = c.store.Delete(objInfo)
225 if err != nil {
226 c.logger.Error(err, "Failed to delete", "description", c.description, "cacheKey", name)
227 }
228 }
229
230 func (c *assumeCache) getObjVersion(name string, obj interface{}) (int64, error) {
231 objAccessor, err := meta.Accessor(obj)
232 if err != nil {
233 return -1, err
234 }
235
236 objResourceVersion, err := strconv.ParseInt(objAccessor.GetResourceVersion(), 10, 64)
237 if err != nil {
238 return -1, fmt.Errorf("error parsing ResourceVersion %q for %v %q: %s", objAccessor.GetResourceVersion(), c.description, name, err)
239 }
240 return objResourceVersion, nil
241 }
242
243 func (c *assumeCache) getObjInfo(name string) (*objInfo, error) {
244 obj, ok, err := c.store.GetByKey(name)
245 if err != nil {
246 return nil, err
247 }
248 if !ok {
249 return nil, &errNotFound{c.description, name}
250 }
251
252 objInfo, ok := obj.(*objInfo)
253 if !ok {
254 return nil, &errWrongType{"objInfo", obj}
255 }
256 return objInfo, nil
257 }
258
259 func (c *assumeCache) Get(objName string) (interface{}, error) {
260 c.rwMutex.RLock()
261 defer c.rwMutex.RUnlock()
262
263 objInfo, err := c.getObjInfo(objName)
264 if err != nil {
265 return nil, err
266 }
267 return objInfo.latestObj, nil
268 }
269
270 func (c *assumeCache) GetAPIObj(objName string) (interface{}, error) {
271 c.rwMutex.RLock()
272 defer c.rwMutex.RUnlock()
273
274 objInfo, err := c.getObjInfo(objName)
275 if err != nil {
276 return nil, err
277 }
278 return objInfo.apiObj, nil
279 }
280
281 func (c *assumeCache) List(indexObj interface{}) []interface{} {
282 c.rwMutex.RLock()
283 defer c.rwMutex.RUnlock()
284
285 allObjs := []interface{}{}
286 var objs []interface{}
287 if c.indexName != "" {
288 o, err := c.store.Index(c.indexName, &objInfo{latestObj: indexObj})
289 if err != nil {
290 c.logger.Error(err, "List index error")
291 return nil
292 }
293 objs = o
294 } else {
295 objs = c.store.List()
296 }
297
298 for _, obj := range objs {
299 objInfo, ok := obj.(*objInfo)
300 if !ok {
301 c.logger.Error(&errWrongType{"objInfo", obj}, "List error")
302 continue
303 }
304 allObjs = append(allObjs, objInfo.latestObj)
305 }
306 return allObjs
307 }
308
309 func (c *assumeCache) Assume(obj interface{}) error {
310 name, err := cache.MetaNamespaceKeyFunc(obj)
311 if err != nil {
312 return &errObjectName{err}
313 }
314
315 c.rwMutex.Lock()
316 defer c.rwMutex.Unlock()
317
318 objInfo, err := c.getObjInfo(name)
319 if err != nil {
320 return err
321 }
322
323 newVersion, err := c.getObjVersion(name, obj)
324 if err != nil {
325 return err
326 }
327
328 storedVersion, err := c.getObjVersion(name, objInfo.latestObj)
329 if err != nil {
330 return err
331 }
332
333 if newVersion < storedVersion {
334 return fmt.Errorf("%v %q is out of sync (stored: %d, assume: %d)", c.description, name, storedVersion, newVersion)
335 }
336
337
338 objInfo.latestObj = obj
339 c.logger.V(4).Info("Assumed object", "description", c.description, "cacheKey", name, "version", newVersion)
340 return nil
341 }
342
343 func (c *assumeCache) Restore(objName string) {
344 c.rwMutex.Lock()
345 defer c.rwMutex.Unlock()
346
347 objInfo, err := c.getObjInfo(objName)
348 if err != nil {
349
350 c.logger.V(5).Info("Restore object", "description", c.description, "cacheKey", objName, "err", err)
351 } else {
352 objInfo.latestObj = objInfo.apiObj
353 c.logger.V(4).Info("Restored object", "description", c.description, "cacheKey", objName)
354 }
355 }
356
357
// PVAssumeCache is an AssumeCache for PersistentVolume objects with typed
// accessors.
type PVAssumeCache interface {
	AssumeCache

	// GetPV returns the latest (possibly assumed) version of the named PV.
	GetPV(pvName string) (*v1.PersistentVolume, error)
	// GetAPIPV returns the API version of the named PV, ignoring any assumed
	// override.
	GetAPIPV(pvName string) (*v1.PersistentVolume, error)
	// ListPVs returns the latest version of the cached PVs indexed under the
	// given storage class name.
	ListPVs(storageClassName string) []*v1.PersistentVolume
}
365
// pvAssumeCache implements PVAssumeCache on top of a generic AssumeCache.
type pvAssumeCache struct {
	// AssumeCache is the underlying generic cache holding the PV objects.
	AssumeCache
	// logger is used to report cached objects of an unexpected type.
	logger klog.Logger
}
370
371 func pvStorageClassIndexFunc(obj interface{}) ([]string, error) {
372 if pv, ok := obj.(*v1.PersistentVolume); ok {
373 return []string{storagehelpers.GetPersistentVolumeClass(pv)}, nil
374 }
375 return []string{""}, fmt.Errorf("object is not a v1.PersistentVolume: %v", obj)
376 }
377
378
379 func NewPVAssumeCache(logger klog.Logger, informer cache.SharedIndexInformer) PVAssumeCache {
380 logger = klog.LoggerWithName(logger, "PV Cache")
381 return &pvAssumeCache{
382 AssumeCache: NewAssumeCache(logger, informer, "v1.PersistentVolume", "storageclass", pvStorageClassIndexFunc),
383 logger: logger,
384 }
385 }
386
387 func (c *pvAssumeCache) GetPV(pvName string) (*v1.PersistentVolume, error) {
388 obj, err := c.Get(pvName)
389 if err != nil {
390 return nil, err
391 }
392
393 pv, ok := obj.(*v1.PersistentVolume)
394 if !ok {
395 return nil, &errWrongType{"v1.PersistentVolume", obj}
396 }
397 return pv, nil
398 }
399
400 func (c *pvAssumeCache) GetAPIPV(pvName string) (*v1.PersistentVolume, error) {
401 obj, err := c.GetAPIObj(pvName)
402 if err != nil {
403 return nil, err
404 }
405 pv, ok := obj.(*v1.PersistentVolume)
406 if !ok {
407 return nil, &errWrongType{"v1.PersistentVolume", obj}
408 }
409 return pv, nil
410 }
411
412 func (c *pvAssumeCache) ListPVs(storageClassName string) []*v1.PersistentVolume {
413 objs := c.List(&v1.PersistentVolume{
414 Spec: v1.PersistentVolumeSpec{
415 StorageClassName: storageClassName,
416 },
417 })
418 pvs := []*v1.PersistentVolume{}
419 for _, obj := range objs {
420 pv, ok := obj.(*v1.PersistentVolume)
421 if !ok {
422 c.logger.Error(&errWrongType{"v1.PersistentVolume", obj}, "ListPVs")
423 continue
424 }
425 pvs = append(pvs, pv)
426 }
427 return pvs
428 }
429
430
// PVCAssumeCache is an AssumeCache for PersistentVolumeClaim objects with
// typed accessors.
type PVCAssumeCache interface {
	AssumeCache

	// GetPVC returns the latest (possibly assumed) version of the PVC stored
	// under pvcKey. Objects are stored under the key produced by
	// cache.MetaNamespaceKeyFunc, so pvcKey must be in that form.
	GetPVC(pvcKey string) (*v1.PersistentVolumeClaim, error)
	// GetAPIPVC returns the API version of the PVC stored under pvcKey,
	// ignoring any assumed override.
	GetAPIPVC(pvcKey string) (*v1.PersistentVolumeClaim, error)
}
439
// pvcAssumeCache implements PVCAssumeCache on top of a generic AssumeCache.
type pvcAssumeCache struct {
	// AssumeCache is the underlying generic cache holding the PVC objects.
	AssumeCache
	// logger is the named logger created in NewPVCAssumeCache.
	logger klog.Logger
}
444
445
446 func NewPVCAssumeCache(logger klog.Logger, informer cache.SharedIndexInformer) PVCAssumeCache {
447 logger = klog.LoggerWithName(logger, "PVC Cache")
448 return &pvcAssumeCache{
449 AssumeCache: NewAssumeCache(logger, informer, "v1.PersistentVolumeClaim", "", nil),
450 logger: logger,
451 }
452 }
453
454 func (c *pvcAssumeCache) GetPVC(pvcKey string) (*v1.PersistentVolumeClaim, error) {
455 obj, err := c.Get(pvcKey)
456 if err != nil {
457 return nil, err
458 }
459
460 pvc, ok := obj.(*v1.PersistentVolumeClaim)
461 if !ok {
462 return nil, &errWrongType{"v1.PersistentVolumeClaim", obj}
463 }
464 return pvc, nil
465 }
466
467 func (c *pvcAssumeCache) GetAPIPVC(pvcKey string) (*v1.PersistentVolumeClaim, error) {
468 obj, err := c.GetAPIObj(pvcKey)
469 if err != nil {
470 return nil, err
471 }
472 pvc, ok := obj.(*v1.PersistentVolumeClaim)
473 if !ok {
474 return nil, &errWrongType{"v1.PersistentVolumeClaim", obj}
475 }
476 return pvc, nil
477 }
478
View as plain text