1 /* 2 Copyright 2018 The Kubernetes Authors. 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 package cache 18 19 import ( 20 "context" 21 "fmt" 22 "net/http" 23 "sort" 24 "time" 25 26 "golang.org/x/exp/maps" 27 corev1 "k8s.io/api/core/v1" 28 "k8s.io/apimachinery/pkg/api/meta" 29 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" 30 "k8s.io/apimachinery/pkg/fields" 31 "k8s.io/apimachinery/pkg/labels" 32 "k8s.io/apimachinery/pkg/runtime" 33 "k8s.io/apimachinery/pkg/runtime/schema" 34 "k8s.io/client-go/kubernetes/scheme" 35 "k8s.io/client-go/rest" 36 toolscache "k8s.io/client-go/tools/cache" 37 "k8s.io/utils/ptr" 38 39 "sigs.k8s.io/controller-runtime/pkg/cache/internal" 40 "sigs.k8s.io/controller-runtime/pkg/client" 41 "sigs.k8s.io/controller-runtime/pkg/client/apiutil" 42 ) 43 44 var ( 45 defaultSyncPeriod = 10 * time.Hour 46 ) 47 48 // InformerGetOptions defines the behavior of how informers are retrieved. 49 type InformerGetOptions internal.GetOptions 50 51 // InformerGetOption defines an option that alters the behavior of how informers are retrieved. 52 type InformerGetOption func(*InformerGetOptions) 53 54 // BlockUntilSynced determines whether a get request for an informer should block 55 // until the informer's cache has synced. 56 func BlockUntilSynced(shouldBlock bool) InformerGetOption { 57 return func(opts *InformerGetOptions) { 58 opts.BlockUntilSynced = &shouldBlock 59 } 60 } 61 62 // Cache knows how to load Kubernetes objects, fetch informers to request 63 // to receive events for Kubernetes objects (at a low-level), 64 // and add indices to fields on the objects stored in the cache. 65 type Cache interface { 66 // Reader acts as a client to objects stored in the cache. 67 client.Reader 68 69 // Informers loads informers and adds field indices. 70 Informers 71 } 72 73 // Informers knows how to create or fetch informers for different 74 // group-version-kinds, and add indices to those informers. It's safe to call 75 // GetInformer from multiple threads. 76 type Informers interface { 77 // GetInformer fetches or constructs an informer for the given object that corresponds to a single 78 // API kind and resource. 79 GetInformer(ctx context.Context, obj client.Object, opts ...InformerGetOption) (Informer, error) 80 81 // GetInformerForKind is similar to GetInformer, except that it takes a group-version-kind, instead 82 // of the underlying object. 83 GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind, opts ...InformerGetOption) (Informer, error) 84 85 // RemoveInformer removes an informer entry and stops it if it was running. 86 RemoveInformer(ctx context.Context, obj client.Object) error 87 88 // Start runs all the informers known to this cache until the context is closed. 89 // It blocks. 90 Start(ctx context.Context) error 91 92 // WaitForCacheSync waits for all the caches to sync. Returns false if it could not sync a cache. 93 WaitForCacheSync(ctx context.Context) bool 94 95 // FieldIndexer adds indices to the managed informers. 96 client.FieldIndexer 97 } 98 99 // Informer allows you to interact with the underlying informer. 100 type Informer interface { 101 // AddEventHandler adds an event handler to the shared informer using the shared informer's resync 102 // period. Events to a single handler are delivered sequentially, but there is no coordination 103 // between different handlers. 104 // It returns a registration handle for the handler that can be used to remove 105 // the handler again and an error if the handler cannot be added. 106 AddEventHandler(handler toolscache.ResourceEventHandler) (toolscache.ResourceEventHandlerRegistration, error) 107 108 // AddEventHandlerWithResyncPeriod adds an event handler to the shared informer using the 109 // specified resync period. Events to a single handler are delivered sequentially, but there is 110 // no coordination between different handlers. 111 // It returns a registration handle for the handler that can be used to remove 112 // the handler again and an error if the handler cannot be added. 113 AddEventHandlerWithResyncPeriod(handler toolscache.ResourceEventHandler, resyncPeriod time.Duration) (toolscache.ResourceEventHandlerRegistration, error) 114 115 // RemoveEventHandler removes a previously added event handler given by 116 // its registration handle. 117 // This function is guaranteed to be idempotent and thread-safe. 118 RemoveEventHandler(handle toolscache.ResourceEventHandlerRegistration) error 119 120 // AddIndexers adds indexers to this store. If this is called after there is already data 121 // in the store, the results are undefined. 122 AddIndexers(indexers toolscache.Indexers) error 123 124 // HasSynced return true if the informers underlying store has synced. 125 HasSynced() bool 126 // IsStopped returns true if the informer has been stopped. 127 IsStopped() bool 128 } 129 130 // AllNamespaces should be used as the map key to deliminate namespace settings 131 // that apply to all namespaces that themselves do not have explicit settings. 132 const AllNamespaces = metav1.NamespaceAll 133 134 // Options are the optional arguments for creating a new Cache object. 135 type Options struct { 136 // HTTPClient is the http client to use for the REST client 137 HTTPClient *http.Client 138 139 // Scheme is the scheme to use for mapping objects to GroupVersionKinds 140 Scheme *runtime.Scheme 141 142 // Mapper is the RESTMapper to use for mapping GroupVersionKinds to Resources 143 Mapper meta.RESTMapper 144 145 // SyncPeriod determines the minimum frequency at which watched resources are 146 // reconciled. A lower period will correct entropy more quickly, but reduce 147 // responsiveness to change if there are many watched resources. Change this 148 // value only if you know what you are doing. Defaults to 10 hours if unset. 149 // there will a 10 percent jitter between the SyncPeriod of all controllers 150 // so that all controllers will not send list requests simultaneously. 151 // 152 // This applies to all controllers. 153 // 154 // A period sync happens for two reasons: 155 // 1. To insure against a bug in the controller that causes an object to not 156 // be requeued, when it otherwise should be requeued. 157 // 2. To insure against an unknown bug in controller-runtime, or its dependencies, 158 // that causes an object to not be requeued, when it otherwise should be 159 // requeued, or to be removed from the queue, when it otherwise should not 160 // be removed. 161 // 162 // If you want 163 // 1. to insure against missed watch events, or 164 // 2. to poll services that cannot be watched, 165 // then we recommend that, instead of changing the default period, the 166 // controller requeue, with a constant duration `t`, whenever the controller 167 // is "done" with an object, and would otherwise not requeue it, i.e., we 168 // recommend the `Reconcile` function return `reconcile.Result{RequeueAfter: t}`, 169 // instead of `reconcile.Result{}`. 170 SyncPeriod *time.Duration 171 172 // ReaderFailOnMissingInformer configures the cache to return a ErrResourceNotCached error when a user 173 // requests, using Get() and List(), a resource the cache does not already have an informer for. 174 // 175 // This error is distinct from an errors.NotFound. 176 // 177 // Defaults to false, which means that the cache will start a new informer 178 // for every new requested resource. 179 ReaderFailOnMissingInformer bool 180 181 // DefaultNamespaces maps namespace names to cache configs. If set, only 182 // the namespaces in here will be watched and it will by used to default 183 // ByObject.Namespaces for all objects if that is nil. 184 // 185 // It is possible to have specific Config for just some namespaces 186 // but cache all namespaces by using the AllNamespaces const as the map key. 187 // This will then include all namespaces that do not have a more specific 188 // setting. 189 // 190 // The options in the Config that are nil will be defaulted from 191 // the respective Default* settings. 192 DefaultNamespaces map[string]Config 193 194 // DefaultLabelSelector will be used as a label selector for all objects 195 // unless there is already one set in ByObject or DefaultNamespaces. 196 DefaultLabelSelector labels.Selector 197 198 // DefaultFieldSelector will be used as a field selector for all object types 199 // unless there is already one set in ByObject or DefaultNamespaces. 200 DefaultFieldSelector fields.Selector 201 202 // DefaultTransform will be used as transform for all object types 203 // unless there is already one set in ByObject or DefaultNamespaces. 204 // 205 // A typical usecase for this is to use TransformStripManagedFields 206 // to reduce the caches memory usage. 207 DefaultTransform toolscache.TransformFunc 208 209 // DefaultWatchErrorHandler will be used to the WatchErrorHandler which is called 210 // whenever ListAndWatch drops the connection with an error. 211 // 212 // After calling this handler, the informer will backoff and retry. 213 DefaultWatchErrorHandler toolscache.WatchErrorHandler 214 215 // DefaultUnsafeDisableDeepCopy is the default for UnsafeDisableDeepCopy 216 // for everything that doesn't specify this. 217 // 218 // Be very careful with this, when enabled you must DeepCopy any object before mutating it, 219 // otherwise you will mutate the object in the cache. 220 // 221 // This will be used for all object types, unless it is set in ByObject or 222 // DefaultNamespaces. 223 DefaultUnsafeDisableDeepCopy *bool 224 225 // ByObject restricts the cache's ListWatch to the desired fields per GVK at the specified object. 226 // If unset, this will fall through to the Default* settings. 227 ByObject map[client.Object]ByObject 228 229 // newInformer allows overriding of NewSharedIndexInformer for testing. 230 newInformer *func(toolscache.ListerWatcher, runtime.Object, time.Duration, toolscache.Indexers) toolscache.SharedIndexInformer 231 } 232 233 // ByObject offers more fine-grained control over the cache's ListWatch by object. 234 type ByObject struct { 235 // Namespaces maps a namespace name to cache configs. If set, only the 236 // namespaces in this map will be cached. 237 // 238 // Settings in the map value that are unset will be defaulted. 239 // Use an empty value for the specific setting to prevent that. 240 // 241 // It is possible to have specific Config for just some namespaces 242 // but cache all namespaces by using the AllNamespaces const as the map key. 243 // This will then include all namespaces that do not have a more specific 244 // setting. 245 // 246 // A nil map allows to default this to the cache's DefaultNamespaces setting. 247 // An empty map prevents this and means that all namespaces will be cached. 248 // 249 // The defaulting follows the following precedence order: 250 // 1. ByObject 251 // 2. DefaultNamespaces[namespace] 252 // 3. Default* 253 // 254 // This must be unset for cluster-scoped objects. 255 Namespaces map[string]Config 256 257 // Label represents a label selector for the object. 258 Label labels.Selector 259 260 // Field represents a field selector for the object. 261 Field fields.Selector 262 263 // Transform is a transformer function for the object which gets applied 264 // when objects of the transformation are about to be committed to the cache. 265 // 266 // This function is called both for new objects to enter the cache, 267 // and for updated objects. 268 Transform toolscache.TransformFunc 269 270 // UnsafeDisableDeepCopy indicates not to deep copy objects during get or 271 // list objects per GVK at the specified object. 272 // Be very careful with this, when enabled you must DeepCopy any object before mutating it, 273 // otherwise you will mutate the object in the cache. 274 UnsafeDisableDeepCopy *bool 275 } 276 277 // Config describes all potential options for a given watch. 278 type Config struct { 279 // LabelSelector specifies a label selector. A nil value allows to 280 // default this. 281 // 282 // Set to labels.Everything() if you don't want this defaulted. 283 LabelSelector labels.Selector 284 285 // FieldSelector specifics a field selector. A nil value allows to 286 // default this. 287 // 288 // Set to fields.Everything() if you don't want this defaulted. 289 FieldSelector fields.Selector 290 291 // Transform specifies a transform func. A nil value allows to default 292 // this. 293 // 294 // Set to an empty func to prevent this: 295 // func(in interface{}) (interface{}, error) { return in, nil } 296 Transform toolscache.TransformFunc 297 298 // UnsafeDisableDeepCopy specifies if List and Get requests against the 299 // cache should not DeepCopy. A nil value allows to default this. 300 UnsafeDisableDeepCopy *bool 301 } 302 303 // NewCacheFunc - Function for creating a new cache from the options and a rest config. 304 type NewCacheFunc func(config *rest.Config, opts Options) (Cache, error) 305 306 // New initializes and returns a new Cache. 307 func New(cfg *rest.Config, opts Options) (Cache, error) { 308 opts, err := defaultOpts(cfg, opts) 309 if err != nil { 310 return nil, err 311 } 312 313 newCacheFunc := newCache(cfg, opts) 314 315 var defaultCache Cache 316 if len(opts.DefaultNamespaces) > 0 { 317 defaultConfig := optionDefaultsToConfig(&opts) 318 defaultCache = newMultiNamespaceCache(newCacheFunc, opts.Scheme, opts.Mapper, opts.DefaultNamespaces, &defaultConfig) 319 } else { 320 defaultCache = newCacheFunc(optionDefaultsToConfig(&opts), corev1.NamespaceAll) 321 } 322 323 if len(opts.ByObject) == 0 { 324 return defaultCache, nil 325 } 326 327 delegating := &delegatingByGVKCache{ 328 scheme: opts.Scheme, 329 caches: make(map[schema.GroupVersionKind]Cache, len(opts.ByObject)), 330 defaultCache: defaultCache, 331 } 332 333 for obj, config := range opts.ByObject { 334 gvk, err := apiutil.GVKForObject(obj, opts.Scheme) 335 if err != nil { 336 return nil, fmt.Errorf("failed to get GVK for type %T: %w", obj, err) 337 } 338 var cache Cache 339 if len(config.Namespaces) > 0 { 340 cache = newMultiNamespaceCache(newCacheFunc, opts.Scheme, opts.Mapper, config.Namespaces, nil) 341 } else { 342 cache = newCacheFunc(byObjectToConfig(config), corev1.NamespaceAll) 343 } 344 delegating.caches[gvk] = cache 345 } 346 347 return delegating, nil 348 } 349 350 // TransformStripManagedFields strips the managed fields of an object before it is committed to the cache. 351 // If you are not explicitly accessing managedFields from your code, setting this as `DefaultTransform` 352 // on the cache can lead to a significant reduction in memory usage. 353 func TransformStripManagedFields() toolscache.TransformFunc { 354 return func(in any) (any, error) { 355 // Nilcheck managed fields to avoid hitting https://github.com/kubernetes/kubernetes/issues/124337 356 if obj, err := meta.Accessor(in); err == nil && obj.GetManagedFields() != nil { 357 obj.SetManagedFields(nil) 358 } 359 360 return in, nil 361 } 362 } 363 364 func optionDefaultsToConfig(opts *Options) Config { 365 return Config{ 366 LabelSelector: opts.DefaultLabelSelector, 367 FieldSelector: opts.DefaultFieldSelector, 368 Transform: opts.DefaultTransform, 369 UnsafeDisableDeepCopy: opts.DefaultUnsafeDisableDeepCopy, 370 } 371 } 372 373 func byObjectToConfig(byObject ByObject) Config { 374 return Config{ 375 LabelSelector: byObject.Label, 376 FieldSelector: byObject.Field, 377 Transform: byObject.Transform, 378 UnsafeDisableDeepCopy: byObject.UnsafeDisableDeepCopy, 379 } 380 } 381 382 type newCacheFunc func(config Config, namespace string) Cache 383 384 func newCache(restConfig *rest.Config, opts Options) newCacheFunc { 385 return func(config Config, namespace string) Cache { 386 return &informerCache{ 387 scheme: opts.Scheme, 388 Informers: internal.NewInformers(restConfig, &internal.InformersOpts{ 389 HTTPClient: opts.HTTPClient, 390 Scheme: opts.Scheme, 391 Mapper: opts.Mapper, 392 ResyncPeriod: *opts.SyncPeriod, 393 Namespace: namespace, 394 Selector: internal.Selector{ 395 Label: config.LabelSelector, 396 Field: config.FieldSelector, 397 }, 398 Transform: config.Transform, 399 WatchErrorHandler: opts.DefaultWatchErrorHandler, 400 UnsafeDisableDeepCopy: ptr.Deref(config.UnsafeDisableDeepCopy, false), 401 NewInformer: opts.newInformer, 402 }), 403 readerFailOnMissingInformer: opts.ReaderFailOnMissingInformer, 404 } 405 } 406 } 407 408 func defaultOpts(config *rest.Config, opts Options) (Options, error) { 409 config = rest.CopyConfig(config) 410 if config.UserAgent == "" { 411 config.UserAgent = rest.DefaultKubernetesUserAgent() 412 } 413 414 // Use the rest HTTP client for the provided config if unset 415 if opts.HTTPClient == nil { 416 var err error 417 opts.HTTPClient, err = rest.HTTPClientFor(config) 418 if err != nil { 419 return Options{}, fmt.Errorf("could not create HTTP client from config: %w", err) 420 } 421 } 422 423 // Use the default Kubernetes Scheme if unset 424 if opts.Scheme == nil { 425 opts.Scheme = scheme.Scheme 426 } 427 428 // Construct a new Mapper if unset 429 if opts.Mapper == nil { 430 var err error 431 opts.Mapper, err = apiutil.NewDynamicRESTMapper(config, opts.HTTPClient) 432 if err != nil { 433 return Options{}, fmt.Errorf("could not create RESTMapper from config: %w", err) 434 } 435 } 436 437 for obj, byObject := range opts.ByObject { 438 isNamespaced, err := apiutil.IsObjectNamespaced(obj, opts.Scheme, opts.Mapper) 439 if err != nil { 440 return opts, fmt.Errorf("failed to determine if %T is namespaced: %w", obj, err) 441 } 442 if !isNamespaced && byObject.Namespaces != nil { 443 return opts, fmt.Errorf("type %T is not namespaced, but its ByObject.Namespaces setting is not nil", obj) 444 } 445 446 if isNamespaced && byObject.Namespaces == nil { 447 byObject.Namespaces = maps.Clone(opts.DefaultNamespaces) 448 } 449 450 // Default the namespace-level configs first, because they need to use the undefaulted type-level config 451 // to be able to potentially fall through to settings from DefaultNamespaces. 452 for namespace, config := range byObject.Namespaces { 453 // 1. Default from the undefaulted type-level config 454 config = defaultConfig(config, byObjectToConfig(byObject)) 455 456 // 2. Default from the namespace-level config. This was defaulted from the global default config earlier, but 457 // might not have an entry for the current namespace. 458 if defaultNamespaceSettings, hasDefaultNamespace := opts.DefaultNamespaces[namespace]; hasDefaultNamespace { 459 config = defaultConfig(config, defaultNamespaceSettings) 460 } 461 462 // 3. Default from the global defaults 463 config = defaultConfig(config, optionDefaultsToConfig(&opts)) 464 465 if namespace == metav1.NamespaceAll { 466 config.FieldSelector = fields.AndSelectors( 467 appendIfNotNil( 468 namespaceAllSelector(maps.Keys(byObject.Namespaces)), 469 config.FieldSelector, 470 )..., 471 ) 472 } 473 474 byObject.Namespaces[namespace] = config 475 } 476 477 // Only default ByObject iself if it isn't namespaced or has no namespaces configured, as only 478 // then any of this will be honored. 479 if !isNamespaced || len(byObject.Namespaces) == 0 { 480 defaultedConfig := defaultConfig(byObjectToConfig(byObject), optionDefaultsToConfig(&opts)) 481 byObject.Label = defaultedConfig.LabelSelector 482 byObject.Field = defaultedConfig.FieldSelector 483 byObject.Transform = defaultedConfig.Transform 484 byObject.UnsafeDisableDeepCopy = defaultedConfig.UnsafeDisableDeepCopy 485 } 486 487 opts.ByObject[obj] = byObject 488 } 489 490 // Default namespaces after byObject has been defaulted, otherwise a namespace without selectors 491 // will get the `Default` selectors, then get copied to byObject and then not get defaulted from 492 // byObject, as it already has selectors. 493 for namespace, cfg := range opts.DefaultNamespaces { 494 cfg = defaultConfig(cfg, optionDefaultsToConfig(&opts)) 495 if namespace == metav1.NamespaceAll { 496 cfg.FieldSelector = fields.AndSelectors( 497 appendIfNotNil( 498 namespaceAllSelector(maps.Keys(opts.DefaultNamespaces)), 499 cfg.FieldSelector, 500 )..., 501 ) 502 } 503 opts.DefaultNamespaces[namespace] = cfg 504 } 505 506 // Default the resync period to 10 hours if unset 507 if opts.SyncPeriod == nil { 508 opts.SyncPeriod = &defaultSyncPeriod 509 } 510 return opts, nil 511 } 512 513 func defaultConfig(toDefault, defaultFrom Config) Config { 514 if toDefault.LabelSelector == nil { 515 toDefault.LabelSelector = defaultFrom.LabelSelector 516 } 517 if toDefault.FieldSelector == nil { 518 toDefault.FieldSelector = defaultFrom.FieldSelector 519 } 520 if toDefault.Transform == nil { 521 toDefault.Transform = defaultFrom.Transform 522 } 523 if toDefault.UnsafeDisableDeepCopy == nil { 524 toDefault.UnsafeDisableDeepCopy = defaultFrom.UnsafeDisableDeepCopy 525 } 526 527 return toDefault 528 } 529 530 func namespaceAllSelector(namespaces []string) []fields.Selector { 531 selectors := make([]fields.Selector, 0, len(namespaces)-1) 532 sort.Strings(namespaces) 533 for _, namespace := range namespaces { 534 if namespace != metav1.NamespaceAll { 535 selectors = append(selectors, fields.OneTermNotEqualSelector("metadata.namespace", namespace)) 536 } 537 } 538 539 return selectors 540 } 541 542 func appendIfNotNil[T comparable](a []T, b T) []T { 543 if b != *new(T) { 544 return append(a, b) 545 } 546 return a 547 } 548