/*
Copyright 2014 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cache

import (
	"errors"
	"fmt"
	"sync"
	"time"

	"k8s.io/apimachinery/pkg/util/sets"

	"k8s.io/klog/v2"
	utiltrace "k8s.io/utils/trace"
)

// DeltaFIFOOptions is the configuration parameters for DeltaFIFO. All are
// optional.
type DeltaFIFOOptions struct {

	// KeyFunction is used to figure out what key an object should have. (It's
	// exposed in the returned DeltaFIFO's KeyOf() method, with additional
	// handling around deleted objects and queue state).
	// Optional, the default is MetaNamespaceKeyFunc.
	KeyFunction KeyFunc

	// KnownObjects is expected to return a list of keys that the consumer of
	// this queue "knows about". It is used to decide which items are missing
	// when Replace() is called; 'Deleted' deltas are produced for the missing items.
	// KnownObjects may be nil if you can tolerate missing deletions on Replace().
	KnownObjects KeyListerGetter

	// EmitDeltaTypeReplaced indicates that the queue consumer
	// understands the Replaced DeltaType. Before the `Replaced` event type was
	// added, calls to Replace() were handled the same as Sync(). For
	// backwards-compatibility purposes, this is false by default.
	// When true, `Replaced` events will be sent for items passed to a Replace() call.
	// When false, `Sync` events will be sent instead.
	EmitDeltaTypeReplaced bool

	// Transformer, if set, is called for objects before enqueueing them.
	// Please see the comment on TransformFunc for details.
	Transformer TransformFunc
}

// DeltaFIFO is like FIFO, but differs in two ways.  One is that the
// accumulator associated with a given object's key is not that object
// but rather a Deltas, which is a slice of Delta values for that
// object.  Applying an object to a Deltas means to append a Delta
// except when the potentially appended Delta is a Deleted and the
// Deltas already ends with a Deleted.  In that case the Deltas does
// not grow, although the terminal Deleted will be replaced by the new
// Deleted if the older Deleted's object is a
// DeletedFinalStateUnknown.
//
// The other difference is that DeltaFIFO has two additional ways that
// an object can be applied to an accumulator: Replaced and Sync.
// If EmitDeltaTypeReplaced is not set to true, Sync will be used in
// replace events for backwards compatibility.  Sync is used for periodic
// resync events.
//
// DeltaFIFO is a producer-consumer queue, where a Reflector is
// intended to be the producer, and the consumer is whatever calls
// the Pop() method.
//
// DeltaFIFO solves this use case:
//   - You want to process every object change (delta) at most once.
//   - When you process an object, you want to see everything
//     that's happened to it since you last processed it.
//   - You want to process the deletion of some of the objects.
//   - You might want to periodically reprocess objects.
//
// DeltaFIFO's Pop(), Get(), and GetByKey() methods return
// interface{} to satisfy the Store/Queue interfaces, but they
// will always return an object of type Deltas. List() returns
// the newest object from each accumulator in the FIFO.
//
// A DeltaFIFO's knownObjects KeyListerGetter provides the abilities
// to list Store keys and to get objects by Store key.  The objects in
// question are called "known objects" and this set of objects
// modifies the behavior of the Delete, Replace, and Resync methods
// (each in a different way).
//
// A note on threading: If you call Pop() in parallel from multiple
// threads, you could end up with multiple threads processing slightly
// different versions of the same object.
type DeltaFIFO struct {
	// lock/cond protects access to 'items' and 'queue'.
	// cond is broadcast whenever an item is enqueued or the queue is
	// closed, which wakes up callers blocked in Pop().
	lock sync.RWMutex
	cond sync.Cond

	// `items` maps a key to a Deltas.
	// Each such Deltas has at least one Delta.
	items map[string]Deltas

	// `queue` maintains FIFO order of keys for consumption in Pop().
	// There are no duplicates in `queue`.
	// A key is in `queue` if and only if it is in `items`.
	queue []string

	// populated is true if the first batch of items inserted by Replace() has been populated
	// or Delete/Add/Update/AddIfNotPresent was called first.
	populated bool
	// initialPopulationCount is the number of items inserted by the first call of Replace()
	// that have not been popped yet; Pop() decrements it while it is positive.
	initialPopulationCount int

	// keyFunc is used to make the key used for queued item
	// insertion and retrieval, and should be deterministic.
	keyFunc KeyFunc

	// knownObjects list keys that are "known" --- affecting Delete(),
	// Replace(), and Resync()
	knownObjects KeyListerGetter

	// Used to indicate a queue is closed so a control loop can exit when a queue is empty.
	// Currently, not used to gate any of CRUD operations.
	closed bool

	// emitDeltaTypeReplaced is whether to emit the Replaced or Sync
	// DeltaType when Replace() is called (to preserve backwards compat).
	emitDeltaTypeReplaced bool

	// transformer, if non-nil, is called with every object before the
	// object is stored in a Delta.
	transformer TransformFunc
}

// TransformFunc allows for transforming an object before it will be processed.
// TransformFunc (similarly to ResourceEventHandler functions) should be able
// to correctly handle the tombstone of type cache.DeletedFinalStateUnknown.
//
// New in v1.27: In such cases, the contained object will already have gone
// through the transform object separately (when it was added / updated prior
// to the delete), so the TransformFunc can likely safely ignore such objects
// (i.e., just return the input object).
//
// The most common usage pattern is to clean-up some parts of the object to
// reduce component memory usage if a given component doesn't care about them.
//
// New in v1.27: unless the object is a DeletedFinalStateUnknown, TransformFunc
// sees the object before any other actor, and it is now safe to mutate the
// object in place instead of making a copy.
//
// Note that TransformFunc is called while inserting objects into the
// notification queue and is therefore extremely performance sensitive; please
// do not do anything that will take a long time.
type TransformFunc func(interface{}) (interface{}, error)

// DeltaType is the type of a change (addition, deletion, etc)
type DeltaType string

// Change type definition
const (
	Added   DeltaType = "Added"
	Updated DeltaType = "Updated"
	Deleted DeltaType = "Deleted"
	// Replaced is emitted when we encountered watch errors and had to do a
	// relist. We don't know if the replaced object has changed.
	//
	// NOTE: Previous versions of DeltaFIFO would use Sync for Replace events
	// as well. Hence, Replaced is only emitted when the option
	// EmitDeltaTypeReplaced is true.
	Replaced DeltaType = "Replaced"
	// Sync is for synthetic events during a periodic resync.
	Sync DeltaType = "Sync"
)

// Delta is a member of Deltas (a list of Delta objects) which
// in its turn is the type stored by a DeltaFIFO. It tells you what
// change happened, and the object's state after* that change.
//
// [*] Unless the change is a deletion, and then you'll get the final
// state of the object before it was deleted.
type Delta struct {
	Type   DeltaType
	Object interface{}
}

// Deltas is a list of one or more 'Delta's to an individual object.
// The oldest delta is at index 0, the newest delta is the last one.
type Deltas []Delta

// NewDeltaFIFO returns a Queue which can be used to process changes to items.
//
// keyFunc is used to figure out what key an object should have. (It is
// exposed in the returned DeltaFIFO's KeyOf() method, with additional handling
// around deleted objects and queue state).
//
// 'knownObjects' may be supplied to modify the behavior of Delete,
// Replace, and Resync.  It may be nil if you do not need those
// modifications.
//
// TODO: consider merging keyLister with this object, tracking a list of
// "known" keys when Pop() is called. Have to think about how that
// affects error retrying.
//
//	NOTE: It is possible to misuse this and cause a race when using an
//	external known object source.
//	Whether there is a potential race depends on how the consumer
//	modifies knownObjects. In Pop(), process function is called under
//	lock, so it is safe to update data structures in it that need to be
//	in sync with the queue (e.g. knownObjects).
//
//	Example:
//	In case of sharedIndexInformer being a consumer
//	(https://github.com/kubernetes/kubernetes/blob/0cdd940f/staging/src/k8s.io/client-go/tools/cache/shared_informer.go#L192),
//	there is no race as knownObjects (s.indexer) is modified safely
//	under DeltaFIFO's lock.
The only exceptions are GetStore() and 222 // GetIndexer() methods, which expose ways to modify the underlying 223 // storage. Currently these two methods are used for creating Lister 224 // and internal tests. 225 // 226 // Also see the comment on DeltaFIFO. 227 // 228 // Warning: This constructs a DeltaFIFO that does not differentiate between 229 // events caused by a call to Replace (e.g., from a relist, which may 230 // contain object updates), and synthetic events caused by a periodic resync 231 // (which just emit the existing object). See https://issue.k8s.io/86015 for details. 232 // 233 // Use `NewDeltaFIFOWithOptions(DeltaFIFOOptions{..., EmitDeltaTypeReplaced: true})` 234 // instead to receive a `Replaced` event depending on the type. 235 // 236 // Deprecated: Equivalent to NewDeltaFIFOWithOptions(DeltaFIFOOptions{KeyFunction: keyFunc, KnownObjects: knownObjects}) 237 func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO { 238 return NewDeltaFIFOWithOptions(DeltaFIFOOptions{ 239 KeyFunction: keyFunc, 240 KnownObjects: knownObjects, 241 }) 242 } 243 244 // NewDeltaFIFOWithOptions returns a Queue which can be used to process changes to 245 // items. See also the comment on DeltaFIFO. 246 func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO { 247 if opts.KeyFunction == nil { 248 opts.KeyFunction = MetaNamespaceKeyFunc 249 } 250 251 f := &DeltaFIFO{ 252 items: map[string]Deltas{}, 253 queue: []string{}, 254 keyFunc: opts.KeyFunction, 255 knownObjects: opts.KnownObjects, 256 257 emitDeltaTypeReplaced: opts.EmitDeltaTypeReplaced, 258 transformer: opts.Transformer, 259 } 260 f.cond.L = &f.lock 261 return f 262 } 263 264 var ( 265 _ = Queue(&DeltaFIFO{}) // DeltaFIFO is a Queue 266 ) 267 268 var ( 269 // ErrZeroLengthDeltasObject is returned in a KeyError if a Deltas 270 // object with zero length is encountered (should be impossible, 271 // but included for completeness). 
272 ErrZeroLengthDeltasObject = errors.New("0 length Deltas object; can't get key") 273 ) 274 275 // Close the queue. 276 func (f *DeltaFIFO) Close() { 277 f.lock.Lock() 278 defer f.lock.Unlock() 279 f.closed = true 280 f.cond.Broadcast() 281 } 282 283 // KeyOf exposes f's keyFunc, but also detects the key of a Deltas object or 284 // DeletedFinalStateUnknown objects. 285 func (f *DeltaFIFO) KeyOf(obj interface{}) (string, error) { 286 if d, ok := obj.(Deltas); ok { 287 if len(d) == 0 { 288 return "", KeyError{obj, ErrZeroLengthDeltasObject} 289 } 290 obj = d.Newest().Object 291 } 292 if d, ok := obj.(DeletedFinalStateUnknown); ok { 293 return d.Key, nil 294 } 295 return f.keyFunc(obj) 296 } 297 298 // HasSynced returns true if an Add/Update/Delete/AddIfNotPresent are called first, 299 // or the first batch of items inserted by Replace() has been popped. 300 func (f *DeltaFIFO) HasSynced() bool { 301 f.lock.Lock() 302 defer f.lock.Unlock() 303 return f.hasSynced_locked() 304 } 305 306 func (f *DeltaFIFO) hasSynced_locked() bool { 307 return f.populated && f.initialPopulationCount == 0 308 } 309 310 // Add inserts an item, and puts it in the queue. The item is only enqueued 311 // if it doesn't already exist in the set. 312 func (f *DeltaFIFO) Add(obj interface{}) error { 313 f.lock.Lock() 314 defer f.lock.Unlock() 315 f.populated = true 316 return f.queueActionLocked(Added, obj) 317 } 318 319 // Update is just like Add, but makes an Updated Delta. 320 func (f *DeltaFIFO) Update(obj interface{}) error { 321 f.lock.Lock() 322 defer f.lock.Unlock() 323 f.populated = true 324 return f.queueActionLocked(Updated, obj) 325 } 326 327 // Delete is just like Add, but makes a Deleted Delta. If the given 328 // object does not already exist, it will be ignored. (It may have 329 // already been deleted by a Replace (re-list), for example.) 
In this 330 // method `f.knownObjects`, if not nil, provides (via GetByKey) 331 // _additional_ objects that are considered to already exist. 332 func (f *DeltaFIFO) Delete(obj interface{}) error { 333 id, err := f.KeyOf(obj) 334 if err != nil { 335 return KeyError{obj, err} 336 } 337 f.lock.Lock() 338 defer f.lock.Unlock() 339 f.populated = true 340 if f.knownObjects == nil { 341 if _, exists := f.items[id]; !exists { 342 // Presumably, this was deleted when a relist happened. 343 // Don't provide a second report of the same deletion. 344 return nil 345 } 346 } else { 347 // We only want to skip the "deletion" action if the object doesn't 348 // exist in knownObjects and it doesn't have corresponding item in items. 349 // Note that even if there is a "deletion" action in items, we can ignore it, 350 // because it will be deduped automatically in "queueActionLocked" 351 _, exists, err := f.knownObjects.GetByKey(id) 352 _, itemsExist := f.items[id] 353 if err == nil && !exists && !itemsExist { 354 // Presumably, this was deleted when a relist happened. 355 // Don't provide a second report of the same deletion. 356 return nil 357 } 358 } 359 360 // exist in items and/or KnownObjects 361 return f.queueActionLocked(Deleted, obj) 362 } 363 364 // AddIfNotPresent inserts an item, and puts it in the queue. If the item is already 365 // present in the set, it is neither enqueued nor added to the set. 366 // 367 // This is useful in a single producer/consumer scenario so that the consumer can 368 // safely retry items without contending with the producer and potentially enqueueing 369 // stale items. 370 // 371 // Important: obj must be a Deltas (the output of the Pop() function). Yes, this is 372 // different from the Add/Update/Delete functions. 
373 func (f *DeltaFIFO) AddIfNotPresent(obj interface{}) error { 374 deltas, ok := obj.(Deltas) 375 if !ok { 376 return fmt.Errorf("object must be of type deltas, but got: %#v", obj) 377 } 378 id, err := f.KeyOf(deltas) 379 if err != nil { 380 return KeyError{obj, err} 381 } 382 f.lock.Lock() 383 defer f.lock.Unlock() 384 f.addIfNotPresent(id, deltas) 385 return nil 386 } 387 388 // addIfNotPresent inserts deltas under id if it does not exist, and assumes the caller 389 // already holds the fifo lock. 390 func (f *DeltaFIFO) addIfNotPresent(id string, deltas Deltas) { 391 f.populated = true 392 if _, exists := f.items[id]; exists { 393 return 394 } 395 396 f.queue = append(f.queue, id) 397 f.items[id] = deltas 398 f.cond.Broadcast() 399 } 400 401 // re-listing and watching can deliver the same update multiple times in any 402 // order. This will combine the most recent two deltas if they are the same. 403 func dedupDeltas(deltas Deltas) Deltas { 404 n := len(deltas) 405 if n < 2 { 406 return deltas 407 } 408 a := &deltas[n-1] 409 b := &deltas[n-2] 410 if out := isDup(a, b); out != nil { 411 deltas[n-2] = *out 412 return deltas[:n-1] 413 } 414 return deltas 415 } 416 417 // If a & b represent the same event, returns the delta that ought to be kept. 418 // Otherwise, returns nil. 419 // TODO: is there anything other than deletions that need deduping? 420 func isDup(a, b *Delta) *Delta { 421 if out := isDeletionDup(a, b); out != nil { 422 return out 423 } 424 // TODO: Detect other duplicate situations? Are there any? 425 return nil 426 } 427 428 // keep the one with the most information if both are deletions. 429 func isDeletionDup(a, b *Delta) *Delta { 430 if b.Type != Deleted || a.Type != Deleted { 431 return nil 432 } 433 // Do more sophisticated checks, or is this sufficient? 434 if _, ok := b.Object.(DeletedFinalStateUnknown); ok { 435 return a 436 } 437 return b 438 } 439 440 // queueActionLocked appends to the delta list for the object. 
441 // Caller must lock first. 442 func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error { 443 id, err := f.KeyOf(obj) 444 if err != nil { 445 return KeyError{obj, err} 446 } 447 448 // Every object comes through this code path once, so this is a good 449 // place to call the transform func. If obj is a 450 // DeletedFinalStateUnknown tombstone, then the containted inner object 451 // will already have gone through the transformer, but we document that 452 // this can happen. In cases involving Replace(), such an object can 453 // come through multiple times. 454 if f.transformer != nil { 455 var err error 456 obj, err = f.transformer(obj) 457 if err != nil { 458 return err 459 } 460 } 461 462 oldDeltas := f.items[id] 463 newDeltas := append(oldDeltas, Delta{actionType, obj}) 464 newDeltas = dedupDeltas(newDeltas) 465 466 if len(newDeltas) > 0 { 467 if _, exists := f.items[id]; !exists { 468 f.queue = append(f.queue, id) 469 } 470 f.items[id] = newDeltas 471 f.cond.Broadcast() 472 } else { 473 // This never happens, because dedupDeltas never returns an empty list 474 // when given a non-empty list (as it is here). 475 // If somehow it happens anyway, deal with it but complain. 476 if oldDeltas == nil { 477 klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; ignoring", id, oldDeltas, obj) 478 return nil 479 } 480 klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; breaking invariant by storing empty Deltas", id, oldDeltas, obj) 481 f.items[id] = newDeltas 482 return fmt.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; broke DeltaFIFO invariant by storing empty Deltas", id, oldDeltas, obj) 483 } 484 return nil 485 } 486 487 // List returns a list of all the items; it returns the object 488 // from the most recent Delta. 489 // You should treat the items returned inside the deltas as immutable. 
490 func (f *DeltaFIFO) List() []interface{} { 491 f.lock.RLock() 492 defer f.lock.RUnlock() 493 return f.listLocked() 494 } 495 496 func (f *DeltaFIFO) listLocked() []interface{} { 497 list := make([]interface{}, 0, len(f.items)) 498 for _, item := range f.items { 499 list = append(list, item.Newest().Object) 500 } 501 return list 502 } 503 504 // ListKeys returns a list of all the keys of the objects currently 505 // in the FIFO. 506 func (f *DeltaFIFO) ListKeys() []string { 507 f.lock.RLock() 508 defer f.lock.RUnlock() 509 list := make([]string, 0, len(f.queue)) 510 for _, key := range f.queue { 511 list = append(list, key) 512 } 513 return list 514 } 515 516 // Get returns the complete list of deltas for the requested item, 517 // or sets exists=false. 518 // You should treat the items returned inside the deltas as immutable. 519 func (f *DeltaFIFO) Get(obj interface{}) (item interface{}, exists bool, err error) { 520 key, err := f.KeyOf(obj) 521 if err != nil { 522 return nil, false, KeyError{obj, err} 523 } 524 return f.GetByKey(key) 525 } 526 527 // GetByKey returns the complete list of deltas for the requested item, 528 // setting exists=false if that list is empty. 529 // You should treat the items returned inside the deltas as immutable. 530 func (f *DeltaFIFO) GetByKey(key string) (item interface{}, exists bool, err error) { 531 f.lock.RLock() 532 defer f.lock.RUnlock() 533 d, exists := f.items[key] 534 if exists { 535 // Copy item's slice so operations on this slice 536 // won't interfere with the object we return. 537 d = copyDeltas(d) 538 } 539 return d, exists, nil 540 } 541 542 // IsClosed checks if the queue is closed 543 func (f *DeltaFIFO) IsClosed() bool { 544 f.lock.Lock() 545 defer f.lock.Unlock() 546 return f.closed 547 } 548 549 // Pop blocks until the queue has some items, and then returns one. If 550 // multiple items are ready, they are returned in the order in which they were 551 // added/updated. 
// The item is removed from the queue (and the store) before it
// is returned, so if you don't successfully process it, you need to add it back
// with AddIfNotPresent().
// process function is called under lock, so it is safe to update data structures
// in it that need to be in sync with the queue (e.g. knownKeys). The PopProcessFunc
// may return an instance of ErrRequeue with a nested error to indicate the current
// item should be requeued (equivalent to calling AddIfNotPresent under the lock).
// process should avoid expensive I/O operation so that other queue operations, i.e.
// Add() and Get(), won't be blocked for too long.
//
// Pop returns a 'Deltas', which has a complete list of all the things
// that happened to the object (deltas) while it was sitting in the queue.
// If the queue is empty and has been closed, Pop returns ErrFIFOClosed.
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
	f.lock.Lock()
	defer f.lock.Unlock()
	for {
		for len(f.queue) == 0 {
			// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
			// When Close() is called, the f.closed is set and the condition is broadcasted.
			// Which causes this loop to continue and return from the Pop().
			if f.closed {
				return nil, ErrFIFOClosed
			}

			f.cond.Wait()
		}
		// Capture the sync state before the counter below is decremented, so
		// that the item popped last from the initial list is still reported
		// as belonging to it.
		isInInitialList := !f.hasSynced_locked()
		id := f.queue[0]
		f.queue = f.queue[1:]
		depth := len(f.queue)
		if f.initialPopulationCount > 0 {
			f.initialPopulationCount--
		}
		item, ok := f.items[id]
		if !ok {
			// This should never happen
			klog.Errorf("Inconceivable! %q was in f.queue but not f.items; ignoring.", id)
			continue
		}
		delete(f.items, id)
		// Only log traces if the queue depth is greater than 10 and it takes more than
		// 100 milliseconds to process one item from the queue.
		// Queue depth never goes high because processing an item is locking the queue,
		// and new items can't be added until processing finish.
		// https://github.com/kubernetes/kubernetes/issues/103789
		if depth > 10 {
			trace := utiltrace.New("DeltaFIFO Pop Process",
				utiltrace.Field{Key: "ID", Value: id},
				utiltrace.Field{Key: "Depth", Value: depth},
				utiltrace.Field{Key: "Reason", Value: "slow event handlers blocking the queue"})
			// Although this defer sits inside the loop, this iteration always
			// returns below, so it fires once, right after process has run.
			defer trace.LogIfLong(100 * time.Millisecond)
		}
		err := process(item, isInInitialList)
		if e, ok := err.(ErrRequeue); ok {
			// Put the item back (unless something newer was queued meanwhile)
			// and surface the nested error to the caller.
			f.addIfNotPresent(id, item)
			err = e.Err
		}
		// Don't need to copyDeltas here, because we're transferring
		// ownership to the caller.
		return item, err
	}
}

// Replace atomically does two things: (1) it adds the given objects
// using the Sync or Replaced DeltaType and then (2) it does some deletions.
// In particular: for every pre-existing key K that is not the key of
// an object in `list` there is the effect of
// `Delete(DeletedFinalStateUnknown{K, O})` where O is the latest known
// object of K. The pre-existing keys are those in the union set of the keys in
// `f.items` and `f.knownObjects` (if not nil). The last known object for key K is
// the one present in the last delta in `f.items`. If there is no delta for K
// in `f.items`, it is the object in `f.knownObjects`.
//
// The second (string) argument is ignored.
func (f *DeltaFIFO) Replace(list []interface{}, _ string) error {
	f.lock.Lock()
	defer f.lock.Unlock()
	keys := make(sets.String, len(list))

	// keep backwards compat for old clients
	action := Sync
	if f.emitDeltaTypeReplaced {
		action = Replaced
	}

	// Add Sync/Replaced action for each new item.
	for _, item := range list {
		key, err := f.KeyOf(item)
		if err != nil {
			return KeyError{item, err}
		}
		keys.Insert(key)
		if err := f.queueActionLocked(action, item); err != nil {
			return fmt.Errorf("couldn't enqueue object: %v", err)
		}
	}

	// Do deletion detection against objects in the queue
	queuedDeletions := 0
	for k, oldItem := range f.items {
		if keys.Has(k) {
			continue
		}
		// Delete pre-existing items not in the new list.
		// This could happen if watch deletion event was missed while
		// disconnected from apiserver.
		var deletedObj interface{}
		if n := oldItem.Newest(); n != nil {
			deletedObj = n.Object

			// if the previous object is a DeletedFinalStateUnknown, we have to extract the actual Object
			if d, ok := deletedObj.(DeletedFinalStateUnknown); ok {
				deletedObj = d.Obj
			}
		}
		queuedDeletions++
		if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
			return err
		}
	}

	if f.knownObjects != nil {
		// Detect deletions for objects not present in the queue, but present in KnownObjects
		knownKeys := f.knownObjects.ListKeys()
		for _, k := range knownKeys {
			if keys.Has(k) {
				continue
			}
			// Keys that already have queued deltas were handled by the loop above.
			if len(f.items[k]) > 0 {
				continue
			}

			deletedObj, exists, err := f.knownObjects.GetByKey(k)
			if err != nil {
				deletedObj = nil
				klog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k)
			} else if !exists {
				deletedObj = nil
				klog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k)
			}
			queuedDeletions++
			if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
				return err
			}
		}
	}

	// Only the very first population (nothing added, updated, deleted or
	// replaced before) sets the count that HasSynced() waits on.
	if !f.populated {
		f.populated = true
		f.initialPopulationCount = keys.Len() + queuedDeletions
	}

	return nil
}

// Resync adds, with a Sync type of Delta, every object listed by
// `f.knownObjects` whose key is not already queued for processing.
// If `f.knownObjects` is `nil` then Resync does nothing.
func (f *DeltaFIFO) Resync() error {
	f.lock.Lock()
	defer f.lock.Unlock()

	if f.knownObjects == nil {
		return nil
	}

	keys := f.knownObjects.ListKeys()
	for _, k := range keys {
		if err := f.syncKeyLocked(k); err != nil {
			return err
		}
	}
	return nil
}

// syncKeyLocked queues a Sync Delta for the object that f.knownObjects holds
// under key, unless deltas are already queued for that object. Lookup
// failures and missing objects are logged and otherwise ignored. The caller
// must hold the lock, and f.knownObjects must not be nil.
func (f *DeltaFIFO) syncKeyLocked(key string) error {
	obj, exists, err := f.knownObjects.GetByKey(key)
	if err != nil {
		klog.Errorf("Unexpected error %v during lookup of key %v, unable to queue object for sync", err, key)
		return nil
	} else if !exists {
		klog.Infof("Key %v does not exist in known objects store, unable to queue object for sync", key)
		return nil
	}

	// If we are doing Resync() and there is already an event queued for that object,
	// we ignore the Resync for it. This is to avoid the race, in which the resync
	// comes with the previous value of object (since queueing an event for the object
	// doesn't trigger changing the underlying store, i.e. knownObjects).
	id, err := f.KeyOf(obj)
	if err != nil {
		return KeyError{obj, err}
	}
	if len(f.items[id]) > 0 {
		return nil
	}

	if err := f.queueActionLocked(Sync, obj); err != nil {
		return fmt.Errorf("couldn't queue object: %v", err)
	}
	return nil
}

// A KeyListerGetter is anything that knows how to list its keys and look up by key.
type KeyListerGetter interface {
	KeyLister
	KeyGetter
}

// A KeyLister is anything that knows how to list its keys.
type KeyLister interface {
	ListKeys() []string
}

// A KeyGetter is anything that knows how to get the value stored under a given key.
764 type KeyGetter interface { 765 // GetByKey returns the value associated with the key, or sets exists=false. 766 GetByKey(key string) (value interface{}, exists bool, err error) 767 } 768 769 // Oldest is a convenience function that returns the oldest delta, or 770 // nil if there are no deltas. 771 func (d Deltas) Oldest() *Delta { 772 if len(d) > 0 { 773 return &d[0] 774 } 775 return nil 776 } 777 778 // Newest is a convenience function that returns the newest delta, or 779 // nil if there are no deltas. 780 func (d Deltas) Newest() *Delta { 781 if n := len(d); n > 0 { 782 return &d[n-1] 783 } 784 return nil 785 } 786 787 // copyDeltas returns a shallow copy of d; that is, it copies the slice but not 788 // the objects in the slice. This allows Get/List to return an object that we 789 // know won't be clobbered by a subsequent modifications. 790 func copyDeltas(d Deltas) Deltas { 791 d2 := make(Deltas, len(d)) 792 copy(d2, d) 793 return d2 794 } 795 796 // DeletedFinalStateUnknown is placed into a DeltaFIFO in the case where an object 797 // was deleted but the watch deletion event was missed while disconnected from 798 // apiserver. In this case we don't know the final "resting" state of the object, so 799 // there's a chance the included `Obj` is stale. 800 type DeletedFinalStateUnknown struct { 801 Key string 802 Obj interface{} 803 } 804