1 /* 2 Copyright 2015 The Kubernetes Authors. 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 package cache 18 19 import ( 20 "errors" 21 "fmt" 22 "sync" 23 "time" 24 25 "k8s.io/apimachinery/pkg/api/meta" 26 "k8s.io/apimachinery/pkg/runtime" 27 utilruntime "k8s.io/apimachinery/pkg/util/runtime" 28 "k8s.io/apimachinery/pkg/util/wait" 29 "k8s.io/client-go/tools/cache/synctrack" 30 "k8s.io/utils/buffer" 31 "k8s.io/utils/clock" 32 33 "k8s.io/klog/v2" 34 35 clientgofeaturegate "k8s.io/client-go/features" 36 ) 37 38 // SharedInformer provides eventually consistent linkage of its 39 // clients to the authoritative state of a given collection of 40 // objects. An object is identified by its API group, kind/resource, 41 // namespace (if any), and name; the `ObjectMeta.UID` is not part of 42 // an object's ID as far as this contract is concerned. One 43 // SharedInformer provides linkage to objects of a particular API 44 // group and kind/resource. The linked object collection of a 45 // SharedInformer may be further restricted to one namespace (if 46 // applicable) and/or by label selector and/or field selector. 47 // 48 // The authoritative state of an object is what apiservers provide 49 // access to, and an object goes through a strict sequence of states. 50 // An object state is either (1) present with a ResourceVersion and 51 // other appropriate content or (2) "absent". 
//
// A SharedInformer maintains a local cache --- exposed by GetStore(),
// by GetIndexer() in the case of an indexed informer, and possibly by
// machinery involved in creating and/or accessing the informer --- of
// the state of each relevant object. This cache is eventually
// consistent with the authoritative state. This means that, unless
// prevented by persistent communication problems, if ever a
// particular object ID X is authoritatively associated with a state S
// then for every SharedInformer I whose collection includes (X, S)
// eventually either (1) I's cache associates X with S or a later
// state of X, (2) I is stopped, or (3) the authoritative state
// service for X terminates. To be formally complete, we say that the
// absent state meets any restriction by label selector or field
// selector.
//
// For a given informer and relevant object ID X, the sequence of
// states that appears in the informer's cache is a subsequence of the
// states authoritatively associated with X. That is, some states
// might never appear in the cache but ordering among the appearing
// states is correct. Note, however, that there is no promise about
// ordering between states seen for different objects.
//
// The local cache starts out empty, and gets populated and updated
// during `Run()`.
//
// As a simple example, if a collection of objects is henceforth
// unchanging, a SharedInformer is created that links to that
// collection, and that SharedInformer is `Run()` then that
// SharedInformer's cache eventually holds an exact copy of that
// collection (unless it is stopped too soon, the authoritative state
// service ends, or communication problems between the two
// persistently thwart achievement).
//
// As another simple example, if the local cache ever holds a
// non-absent state for some object ID and the object is eventually
// removed from the authoritative state then eventually the object is
// removed from the local cache (unless the SharedInformer is stopped
// too soon, the authoritative state service ends, or communication
// problems persistently thwart the desired result).
//
// The keys in the Store are of the form namespace/name for namespaced
// objects, and are simply the name for non-namespaced objects.
// Clients can use `MetaNamespaceKeyFunc(obj)` to extract the key for
// a given object, and `SplitMetaNamespaceKey(key)` to split a key
// into its constituent parts.
//
// Every query against the local cache is answered entirely from one
// snapshot of the cache's state. Thus, the result of a `List` call
// will not contain two entries with the same namespace and name.
//
// A client is identified here by a ResourceEventHandler. For every
// update to the SharedInformer's local cache and for every client
// added before `Run()`, eventually either the SharedInformer is
// stopped or the client is notified of the update. A client added
// after `Run()` starts gets a startup batch of notifications of
// additions of the objects existing in the cache at the time that
// client was added; also, for every update to the SharedInformer's
// local cache after that client was added, eventually either the
// SharedInformer is stopped or that client is notified of that
// update. Client notifications happen after the corresponding cache
// update and, in the case of a SharedIndexInformer, after the
// corresponding index updates. It is possible that additional cache
// and index updates happen before such a prescribed notification.
// For a given SharedInformer and client, the notifications are
// delivered sequentially.
For a given SharedInformer, client, and 117 // object ID, the notifications are delivered in order. Because 118 // `ObjectMeta.UID` has no role in identifying objects, it is possible 119 // that when (1) object O1 with ID (e.g. namespace and name) X and 120 // `ObjectMeta.UID` U1 in the SharedInformer's local cache is deleted 121 // and later (2) another object O2 with ID X and ObjectMeta.UID U2 is 122 // created the informer's clients are not notified of (1) and (2) but 123 // rather are notified only of an update from O1 to O2. Clients that 124 // need to detect such cases might do so by comparing the `ObjectMeta.UID` 125 // field of the old and the new object in the code that handles update 126 // notifications (i.e. `OnUpdate` method of ResourceEventHandler). 127 // 128 // A client must process each notification promptly; a SharedInformer 129 // is not engineered to deal well with a large backlog of 130 // notifications to deliver. Lengthy processing should be passed off 131 // to something else, for example through a 132 // `client-go/util/workqueue`. 133 // 134 // A delete notification exposes the last locally known non-absent 135 // state, except that its ResourceVersion is replaced with a 136 // ResourceVersion in which the object is actually absent. 137 type SharedInformer interface { 138 // AddEventHandler adds an event handler to the shared informer using 139 // the shared informer's resync period. Events to a single handler are 140 // delivered sequentially, but there is no coordination between 141 // different handlers. 142 // It returns a registration handle for the handler that can be used to 143 // remove the handler again, or to tell if the handler is synced (has 144 // seen every item in the initial list). 
145 AddEventHandler(handler ResourceEventHandler) (ResourceEventHandlerRegistration, error) 146 // AddEventHandlerWithResyncPeriod adds an event handler to the 147 // shared informer with the requested resync period; zero means 148 // this handler does not care about resyncs. The resync operation 149 // consists of delivering to the handler an update notification 150 // for every object in the informer's local cache; it does not add 151 // any interactions with the authoritative storage. Some 152 // informers do no resyncs at all, not even for handlers added 153 // with a non-zero resyncPeriod. For an informer that does 154 // resyncs, and for each handler that requests resyncs, that 155 // informer develops a nominal resync period that is no shorter 156 // than the requested period but may be longer. The actual time 157 // between any two resyncs may be longer than the nominal period 158 // because the implementation takes time to do work and there may 159 // be competing load and scheduling noise. 160 // It returns a registration handle for the handler that can be used to remove 161 // the handler again and an error if the handler cannot be added. 162 AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) (ResourceEventHandlerRegistration, error) 163 // RemoveEventHandler removes a formerly added event handler given by 164 // its registration handle. 165 // This function is guaranteed to be idempotent, and thread-safe. 166 RemoveEventHandler(handle ResourceEventHandlerRegistration) error 167 // GetStore returns the informer's local cache as a Store. 168 GetStore() Store 169 // GetController is deprecated, it does nothing useful 170 GetController() Controller 171 // Run starts and runs the shared informer, returning after it stops. 172 // The informer will be stopped when stopCh is closed. 
173 Run(stopCh <-chan struct{}) 174 // HasSynced returns true if the shared informer's store has been 175 // informed by at least one full LIST of the authoritative state 176 // of the informer's object collection. This is unrelated to "resync". 177 // 178 // Note that this doesn't tell you if an individual handler is synced!! 179 // For that, please call HasSynced on the handle returned by 180 // AddEventHandler. 181 HasSynced() bool 182 // LastSyncResourceVersion is the resource version observed when last synced with the underlying 183 // store. The value returned is not synchronized with access to the underlying store and is not 184 // thread-safe. 185 LastSyncResourceVersion() string 186 187 // The WatchErrorHandler is called whenever ListAndWatch drops the 188 // connection with an error. After calling this handler, the informer 189 // will backoff and retry. 190 // 191 // The default implementation looks at the error type and tries to log 192 // the error message at an appropriate level. 193 // 194 // There's only one handler, so if you call this multiple times, last one 195 // wins; calling after the informer has been started returns an error. 196 // 197 // The handler is intended for visibility, not to e.g. pause the consumers. 198 // The handler should return quickly - any expensive processing should be 199 // offloaded. 200 SetWatchErrorHandler(handler WatchErrorHandler) error 201 202 // The TransformFunc is called for each object which is about to be stored. 203 // 204 // This function is intended for you to take the opportunity to 205 // remove, transform, or normalize fields. One use case is to strip unused 206 // metadata fields out of objects to save on RAM cost. 207 // 208 // Must be set before starting the informer. 209 // 210 // Please see the comment on TransformFunc for more details. 211 SetTransform(handler TransformFunc) error 212 213 // IsStopped reports whether the informer has already been stopped. 
214 // Adding event handlers to already stopped informers is not possible. 215 // An informer already stopped will never be started again. 216 IsStopped() bool 217 } 218 219 // Opaque interface representing the registration of ResourceEventHandler for 220 // a SharedInformer. Must be supplied back to the same SharedInformer's 221 // `RemoveEventHandler` to unregister the handlers. 222 // 223 // Also used to tell if the handler is synced (has had all items in the initial 224 // list delivered). 225 type ResourceEventHandlerRegistration interface { 226 // HasSynced reports if both the parent has synced and all pre-sync 227 // events have been delivered. 228 HasSynced() bool 229 } 230 231 // SharedIndexInformer provides add and get Indexers ability based on SharedInformer. 232 type SharedIndexInformer interface { 233 SharedInformer 234 // AddIndexers add indexers to the informer before it starts. 235 AddIndexers(indexers Indexers) error 236 GetIndexer() Indexer 237 } 238 239 // NewSharedInformer creates a new instance for the ListerWatcher. See NewSharedIndexInformerWithOptions for full details. 240 func NewSharedInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration) SharedInformer { 241 return NewSharedIndexInformer(lw, exampleObject, defaultEventHandlerResyncPeriod, Indexers{}) 242 } 243 244 // NewSharedIndexInformer creates a new instance for the ListerWatcher and specified Indexers. See 245 // NewSharedIndexInformerWithOptions for full details. 246 func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer { 247 return NewSharedIndexInformerWithOptions( 248 lw, 249 exampleObject, 250 SharedIndexInformerOptions{ 251 ResyncPeriod: defaultEventHandlerResyncPeriod, 252 Indexers: indexers, 253 }, 254 ) 255 } 256 257 // NewSharedIndexInformerWithOptions creates a new instance for the ListerWatcher. 
258 // The created informer will not do resyncs if options.ResyncPeriod is zero. Otherwise: for each 259 // handler that with a non-zero requested resync period, whether added 260 // before or after the informer starts, the nominal resync period is 261 // the requested resync period rounded up to a multiple of the 262 // informer's resync checking period. Such an informer's resync 263 // checking period is established when the informer starts running, 264 // and is the maximum of (a) the minimum of the resync periods 265 // requested before the informer starts and the 266 // options.ResyncPeriod given here and (b) the constant 267 // `minimumResyncPeriod` defined in this file. 268 func NewSharedIndexInformerWithOptions(lw ListerWatcher, exampleObject runtime.Object, options SharedIndexInformerOptions) SharedIndexInformer { 269 realClock := &clock.RealClock{} 270 271 return &sharedIndexInformer{ 272 indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, options.Indexers), 273 processor: &sharedProcessor{clock: realClock}, 274 listerWatcher: lw, 275 objectType: exampleObject, 276 objectDescription: options.ObjectDescription, 277 resyncCheckPeriod: options.ResyncPeriod, 278 defaultEventHandlerResyncPeriod: options.ResyncPeriod, 279 clock: realClock, 280 cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject)), 281 } 282 } 283 284 // SharedIndexInformerOptions configures a sharedIndexInformer. 285 type SharedIndexInformerOptions struct { 286 // ResyncPeriod is the default event handler resync period and resync check 287 // period. If unset/unspecified, these are defaulted to 0 (do not resync). 288 ResyncPeriod time.Duration 289 290 // Indexers is the sharedIndexInformer's indexers. If unset/unspecified, no indexers are configured. 291 Indexers Indexers 292 293 // ObjectDescription is the sharedIndexInformer's object description. This is passed through to the 294 // underlying Reflector's type description. 
295 ObjectDescription string 296 } 297 298 // InformerSynced is a function that can be used to determine if an informer has synced. This is useful for determining if caches have synced. 299 type InformerSynced func() bool 300 301 const ( 302 // syncedPollPeriod controls how often you look at the status of your sync funcs 303 syncedPollPeriod = 100 * time.Millisecond 304 305 // initialBufferSize is the initial number of event notifications that can be buffered. 306 initialBufferSize = 1024 307 ) 308 309 // WaitForNamedCacheSync is a wrapper around WaitForCacheSync that generates log messages 310 // indicating that the caller identified by name is waiting for syncs, followed by 311 // either a successful or failed sync. 312 func WaitForNamedCacheSync(controllerName string, stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool { 313 klog.Infof("Waiting for caches to sync for %s", controllerName) 314 315 if !WaitForCacheSync(stopCh, cacheSyncs...) { 316 utilruntime.HandleError(fmt.Errorf("unable to sync caches for %s", controllerName)) 317 return false 318 } 319 320 klog.Infof("Caches are synced for %s", controllerName) 321 return true 322 } 323 324 // WaitForCacheSync waits for caches to populate. It returns true if it was successful, false 325 // if the controller should shutdown 326 // callers should prefer WaitForNamedCacheSync() 327 func WaitForCacheSync(stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool { 328 err := wait.PollImmediateUntil(syncedPollPeriod, 329 func() (bool, error) { 330 for _, syncFunc := range cacheSyncs { 331 if !syncFunc() { 332 return false, nil 333 } 334 } 335 return true, nil 336 }, 337 stopCh) 338 if err != nil { 339 return false 340 } 341 342 return true 343 } 344 345 // `*sharedIndexInformer` implements SharedIndexInformer and has three 346 // main components. One is an indexed local cache, `indexer Indexer`. 
347 // The second main component is a Controller that pulls 348 // objects/notifications using the ListerWatcher and pushes them into 349 // a DeltaFIFO --- whose knownObjects is the informer's local cache 350 // --- while concurrently Popping Deltas values from that fifo and 351 // processing them with `sharedIndexInformer::HandleDeltas`. Each 352 // invocation of HandleDeltas, which is done with the fifo's lock 353 // held, processes each Delta in turn. For each Delta this both 354 // updates the local cache and stuffs the relevant notification into 355 // the sharedProcessor. The third main component is that 356 // sharedProcessor, which is responsible for relaying those 357 // notifications to each of the informer's clients. 358 type sharedIndexInformer struct { 359 indexer Indexer 360 controller Controller 361 362 processor *sharedProcessor 363 cacheMutationDetector MutationDetector 364 365 listerWatcher ListerWatcher 366 367 // objectType is an example object of the type this informer is expected to handle. If set, an event 368 // with an object with a mismatching type is dropped instead of being delivered to listeners. 369 objectType runtime.Object 370 371 // objectDescription is the description of this informer's objects. This typically defaults to 372 objectDescription string 373 374 // resyncCheckPeriod is how often we want the reflector's resync timer to fire so it can call 375 // shouldResync to check if any of our listeners need a resync. 376 resyncCheckPeriod time.Duration 377 // defaultEventHandlerResyncPeriod is the default resync period for any handlers added via 378 // AddEventHandler (i.e. they don't specify one and just want to use the shared informer's default 379 // value). 
380 defaultEventHandlerResyncPeriod time.Duration 381 // clock allows for testability 382 clock clock.Clock 383 384 started, stopped bool 385 startedLock sync.Mutex 386 387 // blockDeltas gives a way to stop all event distribution so that a late event handler 388 // can safely join the shared informer. 389 blockDeltas sync.Mutex 390 391 // Called whenever the ListAndWatch drops the connection with an error. 392 watchErrorHandler WatchErrorHandler 393 394 transform TransformFunc 395 } 396 397 // dummyController hides the fact that a SharedInformer is different from a dedicated one 398 // where a caller can `Run`. The run method is disconnected in this case, because higher 399 // level logic will decide when to start the SharedInformer and related controller. 400 // Because returning information back is always asynchronous, the legacy callers shouldn't 401 // notice any change in behavior. 402 type dummyController struct { 403 informer *sharedIndexInformer 404 } 405 406 func (v *dummyController) Run(stopCh <-chan struct{}) { 407 } 408 409 func (v *dummyController) HasSynced() bool { 410 return v.informer.HasSynced() 411 } 412 413 func (v *dummyController) LastSyncResourceVersion() string { 414 if clientgofeaturegate.FeatureGates().Enabled(clientgofeaturegate.InformerResourceVersion) { 415 return v.informer.LastSyncResourceVersion() 416 } 417 418 return "" 419 } 420 421 type updateNotification struct { 422 oldObj interface{} 423 newObj interface{} 424 } 425 426 type addNotification struct { 427 newObj interface{} 428 isInInitialList bool 429 } 430 431 type deleteNotification struct { 432 oldObj interface{} 433 } 434 435 func (s *sharedIndexInformer) SetWatchErrorHandler(handler WatchErrorHandler) error { 436 s.startedLock.Lock() 437 defer s.startedLock.Unlock() 438 439 if s.started { 440 return fmt.Errorf("informer has already started") 441 } 442 443 s.watchErrorHandler = handler 444 return nil 445 } 446 447 func (s *sharedIndexInformer) SetTransform(handler 
TransformFunc) error { 448 s.startedLock.Lock() 449 defer s.startedLock.Unlock() 450 451 if s.started { 452 return fmt.Errorf("informer has already started") 453 } 454 455 s.transform = handler 456 return nil 457 } 458 459 func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) { 460 defer utilruntime.HandleCrash() 461 462 if s.HasStarted() { 463 klog.Warningf("The sharedIndexInformer has started, run more than once is not allowed") 464 return 465 } 466 467 func() { 468 s.startedLock.Lock() 469 defer s.startedLock.Unlock() 470 471 fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{ 472 KnownObjects: s.indexer, 473 EmitDeltaTypeReplaced: true, 474 Transformer: s.transform, 475 }) 476 477 cfg := &Config{ 478 Queue: fifo, 479 ListerWatcher: s.listerWatcher, 480 ObjectType: s.objectType, 481 ObjectDescription: s.objectDescription, 482 FullResyncPeriod: s.resyncCheckPeriod, 483 RetryOnError: false, 484 ShouldResync: s.processor.shouldResync, 485 486 Process: s.HandleDeltas, 487 WatchErrorHandler: s.watchErrorHandler, 488 } 489 490 s.controller = New(cfg) 491 s.controller.(*controller).clock = s.clock 492 s.started = true 493 }() 494 495 // Separate stop channel because Processor should be stopped strictly after controller 496 processorStopCh := make(chan struct{}) 497 var wg wait.Group 498 defer wg.Wait() // Wait for Processor to stop 499 defer close(processorStopCh) // Tell Processor to stop 500 wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run) 501 wg.StartWithChannel(processorStopCh, s.processor.run) 502 503 defer func() { 504 s.startedLock.Lock() 505 defer s.startedLock.Unlock() 506 s.stopped = true // Don't want any new listeners 507 }() 508 s.controller.Run(stopCh) 509 } 510 511 func (s *sharedIndexInformer) HasStarted() bool { 512 s.startedLock.Lock() 513 defer s.startedLock.Unlock() 514 return s.started 515 } 516 517 func (s *sharedIndexInformer) HasSynced() bool { 518 s.startedLock.Lock() 519 defer s.startedLock.Unlock() 520 521 if s.controller 
== nil { 522 return false 523 } 524 return s.controller.HasSynced() 525 } 526 527 func (s *sharedIndexInformer) LastSyncResourceVersion() string { 528 s.startedLock.Lock() 529 defer s.startedLock.Unlock() 530 531 if s.controller == nil { 532 return "" 533 } 534 return s.controller.LastSyncResourceVersion() 535 } 536 537 func (s *sharedIndexInformer) GetStore() Store { 538 return s.indexer 539 } 540 541 func (s *sharedIndexInformer) GetIndexer() Indexer { 542 return s.indexer 543 } 544 545 func (s *sharedIndexInformer) AddIndexers(indexers Indexers) error { 546 s.startedLock.Lock() 547 defer s.startedLock.Unlock() 548 549 if s.stopped { 550 return fmt.Errorf("indexer was not added because it has stopped already") 551 } 552 553 return s.indexer.AddIndexers(indexers) 554 } 555 556 func (s *sharedIndexInformer) GetController() Controller { 557 return &dummyController{informer: s} 558 } 559 560 func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) (ResourceEventHandlerRegistration, error) { 561 return s.AddEventHandlerWithResyncPeriod(handler, s.defaultEventHandlerResyncPeriod) 562 } 563 564 func determineResyncPeriod(desired, check time.Duration) time.Duration { 565 if desired == 0 { 566 return desired 567 } 568 if check == 0 { 569 klog.Warningf("The specified resyncPeriod %v is invalid because this shared informer doesn't support resyncing", desired) 570 return 0 571 } 572 if desired < check { 573 klog.Warningf("The specified resyncPeriod %v is being increased to the minimum resyncCheckPeriod %v", desired, check) 574 return check 575 } 576 return desired 577 } 578 579 const minimumResyncPeriod = 1 * time.Second 580 581 func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) (ResourceEventHandlerRegistration, error) { 582 s.startedLock.Lock() 583 defer s.startedLock.Unlock() 584 585 if s.stopped { 586 return nil, fmt.Errorf("handler %v was not added to shared informer because it has 
stopped already", handler) 587 } 588 589 if resyncPeriod > 0 { 590 if resyncPeriod < minimumResyncPeriod { 591 klog.Warningf("resyncPeriod %v is too small. Changing it to the minimum allowed value of %v", resyncPeriod, minimumResyncPeriod) 592 resyncPeriod = minimumResyncPeriod 593 } 594 595 if resyncPeriod < s.resyncCheckPeriod { 596 if s.started { 597 klog.Warningf("resyncPeriod %v is smaller than resyncCheckPeriod %v and the informer has already started. Changing it to %v", resyncPeriod, s.resyncCheckPeriod, s.resyncCheckPeriod) 598 resyncPeriod = s.resyncCheckPeriod 599 } else { 600 // if the event handler's resyncPeriod is smaller than the current resyncCheckPeriod, update 601 // resyncCheckPeriod to match resyncPeriod and adjust the resync periods of all the listeners 602 // accordingly 603 s.resyncCheckPeriod = resyncPeriod 604 s.processor.resyncCheckPeriodChanged(resyncPeriod) 605 } 606 } 607 } 608 609 listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize, s.HasSynced) 610 611 if !s.started { 612 return s.processor.addListener(listener), nil 613 } 614 615 // in order to safely join, we have to 616 // 1. stop sending add/update/delete notifications 617 // 2. do a list against the store 618 // 3. send synthetic "Add" events to the new handler 619 // 4. unblock 620 s.blockDeltas.Lock() 621 defer s.blockDeltas.Unlock() 622 623 handle := s.processor.addListener(listener) 624 for _, item := range s.indexer.List() { 625 // Note that we enqueue these notifications with the lock held 626 // and before returning the handle. 
That means there is never a 627 // chance for anyone to call the handle's HasSynced method in a 628 // state when it would falsely return true (i.e., when the 629 // shared informer is synced but it has not observed an Add 630 // with isInitialList being true, nor when the thread 631 // processing notifications somehow goes faster than this 632 // thread adding them and the counter is temporarily zero). 633 listener.add(addNotification{newObj: item, isInInitialList: true}) 634 } 635 return handle, nil 636 } 637 638 func (s *sharedIndexInformer) HandleDeltas(obj interface{}, isInInitialList bool) error { 639 s.blockDeltas.Lock() 640 defer s.blockDeltas.Unlock() 641 642 if deltas, ok := obj.(Deltas); ok { 643 return processDeltas(s, s.indexer, deltas, isInInitialList) 644 } 645 return errors.New("object given as Process argument is not Deltas") 646 } 647 648 // Conforms to ResourceEventHandler 649 func (s *sharedIndexInformer) OnAdd(obj interface{}, isInInitialList bool) { 650 // Invocation of this function is locked under s.blockDeltas, so it is 651 // save to distribute the notification 652 s.cacheMutationDetector.AddObject(obj) 653 s.processor.distribute(addNotification{newObj: obj, isInInitialList: isInInitialList}, false) 654 } 655 656 // Conforms to ResourceEventHandler 657 func (s *sharedIndexInformer) OnUpdate(old, new interface{}) { 658 isSync := false 659 660 // If is a Sync event, isSync should be true 661 // If is a Replaced event, isSync is true if resource version is unchanged. 
662 // If RV is unchanged: this is a Sync/Replaced event, so isSync is true 663 664 if accessor, err := meta.Accessor(new); err == nil { 665 if oldAccessor, err := meta.Accessor(old); err == nil { 666 // Events that didn't change resourceVersion are treated as resync events 667 // and only propagated to listeners that requested resync 668 isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion() 669 } 670 } 671 672 // Invocation of this function is locked under s.blockDeltas, so it is 673 // save to distribute the notification 674 s.cacheMutationDetector.AddObject(new) 675 s.processor.distribute(updateNotification{oldObj: old, newObj: new}, isSync) 676 } 677 678 // Conforms to ResourceEventHandler 679 func (s *sharedIndexInformer) OnDelete(old interface{}) { 680 // Invocation of this function is locked under s.blockDeltas, so it is 681 // save to distribute the notification 682 s.processor.distribute(deleteNotification{oldObj: old}, false) 683 } 684 685 // IsStopped reports whether the informer has already been stopped 686 func (s *sharedIndexInformer) IsStopped() bool { 687 s.startedLock.Lock() 688 defer s.startedLock.Unlock() 689 return s.stopped 690 } 691 692 func (s *sharedIndexInformer) RemoveEventHandler(handle ResourceEventHandlerRegistration) error { 693 s.startedLock.Lock() 694 defer s.startedLock.Unlock() 695 696 // in order to safely remove, we have to 697 // 1. stop sending add/update/delete notifications 698 // 2. remove and stop listener 699 // 3. unblock 700 s.blockDeltas.Lock() 701 defer s.blockDeltas.Unlock() 702 return s.processor.removeListener(handle) 703 } 704 705 // sharedProcessor has a collection of processorListener and can 706 // distribute a notification object to its listeners. There are two 707 // kinds of distribute operations. The sync distributions go to a 708 // subset of the listeners that (a) is recomputed in the occasional 709 // calls to shouldResync and (b) every listener is initially put in. 
710 // The non-sync distributions go to every listener. 711 type sharedProcessor struct { 712 listenersStarted bool 713 listenersLock sync.RWMutex 714 // Map from listeners to whether or not they are currently syncing 715 listeners map[*processorListener]bool 716 clock clock.Clock 717 wg wait.Group 718 } 719 720 func (p *sharedProcessor) getListener(registration ResourceEventHandlerRegistration) *processorListener { 721 p.listenersLock.RLock() 722 defer p.listenersLock.RUnlock() 723 724 if p.listeners == nil { 725 return nil 726 } 727 728 if result, ok := registration.(*processorListener); ok { 729 if _, exists := p.listeners[result]; exists { 730 return result 731 } 732 } 733 734 return nil 735 } 736 737 func (p *sharedProcessor) addListener(listener *processorListener) ResourceEventHandlerRegistration { 738 p.listenersLock.Lock() 739 defer p.listenersLock.Unlock() 740 741 if p.listeners == nil { 742 p.listeners = make(map[*processorListener]bool) 743 } 744 745 p.listeners[listener] = true 746 747 if p.listenersStarted { 748 p.wg.Start(listener.run) 749 p.wg.Start(listener.pop) 750 } 751 752 return listener 753 } 754 755 func (p *sharedProcessor) removeListener(handle ResourceEventHandlerRegistration) error { 756 p.listenersLock.Lock() 757 defer p.listenersLock.Unlock() 758 759 listener, ok := handle.(*processorListener) 760 if !ok { 761 return fmt.Errorf("invalid key type %t", handle) 762 } else if p.listeners == nil { 763 // No listeners are registered, do nothing 764 return nil 765 } else if _, exists := p.listeners[listener]; !exists { 766 // Listener is not registered, just do nothing 767 return nil 768 } 769 770 delete(p.listeners, listener) 771 772 if p.listenersStarted { 773 close(listener.addCh) 774 } 775 776 return nil 777 } 778 779 func (p *sharedProcessor) distribute(obj interface{}, sync bool) { 780 p.listenersLock.RLock() 781 defer p.listenersLock.RUnlock() 782 783 for listener, isSyncing := range p.listeners { 784 switch { 785 case !sync: 786 // 
non-sync messages are delivered to every listener 787 listener.add(obj) 788 case isSyncing: 789 // sync messages are delivered to every syncing listener 790 listener.add(obj) 791 default: 792 // skipping a sync obj for a non-syncing listener 793 } 794 } 795 } 796 797 func (p *sharedProcessor) run(stopCh <-chan struct{}) { 798 func() { 799 p.listenersLock.RLock() 800 defer p.listenersLock.RUnlock() 801 for listener := range p.listeners { 802 p.wg.Start(listener.run) 803 p.wg.Start(listener.pop) 804 } 805 p.listenersStarted = true 806 }() 807 <-stopCh 808 809 p.listenersLock.Lock() 810 defer p.listenersLock.Unlock() 811 for listener := range p.listeners { 812 close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop 813 } 814 815 // Wipe out list of listeners since they are now closed 816 // (processorListener cannot be re-used) 817 p.listeners = nil 818 819 // Reset to false since no listeners are running 820 p.listenersStarted = false 821 822 p.wg.Wait() // Wait for all .pop() and .run() to stop 823 } 824 825 // shouldResync queries every listener to determine if any of them need a resync, based on each 826 // listener's resyncPeriod. 827 func (p *sharedProcessor) shouldResync() bool { 828 p.listenersLock.Lock() 829 defer p.listenersLock.Unlock() 830 831 resyncNeeded := false 832 now := p.clock.Now() 833 for listener := range p.listeners { 834 // need to loop through all the listeners to see if they need to resync so we can prepare any 835 // listeners that are going to be resyncing. 
836 shouldResync := listener.shouldResync(now) 837 p.listeners[listener] = shouldResync 838 839 if shouldResync { 840 resyncNeeded = true 841 listener.determineNextResync(now) 842 } 843 } 844 return resyncNeeded 845 } 846 847 func (p *sharedProcessor) resyncCheckPeriodChanged(resyncCheckPeriod time.Duration) { 848 p.listenersLock.RLock() 849 defer p.listenersLock.RUnlock() 850 851 for listener := range p.listeners { 852 resyncPeriod := determineResyncPeriod( 853 listener.requestedResyncPeriod, resyncCheckPeriod) 854 listener.setResyncPeriod(resyncPeriod) 855 } 856 } 857 858 // processorListener relays notifications from a sharedProcessor to 859 // one ResourceEventHandler --- using two goroutines, two unbuffered 860 // channels, and an unbounded ring buffer. The `add(notification)` 861 // function sends the given notification to `addCh`. One goroutine 862 // runs `pop()`, which pumps notifications from `addCh` to `nextCh` 863 // using storage in the ring buffer while `nextCh` is not keeping up. 864 // Another goroutine runs `run()`, which receives notifications from 865 // `nextCh` and synchronously invokes the appropriate handler method. 866 // 867 // processorListener also keeps track of the adjusted requested resync 868 // period of the listener. 869 type processorListener struct { 870 nextCh chan interface{} 871 addCh chan interface{} 872 873 handler ResourceEventHandler 874 875 syncTracker *synctrack.SingleFileTracker 876 877 // pendingNotifications is an unbounded ring buffer that holds all notifications not yet distributed. 878 // There is one per listener, but a failing/stalled listener will have infinite pendingNotifications 879 // added until we OOM. 880 // TODO: This is no worse than before, since reflectors were backed by unbounded DeltaFIFOs, but 881 // we should try to do something better. 
882 pendingNotifications buffer.RingGrowing 883 884 // requestedResyncPeriod is how frequently the listener wants a 885 // full resync from the shared informer, but modified by two 886 // adjustments. One is imposing a lower bound, 887 // `minimumResyncPeriod`. The other is another lower bound, the 888 // sharedIndexInformer's `resyncCheckPeriod`, that is imposed (a) only 889 // in AddEventHandlerWithResyncPeriod invocations made after the 890 // sharedIndexInformer starts and (b) only if the informer does 891 // resyncs at all. 892 requestedResyncPeriod time.Duration 893 // resyncPeriod is the threshold that will be used in the logic 894 // for this listener. This value differs from 895 // requestedResyncPeriod only when the sharedIndexInformer does 896 // not do resyncs, in which case the value here is zero. The 897 // actual time between resyncs depends on when the 898 // sharedProcessor's `shouldResync` function is invoked and when 899 // the sharedIndexInformer processes `Sync` type Delta objects. 900 resyncPeriod time.Duration 901 // nextResync is the earliest time the listener should get a full resync 902 nextResync time.Time 903 // resyncLock guards access to resyncPeriod and nextResync 904 resyncLock sync.Mutex 905 } 906 907 // HasSynced returns true if the source informer has synced, and all 908 // corresponding events have been delivered. 
909 func (p *processorListener) HasSynced() bool { 910 return p.syncTracker.HasSynced() 911 } 912 913 func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int, hasSynced func() bool) *processorListener { 914 ret := &processorListener{ 915 nextCh: make(chan interface{}), 916 addCh: make(chan interface{}), 917 handler: handler, 918 syncTracker: &synctrack.SingleFileTracker{UpstreamHasSynced: hasSynced}, 919 pendingNotifications: *buffer.NewRingGrowing(bufferSize), 920 requestedResyncPeriod: requestedResyncPeriod, 921 resyncPeriod: resyncPeriod, 922 } 923 924 ret.determineNextResync(now) 925 926 return ret 927 } 928 929 func (p *processorListener) add(notification interface{}) { 930 if a, ok := notification.(addNotification); ok && a.isInInitialList { 931 p.syncTracker.Start() 932 } 933 p.addCh <- notification 934 } 935 936 func (p *processorListener) pop() { 937 defer utilruntime.HandleCrash() 938 defer close(p.nextCh) // Tell .run() to stop 939 940 var nextCh chan<- interface{} 941 var notification interface{} 942 for { 943 select { 944 case nextCh <- notification: 945 // Notification dispatched 946 var ok bool 947 notification, ok = p.pendingNotifications.ReadOne() 948 if !ok { // Nothing to pop 949 nextCh = nil // Disable this select case 950 } 951 case notificationToAdd, ok := <-p.addCh: 952 if !ok { 953 return 954 } 955 if notification == nil { // No notification to pop (and pendingNotifications is empty) 956 // Optimize the case - skip adding to pendingNotifications 957 notification = notificationToAdd 958 nextCh = p.nextCh 959 } else { // There is already a notification waiting to be dispatched 960 p.pendingNotifications.WriteOne(notificationToAdd) 961 } 962 } 963 } 964 } 965 966 func (p *processorListener) run() { 967 // this call blocks until the channel is closed. 
When a panic happens during the notification 968 // we will catch it, **the offending item will be skipped!**, and after a short delay (one second) 969 // the next notification will be attempted. This is usually better than the alternative of never 970 // delivering again. 971 stopCh := make(chan struct{}) 972 wait.Until(func() { 973 for next := range p.nextCh { 974 switch notification := next.(type) { 975 case updateNotification: 976 p.handler.OnUpdate(notification.oldObj, notification.newObj) 977 case addNotification: 978 p.handler.OnAdd(notification.newObj, notification.isInInitialList) 979 if notification.isInInitialList { 980 p.syncTracker.Finished() 981 } 982 case deleteNotification: 983 p.handler.OnDelete(notification.oldObj) 984 default: 985 utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next)) 986 } 987 } 988 // the only way to get here is if the p.nextCh is empty and closed 989 close(stopCh) 990 }, 1*time.Second, stopCh) 991 } 992 993 // shouldResync deterimines if the listener needs a resync. If the listener's resyncPeriod is 0, 994 // this always returns false. 995 func (p *processorListener) shouldResync(now time.Time) bool { 996 p.resyncLock.Lock() 997 defer p.resyncLock.Unlock() 998 999 if p.resyncPeriod == 0 { 1000 return false 1001 } 1002 1003 return now.After(p.nextResync) || now.Equal(p.nextResync) 1004 } 1005 1006 func (p *processorListener) determineNextResync(now time.Time) { 1007 p.resyncLock.Lock() 1008 defer p.resyncLock.Unlock() 1009 1010 p.nextResync = now.Add(p.resyncPeriod) 1011 } 1012 1013 func (p *processorListener) setResyncPeriod(resyncPeriod time.Duration) { 1014 p.resyncLock.Lock() 1015 defer p.resyncLock.Unlock() 1016 1017 p.resyncPeriod = resyncPeriod 1018 } 1019