...

Source file src/k8s.io/kubernetes/pkg/controller/garbagecollector/garbagecollector.go

Documentation: k8s.io/kubernetes/pkg/controller/garbagecollector

     1  /*
     2  Copyright 2016 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package garbagecollector
    18  
    19  import (
    20  	"context"
    21  	goerrors "errors"
    22  	"fmt"
    23  	"k8s.io/controller-manager/pkg/informerfactory"
    24  	"reflect"
    25  	"sync"
    26  	"time"
    27  
    28  	"k8s.io/apimachinery/pkg/api/errors"
    29  	"k8s.io/apimachinery/pkg/api/meta"
    30  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    31  	"k8s.io/apimachinery/pkg/runtime/schema"
    32  	"k8s.io/apimachinery/pkg/types"
    33  	utilerrors "k8s.io/apimachinery/pkg/util/errors"
    34  	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    35  	"k8s.io/apimachinery/pkg/util/sets"
    36  	"k8s.io/apimachinery/pkg/util/wait"
    37  	"k8s.io/client-go/discovery"
    38  	clientset "k8s.io/client-go/kubernetes" // import known versions
    39  	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
    40  	"k8s.io/client-go/metadata"
    41  	"k8s.io/client-go/tools/cache"
    42  	"k8s.io/client-go/tools/record"
    43  	"k8s.io/client-go/util/workqueue"
    44  	"k8s.io/controller-manager/controller"
    45  	"k8s.io/klog/v2"
    46  	c "k8s.io/kubernetes/pkg/controller"
    47  	"k8s.io/kubernetes/pkg/controller/garbagecollector/metrics"
    48  )
    49  
// ResourceResyncTime defines the resync period of the garbage collector's informers.
// NOTE(review): a zero period appears to disable periodic resyncs so the GC is
// driven purely by watch events — confirm against the shared-informer contract.
const ResourceResyncTime time.Duration = 0
    52  
// GarbageCollector runs reflectors to watch for changes of managed API
// objects, funnels the results to a single-threaded dependencyGraphBuilder,
// which builds a graph caching the dependencies among objects. Triggered by the
// graph changes, the dependencyGraphBuilder enqueues objects that can
// potentially be garbage-collected to the `attemptToDelete` queue, and enqueues
// objects whose dependents need to be orphaned to the `attemptToOrphan` queue.
// The GarbageCollector has workers who consume these two queues, send requests
// to the API server to delete/update the objects accordingly.
// Note that having the dependencyGraphBuilder notify the garbage collector
// ensures that the garbage collector operates with a graph that is at least as
// up to date as the notification is sent.
type GarbageCollector struct {
	// restMapper is reset by Sync whenever discovery reports a change in
	// available resources.
	restMapper     meta.ResettableRESTMapper
	// metadataClient performs metadata-only lookups of owner objects (see isDangling).
	metadataClient metadata.Interface
	// garbage collector attempts to delete the items in attemptToDelete queue when the time is ripe.
	attemptToDelete workqueue.RateLimitingInterface
	// garbage collector attempts to orphan the dependents of the items in the attemptToOrphan queue, then deletes the items.
	attemptToOrphan        workqueue.RateLimitingInterface
	dependencyGraphBuilder *GraphBuilder
	// GC caches the owners that do not exist according to the API server.
	absentOwnerCache *ReferenceCache

	kubeClient       clientset.Interface
	eventBroadcaster record.EventBroadcaster

	// workerLock is read-locked by queue workers per item and write-locked by
	// Sync, so workers stay paused while monitors are being resynced.
	workerLock sync.RWMutex
}
    80  
// Compile-time assertions that GarbageCollector satisfies the
// controller-manager interfaces it is registered under.
var _ controller.Interface = (*GarbageCollector)(nil)
var _ controller.Debuggable = (*GarbageCollector)(nil)
    83  
    84  // NewGarbageCollector creates a new GarbageCollector.
    85  func NewGarbageCollector(
    86  	ctx context.Context,
    87  	kubeClient clientset.Interface,
    88  	metadataClient metadata.Interface,
    89  	mapper meta.ResettableRESTMapper,
    90  	ignoredResources map[schema.GroupResource]struct{},
    91  	sharedInformers informerfactory.InformerFactory,
    92  	informersStarted <-chan struct{},
    93  ) (*GarbageCollector, error) {
    94  	graphBuilder := NewDependencyGraphBuilder(ctx, metadataClient, mapper, ignoredResources, sharedInformers, informersStarted)
    95  	return NewComposedGarbageCollector(ctx, kubeClient, metadataClient, mapper, graphBuilder)
    96  }
    97  
    98  func NewComposedGarbageCollector(
    99  	ctx context.Context,
   100  	kubeClient clientset.Interface,
   101  	metadataClient metadata.Interface,
   102  	mapper meta.ResettableRESTMapper,
   103  	graphBuilder *GraphBuilder,
   104  ) (*GarbageCollector, error) {
   105  	attemptToDelete, attemptToOrphan, absentOwnerCache := graphBuilder.GetGraphResources()
   106  
   107  	gc := &GarbageCollector{
   108  		metadataClient:         metadataClient,
   109  		restMapper:             mapper,
   110  		attemptToDelete:        attemptToDelete,
   111  		attemptToOrphan:        attemptToOrphan,
   112  		absentOwnerCache:       absentOwnerCache,
   113  		kubeClient:             kubeClient,
   114  		eventBroadcaster:       graphBuilder.eventBroadcaster,
   115  		dependencyGraphBuilder: graphBuilder,
   116  	}
   117  
   118  	metrics.Register()
   119  
   120  	return gc, nil
   121  }
   122  
   123  // resyncMonitors starts or stops resource monitors as needed to ensure that all
   124  // (and only) those resources present in the map are monitored.
   125  func (gc *GarbageCollector) resyncMonitors(logger klog.Logger, deletableResources map[schema.GroupVersionResource]struct{}) error {
   126  	if err := gc.dependencyGraphBuilder.syncMonitors(logger, deletableResources); err != nil {
   127  		return err
   128  	}
   129  	gc.dependencyGraphBuilder.startMonitors(logger)
   130  	return nil
   131  }
   132  
// Run starts garbage collector workers.
func (gc *GarbageCollector) Run(ctx context.Context, workers int) {
	defer utilruntime.HandleCrash()
	// Shutting down the queues on exit unblocks workers waiting in Get so they
	// can observe the shutdown and return.
	defer gc.attemptToDelete.ShutDown()
	defer gc.attemptToOrphan.ShutDown()
	defer gc.dependencyGraphBuilder.graphChanges.ShutDown()

	// Start events processing pipeline.
	gc.eventBroadcaster.StartStructuredLogging(3)
	gc.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: gc.kubeClient.CoreV1().Events("")})
	defer gc.eventBroadcaster.Shutdown()

	logger := klog.FromContext(ctx)
	logger.Info("Starting controller", "controller", "garbagecollector")
	defer logger.Info("Shutting down controller", "controller", "garbagecollector")

	// Run the dependency graph builder (informer event processing) in the background.
	go gc.dependencyGraphBuilder.Run(ctx)

	// Block until all monitors have synced so workers never act on a graph
	// that is missing observed objects.
	if !cache.WaitForNamedCacheSync("garbage collector", ctx.Done(), func() bool {
		return gc.dependencyGraphBuilder.IsSynced(logger)
	}) {
		return
	}

	logger.Info("All resource monitors have synced. Proceeding to collect garbage")

	// gc workers: one delete worker and one orphan worker per requested count.
	for i := 0; i < workers; i++ {
		go wait.UntilWithContext(ctx, gc.runAttemptToDeleteWorker, 1*time.Second)
		go wait.Until(func() { gc.runAttemptToOrphanWorker(logger) }, 1*time.Second, ctx.Done())
	}

	<-ctx.Done()
}
   167  
// Sync periodically resyncs the garbage collector when new resources are
// observed from discovery. When new resources are detected, Sync will stop all
// GC workers, reset gc.restMapper, and resync the monitors.
//
// Note that discoveryClient should NOT be shared with gc.restMapper, otherwise
// the mapper's underlying discovery client will be unnecessarily reset during
// the course of detecting new resources.
func (gc *GarbageCollector) Sync(ctx context.Context, discoveryClient discovery.ServerResourcesInterface, period time.Duration) {
	// oldResources is captured by the closure below and carries the last
	// successfully synced resource set across iterations.
	oldResources := make(map[schema.GroupVersionResource]struct{})
	wait.UntilWithContext(ctx, func(ctx context.Context) {
		logger := klog.FromContext(ctx)

		// Get the current resource list from discovery.
		newResources, err := GetDeletableResources(logger, discoveryClient)

		if len(newResources) == 0 {
			logger.V(2).Info("no resources reported by discovery, skipping garbage collector sync")
			metrics.GarbageCollectorResourcesSyncError.Inc()
			return
		}
		// err from discovery is only inspected for partial (per-group) failures;
		// for failed groups we keep resources whose informers are already synced
		// so resyncMonitors only has to add informers for newly seen resources.
		if groupLookupFailures, isLookupFailure := discovery.GroupDiscoveryFailedErrorGroups(err); isLookupFailure {
			// In partial discovery cases, preserve existing synced informers for resources in the failed groups, so resyncMonitors will only add informers for newly seen resources
			for k, v := range oldResources {
				if _, failed := groupLookupFailures[k.GroupVersion()]; failed && gc.dependencyGraphBuilder.IsResourceSynced(k) {
					newResources[k] = v
				}
			}
		}

		// Decide whether discovery has reported a change.
		if reflect.DeepEqual(oldResources, newResources) {
			logger.V(5).Info("no resource updates from discovery, skipping garbage collector sync")
			return
		}

		// Ensure workers are paused to avoid processing events before informers
		// have resynced.
		gc.workerLock.Lock()
		defer gc.workerLock.Unlock()

		// Once we get here, we should not unpause workers until we've successfully synced
		attempt := 0
		wait.PollImmediateUntilWithContext(ctx, 100*time.Millisecond, func(ctx context.Context) (bool, error) {
			attempt++

			// On a reattempt, check if available resources have changed
			if attempt > 1 {
				newResources, err = GetDeletableResources(logger, discoveryClient)

				if len(newResources) == 0 {
					logger.V(2).Info("no resources reported by discovery", "attempt", attempt)
					metrics.GarbageCollectorResourcesSyncError.Inc()
					return false, nil
				}
				if groupLookupFailures, isLookupFailure := discovery.GroupDiscoveryFailedErrorGroups(err); isLookupFailure {
					// In partial discovery cases, preserve existing synced informers for resources in the failed groups, so resyncMonitors will only add informers for newly seen resources
					for k, v := range oldResources {
						if _, failed := groupLookupFailures[k.GroupVersion()]; failed && gc.dependencyGraphBuilder.IsResourceSynced(k) {
							newResources[k] = v
						}
					}
				}
			}

			logger.V(2).Info(
				"syncing garbage collector with updated resources from discovery",
				"attempt", attempt,
				"diff", printDiff(oldResources, newResources),
			)

			// Resetting the REST mapper will also invalidate the underlying discovery
			// client. This is a leaky abstraction and assumes behavior about the REST
			// mapper, but we'll deal with it for now.
			gc.restMapper.Reset()
			logger.V(4).Info("reset restmapper")

			// Perform the monitor resync and wait for controllers to report cache sync.
			//
			// NOTE: It's possible that newResources will diverge from the resources
			// discovered by restMapper during the call to Reset, since they are
			// distinct discovery clients invalidated at different times. For example,
			// newResources may contain resources not returned in the restMapper's
			// discovery call if the resources appeared in-between the calls. In that
			// case, the restMapper will fail to map some of newResources until the next
			// attempt.
			if err := gc.resyncMonitors(logger, newResources); err != nil {
				utilruntime.HandleError(fmt.Errorf("failed to sync resource monitors (attempt %d): %v", attempt, err))
				metrics.GarbageCollectorResourcesSyncError.Inc()
				return false, nil
			}
			logger.V(4).Info("resynced monitors")

			// wait for caches to fill for a while (our sync period) before attempting to rediscover resources and retry syncing.
			// this protects us from deadlocks where available resources changed and one of our informer caches will never fill.
			// informers keep attempting to sync in the background, so retrying doesn't interrupt them.
			// the call to resyncMonitors on the reattempt will no-op for resources that still exist.
			// note that workers stay paused until we successfully resync.
			if !cache.WaitForNamedCacheSync("garbage collector", waitForStopOrTimeout(ctx.Done(), period), func() bool {
				return gc.dependencyGraphBuilder.IsSynced(logger)
			}) {
				utilruntime.HandleError(fmt.Errorf("timed out waiting for dependency graph builder sync during GC sync (attempt %d)", attempt))
				metrics.GarbageCollectorResourcesSyncError.Inc()
				return false, nil
			}

			// success, break out of the loop
			return true, nil
		})

		// Finally, keep track of our new state. Do this after all preceding steps
		// have succeeded to ensure we'll retry on subsequent syncs if an error
		// occurred.
		oldResources = newResources
		logger.V(2).Info("synced garbage collector")
	}, period)
}
   284  
   285  // printDiff returns a human-readable summary of what resources were added and removed
   286  func printDiff(oldResources, newResources map[schema.GroupVersionResource]struct{}) string {
   287  	removed := sets.NewString()
   288  	for oldResource := range oldResources {
   289  		if _, ok := newResources[oldResource]; !ok {
   290  			removed.Insert(fmt.Sprintf("%+v", oldResource))
   291  		}
   292  	}
   293  	added := sets.NewString()
   294  	for newResource := range newResources {
   295  		if _, ok := oldResources[newResource]; !ok {
   296  			added.Insert(fmt.Sprintf("%+v", newResource))
   297  		}
   298  	}
   299  	return fmt.Sprintf("added: %v, removed: %v", added.List(), removed.List())
   300  }
   301  
   302  // waitForStopOrTimeout returns a stop channel that closes when the provided stop channel closes or when the specified timeout is reached
   303  func waitForStopOrTimeout(stopCh <-chan struct{}, timeout time.Duration) <-chan struct{} {
   304  	stopChWithTimeout := make(chan struct{})
   305  	go func() {
   306  		select {
   307  		case <-stopCh:
   308  		case <-time.After(timeout):
   309  		}
   310  		close(stopChWithTimeout)
   311  	}()
   312  	return stopChWithTimeout
   313  }
   314  
// IsSynced returns true if dependencyGraphBuilder is synced.
// It is exposed so callers (e.g. controller health checks) can observe GC readiness.
func (gc *GarbageCollector) IsSynced(logger klog.Logger) bool {
	return gc.dependencyGraphBuilder.IsSynced(logger)
}
   319  
   320  func (gc *GarbageCollector) runAttemptToDeleteWorker(ctx context.Context) {
   321  	for gc.processAttemptToDeleteWorker(ctx) {
   322  	}
   323  }
   324  
// enqueuedVirtualDeleteEventErr is returned by attemptToDeleteItem when it has
// enqueued a virtual delete event for the node; processGraphChanges will remove
// the node, so the worker forgets the item instead of requeueing it.
var enqueuedVirtualDeleteEventErr = goerrors.New("enqueued virtual delete event")

// namespacedOwnerOfClusterScopedObjectErr marks a permanent failure: a
// cluster-scoped object referencing a namespaced owner will never resolve on
// retry, so the worker forgets the item instead of requeueing it.
var namespacedOwnerOfClusterScopedObjectErr = goerrors.New("cluster-scoped objects cannot refer to namespaced owners")
   328  
// processAttemptToDeleteWorker dequeues one item, runs attemptToDeleteWorker on
// it, and applies the resulting queue action. It returns false only when the
// attemptToDelete queue has been shut down.
func (gc *GarbageCollector) processAttemptToDeleteWorker(ctx context.Context) bool {
	item, quit := gc.attemptToDelete.Get()
	// The read lock is acquired after Get (deliberately): Sync takes the write
	// lock to pause workers between items, so an in-flight item always
	// completes before a resync proceeds.
	gc.workerLock.RLock()
	defer gc.workerLock.RUnlock()
	if quit {
		return false
	}
	defer gc.attemptToDelete.Done(item)

	action := gc.attemptToDeleteWorker(ctx, item)
	switch action {
	case forgetItem:
		// Clear the item's rate-limiting history.
		gc.attemptToDelete.Forget(item)
	case requeueItem:
		// Re-add with backoff for a later retry.
		gc.attemptToDelete.AddRateLimited(item)
	}

	return true
}
   348  
   349  type workQueueItemAction int
   350  
   351  const (
   352  	requeueItem = iota
   353  	forgetItem
   354  )
   355  
   356  func (gc *GarbageCollector) attemptToDeleteWorker(ctx context.Context, item interface{}) workQueueItemAction {
   357  	n, ok := item.(*node)
   358  	if !ok {
   359  		utilruntime.HandleError(fmt.Errorf("expect *node, got %#v", item))
   360  		return forgetItem
   361  	}
   362  
   363  	logger := klog.FromContext(ctx)
   364  
   365  	if !n.isObserved() {
   366  		nodeFromGraph, existsInGraph := gc.dependencyGraphBuilder.uidToNode.Read(n.identity.UID)
   367  		if !existsInGraph {
   368  			// this can happen if attemptToDelete loops on a requeued virtual node because attemptToDeleteItem returned an error,
   369  			// and in the meantime a deletion of the real object associated with that uid was observed
   370  			logger.V(5).Info("item no longer in the graph, skipping attemptToDeleteItem", "item", n.identity)
   371  			return forgetItem
   372  		}
   373  		if nodeFromGraph.isObserved() {
   374  			// this can happen if attemptToDelete loops on a requeued virtual node because attemptToDeleteItem returned an error,
   375  			// and in the meantime the real object associated with that uid was observed
   376  			logger.V(5).Info("item no longer virtual in the graph, skipping attemptToDeleteItem on virtual node", "item", n.identity)
   377  			return forgetItem
   378  		}
   379  	}
   380  
   381  	err := gc.attemptToDeleteItem(ctx, n)
   382  	if err == enqueuedVirtualDeleteEventErr {
   383  		// a virtual event was produced and will be handled by processGraphChanges, no need to requeue this node
   384  		return forgetItem
   385  	} else if err == namespacedOwnerOfClusterScopedObjectErr {
   386  		// a cluster-scoped object referring to a namespaced owner is an error that will not resolve on retry, no need to requeue this node
   387  		return forgetItem
   388  	} else if err != nil {
   389  		if _, ok := err.(*restMappingError); ok {
   390  			// There are at least two ways this can happen:
   391  			// 1. The reference is to an object of a custom type that has not yet been
   392  			//    recognized by gc.restMapper (this is a transient error).
   393  			// 2. The reference is to an invalid group/version. We don't currently
   394  			//    have a way to distinguish this from a valid type we will recognize
   395  			//    after the next discovery sync.
   396  			// For now, record the error and retry.
   397  			logger.V(5).Error(err, "error syncing item", "item", n.identity)
   398  		} else {
   399  			utilruntime.HandleError(fmt.Errorf("error syncing item %s: %v", n, err))
   400  		}
   401  		// retry if garbage collection of an object failed.
   402  		return requeueItem
   403  	} else if !n.isObserved() {
   404  		// requeue if item hasn't been observed via an informer event yet.
   405  		// otherwise a virtual node for an item added AND removed during watch reestablishment can get stuck in the graph and never removed.
   406  		// see https://issue.k8s.io/56121
   407  		logger.V(5).Info("item hasn't been observed via informer yet", "item", n.identity)
   408  		return requeueItem
   409  	}
   410  
   411  	return forgetItem
   412  }
   413  
// isDangling checks whether a reference points to an object that doesn't exist.
// If isDangling looks up the referenced object at the API server, it also
// returns its latest state.
func (gc *GarbageCollector) isDangling(ctx context.Context, reference metav1.OwnerReference, item *node) (
	dangling bool, owner *metav1.PartialObjectMetadata, err error) {

	logger := klog.FromContext(ctx)
	// check for recorded absent cluster-scoped parent
	absentOwnerCacheKey := objectReference{OwnerReference: ownerReferenceCoordinates(reference)}
	if gc.absentOwnerCache.Has(absentOwnerCacheKey) {
		logger.V(5).Info("according to the absentOwnerCache, item's owner does not exist",
			"item", item.identity,
			"owner", reference,
		)
		return true, nil, nil
	}

	// check for recorded absent namespaced parent
	// (the cache key's namespace is mutated here; it is cleared again below if
	// the owner turns out to be cluster-scoped)
	absentOwnerCacheKey.Namespace = item.identity.Namespace
	if gc.absentOwnerCache.Has(absentOwnerCacheKey) {
		logger.V(5).Info("according to the absentOwnerCache, item's owner does not exist in namespace",
			"item", item.identity,
			"owner", reference,
		)
		return true, nil, nil
	}

	// TODO: we need to verify the reference resource is supported by the
	// system. If it's not a valid resource, the garbage collector should i)
	// ignore the reference when decide if the object should be deleted, and
	// ii) should update the object to remove such references. This is to
	// prevent objects having references to an old resource from being
	// deleted during a cluster upgrade.
	resource, namespaced, err := gc.apiResource(reference.APIVersion, reference.Kind)
	if err != nil {
		return false, nil, err
	}
	// cluster-scoped owners are cached without a namespace
	if !namespaced {
		absentOwnerCacheKey.Namespace = ""
	}

	if len(item.identity.Namespace) == 0 && namespaced {
		// item is a cluster-scoped object referring to a namespace-scoped owner, which is not valid.
		// return a marker error, rather than retrying on the lookup failure forever.
		logger.V(2).Info("item is cluster-scoped, but refers to a namespaced owner",
			"item", item.identity,
			"owner", reference,
		)
		return false, nil, namespacedOwnerOfClusterScopedObjectErr
	}

	// TODO: It's only necessary to talk to the API server if the owner node
	// is a "virtual" node. The local graph could lag behind the real
	// status, but in practice, the difference is small.
	owner, err = gc.metadataClient.Resource(resource).Namespace(resourceDefaultNamespace(namespaced, item.identity.Namespace)).Get(ctx, reference.Name, metav1.GetOptions{})
	switch {
	case errors.IsNotFound(err):
		// remember the absence so future lookups for this owner skip the API call
		gc.absentOwnerCache.Add(absentOwnerCacheKey)
		logger.V(5).Info("item's owner is not found",
			"item", item.identity,
			"owner", reference,
		)
		return true, nil, nil
	case err != nil:
		return false, nil, err
	}

	if owner.GetUID() != reference.UID {
		// an object with the owner's name exists, but it is a different
		// incarnation (different UID), so the referenced owner is gone
		logger.V(5).Info("item's owner is not found, UID mismatch",
			"item", item.identity,
			"owner", reference,
		)
		gc.absentOwnerCache.Add(absentOwnerCacheKey)
		return true, nil, nil
	}
	return false, owner, nil
}
   491  
   492  // classify the latestReferences to three categories:
   493  // solid: the owner exists, and is not "waitingForDependentsDeletion"
   494  // dangling: the owner does not exist
   495  // waitingForDependentsDeletion: the owner exists, its deletionTimestamp is non-nil, and it has
   496  // FinalizerDeletingDependents
   497  // This function communicates with the server.
   498  func (gc *GarbageCollector) classifyReferences(ctx context.Context, item *node, latestReferences []metav1.OwnerReference) (
   499  	solid, dangling, waitingForDependentsDeletion []metav1.OwnerReference, err error) {
   500  	for _, reference := range latestReferences {
   501  		isDangling, owner, err := gc.isDangling(ctx, reference, item)
   502  		if err != nil {
   503  			return nil, nil, nil, err
   504  		}
   505  		if isDangling {
   506  			dangling = append(dangling, reference)
   507  			continue
   508  		}
   509  
   510  		ownerAccessor, err := meta.Accessor(owner)
   511  		if err != nil {
   512  			return nil, nil, nil, err
   513  		}
   514  		if ownerAccessor.GetDeletionTimestamp() != nil && hasDeleteDependentsFinalizer(ownerAccessor) {
   515  			waitingForDependentsDeletion = append(waitingForDependentsDeletion, reference)
   516  		} else {
   517  			solid = append(solid, reference)
   518  		}
   519  	}
   520  	return solid, dangling, waitingForDependentsDeletion, nil
   521  }
   522  
   523  func ownerRefsToUIDs(refs []metav1.OwnerReference) []types.UID {
   524  	var ret []types.UID
   525  	for _, ref := range refs {
   526  		ret = append(ret, ref.UID)
   527  	}
   528  	return ret
   529  }
   530  
// attemptToDeleteItem looks up the live API object associated with the node,
// and issues a delete IFF the uid matches, the item is not blocked on deleting dependents,
// and all owner references are dangling.
//
// if the API get request returns a NotFound error, or the retrieved item's uid does not match,
// a virtual delete event for the node is enqueued and enqueuedVirtualDeleteEventErr is returned.
func (gc *GarbageCollector) attemptToDeleteItem(ctx context.Context, item *node) error {
	logger := klog.FromContext(ctx)

	logger.V(2).Info("Processing item",
		"item", item.identity,
		"virtual", !item.isObserved(),
	)

	// "being deleted" is an one-way trip to the final deletion. We'll just wait for the final deletion, and then process the object's dependents.
	if item.isBeingDeleted() && !item.isDeletingDependents() {
		logger.V(5).Info("processing item returned at once, because its DeletionTimestamp is non-nil",
			"item", item.identity,
		)
		return nil
	}
	// TODO: It's only necessary to talk to the API server if this is a
	// "virtual" node. The local graph could lag behind the real status, but in
	// practice, the difference is small.
	latest, err := gc.getObject(item.identity)
	switch {
	case errors.IsNotFound(err):
		// the GraphBuilder can add "virtual" node for an owner that doesn't
		// exist yet, so we need to enqueue a virtual Delete event to remove
		// the virtual node from GraphBuilder.uidToNode.
		logger.V(5).Info("item not found, generating a virtual delete event",
			"item", item.identity,
		)
		gc.dependencyGraphBuilder.enqueueVirtualDeleteEvent(item.identity)
		return enqueuedVirtualDeleteEventErr
	case err != nil:
		return err
	}

	if latest.GetUID() != item.identity.UID {
		// a live object with the same name exists but has a different UID, so
		// the tracked object is gone; treat this like NotFound.
		logger.V(5).Info("UID doesn't match, item not found, generating a virtual delete event",
			"item", item.identity,
		)
		gc.dependencyGraphBuilder.enqueueVirtualDeleteEvent(item.identity)
		return enqueuedVirtualDeleteEventErr
	}

	// TODO: attemptToOrphanWorker() routine is similar. Consider merging
	// attemptToOrphanWorker() into attemptToDeleteItem() as well.
	if item.isDeletingDependents() {
		return gc.processDeletingDependentsItem(logger, item)
	}

	// compute if we should delete the item
	ownerReferences := latest.GetOwnerReferences()
	if len(ownerReferences) == 0 {
		logger.V(2).Info("item doesn't have an owner, continue on next item",
			"item", item.identity,
		)
		return nil
	}

	solid, dangling, waitingForDependentsDeletion, err := gc.classifyReferences(ctx, item, ownerReferences)
	if err != nil {
		return err
	}
	logger.V(5).Info("classify item's references",
		"item", item.identity,
		"solid", solid,
		"dangling", dangling,
		"waitingForDependentsDeletion", waitingForDependentsDeletion,
	)

	switch {
	case len(solid) != 0:
		// at least one owner still exists: do not delete the item, but patch
		// away any stale (dangling / waiting) references.
		logger.V(2).Info("item has at least one existing owner, will not garbage collect",
			"item", item.identity,
			"owner", solid,
		)
		if len(dangling) == 0 && len(waitingForDependentsDeletion) == 0 {
			return nil
		}
		logger.V(2).Info("remove dangling references and waiting references for item",
			"item", item.identity,
			"dangling", dangling,
			"waitingForDependentsDeletion", waitingForDependentsDeletion,
		)
		// waitingForDependentsDeletion needs to be deleted from the
		// ownerReferences, otherwise the referenced objects will be stuck with
		// the FinalizerDeletingDependents and never get deleted.
		ownerUIDs := append(ownerRefsToUIDs(dangling), ownerRefsToUIDs(waitingForDependentsDeletion)...)
		p, err := c.GenerateDeleteOwnerRefStrategicMergeBytes(item.identity.UID, ownerUIDs)
		if err != nil {
			return err
		}
		// the second argument is a fallback that produces a JSON merge patch
		// for resources that don't support strategic merge patch.
		_, err = gc.patch(item, p, func(n *node) ([]byte, error) {
			return gc.deleteOwnerRefJSONMergePatch(n, ownerUIDs...)
		})
		return err
	case len(waitingForDependentsDeletion) != 0 && item.dependentsLength() != 0:
		deps := item.getDependents()
		for _, dep := range deps {
			if dep.isDeletingDependents() {
				// this circle detection has false positives, we need to
				// apply a more rigorous detection if this turns out to be a
				// problem.
				// there are multiple workers run attemptToDeleteItem in
				// parallel, the circle detection can fail in a race condition.
				logger.V(2).Info("processing item, some of its owners and its dependent have FinalizerDeletingDependents, to prevent potential cycle, its ownerReferences are going to be modified to be non-blocking, then the item is going to be deleted with Foreground",
					"item", item.identity,
					"dependent", dep.identity,
				)
				patch, err := item.unblockOwnerReferencesStrategicMergePatch()
				if err != nil {
					return err
				}
				if _, err := gc.patch(item, patch, gc.unblockOwnerReferencesJSONMergePatch); err != nil {
					return err
				}
				break
			}
		}
		logger.V(2).Info("at least one owner of item has FinalizerDeletingDependents, and the item itself has dependents, so it is going to be deleted in Foreground",
			"item", item.identity,
		)
		// the deletion event will be observed by the graphBuilder, so the item
		// will be processed again in processDeletingDependentsItem. If it
		// doesn't have dependents, the function will remove the
		// FinalizerDeletingDependents from the item, resulting in the final
		// deletion of the item.
		policy := metav1.DeletePropagationForeground
		return gc.deleteObject(item.identity, &policy)
	default:
		// item doesn't have any solid owner, so it needs to be garbage
		// collected. Also, none of item's owners is waiting for the deletion of
		// the dependents, so set propagationPolicy based on existing finalizers.
		var policy metav1.DeletionPropagation
		switch {
		case hasOrphanFinalizer(latest):
			// if an existing orphan finalizer is already on the object, honor it.
			policy = metav1.DeletePropagationOrphan
		case hasDeleteDependentsFinalizer(latest):
			// if an existing foreground finalizer is already on the object, honor it.
			policy = metav1.DeletePropagationForeground
		default:
			// otherwise, default to background.
			policy = metav1.DeletePropagationBackground
		}
		logger.V(2).Info("Deleting item",
			"item", item.identity,
			"propagationPolicy", policy,
		)
		return gc.deleteObject(item.identity, &policy)
	}
}
   686  
   687  // process item that's waiting for its dependents to be deleted
   688  func (gc *GarbageCollector) processDeletingDependentsItem(logger klog.Logger, item *node) error {
   689  	blockingDependents := item.blockingDependents()
   690  	if len(blockingDependents) == 0 {
   691  		logger.V(2).Info("remove DeleteDependents finalizer for item", "item", item.identity)
   692  		return gc.removeFinalizer(logger, item, metav1.FinalizerDeleteDependents)
   693  	}
   694  	for _, dep := range blockingDependents {
   695  		if !dep.isDeletingDependents() {
   696  			logger.V(2).Info("adding dependent to attemptToDelete, because its owner is deletingDependents",
   697  				"item", item.identity,
   698  				"dependent", dep.identity,
   699  			)
   700  			gc.attemptToDelete.Add(dep)
   701  		}
   702  	}
   703  	return nil
   704  }
   705  
   706  // dependents are copies of pointers to the owner's dependents, they don't need to be locked.
   707  func (gc *GarbageCollector) orphanDependents(logger klog.Logger, owner objectReference, dependents []*node) error {
   708  	errCh := make(chan error, len(dependents))
   709  	wg := sync.WaitGroup{}
   710  	wg.Add(len(dependents))
   711  	for i := range dependents {
   712  		go func(dependent *node) {
   713  			defer wg.Done()
   714  			// the dependent.identity.UID is used as precondition
   715  			p, err := c.GenerateDeleteOwnerRefStrategicMergeBytes(dependent.identity.UID, []types.UID{owner.UID})
   716  			if err != nil {
   717  				errCh <- fmt.Errorf("orphaning %s failed, %v", dependent.identity, err)
   718  				return
   719  			}
   720  			_, err = gc.patch(dependent, p, func(n *node) ([]byte, error) {
   721  				return gc.deleteOwnerRefJSONMergePatch(n, owner.UID)
   722  			})
   723  			// note that if the target ownerReference doesn't exist in the
   724  			// dependent, strategic merge patch will NOT return an error.
   725  			if err != nil && !errors.IsNotFound(err) {
   726  				errCh <- fmt.Errorf("orphaning %s failed, %v", dependent.identity, err)
   727  			}
   728  		}(dependents[i])
   729  	}
   730  	wg.Wait()
   731  	close(errCh)
   732  
   733  	var errorsSlice []error
   734  	for e := range errCh {
   735  		errorsSlice = append(errorsSlice, e)
   736  	}
   737  
   738  	if len(errorsSlice) != 0 {
   739  		return fmt.Errorf("failed to orphan dependents of owner %s, got errors: %s", owner, utilerrors.NewAggregate(errorsSlice).Error())
   740  	}
   741  	logger.V(5).Info("successfully updated all dependents", "owner", owner)
   742  	return nil
   743  }
   744  
   745  func (gc *GarbageCollector) runAttemptToOrphanWorker(logger klog.Logger) {
   746  	for gc.processAttemptToOrphanWorker(logger) {
   747  	}
   748  }
   749  
// processAttemptToOrphanWorker dequeues a node from the attemptToOrphan, then finds its
// dependents based on the graph maintained by the GC, then removes it from the
// OwnerReferences of its dependents, and finally updates the owner to remove
// the "Orphan" finalizer. The node is added back into the attemptToOrphan if any of
// these steps fail.
//
// It returns false when the queue has been shut down, signaling the calling
// worker loop to exit.
func (gc *GarbageCollector) processAttemptToOrphanWorker(logger klog.Logger) bool {
	item, quit := gc.attemptToOrphan.Get()
	// NOTE(review): workerLock is acquired only after Get returns, presumably
	// so that a worker parked on an empty queue does not hold the read lock
	// and block whoever takes the write lock — confirm against the lock's
	// writer (e.g. Sync) before relying on this ordering.
	gc.workerLock.RLock()
	// The deferred unlock runs on every return path, including the quit path
	// below, so the read lock is never leaked.
	defer gc.workerLock.RUnlock()
	if quit {
		return false
	}
	defer gc.attemptToOrphan.Done(item)

	action := gc.attemptToOrphanWorker(logger, item)
	switch action {
	case forgetItem:
		// Success: reset the rate limiter's failure count for this item.
		gc.attemptToOrphan.Forget(item)
	case requeueItem:
		// Failure: retry later with rate-limited backoff.
		gc.attemptToOrphan.AddRateLimited(item)
	}

	return true
}
   774  
   775  func (gc *GarbageCollector) attemptToOrphanWorker(logger klog.Logger, item interface{}) workQueueItemAction {
   776  	owner, ok := item.(*node)
   777  	if !ok {
   778  		utilruntime.HandleError(fmt.Errorf("expect *node, got %#v", item))
   779  		return forgetItem
   780  	}
   781  	// we don't need to lock each element, because they never get updated
   782  	owner.dependentsLock.RLock()
   783  	dependents := make([]*node, 0, len(owner.dependents))
   784  	for dependent := range owner.dependents {
   785  		dependents = append(dependents, dependent)
   786  	}
   787  	owner.dependentsLock.RUnlock()
   788  
   789  	err := gc.orphanDependents(logger, owner.identity, dependents)
   790  	if err != nil {
   791  		utilruntime.HandleError(fmt.Errorf("orphanDependents for %s failed with %v", owner.identity, err))
   792  		return requeueItem
   793  	}
   794  	// update the owner, remove "orphaningFinalizer" from its finalizers list
   795  	err = gc.removeFinalizer(logger, owner, metav1.FinalizerOrphanDependents)
   796  	if err != nil {
   797  		utilruntime.HandleError(fmt.Errorf("removeOrphanFinalizer for %s failed with %v", owner.identity, err))
   798  		return requeueItem
   799  	}
   800  	return forgetItem
   801  }
   802  
   803  // *FOR TEST USE ONLY*
   804  // GraphHasUID returns if the GraphBuilder has a particular UID store in its
   805  // uidToNode graph. It's useful for debugging.
   806  // This method is used by integration tests.
   807  func (gc *GarbageCollector) GraphHasUID(u types.UID) bool {
   808  	_, ok := gc.dependencyGraphBuilder.uidToNode.Read(u)
   809  	return ok
   810  }
   811  
   812  // GetDeletableResources returns all resources from discoveryClient that the
   813  // garbage collector should recognize and work with. More specifically, all
   814  // preferred resources which support the 'delete', 'list', and 'watch' verbs.
   815  //
   816  // If an error was encountered fetching resources from the server,
   817  // it is included as well, along with any resources that were successfully resolved.
   818  //
   819  // All discovery errors are considered temporary. Upon encountering any error,
   820  // GetDeletableResources will log and return any discovered resources it was
   821  // able to process (which may be none).
   822  func GetDeletableResources(logger klog.Logger, discoveryClient discovery.ServerResourcesInterface) (map[schema.GroupVersionResource]struct{}, error) {
   823  	preferredResources, lookupErr := discoveryClient.ServerPreferredResources()
   824  	if lookupErr != nil {
   825  		if groupLookupFailures, isLookupFailure := discovery.GroupDiscoveryFailedErrorGroups(lookupErr); isLookupFailure {
   826  			logger.Info("failed to discover some groups", "groups", groupLookupFailures)
   827  		} else {
   828  			logger.Info("failed to discover preferred resources", "error", lookupErr)
   829  		}
   830  	}
   831  	if preferredResources == nil {
   832  		return map[schema.GroupVersionResource]struct{}{}, lookupErr
   833  	}
   834  
   835  	// This is extracted from discovery.GroupVersionResources to allow tolerating
   836  	// failures on a per-resource basis.
   837  	deletableResources := discovery.FilteredBy(discovery.SupportsAllVerbs{Verbs: []string{"delete", "list", "watch"}}, preferredResources)
   838  	deletableGroupVersionResources := map[schema.GroupVersionResource]struct{}{}
   839  	for _, rl := range deletableResources {
   840  		gv, err := schema.ParseGroupVersion(rl.GroupVersion)
   841  		if err != nil {
   842  			logger.Info("ignoring invalid discovered resource", "groupversion", rl.GroupVersion, "error", err)
   843  			continue
   844  		}
   845  		for i := range rl.APIResources {
   846  			deletableGroupVersionResources[schema.GroupVersionResource{Group: gv.Group, Version: gv.Version, Resource: rl.APIResources[i].Name}] = struct{}{}
   847  		}
   848  	}
   849  
   850  	return deletableGroupVersionResources, lookupErr
   851  }
   852  
// Name returns the controller's name, "garbagecollector".
func (gc *GarbageCollector) Name() string {
	return "garbagecollector"
}
   856  
// GetDependencyGraphBuilder returns the garbage collector's graph builder,
// which is particularly helpful for testing where controllerContext is not
// available.
func (gc *GarbageCollector) GetDependencyGraphBuilder() *GraphBuilder {
	return gc.dependencyGraphBuilder
}
   861  

View as plain text