...

Source file src/k8s.io/apiextensions-apiserver/pkg/apiserver/customresource_discovery_controller.go

Documentation: k8s.io/apiextensions-apiserver/pkg/apiserver

     1  /*
     2  Copyright 2017 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package apiserver
    18  
    19  import (
    20  	"fmt"
    21  	"sort"
    22  	"time"
    23  
    24  	"k8s.io/klog/v2"
    25  
    26  	apidiscoveryv2 "k8s.io/api/apidiscovery/v2"
    27  	autoscaling "k8s.io/api/autoscaling/v1"
    28  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    29  	"k8s.io/apimachinery/pkg/labels"
    30  	"k8s.io/apimachinery/pkg/runtime/schema"
    31  	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    32  	"k8s.io/apimachinery/pkg/util/wait"
    33  	"k8s.io/apimachinery/pkg/version"
    34  	"k8s.io/apiserver/pkg/endpoints/discovery"
    35  	discoveryendpoint "k8s.io/apiserver/pkg/endpoints/discovery/aggregated"
    36  	"k8s.io/client-go/tools/cache"
    37  	"k8s.io/client-go/util/workqueue"
    38  
    39  	apiextensionshelpers "k8s.io/apiextensions-apiserver/pkg/apihelpers"
    40  	apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
    41  	informers "k8s.io/apiextensions-apiserver/pkg/client/informers/externalversions/apiextensions/v1"
    42  	listers "k8s.io/apiextensions-apiserver/pkg/client/listers/apiextensions/v1"
    43  )
    44  
    45  type DiscoveryController struct {
    46  	versionHandler  *versionDiscoveryHandler
    47  	groupHandler    *groupDiscoveryHandler
    48  	resourceManager discoveryendpoint.ResourceManager
    49  
    50  	crdLister  listers.CustomResourceDefinitionLister
    51  	crdsSynced cache.InformerSynced
    52  
    53  	// To allow injection for testing.
    54  	syncFn func(version schema.GroupVersion) error
    55  
    56  	queue workqueue.RateLimitingInterface
    57  }
    58  
    59  func NewDiscoveryController(
    60  	crdInformer informers.CustomResourceDefinitionInformer,
    61  	versionHandler *versionDiscoveryHandler,
    62  	groupHandler *groupDiscoveryHandler,
    63  	resourceManager discoveryendpoint.ResourceManager,
    64  ) *DiscoveryController {
    65  	c := &DiscoveryController{
    66  		versionHandler:  versionHandler,
    67  		groupHandler:    groupHandler,
    68  		resourceManager: resourceManager,
    69  		crdLister:       crdInformer.Lister(),
    70  		crdsSynced:      crdInformer.Informer().HasSynced,
    71  
    72  		queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "DiscoveryController"),
    73  	}
    74  
    75  	crdInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    76  		AddFunc:    c.addCustomResourceDefinition,
    77  		UpdateFunc: c.updateCustomResourceDefinition,
    78  		DeleteFunc: c.deleteCustomResourceDefinition,
    79  	})
    80  
    81  	c.syncFn = c.sync
    82  
    83  	return c
    84  }
    85  
    86  func (c *DiscoveryController) sync(version schema.GroupVersion) error {
    87  
    88  	apiVersionsForDiscovery := []metav1.GroupVersionForDiscovery{}
    89  	apiResourcesForDiscovery := []metav1.APIResource{}
    90  	aggregatedAPIResourcesForDiscovery := []apidiscoveryv2.APIResourceDiscovery{}
    91  	versionsForDiscoveryMap := map[metav1.GroupVersion]bool{}
    92  
    93  	crds, err := c.crdLister.List(labels.Everything())
    94  	if err != nil {
    95  		return err
    96  	}
    97  	foundVersion := false
    98  	foundGroup := false
    99  	for _, crd := range crds {
   100  		if !apiextensionshelpers.IsCRDConditionTrue(crd, apiextensionsv1.Established) {
   101  			continue
   102  		}
   103  
   104  		if crd.Spec.Group != version.Group {
   105  			continue
   106  		}
   107  
   108  		foundThisVersion := false
   109  		var storageVersionHash string
   110  		for _, v := range crd.Spec.Versions {
   111  			if !v.Served {
   112  				continue
   113  			}
   114  			// If there is any Served version, that means the group should show up in discovery
   115  			foundGroup = true
   116  
   117  			gv := metav1.GroupVersion{Group: crd.Spec.Group, Version: v.Name}
   118  			if !versionsForDiscoveryMap[gv] {
   119  				versionsForDiscoveryMap[gv] = true
   120  				apiVersionsForDiscovery = append(apiVersionsForDiscovery, metav1.GroupVersionForDiscovery{
   121  					GroupVersion: crd.Spec.Group + "/" + v.Name,
   122  					Version:      v.Name,
   123  				})
   124  			}
   125  			if v.Name == version.Version {
   126  				foundThisVersion = true
   127  			}
   128  			if v.Storage {
   129  				storageVersionHash = discovery.StorageVersionHash(gv.Group, gv.Version, crd.Spec.Names.Kind)
   130  			}
   131  		}
   132  
   133  		if !foundThisVersion {
   134  			continue
   135  		}
   136  		foundVersion = true
   137  
   138  		verbs := metav1.Verbs([]string{"delete", "deletecollection", "get", "list", "patch", "create", "update", "watch"})
   139  		// if we're terminating we don't allow some verbs
   140  		if apiextensionshelpers.IsCRDConditionTrue(crd, apiextensionsv1.Terminating) {
   141  			verbs = metav1.Verbs([]string{"delete", "deletecollection", "get", "list", "watch"})
   142  		}
   143  
   144  		apiResourcesForDiscovery = append(apiResourcesForDiscovery, metav1.APIResource{
   145  			Name:               crd.Status.AcceptedNames.Plural,
   146  			SingularName:       crd.Status.AcceptedNames.Singular,
   147  			Namespaced:         crd.Spec.Scope == apiextensionsv1.NamespaceScoped,
   148  			Kind:               crd.Status.AcceptedNames.Kind,
   149  			Verbs:              verbs,
   150  			ShortNames:         crd.Status.AcceptedNames.ShortNames,
   151  			Categories:         crd.Status.AcceptedNames.Categories,
   152  			StorageVersionHash: storageVersionHash,
   153  		})
   154  
   155  		subresources, err := apiextensionshelpers.GetSubresourcesForVersion(crd, version.Version)
   156  		if err != nil {
   157  			return err
   158  		}
   159  
   160  		if c.resourceManager != nil {
   161  			var scope apidiscoveryv2.ResourceScope
   162  			if crd.Spec.Scope == apiextensionsv1.NamespaceScoped {
   163  				scope = apidiscoveryv2.ScopeNamespace
   164  			} else {
   165  				scope = apidiscoveryv2.ScopeCluster
   166  			}
   167  			apiResourceDiscovery := apidiscoveryv2.APIResourceDiscovery{
   168  				Resource:         crd.Status.AcceptedNames.Plural,
   169  				SingularResource: crd.Status.AcceptedNames.Singular,
   170  				Scope:            scope,
   171  				ResponseKind: &metav1.GroupVersionKind{
   172  					Group:   version.Group,
   173  					Version: version.Version,
   174  					Kind:    crd.Status.AcceptedNames.Kind,
   175  				},
   176  				Verbs:      verbs,
   177  				ShortNames: crd.Status.AcceptedNames.ShortNames,
   178  				Categories: crd.Status.AcceptedNames.Categories,
   179  			}
   180  			if subresources != nil && subresources.Status != nil {
   181  				apiResourceDiscovery.Subresources = append(apiResourceDiscovery.Subresources, apidiscoveryv2.APISubresourceDiscovery{
   182  					Subresource: "status",
   183  					ResponseKind: &metav1.GroupVersionKind{
   184  						Group:   version.Group,
   185  						Version: version.Version,
   186  						Kind:    crd.Status.AcceptedNames.Kind,
   187  					},
   188  					Verbs: metav1.Verbs([]string{"get", "patch", "update"}),
   189  				})
   190  			}
   191  			if subresources != nil && subresources.Scale != nil {
   192  				apiResourceDiscovery.Subresources = append(apiResourceDiscovery.Subresources, apidiscoveryv2.APISubresourceDiscovery{
   193  					Subresource: "scale",
   194  					ResponseKind: &metav1.GroupVersionKind{
   195  						Group:   autoscaling.GroupName,
   196  						Version: "v1",
   197  						Kind:    "Scale",
   198  					},
   199  					Verbs: metav1.Verbs([]string{"get", "patch", "update"}),
   200  				})
   201  
   202  			}
   203  			aggregatedAPIResourcesForDiscovery = append(aggregatedAPIResourcesForDiscovery, apiResourceDiscovery)
   204  		}
   205  
   206  		if subresources != nil && subresources.Status != nil {
   207  			apiResourcesForDiscovery = append(apiResourcesForDiscovery, metav1.APIResource{
   208  				Name:       crd.Status.AcceptedNames.Plural + "/status",
   209  				Namespaced: crd.Spec.Scope == apiextensionsv1.NamespaceScoped,
   210  				Kind:       crd.Status.AcceptedNames.Kind,
   211  				Verbs:      metav1.Verbs([]string{"get", "patch", "update"}),
   212  			})
   213  		}
   214  
   215  		if subresources != nil && subresources.Scale != nil {
   216  			apiResourcesForDiscovery = append(apiResourcesForDiscovery, metav1.APIResource{
   217  				Group:      autoscaling.GroupName,
   218  				Version:    "v1",
   219  				Kind:       "Scale",
   220  				Name:       crd.Status.AcceptedNames.Plural + "/scale",
   221  				Namespaced: crd.Spec.Scope == apiextensionsv1.NamespaceScoped,
   222  				Verbs:      metav1.Verbs([]string{"get", "patch", "update"}),
   223  			})
   224  		}
   225  	}
   226  
   227  	if !foundGroup {
   228  		c.groupHandler.unsetDiscovery(version.Group)
   229  		c.versionHandler.unsetDiscovery(version)
   230  
   231  		if c.resourceManager != nil {
   232  			c.resourceManager.RemoveGroup(version.Group)
   233  		}
   234  		return nil
   235  	}
   236  
   237  	sortGroupDiscoveryByKubeAwareVersion(apiVersionsForDiscovery)
   238  
   239  	apiGroup := metav1.APIGroup{
   240  		Name:     version.Group,
   241  		Versions: apiVersionsForDiscovery,
   242  		// the preferred versions for a group is the first item in
   243  		// apiVersionsForDiscovery after it put in the right ordered
   244  		PreferredVersion: apiVersionsForDiscovery[0],
   245  	}
   246  	c.groupHandler.setDiscovery(version.Group, discovery.NewAPIGroupHandler(Codecs, apiGroup))
   247  
   248  	if !foundVersion {
   249  		c.versionHandler.unsetDiscovery(version)
   250  
   251  		if c.resourceManager != nil {
   252  			c.resourceManager.RemoveGroupVersion(metav1.GroupVersion{
   253  				Group:   version.Group,
   254  				Version: version.Version,
   255  			})
   256  		}
   257  		return nil
   258  	}
   259  	c.versionHandler.setDiscovery(version, discovery.NewAPIVersionHandler(Codecs, version, discovery.APIResourceListerFunc(func() []metav1.APIResource {
   260  		return apiResourcesForDiscovery
   261  	})))
   262  
   263  	sort.Slice(aggregatedAPIResourcesForDiscovery, func(i, j int) bool {
   264  		return aggregatedAPIResourcesForDiscovery[i].Resource < aggregatedAPIResourcesForDiscovery[j].Resource
   265  	})
   266  	if c.resourceManager != nil {
   267  		c.resourceManager.AddGroupVersion(version.Group, apidiscoveryv2.APIVersionDiscovery{
   268  			Freshness: apidiscoveryv2.DiscoveryFreshnessCurrent,
   269  			Version:   version.Version,
   270  			Resources: aggregatedAPIResourcesForDiscovery,
   271  		})
   272  		// Default priority for CRDs
   273  		c.resourceManager.SetGroupVersionPriority(metav1.GroupVersion(version), 1000, 100)
   274  	}
   275  	return nil
   276  }
   277  
   278  func sortGroupDiscoveryByKubeAwareVersion(gd []metav1.GroupVersionForDiscovery) {
   279  	sort.Slice(gd, func(i, j int) bool {
   280  		return version.CompareKubeAwareVersionStrings(gd[i].Version, gd[j].Version) > 0
   281  	})
   282  }
   283  
   284  func (c *DiscoveryController) Run(stopCh <-chan struct{}, synchedCh chan<- struct{}) {
   285  	defer utilruntime.HandleCrash()
   286  	defer c.queue.ShutDown()
   287  	defer klog.Info("Shutting down DiscoveryController")
   288  
   289  	klog.Info("Starting DiscoveryController")
   290  
   291  	if !cache.WaitForCacheSync(stopCh, c.crdsSynced) {
   292  		utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
   293  		return
   294  	}
   295  
   296  	// initially sync all group versions to make sure we serve complete discovery
   297  	if err := wait.PollImmediateUntil(time.Second, func() (bool, error) {
   298  		crds, err := c.crdLister.List(labels.Everything())
   299  		if err != nil {
   300  			utilruntime.HandleError(fmt.Errorf("failed to initially list CRDs: %v", err))
   301  			return false, nil
   302  		}
   303  		for _, crd := range crds {
   304  			for _, v := range crd.Spec.Versions {
   305  				gv := schema.GroupVersion{Group: crd.Spec.Group, Version: v.Name}
   306  				if err := c.sync(gv); err != nil {
   307  					utilruntime.HandleError(fmt.Errorf("failed to initially sync CRD version %v: %v", gv, err))
   308  					return false, nil
   309  				}
   310  			}
   311  		}
   312  		return true, nil
   313  	}, stopCh); err == wait.ErrWaitTimeout {
   314  		utilruntime.HandleError(fmt.Errorf("timed out waiting for discovery endpoint to initialize"))
   315  		return
   316  	} else if err != nil {
   317  		panic(fmt.Errorf("unexpected error: %v", err))
   318  	}
   319  	close(synchedCh)
   320  
   321  	// only start one worker thread since its a slow moving API
   322  	go wait.Until(c.runWorker, time.Second, stopCh)
   323  
   324  	<-stopCh
   325  }
   326  
   327  func (c *DiscoveryController) runWorker() {
   328  	for c.processNextWorkItem() {
   329  	}
   330  }
   331  
   332  // processNextWorkItem deals with one key off the queue.  It returns false when it's time to quit.
   333  func (c *DiscoveryController) processNextWorkItem() bool {
   334  	key, quit := c.queue.Get()
   335  	if quit {
   336  		return false
   337  	}
   338  	defer c.queue.Done(key)
   339  
   340  	err := c.syncFn(key.(schema.GroupVersion))
   341  	if err == nil {
   342  		c.queue.Forget(key)
   343  		return true
   344  	}
   345  
   346  	utilruntime.HandleError(fmt.Errorf("%v failed with: %v", key, err))
   347  	c.queue.AddRateLimited(key)
   348  
   349  	return true
   350  }
   351  
   352  func (c *DiscoveryController) enqueue(obj *apiextensionsv1.CustomResourceDefinition) {
   353  	for _, v := range obj.Spec.Versions {
   354  		c.queue.Add(schema.GroupVersion{Group: obj.Spec.Group, Version: v.Name})
   355  	}
   356  }
   357  
   358  func (c *DiscoveryController) addCustomResourceDefinition(obj interface{}) {
   359  	castObj := obj.(*apiextensionsv1.CustomResourceDefinition)
   360  	klog.V(4).Infof("Adding customresourcedefinition %s", castObj.Name)
   361  	c.enqueue(castObj)
   362  }
   363  
   364  func (c *DiscoveryController) updateCustomResourceDefinition(oldObj, newObj interface{}) {
   365  	castNewObj := newObj.(*apiextensionsv1.CustomResourceDefinition)
   366  	castOldObj := oldObj.(*apiextensionsv1.CustomResourceDefinition)
   367  	klog.V(4).Infof("Updating customresourcedefinition %s", castOldObj.Name)
   368  	// Enqueue both old and new object to make sure we remove and add appropriate Versions.
   369  	// The working queue will resolve any duplicates and only changes will stay in the queue.
   370  	c.enqueue(castNewObj)
   371  	c.enqueue(castOldObj)
   372  }
   373  
   374  func (c *DiscoveryController) deleteCustomResourceDefinition(obj interface{}) {
   375  	castObj, ok := obj.(*apiextensionsv1.CustomResourceDefinition)
   376  	if !ok {
   377  		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
   378  		if !ok {
   379  			klog.Errorf("Couldn't get object from tombstone %#v", obj)
   380  			return
   381  		}
   382  		castObj, ok = tombstone.Obj.(*apiextensionsv1.CustomResourceDefinition)
   383  		if !ok {
   384  			klog.Errorf("Tombstone contained object that is not expected %#v", obj)
   385  			return
   386  		}
   387  	}
   388  	klog.V(4).Infof("Deleting customresourcedefinition %q", castObj.Name)
   389  	c.enqueue(castObj)
   390  }
   391  

View as plain text