...
1
16
17 package cache
18
19 import (
20 "context"
21 "strings"
22 "sync"
23
24 "golang.org/x/exp/maps"
25 "k8s.io/apimachinery/pkg/runtime"
26 "k8s.io/apimachinery/pkg/runtime/schema"
27 "sigs.k8s.io/controller-runtime/pkg/client"
28 "sigs.k8s.io/controller-runtime/pkg/client/apiutil"
29 )
30
31
32
// delegatingByGVKCache routes each cache operation to a cache chosen by the
// GroupVersionKind of the object involved, falling back to defaultCache when
// no entry exists for that GroupVersionKind.
type delegatingByGVKCache struct {
	// scheme is used to resolve the GroupVersionKind of objects passed in.
	scheme *runtime.Scheme
	// caches holds the caches registered for specific GroupVersionKinds.
	caches map[schema.GroupVersionKind]Cache
	// defaultCache serves every GroupVersionKind that has no entry in caches.
	defaultCache Cache
}
38
39 func (dbt *delegatingByGVKCache) Get(ctx context.Context, key client.ObjectKey, obj client.Object, opts ...client.GetOption) error {
40 cache, err := dbt.cacheForObject(obj)
41 if err != nil {
42 return err
43 }
44 return cache.Get(ctx, key, obj, opts...)
45 }
46
47 func (dbt *delegatingByGVKCache) List(ctx context.Context, list client.ObjectList, opts ...client.ListOption) error {
48 cache, err := dbt.cacheForObject(list)
49 if err != nil {
50 return err
51 }
52 return cache.List(ctx, list, opts...)
53 }
54
55 func (dbt *delegatingByGVKCache) RemoveInformer(ctx context.Context, obj client.Object) error {
56 cache, err := dbt.cacheForObject(obj)
57 if err != nil {
58 return err
59 }
60 return cache.RemoveInformer(ctx, obj)
61 }
62
63 func (dbt *delegatingByGVKCache) GetInformer(ctx context.Context, obj client.Object, opts ...InformerGetOption) (Informer, error) {
64 cache, err := dbt.cacheForObject(obj)
65 if err != nil {
66 return nil, err
67 }
68 return cache.GetInformer(ctx, obj, opts...)
69 }
70
71 func (dbt *delegatingByGVKCache) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind, opts ...InformerGetOption) (Informer, error) {
72 return dbt.cacheForGVK(gvk).GetInformerForKind(ctx, gvk, opts...)
73 }
74
75 func (dbt *delegatingByGVKCache) Start(ctx context.Context) error {
76 allCaches := maps.Values(dbt.caches)
77 allCaches = append(allCaches, dbt.defaultCache)
78
79 wg := &sync.WaitGroup{}
80 errs := make(chan error)
81 for idx := range allCaches {
82 cache := allCaches[idx]
83 wg.Add(1)
84 go func() {
85 defer wg.Done()
86 if err := cache.Start(ctx); err != nil {
87 errs <- err
88 }
89 }()
90 }
91
92 select {
93 case err := <-errs:
94 return err
95 case <-ctx.Done():
96 wg.Wait()
97 return nil
98 }
99 }
100
101 func (dbt *delegatingByGVKCache) WaitForCacheSync(ctx context.Context) bool {
102 synced := true
103 for _, cache := range append(maps.Values(dbt.caches), dbt.defaultCache) {
104 if !cache.WaitForCacheSync(ctx) {
105 synced = false
106 }
107 }
108
109 return synced
110 }
111
112 func (dbt *delegatingByGVKCache) IndexField(ctx context.Context, obj client.Object, field string, extractValue client.IndexerFunc) error {
113 cache, err := dbt.cacheForObject(obj)
114 if err != nil {
115 return err
116 }
117 return cache.IndexField(ctx, obj, field, extractValue)
118 }
119
120 func (dbt *delegatingByGVKCache) cacheForObject(o runtime.Object) (Cache, error) {
121 gvk, err := apiutil.GVKForObject(o, dbt.scheme)
122 if err != nil {
123 return nil, err
124 }
125 gvk.Kind = strings.TrimSuffix(gvk.Kind, "List")
126 return dbt.cacheForGVK(gvk), nil
127 }
128
129 func (dbt *delegatingByGVKCache) cacheForGVK(gvk schema.GroupVersionKind) Cache {
130 if specific, hasSpecific := dbt.caches[gvk]; hasSpecific {
131 return specific
132 }
133
134 return dbt.defaultCache
135 }
136
View as plain text