Source file src/k8s.io/kubernetes/pkg/controller/garbagecollector/graph_builder.go

Documentation: k8s.io/kubernetes/pkg/controller/garbagecollector

     1  /*
     2  Copyright 2016 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package garbagecollector
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"reflect"
    23  	"sync"
    24  	"time"
    25  
    26  	"k8s.io/klog/v2"
    27  
    28  	v1 "k8s.io/api/core/v1"
    29  	eventv1 "k8s.io/api/events/v1"
    30  	"k8s.io/apimachinery/pkg/api/meta"
    31  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    32  	"k8s.io/apimachinery/pkg/runtime/schema"
    33  	"k8s.io/apimachinery/pkg/types"
    34  	utilerrors "k8s.io/apimachinery/pkg/util/errors"
    35  	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    36  	"k8s.io/apimachinery/pkg/util/sets"
    37  	"k8s.io/apimachinery/pkg/util/wait"
    38  	"k8s.io/client-go/metadata"
    39  	"k8s.io/client-go/tools/cache"
    40  	"k8s.io/client-go/tools/record"
    41  	"k8s.io/client-go/util/workqueue"
    42  	"k8s.io/controller-manager/pkg/informerfactory"
    43  	"k8s.io/kubernetes/pkg/controller/apis/config/scheme"
    44  	"k8s.io/kubernetes/pkg/controller/garbagecollector/metaonly"
    45  )
    46  
    47  type eventType int
    48  
    49  func (e eventType) String() string {
    50  	switch e {
    51  	case addEvent:
    52  		return "add"
    53  	case updateEvent:
    54  		return "update"
    55  	case deleteEvent:
    56  		return "delete"
    57  	default:
    58  		return fmt.Sprintf("unknown(%d)", int(e))
    59  	}
    60  }
    61  
    62  const (
    63  	addEvent eventType = iota
    64  	updateEvent
    65  	deleteEvent
    66  )
    67  
    68  type event struct {
    69  	// virtual indicates this event did not come from an informer, but was constructed artificially
    70  	virtual   bool
    71  	eventType eventType
    72  	obj       interface{}
    73  	// the update event comes with an old object; it is used by processTransitions to detect when a deletion with a finalizer just started.
    74  	oldObj interface{}
    75  	gvk    schema.GroupVersionKind
    76  }
    77  
    78  // GraphBuilder processes events supplied by the informers, updates uidToNode,
    79  // a graph that caches the dependencies as we know them, and enqueues
    80  // items to the attemptToDelete and attemptToOrphan queues.
    81  type GraphBuilder struct {
    82  	restMapper meta.RESTMapper
    83  
    84  	// each monitor lists/watches a resource; the results are funneled to the
    85  	// dependencyGraphBuilder
    86  	monitors    monitors
    87  	monitorLock sync.RWMutex
    88  	// informersStarted is closed after all of the controllers have been initialized and are running.
    89  	// After that it is safe to start monitors here; before that it is not.
    90  	informersStarted <-chan struct{}
    91  
    92  	// stopCh drives shutdown. When a receive from it unblocks, monitors will shut down.
    93  	// This channel is also protected by monitorLock.
    94  	stopCh <-chan struct{}
    95  
    96  	// running tracks whether Run() has been called.
    97  	// it is protected by monitorLock.
    98  	running bool
    99  
   100  	eventRecorder    record.EventRecorder
   101  	eventBroadcaster record.EventBroadcaster
   102  
   103  	metadataClient metadata.Interface
   104  	// monitors are the producers of the graphChanges queue; graphBuilder alters
   105  	// the in-memory graph according to the changes.
   106  	graphChanges workqueue.RateLimitingInterface
   107  	// uidToNode doesn't require a lock to protect it, because only the
   108  	// single-threaded GraphBuilder.processGraphChanges() reads/writes it.
   109  	uidToNode *concurrentUIDToNode
   110  	// GraphBuilder is the producer of attemptToDelete and attemptToOrphan, GC is the consumer.
   111  	attemptToDelete workqueue.RateLimitingInterface
   112  	attemptToOrphan workqueue.RateLimitingInterface
   113  	// GraphBuilder and GC share the absentOwnerCache. Objects that are known to
   114  	// be non-existent are added to the cache.
   115  	absentOwnerCache *ReferenceCache
   116  	sharedInformers  informerfactory.InformerFactory
   117  	ignoredResources map[schema.GroupResource]struct{}
   118  }
   119  
   120  // monitor runs a Controller with a local stop channel.
   121  type monitor struct {
   122  	controller cache.Controller
   123  	store      cache.Store
   124  
   125  	// stopCh stops Controller. If stopCh is nil, the monitor is considered to be
   126  	// not yet started.
   127  	stopCh chan struct{}
   128  }
   129  
   130  // Run is intended to be called in a goroutine. Calling it more than once is an
   131  // error.
   132  func (m *monitor) Run() {
   133  	m.controller.Run(m.stopCh)
   134  }
   135  
   136  type monitors map[schema.GroupVersionResource]*monitor
   137  
   138  func NewDependencyGraphBuilder(
   139  	ctx context.Context,
   140  	metadataClient metadata.Interface,
   141  	mapper meta.ResettableRESTMapper,
   142  	ignoredResources map[schema.GroupResource]struct{},
   143  	sharedInformers informerfactory.InformerFactory,
   144  	informersStarted <-chan struct{},
   145  ) *GraphBuilder {
   146  	eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx))
   147  
   148  	attemptToDelete := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "garbage_collector_attempt_to_delete")
   149  	attemptToOrphan := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "garbage_collector_attempt_to_orphan")
   150  	absentOwnerCache := NewReferenceCache(500)
   151  	graphBuilder := &GraphBuilder{
   152  		eventRecorder:    eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "garbage-collector-controller"}),
   153  		eventBroadcaster: eventBroadcaster,
   154  		metadataClient:   metadataClient,
   155  		informersStarted: informersStarted,
   156  		restMapper:       mapper,
   157  		graphChanges:     workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "garbage_collector_graph_changes"),
   158  		uidToNode: &concurrentUIDToNode{
   159  			uidToNode: make(map[types.UID]*node),
   160  		},
   161  		attemptToDelete:  attemptToDelete,
   162  		attemptToOrphan:  attemptToOrphan,
   163  		absentOwnerCache: absentOwnerCache,
   164  		sharedInformers:  sharedInformers,
   165  		ignoredResources: ignoredResources,
   166  	}
   167  
   168  	return graphBuilder
   169  }
   170  
   171  func (gb *GraphBuilder) controllerFor(logger klog.Logger, resource schema.GroupVersionResource, kind schema.GroupVersionKind) (cache.Controller, cache.Store, error) {
   172  	handlers := cache.ResourceEventHandlerFuncs{
   173  		// add the event to the dependencyGraphBuilder's graphChanges.
   174  		AddFunc: func(obj interface{}) {
   175  			event := &event{
   176  				eventType: addEvent,
   177  				obj:       obj,
   178  				gvk:       kind,
   179  			}
   180  			gb.graphChanges.Add(event)
   181  		},
   182  		UpdateFunc: func(oldObj, newObj interface{}) {
   183  			// TODO: check if there are differences in the ownerRefs,
   184  			// finalizers, and DeletionTimestamp; if not, ignore the update.
   185  			event := &event{
   186  				eventType: updateEvent,
   187  				obj:       newObj,
   188  				oldObj:    oldObj,
   189  				gvk:       kind,
   190  			}
   191  			gb.graphChanges.Add(event)
   192  		},
   193  		DeleteFunc: func(obj interface{}) {
   194  			// delta fifo may wrap the object in a cache.DeletedFinalStateUnknown, unwrap it
   195  			if deletedFinalStateUnknown, ok := obj.(cache.DeletedFinalStateUnknown); ok {
   196  				obj = deletedFinalStateUnknown.Obj
   197  			}
   198  			event := &event{
   199  				eventType: deleteEvent,
   200  				obj:       obj,
   201  				gvk:       kind,
   202  			}
   203  			gb.graphChanges.Add(event)
   204  		},
   205  	}
   206  
   207  	shared, err := gb.sharedInformers.ForResource(resource)
   208  	if err != nil {
   209  		logger.V(4).Error(err, "unable to use a shared informer", "resource", resource, "kind", kind)
   210  		return nil, nil, err
   211  	}
   212  	logger.V(4).Info("using a shared informer", "resource", resource, "kind", kind)
   213  	// need to clone because it's from a shared cache
   214  	shared.Informer().AddEventHandlerWithResyncPeriod(handlers, ResourceResyncTime)
   215  	return shared.Informer().GetController(), shared.Informer().GetStore(), nil
   216  }
   217  
   218  // syncMonitors rebuilds the monitor set according to the supplied resources,
   219  // creating or deleting monitors as necessary. It will return any error
   220  // encountered, but will make an attempt to create a monitor for each resource
   221  // instead of immediately exiting on an error. It may be called before or after
   222  // Run. Monitors are NOT started as part of the sync. To ensure all existing
   223  // monitors are started, call startMonitors.
   224  func (gb *GraphBuilder) syncMonitors(logger klog.Logger, resources map[schema.GroupVersionResource]struct{}) error {
   225  	gb.monitorLock.Lock()
   226  	defer gb.monitorLock.Unlock()
   227  
   228  	toRemove := gb.monitors
   229  	if toRemove == nil {
   230  		toRemove = monitors{}
   231  	}
   232  	current := monitors{}
   233  	errs := []error{}
   234  	kept := 0
   235  	added := 0
   236  	for resource := range resources {
   237  		if _, ok := gb.ignoredResources[resource.GroupResource()]; ok {
   238  			continue
   239  		}
   240  		if m, ok := toRemove[resource]; ok {
   241  			current[resource] = m
   242  			delete(toRemove, resource)
   243  			kept++
   244  			continue
   245  		}
   246  		kind, err := gb.restMapper.KindFor(resource)
   247  		if err != nil {
   248  			errs = append(errs, fmt.Errorf("couldn't look up resource %q: %v", resource, err))
   249  			continue
   250  		}
   251  		c, s, err := gb.controllerFor(logger, resource, kind)
   252  		if err != nil {
   253  			errs = append(errs, fmt.Errorf("couldn't start monitor for resource %q: %v", resource, err))
   254  			continue
   255  		}
   256  		current[resource] = &monitor{store: s, controller: c}
   257  		added++
   258  	}
   259  	gb.monitors = current
   260  
   261  	for _, monitor := range toRemove {
   262  		if monitor.stopCh != nil {
   263  			close(monitor.stopCh)
   264  		}
   265  	}
   266  
   267  	logger.V(4).Info("synced monitors", "added", added, "kept", kept, "removed", len(toRemove))
   268  	// NewAggregate returns nil if errs is 0-length
   269  	return utilerrors.NewAggregate(errs)
   270  }
   271  
   272  // startMonitors ensures the current set of monitors are running. Any newly
   273  // started monitors will also cause shared informers to be started.
   274  //
   275  // If called before Run, startMonitors does nothing (as there is no stop channel
   276  // to support monitor/informer execution).
   277  func (gb *GraphBuilder) startMonitors(logger klog.Logger) {
   278  	gb.monitorLock.Lock()
   279  	defer gb.monitorLock.Unlock()
   280  
   281  	if !gb.running {
   282  		return
   283  	}
   284  
   285  	// wait until the informers have been started, which happens once all the controllers are initialized. This ensures
   286  	// that the controllers don't get unexpected events on their work queues.
   287  	<-gb.informersStarted
   288  
   289  	monitors := gb.monitors
   290  	started := 0
   291  	for _, monitor := range monitors {
   292  		if monitor.stopCh == nil {
   293  			monitor.stopCh = make(chan struct{})
   294  			gb.sharedInformers.Start(gb.stopCh)
   295  			go monitor.Run()
   296  			started++
   297  		}
   298  	}
   299  	logger.V(4).Info("started new monitors", "new", started, "current", len(monitors))
   300  }
   301  
   302  // IsResourceSynced returns true if a monitor exists for the given resource and has synced
   303  func (gb *GraphBuilder) IsResourceSynced(resource schema.GroupVersionResource) bool {
   304  	gb.monitorLock.Lock()
   305  	defer gb.monitorLock.Unlock()
   306  	monitor, ok := gb.monitors[resource]
   307  	return ok && monitor.controller.HasSynced()
   308  }
   309  
   310  // IsSynced returns true if any monitors exist AND all those monitors'
   311  // controllers HasSynced functions return true. This means IsSynced could return
   312  // true at one time, and then later return false if all monitors were
   313  // reconstructed.
   314  func (gb *GraphBuilder) IsSynced(logger klog.Logger) bool {
   315  	gb.monitorLock.Lock()
   316  	defer gb.monitorLock.Unlock()
   317  
   318  	if len(gb.monitors) == 0 {
   319  		logger.V(4).Info("garbage controller monitor not synced: no monitors")
   320  		return false
   321  	}
   322  
   323  	for resource, monitor := range gb.monitors {
   324  		if !monitor.controller.HasSynced() {
   325  			logger.V(4).Info("garbage controller monitor not yet synced", "resource", resource)
   326  			return false
   327  		}
   328  	}
   329  	return true
   330  }
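
Illustrative aside (not part of the source file): a caller that needs the graph to be populated before acting on it can poll IsSynced. A minimal sketch, assuming it sits alongside this package with the same imports; waitForGraphSync is a hypothetical helper name.

	func waitForGraphSync(ctx context.Context, gb *GraphBuilder) error {
		logger := klog.FromContext(ctx)
		// Poll once per second until every monitor reports HasSynced, or the context is cancelled.
		return wait.PollUntilContextCancel(ctx, 1*time.Second, true, func(ctx context.Context) (bool, error) {
			return gb.IsSynced(logger), nil
		})
	}
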
   331  
   332  // Run sets the stop channel and starts monitor execution until stopCh is
   333  // closed. Any running monitors will be stopped before Run returns.
   334  func (gb *GraphBuilder) Run(ctx context.Context) {
   335  	logger := klog.FromContext(ctx)
   336  	logger.Info("Running", "component", "GraphBuilder")
   337  	defer logger.Info("Stopping", "component", "GraphBuilder")
   338  
   339  	// Set up the stop channel.
   340  	gb.monitorLock.Lock()
   341  	gb.stopCh = ctx.Done()
   342  	gb.running = true
   343  	gb.monitorLock.Unlock()
   344  
   345  	// Start monitors and begin change processing until the stop channel is
   346  	// closed.
   347  	gb.startMonitors(logger)
   348  	wait.Until(func() { gb.runProcessGraphChanges(logger) }, 1*time.Second, ctx.Done())
   349  
   350  	// Stop any running monitors.
   351  	gb.monitorLock.Lock()
   352  	defer gb.monitorLock.Unlock()
   353  	monitors := gb.monitors
   354  	stopped := 0
   355  	for _, monitor := range monitors {
   356  		if monitor.stopCh != nil {
   357  			stopped++
   358  			close(monitor.stopCh)
   359  		}
   360  	}
   361  
   362  	// reset monitors so that the graph builder can be safely re-run/synced.
   363  	gb.monitors = nil
   364  	logger.Info("stopped monitors", "stopped", stopped, "total", len(monitors))
   365  }
   366  
   367  var ignoredResources = map[schema.GroupResource]struct{}{
   368  	{Group: "", Resource: "events"}:                {},
   369  	{Group: eventv1.GroupName, Resource: "events"}: {},
   370  }
   371  
   372  // DefaultIgnoredResources returns the default set of resources that the garbage collector controller
   373  // should ignore. This is exposed so downstream integrators can have access to the defaults, and add
   374  // to them as necessary when constructing the controller.
   375  func DefaultIgnoredResources() map[schema.GroupResource]struct{} {
   376  	return ignoredResources
   377  }
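
Illustrative aside (not part of the source file): because DefaultIgnoredResources returns the package-level map by reference, a downstream integrator should copy it before adding entries rather than mutate it in place. A minimal sketch; the "example.com"/"widgets" resource and the function name are hypothetical.

	func ignoredResourcesWithExtras() map[schema.GroupResource]struct{} {
		ignored := map[schema.GroupResource]struct{}{}
		for gr := range DefaultIgnoredResources() {
			ignored[gr] = struct{}{}
		}
		// Add a custom resource the garbage collector should also skip.
		ignored[schema.GroupResource{Group: "example.com", Resource: "widgets"}] = struct{}{}
		return ignored
	}
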
   378  
   379  // enqueueVirtualDeleteEvent is used to add a virtual delete event to be processed for virtual nodes
   380  // once it is determined they do not have backing objects in storage
   381  func (gb *GraphBuilder) enqueueVirtualDeleteEvent(ref objectReference) {
   382  	gv, _ := schema.ParseGroupVersion(ref.APIVersion)
   383  	gb.graphChanges.Add(&event{
   384  		virtual:   true,
   385  		eventType: deleteEvent,
   386  		gvk:       gv.WithKind(ref.Kind),
   387  		obj: &metaonly.MetadataOnlyObject{
   388  			TypeMeta:   metav1.TypeMeta{APIVersion: ref.APIVersion, Kind: ref.Kind},
   389  			ObjectMeta: metav1.ObjectMeta{Namespace: ref.Namespace, UID: ref.UID, Name: ref.Name},
   390  		},
   391  	})
   392  }
   393  
   394  // addDependentToOwners adds n to owners' dependents list. If the owner does not
   395  // exist in the gb.uidToNode yet, a "virtual" node will be created to represent
   396  // the owner. The "virtual" node will be enqueued to the attemptToDelete, so that
   397  // attemptToDeleteItem() will verify if the owner exists according to the API server.
   398  func (gb *GraphBuilder) addDependentToOwners(logger klog.Logger, n *node, owners []metav1.OwnerReference) {
   399  	// track if some of the referenced owners already exist in the graph and have been observed,
   400  	// and the dependent's ownerRef does not match their observed coordinates
   401  	hasPotentiallyInvalidOwnerReference := false
   402  
   403  	for _, owner := range owners {
   404  		ownerNode, ok := gb.uidToNode.Read(owner.UID)
   405  		if !ok {
   406  			// Create a "virtual" node in the graph for the owner if it doesn't
   407  			// exist in the graph yet.
   408  			ownerNode = &node{
   409  				identity: objectReference{
   410  					OwnerReference: ownerReferenceCoordinates(owner),
   411  					Namespace:      n.identity.Namespace,
   412  				},
   413  				dependents: make(map[*node]struct{}),
   414  				virtual:    true,
   415  			}
   416  			logger.V(5).Info("add virtual item", "identity", ownerNode.identity)
   417  			gb.uidToNode.Write(ownerNode)
   418  		}
   419  		ownerNode.addDependent(n)
   420  		if !ok {
   421  			// Enqueue the virtual node into attemptToDelete.
   422  			// The garbage processor will enqueue a virtual delete
   423  			// event to delete it from the graph if API server confirms this
   424  			// owner doesn't exist.
   425  			gb.attemptToDelete.Add(ownerNode)
   426  		} else if !hasPotentiallyInvalidOwnerReference {
   427  			ownerIsNamespaced := len(ownerNode.identity.Namespace) > 0
   428  			if ownerIsNamespaced && ownerNode.identity.Namespace != n.identity.Namespace {
   429  				if ownerNode.isObserved() {
   430  					// The owner node has been observed via an informer.
   431  					// The dependent's namespace doesn't match the observed owner's namespace; this is definitely wrong.
   432  					// (Cluster-scoped owners can be referenced as an owner from any namespace or cluster-scoped object.)
   433  					logger.V(2).Info("item references an owner but does not match namespaces", "item", n.identity, "owner", ownerNode.identity)
   434  					gb.reportInvalidNamespaceOwnerRef(n, owner.UID)
   435  				}
   436  				hasPotentiallyInvalidOwnerReference = true
   437  			} else if !ownerReferenceMatchesCoordinates(owner, ownerNode.identity.OwnerReference) {
   438  				if ownerNode.isObserved() {
   439  					// The owner node has been observed via an informer.
   440  					// n's owner reference doesn't match the observed identity; this might be wrong.
   441  					logger.V(2).Info("item references an owner with coordinates that do not match the observed identity", "item", n.identity, "owner", ownerNode.identity)
   442  				}
   443  				hasPotentiallyInvalidOwnerReference = true
   444  			} else if !ownerIsNamespaced && ownerNode.identity.Namespace != n.identity.Namespace && !ownerNode.isObserved() {
   445  				// the ownerNode is cluster-scoped and virtual, and does not match the child node's namespace.
   446  				// the owner could be a missing instance of a namespaced type incorrectly referenced by a cluster-scoped child (issue #98040).
   447  				// enqueue this child to attemptToDelete to verify parent references.
   448  				hasPotentiallyInvalidOwnerReference = true
   449  			}
   450  		}
   451  	}
   452  
   453  	if hasPotentiallyInvalidOwnerReference {
   454  		// Enqueue the potentially invalid dependent node into attemptToDelete.
   455  		// The garbage processor will verify whether the owner references are dangling
   456  		// and delete the dependent if all owner references are confirmed absent.
   457  		gb.attemptToDelete.Add(n)
   458  	}
   459  }
   460  
   461  func (gb *GraphBuilder) reportInvalidNamespaceOwnerRef(n *node, invalidOwnerUID types.UID) {
   462  	var invalidOwnerRef metav1.OwnerReference
   463  	var found = false
   464  	for _, ownerRef := range n.owners {
   465  		if ownerRef.UID == invalidOwnerUID {
   466  			invalidOwnerRef = ownerRef
   467  			found = true
   468  			break
   469  		}
   470  	}
   471  	if !found {
   472  		return
   473  	}
   474  	ref := &v1.ObjectReference{
   475  		Kind:       n.identity.Kind,
   476  		APIVersion: n.identity.APIVersion,
   477  		Namespace:  n.identity.Namespace,
   478  		Name:       n.identity.Name,
   479  		UID:        n.identity.UID,
   480  	}
   481  	invalidIdentity := objectReference{
   482  		OwnerReference: metav1.OwnerReference{
   483  			Kind:       invalidOwnerRef.Kind,
   484  			APIVersion: invalidOwnerRef.APIVersion,
   485  			Name:       invalidOwnerRef.Name,
   486  			UID:        invalidOwnerRef.UID,
   487  		},
   488  		Namespace: n.identity.Namespace,
   489  	}
   490  	gb.eventRecorder.Eventf(ref, v1.EventTypeWarning, "OwnerRefInvalidNamespace", "ownerRef %s does not exist in namespace %q", invalidIdentity, n.identity.Namespace)
   491  }
   492  
   493  // insertNode inserts the node into gb.uidToNode; then it finds all owners as listed
   494  // in n.owners, and adds the node to their dependents list.
   495  func (gb *GraphBuilder) insertNode(logger klog.Logger, n *node) {
   496  	gb.uidToNode.Write(n)
   497  	gb.addDependentToOwners(logger, n, n.owners)
   498  }
   499  
   500  // removeDependentFromOwners removes n from its owners' dependents lists.
   501  func (gb *GraphBuilder) removeDependentFromOwners(n *node, owners []metav1.OwnerReference) {
   502  	for _, owner := range owners {
   503  		ownerNode, ok := gb.uidToNode.Read(owner.UID)
   504  		if !ok {
   505  			continue
   506  		}
   507  		ownerNode.deleteDependent(n)
   508  	}
   509  }
   510  
   511  // removeNode removes the node from gb.uidToNode, then finds all
   512  // owners as listed in n.owners, and removes n from their dependents list.
   513  func (gb *GraphBuilder) removeNode(n *node) {
   514  	gb.uidToNode.Delete(n.identity.UID)
   515  	gb.removeDependentFromOwners(n, n.owners)
   516  }
   517  
   518  type ownerRefPair struct {
   519  	oldRef metav1.OwnerReference
   520  	newRef metav1.OwnerReference
   521  }
   522  
   523  // TODO: profile this function to see if a naive N^2 algorithm performs better
   524  // when the number of references is small.
   525  func referencesDiffs(old []metav1.OwnerReference, new []metav1.OwnerReference) (added []metav1.OwnerReference, removed []metav1.OwnerReference, changed []ownerRefPair) {
   526  	oldUIDToRef := make(map[string]metav1.OwnerReference)
   527  	for _, value := range old {
   528  		oldUIDToRef[string(value.UID)] = value
   529  	}
   530  	oldUIDSet := sets.StringKeySet(oldUIDToRef)
   531  	for _, value := range new {
   532  		newUID := string(value.UID)
   533  		if oldUIDSet.Has(newUID) {
   534  			if !reflect.DeepEqual(oldUIDToRef[newUID], value) {
   535  				changed = append(changed, ownerRefPair{oldRef: oldUIDToRef[newUID], newRef: value})
   536  			}
   537  			oldUIDSet.Delete(newUID)
   538  		} else {
   539  			added = append(added, value)
   540  		}
   541  	}
   542  	for oldUID := range oldUIDSet {
   543  		removed = append(removed, oldUIDToRef[oldUID])
   544  	}
   545  
   546  	return added, removed, changed
   547  }
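
Illustrative aside (not part of the source file): a test-style sketch of how referencesDiffs classifies owner references by UID, assuming a _test.go file in this package with "testing" imported. A UID whose fields changed is reported as changed, a new UID as added, and a UID present only in the old list as removed.

	func TestReferencesDiffsSketch(t *testing.T) {
		truth := true
		old := []metav1.OwnerReference{
			{APIVersion: "apps/v1", Kind: "ReplicaSet", Name: "rs-a", UID: "uid-1"},
			{APIVersion: "apps/v1", Kind: "ReplicaSet", Name: "rs-b", UID: "uid-2"},
		}
		updated := []metav1.OwnerReference{
			// same UID as before, but BlockOwnerDeletion flipped -> reported as changed
			{APIVersion: "apps/v1", Kind: "ReplicaSet", Name: "rs-a", UID: "uid-1", BlockOwnerDeletion: &truth},
			// new UID -> reported as added; "uid-2" is now missing -> reported as removed
			{APIVersion: "apps/v1", Kind: "ReplicaSet", Name: "rs-c", UID: "uid-3"},
		}
		added, removed, changed := referencesDiffs(old, updated)
		if len(added) != 1 || len(removed) != 1 || len(changed) != 1 {
			t.Fatalf("unexpected diff: added=%v removed=%v changed=%v", added, removed, changed)
		}
	}
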
   548  
   549  func deletionStartsWithFinalizer(oldObj interface{}, newAccessor metav1.Object, matchingFinalizer string) bool {
   550  	// if the new object isn't being deleted, or doesn't have the finalizer we're interested in, return false
   551  	if !beingDeleted(newAccessor) || !hasFinalizer(newAccessor, matchingFinalizer) {
   552  		return false
   553  	}
   554  
   555  	// if the old object is nil, or wasn't being deleted, or didn't have the finalizer, return true
   556  	if oldObj == nil {
   557  		return true
   558  	}
   559  	oldAccessor, err := meta.Accessor(oldObj)
   560  	if err != nil {
   561  		utilruntime.HandleError(fmt.Errorf("cannot access oldObj: %v", err))
   562  		return false
   563  	}
   564  	return !beingDeleted(oldAccessor) || !hasFinalizer(oldAccessor, matchingFinalizer)
   565  }
   566  
   567  func beingDeleted(accessor metav1.Object) bool {
   568  	return accessor.GetDeletionTimestamp() != nil
   569  }
   570  
   571  func hasDeleteDependentsFinalizer(accessor metav1.Object) bool {
   572  	return hasFinalizer(accessor, metav1.FinalizerDeleteDependents)
   573  }
   574  
   575  func hasOrphanFinalizer(accessor metav1.Object) bool {
   576  	return hasFinalizer(accessor, metav1.FinalizerOrphanDependents)
   577  }
   578  
   579  func hasFinalizer(accessor metav1.Object, matchingFinalizer string) bool {
   580  	finalizers := accessor.GetFinalizers()
   581  	for _, finalizer := range finalizers {
   582  		if finalizer == matchingFinalizer {
   583  			return true
   584  		}
   585  	}
   586  	return false
   587  }
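
Illustrative aside (not part of the source file): the finalizer helpers above only need a metav1.Object, so a bare ObjectMeta is enough to exercise them. A minimal in-package sketch; finalizerHelpersSketch is a hypothetical function.

	func finalizerHelpersSketch() {
		now := metav1.Now()
		obj := &metav1.ObjectMeta{
			DeletionTimestamp: &now,
			Finalizers:        []string{metav1.FinalizerDeleteDependents},
		}
		_ = beingDeleted(obj)                                                       // true: DeletionTimestamp is set
		_ = hasDeleteDependentsFinalizer(obj)                                       // true: the foregroundDeletion finalizer is present
		_ = hasOrphanFinalizer(obj)                                                 // false: the orphan finalizer is absent
		_ = deletionStartsWithFinalizer(nil, obj, metav1.FinalizerDeleteDependents) // true: no old object, so the deletion just started
	}
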
   588  
   589  // this function takes newAccessor directly because the caller already
   590  // instantiates an accessor for the newObj.
   591  func startsWaitingForDependentsDeleted(oldObj interface{}, newAccessor metav1.Object) bool {
   592  	return deletionStartsWithFinalizer(oldObj, newAccessor, metav1.FinalizerDeleteDependents)
   593  }
   594  
   595  // this function takes newAccessor directly because the caller already
   596  // instantiates an accessor for the newObj.
   597  func startsWaitingForDependentsOrphaned(oldObj interface{}, newAccessor metav1.Object) bool {
   598  	return deletionStartsWithFinalizer(oldObj, newAccessor, metav1.FinalizerOrphanDependents)
   599  }
   600  
   601  // if a blocking ownerReference pointing to an object gets removed, or gets set to
   602  // "BlockOwnerDeletion=false", add the referenced object to the attemptToDelete queue.
   603  func (gb *GraphBuilder) addUnblockedOwnersToDeleteQueue(logger klog.Logger, removed []metav1.OwnerReference, changed []ownerRefPair) {
   604  	for _, ref := range removed {
   605  		if ref.BlockOwnerDeletion != nil && *ref.BlockOwnerDeletion {
   606  			node, found := gb.uidToNode.Read(ref.UID)
   607  			if !found {
   608  				logger.V(5).Info("cannot find uid in uidToNode", "uid", ref.UID)
   609  				continue
   610  			}
   611  			gb.attemptToDelete.Add(node)
   612  		}
   613  	}
   614  	for _, c := range changed {
   615  		wasBlocked := c.oldRef.BlockOwnerDeletion != nil && *c.oldRef.BlockOwnerDeletion
   616  		isUnblocked := c.newRef.BlockOwnerDeletion == nil || (c.newRef.BlockOwnerDeletion != nil && !*c.newRef.BlockOwnerDeletion)
   617  		if wasBlocked && isUnblocked {
   618  			node, found := gb.uidToNode.Read(c.newRef.UID)
   619  			if !found {
   620  				logger.V(5).Info("cannot find uid in uidToNode", "uid", c.newRef.UID)
   621  				continue
   622  			}
   623  			gb.attemptToDelete.Add(node)
   624  		}
   625  	}
   626  }
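
Illustrative aside (not part of the source file): BlockOwnerDeletion is a *bool, so nil, false, and true are all possible values; the check above treats nil and false alike as "unblocked". A hypothetical condensed sketch of the transition test:

	func blockOwnerDeletionUnblockedSketch() bool {
		blocked := true
		oldRef := metav1.OwnerReference{UID: "uid-1", BlockOwnerDeletion: &blocked}
		newRef := metav1.OwnerReference{UID: "uid-1"} // nil BlockOwnerDeletion counts as unblocked

		wasBlocked := oldRef.BlockOwnerDeletion != nil && *oldRef.BlockOwnerDeletion
		isUnblocked := newRef.BlockOwnerDeletion == nil || !*newRef.BlockOwnerDeletion
		return wasBlocked && isUnblocked // true: the referenced owner would be re-queued into attemptToDelete
	}
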
   627  
   628  func (gb *GraphBuilder) processTransitions(logger klog.Logger, oldObj interface{}, newAccessor metav1.Object, n *node) {
   629  	if startsWaitingForDependentsOrphaned(oldObj, newAccessor) {
   630  		logger.V(5).Info("add item to attemptToOrphan", "item", n.identity)
   631  		gb.attemptToOrphan.Add(n)
   632  		return
   633  	}
   634  	if startsWaitingForDependentsDeleted(oldObj, newAccessor) {
   635  		logger.V(2).Info("add item to attemptToDelete, because it's waiting for its dependents to be deleted", "item", n.identity)
   636  		// if n was added as a "virtual" node, its deletingDependents field is not properly set, so always set it here.
   637  		n.markDeletingDependents()
   638  		for dep := range n.dependents {
   639  			gb.attemptToDelete.Add(dep)
   640  		}
   641  		gb.attemptToDelete.Add(n)
   642  	}
   643  }
   644  
   645  func (gb *GraphBuilder) runProcessGraphChanges(logger klog.Logger) {
   646  	for gb.processGraphChanges(logger) {
   647  	}
   648  }
   649  
   650  func identityFromEvent(event *event, accessor metav1.Object) objectReference {
   651  	return objectReference{
   652  		OwnerReference: metav1.OwnerReference{
   653  			APIVersion: event.gvk.GroupVersion().String(),
   654  			Kind:       event.gvk.Kind,
   655  			UID:        accessor.GetUID(),
   656  			Name:       accessor.GetName(),
   657  		},
   658  		Namespace: accessor.GetNamespace(),
   659  	}
   660  }
   661  
   662  // processGraphChanges dequeues an event from graphChanges, updates the graph, and populates the attemptToDelete and attemptToOrphan queues.
   663  func (gb *GraphBuilder) processGraphChanges(logger klog.Logger) bool {
   664  	item, quit := gb.graphChanges.Get()
   665  	if quit {
   666  		return false
   667  	}
   668  	defer gb.graphChanges.Done(item)
   669  	event, ok := item.(*event)
   670  	if !ok {
   671  		utilruntime.HandleError(fmt.Errorf("expect a *event, got %v", item))
   672  		return true
   673  	}
   674  	obj := event.obj
   675  	accessor, err := meta.Accessor(obj)
   676  	if err != nil {
   677  		utilruntime.HandleError(fmt.Errorf("cannot access obj: %v", err))
   678  		return true
   679  	}
   680  
   681  	logger.V(5).Info("GraphBuilder process object",
   682  		"apiVersion", event.gvk.GroupVersion().String(),
   683  		"kind", event.gvk.Kind,
   684  		"object", klog.KObj(accessor),
   685  		"uid", string(accessor.GetUID()),
   686  		"eventType", event.eventType,
   687  		"virtual", event.virtual,
   688  	)
   689  
   690  	// Check if the node already exists
   691  	existingNode, found := gb.uidToNode.Read(accessor.GetUID())
   692  	if found && !event.virtual && !existingNode.isObserved() {
   693  		// this marks the node as having been observed via an informer event
   694  		// 1. this depends on graphChanges only containing add/update events from the actual informer
   695  		// 2. this allows things tracking virtual nodes' existence to stop polling and rely on informer events
   696  		observedIdentity := identityFromEvent(event, accessor)
   697  		if observedIdentity != existingNode.identity {
   698  			// find dependents that don't match the identity we observed
   699  			_, potentiallyInvalidDependents := partitionDependents(existingNode.getDependents(), observedIdentity)
   700  			// add those potentially invalid dependents to the attemptToDelete queue.
   701  			// if their owners are still solid the attemptToDelete will be a no-op.
   702  			// this covers the bad child -> good parent observation sequence.
   703  			// the good parent -> bad child observation sequence is handled in addDependentToOwners
   704  			for _, dep := range potentiallyInvalidDependents {
   705  				if len(observedIdentity.Namespace) > 0 && dep.identity.Namespace != observedIdentity.Namespace {
   706  					// Namespace mismatch, this is definitely wrong
   707  					logger.V(2).Info("item references an owner but does not match namespaces",
   708  						"item", dep.identity,
   709  						"owner", observedIdentity,
   710  					)
   711  					gb.reportInvalidNamespaceOwnerRef(dep, observedIdentity.UID)
   712  				}
   713  				gb.attemptToDelete.Add(dep)
   714  			}
   715  
   716  			// make a copy (so we don't modify the existing node in place), store the observed identity, and replace the virtual node
   717  			logger.V(2).Info("replacing virtual item with observed item",
   718  				"virtual", existingNode.identity,
   719  				"observed", observedIdentity,
   720  			)
   721  			existingNode = existingNode.clone()
   722  			existingNode.identity = observedIdentity
   723  			gb.uidToNode.Write(existingNode)
   724  		}
   725  		existingNode.markObserved()
   726  	}
   727  	switch {
   728  	case (event.eventType == addEvent || event.eventType == updateEvent) && !found:
   729  		newNode := &node{
   730  			identity:           identityFromEvent(event, accessor),
   731  			dependents:         make(map[*node]struct{}),
   732  			owners:             accessor.GetOwnerReferences(),
   733  			deletingDependents: beingDeleted(accessor) && hasDeleteDependentsFinalizer(accessor),
   734  			beingDeleted:       beingDeleted(accessor),
   735  		}
   736  		gb.insertNode(logger, newNode)
   737  		// the underlying delta_fifo may combine a creation and a deletion into
   738  		// one event, so we need to further process the event.
   739  		gb.processTransitions(logger, event.oldObj, accessor, newNode)
   740  	case (event.eventType == addEvent || event.eventType == updateEvent) && found:
   741  		// handle changes in ownerReferences
   742  		added, removed, changed := referencesDiffs(existingNode.owners, accessor.GetOwnerReferences())
   743  		if len(added) != 0 || len(removed) != 0 || len(changed) != 0 {
   744  			// check if the changed dependency graph unblocks owners that are
   745  			// waiting for the deletion of their dependents.
   746  			gb.addUnblockedOwnersToDeleteQueue(logger, removed, changed)
   747  			// update the node itself
   748  			existingNode.owners = accessor.GetOwnerReferences()
   749  			// Add the node to its new owners' dependent lists.
   750  			gb.addDependentToOwners(logger, existingNode, added)
   751  			// remove the node from the dependents list of nodes that are no longer in
   752  			// the node's owners list.
   753  			gb.removeDependentFromOwners(existingNode, removed)
   754  		}
   755  
   756  		if beingDeleted(accessor) {
   757  			existingNode.markBeingDeleted()
   758  		}
   759  		gb.processTransitions(logger, event.oldObj, accessor, existingNode)
   760  	case event.eventType == deleteEvent:
   761  		if !found {
   762  			logger.V(5).Info("item doesn't exist in the graph, this shouldn't happen",
   763  				"item", accessor.GetUID(),
   764  			)
   765  			return true
   766  		}
   767  
   768  		removeExistingNode := true
   769  
   770  		if event.virtual {
   771  			// this is a virtual delete event, not one observed from an informer
   772  			deletedIdentity := identityFromEvent(event, accessor)
   773  			if existingNode.virtual {
   774  
   775  				// our existing node is also virtual, we're not sure of its coordinates.
   776  				// see if any dependents reference this owner with coordinates other than the one we got a virtual delete event for.
   777  				if matchingDependents, nonmatchingDependents := partitionDependents(existingNode.getDependents(), deletedIdentity); len(nonmatchingDependents) > 0 {
   778  
   779  					// some of our dependents disagree on our coordinates, so do not remove the existing virtual node from the graph
   780  					removeExistingNode = false
   781  
   782  					if len(matchingDependents) > 0 {
   783  						// mark the observed deleted identity as absent
   784  						gb.absentOwnerCache.Add(deletedIdentity)
   785  						// attempt to delete dependents that do match the verified deleted identity
   786  						for _, dep := range matchingDependents {
   787  							gb.attemptToDelete.Add(dep)
   788  						}
   789  					}
   790  
   791  					// if the delete event verified existingNode.identity doesn't exist...
   792  					if existingNode.identity == deletedIdentity {
   793  						// find an alternative identity our nonmatching dependents refer to us by
   794  						replacementIdentity := getAlternateOwnerIdentity(nonmatchingDependents, deletedIdentity)
   795  						if replacementIdentity != nil {
   796  							// replace the existing virtual node with a new one with one of our other potential identities
   797  							replacementNode := existingNode.clone()
   798  							replacementNode.identity = *replacementIdentity
   799  							gb.uidToNode.Write(replacementNode)
   800  							// and add the new virtual node back to the attemptToDelete queue
   801  							gb.attemptToDelete.AddRateLimited(replacementNode)
   802  						}
   803  					}
   804  				}
   805  
   806  			} else if existingNode.identity != deletedIdentity {
   807  				// do not remove the existing real node from the graph based on a virtual delete event
   808  				removeExistingNode = false
   809  
   810  				// our existing node which was observed via informer disagrees with the virtual delete event's coordinates
   811  				matchingDependents, _ := partitionDependents(existingNode.getDependents(), deletedIdentity)
   812  
   813  				if len(matchingDependents) > 0 {
   814  					// mark the observed deleted identity as absent
   815  					gb.absentOwnerCache.Add(deletedIdentity)
   816  					// attempt to delete dependents that do match the verified deleted identity
   817  					for _, dep := range matchingDependents {
   818  						gb.attemptToDelete.Add(dep)
   819  					}
   820  				}
   821  			}
   822  		}
   823  
   824  		if removeExistingNode {
   825  			// removeNode updates the graph
   826  			gb.removeNode(existingNode)
   827  			existingNode.dependentsLock.RLock()
   828  			defer existingNode.dependentsLock.RUnlock()
   829  			if len(existingNode.dependents) > 0 {
   830  				gb.absentOwnerCache.Add(identityFromEvent(event, accessor))
   831  			}
   832  			for dep := range existingNode.dependents {
   833  				gb.attemptToDelete.Add(dep)
   834  			}
   835  			for _, owner := range existingNode.owners {
   836  				ownerNode, found := gb.uidToNode.Read(owner.UID)
   837  				if !found || !ownerNode.isDeletingDependents() {
   838  					continue
   839  				}
   840  				// this is to let attemptToDeleteItem check if all of the owner's
   841  				// dependents are deleted; if so, the owner will be deleted.
   842  				gb.attemptToDelete.Add(ownerNode)
   843  			}
   844  		}
   845  	}
   846  	return true
   847  }
   848  
   849  // partitionDependents divides the provided dependents into a list which have an ownerReference matching the provided identity,
   850  // and ones which have an ownerReference for the given uid that do not match the provided identity.
   851  // Note that a dependent with multiple ownerReferences for the target uid can end up in both lists.
   852  func partitionDependents(dependents []*node, matchOwnerIdentity objectReference) (matching, nonmatching []*node) {
   853  	ownerIsNamespaced := len(matchOwnerIdentity.Namespace) > 0
   854  	for i := range dependents {
   855  		dep := dependents[i]
   856  		foundMatch := false
   857  		foundMismatch := false
   858  		// if the dep namespace matches or the owner is cluster scoped ...
   859  		if ownerIsNamespaced && matchOwnerIdentity.Namespace != dep.identity.Namespace {
   860  			// all references to the parent do not match, since the dependent namespace does not match the owner
   861  			foundMismatch = true
   862  		} else {
   863  			for _, ownerRef := range dep.owners {
   864  				// ... find the ownerRef with a matching uid ...
   865  				if ownerRef.UID == matchOwnerIdentity.UID {
   866  					// ... and check if it matches all coordinates
   867  					if ownerReferenceMatchesCoordinates(ownerRef, matchOwnerIdentity.OwnerReference) {
   868  						foundMatch = true
   869  					} else {
   870  						foundMismatch = true
   871  					}
   872  				}
   873  			}
   874  		}
   875  
   876  		if foundMatch {
   877  			matching = append(matching, dep)
   878  		}
   879  		if foundMismatch {
   880  			nonmatching = append(nonmatching, dep)
   881  		}
   882  	}
   883  	return matching, nonmatching
   884  }
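
Illustrative aside (not part of the source file): a test-style sketch of partitionDependents, assuming a _test.go file in this package. A dependent whose ownerRef matches the target identity's coordinates lands in "matching"; one that references the same UID under different coordinates (or from a different namespace) lands in "nonmatching".

	func TestPartitionDependentsSketch(t *testing.T) {
		owner := objectReference{
			OwnerReference: metav1.OwnerReference{APIVersion: "v1", Kind: "ConfigMap", Name: "parent", UID: "uid-1"},
			Namespace:      "ns1",
		}
		goodChild := &node{
			identity: objectReference{OwnerReference: metav1.OwnerReference{APIVersion: "v1", Kind: "Pod", Name: "child-a", UID: "uid-a"}, Namespace: "ns1"},
			owners:   []metav1.OwnerReference{{APIVersion: "v1", Kind: "ConfigMap", Name: "parent", UID: "uid-1"}},
		}
		badChild := &node{
			identity: objectReference{OwnerReference: metav1.OwnerReference{APIVersion: "v1", Kind: "Pod", Name: "child-b", UID: "uid-b"}, Namespace: "ns1"},
			owners:   []metav1.OwnerReference{{APIVersion: "v1", Kind: "Secret", Name: "parent", UID: "uid-1"}}, // same UID, different kind
		}
		matching, nonmatching := partitionDependents([]*node{goodChild, badChild}, owner)
		if len(matching) != 1 || len(nonmatching) != 1 {
			t.Fatalf("unexpected partition: matching=%d nonmatching=%d", len(matching), len(nonmatching))
		}
	}
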
   885  
   886  func referenceLessThan(a, b objectReference) bool {
   887  	// kind/apiVersion are more significant than namespace,
   888  	// so that we get coherent ordering between kinds
   889  	// regardless of whether they are cluster-scoped or namespaced
   890  	if a.Kind != b.Kind {
   891  		return a.Kind < b.Kind
   892  	}
   893  	if a.APIVersion != b.APIVersion {
   894  		return a.APIVersion < b.APIVersion
   895  	}
   896  	// namespace is more significant than name
   897  	if a.Namespace != b.Namespace {
   898  		return a.Namespace < b.Namespace
   899  	}
   900  	// name is more significant than uid
   901  	if a.Name != b.Name {
   902  		return a.Name < b.Name
   903  	}
   904  	// uid is included for completeness, but is expected to be identical
   905  	// when getting alternate identities for an owner since they are keyed by uid
   906  	if a.UID != b.UID {
   907  		return a.UID < b.UID
   908  	}
   909  	return false
   910  }
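
Illustrative aside (not part of the source file): kind and apiVersion are compared before namespace and name, so the ordering does not depend on whether the owners are cluster-scoped or namespaced. A hypothetical in-package sketch:

	func referenceOrderingSketch() bool {
		a := objectReference{OwnerReference: metav1.OwnerReference{APIVersion: "v1", Kind: "ConfigMap", Name: "zzz"}, Namespace: "zzz"}
		b := objectReference{OwnerReference: metav1.OwnerReference{APIVersion: "v1", Kind: "Secret", Name: "aaa"}, Namespace: "aaa"}
		return referenceLessThan(a, b) // true: "ConfigMap" sorts before "Secret"; namespace and name are never consulted
	}
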
   911  
   912  // getAlternateOwnerIdentity searches deps for owner references which match
   913  // verifiedAbsentIdentity.UID but differ in apiVersion/kind/name or namespace.
   914  // The first that follows verifiedAbsentIdentity (according to referenceLessThan) is returned.
   915  // If none follow verifiedAbsentIdentity, the first (according to referenceLessThan) is returned.
   916  // If no alternate identities are found, nil is returned.
   917  func getAlternateOwnerIdentity(deps []*node, verifiedAbsentIdentity objectReference) *objectReference {
   918  	absentIdentityIsClusterScoped := len(verifiedAbsentIdentity.Namespace) == 0
   919  
   920  	seenAlternates := map[objectReference]bool{verifiedAbsentIdentity: true}
   921  
   922  	// keep track of the first alternate reference (according to referenceLessThan)
   923  	var first *objectReference
   924  	// keep track of the first reference following verifiedAbsentIdentity (according to referenceLessThan)
   925  	var firstFollowing *objectReference
   926  
   927  	for _, dep := range deps {
   928  		for _, ownerRef := range dep.owners {
   929  			if ownerRef.UID != verifiedAbsentIdentity.UID {
   930  				// skip references that aren't the uid we care about
   931  				continue
   932  			}
   933  
   934  			if ownerReferenceMatchesCoordinates(ownerRef, verifiedAbsentIdentity.OwnerReference) {
   935  				if absentIdentityIsClusterScoped || verifiedAbsentIdentity.Namespace == dep.identity.Namespace {
   936  					// skip references that exactly match verifiedAbsentIdentity
   937  					continue
   938  				}
   939  			}
   940  
   941  			ref := objectReference{OwnerReference: ownerReferenceCoordinates(ownerRef), Namespace: dep.identity.Namespace}
   942  			if absentIdentityIsClusterScoped && ref.APIVersion == verifiedAbsentIdentity.APIVersion && ref.Kind == verifiedAbsentIdentity.Kind {
   943  				// we know this apiVersion/kind is cluster-scoped because of verifiedAbsentIdentity,
   944  				// so clear the namespace from the alternate identity
   945  				ref.Namespace = ""
   946  			}
   947  
   948  			if seenAlternates[ref] {
   949  				// skip references we've already seen
   950  				continue
   951  			}
   952  			seenAlternates[ref] = true
   953  
   954  			if first == nil || referenceLessThan(ref, *first) {
   955  				// this alternate comes first lexically
   956  				first = &ref
   957  			}
   958  			if referenceLessThan(verifiedAbsentIdentity, ref) && (firstFollowing == nil || referenceLessThan(ref, *firstFollowing)) {
   959  				// this alternate is the first following verifiedAbsentIdentity lexically
   960  				firstFollowing = &ref
   961  			}
   962  		}
   963  	}
   964  
   965  	// return the first alternate identity following the verified absent identity, if there is one
   966  	if firstFollowing != nil {
   967  		return firstFollowing
   968  	}
   969  	// otherwise return the first alternate identity
   970  	return first
   971  }
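
Illustrative aside (not part of the source file): a test-style sketch of getAlternateOwnerIdentity, assuming a _test.go file in this package. Once the virtually-deleted identity is ruled out, the dependents' remaining coordinates for that UID are candidates, and the first one sorting after the absent identity wins.

	func TestGetAlternateOwnerIdentitySketch(t *testing.T) {
		absent := objectReference{
			OwnerReference: metav1.OwnerReference{APIVersion: "v1", Kind: "ConfigMap", Name: "parent", UID: "uid-1"},
			Namespace:      "ns1",
		}
		dep := &node{
			identity: objectReference{OwnerReference: metav1.OwnerReference{APIVersion: "v1", Kind: "Pod", Name: "child", UID: "uid-c"}, Namespace: "ns1"},
			owners: []metav1.OwnerReference{
				{APIVersion: "v1", Kind: "ConfigMap", Name: "parent", UID: "uid-1"}, // exactly matches the absent identity, skipped
				{APIVersion: "v1", Kind: "Secret", Name: "parent", UID: "uid-1"},    // alternate coordinates for the same UID
			},
		}
		alt := getAlternateOwnerIdentity([]*node{dep}, absent)
		if alt == nil || alt.Kind != "Secret" {
			t.Fatalf("expected Secret alternate, got %v", alt)
		}
	}
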
   972  
   973  func (gb *GraphBuilder) GetGraphResources() (
   974  	attemptToDelete workqueue.RateLimitingInterface,
   975  	attemptToOrphan workqueue.RateLimitingInterface,
   976  	absentOwnerCache *ReferenceCache,
   977  ) {
   978  	return gb.attemptToDelete, gb.attemptToOrphan, gb.absentOwnerCache
   979  }
   980  
   981  type Monitor struct {
   982  	Store      cache.Store
   983  	Controller cache.Controller
   984  }
   985  
   986  // GetMonitor returns a monitor for the given resource.
   987  // If the monitor is not synced, it will return an error and the monitor to allow the caller to decide whether to retry.
   988  // If the monitor is not found, it will return only an error.
   989  func (gb *GraphBuilder) GetMonitor(ctx context.Context, resource schema.GroupVersionResource) (*Monitor, error) {
   990  	gb.monitorLock.RLock()
   991  	defer gb.monitorLock.RUnlock()
   992  
   993  	var monitor *monitor
   994  	if m, ok := gb.monitors[resource]; ok {
   995  		monitor = m
   996  	} else {
   997  		for monitorGVR, m := range gb.monitors {
   998  			if monitorGVR.Group == resource.Group && monitorGVR.Resource == resource.Resource {
   999  				monitor = m
  1000  				break
  1001  			}
  1002  		}
  1003  	}
  1004  
  1005  	if monitor == nil {
  1006  		return nil, fmt.Errorf("no monitor found for resource %s", resource.String())
  1007  	}
  1008  
  1009  	resourceMonitor := &Monitor{
  1010  		Store:      monitor.store,
  1011  		Controller: monitor.controller,
  1012  	}
  1013  
  1014  	if !cache.WaitForNamedCacheSync(
  1015  		gb.Name(),
  1016  		ctx.Done(),
  1017  		func() bool {
  1018  			return monitor.controller.HasSynced()
  1019  		},
  1020  	) {
  1021  		// return the monitor to allow the caller to decide whether to retry, as it can be synced later
  1022  		return resourceMonitor, fmt.Errorf("dependency graph for resource %s is not synced", resource.String())
  1023  	}
  1024  
  1025  	return resourceMonitor, nil
  1026  }
  1027  
  1028  func (gb *GraphBuilder) Name() string {
  1029  	return "dependencygraphbuilder"
  1030  }
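
Illustrative aside (not part of the source file): a sketch of using GetMonitor to read a synced monitor's store for a resource. The pods GroupVersionResource is only an example and listPodMetadataSketch is a hypothetical helper.

	func listPodMetadataSketch(ctx context.Context, gb *GraphBuilder) ([]interface{}, error) {
		m, err := gb.GetMonitor(ctx, schema.GroupVersionResource{Version: "v1", Resource: "pods"})
		if err != nil {
			// The error may still come with a non-nil monitor that simply has not synced yet.
			return nil, err
		}
		return m.Store.List(), nil
	}
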
  1031  
