...

Source file src/k8s.io/apiextensions-apiserver/pkg/controller/nonstructuralschema/nonstructuralschema_controller.go

Documentation: k8s.io/apiextensions-apiserver/pkg/controller/nonstructuralschema

     1  /*
     2  Copyright 2019 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package nonstructuralschema
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"sync"
    23  	"time"
    24  
    25  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    26  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    27  	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    28  	"k8s.io/apimachinery/pkg/util/validation/field"
    29  	"k8s.io/apimachinery/pkg/util/wait"
    30  	"k8s.io/client-go/tools/cache"
    31  	"k8s.io/client-go/util/workqueue"
    32  	"k8s.io/klog/v2"
    33  
    34  	apiextensionshelpers "k8s.io/apiextensions-apiserver/pkg/apihelpers"
    35  	apiextensionsinternal "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions"
    36  	apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
    37  	"k8s.io/apiextensions-apiserver/pkg/apiserver/schema"
    38  	client "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/typed/apiextensions/v1"
    39  	informers "k8s.io/apiextensions-apiserver/pkg/client/informers/externalversions/apiextensions/v1"
    40  	listers "k8s.io/apiextensions-apiserver/pkg/client/listers/apiextensions/v1"
    41  )
    42  
    43  // ConditionController is maintaining the NonStructuralSchema condition.
    44  type ConditionController struct {
    45  	crdClient client.CustomResourceDefinitionsGetter
    46  
    47  	crdLister listers.CustomResourceDefinitionLister
    48  	crdSynced cache.InformerSynced
    49  
    50  	// To allow injection for testing.
    51  	syncFn func(key string) error
    52  
    53  	queue workqueue.RateLimitingInterface
    54  
    55  	// last generation this controller updated the condition per CRD name (to avoid two
    56  	// different version of the apiextensions-apiservers in HA to fight for the right message)
    57  	lastSeenGenerationLock sync.Mutex
    58  	lastSeenGeneration     map[string]int64
    59  }
    60  
    61  // NewConditionController constructs a non-structural schema condition controller.
    62  func NewConditionController(
    63  	crdInformer informers.CustomResourceDefinitionInformer,
    64  	crdClient client.CustomResourceDefinitionsGetter,
    65  ) *ConditionController {
    66  	c := &ConditionController{
    67  		crdClient:          crdClient,
    68  		crdLister:          crdInformer.Lister(),
    69  		crdSynced:          crdInformer.Informer().HasSynced,
    70  		queue:              workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "non_structural_schema_condition_controller"),
    71  		lastSeenGeneration: map[string]int64{},
    72  	}
    73  
    74  	crdInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    75  		AddFunc:    c.addCustomResourceDefinition,
    76  		UpdateFunc: c.updateCustomResourceDefinition,
    77  		DeleteFunc: c.deleteCustomResourceDefinition,
    78  	})
    79  
    80  	c.syncFn = c.sync
    81  
    82  	return c
    83  }
    84  
    85  func calculateCondition(in *apiextensionsv1.CustomResourceDefinition) *apiextensionsv1.CustomResourceDefinitionCondition {
    86  	cond := &apiextensionsv1.CustomResourceDefinitionCondition{
    87  		Type:   apiextensionsv1.NonStructuralSchema,
    88  		Status: apiextensionsv1.ConditionUnknown,
    89  	}
    90  
    91  	allErrs := field.ErrorList{}
    92  
    93  	if in.Spec.PreserveUnknownFields {
    94  		allErrs = append(allErrs, field.Invalid(field.NewPath("spec", "preserveUnknownFields"),
    95  			in.Spec.PreserveUnknownFields,
    96  			fmt.Sprint("must be false")))
    97  	}
    98  
    99  	for i, v := range in.Spec.Versions {
   100  		if v.Schema == nil || v.Schema.OpenAPIV3Schema == nil {
   101  			continue
   102  		}
   103  
   104  		internalSchema := &apiextensionsinternal.CustomResourceValidation{}
   105  		if err := apiextensionsv1.Convert_v1_CustomResourceValidation_To_apiextensions_CustomResourceValidation(v.Schema, internalSchema, nil); err != nil {
   106  			klog.Errorf("failed to convert CRD validation to internal version: %v", err)
   107  			continue
   108  		}
   109  		s, err := schema.NewStructural(internalSchema.OpenAPIV3Schema)
   110  		if err != nil {
   111  			cond.Reason = "StructuralError"
   112  			cond.Message = fmt.Sprintf("failed to check validation schema for version %s: %v", v.Name, err)
   113  			return cond
   114  		}
   115  
   116  		pth := field.NewPath("spec", "versions").Index(i).Child("schema", "openAPIV3Schema")
   117  
   118  		allErrs = append(allErrs, schema.ValidateStructural(pth, s)...)
   119  	}
   120  
   121  	if len(allErrs) == 0 {
   122  		return nil
   123  	}
   124  
   125  	cond.Status = apiextensionsv1.ConditionTrue
   126  	cond.Reason = "Violations"
   127  	cond.Message = allErrs.ToAggregate().Error()
   128  
   129  	return cond
   130  }
   131  
   132  func (c *ConditionController) sync(key string) error {
   133  	inCustomResourceDefinition, err := c.crdLister.Get(key)
   134  	if apierrors.IsNotFound(err) {
   135  		return nil
   136  	}
   137  	if err != nil {
   138  		return err
   139  	}
   140  
   141  	// avoid repeated calculation for the same generation
   142  	c.lastSeenGenerationLock.Lock()
   143  	lastSeen, seenBefore := c.lastSeenGeneration[inCustomResourceDefinition.Name]
   144  	c.lastSeenGenerationLock.Unlock()
   145  	if seenBefore && inCustomResourceDefinition.Generation <= lastSeen {
   146  		return nil
   147  	}
   148  
   149  	// check old condition
   150  	cond := calculateCondition(inCustomResourceDefinition)
   151  	old := apiextensionshelpers.FindCRDCondition(inCustomResourceDefinition, apiextensionsv1.NonStructuralSchema)
   152  
   153  	if cond == nil && old == nil {
   154  		return nil
   155  	}
   156  	if cond != nil && old != nil && old.Status == cond.Status && old.Reason == cond.Reason && old.Message == cond.Message {
   157  		return nil
   158  	}
   159  
   160  	// update condition
   161  	crd := inCustomResourceDefinition.DeepCopy()
   162  	if cond == nil {
   163  		apiextensionshelpers.RemoveCRDCondition(crd, apiextensionsv1.NonStructuralSchema)
   164  	} else {
   165  		cond.LastTransitionTime = metav1.NewTime(time.Now())
   166  		apiextensionshelpers.SetCRDCondition(crd, *cond)
   167  	}
   168  
   169  	_, err = c.crdClient.CustomResourceDefinitions().UpdateStatus(context.TODO(), crd, metav1.UpdateOptions{})
   170  	if apierrors.IsNotFound(err) || apierrors.IsConflict(err) {
   171  		// deleted or changed in the meantime, we'll get called again
   172  		return nil
   173  	}
   174  	if err != nil {
   175  		return err
   176  	}
   177  
   178  	// store generation in order to avoid repeated updates for the same generation (and potential
   179  	// fights of API server in HA environments).
   180  	c.lastSeenGenerationLock.Lock()
   181  	defer c.lastSeenGenerationLock.Unlock()
   182  	c.lastSeenGeneration[crd.Name] = crd.Generation
   183  
   184  	return nil
   185  }
   186  
   187  // Run starts the controller.
   188  func (c *ConditionController) Run(workers int, stopCh <-chan struct{}) {
   189  	defer utilruntime.HandleCrash()
   190  	defer c.queue.ShutDown()
   191  
   192  	klog.Infof("Starting NonStructuralSchemaConditionController")
   193  	defer klog.Infof("Shutting down NonStructuralSchemaConditionController")
   194  
   195  	if !cache.WaitForCacheSync(stopCh, c.crdSynced) {
   196  		return
   197  	}
   198  
   199  	for i := 0; i < workers; i++ {
   200  		go wait.Until(c.runWorker, time.Second, stopCh)
   201  	}
   202  
   203  	<-stopCh
   204  }
   205  
   206  func (c *ConditionController) runWorker() {
   207  	for c.processNextWorkItem() {
   208  	}
   209  }
   210  
   211  // processNextWorkItem deals with one key off the queue.  It returns false when it's time to quit.
   212  func (c *ConditionController) processNextWorkItem() bool {
   213  	key, quit := c.queue.Get()
   214  	if quit {
   215  		return false
   216  	}
   217  	defer c.queue.Done(key)
   218  
   219  	err := c.syncFn(key.(string))
   220  	if err == nil {
   221  		c.queue.Forget(key)
   222  		return true
   223  	}
   224  
   225  	utilruntime.HandleError(fmt.Errorf("%v failed with: %v", key, err))
   226  	c.queue.AddRateLimited(key)
   227  
   228  	return true
   229  }
   230  
   231  func (c *ConditionController) enqueue(obj *apiextensionsv1.CustomResourceDefinition) {
   232  	key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
   233  	if err != nil {
   234  		utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", obj, err))
   235  		return
   236  	}
   237  
   238  	c.queue.Add(key)
   239  }
   240  
   241  func (c *ConditionController) addCustomResourceDefinition(obj interface{}) {
   242  	castObj := obj.(*apiextensionsv1.CustomResourceDefinition)
   243  	klog.V(4).Infof("Adding %s", castObj.Name)
   244  	c.enqueue(castObj)
   245  }
   246  
   247  func (c *ConditionController) updateCustomResourceDefinition(obj, _ interface{}) {
   248  	castObj := obj.(*apiextensionsv1.CustomResourceDefinition)
   249  	klog.V(4).Infof("Updating %s", castObj.Name)
   250  	c.enqueue(castObj)
   251  }
   252  
   253  func (c *ConditionController) deleteCustomResourceDefinition(obj interface{}) {
   254  	castObj, ok := obj.(*apiextensionsv1.CustomResourceDefinition)
   255  	if !ok {
   256  		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
   257  		if !ok {
   258  			klog.Errorf("Couldn't get object from tombstone %#v", obj)
   259  			return
   260  		}
   261  		castObj, ok = tombstone.Obj.(*apiextensionsv1.CustomResourceDefinition)
   262  		if !ok {
   263  			klog.Errorf("Tombstone contained object that is not expected %#v", obj)
   264  			return
   265  		}
   266  	}
   267  
   268  	c.lastSeenGenerationLock.Lock()
   269  	defer c.lastSeenGenerationLock.Unlock()
   270  	delete(c.lastSeenGeneration, castObj.Name)
   271  }
   272  

View as plain text