...

Source file src/k8s.io/apiextensions-apiserver/pkg/controller/openapiv3/controller.go

Documentation: k8s.io/apiextensions-apiserver/pkg/controller/openapiv3

     1  /*
     2  Copyright 2021 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package openapiv3
    18  
    19  import (
    20  	"fmt"
    21  	"reflect"
    22  	"sync"
    23  	"time"
    24  
    25  	apiextensionsfeatures "k8s.io/apiextensions-apiserver/pkg/features"
    26  	"k8s.io/apimachinery/pkg/api/errors"
    27  	"k8s.io/apimachinery/pkg/labels"
    28  	"k8s.io/apimachinery/pkg/runtime/schema"
    29  	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    30  	"k8s.io/apimachinery/pkg/util/wait"
    31  	utilfeature "k8s.io/apiserver/pkg/util/feature"
    32  	"k8s.io/client-go/tools/cache"
    33  	"k8s.io/client-go/util/workqueue"
    34  	"k8s.io/klog/v2"
    35  	"k8s.io/kube-openapi/pkg/handler3"
    36  	"k8s.io/kube-openapi/pkg/spec3"
    37  
    38  	apiextensionshelpers "k8s.io/apiextensions-apiserver/pkg/apihelpers"
    39  	apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
    40  	informers "k8s.io/apiextensions-apiserver/pkg/client/informers/externalversions/apiextensions/v1"
    41  	listers "k8s.io/apiextensions-apiserver/pkg/client/listers/apiextensions/v1"
    42  	"k8s.io/apiextensions-apiserver/pkg/controller/openapi/builder"
    43  )
    44  
// Controller watches CustomResourceDefinitions and publishes OpenAPI v3
// specs for them, merged per group version, to an OpenAPI v3 service.
type Controller struct {
	// crdLister and crdsSynced both come from the CRD informer handed to
	// NewController: the former lists/gets CRDs, the latter reports whether
	// the informer has synced.
	crdLister  listers.CustomResourceDefinitionLister
	crdsSynced cache.InformerSynced

	// To allow injection for testing.
	syncFn func(string) error

	// queue holds the names of CRDs that still have to be synced.
	queue workqueue.RateLimitingInterface

	// openAPIV3Service is where merged specs are published; it is set by Run.
	openAPIV3Service *handler3.OpenAPIService

	// specs per version and per CRD name
	// lock is held by sync for the whole duration of a sync.
	lock             sync.Mutex
	specsByGVandName map[schema.GroupVersion]map[string]*spec3.OpenAPI
}
    61  
    62  // NewController creates a new Controller with input CustomResourceDefinition informer
    63  func NewController(crdInformer informers.CustomResourceDefinitionInformer) *Controller {
    64  	c := &Controller{
    65  		crdLister:        crdInformer.Lister(),
    66  		crdsSynced:       crdInformer.Informer().HasSynced,
    67  		queue:            workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "crd_openapi_v3_controller"),
    68  		specsByGVandName: map[schema.GroupVersion]map[string]*spec3.OpenAPI{},
    69  	}
    70  
    71  	crdInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    72  		AddFunc:    c.addCustomResourceDefinition,
    73  		UpdateFunc: c.updateCustomResourceDefinition,
    74  		DeleteFunc: c.deleteCustomResourceDefinition,
    75  	})
    76  
    77  	c.syncFn = c.sync
    78  	return c
    79  }
    80  
    81  // Run sets openAPIAggregationManager and starts workers
    82  func (c *Controller) Run(openAPIV3Service *handler3.OpenAPIService, stopCh <-chan struct{}) {
    83  	defer utilruntime.HandleCrash()
    84  	defer c.queue.ShutDown()
    85  	defer klog.Infof("Shutting down OpenAPI V3 controller")
    86  
    87  	klog.Infof("Starting OpenAPI V3 controller")
    88  
    89  	c.openAPIV3Service = openAPIV3Service
    90  
    91  	if !cache.WaitForCacheSync(stopCh, c.crdsSynced) {
    92  		utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
    93  		return
    94  	}
    95  
    96  	crds, err := c.crdLister.List(labels.Everything())
    97  	if err != nil {
    98  		utilruntime.HandleError(fmt.Errorf("failed to initially list all CRDs: %v", err))
    99  		return
   100  	}
   101  	for _, crd := range crds {
   102  		if !apiextensionshelpers.IsCRDConditionTrue(crd, apiextensionsv1.Established) {
   103  			continue
   104  		}
   105  		for _, v := range crd.Spec.Versions {
   106  			if !v.Served {
   107  				continue
   108  			}
   109  			c.buildV3Spec(crd, crd.Name, v.Name)
   110  		}
   111  	}
   112  
   113  	// only start one worker thread since its a slow moving API
   114  	go wait.Until(c.runWorker, time.Second, stopCh)
   115  
   116  	<-stopCh
   117  }
   118  
   119  func (c *Controller) runWorker() {
   120  	for c.processNextWorkItem() {
   121  	}
   122  }
   123  
   124  func (c *Controller) processNextWorkItem() bool {
   125  	key, quit := c.queue.Get()
   126  	if quit {
   127  		return false
   128  	}
   129  	defer c.queue.Done(key)
   130  
   131  	// log slow aggregations
   132  	start := time.Now()
   133  	defer func() {
   134  		elapsed := time.Since(start)
   135  		if elapsed > time.Second {
   136  			klog.Warningf("slow openapi aggregation of %q: %s", key.(string), elapsed)
   137  		}
   138  	}()
   139  
   140  	err := c.syncFn(key.(string))
   141  	if err == nil {
   142  		c.queue.Forget(key)
   143  		return true
   144  	}
   145  
   146  	utilruntime.HandleError(fmt.Errorf("%v failed with: %v", key, err))
   147  	c.queue.AddRateLimited(key)
   148  	return true
   149  }
   150  
   151  func (c *Controller) sync(name string) error {
   152  	c.lock.Lock()
   153  	defer c.lock.Unlock()
   154  
   155  	crd, err := c.crdLister.Get(name)
   156  	if err != nil && !errors.IsNotFound(err) {
   157  		return err
   158  	}
   159  
   160  	if errors.IsNotFound(err) || !apiextensionshelpers.IsCRDConditionTrue(crd, apiextensionsv1.Established) {
   161  		c.deleteCRD(name)
   162  		return nil
   163  	}
   164  
   165  	for _, v := range crd.Spec.Versions {
   166  		if !v.Served {
   167  			continue
   168  		}
   169  		c.buildV3Spec(crd, name, v.Name)
   170  	}
   171  
   172  	return nil
   173  }
   174  
   175  func (c *Controller) deleteCRD(name string) {
   176  	for gv, crdListForGV := range c.specsByGVandName {
   177  		_, needOpenAPIUpdate := crdListForGV[name]
   178  		if needOpenAPIUpdate {
   179  			delete(crdListForGV, name)
   180  			if len(crdListForGV) == 0 {
   181  				delete(c.specsByGVandName, gv)
   182  			}
   183  			regenerationCounter.With(map[string]string{"group": gv.Group, "version": gv.Version, "crd": name, "reason": "remove"})
   184  			c.updateGroupVersion(gv)
   185  		}
   186  	}
   187  }
   188  
   189  func (c *Controller) updateGroupVersion(gv schema.GroupVersion) error {
   190  	if _, ok := c.specsByGVandName[gv]; !ok {
   191  		c.openAPIV3Service.DeleteGroupVersion(groupVersionToOpenAPIV3Path(gv))
   192  		return nil
   193  	}
   194  
   195  	var specs []*spec3.OpenAPI
   196  	for _, spec := range c.specsByGVandName[gv] {
   197  		specs = append(specs, spec)
   198  	}
   199  
   200  	mergedSpec, err := builder.MergeSpecsV3(specs...)
   201  	if err != nil {
   202  		return fmt.Errorf("failed to merge specs: %v", err)
   203  	}
   204  
   205  	c.openAPIV3Service.UpdateGroupVersion(groupVersionToOpenAPIV3Path(gv), mergedSpec)
   206  	return nil
   207  }
   208  
   209  func (c *Controller) updateCRDSpec(crd *apiextensionsv1.CustomResourceDefinition, name, versionName string, v3 *spec3.OpenAPI) error {
   210  	gv := schema.GroupVersion{
   211  		Group:   crd.Spec.Group,
   212  		Version: versionName,
   213  	}
   214  
   215  	_, ok := c.specsByGVandName[gv]
   216  	reason := "update"
   217  	if !ok {
   218  		reason = "add"
   219  		c.specsByGVandName[gv] = map[string]*spec3.OpenAPI{}
   220  	}
   221  
   222  	oldSpec, ok := c.specsByGVandName[gv][name]
   223  	if ok {
   224  		if reflect.DeepEqual(oldSpec, v3) {
   225  			// no changes to CRD
   226  			return nil
   227  		}
   228  	}
   229  	c.specsByGVandName[gv][name] = v3
   230  	regenerationCounter.With(map[string]string{"crd": name, "group": gv.Group, "version": gv.Version, "reason": reason})
   231  	return c.updateGroupVersion(gv)
   232  }
   233  
   234  func (c *Controller) buildV3Spec(crd *apiextensionsv1.CustomResourceDefinition, name, versionName string) error {
   235  	v3, err := builder.BuildOpenAPIV3(crd, versionName, builder.Options{
   236  		V2:                      false,
   237  		IncludeSelectableFields: utilfeature.DefaultFeatureGate.Enabled(apiextensionsfeatures.CustomResourceFieldSelectors),
   238  	})
   239  
   240  	if err != nil {
   241  		return err
   242  	}
   243  
   244  	c.updateCRDSpec(crd, name, versionName, v3)
   245  	return nil
   246  }
   247  
   248  func (c *Controller) addCustomResourceDefinition(obj interface{}) {
   249  	castObj := obj.(*apiextensionsv1.CustomResourceDefinition)
   250  	klog.V(4).Infof("Adding customresourcedefinition %s", castObj.Name)
   251  	c.enqueue(castObj)
   252  }
   253  
   254  func (c *Controller) updateCustomResourceDefinition(oldObj, newObj interface{}) {
   255  	castNewObj := newObj.(*apiextensionsv1.CustomResourceDefinition)
   256  	klog.V(4).Infof("Updating customresourcedefinition %s", castNewObj.Name)
   257  	c.enqueue(castNewObj)
   258  }
   259  
   260  func (c *Controller) deleteCustomResourceDefinition(obj interface{}) {
   261  	castObj, ok := obj.(*apiextensionsv1.CustomResourceDefinition)
   262  	if !ok {
   263  		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
   264  		if !ok {
   265  			klog.Errorf("Couldn't get object from tombstone %#v", obj)
   266  			return
   267  		}
   268  		castObj, ok = tombstone.Obj.(*apiextensionsv1.CustomResourceDefinition)
   269  		if !ok {
   270  			klog.Errorf("Tombstone contained object that is not expected %#v", obj)
   271  			return
   272  		}
   273  	}
   274  	klog.V(4).Infof("Deleting customresourcedefinition %q", castObj.Name)
   275  	c.enqueue(castObj)
   276  }
   277  
   278  func (c *Controller) enqueue(obj *apiextensionsv1.CustomResourceDefinition) {
   279  	c.queue.Add(obj.Name)
   280  }
   281  

View as plain text