...

Source file src/k8s.io/kubernetes/pkg/controlplane/controller/crdregistration/crdregistration_controller.go

Documentation: k8s.io/kubernetes/pkg/controlplane/controller/crdregistration

     1  /*
     2  Copyright 2017 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package crdregistration
    18  
    19  import (
    20  	"fmt"
    21  	"time"
    22  
    23  	"k8s.io/klog/v2"
    24  
    25  	apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
    26  	crdinformers "k8s.io/apiextensions-apiserver/pkg/client/informers/externalversions/apiextensions/v1"
    27  	crdlisters "k8s.io/apiextensions-apiserver/pkg/client/listers/apiextensions/v1"
    28  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    29  	"k8s.io/apimachinery/pkg/labels"
    30  	"k8s.io/apimachinery/pkg/runtime/schema"
    31  	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    32  	"k8s.io/apimachinery/pkg/util/wait"
    33  	"k8s.io/client-go/tools/cache"
    34  	"k8s.io/client-go/util/workqueue"
    35  	v1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
    36  )
    37  
// AutoAPIServiceRegistration is an interface which callers can re-declare locally and properly cast to for
// adding and removing APIServices.
//
// Within this file it is the sink the CRD registration controller writes to: handleVersionUpdate calls
// AddAPIServiceToSync when a served CRD version exists for a GroupVersion and RemoveAPIServiceToSync
// otherwise.
type AutoAPIServiceRegistration interface {
	// AddAPIServiceToSync adds an API service to auto-register.
	AddAPIServiceToSync(in *v1.APIService)
	// RemoveAPIServiceToSync removes an API service to auto-register.
	// The controller in this file passes names of the form "<version>.<group>".
	RemoveAPIServiceToSync(name string)
}
    46  
// crdRegistrationController watches CustomResourceDefinitions and keeps the set of auto-registered
// APIServices in step with the GroupVersions those CRDs serve.
type crdRegistrationController struct {
	// crdLister reads CRDs from the informer cache.
	crdLister crdlisters.CustomResourceDefinitionLister
	// crdSynced reports whether the CRD informer has completed its initial sync.
	crdSynced cache.InformerSynced

	// apiServiceRegistration receives the add/remove decisions made by the sync handler.
	apiServiceRegistration AutoAPIServiceRegistration

	// syncHandler reconciles a single GroupVersion. The constructor sets it to handleVersionUpdate.
	// NOTE(review): presumably a field so tests can substitute it — confirm against the package tests.
	syncHandler func(groupVersion schema.GroupVersion) error

	// syncedInitialSet is closed by Run once every CRD present at startup has been processed once.
	syncedInitialSet chan struct{}

	// queue is where incoming work is placed to de-dup and to allow "easy" rate limited requeues on errors
	// this is actually keyed by a groupVersion
	queue workqueue.RateLimitingInterface
}
    61  
    62  // NewCRDRegistrationController returns a controller which will register CRD GroupVersions with the auto APIService registration
    63  // controller so they automatically stay in sync.
    64  func NewCRDRegistrationController(crdinformer crdinformers.CustomResourceDefinitionInformer, apiServiceRegistration AutoAPIServiceRegistration) *crdRegistrationController {
    65  	c := &crdRegistrationController{
    66  		crdLister:              crdinformer.Lister(),
    67  		crdSynced:              crdinformer.Informer().HasSynced,
    68  		apiServiceRegistration: apiServiceRegistration,
    69  		syncedInitialSet:       make(chan struct{}),
    70  		queue:                  workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "crd_autoregistration_controller"),
    71  	}
    72  	c.syncHandler = c.handleVersionUpdate
    73  
    74  	crdinformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    75  		AddFunc: func(obj interface{}) {
    76  			cast := obj.(*apiextensionsv1.CustomResourceDefinition)
    77  			c.enqueueCRD(cast)
    78  		},
    79  		UpdateFunc: func(oldObj, newObj interface{}) {
    80  			// Enqueue both old and new object to make sure we remove and add appropriate API services.
    81  			// The working queue will resolve any duplicates and only changes will stay in the queue.
    82  			c.enqueueCRD(oldObj.(*apiextensionsv1.CustomResourceDefinition))
    83  			c.enqueueCRD(newObj.(*apiextensionsv1.CustomResourceDefinition))
    84  		},
    85  		DeleteFunc: func(obj interface{}) {
    86  			cast, ok := obj.(*apiextensionsv1.CustomResourceDefinition)
    87  			if !ok {
    88  				tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
    89  				if !ok {
    90  					klog.V(2).Infof("Couldn't get object from tombstone %#v", obj)
    91  					return
    92  				}
    93  				cast, ok = tombstone.Obj.(*apiextensionsv1.CustomResourceDefinition)
    94  				if !ok {
    95  					klog.V(2).Infof("Tombstone contained unexpected object: %#v", obj)
    96  					return
    97  				}
    98  			}
    99  			c.enqueueCRD(cast)
   100  		},
   101  	})
   102  
   103  	return c
   104  }
   105  
   106  func (c *crdRegistrationController) Run(workers int, stopCh <-chan struct{}) {
   107  	defer utilruntime.HandleCrash()
   108  	// make sure the work queue is shutdown which will trigger workers to end
   109  	defer c.queue.ShutDown()
   110  
   111  	klog.Infof("Starting crd-autoregister controller")
   112  	defer klog.Infof("Shutting down crd-autoregister controller")
   113  
   114  	// wait for your secondary caches to fill before starting your work
   115  	if !cache.WaitForNamedCacheSync("crd-autoregister", stopCh, c.crdSynced) {
   116  		return
   117  	}
   118  
   119  	// process each item in the list once
   120  	if crds, err := c.crdLister.List(labels.Everything()); err != nil {
   121  		utilruntime.HandleError(err)
   122  	} else {
   123  		for _, crd := range crds {
   124  			for _, version := range crd.Spec.Versions {
   125  				if err := c.syncHandler(schema.GroupVersion{Group: crd.Spec.Group, Version: version.Name}); err != nil {
   126  					utilruntime.HandleError(err)
   127  				}
   128  			}
   129  		}
   130  	}
   131  	close(c.syncedInitialSet)
   132  
   133  	// start up your worker threads based on workers.  Some controllers have multiple kinds of workers
   134  	for i := 0; i < workers; i++ {
   135  		// runWorker will loop until "something bad" happens.  The .Until will then rekick the worker
   136  		// after one second
   137  		go wait.Until(c.runWorker, time.Second, stopCh)
   138  	}
   139  
   140  	// wait until we're told to stop
   141  	<-stopCh
   142  }
   143  
   144  // WaitForInitialSync blocks until the initial set of CRD resources has been processed
   145  func (c *crdRegistrationController) WaitForInitialSync() {
   146  	<-c.syncedInitialSet
   147  }
   148  
   149  func (c *crdRegistrationController) runWorker() {
   150  	// hot loop until we're told to stop.  processNextWorkItem will automatically wait until there's work
   151  	// available, so we don't worry about secondary waits
   152  	for c.processNextWorkItem() {
   153  	}
   154  }
   155  
   156  // processNextWorkItem deals with one key off the queue.  It returns false when it's time to quit.
   157  func (c *crdRegistrationController) processNextWorkItem() bool {
   158  	// pull the next work item from queue.  It should be a key we use to lookup something in a cache
   159  	key, quit := c.queue.Get()
   160  	if quit {
   161  		return false
   162  	}
   163  	// you always have to indicate to the queue that you've completed a piece of work
   164  	defer c.queue.Done(key)
   165  
   166  	// do your work on the key.  This method will contains your "do stuff" logic
   167  	err := c.syncHandler(key.(schema.GroupVersion))
   168  	if err == nil {
   169  		// if you had no error, tell the queue to stop tracking history for your key.  This will
   170  		// reset things like failure counts for per-item rate limiting
   171  		c.queue.Forget(key)
   172  		return true
   173  	}
   174  
   175  	// there was a failure so be sure to report it.  This method allows for pluggable error handling
   176  	// which can be used for things like cluster-monitoring
   177  	utilruntime.HandleError(fmt.Errorf("%v failed with : %v", key, err))
   178  	// since we failed, we should requeue the item to work on later.  This method will add a backoff
   179  	// to avoid hotlooping on particular items (they're probably still not going to work right away)
   180  	// and overall controller protection (everything I've done is broken, this controller needs to
   181  	// calm down or it can starve other useful work) cases.
   182  	c.queue.AddRateLimited(key)
   183  
   184  	return true
   185  }
   186  
   187  func (c *crdRegistrationController) enqueueCRD(crd *apiextensionsv1.CustomResourceDefinition) {
   188  	for _, version := range crd.Spec.Versions {
   189  		c.queue.Add(schema.GroupVersion{Group: crd.Spec.Group, Version: version.Name})
   190  	}
   191  }
   192  
   193  func (c *crdRegistrationController) handleVersionUpdate(groupVersion schema.GroupVersion) error {
   194  	apiServiceName := groupVersion.Version + "." + groupVersion.Group
   195  
   196  	// check all CRDs.  There shouldn't that many, but if we have problems later we can index them
   197  	crds, err := c.crdLister.List(labels.Everything())
   198  	if err != nil {
   199  		return err
   200  	}
   201  	for _, crd := range crds {
   202  		if crd.Spec.Group != groupVersion.Group {
   203  			continue
   204  		}
   205  		for _, version := range crd.Spec.Versions {
   206  			if version.Name != groupVersion.Version || !version.Served {
   207  				continue
   208  			}
   209  
   210  			c.apiServiceRegistration.AddAPIServiceToSync(&v1.APIService{
   211  				ObjectMeta: metav1.ObjectMeta{Name: apiServiceName},
   212  				Spec: v1.APIServiceSpec{
   213  					Group:                groupVersion.Group,
   214  					Version:              groupVersion.Version,
   215  					GroupPriorityMinimum: 1000, // CRDs should have relatively low priority
   216  					VersionPriority:      100,  // CRDs will be sorted by kube-like versions like any other APIService with the same VersionPriority
   217  				},
   218  			})
   219  			return nil
   220  		}
   221  	}
   222  
   223  	c.apiServiceRegistration.RemoveAPIServiceToSync(apiServiceName)
   224  	return nil
   225  }
   226  

View as plain text