...

Source file src/k8s.io/apiextensions-apiserver/pkg/controller/openapi/controller.go

Documentation: k8s.io/apiextensions-apiserver/pkg/controller/openapi

     1  /*
     2  Copyright 2019 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package openapi
    18  
    19  import (
    20  	"fmt"
    21  	"sync"
    22  	"time"
    23  
    24  	"github.com/google/uuid"
    25  
    26  	apiextensionsfeatures "k8s.io/apiextensions-apiserver/pkg/features"
    27  	"k8s.io/apimachinery/pkg/api/errors"
    28  	"k8s.io/apimachinery/pkg/labels"
    29  	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    30  	"k8s.io/apimachinery/pkg/util/wait"
    31  	utilfeature "k8s.io/apiserver/pkg/util/feature"
    32  	"k8s.io/client-go/tools/cache"
    33  	"k8s.io/client-go/util/workqueue"
    34  	"k8s.io/klog/v2"
    35  	"k8s.io/kube-openapi/pkg/cached"
    36  	"k8s.io/kube-openapi/pkg/handler"
    37  	"k8s.io/kube-openapi/pkg/validation/spec"
    38  
    39  	apiextensionshelpers "k8s.io/apiextensions-apiserver/pkg/apihelpers"
    40  	apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
    41  	informers "k8s.io/apiextensions-apiserver/pkg/client/informers/externalversions/apiextensions/v1"
    42  	listers "k8s.io/apiextensions-apiserver/pkg/client/listers/apiextensions/v1"
    43  	"k8s.io/apiextensions-apiserver/pkg/controller/openapi/builder"
    44  )
    45  
// Controller watches CustomResourceDefinitions and publishes validation schema
// through the OpenAPI service that is handed to Run.
type Controller struct {
	// crdLister is used to list and get CRDs; crdsSynced is the informer's
	// HasSynced function, which Run waits on before the initial listing.
	crdLister  listers.CustomResourceDefinitionLister
	crdsSynced cache.InformerSynced

	// syncFn is called with the CRD name for every item taken off the queue.
	// NewController sets it to c.sync.
	// To allow injection for testing.
	syncFn func(string) error

	// queue holds the names of CRDs that still have to be synced.
	queue workqueue.RateLimitingInterface

	// staticSpec is the base spec the per-CRD specs are merged into. Set by Run.
	staticSpec *spec.Swagger

	// openAPIService receives the lazily merged spec. Set by Run.
	openAPIService *handler.OpenAPIService

	// specs by name. The specs are lazily constructed on request.
	// The lock is for the map only.
	lock        sync.Mutex
	specsByName map[string]*specCache
}
    65  
// specCache holds the merged version spec for a CRD as well as the CRD object.
// The spec is created lazily from the CRD object on request.
// The mergedVersionSpec is only created on instantiation (in createSpecCache)
// and is never changed. crdCache is a cached.LastSuccess and updates are thread
// safe. Thus, no lock is needed to protect this struct.
// NOTE(review): the thread-safety statement relies on the kube-openapi cached
// package; confirm against the vendored version.
type specCache struct {
	// crdCache stores the most recent CRD object together with its ETag.
	crdCache          cached.LastSuccess[*apiextensionsv1.CustomResourceDefinition]
	// mergedVersionSpec is derived from crdCache: the merged spec of all
	// served versions of the CRD.
	mergedVersionSpec cached.Value[*spec.Swagger]
}
    75  
    76  func (s *specCache) update(crd *apiextensionsv1.CustomResourceDefinition) {
    77  	s.crdCache.Store(cached.Static(crd, generateCRDHash(crd)))
    78  }
    79  
    80  func createSpecCache(crd *apiextensionsv1.CustomResourceDefinition) *specCache {
    81  	s := specCache{}
    82  	s.update(crd)
    83  
    84  	s.mergedVersionSpec = cached.Transform[*apiextensionsv1.CustomResourceDefinition](func(crd *apiextensionsv1.CustomResourceDefinition, etag string, err error) (*spec.Swagger, string, error) {
    85  		if err != nil {
    86  			// This should never happen, but return the err if it does.
    87  			return nil, "", err
    88  		}
    89  		mergeSpec := &spec.Swagger{}
    90  		for _, v := range crd.Spec.Versions {
    91  			if !v.Served {
    92  				continue
    93  			}
    94  			s, err := builder.BuildOpenAPIV2(crd, v.Name, builder.Options{
    95  				V2:                      true,
    96  				IncludeSelectableFields: utilfeature.DefaultFeatureGate.Enabled(apiextensionsfeatures.CustomResourceFieldSelectors),
    97  			})
    98  			// Defaults must be pruned here for CRDs to cleanly merge with the static
    99  			// spec that already has defaults pruned
   100  			if err != nil {
   101  				return nil, "", err
   102  			}
   103  			s.Definitions = handler.PruneDefaults(s.Definitions)
   104  			mergeSpec, err = builder.MergeSpecs(mergeSpec, s)
   105  			if err != nil {
   106  				return nil, "", err
   107  			}
   108  		}
   109  		return mergeSpec, generateCRDHash(crd), nil
   110  	}, &s.crdCache)
   111  	return &s
   112  }
   113  
   114  // NewController creates a new Controller with input CustomResourceDefinition informer
   115  func NewController(crdInformer informers.CustomResourceDefinitionInformer) *Controller {
   116  	c := &Controller{
   117  		crdLister:   crdInformer.Lister(),
   118  		crdsSynced:  crdInformer.Informer().HasSynced,
   119  		queue:       workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "crd_openapi_controller"),
   120  		specsByName: map[string]*specCache{},
   121  	}
   122  
   123  	crdInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
   124  		AddFunc:    c.addCustomResourceDefinition,
   125  		UpdateFunc: c.updateCustomResourceDefinition,
   126  		DeleteFunc: c.deleteCustomResourceDefinition,
   127  	})
   128  
   129  	c.syncFn = c.sync
   130  	return c
   131  }
   132  
   133  // Run sets openAPIAggregationManager and starts workers
   134  func (c *Controller) Run(staticSpec *spec.Swagger, openAPIService *handler.OpenAPIService, stopCh <-chan struct{}) {
   135  	defer utilruntime.HandleCrash()
   136  	defer c.queue.ShutDown()
   137  	defer klog.Infof("Shutting down OpenAPI controller")
   138  
   139  	klog.Infof("Starting OpenAPI controller")
   140  
   141  	c.staticSpec = staticSpec
   142  	c.openAPIService = openAPIService
   143  
   144  	if !cache.WaitForCacheSync(stopCh, c.crdsSynced) {
   145  		utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
   146  		return
   147  	}
   148  
   149  	// create initial spec to avoid merging once per CRD on startup
   150  	crds, err := c.crdLister.List(labels.Everything())
   151  	if err != nil {
   152  		utilruntime.HandleError(fmt.Errorf("failed to initially list all CRDs: %v", err))
   153  		return
   154  	}
   155  	for _, crd := range crds {
   156  		if !apiextensionshelpers.IsCRDConditionTrue(crd, apiextensionsv1.Established) {
   157  			continue
   158  		}
   159  		c.specsByName[crd.Name] = createSpecCache(crd)
   160  	}
   161  	c.updateSpecLocked()
   162  
   163  	// only start one worker thread since its a slow moving API
   164  	go wait.Until(c.runWorker, time.Second, stopCh)
   165  
   166  	<-stopCh
   167  }
   168  
   169  func (c *Controller) runWorker() {
   170  	for c.processNextWorkItem() {
   171  	}
   172  }
   173  
   174  func (c *Controller) processNextWorkItem() bool {
   175  	key, quit := c.queue.Get()
   176  	if quit {
   177  		return false
   178  	}
   179  	defer c.queue.Done(key)
   180  
   181  	// log slow aggregations
   182  	start := time.Now()
   183  	defer func() {
   184  		elapsed := time.Since(start)
   185  		if elapsed > time.Second {
   186  			klog.Warningf("slow openapi aggregation of %q: %s", key.(string), elapsed)
   187  		}
   188  	}()
   189  
   190  	err := c.syncFn(key.(string))
   191  	if err == nil {
   192  		c.queue.Forget(key)
   193  		return true
   194  	}
   195  
   196  	utilruntime.HandleError(fmt.Errorf("%v failed with: %v", key, err))
   197  	c.queue.AddRateLimited(key)
   198  	return true
   199  }
   200  
   201  func (c *Controller) sync(name string) error {
   202  	c.lock.Lock()
   203  	defer c.lock.Unlock()
   204  
   205  	crd, err := c.crdLister.Get(name)
   206  	if err != nil && !errors.IsNotFound(err) {
   207  		return err
   208  	}
   209  
   210  	// do we have to remove all specs of this CRD?
   211  	if errors.IsNotFound(err) || !apiextensionshelpers.IsCRDConditionTrue(crd, apiextensionsv1.Established) {
   212  		if _, found := c.specsByName[name]; !found {
   213  			return nil
   214  		}
   215  		delete(c.specsByName, name)
   216  		klog.V(2).Infof("Updating CRD OpenAPI spec because %s was removed", name)
   217  		regenerationCounter.With(map[string]string{"crd": name, "reason": "remove"})
   218  		c.updateSpecLocked()
   219  		return nil
   220  	}
   221  
   222  	// If CRD spec already exists, update the CRD.
   223  	// specCache.update() includes the ETag so an update on a spec
   224  	// resulting in the same ETag will be a noop.
   225  	s, exists := c.specsByName[crd.Name]
   226  	if exists {
   227  		s.update(crd)
   228  		klog.V(2).Infof("Updating CRD OpenAPI spec because %s changed", name)
   229  		regenerationCounter.With(map[string]string{"crd": name, "reason": "update"})
   230  		return nil
   231  	}
   232  
   233  	c.specsByName[crd.Name] = createSpecCache(crd)
   234  	klog.V(2).Infof("Updating CRD OpenAPI spec because %s changed", name)
   235  	regenerationCounter.With(map[string]string{"crd": name, "reason": "add"})
   236  	c.updateSpecLocked()
   237  	return nil
   238  }
   239  
   240  // updateSpecLocked updates the cached spec graph.
   241  func (c *Controller) updateSpecLocked() {
   242  	specList := make([]cached.Value[*spec.Swagger], 0, len(c.specsByName))
   243  	for crd := range c.specsByName {
   244  		specList = append(specList, c.specsByName[crd].mergedVersionSpec)
   245  	}
   246  
   247  	cache := cached.MergeList(func(results []cached.Result[*spec.Swagger]) (*spec.Swagger, string, error) {
   248  		localCRDSpec := make([]*spec.Swagger, 0, len(results))
   249  		for k := range results {
   250  			if results[k].Err == nil {
   251  				localCRDSpec = append(localCRDSpec, results[k].Value)
   252  			}
   253  		}
   254  		mergedSpec, err := builder.MergeSpecs(c.staticSpec, localCRDSpec...)
   255  		if err != nil {
   256  			return nil, "", fmt.Errorf("failed to merge specs: %v", err)
   257  		}
   258  		// A UUID is returned for the etag because we will only
   259  		// create a new merger when a CRD has changed. A hash based
   260  		// etag is more expensive because the CRDs are not
   261  		// premarshalled.
   262  		return mergedSpec, uuid.New().String(), nil
   263  	}, specList)
   264  	c.openAPIService.UpdateSpecLazy(cache)
   265  }
   266  
   267  func (c *Controller) addCustomResourceDefinition(obj interface{}) {
   268  	castObj := obj.(*apiextensionsv1.CustomResourceDefinition)
   269  	klog.V(4).Infof("Adding customresourcedefinition %s", castObj.Name)
   270  	c.enqueue(castObj)
   271  }
   272  
   273  func (c *Controller) updateCustomResourceDefinition(oldObj, newObj interface{}) {
   274  	castNewObj := newObj.(*apiextensionsv1.CustomResourceDefinition)
   275  	klog.V(4).Infof("Updating customresourcedefinition %s", castNewObj.Name)
   276  	c.enqueue(castNewObj)
   277  }
   278  
   279  func (c *Controller) deleteCustomResourceDefinition(obj interface{}) {
   280  	castObj, ok := obj.(*apiextensionsv1.CustomResourceDefinition)
   281  	if !ok {
   282  		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
   283  		if !ok {
   284  			klog.Errorf("Couldn't get object from tombstone %#v", obj)
   285  			return
   286  		}
   287  		castObj, ok = tombstone.Obj.(*apiextensionsv1.CustomResourceDefinition)
   288  		if !ok {
   289  			klog.Errorf("Tombstone contained object that is not expected %#v", obj)
   290  			return
   291  		}
   292  	}
   293  	klog.V(4).Infof("Deleting customresourcedefinition %q", castObj.Name)
   294  	c.enqueue(castObj)
   295  }
   296  
   297  func (c *Controller) enqueue(obj *apiextensionsv1.CustomResourceDefinition) {
   298  	c.queue.Add(obj.Name)
   299  }
   300  
   301  func generateCRDHash(crd *apiextensionsv1.CustomResourceDefinition) string {
   302  	return fmt.Sprintf("%s,%d", crd.UID, crd.Generation)
   303  }
   304  

View as plain text