...

Source file src/k8s.io/apiextensions-apiserver/pkg/controller/status/naming_controller.go

Documentation: k8s.io/apiextensions-apiserver/pkg/controller/status

     1  /*
     2  Copyright 2017 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package status
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"reflect"
    23  	"strings"
    24  	"time"
    25  
    26  	"k8s.io/klog/v2"
    27  
    28  	"k8s.io/apimachinery/pkg/api/equality"
    29  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    30  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    31  	"k8s.io/apimachinery/pkg/labels"
    32  	utilerrors "k8s.io/apimachinery/pkg/util/errors"
    33  	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    34  	"k8s.io/apimachinery/pkg/util/sets"
    35  	"k8s.io/apimachinery/pkg/util/wait"
    36  	"k8s.io/client-go/tools/cache"
    37  	"k8s.io/client-go/util/workqueue"
    38  
    39  	apiextensionshelpers "k8s.io/apiextensions-apiserver/pkg/apihelpers"
    40  	apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
    41  	client "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/typed/apiextensions/v1"
    42  	informers "k8s.io/apiextensions-apiserver/pkg/client/informers/externalversions/apiextensions/v1"
    43  	listers "k8s.io/apiextensions-apiserver/pkg/client/listers/apiextensions/v1"
    44  )
    45  
    46  // This controller is reserving names. To avoid conflicts, be sure to run only one instance of the worker at a time.
    47  // This could eventually be lifted, but starting simple.
    48  type NamingConditionController struct {
    49  	crdClient client.CustomResourceDefinitionsGetter
    50  
    51  	crdLister listers.CustomResourceDefinitionLister
    52  	crdSynced cache.InformerSynced
    53  	// crdMutationCache backs our lister and keeps track of committed updates to avoid racy
    54  	// write/lookup cycles.  It's got 100 slots by default, so it unlikely to overrun
    55  	// TODO to revisit this if naming conflicts are found to occur in the wild
    56  	crdMutationCache cache.MutationCache
    57  
    58  	// To allow injection for testing.
    59  	syncFn func(key string) error
    60  
    61  	queue workqueue.RateLimitingInterface
    62  }
    63  
    64  func NewNamingConditionController(
    65  	crdInformer informers.CustomResourceDefinitionInformer,
    66  	crdClient client.CustomResourceDefinitionsGetter,
    67  ) *NamingConditionController {
    68  	c := &NamingConditionController{
    69  		crdClient: crdClient,
    70  		crdLister: crdInformer.Lister(),
    71  		crdSynced: crdInformer.Informer().HasSynced,
    72  		queue:     workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "crd_naming_condition_controller"),
    73  	}
    74  
    75  	informerIndexer := crdInformer.Informer().GetIndexer()
    76  	c.crdMutationCache = cache.NewIntegerResourceVersionMutationCache(informerIndexer, informerIndexer, 60*time.Second, false)
    77  
    78  	crdInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    79  		AddFunc:    c.addCustomResourceDefinition,
    80  		UpdateFunc: c.updateCustomResourceDefinition,
    81  		DeleteFunc: c.deleteCustomResourceDefinition,
    82  	})
    83  
    84  	c.syncFn = c.sync
    85  
    86  	return c
    87  }
    88  
    89  func (c *NamingConditionController) getAcceptedNamesForGroup(group string) (allResources sets.String, allKinds sets.String) {
    90  	allResources = sets.String{}
    91  	allKinds = sets.String{}
    92  
    93  	list, err := c.crdLister.List(labels.Everything())
    94  	if err != nil {
    95  		panic(err)
    96  	}
    97  
    98  	for _, curr := range list {
    99  		if curr.Spec.Group != group {
   100  			continue
   101  		}
   102  
   103  		// for each item here, see if we have a mutation cache entry that is more recent
   104  		// this makes sure that if we tight loop on update and run, our mutation cache will show
   105  		// us the version of the objects we just updated to.
   106  		item := curr
   107  		obj, exists, err := c.crdMutationCache.GetByKey(curr.Name)
   108  		if exists && err == nil {
   109  			item = obj.(*apiextensionsv1.CustomResourceDefinition)
   110  		}
   111  
   112  		allResources.Insert(item.Status.AcceptedNames.Plural)
   113  		allResources.Insert(item.Status.AcceptedNames.Singular)
   114  		allResources.Insert(item.Status.AcceptedNames.ShortNames...)
   115  
   116  		allKinds.Insert(item.Status.AcceptedNames.Kind)
   117  		allKinds.Insert(item.Status.AcceptedNames.ListKind)
   118  	}
   119  
   120  	return allResources, allKinds
   121  }
   122  
   123  func (c *NamingConditionController) calculateNamesAndConditions(in *apiextensionsv1.CustomResourceDefinition) (apiextensionsv1.CustomResourceDefinitionNames, apiextensionsv1.CustomResourceDefinitionCondition, apiextensionsv1.CustomResourceDefinitionCondition) {
   124  	// Get the names that have already been claimed
   125  	allResources, allKinds := c.getAcceptedNamesForGroup(in.Spec.Group)
   126  
   127  	namesAcceptedCondition := apiextensionsv1.CustomResourceDefinitionCondition{
   128  		Type:   apiextensionsv1.NamesAccepted,
   129  		Status: apiextensionsv1.ConditionUnknown,
   130  	}
   131  
   132  	requestedNames := in.Spec.Names
   133  	acceptedNames := in.Status.AcceptedNames
   134  	newNames := in.Status.AcceptedNames
   135  
   136  	// Check each name for mismatches.  If there's a mismatch between spec and status, then try to deconflict.
   137  	// Continue on errors so that the status is the best match possible
   138  	if err := equalToAcceptedOrFresh(requestedNames.Plural, acceptedNames.Plural, allResources); err != nil {
   139  		namesAcceptedCondition.Status = apiextensionsv1.ConditionFalse
   140  		namesAcceptedCondition.Reason = "PluralConflict"
   141  		namesAcceptedCondition.Message = err.Error()
   142  	} else {
   143  		newNames.Plural = requestedNames.Plural
   144  	}
   145  	if err := equalToAcceptedOrFresh(requestedNames.Singular, acceptedNames.Singular, allResources); err != nil {
   146  		namesAcceptedCondition.Status = apiextensionsv1.ConditionFalse
   147  		namesAcceptedCondition.Reason = "SingularConflict"
   148  		namesAcceptedCondition.Message = err.Error()
   149  	} else {
   150  		newNames.Singular = requestedNames.Singular
   151  	}
   152  	if !reflect.DeepEqual(requestedNames.ShortNames, acceptedNames.ShortNames) {
   153  		errs := []error{}
   154  		existingShortNames := sets.NewString(acceptedNames.ShortNames...)
   155  		for _, shortName := range requestedNames.ShortNames {
   156  			// if the shortname is already ours, then we're fine
   157  			if existingShortNames.Has(shortName) {
   158  				continue
   159  			}
   160  			if err := equalToAcceptedOrFresh(shortName, "", allResources); err != nil {
   161  				errs = append(errs, err)
   162  			}
   163  
   164  		}
   165  		if err := utilerrors.NewAggregate(errs); err != nil {
   166  			namesAcceptedCondition.Status = apiextensionsv1.ConditionFalse
   167  			namesAcceptedCondition.Reason = "ShortNamesConflict"
   168  			namesAcceptedCondition.Message = err.Error()
   169  		} else {
   170  			newNames.ShortNames = requestedNames.ShortNames
   171  		}
   172  	}
   173  
   174  	if err := equalToAcceptedOrFresh(requestedNames.Kind, acceptedNames.Kind, allKinds); err != nil {
   175  		namesAcceptedCondition.Status = apiextensionsv1.ConditionFalse
   176  		namesAcceptedCondition.Reason = "KindConflict"
   177  		namesAcceptedCondition.Message = err.Error()
   178  	} else {
   179  		newNames.Kind = requestedNames.Kind
   180  	}
   181  	if err := equalToAcceptedOrFresh(requestedNames.ListKind, acceptedNames.ListKind, allKinds); err != nil {
   182  		namesAcceptedCondition.Status = apiextensionsv1.ConditionFalse
   183  		namesAcceptedCondition.Reason = "ListKindConflict"
   184  		namesAcceptedCondition.Message = err.Error()
   185  	} else {
   186  		newNames.ListKind = requestedNames.ListKind
   187  	}
   188  
   189  	newNames.Categories = requestedNames.Categories
   190  
   191  	// if we haven't changed the condition, then our names must be good.
   192  	if namesAcceptedCondition.Status == apiextensionsv1.ConditionUnknown {
   193  		namesAcceptedCondition.Status = apiextensionsv1.ConditionTrue
   194  		namesAcceptedCondition.Reason = "NoConflicts"
   195  		namesAcceptedCondition.Message = "no conflicts found"
   196  	}
   197  
   198  	// set EstablishedCondition initially to false, then set it to true in establishing controller.
   199  	// The Establishing Controller will see the NamesAccepted condition when it arrives through the shared informer.
   200  	// At that time the API endpoint handler will serve the endpoint, avoiding a race
   201  	// which we had if we set Established to true here.
   202  	establishedCondition := apiextensionsv1.CustomResourceDefinitionCondition{
   203  		Type:    apiextensionsv1.Established,
   204  		Status:  apiextensionsv1.ConditionFalse,
   205  		Reason:  "NotAccepted",
   206  		Message: "not all names are accepted",
   207  	}
   208  	if old := apiextensionshelpers.FindCRDCondition(in, apiextensionsv1.Established); old != nil {
   209  		establishedCondition = *old
   210  	}
   211  	if establishedCondition.Status != apiextensionsv1.ConditionTrue && namesAcceptedCondition.Status == apiextensionsv1.ConditionTrue {
   212  		establishedCondition = apiextensionsv1.CustomResourceDefinitionCondition{
   213  			Type:    apiextensionsv1.Established,
   214  			Status:  apiextensionsv1.ConditionFalse,
   215  			Reason:  "Installing",
   216  			Message: "the initial names have been accepted",
   217  		}
   218  	}
   219  
   220  	return newNames, namesAcceptedCondition, establishedCondition
   221  }
   222  
   223  func equalToAcceptedOrFresh(requestedName, acceptedName string, usedNames sets.String) error {
   224  	if requestedName == acceptedName {
   225  		return nil
   226  	}
   227  	if !usedNames.Has(requestedName) {
   228  		return nil
   229  	}
   230  
   231  	return fmt.Errorf("%q is already in use", requestedName)
   232  }
   233  
   234  func (c *NamingConditionController) sync(key string) error {
   235  	inCustomResourceDefinition, err := c.crdLister.Get(key)
   236  	if apierrors.IsNotFound(err) {
   237  		// CRD was deleted and has freed its names.
   238  		// Reconsider all other CRDs in the same group.
   239  		if err := c.requeueAllOtherGroupCRDs(key); err != nil {
   240  			return err
   241  		}
   242  		return nil
   243  	}
   244  	if err != nil {
   245  		return err
   246  	}
   247  
   248  	// Skip checking names if Spec and Status names are same.
   249  	if equality.Semantic.DeepEqual(inCustomResourceDefinition.Spec.Names, inCustomResourceDefinition.Status.AcceptedNames) {
   250  		return nil
   251  	}
   252  
   253  	acceptedNames, namingCondition, establishedCondition := c.calculateNamesAndConditions(inCustomResourceDefinition)
   254  
   255  	// nothing to do if accepted names and NamesAccepted condition didn't change
   256  	if reflect.DeepEqual(inCustomResourceDefinition.Status.AcceptedNames, acceptedNames) &&
   257  		apiextensionshelpers.IsCRDConditionEquivalent(&namingCondition, apiextensionshelpers.FindCRDCondition(inCustomResourceDefinition, apiextensionsv1.NamesAccepted)) {
   258  		return nil
   259  	}
   260  
   261  	crd := inCustomResourceDefinition.DeepCopy()
   262  	crd.Status.AcceptedNames = acceptedNames
   263  	apiextensionshelpers.SetCRDCondition(crd, namingCondition)
   264  	apiextensionshelpers.SetCRDCondition(crd, establishedCondition)
   265  
   266  	updatedObj, err := c.crdClient.CustomResourceDefinitions().UpdateStatus(context.TODO(), crd, metav1.UpdateOptions{})
   267  	if apierrors.IsNotFound(err) || apierrors.IsConflict(err) {
   268  		// deleted or changed in the meantime, we'll get called again
   269  		return nil
   270  	}
   271  	if err != nil {
   272  		return err
   273  	}
   274  
   275  	// if the update was successful, go ahead and add the entry to the mutation cache
   276  	c.crdMutationCache.Mutation(updatedObj)
   277  
   278  	// we updated our status, so we may be releasing a name.  When this happens, we need to rekick everything in our group
   279  	// if we fail to rekick, just return as normal.  We'll get everything on a resync
   280  	if err := c.requeueAllOtherGroupCRDs(key); err != nil {
   281  		return err
   282  	}
   283  
   284  	return nil
   285  }
   286  
   287  func (c *NamingConditionController) Run(stopCh <-chan struct{}) {
   288  	defer utilruntime.HandleCrash()
   289  	defer c.queue.ShutDown()
   290  
   291  	klog.Info("Starting NamingConditionController")
   292  	defer klog.Info("Shutting down NamingConditionController")
   293  
   294  	if !cache.WaitForCacheSync(stopCh, c.crdSynced) {
   295  		return
   296  	}
   297  
   298  	// only start one worker thread since its a slow moving API and the naming conflict resolution bits aren't thread-safe
   299  	go wait.Until(c.runWorker, time.Second, stopCh)
   300  
   301  	<-stopCh
   302  }
   303  
   304  func (c *NamingConditionController) runWorker() {
   305  	for c.processNextWorkItem() {
   306  	}
   307  }
   308  
   309  // processNextWorkItem deals with one key off the queue.  It returns false when it's time to quit.
   310  func (c *NamingConditionController) processNextWorkItem() bool {
   311  	key, quit := c.queue.Get()
   312  	if quit {
   313  		return false
   314  	}
   315  	defer c.queue.Done(key)
   316  
   317  	err := c.syncFn(key.(string))
   318  	if err == nil {
   319  		c.queue.Forget(key)
   320  		return true
   321  	}
   322  
   323  	utilruntime.HandleError(fmt.Errorf("%v failed with: %v", key, err))
   324  	c.queue.AddRateLimited(key)
   325  
   326  	return true
   327  }
   328  
   329  func (c *NamingConditionController) enqueue(obj *apiextensionsv1.CustomResourceDefinition) {
   330  	key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
   331  	if err != nil {
   332  		utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", obj, err))
   333  		return
   334  	}
   335  
   336  	c.queue.Add(key)
   337  }
   338  
   339  func (c *NamingConditionController) addCustomResourceDefinition(obj interface{}) {
   340  	castObj := obj.(*apiextensionsv1.CustomResourceDefinition)
   341  	klog.V(4).Infof("Adding %s", castObj.Name)
   342  	c.enqueue(castObj)
   343  }
   344  
   345  func (c *NamingConditionController) updateCustomResourceDefinition(obj, _ interface{}) {
   346  	castObj := obj.(*apiextensionsv1.CustomResourceDefinition)
   347  	klog.V(4).Infof("Updating %s", castObj.Name)
   348  	c.enqueue(castObj)
   349  }
   350  
   351  func (c *NamingConditionController) deleteCustomResourceDefinition(obj interface{}) {
   352  	castObj, ok := obj.(*apiextensionsv1.CustomResourceDefinition)
   353  	if !ok {
   354  		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
   355  		if !ok {
   356  			klog.Errorf("Couldn't get object from tombstone %#v", obj)
   357  			return
   358  		}
   359  		castObj, ok = tombstone.Obj.(*apiextensionsv1.CustomResourceDefinition)
   360  		if !ok {
   361  			klog.Errorf("Tombstone contained object that is not expected %#v", obj)
   362  			return
   363  		}
   364  	}
   365  	klog.V(4).Infof("Deleting %q", castObj.Name)
   366  	c.enqueue(castObj)
   367  }
   368  
   369  func (c *NamingConditionController) requeueAllOtherGroupCRDs(name string) error {
   370  	pluralGroup := strings.SplitN(name, ".", 2)
   371  	list, err := c.crdLister.List(labels.Everything())
   372  	if err != nil {
   373  		return err
   374  	}
   375  	for _, curr := range list {
   376  		if curr.Spec.Group == pluralGroup[1] && curr.Name != name {
   377  			c.queue.Add(curr.Name)
   378  		}
   379  	}
   380  	return nil
   381  }
   382  

View as plain text