...

Source file src/k8s.io/apiextensions-apiserver/pkg/apiserver/customresource_handler.go

Documentation: k8s.io/apiextensions-apiserver/pkg/apiserver

     1  /*
     2  Copyright 2017 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package apiserver
    18  
    19  import (
    20  	"fmt"
    21  	"net/http"
    22  	"sort"
    23  	"strings"
    24  	"sync"
    25  	"sync/atomic"
    26  	"time"
    27  
    28  	"sigs.k8s.io/structured-merge-diff/v4/fieldpath"
    29  
    30  	apiextensionshelpers "k8s.io/apiextensions-apiserver/pkg/apihelpers"
    31  	apiextensionsinternal "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions"
    32  	apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
    33  	"k8s.io/apiextensions-apiserver/pkg/apiserver/conversion"
    34  	structuralschema "k8s.io/apiextensions-apiserver/pkg/apiserver/schema"
    35  	structuraldefaulting "k8s.io/apiextensions-apiserver/pkg/apiserver/schema/defaulting"
    36  	schemaobjectmeta "k8s.io/apiextensions-apiserver/pkg/apiserver/schema/objectmeta"
    37  	structuralpruning "k8s.io/apiextensions-apiserver/pkg/apiserver/schema/pruning"
    38  	apiservervalidation "k8s.io/apiextensions-apiserver/pkg/apiserver/validation"
    39  	informers "k8s.io/apiextensions-apiserver/pkg/client/informers/externalversions/apiextensions/v1"
    40  	listers "k8s.io/apiextensions-apiserver/pkg/client/listers/apiextensions/v1"
    41  	"k8s.io/apiextensions-apiserver/pkg/controller/establish"
    42  	"k8s.io/apiextensions-apiserver/pkg/controller/finalizer"
    43  	"k8s.io/apiextensions-apiserver/pkg/controller/openapi/builder"
    44  	"k8s.io/apiextensions-apiserver/pkg/crdserverscheme"
    45  	"k8s.io/apiextensions-apiserver/pkg/registry/customresource"
    46  	"k8s.io/apiextensions-apiserver/pkg/registry/customresource/tableconvertor"
    47  
    48  	apiequality "k8s.io/apimachinery/pkg/api/equality"
    49  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    50  	"k8s.io/apimachinery/pkg/api/meta"
    51  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    52  	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
    53  	"k8s.io/apimachinery/pkg/labels"
    54  	"k8s.io/apimachinery/pkg/runtime"
    55  	"k8s.io/apimachinery/pkg/runtime/schema"
    56  	"k8s.io/apimachinery/pkg/runtime/serializer"
    57  	"k8s.io/apimachinery/pkg/runtime/serializer/json"
    58  	"k8s.io/apimachinery/pkg/runtime/serializer/protobuf"
    59  	"k8s.io/apimachinery/pkg/runtime/serializer/versioning"
    60  	"k8s.io/apimachinery/pkg/types"
    61  	"k8s.io/apimachinery/pkg/util/managedfields"
    62  	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    63  	"k8s.io/apimachinery/pkg/util/sets"
    64  	utilwaitgroup "k8s.io/apimachinery/pkg/util/waitgroup"
    65  	"k8s.io/apimachinery/pkg/version"
    66  	"k8s.io/apiserver/pkg/admission"
    67  	"k8s.io/apiserver/pkg/authorization/authorizer"
    68  	"k8s.io/apiserver/pkg/endpoints/handlers"
    69  	"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
    70  	"k8s.io/apiserver/pkg/endpoints/metrics"
    71  	apirequest "k8s.io/apiserver/pkg/endpoints/request"
    72  	"k8s.io/apiserver/pkg/registry/generic"
    73  	genericfilters "k8s.io/apiserver/pkg/server/filters"
    74  	"k8s.io/apiserver/pkg/util/webhook"
    75  	"k8s.io/apiserver/pkg/warning"
    76  	"k8s.io/client-go/scale"
    77  	"k8s.io/client-go/scale/scheme/autoscalingv1"
    78  	"k8s.io/client-go/tools/cache"
    79  	"k8s.io/klog/v2"
    80  	"k8s.io/kube-openapi/pkg/spec3"
    81  	"k8s.io/kube-openapi/pkg/validation/spec"
    82  )
    83  
    84  // crdHandler serves the `/apis` endpoint.
    85  // This is registered as a filter so that it never collides with any explicitly registered endpoints
    86  type crdHandler struct {
    87  	versionDiscoveryHandler *versionDiscoveryHandler
    88  	groupDiscoveryHandler   *groupDiscoveryHandler
    89  
    90  	customStorageLock sync.Mutex
    91  	// customStorage contains a crdStorageMap
    92  	// atomic.Value has a very good read performance compared to sync.RWMutex
    93  	// see https://gist.github.com/dim/152e6bf80e1384ea72e17ac717a5000a
    94  	// which is suited for most read and rarely write cases
    95  	customStorage atomic.Value
    96  
    97  	crdLister listers.CustomResourceDefinitionLister
    98  
    99  	delegate          http.Handler
   100  	restOptionsGetter generic.RESTOptionsGetter
   101  	admission         admission.Interface
   102  
   103  	establishingController *establish.EstablishingController
   104  
   105  	// MasterCount is used to implement sleep to improve
   106  	// CRD establishing process for HA clusters.
   107  	masterCount int
   108  
   109  	converterFactory *conversion.CRConverterFactory
   110  
   111  	// so that we can do create on update.
   112  	authorizer authorizer.Authorizer
   113  
   114  	// request timeout we should delay storage teardown for
   115  	requestTimeout time.Duration
   116  
   117  	// minRequestTimeout applies to CR's list/watch calls
   118  	minRequestTimeout time.Duration
   119  
   120  	// staticOpenAPISpec is used as a base for the schema of CR's for the
   121  	// purpose of managing fields, it is how CR handlers get the structure
   122  	// of TypeMeta and ObjectMeta
   123  	staticOpenAPISpec map[string]*spec.Schema
   124  
   125  	// The limit on the request size that would be accepted and decoded in a write request
   126  	// 0 means no limit.
   127  	maxRequestBodyBytes int64
   128  }
   129  
   130  // crdInfo stores enough information to serve the storage for the custom resource
   131  type crdInfo struct {
   132  	// spec and acceptedNames are used to compare against if a change is made on a CRD. We only update
   133  	// the storage if one of these changes.
   134  	spec          *apiextensionsv1.CustomResourceDefinitionSpec
   135  	acceptedNames *apiextensionsv1.CustomResourceDefinitionNames
   136  
   137  	// Deprecated per version
   138  	deprecated map[string]bool
   139  
   140  	// Warnings per version
   141  	warnings map[string][]string
   142  
   143  	// Storage per version
   144  	storages map[string]customresource.CustomResourceStorage
   145  
   146  	// Request scope per version
   147  	requestScopes map[string]*handlers.RequestScope
   148  
   149  	// Scale scope per version
   150  	scaleRequestScopes map[string]*handlers.RequestScope
   151  
   152  	// Status scope per version
   153  	statusRequestScopes map[string]*handlers.RequestScope
   154  
   155  	// storageVersion is the CRD version used when storing the object in etcd.
   156  	storageVersion string
   157  
   158  	waitGroup *utilwaitgroup.SafeWaitGroup
   159  }
   160  
   161  // crdStorageMap goes from customresourcedefinition to its storage
   162  type crdStorageMap map[types.UID]*crdInfo
   163  
   164  func NewCustomResourceDefinitionHandler(
   165  	versionDiscoveryHandler *versionDiscoveryHandler,
   166  	groupDiscoveryHandler *groupDiscoveryHandler,
   167  	crdInformer informers.CustomResourceDefinitionInformer,
   168  	delegate http.Handler,
   169  	restOptionsGetter generic.RESTOptionsGetter,
   170  	admission admission.Interface,
   171  	establishingController *establish.EstablishingController,
   172  	serviceResolver webhook.ServiceResolver,
   173  	authResolverWrapper webhook.AuthenticationInfoResolverWrapper,
   174  	masterCount int,
   175  	authorizer authorizer.Authorizer,
   176  	requestTimeout time.Duration,
   177  	minRequestTimeout time.Duration,
   178  	staticOpenAPISpec map[string]*spec.Schema,
   179  	maxRequestBodyBytes int64) (*crdHandler, error) {
   180  	ret := &crdHandler{
   181  		versionDiscoveryHandler: versionDiscoveryHandler,
   182  		groupDiscoveryHandler:   groupDiscoveryHandler,
   183  		customStorage:           atomic.Value{},
   184  		crdLister:               crdInformer.Lister(),
   185  		delegate:                delegate,
   186  		restOptionsGetter:       restOptionsGetter,
   187  		admission:               admission,
   188  		establishingController:  establishingController,
   189  		masterCount:             masterCount,
   190  		authorizer:              authorizer,
   191  		requestTimeout:          requestTimeout,
   192  		minRequestTimeout:       minRequestTimeout,
   193  		staticOpenAPISpec:       staticOpenAPISpec,
   194  		maxRequestBodyBytes:     maxRequestBodyBytes,
   195  	}
   196  	crdInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
   197  		AddFunc:    ret.createCustomResourceDefinition,
   198  		UpdateFunc: ret.updateCustomResourceDefinition,
   199  		DeleteFunc: func(obj interface{}) {
   200  			ret.removeDeadStorage()
   201  		},
   202  	})
   203  	crConverterFactory, err := conversion.NewCRConverterFactory(serviceResolver, authResolverWrapper)
   204  	if err != nil {
   205  		return nil, err
   206  	}
   207  	ret.converterFactory = crConverterFactory
   208  
   209  	ret.customStorage.Store(crdStorageMap{})
   210  
   211  	return ret, nil
   212  }
   213  
   214  // watches are expected to handle storage disruption gracefully,
   215  // both on the server-side (by terminating the watch connection)
   216  // and on the client side (by restarting the watch)
   217  var longRunningFilter = genericfilters.BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString())
   218  
   219  // possiblyAcrossAllNamespacesVerbs contains those verbs which can be per-namespace and across all
   220  // namespaces for namespaces resources. I.e. for these an empty namespace in the requestInfo is fine.
   221  var possiblyAcrossAllNamespacesVerbs = sets.NewString("list", "watch")
   222  
   223  func (r *crdHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
   224  	ctx := req.Context()
   225  	requestInfo, ok := apirequest.RequestInfoFrom(ctx)
   226  	if !ok {
   227  		responsewriters.ErrorNegotiated(
   228  			apierrors.NewInternalError(fmt.Errorf("no RequestInfo found in the context")),
   229  			Codecs, schema.GroupVersion{}, w, req,
   230  		)
   231  		return
   232  	}
   233  	if !requestInfo.IsResourceRequest {
   234  		pathParts := splitPath(requestInfo.Path)
   235  		// only match /apis/<group>/<version>
   236  		// only registered under /apis
   237  		if len(pathParts) == 3 {
   238  			r.versionDiscoveryHandler.ServeHTTP(w, req)
   239  			return
   240  		}
   241  		// only match /apis/<group>
   242  		if len(pathParts) == 2 {
   243  			r.groupDiscoveryHandler.ServeHTTP(w, req)
   244  			return
   245  		}
   246  
   247  		r.delegate.ServeHTTP(w, req)
   248  		return
   249  	}
   250  
   251  	crdName := requestInfo.Resource + "." + requestInfo.APIGroup
   252  	crd, err := r.crdLister.Get(crdName)
   253  	if apierrors.IsNotFound(err) {
   254  		r.delegate.ServeHTTP(w, req)
   255  		return
   256  	}
   257  	if err != nil {
   258  		utilruntime.HandleError(err)
   259  		responsewriters.ErrorNegotiated(
   260  			apierrors.NewInternalError(fmt.Errorf("error resolving resource")),
   261  			Codecs, schema.GroupVersion{Group: requestInfo.APIGroup, Version: requestInfo.APIVersion}, w, req,
   262  		)
   263  		return
   264  	}
   265  
   266  	// if the scope in the CRD and the scope in request differ (with exception of the verbs in possiblyAcrossAllNamespacesVerbs
   267  	// for namespaced resources), pass request to the delegate, which is supposed to lead to a 404.
   268  	namespacedCRD, namespacedReq := crd.Spec.Scope == apiextensionsv1.NamespaceScoped, len(requestInfo.Namespace) > 0
   269  	if !namespacedCRD && namespacedReq {
   270  		r.delegate.ServeHTTP(w, req)
   271  		return
   272  	}
   273  	if namespacedCRD && !namespacedReq && !possiblyAcrossAllNamespacesVerbs.Has(requestInfo.Verb) {
   274  		r.delegate.ServeHTTP(w, req)
   275  		return
   276  	}
   277  
   278  	if !apiextensionshelpers.HasServedCRDVersion(crd, requestInfo.APIVersion) {
   279  		r.delegate.ServeHTTP(w, req)
   280  		return
   281  	}
   282  
   283  	// There is a small chance that a CRD is being served because NamesAccepted condition is true,
   284  	// but it becomes "unserved" because another names update leads to a conflict
   285  	// and EstablishingController wasn't fast enough to put the CRD into the Established condition.
   286  	// We accept this as the problem is small and self-healing.
   287  	if !apiextensionshelpers.IsCRDConditionTrue(crd, apiextensionsv1.NamesAccepted) &&
   288  		!apiextensionshelpers.IsCRDConditionTrue(crd, apiextensionsv1.Established) {
   289  		r.delegate.ServeHTTP(w, req)
   290  		return
   291  	}
   292  
   293  	terminating := apiextensionshelpers.IsCRDConditionTrue(crd, apiextensionsv1.Terminating)
   294  
   295  	crdInfo, err := r.getOrCreateServingInfoFor(crd.UID, crd.Name)
   296  	if apierrors.IsNotFound(err) {
   297  		r.delegate.ServeHTTP(w, req)
   298  		return
   299  	}
   300  	if err != nil {
   301  		utilruntime.HandleError(err)
   302  		responsewriters.ErrorNegotiated(
   303  			apierrors.NewInternalError(fmt.Errorf("error resolving resource")),
   304  			Codecs, schema.GroupVersion{Group: requestInfo.APIGroup, Version: requestInfo.APIVersion}, w, req,
   305  		)
   306  		return
   307  	}
   308  	if !hasServedCRDVersion(crdInfo.spec, requestInfo.APIVersion) {
   309  		r.delegate.ServeHTTP(w, req)
   310  		return
   311  	}
   312  
   313  	deprecated := crdInfo.deprecated[requestInfo.APIVersion]
   314  	for _, w := range crdInfo.warnings[requestInfo.APIVersion] {
   315  		warning.AddWarning(req.Context(), "", w)
   316  	}
   317  
   318  	verb := strings.ToUpper(requestInfo.Verb)
   319  	resource := requestInfo.Resource
   320  	subresource := requestInfo.Subresource
   321  	scope := metrics.CleanScope(requestInfo)
   322  	supportedTypes := []string{
   323  		string(types.JSONPatchType),
   324  		string(types.MergePatchType),
   325  		string(types.ApplyPatchType),
   326  	}
   327  
   328  	var handlerFunc http.HandlerFunc
   329  	subresources, err := apiextensionshelpers.GetSubresourcesForVersion(crd, requestInfo.APIVersion)
   330  	if err != nil {
   331  		utilruntime.HandleError(err)
   332  		responsewriters.ErrorNegotiated(
   333  			apierrors.NewInternalError(fmt.Errorf("could not properly serve the subresource")),
   334  			Codecs, schema.GroupVersion{Group: requestInfo.APIGroup, Version: requestInfo.APIVersion}, w, req,
   335  		)
   336  		return
   337  	}
   338  	switch {
   339  	case subresource == "status" && subresources != nil && subresources.Status != nil:
   340  		handlerFunc = r.serveStatus(w, req, requestInfo, crdInfo, terminating, supportedTypes)
   341  	case subresource == "scale" && subresources != nil && subresources.Scale != nil:
   342  		handlerFunc = r.serveScale(w, req, requestInfo, crdInfo, terminating, supportedTypes)
   343  	case len(subresource) == 0:
   344  		handlerFunc = r.serveResource(w, req, requestInfo, crdInfo, crd, terminating, supportedTypes)
   345  	default:
   346  		responsewriters.ErrorNegotiated(
   347  			apierrors.NewNotFound(schema.GroupResource{Group: requestInfo.APIGroup, Resource: requestInfo.Resource}, requestInfo.Name),
   348  			Codecs, schema.GroupVersion{Group: requestInfo.APIGroup, Version: requestInfo.APIVersion}, w, req,
   349  		)
   350  	}
   351  
   352  	if handlerFunc != nil {
   353  		handlerFunc = metrics.InstrumentHandlerFunc(verb, requestInfo.APIGroup, requestInfo.APIVersion, resource, subresource, scope, metrics.APIServerComponent, deprecated, "", handlerFunc)
   354  		handler := genericfilters.WithWaitGroup(handlerFunc, longRunningFilter, crdInfo.waitGroup)
   355  		handler.ServeHTTP(w, req)
   356  		return
   357  	}
   358  }
   359  
   360  func (r *crdHandler) serveResource(w http.ResponseWriter, req *http.Request, requestInfo *apirequest.RequestInfo, crdInfo *crdInfo, crd *apiextensionsv1.CustomResourceDefinition, terminating bool, supportedTypes []string) http.HandlerFunc {
   361  	requestScope := crdInfo.requestScopes[requestInfo.APIVersion]
   362  	storage := crdInfo.storages[requestInfo.APIVersion].CustomResource
   363  
   364  	switch requestInfo.Verb {
   365  	case "get":
   366  		return handlers.GetResource(storage, requestScope)
   367  	case "list":
   368  		forceWatch := false
   369  		return handlers.ListResource(storage, storage, requestScope, forceWatch, r.minRequestTimeout)
   370  	case "watch":
   371  		forceWatch := true
   372  		return handlers.ListResource(storage, storage, requestScope, forceWatch, r.minRequestTimeout)
   373  	case "create":
   374  		// we want to track recently created CRDs so that in HA environments we don't have server A allow a create and server B
   375  		// not have observed the established, so a followup get,update,delete results in a 404. We've observed about 800ms
   376  		// delay in some CI environments.  Two seconds looks long enough and reasonably short for hot retriers.
   377  		justCreated := time.Since(apiextensionshelpers.FindCRDCondition(crd, apiextensionsv1.Established).LastTransitionTime.Time) < 2*time.Second
   378  		if justCreated {
   379  			time.Sleep(2 * time.Second)
   380  		}
   381  		if terminating {
   382  			err := apierrors.NewMethodNotSupported(schema.GroupResource{Group: requestInfo.APIGroup, Resource: requestInfo.Resource}, requestInfo.Verb)
   383  			err.ErrStatus.Message = fmt.Sprintf("%v not allowed while custom resource definition is terminating", requestInfo.Verb)
   384  			responsewriters.ErrorNegotiated(err, Codecs, schema.GroupVersion{Group: requestInfo.APIGroup, Version: requestInfo.APIVersion}, w, req)
   385  			return nil
   386  		}
   387  		return handlers.CreateResource(storage, requestScope, r.admission)
   388  	case "update":
   389  		return handlers.UpdateResource(storage, requestScope, r.admission)
   390  	case "patch":
   391  		return handlers.PatchResource(storage, requestScope, r.admission, supportedTypes)
   392  	case "delete":
   393  		allowsOptions := true
   394  		return handlers.DeleteResource(storage, allowsOptions, requestScope, r.admission)
   395  	case "deletecollection":
   396  		checkBody := true
   397  		return handlers.DeleteCollection(storage, checkBody, requestScope, r.admission)
   398  	default:
   399  		responsewriters.ErrorNegotiated(
   400  			apierrors.NewMethodNotSupported(schema.GroupResource{Group: requestInfo.APIGroup, Resource: requestInfo.Resource}, requestInfo.Verb),
   401  			Codecs, schema.GroupVersion{Group: requestInfo.APIGroup, Version: requestInfo.APIVersion}, w, req,
   402  		)
   403  		return nil
   404  	}
   405  }
   406  
   407  func (r *crdHandler) serveStatus(w http.ResponseWriter, req *http.Request, requestInfo *apirequest.RequestInfo, crdInfo *crdInfo, terminating bool, supportedTypes []string) http.HandlerFunc {
   408  	requestScope := crdInfo.statusRequestScopes[requestInfo.APIVersion]
   409  	storage := crdInfo.storages[requestInfo.APIVersion].Status
   410  
   411  	switch requestInfo.Verb {
   412  	case "get":
   413  		return handlers.GetResource(storage, requestScope)
   414  	case "update":
   415  		return handlers.UpdateResource(storage, requestScope, r.admission)
   416  	case "patch":
   417  		return handlers.PatchResource(storage, requestScope, r.admission, supportedTypes)
   418  	default:
   419  		responsewriters.ErrorNegotiated(
   420  			apierrors.NewMethodNotSupported(schema.GroupResource{Group: requestInfo.APIGroup, Resource: requestInfo.Resource}, requestInfo.Verb),
   421  			Codecs, schema.GroupVersion{Group: requestInfo.APIGroup, Version: requestInfo.APIVersion}, w, req,
   422  		)
   423  		return nil
   424  	}
   425  }
   426  
   427  func (r *crdHandler) serveScale(w http.ResponseWriter, req *http.Request, requestInfo *apirequest.RequestInfo, crdInfo *crdInfo, terminating bool, supportedTypes []string) http.HandlerFunc {
   428  	requestScope := crdInfo.scaleRequestScopes[requestInfo.APIVersion]
   429  	storage := crdInfo.storages[requestInfo.APIVersion].Scale
   430  
   431  	switch requestInfo.Verb {
   432  	case "get":
   433  		return handlers.GetResource(storage, requestScope)
   434  	case "update":
   435  		return handlers.UpdateResource(storage, requestScope, r.admission)
   436  	case "patch":
   437  		return handlers.PatchResource(storage, requestScope, r.admission, supportedTypes)
   438  	default:
   439  		responsewriters.ErrorNegotiated(
   440  			apierrors.NewMethodNotSupported(schema.GroupResource{Group: requestInfo.APIGroup, Resource: requestInfo.Resource}, requestInfo.Verb),
   441  			Codecs, schema.GroupVersion{Group: requestInfo.APIGroup, Version: requestInfo.APIVersion}, w, req,
   442  		)
   443  		return nil
   444  	}
   445  }
   446  
   447  // createCustomResourceDefinition removes potentially stale storage so it gets re-created
   448  func (r *crdHandler) createCustomResourceDefinition(obj interface{}) {
   449  	crd := obj.(*apiextensionsv1.CustomResourceDefinition)
   450  	r.customStorageLock.Lock()
   451  	defer r.customStorageLock.Unlock()
   452  	// this could happen if the create event is merged from create-update events
   453  	storageMap := r.customStorage.Load().(crdStorageMap)
   454  	oldInfo, found := storageMap[crd.UID]
   455  	if !found {
   456  		return
   457  	}
   458  	if apiequality.Semantic.DeepEqual(&crd.Spec, oldInfo.spec) && apiequality.Semantic.DeepEqual(&crd.Status.AcceptedNames, oldInfo.acceptedNames) {
   459  		klog.V(6).Infof("Ignoring customresourcedefinition %s create event because a storage with the same spec and accepted names exists",
   460  			crd.Name)
   461  		return
   462  	}
   463  	r.removeStorage_locked(crd.UID)
   464  }
   465  
   466  // updateCustomResourceDefinition removes potentially stale storage so it gets re-created
   467  func (r *crdHandler) updateCustomResourceDefinition(oldObj, newObj interface{}) {
   468  	oldCRD := oldObj.(*apiextensionsv1.CustomResourceDefinition)
   469  	newCRD := newObj.(*apiextensionsv1.CustomResourceDefinition)
   470  
   471  	r.customStorageLock.Lock()
   472  	defer r.customStorageLock.Unlock()
   473  
   474  	// Add CRD to the establishing controller queue.
   475  	// For HA clusters, we want to prevent race conditions when changing status to Established,
   476  	// so we want to be sure that CRD is Installing at least for 5 seconds before Establishing it.
   477  	// TODO: find a real HA safe checkpointing mechanism instead of an arbitrary wait.
   478  	if !apiextensionshelpers.IsCRDConditionTrue(newCRD, apiextensionsv1.Established) &&
   479  		apiextensionshelpers.IsCRDConditionTrue(newCRD, apiextensionsv1.NamesAccepted) {
   480  		if r.masterCount > 1 {
   481  			r.establishingController.QueueCRD(newCRD.Name, 5*time.Second)
   482  		} else {
   483  			r.establishingController.QueueCRD(newCRD.Name, 0)
   484  		}
   485  	}
   486  
   487  	if oldCRD.UID != newCRD.UID {
   488  		r.removeStorage_locked(oldCRD.UID)
   489  	}
   490  
   491  	storageMap := r.customStorage.Load().(crdStorageMap)
   492  	oldInfo, found := storageMap[newCRD.UID]
   493  	if !found {
   494  		return
   495  	}
   496  	if apiequality.Semantic.DeepEqual(&newCRD.Spec, oldInfo.spec) && apiequality.Semantic.DeepEqual(&newCRD.Status.AcceptedNames, oldInfo.acceptedNames) {
   497  		klog.V(6).Infof("Ignoring customresourcedefinition %s update because neither spec, nor accepted names changed", oldCRD.Name)
   498  		return
   499  	}
   500  
   501  	klog.V(4).Infof("Updating customresourcedefinition %s", newCRD.Name)
   502  	r.removeStorage_locked(newCRD.UID)
   503  }
   504  
   505  // removeStorage_locked removes the cached storage with the given uid as key from the storage map. This function
   506  // updates r.customStorage with the cleaned-up storageMap and tears down the old storage.
   507  // NOTE: Caller MUST hold r.customStorageLock to write r.customStorage thread-safely.
   508  func (r *crdHandler) removeStorage_locked(uid types.UID) {
   509  	storageMap := r.customStorage.Load().(crdStorageMap)
   510  	if oldInfo, ok := storageMap[uid]; ok {
   511  		// Copy because we cannot write to storageMap without a race
   512  		// as it is used without locking elsewhere.
   513  		storageMap2 := storageMap.clone()
   514  
   515  		// Remove from the CRD info map and store the map
   516  		delete(storageMap2, uid)
   517  		r.customStorage.Store(storageMap2)
   518  
   519  		// Tear down the old storage
   520  		go r.tearDown(oldInfo)
   521  	}
   522  }
   523  
   524  // removeDeadStorage removes REST storage that isn't being used
   525  func (r *crdHandler) removeDeadStorage() {
   526  	allCustomResourceDefinitions, err := r.crdLister.List(labels.Everything())
   527  	if err != nil {
   528  		utilruntime.HandleError(err)
   529  		return
   530  	}
   531  
   532  	r.customStorageLock.Lock()
   533  	defer r.customStorageLock.Unlock()
   534  
   535  	storageMap := r.customStorage.Load().(crdStorageMap)
   536  	// Copy because we cannot write to storageMap without a race
   537  	storageMap2 := make(crdStorageMap)
   538  	for _, crd := range allCustomResourceDefinitions {
   539  		if _, ok := storageMap[crd.UID]; ok {
   540  			storageMap2[crd.UID] = storageMap[crd.UID]
   541  		}
   542  	}
   543  	r.customStorage.Store(storageMap2)
   544  
   545  	for uid, crdInfo := range storageMap {
   546  		if _, ok := storageMap2[uid]; !ok {
   547  			klog.V(4).Infof("Removing dead CRD storage for %s/%s", crdInfo.spec.Group, crdInfo.spec.Names.Kind)
   548  			go r.tearDown(crdInfo)
   549  		}
   550  	}
   551  }
   552  
   553  // Wait up to a minute for requests to drain, then tear down storage
   554  func (r *crdHandler) tearDown(oldInfo *crdInfo) {
   555  	requestsDrained := make(chan struct{})
   556  	go func() {
   557  		defer close(requestsDrained)
   558  		// Allow time for in-flight requests with a handle to the old info to register themselves
   559  		time.Sleep(time.Second)
   560  		// Wait for in-flight requests to drain
   561  		oldInfo.waitGroup.Wait()
   562  	}()
   563  
   564  	select {
   565  	case <-time.After(r.requestTimeout * 2):
   566  		klog.Warningf("timeout waiting for requests to drain for %s/%s, tearing down storage", oldInfo.spec.Group, oldInfo.spec.Names.Kind)
   567  	case <-requestsDrained:
   568  	}
   569  
   570  	for _, storage := range oldInfo.storages {
   571  		// destroy only the main storage. Those for the subresources share cacher and etcd clients.
   572  		storage.CustomResource.DestroyFunc()
   573  	}
   574  }
   575  
   576  // Destroy shuts down storage layer for all registered CRDs.
   577  // It should be called as a last step of the shutdown sequence.
   578  func (r *crdHandler) destroy() {
   579  	r.customStorageLock.Lock()
   580  	defer r.customStorageLock.Unlock()
   581  
   582  	storageMap := r.customStorage.Load().(crdStorageMap)
   583  	for _, crdInfo := range storageMap {
   584  		for _, storage := range crdInfo.storages {
   585  			// DestroyFunc have to be implemented in idempotent way,
   586  			// so the potential race with r.tearDown() (being called
   587  			// from a goroutine) is safe.
   588  			storage.CustomResource.DestroyFunc()
   589  		}
   590  	}
   591  }
   592  
   593  // GetCustomResourceListerCollectionDeleter returns the ListerCollectionDeleter of
   594  // the given crd.
   595  func (r *crdHandler) GetCustomResourceListerCollectionDeleter(crd *apiextensionsv1.CustomResourceDefinition) (finalizer.ListerCollectionDeleter, error) {
   596  	info, err := r.getOrCreateServingInfoFor(crd.UID, crd.Name)
   597  	if err != nil {
   598  		return nil, err
   599  	}
   600  	return info.storages[info.storageVersion].CustomResource, nil
   601  }
   602  
   603  // getOrCreateServingInfoFor gets the CRD serving info for the given CRD UID if the key exists in the storage map.
   604  // Otherwise the function fetches the up-to-date CRD using the given CRD name and creates CRD serving info.
   605  func (r *crdHandler) getOrCreateServingInfoFor(uid types.UID, name string) (*crdInfo, error) {
   606  	storageMap := r.customStorage.Load().(crdStorageMap)
   607  	if ret, ok := storageMap[uid]; ok {
   608  		return ret, nil
   609  	}
   610  
   611  	r.customStorageLock.Lock()
   612  	defer r.customStorageLock.Unlock()
   613  
   614  	// Get the up-to-date CRD when we have the lock, to avoid racing with updateCustomResourceDefinition.
   615  	// If updateCustomResourceDefinition sees an update and happens later, the storage will be deleted and
   616  	// we will re-create the updated storage on demand. If updateCustomResourceDefinition happens before,
   617  	// we make sure that we observe the same up-to-date CRD.
   618  	crd, err := r.crdLister.Get(name)
   619  	if err != nil {
   620  		return nil, err
   621  	}
   622  	storageMap = r.customStorage.Load().(crdStorageMap)
   623  	if ret, ok := storageMap[crd.UID]; ok {
   624  		return ret, nil
   625  	}
   626  
   627  	storageVersion, err := apiextensionshelpers.GetCRDStorageVersion(crd)
   628  	if err != nil {
   629  		return nil, err
   630  	}
   631  
   632  	// Scope/Storages per version.
   633  	requestScopes := map[string]*handlers.RequestScope{}
   634  	storages := map[string]customresource.CustomResourceStorage{}
   635  	statusScopes := map[string]*handlers.RequestScope{}
   636  	scaleScopes := map[string]*handlers.RequestScope{}
   637  	deprecated := map[string]bool{}
   638  	warnings := map[string][]string{}
   639  
   640  	equivalentResourceRegistry := runtime.NewEquivalentResourceRegistry()
   641  
   642  	structuralSchemas := map[string]*structuralschema.Structural{}
   643  	for _, v := range crd.Spec.Versions {
   644  		val, err := apiextensionshelpers.GetSchemaForVersion(crd, v.Name)
   645  		if err != nil {
   646  			utilruntime.HandleError(err)
   647  			return nil, fmt.Errorf("the server could not properly serve the CR schema")
   648  		}
   649  		if val == nil {
   650  			continue
   651  		}
   652  		internalValidation := &apiextensionsinternal.CustomResourceValidation{}
   653  		if err := apiextensionsv1.Convert_v1_CustomResourceValidation_To_apiextensions_CustomResourceValidation(val, internalValidation, nil); err != nil {
   654  			return nil, fmt.Errorf("failed converting CRD validation to internal version: %v", err)
   655  		}
   656  		s, err := structuralschema.NewStructural(internalValidation.OpenAPIV3Schema)
   657  		if crd.Spec.PreserveUnknownFields == false && err != nil {
   658  			// This should never happen. If it does, it is a programming error.
   659  			utilruntime.HandleError(fmt.Errorf("failed to convert schema to structural: %v", err))
   660  			return nil, fmt.Errorf("the server could not properly serve the CR schema") // validation should avoid this
   661  		}
   662  
   663  		if crd.Spec.PreserveUnknownFields == false {
   664  			// we don't own s completely, e.g. defaults are not deep-copied. So better make a copy here.
   665  			s = s.DeepCopy()
   666  
   667  			if err := structuraldefaulting.PruneDefaults(s); err != nil {
   668  				// This should never happen. If it does, it is a programming error.
   669  				utilruntime.HandleError(fmt.Errorf("failed to prune defaults: %v", err))
   670  				return nil, fmt.Errorf("the server could not properly serve the CR schema") // validation should avoid this
   671  			}
   672  		}
   673  		structuralSchemas[v.Name] = s
   674  	}
   675  
   676  	openAPIModels, err := buildOpenAPIModelsForApply(r.staticOpenAPISpec, crd)
   677  	if err != nil {
   678  		utilruntime.HandleError(fmt.Errorf("error building openapi models for %s: %v", crd.Name, err))
   679  		openAPIModels = nil
   680  	}
   681  
   682  	var typeConverter managedfields.TypeConverter = managedfields.NewDeducedTypeConverter()
   683  	if len(openAPIModels) > 0 {
   684  		typeConverter, err = managedfields.NewTypeConverter(openAPIModels, crd.Spec.PreserveUnknownFields)
   685  		if err != nil {
   686  			return nil, err
   687  		}
   688  	}
   689  
   690  	safeConverter, unsafeConverter, err := r.converterFactory.NewConverter(crd)
   691  	if err != nil {
   692  		return nil, err
   693  	}
   694  
   695  	// Create replicasPathInCustomResource
   696  	replicasPathInCustomResource := managedfields.ResourcePathMappings{}
   697  	for _, v := range crd.Spec.Versions {
   698  		subresources, err := apiextensionshelpers.GetSubresourcesForVersion(crd, v.Name)
   699  		if err != nil {
   700  			utilruntime.HandleError(err)
   701  			return nil, fmt.Errorf("the server could not properly serve the CR subresources")
   702  		}
   703  		if subresources == nil || subresources.Scale == nil {
   704  			replicasPathInCustomResource[schema.GroupVersion{Group: crd.Spec.Group, Version: v.Name}.String()] = nil
   705  			continue
   706  		}
   707  		path := fieldpath.Path{}
   708  		splitReplicasPath := strings.Split(strings.TrimPrefix(subresources.Scale.SpecReplicasPath, "."), ".")
   709  		for _, element := range splitReplicasPath {
   710  			s := element
   711  			path = append(path, fieldpath.PathElement{FieldName: &s})
   712  		}
   713  		replicasPathInCustomResource[schema.GroupVersion{Group: crd.Spec.Group, Version: v.Name}.String()] = path
   714  	}
   715  
   716  	for _, v := range crd.Spec.Versions {
   717  		// In addition to Unstructured objects (Custom Resources), we also may sometimes need to
   718  		// decode unversioned Options objects, so we delegate to parameterScheme for such types.
   719  		parameterScheme := runtime.NewScheme()
   720  		parameterScheme.AddUnversionedTypes(schema.GroupVersion{Group: crd.Spec.Group, Version: v.Name},
   721  			&metav1.ListOptions{},
   722  			&metav1.GetOptions{},
   723  			&metav1.DeleteOptions{},
   724  		)
   725  		parameterCodec := runtime.NewParameterCodec(parameterScheme)
   726  
   727  		resource := schema.GroupVersionResource{Group: crd.Spec.Group, Version: v.Name, Resource: crd.Status.AcceptedNames.Plural}
   728  		if len(resource.Resource) == 0 {
   729  			utilruntime.HandleError(fmt.Errorf("CustomResourceDefinition %s has unexpected empty status.acceptedNames.plural", crd.Name))
   730  			return nil, fmt.Errorf("the server could not properly serve the resource")
   731  		}
   732  		singularResource := schema.GroupVersionResource{Group: crd.Spec.Group, Version: v.Name, Resource: crd.Status.AcceptedNames.Singular}
   733  		if len(singularResource.Resource) == 0 {
   734  			utilruntime.HandleError(fmt.Errorf("CustomResourceDefinition %s has unexpected empty status.acceptedNames.singular", crd.Name))
   735  			return nil, fmt.Errorf("the server could not properly serve the resource")
   736  		}
   737  		kind := schema.GroupVersionKind{Group: crd.Spec.Group, Version: v.Name, Kind: crd.Status.AcceptedNames.Kind}
   738  		if len(kind.Kind) == 0 {
   739  			utilruntime.HandleError(fmt.Errorf("CustomResourceDefinition %s has unexpected empty status.acceptedNames.kind", crd.Name))
   740  			return nil, fmt.Errorf("the server could not properly serve the kind")
   741  		}
   742  		equivalentResourceRegistry.RegisterKindFor(resource, "", kind)
   743  
   744  		typer := newUnstructuredObjectTyper(parameterScheme)
   745  		creator := unstructuredCreator{}
   746  
   747  		validationSchema, err := apiextensionshelpers.GetSchemaForVersion(crd, v.Name)
   748  		if err != nil {
   749  			utilruntime.HandleError(err)
   750  			return nil, fmt.Errorf("the server could not properly serve the CR schema")
   751  		}
   752  		var internalSchemaProps *apiextensionsinternal.JSONSchemaProps
   753  		var internalValidationSchema *apiextensionsinternal.CustomResourceValidation
   754  		if validationSchema != nil {
   755  			internalValidationSchema = &apiextensionsinternal.CustomResourceValidation{}
   756  			if err := apiextensionsv1.Convert_v1_CustomResourceValidation_To_apiextensions_CustomResourceValidation(validationSchema, internalValidationSchema, nil); err != nil {
   757  				return nil, fmt.Errorf("failed to convert CRD validation to internal version: %v", err)
   758  			}
   759  			internalSchemaProps = internalValidationSchema.OpenAPIV3Schema
   760  		}
   761  		validator, _, err := apiservervalidation.NewSchemaValidator(internalSchemaProps)
   762  		if err != nil {
   763  			return nil, err
   764  		}
   765  
   766  		var statusSpec *apiextensionsinternal.CustomResourceSubresourceStatus
   767  		var statusValidator apiservervalidation.SchemaValidator
   768  		subresources, err := apiextensionshelpers.GetSubresourcesForVersion(crd, v.Name)
   769  		if err != nil {
   770  			utilruntime.HandleError(err)
   771  			return nil, fmt.Errorf("the server could not properly serve the CR subresources")
   772  		}
   773  		if subresources != nil && subresources.Status != nil {
   774  			equivalentResourceRegistry.RegisterKindFor(resource, "status", kind)
   775  			statusSpec = &apiextensionsinternal.CustomResourceSubresourceStatus{}
   776  			if err := apiextensionsv1.Convert_v1_CustomResourceSubresourceStatus_To_apiextensions_CustomResourceSubresourceStatus(subresources.Status, statusSpec, nil); err != nil {
   777  				return nil, fmt.Errorf("failed converting CRD status subresource to internal version: %v", err)
   778  			}
   779  			// for the status subresource, validate only against the status schema
   780  			if internalValidationSchema != nil && internalValidationSchema.OpenAPIV3Schema != nil && internalValidationSchema.OpenAPIV3Schema.Properties != nil {
   781  				if statusSchema, ok := internalValidationSchema.OpenAPIV3Schema.Properties["status"]; ok {
   782  					statusValidator, _, err = apiservervalidation.NewSchemaValidator(&statusSchema)
   783  					if err != nil {
   784  						return nil, err
   785  					}
   786  				}
   787  			}
   788  		}
   789  
   790  		var scaleSpec *apiextensionsinternal.CustomResourceSubresourceScale
   791  		if subresources != nil && subresources.Scale != nil {
   792  			equivalentResourceRegistry.RegisterKindFor(resource, "scale", autoscalingv1.SchemeGroupVersion.WithKind("Scale"))
   793  			scaleSpec = &apiextensionsinternal.CustomResourceSubresourceScale{}
   794  			if err := apiextensionsv1.Convert_v1_CustomResourceSubresourceScale_To_apiextensions_CustomResourceSubresourceScale(subresources.Scale, scaleSpec, nil); err != nil {
   795  				return nil, fmt.Errorf("failed converting CRD status subresource to internal version: %v", err)
   796  			}
   797  		}
   798  
   799  		columns, err := getColumnsForVersion(crd, v.Name)
   800  		if err != nil {
   801  			utilruntime.HandleError(err)
   802  			return nil, fmt.Errorf("the server could not properly serve the CR columns")
   803  		}
   804  		table, err := tableconvertor.New(columns)
   805  		if err != nil {
   806  			klog.V(2).Infof("The CRD for %v has an invalid printer specification, falling back to default printing: %v", kind, err)
   807  		}
   808  
   809  		listKind := schema.GroupVersionKind{Group: crd.Spec.Group, Version: v.Name, Kind: crd.Status.AcceptedNames.ListKind}
   810  		if len(listKind.Kind) == 0 {
   811  			utilruntime.HandleError(fmt.Errorf("CustomResourceDefinition %s has unexpected empty status.acceptedNames.listKind", crd.Name))
   812  			return nil, fmt.Errorf("the server could not properly serve the list kind")
   813  		}
   814  
   815  		storages[v.Name] = customresource.NewStorage(
   816  			resource.GroupResource(),
   817  			singularResource.GroupResource(),
   818  			kind,
   819  			listKind,
   820  			customresource.NewStrategy(
   821  				typer,
   822  				crd.Spec.Scope == apiextensionsv1.NamespaceScoped,
   823  				kind,
   824  				validator,
   825  				statusValidator,
   826  				structuralSchemas[v.Name],
   827  				statusSpec,
   828  				scaleSpec,
   829  				v.SelectableFields,
   830  			),
   831  			crdConversionRESTOptionsGetter{
   832  				RESTOptionsGetter:     r.restOptionsGetter,
   833  				converter:             safeConverter,
   834  				decoderVersion:        schema.GroupVersion{Group: crd.Spec.Group, Version: v.Name},
   835  				encoderVersion:        schema.GroupVersion{Group: crd.Spec.Group, Version: storageVersion},
   836  				structuralSchemas:     structuralSchemas,
   837  				structuralSchemaGK:    kind.GroupKind(),
   838  				preserveUnknownFields: crd.Spec.PreserveUnknownFields,
   839  			},
   840  			crd.Status.AcceptedNames.Categories,
   841  			table,
   842  			replicasPathInCustomResource,
   843  		)
   844  
   845  		clusterScoped := crd.Spec.Scope == apiextensionsv1.ClusterScoped
   846  
   847  		// CRDs explicitly do not support protobuf, but some objects returned by the API server do
   848  		negotiatedSerializer := unstructuredNegotiatedSerializer{
   849  			typer:                 typer,
   850  			creator:               creator,
   851  			converter:             safeConverter,
   852  			structuralSchemas:     structuralSchemas,
   853  			structuralSchemaGK:    kind.GroupKind(),
   854  			preserveUnknownFields: crd.Spec.PreserveUnknownFields,
   855  			supportedMediaTypes: []runtime.SerializerInfo{
   856  				{
   857  					MediaType:        "application/json",
   858  					MediaTypeType:    "application",
   859  					MediaTypeSubType: "json",
   860  					EncodesAsText:    true,
   861  					Serializer:       json.NewSerializer(json.DefaultMetaFactory, creator, typer, false),
   862  					PrettySerializer: json.NewSerializer(json.DefaultMetaFactory, creator, typer, true),
   863  					StrictSerializer: json.NewSerializerWithOptions(json.DefaultMetaFactory, creator, typer, json.SerializerOptions{
   864  						Strict: true,
   865  					}),
   866  					StreamSerializer: &runtime.StreamSerializerInfo{
   867  						EncodesAsText: true,
   868  						Serializer:    json.NewSerializer(json.DefaultMetaFactory, creator, typer, false),
   869  						Framer:        json.Framer,
   870  					},
   871  				},
   872  				{
   873  					MediaType:        "application/yaml",
   874  					MediaTypeType:    "application",
   875  					MediaTypeSubType: "yaml",
   876  					EncodesAsText:    true,
   877  					Serializer:       json.NewYAMLSerializer(json.DefaultMetaFactory, creator, typer),
   878  					StrictSerializer: json.NewSerializerWithOptions(json.DefaultMetaFactory, creator, typer, json.SerializerOptions{
   879  						Yaml:   true,
   880  						Strict: true,
   881  					}),
   882  				},
   883  				{
   884  					MediaType:        "application/vnd.kubernetes.protobuf",
   885  					MediaTypeType:    "application",
   886  					MediaTypeSubType: "vnd.kubernetes.protobuf",
   887  					Serializer:       protobuf.NewSerializer(creator, typer),
   888  					StreamSerializer: &runtime.StreamSerializerInfo{
   889  						Serializer: protobuf.NewRawSerializer(creator, typer),
   890  						Framer:     protobuf.LengthDelimitedFramer,
   891  					},
   892  				},
   893  			},
   894  		}
   895  		var standardSerializers []runtime.SerializerInfo
   896  		for _, s := range negotiatedSerializer.SupportedMediaTypes() {
   897  			if s.MediaType == runtime.ContentTypeProtobuf {
   898  				continue
   899  			}
   900  			standardSerializers = append(standardSerializers, s)
   901  		}
   902  
   903  		reqScope := handlers.RequestScope{
   904  			Namer: handlers.ContextBasedNaming{
   905  				Namer:         meta.NewAccessor(),
   906  				ClusterScoped: clusterScoped,
   907  			},
   908  			Serializer:          negotiatedSerializer,
   909  			ParameterCodec:      parameterCodec,
   910  			StandardSerializers: standardSerializers,
   911  
   912  			Creater:         creator,
   913  			Convertor:       safeConverter,
   914  			Defaulter:       unstructuredDefaulter{parameterScheme, structuralSchemas, kind.GroupKind()},
   915  			Typer:           typer,
   916  			UnsafeConvertor: unsafeConverter,
   917  
   918  			EquivalentResourceMapper: equivalentResourceRegistry,
   919  
   920  			Resource: schema.GroupVersionResource{Group: crd.Spec.Group, Version: v.Name, Resource: crd.Status.AcceptedNames.Plural},
   921  			Kind:     kind,
   922  
   923  			// a handler for a specific group-version of a custom resource uses that version as the in-memory representation
   924  			HubGroupVersion: kind.GroupVersion(),
   925  
   926  			MetaGroupVersion: metav1.SchemeGroupVersion,
   927  
   928  			TableConvertor: storages[v.Name].CustomResource,
   929  
   930  			Authorizer: r.authorizer,
   931  
   932  			MaxRequestBodyBytes: r.maxRequestBodyBytes,
   933  		}
   934  
   935  		resetFields := storages[v.Name].CustomResource.GetResetFields()
   936  		reqScope, err = scopeWithFieldManager(
   937  			typeConverter,
   938  			reqScope,
   939  			resetFields,
   940  			"",
   941  		)
   942  		if err != nil {
   943  			return nil, err
   944  		}
   945  		requestScopes[v.Name] = &reqScope
   946  
   947  		scaleColumns, err := getScaleColumnsForVersion(crd, v.Name)
   948  		if err != nil {
   949  			return nil, fmt.Errorf("the server could not properly serve the CR scale subresource columns %w", err)
   950  		}
   951  		scaleTable, _ := tableconvertor.New(scaleColumns)
   952  
   953  		// override scale subresource values
   954  		// shallow copy
   955  		scaleScope := *requestScopes[v.Name]
   956  		scaleConverter := scale.NewScaleConverter()
   957  		scaleScope.Subresource = "scale"
   958  		scaleScope.Serializer = serializer.NewCodecFactory(scaleConverter.Scheme())
   959  		scaleScope.Kind = autoscalingv1.SchemeGroupVersion.WithKind("Scale")
   960  		scaleScope.Namer = handlers.ContextBasedNaming{
   961  			Namer:         meta.NewAccessor(),
   962  			ClusterScoped: clusterScoped,
   963  		}
   964  		scaleScope.TableConvertor = scaleTable
   965  
   966  		if subresources != nil && subresources.Scale != nil {
   967  			scaleScope, err = scopeWithFieldManager(
   968  				typeConverter,
   969  				scaleScope,
   970  				nil,
   971  				"scale",
   972  			)
   973  			if err != nil {
   974  				return nil, err
   975  			}
   976  		}
   977  
   978  		scaleScopes[v.Name] = &scaleScope
   979  
   980  		// override status subresource values
   981  		// shallow copy
   982  		statusScope := *requestScopes[v.Name]
   983  		statusScope.Subresource = "status"
   984  		statusScope.Namer = handlers.ContextBasedNaming{
   985  			Namer:         meta.NewAccessor(),
   986  			ClusterScoped: clusterScoped,
   987  		}
   988  
   989  		if subresources != nil && subresources.Status != nil {
   990  			resetFields := storages[v.Name].Status.GetResetFields()
   991  			statusScope, err = scopeWithFieldManager(
   992  				typeConverter,
   993  				statusScope,
   994  				resetFields,
   995  				"status",
   996  			)
   997  			if err != nil {
   998  				return nil, err
   999  			}
  1000  		}
  1001  
  1002  		statusScopes[v.Name] = &statusScope
  1003  
  1004  		if v.Deprecated {
  1005  			deprecated[v.Name] = true
  1006  			if v.DeprecationWarning != nil {
  1007  				warnings[v.Name] = append(warnings[v.Name], *v.DeprecationWarning)
  1008  			} else {
  1009  				warnings[v.Name] = append(warnings[v.Name], defaultDeprecationWarning(v.Name, crd.Spec))
  1010  			}
  1011  		}
  1012  	}
  1013  
  1014  	ret := &crdInfo{
  1015  		spec:                &crd.Spec,
  1016  		acceptedNames:       &crd.Status.AcceptedNames,
  1017  		storages:            storages,
  1018  		requestScopes:       requestScopes,
  1019  		scaleRequestScopes:  scaleScopes,
  1020  		statusRequestScopes: statusScopes,
  1021  		deprecated:          deprecated,
  1022  		warnings:            warnings,
  1023  		storageVersion:      storageVersion,
  1024  		waitGroup:           &utilwaitgroup.SafeWaitGroup{},
  1025  	}
  1026  
  1027  	// Copy because we cannot write to storageMap without a race
  1028  	// as it is used without locking elsewhere.
  1029  	storageMap2 := storageMap.clone()
  1030  
  1031  	storageMap2[crd.UID] = ret
  1032  	r.customStorage.Store(storageMap2)
  1033  
  1034  	return ret, nil
  1035  }
  1036  
  1037  func scopeWithFieldManager(typeConverter managedfields.TypeConverter, reqScope handlers.RequestScope, resetFields map[fieldpath.APIVersion]*fieldpath.Set, subresource string) (handlers.RequestScope, error) {
  1038  	fieldManager, err := managedfields.NewDefaultCRDFieldManager(
  1039  		typeConverter,
  1040  		reqScope.Convertor,
  1041  		reqScope.Defaulter,
  1042  		reqScope.Creater,
  1043  		reqScope.Kind,
  1044  		reqScope.HubGroupVersion,
  1045  		subresource,
  1046  		resetFields,
  1047  	)
  1048  	if err != nil {
  1049  		return handlers.RequestScope{}, err
  1050  	}
  1051  	reqScope.FieldManager = fieldManager
  1052  	return reqScope, nil
  1053  }
  1054  
  1055  func defaultDeprecationWarning(deprecatedVersion string, crd apiextensionsv1.CustomResourceDefinitionSpec) string {
  1056  	msg := fmt.Sprintf("%s/%s %s is deprecated", crd.Group, deprecatedVersion, crd.Names.Kind)
  1057  
  1058  	var servedNonDeprecatedVersions []string
  1059  	for _, v := range crd.Versions {
  1060  		if v.Served && !v.Deprecated && version.CompareKubeAwareVersionStrings(deprecatedVersion, v.Name) < 0 {
  1061  			servedNonDeprecatedVersions = append(servedNonDeprecatedVersions, v.Name)
  1062  		}
  1063  	}
  1064  	if len(servedNonDeprecatedVersions) == 0 {
  1065  		return msg
  1066  	}
  1067  	sort.Slice(servedNonDeprecatedVersions, func(i, j int) bool {
  1068  		return version.CompareKubeAwareVersionStrings(servedNonDeprecatedVersions[i], servedNonDeprecatedVersions[j]) > 0
  1069  	})
  1070  	msg += fmt.Sprintf("; use %s/%s %s", crd.Group, servedNonDeprecatedVersions[0], crd.Names.Kind)
  1071  	return msg
  1072  }
  1073  
  1074  type unstructuredNegotiatedSerializer struct {
  1075  	typer     runtime.ObjectTyper
  1076  	creator   runtime.ObjectCreater
  1077  	converter runtime.ObjectConvertor
  1078  
  1079  	structuralSchemas     map[string]*structuralschema.Structural // by version
  1080  	structuralSchemaGK    schema.GroupKind
  1081  	preserveUnknownFields bool
  1082  
  1083  	supportedMediaTypes []runtime.SerializerInfo
  1084  }
  1085  
  1086  func (s unstructuredNegotiatedSerializer) SupportedMediaTypes() []runtime.SerializerInfo {
  1087  	return s.supportedMediaTypes
  1088  }
  1089  
  1090  func (s unstructuredNegotiatedSerializer) EncoderForVersion(encoder runtime.Encoder, gv runtime.GroupVersioner) runtime.Encoder {
  1091  	return versioning.NewCodec(encoder, nil, s.converter, Scheme, Scheme, Scheme, gv, nil, "crdNegotiatedSerializer")
  1092  }
  1093  
  1094  func (s unstructuredNegotiatedSerializer) DecoderToVersion(decoder runtime.Decoder, gv runtime.GroupVersioner) runtime.Decoder {
  1095  	returnUnknownFieldPaths := false
  1096  	if serializer, ok := decoder.(*json.Serializer); ok {
  1097  		returnUnknownFieldPaths = serializer.IsStrict()
  1098  	}
  1099  	d := schemaCoercingDecoder{delegate: decoder, validator: unstructuredSchemaCoercer{structuralSchemas: s.structuralSchemas, structuralSchemaGK: s.structuralSchemaGK, preserveUnknownFields: s.preserveUnknownFields, returnUnknownFieldPaths: returnUnknownFieldPaths}}
  1100  	return versioning.NewCodec(nil, d, runtime.UnsafeObjectConvertor(Scheme), Scheme, Scheme, unstructuredDefaulter{
  1101  		delegate:           Scheme,
  1102  		structuralSchemas:  s.structuralSchemas,
  1103  		structuralSchemaGK: s.structuralSchemaGK,
  1104  	}, nil, gv, "unstructuredNegotiatedSerializer")
  1105  }
  1106  
  1107  type UnstructuredObjectTyper struct {
  1108  	Delegate          runtime.ObjectTyper
  1109  	UnstructuredTyper runtime.ObjectTyper
  1110  }
  1111  
  1112  func newUnstructuredObjectTyper(Delegate runtime.ObjectTyper) UnstructuredObjectTyper {
  1113  	return UnstructuredObjectTyper{
  1114  		Delegate:          Delegate,
  1115  		UnstructuredTyper: crdserverscheme.NewUnstructuredObjectTyper(),
  1116  	}
  1117  }
  1118  
  1119  func (t UnstructuredObjectTyper) ObjectKinds(obj runtime.Object) ([]schema.GroupVersionKind, bool, error) {
  1120  	// Delegate for things other than Unstructured.
  1121  	if _, ok := obj.(runtime.Unstructured); !ok {
  1122  		return t.Delegate.ObjectKinds(obj)
  1123  	}
  1124  	return t.UnstructuredTyper.ObjectKinds(obj)
  1125  }
  1126  
  1127  func (t UnstructuredObjectTyper) Recognizes(gvk schema.GroupVersionKind) bool {
  1128  	return t.Delegate.Recognizes(gvk) || t.UnstructuredTyper.Recognizes(gvk)
  1129  }
  1130  
  1131  type unstructuredCreator struct{}
  1132  
  1133  func (c unstructuredCreator) New(kind schema.GroupVersionKind) (runtime.Object, error) {
  1134  	ret := &unstructured.Unstructured{}
  1135  	ret.SetGroupVersionKind(kind)
  1136  	return ret, nil
  1137  }
  1138  
  1139  type unstructuredDefaulter struct {
  1140  	delegate           runtime.ObjectDefaulter
  1141  	structuralSchemas  map[string]*structuralschema.Structural // by version
  1142  	structuralSchemaGK schema.GroupKind
  1143  }
  1144  
  1145  func (d unstructuredDefaulter) Default(in runtime.Object) {
  1146  	// Delegate for things other than Unstructured, and other GKs
  1147  	u, ok := in.(runtime.Unstructured)
  1148  	if !ok || u.GetObjectKind().GroupVersionKind().GroupKind() != d.structuralSchemaGK {
  1149  		d.delegate.Default(in)
  1150  		return
  1151  	}
  1152  
  1153  	structuraldefaulting.Default(u.UnstructuredContent(), d.structuralSchemas[u.GetObjectKind().GroupVersionKind().Version])
  1154  }
  1155  
  1156  // clone returns a clone of the provided crdStorageMap.
  1157  // The clone is a shallow copy of the map.
  1158  func (in crdStorageMap) clone() crdStorageMap {
  1159  	if in == nil {
  1160  		return nil
  1161  	}
  1162  	out := make(crdStorageMap, len(in))
  1163  	for key, value := range in {
  1164  		out[key] = value
  1165  	}
  1166  	return out
  1167  }
  1168  
  1169  // crdConversionRESTOptionsGetter overrides the codec with one using the
  1170  // provided custom converter and custom encoder and decoder version.
  1171  type crdConversionRESTOptionsGetter struct {
  1172  	generic.RESTOptionsGetter
  1173  	converter             runtime.ObjectConvertor
  1174  	encoderVersion        schema.GroupVersion
  1175  	decoderVersion        schema.GroupVersion
  1176  	structuralSchemas     map[string]*structuralschema.Structural // by version
  1177  	structuralSchemaGK    schema.GroupKind
  1178  	preserveUnknownFields bool
  1179  }
  1180  
  1181  func (t crdConversionRESTOptionsGetter) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) {
  1182  	ret, err := t.RESTOptionsGetter.GetRESTOptions(resource)
  1183  	if err == nil {
  1184  		d := schemaCoercingDecoder{delegate: ret.StorageConfig.Codec, validator: unstructuredSchemaCoercer{
  1185  			// drop invalid fields while decoding old CRs (before we haven't had any ObjectMeta validation)
  1186  			dropInvalidMetadata:   true,
  1187  			repairGeneration:      true,
  1188  			structuralSchemas:     t.structuralSchemas,
  1189  			structuralSchemaGK:    t.structuralSchemaGK,
  1190  			preserveUnknownFields: t.preserveUnknownFields,
  1191  		}}
  1192  		c := schemaCoercingConverter{delegate: t.converter, validator: unstructuredSchemaCoercer{
  1193  			structuralSchemas:     t.structuralSchemas,
  1194  			structuralSchemaGK:    t.structuralSchemaGK,
  1195  			preserveUnknownFields: t.preserveUnknownFields,
  1196  		}}
  1197  		ret.StorageConfig.Codec = versioning.NewCodec(
  1198  			ret.StorageConfig.Codec,
  1199  			d,
  1200  			c,
  1201  			&unstructuredCreator{},
  1202  			crdserverscheme.NewUnstructuredObjectTyper(),
  1203  			&unstructuredDefaulter{
  1204  				delegate:           Scheme,
  1205  				structuralSchemaGK: t.structuralSchemaGK,
  1206  				structuralSchemas:  t.structuralSchemas,
  1207  			},
  1208  			t.encoderVersion,
  1209  			t.decoderVersion,
  1210  			"crdRESTOptions",
  1211  		)
  1212  	}
  1213  	return ret, err
  1214  }
  1215  
  1216  // schemaCoercingDecoder calls the delegate decoder, and then applies the Unstructured schema validator
  1217  // to coerce the schema.
  1218  type schemaCoercingDecoder struct {
  1219  	delegate  runtime.Decoder
  1220  	validator unstructuredSchemaCoercer
  1221  }
  1222  
  1223  var _ runtime.Decoder = schemaCoercingDecoder{}
  1224  
  1225  func (d schemaCoercingDecoder) Decode(data []byte, defaults *schema.GroupVersionKind, into runtime.Object) (runtime.Object, *schema.GroupVersionKind, error) {
  1226  	var decodingStrictErrs []error
  1227  	obj, gvk, err := d.delegate.Decode(data, defaults, into)
  1228  	if err != nil {
  1229  		decodeStrictErr, ok := runtime.AsStrictDecodingError(err)
  1230  		if !ok || obj == nil {
  1231  			return nil, gvk, err
  1232  		}
  1233  		decodingStrictErrs = decodeStrictErr.Errors()
  1234  	}
  1235  	var unknownFields []string
  1236  	if u, ok := obj.(*unstructured.Unstructured); ok {
  1237  		unknownFields, err = d.validator.apply(u)
  1238  		if err != nil {
  1239  			return nil, gvk, err
  1240  		}
  1241  	}
  1242  	if d.validator.returnUnknownFieldPaths && (len(decodingStrictErrs) > 0 || len(unknownFields) > 0) {
  1243  		for _, unknownField := range unknownFields {
  1244  			decodingStrictErrs = append(decodingStrictErrs, fmt.Errorf(`unknown field "%s"`, unknownField))
  1245  		}
  1246  		return obj, gvk, runtime.NewStrictDecodingError(decodingStrictErrs)
  1247  	}
  1248  
  1249  	return obj, gvk, nil
  1250  }
  1251  
  1252  // schemaCoercingConverter calls the delegate converter and applies the Unstructured validator to
  1253  // coerce the schema.
  1254  type schemaCoercingConverter struct {
  1255  	delegate  runtime.ObjectConvertor
  1256  	validator unstructuredSchemaCoercer
  1257  }
  1258  
  1259  var _ runtime.ObjectConvertor = schemaCoercingConverter{}
  1260  
  1261  func (v schemaCoercingConverter) Convert(in, out, context interface{}) error {
  1262  	if err := v.delegate.Convert(in, out, context); err != nil {
  1263  		return err
  1264  	}
  1265  
  1266  	if u, ok := out.(*unstructured.Unstructured); ok {
  1267  		if _, err := v.validator.apply(u); err != nil {
  1268  			return err
  1269  		}
  1270  	}
  1271  
  1272  	return nil
  1273  }
  1274  
  1275  func (v schemaCoercingConverter) ConvertToVersion(in runtime.Object, gv runtime.GroupVersioner) (runtime.Object, error) {
  1276  	out, err := v.delegate.ConvertToVersion(in, gv)
  1277  	if err != nil {
  1278  		return nil, err
  1279  	}
  1280  
  1281  	if u, ok := out.(*unstructured.Unstructured); ok {
  1282  		if _, err := v.validator.apply(u); err != nil {
  1283  			return nil, err
  1284  		}
  1285  	}
  1286  
  1287  	return out, nil
  1288  }
  1289  
  1290  func (v schemaCoercingConverter) ConvertFieldLabel(gvk schema.GroupVersionKind, label, value string) (string, string, error) {
  1291  	return v.delegate.ConvertFieldLabel(gvk, label, value)
  1292  }
  1293  
  1294  // unstructuredSchemaCoercer adds to unstructured unmarshalling what json.Unmarshal does
  1295  // in addition for native types when decoding into Golang structs:
  1296  //
  1297  // - validating and pruning ObjectMeta
  1298  // - generic pruning of unknown fields following a structural schema
  1299  // - removal of non-defaulted non-nullable null map values.
  1300  type unstructuredSchemaCoercer struct {
  1301  	dropInvalidMetadata bool
  1302  	repairGeneration    bool
  1303  
  1304  	structuralSchemas       map[string]*structuralschema.Structural
  1305  	structuralSchemaGK      schema.GroupKind
  1306  	preserveUnknownFields   bool
  1307  	returnUnknownFieldPaths bool
  1308  }
  1309  
  1310  func (v *unstructuredSchemaCoercer) apply(u *unstructured.Unstructured) (unknownFieldPaths []string, err error) {
  1311  	// save implicit meta fields that don't have to be specified in the validation spec
  1312  	kind, foundKind, err := unstructured.NestedString(u.UnstructuredContent(), "kind")
  1313  	if err != nil {
  1314  		return nil, err
  1315  	}
  1316  	apiVersion, foundApiVersion, err := unstructured.NestedString(u.UnstructuredContent(), "apiVersion")
  1317  	if err != nil {
  1318  		return nil, err
  1319  	}
  1320  	objectMeta, foundObjectMeta, metaUnknownFields, err := schemaobjectmeta.GetObjectMetaWithOptions(u.Object, schemaobjectmeta.ObjectMetaOptions{
  1321  		DropMalformedFields:     v.dropInvalidMetadata,
  1322  		ReturnUnknownFieldPaths: v.returnUnknownFieldPaths,
  1323  	})
  1324  	if err != nil {
  1325  		return nil, err
  1326  	}
  1327  	unknownFieldPaths = append(unknownFieldPaths, metaUnknownFields...)
  1328  
  1329  	// compare group and kind because also other object like DeleteCollection options pass through here
  1330  	gv, err := schema.ParseGroupVersion(apiVersion)
  1331  	if err != nil {
  1332  		return nil, err
  1333  	}
  1334  
  1335  	if gv.Group == v.structuralSchemaGK.Group && kind == v.structuralSchemaGK.Kind {
  1336  		if !v.preserveUnknownFields {
  1337  			// TODO: switch over pruning and coercing at the root to schemaobjectmeta.Coerce too
  1338  			pruneOpts := structuralschema.UnknownFieldPathOptions{}
  1339  			if v.returnUnknownFieldPaths {
  1340  				pruneOpts.TrackUnknownFieldPaths = true
  1341  			}
  1342  			unknownFieldPaths = append(unknownFieldPaths, structuralpruning.PruneWithOptions(u.Object, v.structuralSchemas[gv.Version], true, pruneOpts)...)
  1343  			structuraldefaulting.PruneNonNullableNullsWithoutDefaults(u.Object, v.structuralSchemas[gv.Version])
  1344  		}
  1345  
  1346  		err, paths := schemaobjectmeta.CoerceWithOptions(nil, u.Object, v.structuralSchemas[gv.Version], false, schemaobjectmeta.CoerceOptions{
  1347  			DropInvalidFields:       v.dropInvalidMetadata,
  1348  			ReturnUnknownFieldPaths: v.returnUnknownFieldPaths,
  1349  		})
  1350  		if err != nil {
  1351  			return nil, err
  1352  		}
  1353  		unknownFieldPaths = append(unknownFieldPaths, paths...)
  1354  
  1355  		// fixup missing generation in very old CRs
  1356  		if v.repairGeneration && objectMeta.Generation == 0 {
  1357  			objectMeta.Generation = 1
  1358  		}
  1359  	}
  1360  
  1361  	// restore meta fields, starting clean
  1362  	if foundKind {
  1363  		u.SetKind(kind)
  1364  	}
  1365  	if foundApiVersion {
  1366  		u.SetAPIVersion(apiVersion)
  1367  	}
  1368  	if foundObjectMeta {
  1369  		if err := schemaobjectmeta.SetObjectMeta(u.Object, objectMeta); err != nil {
  1370  			return nil, err
  1371  		}
  1372  	}
  1373  
  1374  	return unknownFieldPaths, nil
  1375  }
  1376  
  1377  // hasServedCRDVersion returns true if the given version is in the list of CRD's versions and the Served flag is set.
  1378  func hasServedCRDVersion(spec *apiextensionsv1.CustomResourceDefinitionSpec, version string) bool {
  1379  	for _, v := range spec.Versions {
  1380  		if v.Name == version {
  1381  			return v.Served
  1382  		}
  1383  	}
  1384  	return false
  1385  }
  1386  
  1387  // buildOpenAPIModelsForApply constructs openapi models from any validation schemas specified in the custom resource,
  1388  // and merges it with the models defined in the static OpenAPI spec.
  1389  // Returns nil models ifthe static spec is nil, or an error is encountered.
  1390  func buildOpenAPIModelsForApply(staticOpenAPISpec map[string]*spec.Schema, crd *apiextensionsv1.CustomResourceDefinition) (map[string]*spec.Schema, error) {
  1391  	if staticOpenAPISpec == nil {
  1392  		return nil, nil
  1393  	}
  1394  
  1395  	// Convert static spec to V3 format to be able to merge
  1396  	staticSpecV3 := &spec3.OpenAPI{
  1397  		Version: "3.0.0",
  1398  		Info: &spec.Info{
  1399  			InfoProps: spec.InfoProps{
  1400  				Title:   "Kubernetes CRD Swagger",
  1401  				Version: "v0.1.0",
  1402  			},
  1403  		},
  1404  		Components: &spec3.Components{
  1405  			Schemas: staticOpenAPISpec,
  1406  		},
  1407  	}
  1408  
  1409  	specs := []*spec3.OpenAPI{staticSpecV3}
  1410  	for _, v := range crd.Spec.Versions {
  1411  		// Defaults are not pruned here, but before being served.
  1412  		// See flag description in builder.go for flag usage
  1413  		s, err := builder.BuildOpenAPIV3(crd, v.Name, builder.Options{})
  1414  		if err != nil {
  1415  			return nil, err
  1416  		}
  1417  		specs = append(specs, s)
  1418  	}
  1419  
  1420  	mergedOpenAPI, err := builder.MergeSpecsV3(specs...)
  1421  	if err != nil {
  1422  		return nil, err
  1423  	}
  1424  	return mergedOpenAPI.Components.Schemas, nil
  1425  }
  1426  

View as plain text