...

Source file src/k8s.io/apiextensions-apiserver/pkg/controller/finalizer/crd_finalizer.go

Documentation: k8s.io/apiextensions-apiserver/pkg/controller/finalizer

     1  /*
     2  Copyright 2017 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package finalizer
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"reflect"
    23  	"time"
    24  
    25  	"k8s.io/klog/v2"
    26  
    27  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    28  	"k8s.io/apimachinery/pkg/api/meta"
    29  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    30  	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
    31  	"k8s.io/apimachinery/pkg/runtime/schema"
    32  	utilerrors "k8s.io/apimachinery/pkg/util/errors"
    33  	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    34  	"k8s.io/apimachinery/pkg/util/sets"
    35  	"k8s.io/apimachinery/pkg/util/wait"
    36  	genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
    37  	"k8s.io/apiserver/pkg/registry/rest"
    38  	"k8s.io/client-go/tools/cache"
    39  	"k8s.io/client-go/util/workqueue"
    40  
    41  	apiextensionshelpers "k8s.io/apiextensions-apiserver/pkg/apihelpers"
    42  	apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
    43  	client "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/typed/apiextensions/v1"
    44  	informers "k8s.io/apiextensions-apiserver/pkg/client/informers/externalversions/apiextensions/v1"
    45  	listers "k8s.io/apiextensions-apiserver/pkg/client/listers/apiextensions/v1"
    46  )
    47  
    48  // OverlappingBuiltInResources returns the set of built-in group/resources that are persisted
    49  // in storage paths that overlap with CRD storage paths, and should not be deleted
    50  // by this controller if an associated CRD is deleted.
    51  func OverlappingBuiltInResources() map[schema.GroupResource]bool {
    52  	return map[schema.GroupResource]bool{
    53  		{Group: "apiregistration.k8s.io", Resource: "apiservices"}:             true,
    54  		{Group: "apiextensions.k8s.io", Resource: "customresourcedefinitions"}: true,
    55  	}
    56  }
    57  
    58  // CRDFinalizer is a controller that finalizes the CRD by deleting all the CRs associated with it.
    59  type CRDFinalizer struct {
    60  	crdClient      client.CustomResourceDefinitionsGetter
    61  	crClientGetter CRClientGetter
    62  
    63  	crdLister listers.CustomResourceDefinitionLister
    64  	crdSynced cache.InformerSynced
    65  
    66  	// To allow injection for testing.
    67  	syncFn func(key string) error
    68  
    69  	queue workqueue.RateLimitingInterface
    70  }
    71  
    72  // ListerCollectionDeleter combines rest.Lister and rest.CollectionDeleter.
    73  type ListerCollectionDeleter interface {
    74  	rest.Lister
    75  	rest.CollectionDeleter
    76  }
    77  
    78  // CRClientGetter knows how to get a ListerCollectionDeleter for a given CRD UID.
    79  type CRClientGetter interface {
    80  	// GetCustomResourceListerCollectionDeleter gets the ListerCollectionDeleter for the given CRD
    81  	// UID.
    82  	GetCustomResourceListerCollectionDeleter(crd *apiextensionsv1.CustomResourceDefinition) (ListerCollectionDeleter, error)
    83  }
    84  
    85  // NewCRDFinalizer creates a new CRDFinalizer.
    86  func NewCRDFinalizer(
    87  	crdInformer informers.CustomResourceDefinitionInformer,
    88  	crdClient client.CustomResourceDefinitionsGetter,
    89  	crClientGetter CRClientGetter,
    90  ) *CRDFinalizer {
    91  	c := &CRDFinalizer{
    92  		crdClient:      crdClient,
    93  		crdLister:      crdInformer.Lister(),
    94  		crdSynced:      crdInformer.Informer().HasSynced,
    95  		crClientGetter: crClientGetter,
    96  		queue:          workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "crd_finalizer"),
    97  	}
    98  
    99  	crdInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
   100  		AddFunc:    c.addCustomResourceDefinition,
   101  		UpdateFunc: c.updateCustomResourceDefinition,
   102  	})
   103  
   104  	c.syncFn = c.sync
   105  
   106  	return c
   107  }
   108  
   109  func (c *CRDFinalizer) sync(key string) error {
   110  	cachedCRD, err := c.crdLister.Get(key)
   111  	if apierrors.IsNotFound(err) {
   112  		return nil
   113  	}
   114  	if err != nil {
   115  		return err
   116  	}
   117  
   118  	// no work to do
   119  	if cachedCRD.DeletionTimestamp.IsZero() || !apiextensionshelpers.CRDHasFinalizer(cachedCRD, apiextensionsv1.CustomResourceCleanupFinalizer) {
   120  		return nil
   121  	}
   122  
   123  	crd := cachedCRD.DeepCopy()
   124  
   125  	// update the status condition.  This cleanup could take a while.
   126  	apiextensionshelpers.SetCRDCondition(crd, apiextensionsv1.CustomResourceDefinitionCondition{
   127  		Type:    apiextensionsv1.Terminating,
   128  		Status:  apiextensionsv1.ConditionTrue,
   129  		Reason:  "InstanceDeletionInProgress",
   130  		Message: "CustomResource deletion is in progress",
   131  	})
   132  	crd, err = c.crdClient.CustomResourceDefinitions().UpdateStatus(context.TODO(), crd, metav1.UpdateOptions{})
   133  	if apierrors.IsNotFound(err) || apierrors.IsConflict(err) {
   134  		// deleted or changed in the meantime, we'll get called again
   135  		return nil
   136  	}
   137  	if err != nil {
   138  		return err
   139  	}
   140  
   141  	// Now we can start deleting items.  We should use the REST API to ensure that all normal admission runs.
   142  	// Since we control the endpoints, we know that delete collection works. No need to delete if not established.
   143  	if OverlappingBuiltInResources()[schema.GroupResource{Group: crd.Spec.Group, Resource: crd.Spec.Names.Plural}] {
   144  		// Skip deletion, explain why, and proceed to remove the finalizer and delete the CRD
   145  		apiextensionshelpers.SetCRDCondition(crd, apiextensionsv1.CustomResourceDefinitionCondition{
   146  			Type:    apiextensionsv1.Terminating,
   147  			Status:  apiextensionsv1.ConditionFalse,
   148  			Reason:  "OverlappingBuiltInResource",
   149  			Message: "instances overlap with built-in resources in storage",
   150  		})
   151  	} else if apiextensionshelpers.IsCRDConditionTrue(crd, apiextensionsv1.Established) {
   152  		cond, deleteErr := c.deleteInstances(crd)
   153  		apiextensionshelpers.SetCRDCondition(crd, cond)
   154  		if deleteErr != nil {
   155  			if _, err = c.crdClient.CustomResourceDefinitions().UpdateStatus(context.TODO(), crd, metav1.UpdateOptions{}); err != nil {
   156  				utilruntime.HandleError(err)
   157  			}
   158  			return deleteErr
   159  		}
   160  	} else {
   161  		apiextensionshelpers.SetCRDCondition(crd, apiextensionsv1.CustomResourceDefinitionCondition{
   162  			Type:    apiextensionsv1.Terminating,
   163  			Status:  apiextensionsv1.ConditionFalse,
   164  			Reason:  "NeverEstablished",
   165  			Message: "resource was never established",
   166  		})
   167  	}
   168  
   169  	apiextensionshelpers.CRDRemoveFinalizer(crd, apiextensionsv1.CustomResourceCleanupFinalizer)
   170  	_, err = c.crdClient.CustomResourceDefinitions().UpdateStatus(context.TODO(), crd, metav1.UpdateOptions{})
   171  	if apierrors.IsNotFound(err) || apierrors.IsConflict(err) {
   172  		// deleted or changed in the meantime, we'll get called again
   173  		return nil
   174  	}
   175  	return err
   176  }
   177  
   178  func (c *CRDFinalizer) deleteInstances(crd *apiextensionsv1.CustomResourceDefinition) (apiextensionsv1.CustomResourceDefinitionCondition, error) {
   179  	// Now we can start deleting items. While it would be ideal to use a REST API client, doing so
   180  	// could incorrectly delete a ThirdPartyResource with the same URL as the CustomResource, so we go
   181  	// directly to the storage instead. Since we control the storage, we know that delete collection works.
   182  	crClient, err := c.crClientGetter.GetCustomResourceListerCollectionDeleter(crd)
   183  	if err != nil {
   184  		err = fmt.Errorf("unable to find a custom resource client for %s.%s: %v", crd.Status.AcceptedNames.Plural, crd.Spec.Group, err)
   185  		return apiextensionsv1.CustomResourceDefinitionCondition{
   186  			Type:    apiextensionsv1.Terminating,
   187  			Status:  apiextensionsv1.ConditionTrue,
   188  			Reason:  "InstanceDeletionFailed",
   189  			Message: fmt.Sprintf("could not list instances: %v", err),
   190  		}, err
   191  	}
   192  
   193  	ctx := genericapirequest.NewContext()
   194  	allResources, err := crClient.List(ctx, nil)
   195  	if err != nil {
   196  		return apiextensionsv1.CustomResourceDefinitionCondition{
   197  			Type:    apiextensionsv1.Terminating,
   198  			Status:  apiextensionsv1.ConditionTrue,
   199  			Reason:  "InstanceDeletionFailed",
   200  			Message: fmt.Sprintf("could not list instances: %v", err),
   201  		}, err
   202  	}
   203  
   204  	deletedNamespaces := sets.String{}
   205  	deleteErrors := []error{}
   206  	for _, item := range allResources.(*unstructured.UnstructuredList).Items {
   207  		metadata, err := meta.Accessor(&item)
   208  		if err != nil {
   209  			utilruntime.HandleError(err)
   210  			continue
   211  		}
   212  		if deletedNamespaces.Has(metadata.GetNamespace()) {
   213  			continue
   214  		}
   215  		// don't retry deleting the same namespace
   216  		deletedNamespaces.Insert(metadata.GetNamespace())
   217  		nsCtx := genericapirequest.WithNamespace(ctx, metadata.GetNamespace())
   218  		if _, err := crClient.DeleteCollection(nsCtx, rest.ValidateAllObjectFunc, nil, nil); err != nil {
   219  			deleteErrors = append(deleteErrors, err)
   220  			continue
   221  		}
   222  	}
   223  	if deleteError := utilerrors.NewAggregate(deleteErrors); deleteError != nil {
   224  		return apiextensionsv1.CustomResourceDefinitionCondition{
   225  			Type:    apiextensionsv1.Terminating,
   226  			Status:  apiextensionsv1.ConditionTrue,
   227  			Reason:  "InstanceDeletionFailed",
   228  			Message: fmt.Sprintf("could not issue all deletes: %v", deleteError),
   229  		}, deleteError
   230  	}
   231  
   232  	// now we need to wait until all the resources are deleted.  Start with a simple poll before we do anything fancy.
   233  	// TODO not all servers are synchronized on caches.  It is possible for a stale one to still be creating things.
   234  	// Once we have a mechanism for servers to indicate their states, we should check that for concurrence.
   235  	err = wait.PollImmediate(5*time.Second, 1*time.Minute, func() (bool, error) {
   236  		listObj, err := crClient.List(ctx, nil)
   237  		if err != nil {
   238  			return false, err
   239  		}
   240  		if len(listObj.(*unstructured.UnstructuredList).Items) == 0 {
   241  			return true, nil
   242  		}
   243  		klog.V(2).Infof("%s.%s waiting for %d items to be removed", crd.Status.AcceptedNames.Plural, crd.Spec.Group, len(listObj.(*unstructured.UnstructuredList).Items))
   244  		return false, nil
   245  	})
   246  	if err != nil {
   247  		return apiextensionsv1.CustomResourceDefinitionCondition{
   248  			Type:    apiextensionsv1.Terminating,
   249  			Status:  apiextensionsv1.ConditionTrue,
   250  			Reason:  "InstanceDeletionCheck",
   251  			Message: fmt.Sprintf("could not confirm zero CustomResources remaining: %v", err),
   252  		}, err
   253  	}
   254  	return apiextensionsv1.CustomResourceDefinitionCondition{
   255  		Type:    apiextensionsv1.Terminating,
   256  		Status:  apiextensionsv1.ConditionFalse,
   257  		Reason:  "InstanceDeletionCompleted",
   258  		Message: "removed all instances",
   259  	}, nil
   260  }
   261  
   262  func (c *CRDFinalizer) Run(workers int, stopCh <-chan struct{}) {
   263  	defer utilruntime.HandleCrash()
   264  	defer c.queue.ShutDown()
   265  
   266  	klog.Info("Starting CRDFinalizer")
   267  	defer klog.Info("Shutting down CRDFinalizer")
   268  
   269  	if !cache.WaitForCacheSync(stopCh, c.crdSynced) {
   270  		return
   271  	}
   272  
   273  	for i := 0; i < workers; i++ {
   274  		go wait.Until(c.runWorker, time.Second, stopCh)
   275  	}
   276  
   277  	<-stopCh
   278  }
   279  
   280  func (c *CRDFinalizer) runWorker() {
   281  	for c.processNextWorkItem() {
   282  	}
   283  }
   284  
   285  // processNextWorkItem deals with one key off the queue.  It returns false when it's time to quit.
   286  func (c *CRDFinalizer) processNextWorkItem() bool {
   287  	key, quit := c.queue.Get()
   288  	if quit {
   289  		return false
   290  	}
   291  	defer c.queue.Done(key)
   292  
   293  	err := c.syncFn(key.(string))
   294  	if err == nil {
   295  		c.queue.Forget(key)
   296  		return true
   297  	}
   298  
   299  	utilruntime.HandleError(fmt.Errorf("%v failed with: %v", key, err))
   300  	c.queue.AddRateLimited(key)
   301  
   302  	return true
   303  }
   304  
   305  func (c *CRDFinalizer) enqueue(obj *apiextensionsv1.CustomResourceDefinition) {
   306  	key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
   307  	if err != nil {
   308  		utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", obj, err))
   309  		return
   310  	}
   311  
   312  	c.queue.Add(key)
   313  }
   314  
   315  func (c *CRDFinalizer) addCustomResourceDefinition(obj interface{}) {
   316  	castObj := obj.(*apiextensionsv1.CustomResourceDefinition)
   317  	// only queue deleted things
   318  	if !castObj.DeletionTimestamp.IsZero() && apiextensionshelpers.CRDHasFinalizer(castObj, apiextensionsv1.CustomResourceCleanupFinalizer) {
   319  		c.enqueue(castObj)
   320  	}
   321  }
   322  
   323  func (c *CRDFinalizer) updateCustomResourceDefinition(oldObj, newObj interface{}) {
   324  	oldCRD := oldObj.(*apiextensionsv1.CustomResourceDefinition)
   325  	newCRD := newObj.(*apiextensionsv1.CustomResourceDefinition)
   326  	// only queue deleted things that haven't been finalized by us
   327  	if newCRD.DeletionTimestamp.IsZero() || !apiextensionshelpers.CRDHasFinalizer(newCRD, apiextensionsv1.CustomResourceCleanupFinalizer) {
   328  		return
   329  	}
   330  
   331  	// always requeue resyncs just in case
   332  	if oldCRD.ResourceVersion == newCRD.ResourceVersion {
   333  		c.enqueue(newCRD)
   334  		return
   335  	}
   336  
   337  	// If the only difference is in the terminating condition, then there's no reason to requeue here.  This controller
   338  	// is likely to be the originator, so requeuing would hot-loop us.  Failures are requeued by the workqueue directly.
   339  	// This is a low traffic and scale resource, so the copy is terrible.  It's not good, so better ideas
   340  	// are welcome.
   341  	oldCopy := oldCRD.DeepCopy()
   342  	newCopy := newCRD.DeepCopy()
   343  	oldCopy.ResourceVersion = ""
   344  	newCopy.ResourceVersion = ""
   345  	apiextensionshelpers.RemoveCRDCondition(oldCopy, apiextensionsv1.Terminating)
   346  	apiextensionshelpers.RemoveCRDCondition(newCopy, apiextensionsv1.Terminating)
   347  
   348  	if !reflect.DeepEqual(oldCopy, newCopy) {
   349  		c.enqueue(newCRD)
   350  	}
   351  }
   352  

View as plain text