1
16
17 package garbagecollector
18
19 import (
20 "context"
21 "fmt"
22 "reflect"
23 "sync"
24 "time"
25
26 "k8s.io/klog/v2"
27
28 v1 "k8s.io/api/core/v1"
29 eventv1 "k8s.io/api/events/v1"
30 "k8s.io/apimachinery/pkg/api/meta"
31 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
32 "k8s.io/apimachinery/pkg/runtime/schema"
33 "k8s.io/apimachinery/pkg/types"
34 utilerrors "k8s.io/apimachinery/pkg/util/errors"
35 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
36 "k8s.io/apimachinery/pkg/util/sets"
37 "k8s.io/apimachinery/pkg/util/wait"
38 "k8s.io/client-go/metadata"
39 "k8s.io/client-go/tools/cache"
40 "k8s.io/client-go/tools/record"
41 "k8s.io/client-go/util/workqueue"
42 "k8s.io/controller-manager/pkg/informerfactory"
43 "k8s.io/kubernetes/pkg/controller/apis/config/scheme"
44 "k8s.io/kubernetes/pkg/controller/garbagecollector/metaonly"
45 )
46
47 type eventType int
48
49 func (e eventType) String() string {
50 switch e {
51 case addEvent:
52 return "add"
53 case updateEvent:
54 return "update"
55 case deleteEvent:
56 return "delete"
57 default:
58 return fmt.Sprintf("unknown(%d)", int(e))
59 }
60 }
61
62 const (
63 addEvent eventType = iota
64 updateEvent
65 deleteEvent
66 )
67
68 type event struct {
69
70 virtual bool
71 eventType eventType
72 obj interface{}
73
74 oldObj interface{}
75 gvk schema.GroupVersionKind
76 }
77
78
79
80
81 type GraphBuilder struct {
82 restMapper meta.RESTMapper
83
84
85
86 monitors monitors
87 monitorLock sync.RWMutex
88
89
90 informersStarted <-chan struct{}
91
92
93
94 stopCh <-chan struct{}
95
96
97
98 running bool
99
100 eventRecorder record.EventRecorder
101 eventBroadcaster record.EventBroadcaster
102
103 metadataClient metadata.Interface
104
105
106 graphChanges workqueue.RateLimitingInterface
107
108
109 uidToNode *concurrentUIDToNode
110
111 attemptToDelete workqueue.RateLimitingInterface
112 attemptToOrphan workqueue.RateLimitingInterface
113
114
115 absentOwnerCache *ReferenceCache
116 sharedInformers informerfactory.InformerFactory
117 ignoredResources map[schema.GroupResource]struct{}
118 }
119
120
121 type monitor struct {
122 controller cache.Controller
123 store cache.Store
124
125
126
127 stopCh chan struct{}
128 }
129
130
131
132 func (m *monitor) Run() {
133 m.controller.Run(m.stopCh)
134 }
135
136 type monitors map[schema.GroupVersionResource]*monitor
137
138 func NewDependencyGraphBuilder(
139 ctx context.Context,
140 metadataClient metadata.Interface,
141 mapper meta.ResettableRESTMapper,
142 ignoredResources map[schema.GroupResource]struct{},
143 sharedInformers informerfactory.InformerFactory,
144 informersStarted <-chan struct{},
145 ) *GraphBuilder {
146 eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx))
147
148 attemptToDelete := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "garbage_collector_attempt_to_delete")
149 attemptToOrphan := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "garbage_collector_attempt_to_orphan")
150 absentOwnerCache := NewReferenceCache(500)
151 graphBuilder := &GraphBuilder{
152 eventRecorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "garbage-collector-controller"}),
153 eventBroadcaster: eventBroadcaster,
154 metadataClient: metadataClient,
155 informersStarted: informersStarted,
156 restMapper: mapper,
157 graphChanges: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "garbage_collector_graph_changes"),
158 uidToNode: &concurrentUIDToNode{
159 uidToNode: make(map[types.UID]*node),
160 },
161 attemptToDelete: attemptToDelete,
162 attemptToOrphan: attemptToOrphan,
163 absentOwnerCache: absentOwnerCache,
164 sharedInformers: sharedInformers,
165 ignoredResources: ignoredResources,
166 }
167
168 return graphBuilder
169 }
170
171 func (gb *GraphBuilder) controllerFor(logger klog.Logger, resource schema.GroupVersionResource, kind schema.GroupVersionKind) (cache.Controller, cache.Store, error) {
172 handlers := cache.ResourceEventHandlerFuncs{
173
174 AddFunc: func(obj interface{}) {
175 event := &event{
176 eventType: addEvent,
177 obj: obj,
178 gvk: kind,
179 }
180 gb.graphChanges.Add(event)
181 },
182 UpdateFunc: func(oldObj, newObj interface{}) {
183
184
185 event := &event{
186 eventType: updateEvent,
187 obj: newObj,
188 oldObj: oldObj,
189 gvk: kind,
190 }
191 gb.graphChanges.Add(event)
192 },
193 DeleteFunc: func(obj interface{}) {
194
195 if deletedFinalStateUnknown, ok := obj.(cache.DeletedFinalStateUnknown); ok {
196 obj = deletedFinalStateUnknown.Obj
197 }
198 event := &event{
199 eventType: deleteEvent,
200 obj: obj,
201 gvk: kind,
202 }
203 gb.graphChanges.Add(event)
204 },
205 }
206
207 shared, err := gb.sharedInformers.ForResource(resource)
208 if err != nil {
209 logger.V(4).Error(err, "unable to use a shared informer", "resource", resource, "kind", kind)
210 return nil, nil, err
211 }
212 logger.V(4).Info("using a shared informer", "resource", resource, "kind", kind)
213
214 shared.Informer().AddEventHandlerWithResyncPeriod(handlers, ResourceResyncTime)
215 return shared.Informer().GetController(), shared.Informer().GetStore(), nil
216 }
217
218
219
220
221
222
223
224 func (gb *GraphBuilder) syncMonitors(logger klog.Logger, resources map[schema.GroupVersionResource]struct{}) error {
225 gb.monitorLock.Lock()
226 defer gb.monitorLock.Unlock()
227
228 toRemove := gb.monitors
229 if toRemove == nil {
230 toRemove = monitors{}
231 }
232 current := monitors{}
233 errs := []error{}
234 kept := 0
235 added := 0
236 for resource := range resources {
237 if _, ok := gb.ignoredResources[resource.GroupResource()]; ok {
238 continue
239 }
240 if m, ok := toRemove[resource]; ok {
241 current[resource] = m
242 delete(toRemove, resource)
243 kept++
244 continue
245 }
246 kind, err := gb.restMapper.KindFor(resource)
247 if err != nil {
248 errs = append(errs, fmt.Errorf("couldn't look up resource %q: %v", resource, err))
249 continue
250 }
251 c, s, err := gb.controllerFor(logger, resource, kind)
252 if err != nil {
253 errs = append(errs, fmt.Errorf("couldn't start monitor for resource %q: %v", resource, err))
254 continue
255 }
256 current[resource] = &monitor{store: s, controller: c}
257 added++
258 }
259 gb.monitors = current
260
261 for _, monitor := range toRemove {
262 if monitor.stopCh != nil {
263 close(monitor.stopCh)
264 }
265 }
266
267 logger.V(4).Info("synced monitors", "added", added, "kept", kept, "removed", len(toRemove))
268
269 return utilerrors.NewAggregate(errs)
270 }
271
272
273
274
275
276
277 func (gb *GraphBuilder) startMonitors(logger klog.Logger) {
278 gb.monitorLock.Lock()
279 defer gb.monitorLock.Unlock()
280
281 if !gb.running {
282 return
283 }
284
285
286
287 <-gb.informersStarted
288
289 monitors := gb.monitors
290 started := 0
291 for _, monitor := range monitors {
292 if monitor.stopCh == nil {
293 monitor.stopCh = make(chan struct{})
294 gb.sharedInformers.Start(gb.stopCh)
295 go monitor.Run()
296 started++
297 }
298 }
299 logger.V(4).Info("started new monitors", "new", started, "current", len(monitors))
300 }
301
302
303 func (gb *GraphBuilder) IsResourceSynced(resource schema.GroupVersionResource) bool {
304 gb.monitorLock.Lock()
305 defer gb.monitorLock.Unlock()
306 monitor, ok := gb.monitors[resource]
307 return ok && monitor.controller.HasSynced()
308 }
309
310
311
312
313
314 func (gb *GraphBuilder) IsSynced(logger klog.Logger) bool {
315 gb.monitorLock.Lock()
316 defer gb.monitorLock.Unlock()
317
318 if len(gb.monitors) == 0 {
319 logger.V(4).Info("garbage controller monitor not synced: no monitors")
320 return false
321 }
322
323 for resource, monitor := range gb.monitors {
324 if !monitor.controller.HasSynced() {
325 logger.V(4).Info("garbage controller monitor not yet synced", "resource", resource)
326 return false
327 }
328 }
329 return true
330 }
331
332
333
334 func (gb *GraphBuilder) Run(ctx context.Context) {
335 logger := klog.FromContext(ctx)
336 logger.Info("Running", "component", "GraphBuilder")
337 defer logger.Info("Stopping", "component", "GraphBuilder")
338
339
340 gb.monitorLock.Lock()
341 gb.stopCh = ctx.Done()
342 gb.running = true
343 gb.monitorLock.Unlock()
344
345
346
347 gb.startMonitors(logger)
348 wait.Until(func() { gb.runProcessGraphChanges(logger) }, 1*time.Second, ctx.Done())
349
350
351 gb.monitorLock.Lock()
352 defer gb.monitorLock.Unlock()
353 monitors := gb.monitors
354 stopped := 0
355 for _, monitor := range monitors {
356 if monitor.stopCh != nil {
357 stopped++
358 close(monitor.stopCh)
359 }
360 }
361
362
363 gb.monitors = nil
364 logger.Info("stopped monitors", "stopped", stopped, "total", len(monitors))
365 }
366
367 var ignoredResources = map[schema.GroupResource]struct{}{
368 {Group: "", Resource: "events"}: {},
369 {Group: eventv1.GroupName, Resource: "events"}: {},
370 }
371
372
373
374
375 func DefaultIgnoredResources() map[schema.GroupResource]struct{} {
376 return ignoredResources
377 }
378
379
380
381 func (gb *GraphBuilder) enqueueVirtualDeleteEvent(ref objectReference) {
382 gv, _ := schema.ParseGroupVersion(ref.APIVersion)
383 gb.graphChanges.Add(&event{
384 virtual: true,
385 eventType: deleteEvent,
386 gvk: gv.WithKind(ref.Kind),
387 obj: &metaonly.MetadataOnlyObject{
388 TypeMeta: metav1.TypeMeta{APIVersion: ref.APIVersion, Kind: ref.Kind},
389 ObjectMeta: metav1.ObjectMeta{Namespace: ref.Namespace, UID: ref.UID, Name: ref.Name},
390 },
391 })
392 }
393
394
395
396
397
398 func (gb *GraphBuilder) addDependentToOwners(logger klog.Logger, n *node, owners []metav1.OwnerReference) {
399
400
401 hasPotentiallyInvalidOwnerReference := false
402
403 for _, owner := range owners {
404 ownerNode, ok := gb.uidToNode.Read(owner.UID)
405 if !ok {
406
407
408 ownerNode = &node{
409 identity: objectReference{
410 OwnerReference: ownerReferenceCoordinates(owner),
411 Namespace: n.identity.Namespace,
412 },
413 dependents: make(map[*node]struct{}),
414 virtual: true,
415 }
416 logger.V(5).Info("add virtual item", "identity", ownerNode.identity)
417 gb.uidToNode.Write(ownerNode)
418 }
419 ownerNode.addDependent(n)
420 if !ok {
421
422
423
424
425 gb.attemptToDelete.Add(ownerNode)
426 } else if !hasPotentiallyInvalidOwnerReference {
427 ownerIsNamespaced := len(ownerNode.identity.Namespace) > 0
428 if ownerIsNamespaced && ownerNode.identity.Namespace != n.identity.Namespace {
429 if ownerNode.isObserved() {
430
431
432
433 logger.V(2).Info("item references an owner but does not match namespaces", "item", n.identity, "owner", ownerNode.identity)
434 gb.reportInvalidNamespaceOwnerRef(n, owner.UID)
435 }
436 hasPotentiallyInvalidOwnerReference = true
437 } else if !ownerReferenceMatchesCoordinates(owner, ownerNode.identity.OwnerReference) {
438 if ownerNode.isObserved() {
439
440
441 logger.V(2).Info("item references an owner with coordinates that do not match the observed identity", "item", n.identity, "owner", ownerNode.identity)
442 }
443 hasPotentiallyInvalidOwnerReference = true
444 } else if !ownerIsNamespaced && ownerNode.identity.Namespace != n.identity.Namespace && !ownerNode.isObserved() {
445
446
447
448 hasPotentiallyInvalidOwnerReference = true
449 }
450 }
451 }
452
453 if hasPotentiallyInvalidOwnerReference {
454
455
456
457 gb.attemptToDelete.Add(n)
458 }
459 }
460
461 func (gb *GraphBuilder) reportInvalidNamespaceOwnerRef(n *node, invalidOwnerUID types.UID) {
462 var invalidOwnerRef metav1.OwnerReference
463 var found = false
464 for _, ownerRef := range n.owners {
465 if ownerRef.UID == invalidOwnerUID {
466 invalidOwnerRef = ownerRef
467 found = true
468 break
469 }
470 }
471 if !found {
472 return
473 }
474 ref := &v1.ObjectReference{
475 Kind: n.identity.Kind,
476 APIVersion: n.identity.APIVersion,
477 Namespace: n.identity.Namespace,
478 Name: n.identity.Name,
479 UID: n.identity.UID,
480 }
481 invalidIdentity := objectReference{
482 OwnerReference: metav1.OwnerReference{
483 Kind: invalidOwnerRef.Kind,
484 APIVersion: invalidOwnerRef.APIVersion,
485 Name: invalidOwnerRef.Name,
486 UID: invalidOwnerRef.UID,
487 },
488 Namespace: n.identity.Namespace,
489 }
490 gb.eventRecorder.Eventf(ref, v1.EventTypeWarning, "OwnerRefInvalidNamespace", "ownerRef %s does not exist in namespace %q", invalidIdentity, n.identity.Namespace)
491 }
492
493
494
495 func (gb *GraphBuilder) insertNode(logger klog.Logger, n *node) {
496 gb.uidToNode.Write(n)
497 gb.addDependentToOwners(logger, n, n.owners)
498 }
499
500
501 func (gb *GraphBuilder) removeDependentFromOwners(n *node, owners []metav1.OwnerReference) {
502 for _, owner := range owners {
503 ownerNode, ok := gb.uidToNode.Read(owner.UID)
504 if !ok {
505 continue
506 }
507 ownerNode.deleteDependent(n)
508 }
509 }
510
511
512
513 func (gb *GraphBuilder) removeNode(n *node) {
514 gb.uidToNode.Delete(n.identity.UID)
515 gb.removeDependentFromOwners(n, n.owners)
516 }
517
518 type ownerRefPair struct {
519 oldRef metav1.OwnerReference
520 newRef metav1.OwnerReference
521 }
522
523
524
525 func referencesDiffs(old []metav1.OwnerReference, new []metav1.OwnerReference) (added []metav1.OwnerReference, removed []metav1.OwnerReference, changed []ownerRefPair) {
526 oldUIDToRef := make(map[string]metav1.OwnerReference)
527 for _, value := range old {
528 oldUIDToRef[string(value.UID)] = value
529 }
530 oldUIDSet := sets.StringKeySet(oldUIDToRef)
531 for _, value := range new {
532 newUID := string(value.UID)
533 if oldUIDSet.Has(newUID) {
534 if !reflect.DeepEqual(oldUIDToRef[newUID], value) {
535 changed = append(changed, ownerRefPair{oldRef: oldUIDToRef[newUID], newRef: value})
536 }
537 oldUIDSet.Delete(newUID)
538 } else {
539 added = append(added, value)
540 }
541 }
542 for oldUID := range oldUIDSet {
543 removed = append(removed, oldUIDToRef[oldUID])
544 }
545
546 return added, removed, changed
547 }
548
549 func deletionStartsWithFinalizer(oldObj interface{}, newAccessor metav1.Object, matchingFinalizer string) bool {
550
551 if !beingDeleted(newAccessor) || !hasFinalizer(newAccessor, matchingFinalizer) {
552 return false
553 }
554
555
556 if oldObj == nil {
557 return true
558 }
559 oldAccessor, err := meta.Accessor(oldObj)
560 if err != nil {
561 utilruntime.HandleError(fmt.Errorf("cannot access oldObj: %v", err))
562 return false
563 }
564 return !beingDeleted(oldAccessor) || !hasFinalizer(oldAccessor, matchingFinalizer)
565 }
566
567 func beingDeleted(accessor metav1.Object) bool {
568 return accessor.GetDeletionTimestamp() != nil
569 }
570
571 func hasDeleteDependentsFinalizer(accessor metav1.Object) bool {
572 return hasFinalizer(accessor, metav1.FinalizerDeleteDependents)
573 }
574
575 func hasOrphanFinalizer(accessor metav1.Object) bool {
576 return hasFinalizer(accessor, metav1.FinalizerOrphanDependents)
577 }
578
579 func hasFinalizer(accessor metav1.Object, matchingFinalizer string) bool {
580 finalizers := accessor.GetFinalizers()
581 for _, finalizer := range finalizers {
582 if finalizer == matchingFinalizer {
583 return true
584 }
585 }
586 return false
587 }
588
589
590
591 func startsWaitingForDependentsDeleted(oldObj interface{}, newAccessor metav1.Object) bool {
592 return deletionStartsWithFinalizer(oldObj, newAccessor, metav1.FinalizerDeleteDependents)
593 }
594
595
596
597 func startsWaitingForDependentsOrphaned(oldObj interface{}, newAccessor metav1.Object) bool {
598 return deletionStartsWithFinalizer(oldObj, newAccessor, metav1.FinalizerOrphanDependents)
599 }
600
601
602
603 func (gb *GraphBuilder) addUnblockedOwnersToDeleteQueue(logger klog.Logger, removed []metav1.OwnerReference, changed []ownerRefPair) {
604 for _, ref := range removed {
605 if ref.BlockOwnerDeletion != nil && *ref.BlockOwnerDeletion {
606 node, found := gb.uidToNode.Read(ref.UID)
607 if !found {
608 logger.V(5).Info("cannot find uid in uidToNode", "uid", ref.UID)
609 continue
610 }
611 gb.attemptToDelete.Add(node)
612 }
613 }
614 for _, c := range changed {
615 wasBlocked := c.oldRef.BlockOwnerDeletion != nil && *c.oldRef.BlockOwnerDeletion
616 isUnblocked := c.newRef.BlockOwnerDeletion == nil || (c.newRef.BlockOwnerDeletion != nil && !*c.newRef.BlockOwnerDeletion)
617 if wasBlocked && isUnblocked {
618 node, found := gb.uidToNode.Read(c.newRef.UID)
619 if !found {
620 logger.V(5).Info("cannot find uid in uidToNode", "uid", c.newRef.UID)
621 continue
622 }
623 gb.attemptToDelete.Add(node)
624 }
625 }
626 }
627
628 func (gb *GraphBuilder) processTransitions(logger klog.Logger, oldObj interface{}, newAccessor metav1.Object, n *node) {
629 if startsWaitingForDependentsOrphaned(oldObj, newAccessor) {
630 logger.V(5).Info("add item to attemptToOrphan", "item", n.identity)
631 gb.attemptToOrphan.Add(n)
632 return
633 }
634 if startsWaitingForDependentsDeleted(oldObj, newAccessor) {
635 logger.V(2).Info("add item to attemptToDelete, because it's waiting for its dependents to be deleted", "item", n.identity)
636
637 n.markDeletingDependents()
638 for dep := range n.dependents {
639 gb.attemptToDelete.Add(dep)
640 }
641 gb.attemptToDelete.Add(n)
642 }
643 }
644
645 func (gb *GraphBuilder) runProcessGraphChanges(logger klog.Logger) {
646 for gb.processGraphChanges(logger) {
647 }
648 }
649
650 func identityFromEvent(event *event, accessor metav1.Object) objectReference {
651 return objectReference{
652 OwnerReference: metav1.OwnerReference{
653 APIVersion: event.gvk.GroupVersion().String(),
654 Kind: event.gvk.Kind,
655 UID: accessor.GetUID(),
656 Name: accessor.GetName(),
657 },
658 Namespace: accessor.GetNamespace(),
659 }
660 }
661
662
663 func (gb *GraphBuilder) processGraphChanges(logger klog.Logger) bool {
664 item, quit := gb.graphChanges.Get()
665 if quit {
666 return false
667 }
668 defer gb.graphChanges.Done(item)
669 event, ok := item.(*event)
670 if !ok {
671 utilruntime.HandleError(fmt.Errorf("expect a *event, got %v", item))
672 return true
673 }
674 obj := event.obj
675 accessor, err := meta.Accessor(obj)
676 if err != nil {
677 utilruntime.HandleError(fmt.Errorf("cannot access obj: %v", err))
678 return true
679 }
680
681 logger.V(5).Info("GraphBuilder process object",
682 "apiVersion", event.gvk.GroupVersion().String(),
683 "kind", event.gvk.Kind,
684 "object", klog.KObj(accessor),
685 "uid", string(accessor.GetUID()),
686 "eventType", event.eventType,
687 "virtual", event.virtual,
688 )
689
690
691 existingNode, found := gb.uidToNode.Read(accessor.GetUID())
692 if found && !event.virtual && !existingNode.isObserved() {
693
694
695
696 observedIdentity := identityFromEvent(event, accessor)
697 if observedIdentity != existingNode.identity {
698
699 _, potentiallyInvalidDependents := partitionDependents(existingNode.getDependents(), observedIdentity)
700
701
702
703
704 for _, dep := range potentiallyInvalidDependents {
705 if len(observedIdentity.Namespace) > 0 && dep.identity.Namespace != observedIdentity.Namespace {
706
707 logger.V(2).Info("item references an owner but does not match namespaces",
708 "item", dep.identity,
709 "owner", observedIdentity,
710 )
711 gb.reportInvalidNamespaceOwnerRef(dep, observedIdentity.UID)
712 }
713 gb.attemptToDelete.Add(dep)
714 }
715
716
717 logger.V(2).Info("replacing virtual item with observed item",
718 "virtual", existingNode.identity,
719 "observed", observedIdentity,
720 )
721 existingNode = existingNode.clone()
722 existingNode.identity = observedIdentity
723 gb.uidToNode.Write(existingNode)
724 }
725 existingNode.markObserved()
726 }
727 switch {
728 case (event.eventType == addEvent || event.eventType == updateEvent) && !found:
729 newNode := &node{
730 identity: identityFromEvent(event, accessor),
731 dependents: make(map[*node]struct{}),
732 owners: accessor.GetOwnerReferences(),
733 deletingDependents: beingDeleted(accessor) && hasDeleteDependentsFinalizer(accessor),
734 beingDeleted: beingDeleted(accessor),
735 }
736 gb.insertNode(logger, newNode)
737
738
739 gb.processTransitions(logger, event.oldObj, accessor, newNode)
740 case (event.eventType == addEvent || event.eventType == updateEvent) && found:
741
742 added, removed, changed := referencesDiffs(existingNode.owners, accessor.GetOwnerReferences())
743 if len(added) != 0 || len(removed) != 0 || len(changed) != 0 {
744
745
746 gb.addUnblockedOwnersToDeleteQueue(logger, removed, changed)
747
748 existingNode.owners = accessor.GetOwnerReferences()
749
750 gb.addDependentToOwners(logger, existingNode, added)
751
752
753 gb.removeDependentFromOwners(existingNode, removed)
754 }
755
756 if beingDeleted(accessor) {
757 existingNode.markBeingDeleted()
758 }
759 gb.processTransitions(logger, event.oldObj, accessor, existingNode)
760 case event.eventType == deleteEvent:
761 if !found {
762 logger.V(5).Info("item doesn't exist in the graph, this shouldn't happen",
763 "item", accessor.GetUID(),
764 )
765 return true
766 }
767
768 removeExistingNode := true
769
770 if event.virtual {
771
772 deletedIdentity := identityFromEvent(event, accessor)
773 if existingNode.virtual {
774
775
776
777 if matchingDependents, nonmatchingDependents := partitionDependents(existingNode.getDependents(), deletedIdentity); len(nonmatchingDependents) > 0 {
778
779
780 removeExistingNode = false
781
782 if len(matchingDependents) > 0 {
783
784 gb.absentOwnerCache.Add(deletedIdentity)
785
786 for _, dep := range matchingDependents {
787 gb.attemptToDelete.Add(dep)
788 }
789 }
790
791
792 if existingNode.identity == deletedIdentity {
793
794 replacementIdentity := getAlternateOwnerIdentity(nonmatchingDependents, deletedIdentity)
795 if replacementIdentity != nil {
796
797 replacementNode := existingNode.clone()
798 replacementNode.identity = *replacementIdentity
799 gb.uidToNode.Write(replacementNode)
800
801 gb.attemptToDelete.AddRateLimited(replacementNode)
802 }
803 }
804 }
805
806 } else if existingNode.identity != deletedIdentity {
807
808 removeExistingNode = false
809
810
811 matchingDependents, _ := partitionDependents(existingNode.getDependents(), deletedIdentity)
812
813 if len(matchingDependents) > 0 {
814
815 gb.absentOwnerCache.Add(deletedIdentity)
816
817 for _, dep := range matchingDependents {
818 gb.attemptToDelete.Add(dep)
819 }
820 }
821 }
822 }
823
824 if removeExistingNode {
825
826 gb.removeNode(existingNode)
827 existingNode.dependentsLock.RLock()
828 defer existingNode.dependentsLock.RUnlock()
829 if len(existingNode.dependents) > 0 {
830 gb.absentOwnerCache.Add(identityFromEvent(event, accessor))
831 }
832 for dep := range existingNode.dependents {
833 gb.attemptToDelete.Add(dep)
834 }
835 for _, owner := range existingNode.owners {
836 ownerNode, found := gb.uidToNode.Read(owner.UID)
837 if !found || !ownerNode.isDeletingDependents() {
838 continue
839 }
840
841
842 gb.attemptToDelete.Add(ownerNode)
843 }
844 }
845 }
846 return true
847 }
848
849
850
851
852 func partitionDependents(dependents []*node, matchOwnerIdentity objectReference) (matching, nonmatching []*node) {
853 ownerIsNamespaced := len(matchOwnerIdentity.Namespace) > 0
854 for i := range dependents {
855 dep := dependents[i]
856 foundMatch := false
857 foundMismatch := false
858
859 if ownerIsNamespaced && matchOwnerIdentity.Namespace != dep.identity.Namespace {
860
861 foundMismatch = true
862 } else {
863 for _, ownerRef := range dep.owners {
864
865 if ownerRef.UID == matchOwnerIdentity.UID {
866
867 if ownerReferenceMatchesCoordinates(ownerRef, matchOwnerIdentity.OwnerReference) {
868 foundMatch = true
869 } else {
870 foundMismatch = true
871 }
872 }
873 }
874 }
875
876 if foundMatch {
877 matching = append(matching, dep)
878 }
879 if foundMismatch {
880 nonmatching = append(nonmatching, dep)
881 }
882 }
883 return matching, nonmatching
884 }
885
886 func referenceLessThan(a, b objectReference) bool {
887
888
889
890 if a.Kind != b.Kind {
891 return a.Kind < b.Kind
892 }
893 if a.APIVersion != b.APIVersion {
894 return a.APIVersion < b.APIVersion
895 }
896
897 if a.Namespace != b.Namespace {
898 return a.Namespace < b.Namespace
899 }
900
901 if a.Name != b.Name {
902 return a.Name < b.Name
903 }
904
905
906 if a.UID != b.UID {
907 return a.UID < b.UID
908 }
909 return false
910 }
911
912
913
914
915
916
917 func getAlternateOwnerIdentity(deps []*node, verifiedAbsentIdentity objectReference) *objectReference {
918 absentIdentityIsClusterScoped := len(verifiedAbsentIdentity.Namespace) == 0
919
920 seenAlternates := map[objectReference]bool{verifiedAbsentIdentity: true}
921
922
923 var first *objectReference
924
925 var firstFollowing *objectReference
926
927 for _, dep := range deps {
928 for _, ownerRef := range dep.owners {
929 if ownerRef.UID != verifiedAbsentIdentity.UID {
930
931 continue
932 }
933
934 if ownerReferenceMatchesCoordinates(ownerRef, verifiedAbsentIdentity.OwnerReference) {
935 if absentIdentityIsClusterScoped || verifiedAbsentIdentity.Namespace == dep.identity.Namespace {
936
937 continue
938 }
939 }
940
941 ref := objectReference{OwnerReference: ownerReferenceCoordinates(ownerRef), Namespace: dep.identity.Namespace}
942 if absentIdentityIsClusterScoped && ref.APIVersion == verifiedAbsentIdentity.APIVersion && ref.Kind == verifiedAbsentIdentity.Kind {
943
944
945 ref.Namespace = ""
946 }
947
948 if seenAlternates[ref] {
949
950 continue
951 }
952 seenAlternates[ref] = true
953
954 if first == nil || referenceLessThan(ref, *first) {
955
956 first = &ref
957 }
958 if referenceLessThan(verifiedAbsentIdentity, ref) && (firstFollowing == nil || referenceLessThan(ref, *firstFollowing)) {
959
960 firstFollowing = &ref
961 }
962 }
963 }
964
965
966 if firstFollowing != nil {
967 return firstFollowing
968 }
969
970 return first
971 }
972
973 func (gb *GraphBuilder) GetGraphResources() (
974 attemptToDelete workqueue.RateLimitingInterface,
975 attemptToOrphan workqueue.RateLimitingInterface,
976 absentOwnerCache *ReferenceCache,
977 ) {
978 return gb.attemptToDelete, gb.attemptToOrphan, gb.absentOwnerCache
979 }
980
981 type Monitor struct {
982 Store cache.Store
983 Controller cache.Controller
984 }
985
986
987
988
989 func (gb *GraphBuilder) GetMonitor(ctx context.Context, resource schema.GroupVersionResource) (*Monitor, error) {
990 gb.monitorLock.RLock()
991 defer gb.monitorLock.RUnlock()
992
993 var monitor *monitor
994 if m, ok := gb.monitors[resource]; ok {
995 monitor = m
996 } else {
997 for monitorGVR, m := range gb.monitors {
998 if monitorGVR.Group == resource.Group && monitorGVR.Resource == resource.Resource {
999 monitor = m
1000 break
1001 }
1002 }
1003 }
1004
1005 if monitor == nil {
1006 return nil, fmt.Errorf("no monitor found for resource %s", resource.String())
1007 }
1008
1009 resourceMonitor := &Monitor{
1010 Store: monitor.store,
1011 Controller: monitor.controller,
1012 }
1013
1014 if !cache.WaitForNamedCacheSync(
1015 gb.Name(),
1016 ctx.Done(),
1017 func() bool {
1018 return monitor.controller.HasSynced()
1019 },
1020 ) {
1021
1022 return resourceMonitor, fmt.Errorf("dependency graph for resource %s is not synced", resource.String())
1023 }
1024
1025 return resourceMonitor, nil
1026 }
1027
1028 func (gb *GraphBuilder) Name() string {
1029 return "dependencygraphbuilder"
1030 }
1031
View as plain text