    17  package status
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"reflect"
    23  	"strings"
    24  	"time"
    26  	"k8s.io/klog/v2"
    28  	"k8s.io/apimachinery/pkg/api/equality"
    29  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    30  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    31  	"k8s.io/apimachinery/pkg/labels"
    32  	utilerrors "k8s.io/apimachinery/pkg/util/errors"
    33  	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    34  	"k8s.io/apimachinery/pkg/util/sets"
    35  	"k8s.io/apimachinery/pkg/util/wait"
    36  	"k8s.io/client-go/tools/cache"
    37  	"k8s.io/client-go/util/workqueue"
    39  	apiextensionshelpers "k8s.io/apiextensions-apiserver/pkg/apihelpers"
    40  	apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
    41  	client "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/typed/apiextensions/v1"
    42  	informers "k8s.io/apiextensions-apiserver/pkg/client/informers/externalversions/apiextensions/v1"
    43  	listers "k8s.io/apiextensions-apiserver/pkg/client/listers/apiextensions/v1"
    44  )
// NamingConditionController reconciles the accepted names and the NamesAccepted
// condition in the status of CustomResourceDefinitions against the names
// requested in their spec.
//
// This controller is reserving names. To avoid conflicts, be sure to run only one instance of the worker at a time.
// This could eventually be lifted, but starting simple.
type NamingConditionController struct {
	// crdClient is used to write status updates for CRDs.
	crdClient client.CustomResourceDefinitionsGetter
	// crdLister reads CRDs from the informer's cache.
	crdLister listers.CustomResourceDefinitionLister
	// crdSynced reports whether the informer has synced; Run waits on it before starting the worker.
	crdSynced cache.InformerSynced
	// crdMutationCache backs our lister and keeps track of committed updates to avoid racy
	// write/lookup cycles.  It's got 100 slots by default, so it is unlikely to overrun.
	// TODO: revisit this if naming conflicts are found to occur in the wild.
	crdMutationCache cache.MutationCache
	// syncFn processes a single queue key. It is a field to allow injection for testing.
	syncFn func(key string) error
	// queue holds the keys of the CRDs waiting to be processed by the worker.
	queue workqueue.RateLimitingInterface
}
    64  func NewNamingConditionController(
    65  	crdInformer informers.CustomResourceDefinitionInformer,
    66  	crdClient client.CustomResourceDefinitionsGetter,
    67  ) *NamingConditionController {
    68  	c := &NamingConditionController{
    69  		crdClient: crdClient,
    70  		crdLister: crdInformer.Lister(),
    71  		crdSynced: crdInformer.Informer().HasSynced,
    72  		queue:     workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "crd_naming_condition_controller"),
    73  	}
    75  	informerIndexer := crdInformer.Informer().GetIndexer()
    76  	c.crdMutationCache = cache.NewIntegerResourceVersionMutationCache(informerIndexer, informerIndexer, 60*time.Second, false)
    78  	crdInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    79  		AddFunc:    c.addCustomResourceDefinition,
    80  		UpdateFunc: c.updateCustomResourceDefinition,
    81  		DeleteFunc: c.deleteCustomResourceDefinition,
    82  	})
    84  	c.syncFn = c.sync
    86  	return c
    87  }
    89  func (c *NamingConditionController) getAcceptedNamesForGroup(group string) (allResources sets.String, allKinds sets.String) {
    90  	allResources = sets.String{}
    91  	allKinds = sets.String{}
    93  	list, err := c.crdLister.List(labels.Everything())
    94  	if err != nil {
    95  		panic(err)
    96  	}
    98  	for _, curr := range list {
    99  		if curr.Spec.Group != group {
   100  			continue
   101  		}
   103  		// for each item here, see if we have a mutation cache entry that is more recent
   104  		// this makes sure that if we tight loop on update and run, our mutation cache will show
   105  		// us the version of the objects we just updated to.
   106  		item := curr
   107  		obj, exists, err := c.crdMutationCache.GetByKey(curr.Name)
   108  		if exists && err == nil {
   109  			item = obj.(*apiextensionsv1.CustomResourceDefinition)
   110  		}
   112  		allResources.Insert(item.Status.AcceptedNames.Plural)
   113  		allResources.Insert(item.Status.AcceptedNames.Singular)
   114  		allResources.Insert(item.Status.AcceptedNames.ShortNames...)
   116  		allKinds.Insert(item.Status.AcceptedNames.Kind)
   117  		allKinds.Insert(item.Status.AcceptedNames.ListKind)
   118  	}
   120  	return allResources, allKinds
   121  }
   123  func (c *NamingConditionController) calculateNamesAndConditions(in *apiextensionsv1.CustomResourceDefinition) (apiextensionsv1.CustomResourceDefinitionNames, apiextensionsv1.CustomResourceDefinitionCondition, apiextensionsv1.CustomResourceDefinitionCondition) {
   124  	// Get the names that have already been claimed
   125  	allResources, allKinds := c.getAcceptedNamesForGroup(in.Spec.Group)
   127  	namesAcceptedCondition := apiextensionsv1.CustomResourceDefinitionCondition{
   128  		Type:   apiextensionsv1.NamesAccepted,
   129  		Status: apiextensionsv1.ConditionUnknown,
   130  	}
   132  	requestedNames := in.Spec.Names
   133  	acceptedNames := in.Status.AcceptedNames
   134  	newNames := in.Status.AcceptedNames
   136  	// Check each name for mismatches.  If there's a mismatch between spec and status, then try to deconflict.
   137  	// Continue on errors so that the status is the best match possible
   138  	if err := equalToAcceptedOrFresh(requestedNames.Plural, acceptedNames.Plural, allResources); err != nil {
   139  		namesAcceptedCondition.Status = apiextensionsv1.ConditionFalse
   140  		namesAcceptedCondition.Reason = "PluralConflict"
   141  		namesAcceptedCondition.Message = err.Error()
   142  	} else {
   143  		newNames.Plural = requestedNames.Plural
   144  	}
   145  	if err := equalToAcceptedOrFresh(requestedNames.Singular, acceptedNames.Singular, allResources); err != nil {
   146  		namesAcceptedCondition.Status = apiextensionsv1.ConditionFalse
   147  		namesAcceptedCondition.Reason = "SingularConflict"
   148  		namesAcceptedCondition.Message = err.Error()
   149  	} else {
   150  		newNames.Singular = requestedNames.Singular
   151  	}
   152  	if !reflect.DeepEqual(requestedNames.ShortNames, acceptedNames.ShortNames) {
   153  		errs := []error{}
   154  		existingShortNames := sets.NewString(acceptedNames.ShortNames...)
   155  		for _, shortName := range requestedNames.ShortNames {
   156  			// if the shortname is already ours, then we're fine
   157  			if existingShortNames.Has(shortName) {
   158  				continue
   159  			}
   160  			if err := equalToAcceptedOrFresh(shortName, "", allResources); err != nil {
   161  				errs = append(errs, err)
   162  			}
   164  		}
   165  		if err := utilerrors.NewAggregate(errs); err != nil {
   166  			namesAcceptedCondition.Status = apiextensionsv1.ConditionFalse
   167  			namesAcceptedCondition.Reason = "ShortNamesConflict"
   168  			namesAcceptedCondition.Message = err.Error()
   169  		} else {
   170  			newNames.ShortNames = requestedNames.ShortNames
   171  		}
   172  	}
   174  	if err := equalToAcceptedOrFresh(requestedNames.Kind, acceptedNames.Kind, allKinds); err != nil {
   175  		namesAcceptedCondition.Status = apiextensionsv1.ConditionFalse
   176  		namesAcceptedCondition.Reason = "KindConflict"
   177  		namesAcceptedCondition.Message = err.Error()
   178  	} else {
   179  		newNames.Kind = requestedNames.Kind
   180  	}
   181  	if err := equalToAcceptedOrFresh(requestedNames.ListKind, acceptedNames.ListKind, allKinds); err != nil {
   182  		namesAcceptedCondition.Status = apiextensionsv1.ConditionFalse
   183  		namesAcceptedCondition.Reason = "ListKindConflict"
   184  		namesAcceptedCondition.Message = err.Error()
   185  	} else {
   186  		newNames.ListKind = requestedNames.ListKind
   187  	}
   189  	newNames.Categories = requestedNames.Categories
   191  	// if we haven't changed the condition, then our names must be good.
   192  	if namesAcceptedCondition.Status == apiextensionsv1.ConditionUnknown {
   193  		namesAcceptedCondition.Status = apiextensionsv1.ConditionTrue
   194  		namesAcceptedCondition.Reason = "NoConflicts"
   195  		namesAcceptedCondition.Message = "no conflicts found"
   196  	}
   198  	// set EstablishedCondition initially to false, then set it to true in establishing controller.
   199  	// The Establishing Controller will see the NamesAccepted condition when it arrives through the shared informer.
   200  	// At that time the API endpoint handler will serve the endpoint, avoiding a race
   201  	// which we had if we set Established to true here.
   202  	establishedCondition := apiextensionsv1.CustomResourceDefinitionCondition{
   203  		Type:    apiextensionsv1.Established,
   204  		Status:  apiextensionsv1.ConditionFalse,
   205  		Reason:  "NotAccepted",
   206  		Message: "not all names are accepted",
   207  	}
   208  	if old := apiextensionshelpers.FindCRDCondition(in, apiextensionsv1.Established); old != nil {
   209  		establishedCondition = *old
   210  	}
   211  	if establishedCondition.Status != apiextensionsv1.ConditionTrue && namesAcceptedCondition.Status == apiextensionsv1.ConditionTrue {
   212  		establishedCondition = apiextensionsv1.CustomResourceDefinitionCondition{
   213  			Type:    apiextensionsv1.Established,
   214  			Status:  apiextensionsv1.ConditionFalse,
   215  			Reason:  "Installing",
   216  			Message: "the initial names have been accepted",
   217  		}
   218  	}
   220  	return newNames, namesAcceptedCondition, establishedCondition
   221  }
   223  func equalToAcceptedOrFresh(requestedName, acceptedName string, usedNames sets.String) error {
   224  	if requestedName == acceptedName {
   225  		return nil
   226  	}
   227  	if !usedNames.Has(requestedName) {
   228  		return nil
   229  	}
   231  	return fmt.Errorf("%q is already in use", requestedName)
   232  }
   234  func (c *NamingConditionController) sync(key string) error {
   235  	inCustomResourceDefinition, err := c.crdLister.Get(key)
   236  	if apierrors.IsNotFound(err) {
   237  		// CRD was deleted and has freed its names.
   238  		// Reconsider all other CRDs in the same group.
   239  		if err := c.requeueAllOtherGroupCRDs(key); err != nil {
   240  			return err
   241  		}
   242  		return nil
   243  	}
   244  	if err != nil {
   245  		return err
   246  	}
   248  	// Skip checking names if Spec and Status names are same.
   249  	if equality.Semantic.DeepEqual(inCustomResourceDefinition.Spec.Names, inCustomResourceDefinition.Status.AcceptedNames) {
   250  		return nil
   251  	}
   253  	acceptedNames, namingCondition, establishedCondition := c.calculateNamesAndConditions(inCustomResourceDefinition)
   255  	// nothing to do if accepted names and NamesAccepted condition didn't change
   256  	if reflect.DeepEqual(inCustomResourceDefinition.Status.AcceptedNames, acceptedNames) &&
   257  		apiextensionshelpers.IsCRDConditionEquivalent(&namingCondition, apiextensionshelpers.FindCRDCondition(inCustomResourceDefinition, apiextensionsv1.NamesAccepted)) {
   258  		return nil
   259  	}
   261  	crd := inCustomResourceDefinition.DeepCopy()
   262  	crd.Status.AcceptedNames = acceptedNames
   263  	apiextensionshelpers.SetCRDCondition(crd, namingCondition)
   264  	apiextensionshelpers.SetCRDCondition(crd, establishedCondition)
   266  	updatedObj, err := c.crdClient.CustomResourceDefinitions().UpdateStatus(context.TODO(), crd, metav1.UpdateOptions{})
   267  	if apierrors.IsNotFound(err) || apierrors.IsConflict(err) {
   268  		// deleted or changed in the meantime, we'll get called again
   269  		return nil
   270  	}
   271  	if err != nil {
   272  		return err
   273  	}
   275  	// if the update was successful, go ahead and add the entry to the mutation cache
   276  	c.crdMutationCache.Mutation(updatedObj)
   278  	// we updated our status, so we may be releasing a name.  When this happens, we need to rekick everything in our group
   279  	// if we fail to rekick, just return as normal.  We'll get everything on a resync
   280  	if err := c.requeueAllOtherGroupCRDs(key); err != nil {
   281  		return err
   282  	}
   284  	return nil
   285  }
   287  func (c *NamingConditionController) Run(stopCh <-chan struct{}) {
   288  	defer utilruntime.HandleCrash()
   289  	defer c.queue.ShutDown()
   291  	klog.Info("Starting NamingConditionController")
   292  	defer klog.Info("Shutting down NamingConditionController")
   294  	if !cache.WaitForCacheSync(stopCh, c.crdSynced) {
   295  		return
   296  	}
   298  	// only start one worker thread since its a slow moving API and the naming conflict resolution bits aren't thread-safe
   299  	go wait.Until(c.runWorker, time.Second, stopCh)
   301  	<-stopCh
   302  }
   304  func (c *NamingConditionController) runWorker() {
   305  	for c.processNextWorkItem() {
   306  	}
   307  }
   309  // processNextWorkItem deals with one key off the queue.  It returns false when it's time to quit.
   310  func (c *NamingConditionController) processNextWorkItem() bool {
   311  	key, quit := c.queue.Get()
   312  	if quit {
   313  		return false
   314  	}
   315  	defer c.queue.Done(key)
   317  	err := c.syncFn(key.(string))
   318  	if err == nil {
   319  		c.queue.Forget(key)
   320  		return true
   321  	}
   323  	utilruntime.HandleError(fmt.Errorf("%v failed with: %v", key, err))
   324  	c.queue.AddRateLimited(key)
   326  	return true
   327  }
   329  func (c *NamingConditionController) enqueue(obj *apiextensionsv1.CustomResourceDefinition) {
   330  	key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
   331  	if err != nil {
   332  		utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", obj, err))
   333  		return
   334  	}
   336  	c.queue.Add(key)
   337  }
   339  func (c *NamingConditionController) addCustomResourceDefinition(obj interface{}) {
   340  	castObj := obj.(*apiextensionsv1.CustomResourceDefinition)
   341  	klog.V(4).Infof("Adding %s", castObj.Name)
   342  	c.enqueue(castObj)
   343  }
   345  func (c *NamingConditionController) updateCustomResourceDefinition(obj, _ interface{}) {
   346  	castObj := obj.(*apiextensionsv1.CustomResourceDefinition)
   347  	klog.V(4).Infof("Updating %s", castObj.Name)
   348  	c.enqueue(castObj)
   349  }
   351  func (c *NamingConditionController) deleteCustomResourceDefinition(obj interface{}) {
   352  	castObj, ok := obj.(*apiextensionsv1.CustomResourceDefinition)
   353  	if !ok {
   354  		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
   355  		if !ok {
   356  			klog.Errorf("Couldn't get object from tombstone %#v", obj)
   357  			return
   358  		}
   359  		castObj, ok = tombstone.Obj.(*apiextensionsv1.CustomResourceDefinition)
   360  		if !ok {
   361  			klog.Errorf("Tombstone contained object that is not expected %#v", obj)
   362  			return
   363  		}
   364  	}
   365  	klog.V(4).Infof("Deleting %q", castObj.Name)
   366  	c.enqueue(castObj)
   367  }
   369  func (c *NamingConditionController) requeueAllOtherGroupCRDs(name string) error {
   370  	pluralGroup := strings.SplitN(name, ".", 2)
   371  	list, err := c.crdLister.List(labels.Everything())
   372  	if err != nil {
   373  		return err
   374  	}
   375  	for _, curr := range list {
   376  		if curr.Spec.Group == pluralGroup[1] && curr.Name != name {
   377  			c.queue.Add(curr.Name)
   378  		}
   379  	}
   380  	return nil
   381  }

