...

Source file src/k8s.io/apiextensions-apiserver/pkg/controller/apiapproval/apiapproval_controller.go

Documentation: k8s.io/apiextensions-apiserver/pkg/controller/apiapproval

     1  /*
     2  Copyright 2019 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package apiapproval
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"sync"
    23  	"time"
    24  
    25  	"k8s.io/apiextensions-apiserver/pkg/apihelpers"
    26  	apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
    27  	client "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/typed/apiextensions/v1"
    28  	informers "k8s.io/apiextensions-apiserver/pkg/client/informers/externalversions/apiextensions/v1"
    29  	listers "k8s.io/apiextensions-apiserver/pkg/client/listers/apiextensions/v1"
    30  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    31  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    32  	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    33  	"k8s.io/apimachinery/pkg/util/wait"
    34  	"k8s.io/client-go/tools/cache"
    35  	"k8s.io/client-go/util/workqueue"
    36  	"k8s.io/klog/v2"
    37  )
    38  
    39  // KubernetesAPIApprovalPolicyConformantConditionController is maintaining the KubernetesAPIApprovalPolicyConformant condition.
    40  type KubernetesAPIApprovalPolicyConformantConditionController struct {
    41  	crdClient client.CustomResourceDefinitionsGetter
    42  
    43  	crdLister listers.CustomResourceDefinitionLister
    44  	crdSynced cache.InformerSynced
    45  
    46  	// To allow injection for testing.
    47  	syncFn func(key string) error
    48  
    49  	queue workqueue.RateLimitingInterface
    50  
    51  	// last protectedAnnotation value this controller updated the condition per CRD name (to avoid two
    52  	// different version of the apiextensions-apiservers in HA to fight for the right message)
    53  	lastSeenProtectedAnnotationLock sync.Mutex
    54  	lastSeenProtectedAnnotation     map[string]string
    55  }
    56  
    57  // NewKubernetesAPIApprovalPolicyConformantConditionController constructs a KubernetesAPIApprovalPolicyConformant schema condition controller.
    58  func NewKubernetesAPIApprovalPolicyConformantConditionController(
    59  	crdInformer informers.CustomResourceDefinitionInformer,
    60  	crdClient client.CustomResourceDefinitionsGetter,
    61  ) *KubernetesAPIApprovalPolicyConformantConditionController {
    62  	c := &KubernetesAPIApprovalPolicyConformantConditionController{
    63  		crdClient:                   crdClient,
    64  		crdLister:                   crdInformer.Lister(),
    65  		crdSynced:                   crdInformer.Informer().HasSynced,
    66  		queue:                       workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "kubernetes_api_approval_conformant_condition_controller"),
    67  		lastSeenProtectedAnnotation: map[string]string{},
    68  	}
    69  
    70  	crdInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    71  		AddFunc:    c.addCustomResourceDefinition,
    72  		UpdateFunc: c.updateCustomResourceDefinition,
    73  		DeleteFunc: c.deleteCustomResourceDefinition,
    74  	})
    75  
    76  	c.syncFn = c.sync
    77  
    78  	return c
    79  }
    80  
    81  // calculateCondition determines the new KubernetesAPIApprovalPolicyConformant condition
    82  func calculateCondition(crd *apiextensionsv1.CustomResourceDefinition) *apiextensionsv1.CustomResourceDefinitionCondition {
    83  	if !apihelpers.IsProtectedCommunityGroup(crd.Spec.Group) {
    84  		return nil
    85  	}
    86  
    87  	approvalState, reason := apihelpers.GetAPIApprovalState(crd.Annotations)
    88  	switch approvalState {
    89  	case apihelpers.APIApprovalInvalid:
    90  		return &apiextensionsv1.CustomResourceDefinitionCondition{
    91  			Type:    apiextensionsv1.KubernetesAPIApprovalPolicyConformant,
    92  			Status:  apiextensionsv1.ConditionFalse,
    93  			Reason:  "InvalidAnnotation",
    94  			Message: reason,
    95  		}
    96  	case apihelpers.APIApprovalMissing:
    97  		return &apiextensionsv1.CustomResourceDefinitionCondition{
    98  			Type:    apiextensionsv1.KubernetesAPIApprovalPolicyConformant,
    99  			Status:  apiextensionsv1.ConditionFalse,
   100  			Reason:  "MissingAnnotation",
   101  			Message: reason,
   102  		}
   103  	case apihelpers.APIApproved:
   104  		return &apiextensionsv1.CustomResourceDefinitionCondition{
   105  			Type:    apiextensionsv1.KubernetesAPIApprovalPolicyConformant,
   106  			Status:  apiextensionsv1.ConditionTrue,
   107  			Reason:  "ApprovedAnnotation",
   108  			Message: reason,
   109  		}
   110  	case apihelpers.APIApprovalBypassed:
   111  		return &apiextensionsv1.CustomResourceDefinitionCondition{
   112  			Type:    apiextensionsv1.KubernetesAPIApprovalPolicyConformant,
   113  			Status:  apiextensionsv1.ConditionFalse,
   114  			Reason:  "UnapprovedAnnotation",
   115  			Message: reason,
   116  		}
   117  	default:
   118  		return &apiextensionsv1.CustomResourceDefinitionCondition{
   119  			Type:    apiextensionsv1.KubernetesAPIApprovalPolicyConformant,
   120  			Status:  apiextensionsv1.ConditionUnknown,
   121  			Reason:  "UnknownAnnotation",
   122  			Message: reason,
   123  		}
   124  	}
   125  }
   126  
   127  func (c *KubernetesAPIApprovalPolicyConformantConditionController) sync(key string) error {
   128  	inCustomResourceDefinition, err := c.crdLister.Get(key)
   129  	if apierrors.IsNotFound(err) {
   130  		return nil
   131  	}
   132  	if err != nil {
   133  		return err
   134  	}
   135  
   136  	// avoid repeated calculation for the same annotation
   137  	protectionAnnotationValue := inCustomResourceDefinition.Annotations[apiextensionsv1.KubeAPIApprovedAnnotation]
   138  	c.lastSeenProtectedAnnotationLock.Lock()
   139  	lastSeen, seenBefore := c.lastSeenProtectedAnnotation[inCustomResourceDefinition.Name]
   140  	c.lastSeenProtectedAnnotationLock.Unlock()
   141  	if seenBefore && protectionAnnotationValue == lastSeen {
   142  		return nil
   143  	}
   144  
   145  	// check old condition
   146  	cond := calculateCondition(inCustomResourceDefinition)
   147  	if cond == nil {
   148  		// because group is immutable, if we have no condition now, we have no need to remove a condition.
   149  		return nil
   150  	}
   151  	old := apihelpers.FindCRDCondition(inCustomResourceDefinition, apiextensionsv1.KubernetesAPIApprovalPolicyConformant)
   152  
   153  	// don't attempt a write if all the condition details are the same
   154  	if old != nil && old.Status == cond.Status && old.Reason == cond.Reason && old.Message == cond.Message {
   155  		// no need to update annotation because we took no action.
   156  		return nil
   157  	}
   158  
   159  	// update condition
   160  	crd := inCustomResourceDefinition.DeepCopy()
   161  	apihelpers.SetCRDCondition(crd, *cond)
   162  
   163  	_, err = c.crdClient.CustomResourceDefinitions().UpdateStatus(context.TODO(), crd, metav1.UpdateOptions{})
   164  	if apierrors.IsNotFound(err) || apierrors.IsConflict(err) {
   165  		// deleted or changed in the meantime, we'll get called again
   166  		return nil
   167  	}
   168  	if err != nil {
   169  		return err
   170  	}
   171  
   172  	// store annotation in order to avoid repeated updates for the same annotation (and potential
   173  	// fights of API server in HA environments).
   174  	c.lastSeenProtectedAnnotationLock.Lock()
   175  	defer c.lastSeenProtectedAnnotationLock.Unlock()
   176  	c.lastSeenProtectedAnnotation[crd.Name] = protectionAnnotationValue
   177  
   178  	return nil
   179  }
   180  
   181  // Run starts the controller.
   182  func (c *KubernetesAPIApprovalPolicyConformantConditionController) Run(workers int, stopCh <-chan struct{}) {
   183  	defer utilruntime.HandleCrash()
   184  	defer c.queue.ShutDown()
   185  
   186  	klog.Infof("Starting KubernetesAPIApprovalPolicyConformantConditionController")
   187  	defer klog.Infof("Shutting down KubernetesAPIApprovalPolicyConformantConditionController")
   188  
   189  	if !cache.WaitForCacheSync(stopCh, c.crdSynced) {
   190  		return
   191  	}
   192  
   193  	for i := 0; i < workers; i++ {
   194  		go wait.Until(c.runWorker, time.Second, stopCh)
   195  	}
   196  
   197  	<-stopCh
   198  }
   199  
   200  func (c *KubernetesAPIApprovalPolicyConformantConditionController) runWorker() {
   201  	for c.processNextWorkItem() {
   202  	}
   203  }
   204  
   205  // processNextWorkItem deals with one key off the queue.  It returns false when it's time to quit.
   206  func (c *KubernetesAPIApprovalPolicyConformantConditionController) processNextWorkItem() bool {
   207  	key, quit := c.queue.Get()
   208  	if quit {
   209  		return false
   210  	}
   211  	defer c.queue.Done(key)
   212  
   213  	err := c.syncFn(key.(string))
   214  	if err == nil {
   215  		c.queue.Forget(key)
   216  		return true
   217  	}
   218  
   219  	utilruntime.HandleError(fmt.Errorf("%v failed with: %v", key, err))
   220  	c.queue.AddRateLimited(key)
   221  
   222  	return true
   223  }
   224  
   225  func (c *KubernetesAPIApprovalPolicyConformantConditionController) enqueue(obj *apiextensionsv1.CustomResourceDefinition) {
   226  	key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
   227  	if err != nil {
   228  		utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %#v: %v", obj, err))
   229  		return
   230  	}
   231  
   232  	c.queue.Add(key)
   233  }
   234  
   235  func (c *KubernetesAPIApprovalPolicyConformantConditionController) addCustomResourceDefinition(obj interface{}) {
   236  	castObj := obj.(*apiextensionsv1.CustomResourceDefinition)
   237  	klog.V(4).Infof("Adding %s", castObj.Name)
   238  	c.enqueue(castObj)
   239  }
   240  
   241  func (c *KubernetesAPIApprovalPolicyConformantConditionController) updateCustomResourceDefinition(obj, _ interface{}) {
   242  	castObj := obj.(*apiextensionsv1.CustomResourceDefinition)
   243  	klog.V(4).Infof("Updating %s", castObj.Name)
   244  	c.enqueue(castObj)
   245  }
   246  
   247  func (c *KubernetesAPIApprovalPolicyConformantConditionController) deleteCustomResourceDefinition(obj interface{}) {
   248  	castObj, ok := obj.(*apiextensionsv1.CustomResourceDefinition)
   249  	if !ok {
   250  		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
   251  		if !ok {
   252  			klog.Errorf("Couldn't get object from tombstone %#v", obj)
   253  			return
   254  		}
   255  		castObj, ok = tombstone.Obj.(*apiextensionsv1.CustomResourceDefinition)
   256  		if !ok {
   257  			klog.Errorf("Tombstone contained object that is not expected %#v", obj)
   258  			return
   259  		}
   260  	}
   261  
   262  	c.lastSeenProtectedAnnotationLock.Lock()
   263  	defer c.lastSeenProtectedAnnotationLock.Unlock()
   264  	delete(c.lastSeenProtectedAnnotation, castObj.Name)
   265  }
   266  

View as plain text