1
16
17 package cache
18
19 import (
20 "context"
21 "fmt"
22 "time"
23
24 corev1 "k8s.io/api/core/v1"
25 apimeta "k8s.io/apimachinery/pkg/api/meta"
26 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
27 "k8s.io/apimachinery/pkg/runtime"
28 "k8s.io/apimachinery/pkg/runtime/schema"
29 toolscache "k8s.io/client-go/tools/cache"
30
31 "sigs.k8s.io/controller-runtime/pkg/client"
32 "sigs.k8s.io/controller-runtime/pkg/client/apiutil"
33 )
34
35
// globalCache is the map key under which the informer obtained from the
// cluster-scoped cache is stored in a multiNamespaceInformer. It takes the
// place of a namespace name for cluster-scoped objects.
const (
	globalCache = "_cluster-scope"
)
37
38 func newMultiNamespaceCache(
39 newCache newCacheFunc,
40 scheme *runtime.Scheme,
41 restMapper apimeta.RESTMapper,
42 namespaces map[string]Config,
43 globalConfig *Config,
44 ) Cache {
45
46 caches := map[string]Cache{}
47 for namespace, config := range namespaces {
48 caches[namespace] = newCache(config, namespace)
49 }
50
51
52 var clusterCache Cache
53 if globalConfig != nil {
54 clusterCache = newCache(*globalConfig, corev1.NamespaceAll)
55 }
56
57 return &multiNamespaceCache{
58 namespaceToCache: caches,
59 Scheme: scheme,
60 RESTMapper: restMapper,
61 clusterCache: clusterCache,
62 }
63 }
64
65
66
67
68
69 type multiNamespaceCache struct {
70 Scheme *runtime.Scheme
71 RESTMapper apimeta.RESTMapper
72 namespaceToCache map[string]Cache
73 clusterCache Cache
74 }
75
76 var _ Cache = &multiNamespaceCache{}
77
78
79
80 func (c *multiNamespaceCache) GetInformer(ctx context.Context, obj client.Object, opts ...InformerGetOption) (Informer, error) {
81
82
83 isNamespaced, err := apiutil.IsObjectNamespaced(obj, c.Scheme, c.RESTMapper)
84 if err != nil {
85 return nil, err
86 }
87 if !isNamespaced {
88 clusterCacheInformer, err := c.clusterCache.GetInformer(ctx, obj, opts...)
89 if err != nil {
90 return nil, err
91 }
92
93 return &multiNamespaceInformer{
94 namespaceToInformer: map[string]Informer{
95 globalCache: clusterCacheInformer,
96 },
97 }, nil
98 }
99
100 namespaceToInformer := map[string]Informer{}
101 for ns, cache := range c.namespaceToCache {
102 informer, err := cache.GetInformer(ctx, obj, opts...)
103 if err != nil {
104 return nil, err
105 }
106 namespaceToInformer[ns] = informer
107 }
108
109 return &multiNamespaceInformer{namespaceToInformer: namespaceToInformer}, nil
110 }
111
112 func (c *multiNamespaceCache) RemoveInformer(ctx context.Context, obj client.Object) error {
113
114
115 isNamespaced, err := apiutil.IsObjectNamespaced(obj, c.Scheme, c.RESTMapper)
116 if err != nil {
117 return err
118 }
119 if !isNamespaced {
120 return c.clusterCache.RemoveInformer(ctx, obj)
121 }
122
123 for _, cache := range c.namespaceToCache {
124 err := cache.RemoveInformer(ctx, obj)
125 if err != nil {
126 return err
127 }
128 }
129
130 return nil
131 }
132
133 func (c *multiNamespaceCache) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind, opts ...InformerGetOption) (Informer, error) {
134
135
136 isNamespaced, err := apiutil.IsGVKNamespaced(gvk, c.RESTMapper)
137 if err != nil {
138 return nil, err
139 }
140 if !isNamespaced {
141 clusterCacheInformer, err := c.clusterCache.GetInformerForKind(ctx, gvk, opts...)
142 if err != nil {
143 return nil, err
144 }
145
146 return &multiNamespaceInformer{
147 namespaceToInformer: map[string]Informer{
148 globalCache: clusterCacheInformer,
149 },
150 }, nil
151 }
152
153 namespaceToInformer := map[string]Informer{}
154 for ns, cache := range c.namespaceToCache {
155 informer, err := cache.GetInformerForKind(ctx, gvk, opts...)
156 if err != nil {
157 return nil, err
158 }
159 namespaceToInformer[ns] = informer
160 }
161
162 return &multiNamespaceInformer{namespaceToInformer: namespaceToInformer}, nil
163 }
164
165 func (c *multiNamespaceCache) Start(ctx context.Context) error {
166 errs := make(chan error)
167
168 if c.clusterCache != nil {
169 go func() {
170 err := c.clusterCache.Start(ctx)
171 if err != nil {
172 errs <- fmt.Errorf("failed to start cluster-scoped cache: %w", err)
173 }
174 }()
175 }
176
177
178 for ns, cache := range c.namespaceToCache {
179 go func(ns string, cache Cache) {
180 if err := cache.Start(ctx); err != nil {
181 errs <- fmt.Errorf("failed to start cache for namespace %s: %w", ns, err)
182 }
183 }(ns, cache)
184 }
185 select {
186 case <-ctx.Done():
187 return nil
188 case err := <-errs:
189 return err
190 }
191 }
192
193 func (c *multiNamespaceCache) WaitForCacheSync(ctx context.Context) bool {
194 synced := true
195 for _, cache := range c.namespaceToCache {
196 if !cache.WaitForCacheSync(ctx) {
197 synced = false
198 }
199 }
200
201
202 if c.clusterCache != nil && !c.clusterCache.WaitForCacheSync(ctx) {
203 synced = false
204 }
205 return synced
206 }
207
208 func (c *multiNamespaceCache) IndexField(ctx context.Context, obj client.Object, field string, extractValue client.IndexerFunc) error {
209 isNamespaced, err := apiutil.IsObjectNamespaced(obj, c.Scheme, c.RESTMapper)
210 if err != nil {
211 return err
212 }
213
214 if !isNamespaced {
215 return c.clusterCache.IndexField(ctx, obj, field, extractValue)
216 }
217
218 for _, cache := range c.namespaceToCache {
219 if err := cache.IndexField(ctx, obj, field, extractValue); err != nil {
220 return err
221 }
222 }
223 return nil
224 }
225
226 func (c *multiNamespaceCache) Get(ctx context.Context, key client.ObjectKey, obj client.Object, opts ...client.GetOption) error {
227 isNamespaced, err := apiutil.IsObjectNamespaced(obj, c.Scheme, c.RESTMapper)
228 if err != nil {
229 return err
230 }
231
232 if !isNamespaced {
233
234 return c.clusterCache.Get(ctx, key, obj)
235 }
236
237 cache, ok := c.namespaceToCache[key.Namespace]
238 if !ok {
239 if global, hasGlobal := c.namespaceToCache[metav1.NamespaceAll]; hasGlobal {
240 return global.Get(ctx, key, obj, opts...)
241 }
242 return fmt.Errorf("unable to get: %v because of unknown namespace for the cache", key)
243 }
244 return cache.Get(ctx, key, obj, opts...)
245 }
246
247
248 func (c *multiNamespaceCache) List(ctx context.Context, list client.ObjectList, opts ...client.ListOption) error {
249 listOpts := client.ListOptions{}
250 listOpts.ApplyOptions(opts)
251
252 isNamespaced, err := apiutil.IsObjectNamespaced(list, c.Scheme, c.RESTMapper)
253 if err != nil {
254 return err
255 }
256
257 if !isNamespaced {
258
259 return c.clusterCache.List(ctx, list, opts...)
260 }
261
262 if listOpts.Namespace != corev1.NamespaceAll {
263 cache, ok := c.namespaceToCache[listOpts.Namespace]
264 if !ok {
265 return fmt.Errorf("unable to list: %v because of unknown namespace for the cache", listOpts.Namespace)
266 }
267 return cache.List(ctx, list, opts...)
268 }
269
270 listAccessor, err := apimeta.ListAccessor(list)
271 if err != nil {
272 return err
273 }
274
275 allItems, err := apimeta.ExtractList(list)
276 if err != nil {
277 return err
278 }
279
280 limitSet := listOpts.Limit > 0
281
282 var resourceVersion string
283 for _, cache := range c.namespaceToCache {
284 listObj := list.DeepCopyObject().(client.ObjectList)
285 err = cache.List(ctx, listObj, &listOpts)
286 if err != nil {
287 return err
288 }
289 items, err := apimeta.ExtractList(listObj)
290 if err != nil {
291 return err
292 }
293 accessor, err := apimeta.ListAccessor(listObj)
294 if err != nil {
295 return fmt.Errorf("object: %T must be a list type", list)
296 }
297 allItems = append(allItems, items...)
298
299
300 resourceVersion = accessor.GetResourceVersion()
301 if limitSet {
302
303
304 listOpts.Limit -= int64(len(items))
305
306
307
308
309 if listOpts.Limit == 0 {
310 break
311 }
312 }
313 }
314 listAccessor.SetResourceVersion(resourceVersion)
315
316 return apimeta.SetList(list, allItems)
317 }
318
319
320 type multiNamespaceInformer struct {
321 namespaceToInformer map[string]Informer
322 }
323
324 type handlerRegistration struct {
325 handles map[string]toolscache.ResourceEventHandlerRegistration
326 }
327
// syncer is implemented by anything that can report whether it has synced.
// handlerRegistration.HasSynced uses it to probe the registrations it wraps.
type syncer interface {
	// HasSynced reports whether the implementation has synced.
	HasSynced() bool
}
331
332
333
334 func (h handlerRegistration) HasSynced() bool {
335 for _, reg := range h.handles {
336 if s, ok := reg.(syncer); ok {
337 if !s.HasSynced() {
338 return false
339 }
340 }
341 }
342 return true
343 }
344
345 var _ Informer = &multiNamespaceInformer{}
346
347
348 func (i *multiNamespaceInformer) AddEventHandler(handler toolscache.ResourceEventHandler) (toolscache.ResourceEventHandlerRegistration, error) {
349 handles := handlerRegistration{
350 handles: make(map[string]toolscache.ResourceEventHandlerRegistration, len(i.namespaceToInformer)),
351 }
352
353 for ns, informer := range i.namespaceToInformer {
354 registration, err := informer.AddEventHandler(handler)
355 if err != nil {
356 return nil, err
357 }
358 handles.handles[ns] = registration
359 }
360
361 return handles, nil
362 }
363
364
365 func (i *multiNamespaceInformer) AddEventHandlerWithResyncPeriod(handler toolscache.ResourceEventHandler, resyncPeriod time.Duration) (toolscache.ResourceEventHandlerRegistration, error) {
366 handles := handlerRegistration{
367 handles: make(map[string]toolscache.ResourceEventHandlerRegistration, len(i.namespaceToInformer)),
368 }
369
370 for ns, informer := range i.namespaceToInformer {
371 registration, err := informer.AddEventHandlerWithResyncPeriod(handler, resyncPeriod)
372 if err != nil {
373 return nil, err
374 }
375 handles.handles[ns] = registration
376 }
377
378 return handles, nil
379 }
380
381
382 func (i *multiNamespaceInformer) RemoveEventHandler(h toolscache.ResourceEventHandlerRegistration) error {
383 handles, ok := h.(handlerRegistration)
384 if !ok {
385 return fmt.Errorf("registration is not a registration returned by multiNamespaceInformer")
386 }
387 for ns, informer := range i.namespaceToInformer {
388 registration, ok := handles.handles[ns]
389 if !ok {
390 continue
391 }
392 if err := informer.RemoveEventHandler(registration); err != nil {
393 return err
394 }
395 }
396 return nil
397 }
398
399
400 func (i *multiNamespaceInformer) AddIndexers(indexers toolscache.Indexers) error {
401 for _, informer := range i.namespaceToInformer {
402 err := informer.AddIndexers(indexers)
403 if err != nil {
404 return err
405 }
406 }
407 return nil
408 }
409
410
411 func (i *multiNamespaceInformer) HasSynced() bool {
412 for _, informer := range i.namespaceToInformer {
413 if !informer.HasSynced() {
414 return false
415 }
416 }
417 return true
418 }
419
420
421 func (i *multiNamespaceInformer) IsStopped() bool {
422 for _, informer := range i.namespaceToInformer {
423 if stopped := informer.IsStopped(); !stopped {
424 return false
425 }
426 }
427 return true
428 }
429
View as plain text