...

Source file src/k8s.io/apiextensions-apiserver/pkg/controller/establish/establishing_controller.go

Documentation: k8s.io/apiextensions-apiserver/pkg/controller/establish

     1  /*
     2  Copyright 2018 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package establish
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"time"
    23  
    24  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    25  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    26  	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    27  	"k8s.io/apimachinery/pkg/util/wait"
    28  	"k8s.io/client-go/tools/cache"
    29  	"k8s.io/client-go/util/workqueue"
    30  	"k8s.io/klog/v2"
    31  
    32  	apiextensionshelpers "k8s.io/apiextensions-apiserver/pkg/apihelpers"
    33  	apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
    34  	client "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/typed/apiextensions/v1"
    35  	informers "k8s.io/apiextensions-apiserver/pkg/client/informers/externalversions/apiextensions/v1"
    36  	listers "k8s.io/apiextensions-apiserver/pkg/client/listers/apiextensions/v1"
    37  )
    38  
    39  // EstablishingController controls how and when CRD is established.
    40  type EstablishingController struct {
    41  	crdClient client.CustomResourceDefinitionsGetter
    42  	crdLister listers.CustomResourceDefinitionLister
    43  	crdSynced cache.InformerSynced
    44  
    45  	// To allow injection for testing.
    46  	syncFn func(key string) error
    47  
    48  	queue workqueue.RateLimitingInterface
    49  }
    50  
    51  // NewEstablishingController creates new EstablishingController.
    52  func NewEstablishingController(crdInformer informers.CustomResourceDefinitionInformer,
    53  	crdClient client.CustomResourceDefinitionsGetter) *EstablishingController {
    54  	ec := &EstablishingController{
    55  		crdClient: crdClient,
    56  		crdLister: crdInformer.Lister(),
    57  		crdSynced: crdInformer.Informer().HasSynced,
    58  		queue:     workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "crdEstablishing"),
    59  	}
    60  
    61  	ec.syncFn = ec.sync
    62  
    63  	return ec
    64  }
    65  
    66  // QueueCRD adds CRD into the establishing queue.
    67  func (ec *EstablishingController) QueueCRD(key string, timeout time.Duration) {
    68  	ec.queue.AddAfter(key, timeout)
    69  }
    70  
    71  // Run starts the EstablishingController.
    72  func (ec *EstablishingController) Run(stopCh <-chan struct{}) {
    73  	defer utilruntime.HandleCrash()
    74  	defer ec.queue.ShutDown()
    75  
    76  	klog.Info("Starting EstablishingController")
    77  	defer klog.Info("Shutting down EstablishingController")
    78  
    79  	if !cache.WaitForCacheSync(stopCh, ec.crdSynced) {
    80  		return
    81  	}
    82  
    83  	// only start one worker thread since its a slow moving API
    84  	go wait.Until(ec.runWorker, time.Second, stopCh)
    85  
    86  	<-stopCh
    87  }
    88  
    89  func (ec *EstablishingController) runWorker() {
    90  	for ec.processNextWorkItem() {
    91  	}
    92  }
    93  
    94  // processNextWorkItem deals with one key off the queue.
    95  // It returns false when it's time to quit.
    96  func (ec *EstablishingController) processNextWorkItem() bool {
    97  	key, quit := ec.queue.Get()
    98  	if quit {
    99  		return false
   100  	}
   101  	defer ec.queue.Done(key)
   102  
   103  	err := ec.syncFn(key.(string))
   104  	if err == nil {
   105  		ec.queue.Forget(key)
   106  		return true
   107  	}
   108  
   109  	utilruntime.HandleError(fmt.Errorf("%v failed with: %v", key, err))
   110  	ec.queue.AddRateLimited(key)
   111  
   112  	return true
   113  }
   114  
   115  // sync is used to turn CRDs into the Established state.
   116  func (ec *EstablishingController) sync(key string) error {
   117  	cachedCRD, err := ec.crdLister.Get(key)
   118  	if apierrors.IsNotFound(err) {
   119  		return nil
   120  	}
   121  	if err != nil {
   122  		return err
   123  	}
   124  
   125  	if !apiextensionshelpers.IsCRDConditionTrue(cachedCRD, apiextensionsv1.NamesAccepted) ||
   126  		apiextensionshelpers.IsCRDConditionTrue(cachedCRD, apiextensionsv1.Established) {
   127  		return nil
   128  	}
   129  
   130  	crd := cachedCRD.DeepCopy()
   131  	establishedCondition := apiextensionsv1.CustomResourceDefinitionCondition{
   132  		Type:    apiextensionsv1.Established,
   133  		Status:  apiextensionsv1.ConditionTrue,
   134  		Reason:  "InitialNamesAccepted",
   135  		Message: "the initial names have been accepted",
   136  	}
   137  	apiextensionshelpers.SetCRDCondition(crd, establishedCondition)
   138  
   139  	// Update server with new CRD condition.
   140  	_, err = ec.crdClient.CustomResourceDefinitions().UpdateStatus(context.TODO(), crd, metav1.UpdateOptions{})
   141  	if apierrors.IsNotFound(err) || apierrors.IsConflict(err) {
   142  		// deleted or changed in the meantime, we'll get called again
   143  		return nil
   144  	}
   145  	if err != nil {
   146  		return err
   147  	}
   148  
   149  	return nil
   150  }
   151  

View as plain text