...

Source file src/k8s.io/apiextensions-apiserver/pkg/apiserver/apiserver.go

Documentation: k8s.io/apiextensions-apiserver/pkg/apiserver

     1  /*
     2  Copyright 2017 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package apiserver
    18  
    19  import (
    20  	"fmt"
    21  	"net/http"
    22  	"time"
    23  
    24  	"k8s.io/apiextensions-apiserver/pkg/apis/apiextensions"
    25  	"k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/install"
    26  	v1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
    27  	"k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
    28  	"k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
    29  	externalinformers "k8s.io/apiextensions-apiserver/pkg/client/informers/externalversions"
    30  	"k8s.io/apiextensions-apiserver/pkg/controller/apiapproval"
    31  	"k8s.io/apiextensions-apiserver/pkg/controller/establish"
    32  	"k8s.io/apiextensions-apiserver/pkg/controller/finalizer"
    33  	"k8s.io/apiextensions-apiserver/pkg/controller/nonstructuralschema"
    34  	openapicontroller "k8s.io/apiextensions-apiserver/pkg/controller/openapi"
    35  	openapiv3controller "k8s.io/apiextensions-apiserver/pkg/controller/openapiv3"
    36  	"k8s.io/apiextensions-apiserver/pkg/controller/status"
    37  	"k8s.io/apiextensions-apiserver/pkg/registry/customresourcedefinition"
    38  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    39  	"k8s.io/apimachinery/pkg/runtime"
    40  	"k8s.io/apimachinery/pkg/runtime/schema"
    41  	"k8s.io/apimachinery/pkg/runtime/serializer"
    42  	"k8s.io/apimachinery/pkg/util/wait"
    43  	"k8s.io/apimachinery/pkg/version"
    44  	"k8s.io/apiserver/pkg/endpoints/discovery"
    45  	"k8s.io/apiserver/pkg/endpoints/discovery/aggregated"
    46  	genericregistry "k8s.io/apiserver/pkg/registry/generic"
    47  	"k8s.io/apiserver/pkg/registry/rest"
    48  	genericapiserver "k8s.io/apiserver/pkg/server"
    49  	serverstorage "k8s.io/apiserver/pkg/server/storage"
    50  	"k8s.io/apiserver/pkg/util/webhook"
    51  )
    52  
    53  var (
    54  	Scheme = runtime.NewScheme()
    55  	Codecs = serializer.NewCodecFactory(Scheme)
    56  
    57  	// if you modify this, make sure you update the crEncoder
    58  	unversionedVersion = schema.GroupVersion{Group: "", Version: "v1"}
    59  	unversionedTypes   = []runtime.Object{
    60  		&metav1.Status{},
    61  		&metav1.WatchEvent{},
    62  		&metav1.APIVersions{},
    63  		&metav1.APIGroupList{},
    64  		&metav1.APIGroup{},
    65  		&metav1.APIResourceList{},
    66  	}
    67  )
    68  
    69  func init() {
    70  	install.Install(Scheme)
    71  
    72  	// we need to add the options to empty v1
    73  	metav1.AddToGroupVersion(Scheme, schema.GroupVersion{Group: "", Version: "v1"})
    74  
    75  	Scheme.AddUnversionedTypes(unversionedVersion, unversionedTypes...)
    76  }
    77  
    78  type ExtraConfig struct {
    79  	CRDRESTOptionsGetter genericregistry.RESTOptionsGetter
    80  
    81  	// MasterCount is used to detect whether cluster is HA, and if it is
    82  	// the CRD Establishing will be hold by 5 seconds.
    83  	MasterCount int
    84  
    85  	// ServiceResolver is used in CR webhook converters to resolve webhook's service names
    86  	ServiceResolver webhook.ServiceResolver
    87  	// AuthResolverWrapper is used in CR webhook converters
    88  	AuthResolverWrapper webhook.AuthenticationInfoResolverWrapper
    89  }
    90  
    91  type Config struct {
    92  	GenericConfig *genericapiserver.RecommendedConfig
    93  	ExtraConfig   ExtraConfig
    94  }
    95  
    96  type completedConfig struct {
    97  	GenericConfig genericapiserver.CompletedConfig
    98  	ExtraConfig   *ExtraConfig
    99  }
   100  
   101  type CompletedConfig struct {
   102  	// Embed a private pointer that cannot be instantiated outside of this package.
   103  	*completedConfig
   104  }
   105  
   106  type CustomResourceDefinitions struct {
   107  	GenericAPIServer *genericapiserver.GenericAPIServer
   108  
   109  	// provided for easier embedding
   110  	Informers externalinformers.SharedInformerFactory
   111  }
   112  
   113  // Complete fills in any fields not set that are required to have valid data. It's mutating the receiver.
   114  func (cfg *Config) Complete() CompletedConfig {
   115  	c := completedConfig{
   116  		cfg.GenericConfig.Complete(),
   117  		&cfg.ExtraConfig,
   118  	}
   119  
   120  	c.GenericConfig.EnableDiscovery = false
   121  	if c.GenericConfig.Version == nil {
   122  		c.GenericConfig.Version = &version.Info{
   123  			Major: "0",
   124  			Minor: "1",
   125  		}
   126  	}
   127  
   128  	return CompletedConfig{&c}
   129  }
   130  
   131  // New returns a new instance of CustomResourceDefinitions from the given config.
   132  func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*CustomResourceDefinitions, error) {
   133  	genericServer, err := c.GenericConfig.New("apiextensions-apiserver", delegationTarget)
   134  	if err != nil {
   135  		return nil, err
   136  	}
   137  
   138  	// hasCRDInformerSyncedSignal is closed when the CRD informer this server uses has been fully synchronized.
   139  	// It ensures that requests to potential custom resource endpoints while the server hasn't installed all known HTTP paths get a 503 error instead of a 404
   140  	hasCRDInformerSyncedSignal := make(chan struct{})
   141  	if err := genericServer.RegisterMuxAndDiscoveryCompleteSignal("CRDInformerHasNotSynced", hasCRDInformerSyncedSignal); err != nil {
   142  		return nil, err
   143  	}
   144  
   145  	s := &CustomResourceDefinitions{
   146  		GenericAPIServer: genericServer,
   147  	}
   148  
   149  	apiResourceConfig := c.GenericConfig.MergedResourceConfig
   150  	apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(apiextensions.GroupName, Scheme, metav1.ParameterCodec, Codecs)
   151  	storage := map[string]rest.Storage{}
   152  	// customresourcedefinitions
   153  	if resource := "customresourcedefinitions"; apiResourceConfig.ResourceEnabled(v1.SchemeGroupVersion.WithResource(resource)) {
   154  		customResourceDefinitionStorage, err := customresourcedefinition.NewREST(Scheme, c.GenericConfig.RESTOptionsGetter)
   155  		if err != nil {
   156  			return nil, err
   157  		}
   158  		storage[resource] = customResourceDefinitionStorage
   159  		storage[resource+"/status"] = customresourcedefinition.NewStatusREST(Scheme, customResourceDefinitionStorage)
   160  	}
   161  	if len(storage) > 0 {
   162  		apiGroupInfo.VersionedResourcesStorageMap[v1.SchemeGroupVersion.Version] = storage
   163  	}
   164  
   165  	if err := s.GenericAPIServer.InstallAPIGroup(&apiGroupInfo); err != nil {
   166  		return nil, err
   167  	}
   168  
   169  	crdClient, err := clientset.NewForConfig(s.GenericAPIServer.LoopbackClientConfig)
   170  	if err != nil {
   171  		// it's really bad that this is leaking here, but until we can fix the test (which I'm pretty sure isn't even testing what it wants to test),
   172  		// we need to be able to move forward
   173  		return nil, fmt.Errorf("failed to create clientset: %v", err)
   174  	}
   175  	s.Informers = externalinformers.NewSharedInformerFactory(crdClient, 5*time.Minute)
   176  
   177  	delegateHandler := delegationTarget.UnprotectedHandler()
   178  	if delegateHandler == nil {
   179  		delegateHandler = http.NotFoundHandler()
   180  	}
   181  
   182  	versionDiscoveryHandler := &versionDiscoveryHandler{
   183  		discovery: map[schema.GroupVersion]*discovery.APIVersionHandler{},
   184  		delegate:  delegateHandler,
   185  	}
   186  	groupDiscoveryHandler := &groupDiscoveryHandler{
   187  		discovery: map[string]*discovery.APIGroupHandler{},
   188  		delegate:  delegateHandler,
   189  	}
   190  	establishingController := establish.NewEstablishingController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), crdClient.ApiextensionsV1())
   191  	crdHandler, err := NewCustomResourceDefinitionHandler(
   192  		versionDiscoveryHandler,
   193  		groupDiscoveryHandler,
   194  		s.Informers.Apiextensions().V1().CustomResourceDefinitions(),
   195  		delegateHandler,
   196  		c.ExtraConfig.CRDRESTOptionsGetter,
   197  		c.GenericConfig.AdmissionControl,
   198  		establishingController,
   199  		c.ExtraConfig.ServiceResolver,
   200  		c.ExtraConfig.AuthResolverWrapper,
   201  		c.ExtraConfig.MasterCount,
   202  		s.GenericAPIServer.Authorizer,
   203  		c.GenericConfig.RequestTimeout,
   204  		time.Duration(c.GenericConfig.MinRequestTimeout)*time.Second,
   205  		apiGroupInfo.StaticOpenAPISpec,
   206  		c.GenericConfig.MaxRequestBodyBytes,
   207  	)
   208  	if err != nil {
   209  		return nil, err
   210  	}
   211  	s.GenericAPIServer.Handler.NonGoRestfulMux.Handle("/apis", crdHandler)
   212  	s.GenericAPIServer.Handler.NonGoRestfulMux.HandlePrefix("/apis/", crdHandler)
   213  	s.GenericAPIServer.RegisterDestroyFunc(crdHandler.destroy)
   214  
   215  	aggregatedDiscoveryManager := genericServer.AggregatedDiscoveryGroupManager
   216  	if aggregatedDiscoveryManager != nil {
   217  		aggregatedDiscoveryManager = aggregatedDiscoveryManager.WithSource(aggregated.CRDSource)
   218  	}
   219  	discoveryController := NewDiscoveryController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), versionDiscoveryHandler, groupDiscoveryHandler, aggregatedDiscoveryManager)
   220  	namingController := status.NewNamingConditionController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), crdClient.ApiextensionsV1())
   221  	nonStructuralSchemaController := nonstructuralschema.NewConditionController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), crdClient.ApiextensionsV1())
   222  	apiApprovalController := apiapproval.NewKubernetesAPIApprovalPolicyConformantConditionController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), crdClient.ApiextensionsV1())
   223  	finalizingController := finalizer.NewCRDFinalizer(
   224  		s.Informers.Apiextensions().V1().CustomResourceDefinitions(),
   225  		crdClient.ApiextensionsV1(),
   226  		crdHandler,
   227  	)
   228  
   229  	s.GenericAPIServer.AddPostStartHookOrDie("start-apiextensions-informers", func(context genericapiserver.PostStartHookContext) error {
   230  		s.Informers.Start(context.StopCh)
   231  		return nil
   232  	})
   233  	s.GenericAPIServer.AddPostStartHookOrDie("start-apiextensions-controllers", func(context genericapiserver.PostStartHookContext) error {
   234  		// OpenAPIVersionedService and StaticOpenAPISpec are populated in generic apiserver PrepareRun().
   235  		// Together they serve the /openapi/v2 endpoint on a generic apiserver. A generic apiserver may
   236  		// choose to not enable OpenAPI by having null openAPIConfig, and thus OpenAPIVersionedService
   237  		// and StaticOpenAPISpec are both null. In that case we don't run the CRD OpenAPI controller.
   238  		if s.GenericAPIServer.StaticOpenAPISpec != nil {
   239  			if s.GenericAPIServer.OpenAPIVersionedService != nil {
   240  				openapiController := openapicontroller.NewController(s.Informers.Apiextensions().V1().CustomResourceDefinitions())
   241  				go openapiController.Run(s.GenericAPIServer.StaticOpenAPISpec, s.GenericAPIServer.OpenAPIVersionedService, context.StopCh)
   242  			}
   243  
   244  			if s.GenericAPIServer.OpenAPIV3VersionedService != nil {
   245  				openapiv3Controller := openapiv3controller.NewController(s.Informers.Apiextensions().V1().CustomResourceDefinitions())
   246  				go openapiv3Controller.Run(s.GenericAPIServer.OpenAPIV3VersionedService, context.StopCh)
   247  			}
   248  		}
   249  
   250  		go namingController.Run(context.StopCh)
   251  		go establishingController.Run(context.StopCh)
   252  		go nonStructuralSchemaController.Run(5, context.StopCh)
   253  		go apiApprovalController.Run(5, context.StopCh)
   254  		go finalizingController.Run(5, context.StopCh)
   255  
   256  		discoverySyncedCh := make(chan struct{})
   257  		go discoveryController.Run(context.StopCh, discoverySyncedCh)
   258  		select {
   259  		case <-context.StopCh:
   260  		case <-discoverySyncedCh:
   261  		}
   262  
   263  		return nil
   264  	})
   265  	// we don't want to report healthy until we can handle all CRDs that have already been registered.  Waiting for the informer
   266  	// to sync makes sure that the lister will be valid before we begin.  There may still be races for CRDs added after startup,
   267  	// but we won't go healthy until we can handle the ones already present.
   268  	s.GenericAPIServer.AddPostStartHookOrDie("crd-informer-synced", func(context genericapiserver.PostStartHookContext) error {
   269  		return wait.PollImmediateUntil(100*time.Millisecond, func() (bool, error) {
   270  			if s.Informers.Apiextensions().V1().CustomResourceDefinitions().Informer().HasSynced() {
   271  				close(hasCRDInformerSyncedSignal)
   272  				return true, nil
   273  			}
   274  			return false, nil
   275  		}, context.StopCh)
   276  	})
   277  
   278  	return s, nil
   279  }
   280  
   281  func DefaultAPIResourceConfigSource() *serverstorage.ResourceConfig {
   282  	ret := serverstorage.NewResourceConfig()
   283  	// NOTE: GroupVersions listed here will be enabled by default. Don't put alpha versions in the list.
   284  	ret.EnableVersions(
   285  		v1beta1.SchemeGroupVersion,
   286  		v1.SchemeGroupVersion,
   287  	)
   288  
   289  	return ret
   290  }
   291  

View as plain text