...

Source file src/sigs.k8s.io/controller-runtime/pkg/cache/delegating_by_gvk_cache.go

Documentation: sigs.k8s.io/controller-runtime/pkg/cache

     1  /*
     2  Copyright 2023 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package cache
    18  
    19  import (
    20  	"context"
    21  	"strings"
    22  	"sync"
    23  
    24  	"golang.org/x/exp/maps"
    25  	"k8s.io/apimachinery/pkg/runtime"
    26  	"k8s.io/apimachinery/pkg/runtime/schema"
    27  	"sigs.k8s.io/controller-runtime/pkg/client"
    28  	"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
    29  )
    30  
    31  // delegatingByGVKCache delegates to a type-specific cache if present
    32  // and uses the defaultCache otherwise.
    33  type delegatingByGVKCache struct {
    34  	scheme       *runtime.Scheme
    35  	caches       map[schema.GroupVersionKind]Cache
    36  	defaultCache Cache
    37  }
    38  
    39  func (dbt *delegatingByGVKCache) Get(ctx context.Context, key client.ObjectKey, obj client.Object, opts ...client.GetOption) error {
    40  	cache, err := dbt.cacheForObject(obj)
    41  	if err != nil {
    42  		return err
    43  	}
    44  	return cache.Get(ctx, key, obj, opts...)
    45  }
    46  
    47  func (dbt *delegatingByGVKCache) List(ctx context.Context, list client.ObjectList, opts ...client.ListOption) error {
    48  	cache, err := dbt.cacheForObject(list)
    49  	if err != nil {
    50  		return err
    51  	}
    52  	return cache.List(ctx, list, opts...)
    53  }
    54  
    55  func (dbt *delegatingByGVKCache) RemoveInformer(ctx context.Context, obj client.Object) error {
    56  	cache, err := dbt.cacheForObject(obj)
    57  	if err != nil {
    58  		return err
    59  	}
    60  	return cache.RemoveInformer(ctx, obj)
    61  }
    62  
    63  func (dbt *delegatingByGVKCache) GetInformer(ctx context.Context, obj client.Object, opts ...InformerGetOption) (Informer, error) {
    64  	cache, err := dbt.cacheForObject(obj)
    65  	if err != nil {
    66  		return nil, err
    67  	}
    68  	return cache.GetInformer(ctx, obj, opts...)
    69  }
    70  
    71  func (dbt *delegatingByGVKCache) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind, opts ...InformerGetOption) (Informer, error) {
    72  	return dbt.cacheForGVK(gvk).GetInformerForKind(ctx, gvk, opts...)
    73  }
    74  
    75  func (dbt *delegatingByGVKCache) Start(ctx context.Context) error {
    76  	allCaches := maps.Values(dbt.caches)
    77  	allCaches = append(allCaches, dbt.defaultCache)
    78  
    79  	wg := &sync.WaitGroup{}
    80  	errs := make(chan error)
    81  	for idx := range allCaches {
    82  		cache := allCaches[idx]
    83  		wg.Add(1)
    84  		go func() {
    85  			defer wg.Done()
    86  			if err := cache.Start(ctx); err != nil {
    87  				errs <- err
    88  			}
    89  		}()
    90  	}
    91  
    92  	select {
    93  	case err := <-errs:
    94  		return err
    95  	case <-ctx.Done():
    96  		wg.Wait()
    97  		return nil
    98  	}
    99  }
   100  
   101  func (dbt *delegatingByGVKCache) WaitForCacheSync(ctx context.Context) bool {
   102  	synced := true
   103  	for _, cache := range append(maps.Values(dbt.caches), dbt.defaultCache) {
   104  		if !cache.WaitForCacheSync(ctx) {
   105  			synced = false
   106  		}
   107  	}
   108  
   109  	return synced
   110  }
   111  
   112  func (dbt *delegatingByGVKCache) IndexField(ctx context.Context, obj client.Object, field string, extractValue client.IndexerFunc) error {
   113  	cache, err := dbt.cacheForObject(obj)
   114  	if err != nil {
   115  		return err
   116  	}
   117  	return cache.IndexField(ctx, obj, field, extractValue)
   118  }
   119  
   120  func (dbt *delegatingByGVKCache) cacheForObject(o runtime.Object) (Cache, error) {
   121  	gvk, err := apiutil.GVKForObject(o, dbt.scheme)
   122  	if err != nil {
   123  		return nil, err
   124  	}
   125  	gvk.Kind = strings.TrimSuffix(gvk.Kind, "List")
   126  	return dbt.cacheForGVK(gvk), nil
   127  }
   128  
   129  func (dbt *delegatingByGVKCache) cacheForGVK(gvk schema.GroupVersionKind) Cache {
   130  	if specific, hasSpecific := dbt.caches[gvk]; hasSpecific {
   131  		return specific
   132  	}
   133  
   134  	return dbt.defaultCache
   135  }
   136  

View as plain text