...

Source file src/k8s.io/kubernetes/cmd/kube-apiserver/app/aggregator.go

Documentation: k8s.io/kubernetes/cmd/kube-apiserver/app

     1  /*
     2  Copyright 2017 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  // Package app does all of the work necessary to create a Kubernetes
    18  // APIServer by binding together the API, master and APIServer infrastructure.
    19  // It can be configured and called directly or via the hyperkube framework.
    20  package app
    21  
    22  import (
    23  	"fmt"
    24  	"net/http"
    25  	"strings"
    26  	"sync"
    27  
    28  	"k8s.io/klog/v2"
    29  
    30  	apiextensionsinformers "k8s.io/apiextensions-apiserver/pkg/client/informers/externalversions"
    31  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    32  	"k8s.io/apimachinery/pkg/runtime"
    33  	"k8s.io/apimachinery/pkg/runtime/schema"
    34  	"k8s.io/apimachinery/pkg/util/sets"
    35  	"k8s.io/apiserver/pkg/admission"
    36  	genericfeatures "k8s.io/apiserver/pkg/features"
    37  	genericapiserver "k8s.io/apiserver/pkg/server"
    38  	"k8s.io/apiserver/pkg/server/healthz"
    39  	utilfeature "k8s.io/apiserver/pkg/util/feature"
    40  	utilpeerproxy "k8s.io/apiserver/pkg/util/peerproxy"
    41  	kubeexternalinformers "k8s.io/client-go/informers"
    42  	"k8s.io/client-go/tools/cache"
    43  	v1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
    44  	v1helper "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1/helper"
    45  	"k8s.io/kube-aggregator/pkg/apis/apiregistration/v1beta1"
    46  	aggregatorapiserver "k8s.io/kube-aggregator/pkg/apiserver"
    47  	aggregatorscheme "k8s.io/kube-aggregator/pkg/apiserver/scheme"
    48  	apiregistrationclient "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset/typed/apiregistration/v1"
    49  	informers "k8s.io/kube-aggregator/pkg/client/informers/externalversions/apiregistration/v1"
    50  	"k8s.io/kube-aggregator/pkg/controllers/autoregister"
    51  	controlplaneapiserver "k8s.io/kubernetes/pkg/controlplane/apiserver/options"
    52  	"k8s.io/kubernetes/pkg/controlplane/controller/crdregistration"
    53  )
    54  
// createAggregatorConfig builds the configuration for the aggregator API server.
// It starts from a copy of kubeAPIServerConfig, adjusts the pieces that are
// specific to the aggregator (post-start hooks, REST options, OpenAPI
// installation, handler chain, storage codec and API enablement), and pairs the
// result with the aggregator's ExtraConfig taken from commandOptions.
//
// Parameters:
//   - kubeAPIServerConfig: generic config to derive from. It is received by value
//     and copied again; the copy is shallow, as noted in the body.
//   - commandOptions: supplies the etcd options, the API enablement options and
//     the proxy/peer settings copied into ExtraConfig.
//   - externalInformers: stored as the SharedInformerFactory of the returned config.
//   - serviceResolver, proxyTransport: stored unchanged in ExtraConfig.
//   - peerProxy: when non-nil, its WrapHandler is applied to the API handler
//     before the previously configured handler chain builder runs.
//   - pluginInitializers: not referenced in this function body.
//
// It returns an error if applying the etcd options or the API enablement
// options fails.
func createAggregatorConfig(
	kubeAPIServerConfig genericapiserver.Config,
	commandOptions controlplaneapiserver.CompletedOptions,
	externalInformers kubeexternalinformers.SharedInformerFactory,
	serviceResolver aggregatorapiserver.ServiceResolver,
	proxyTransport *http.Transport,
	peerProxy utilpeerproxy.Interface,
	pluginInitializers []admission.PluginInitializer,
) (*aggregatorapiserver.Config, error) {
	// make a shallow copy to let us twiddle a few things
	// most of the config actually remains the same.  We only need to mess with a couple items related to the particulars of the aggregator
	genericConfig := kubeAPIServerConfig
	genericConfig.PostStartHooks = map[string]genericapiserver.PostStartHookConfigEntry{}
	genericConfig.RESTOptionsGetter = nil
	// prevent generic API server from installing the OpenAPI handler. Aggregator server
	// has its own customized OpenAPI handler.
	genericConfig.SkipOpenAPIInstallation = true

	// The storage-version precondition chain is only installed when both
	// feature gates are enabled.
	if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.StorageVersionAPI) &&
		utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerIdentity) {
		// Add StorageVersionPrecondition handler to aggregator-apiserver.
		// The handler will block write requests to built-in resources until the
		// target resources' storage versions are up-to-date.
		genericConfig.BuildHandlerChainFunc = genericapiserver.BuildHandlerChainWithStorageVersionPrecondition
	}

	if peerProxy != nil {
		// Capture whichever builder is configured at this point (possibly the
		// storage-version one set above) so the wrapper can delegate to it.
		// NOTE(review): assumes BuildHandlerChainFunc is non-nil here; a nil
		// builder would panic when the wrapper is invoked — confirm.
		originalHandlerChainBuilder := genericConfig.BuildHandlerChainFunc
		genericConfig.BuildHandlerChainFunc = func(apiHandler http.Handler, c *genericapiserver.Config) http.Handler {
			// Add peer proxy handler to aggregator-apiserver.
			// wrap the peer proxy handler first.
			apiHandler = peerProxy.WrapHandler(apiHandler)
			return originalHandlerChainBuilder(apiHandler, c)
		}
	}

	// copy the etcd options so we don't mutate originals.
	// we assume that the etcd options have been completed already.  avoid messing with anything outside
	// of changes to StorageConfig as that may lead to unexpected behavior when the options are applied.
	etcdOptions := *commandOptions.Etcd
	etcdOptions.StorageConfig.Codec = aggregatorscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion, v1beta1.SchemeGroupVersion)
	etcdOptions.StorageConfig.EncodeVersioner = runtime.NewMultiGroupVersioner(v1.SchemeGroupVersion, schema.GroupKind{Group: v1beta1.GroupName})
	etcdOptions.SkipHealthEndpoints = true // avoid double wiring of health checks
	if err := etcdOptions.ApplyTo(&genericConfig); err != nil {
		return nil, err
	}

	// override MergedResourceConfig with aggregator defaults and registry
	if err := commandOptions.APIEnablement.ApplyTo(
		&genericConfig,
		aggregatorapiserver.DefaultAPIResourceConfigSource(),
		aggregatorscheme.Scheme); err != nil {
		return nil, err
	}

	aggregatorConfig := &aggregatorapiserver.Config{
		GenericConfig: &genericapiserver.RecommendedConfig{
			Config:                genericConfig,
			SharedInformerFactory: externalInformers,
		},
		ExtraConfig: aggregatorapiserver.ExtraConfig{
			ProxyClientCertFile:       commandOptions.ProxyClientCertFile,
			ProxyClientKeyFile:        commandOptions.ProxyClientKeyFile,
			PeerCAFile:                commandOptions.PeerCAFile,
			PeerAdvertiseAddress:      commandOptions.PeerAdvertiseAddress,
			ServiceResolver:           serviceResolver,
			ProxyTransport:            proxyTransport,
			RejectForwardingRedirects: commandOptions.AggregatorRejectForwardingRedirects,
		},
	}

	// we need to clear the poststarthooks so we don't add them multiple times to all the servers (that fails)
	// NOTE(review): PostStartHooks was already reset at the top of this function;
	// presumably one of the ApplyTo calls above can register hooks on
	// genericConfig in between — confirm before removing either reset.
	aggregatorConfig.GenericConfig.PostStartHooks = map[string]genericapiserver.PostStartHookConfigEntry{}

	return aggregatorConfig, nil
}
   131  
   132  func createAggregatorServer(aggregatorConfig aggregatorapiserver.CompletedConfig, delegateAPIServer genericapiserver.DelegationTarget, apiExtensionInformers apiextensionsinformers.SharedInformerFactory, crdAPIEnabled bool) (*aggregatorapiserver.APIAggregator, error) {
   133  	aggregatorServer, err := aggregatorConfig.NewWithDelegate(delegateAPIServer)
   134  	if err != nil {
   135  		return nil, err
   136  	}
   137  
   138  	// create controllers for auto-registration
   139  	apiRegistrationClient, err := apiregistrationclient.NewForConfig(aggregatorConfig.GenericConfig.LoopbackClientConfig)
   140  	if err != nil {
   141  		return nil, err
   142  	}
   143  	autoRegistrationController := autoregister.NewAutoRegisterController(aggregatorServer.APIRegistrationInformers.Apiregistration().V1().APIServices(), apiRegistrationClient)
   144  	apiServices := apiServicesToRegister(delegateAPIServer, autoRegistrationController)
   145  	crdRegistrationController := crdregistration.NewCRDRegistrationController(
   146  		apiExtensionInformers.Apiextensions().V1().CustomResourceDefinitions(),
   147  		autoRegistrationController)
   148  
   149  	// Imbue all builtin group-priorities onto the aggregated discovery
   150  	if aggregatorConfig.GenericConfig.AggregatedDiscoveryGroupManager != nil {
   151  		for gv, entry := range apiVersionPriorities {
   152  			aggregatorConfig.GenericConfig.AggregatedDiscoveryGroupManager.SetGroupVersionPriority(metav1.GroupVersion(gv), int(entry.group), int(entry.version))
   153  		}
   154  	}
   155  
   156  	err = aggregatorServer.GenericAPIServer.AddPostStartHook("kube-apiserver-autoregistration", func(context genericapiserver.PostStartHookContext) error {
   157  		go crdRegistrationController.Run(5, context.StopCh)
   158  		go func() {
   159  			// let the CRD controller process the initial set of CRDs before starting the autoregistration controller.
   160  			// this prevents the autoregistration controller's initial sync from deleting APIServices for CRDs that still exist.
   161  			// we only need to do this if CRDs are enabled on this server.  We can't use discovery because we are the source for discovery.
   162  			if crdAPIEnabled {
   163  				klog.Infof("waiting for initial CRD sync...")
   164  				crdRegistrationController.WaitForInitialSync()
   165  				klog.Infof("initial CRD sync complete...")
   166  			} else {
   167  				klog.Infof("CRD API not enabled, starting APIService registration without waiting for initial CRD sync")
   168  			}
   169  			autoRegistrationController.Run(5, context.StopCh)
   170  		}()
   171  		return nil
   172  	})
   173  	if err != nil {
   174  		return nil, err
   175  	}
   176  
   177  	err = aggregatorServer.GenericAPIServer.AddBootSequenceHealthChecks(
   178  		makeAPIServiceAvailableHealthCheck(
   179  			"autoregister-completion",
   180  			apiServices,
   181  			aggregatorServer.APIRegistrationInformers.Apiregistration().V1().APIServices(),
   182  		),
   183  	)
   184  	if err != nil {
   185  		return nil, err
   186  	}
   187  
   188  	return aggregatorServer, nil
   189  }
   190  
   191  func makeAPIService(gv schema.GroupVersion) *v1.APIService {
   192  	apiServicePriority, ok := apiVersionPriorities[gv]
   193  	if !ok {
   194  		// if we aren't found, then we shouldn't register ourselves because it could result in a CRD group version
   195  		// being permanently stuck in the APIServices list.
   196  		klog.Infof("Skipping APIService creation for %v", gv)
   197  		return nil
   198  	}
   199  	return &v1.APIService{
   200  		ObjectMeta: metav1.ObjectMeta{Name: gv.Version + "." + gv.Group},
   201  		Spec: v1.APIServiceSpec{
   202  			Group:                gv.Group,
   203  			Version:              gv.Version,
   204  			GroupPriorityMinimum: apiServicePriority.group,
   205  			VersionPriority:      apiServicePriority.version,
   206  		},
   207  	}
   208  }
   209  
   210  // makeAPIServiceAvailableHealthCheck returns a healthz check that returns healthy
   211  // once all of the specified services have been observed to be available at least once.
   212  func makeAPIServiceAvailableHealthCheck(name string, apiServices []*v1.APIService, apiServiceInformer informers.APIServiceInformer) healthz.HealthChecker {
   213  	// Track the auto-registered API services that have not been observed to be available yet
   214  	pendingServiceNamesLock := &sync.RWMutex{}
   215  	pendingServiceNames := sets.NewString()
   216  	for _, service := range apiServices {
   217  		pendingServiceNames.Insert(service.Name)
   218  	}
   219  
   220  	// When an APIService in the list is seen as available, remove it from the pending list
   221  	handleAPIServiceChange := func(service *v1.APIService) {
   222  		pendingServiceNamesLock.Lock()
   223  		defer pendingServiceNamesLock.Unlock()
   224  		if !pendingServiceNames.Has(service.Name) {
   225  			return
   226  		}
   227  		if v1helper.IsAPIServiceConditionTrue(service, v1.Available) {
   228  			pendingServiceNames.Delete(service.Name)
   229  		}
   230  	}
   231  
   232  	// Watch add/update events for APIServices
   233  	apiServiceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
   234  		AddFunc:    func(obj interface{}) { handleAPIServiceChange(obj.(*v1.APIService)) },
   235  		UpdateFunc: func(old, new interface{}) { handleAPIServiceChange(new.(*v1.APIService)) },
   236  	})
   237  
   238  	// Don't return healthy until the pending list is empty
   239  	return healthz.NamedCheck(name, func(r *http.Request) error {
   240  		pendingServiceNamesLock.RLock()
   241  		defer pendingServiceNamesLock.RUnlock()
   242  		if pendingServiceNames.Len() > 0 {
   243  			return fmt.Errorf("missing APIService: %v", pendingServiceNames.List())
   244  		}
   245  		return nil
   246  	})
   247  }
   248  
// priority defines group priority that is used in discovery. This controls
// group position in the kubectl output.
//
// Within this file, group is copied into APIServiceSpec.GroupPriorityMinimum
// and version into APIServiceSpec.VersionPriority when an APIService is built,
// and both are passed to the aggregated discovery group manager.
type priority struct {
	// group indicates the order of the group relative to other groups.
	group int32
	// version indicates the relative order of the version inside of its group.
	version int32
}
   257  
// apiVersionPriorities maps each built-in group/version to its discovery
// priority. Group/versions without an entry here get no APIService created
// for them in this file.
//
// The proper way to let the aggregator know the desired group and version-within-group order of the underlying servers
// is to refactor the genericapiserver.DelegationTarget to include a list of priorities based on which APIs were installed.
// This requires the APIGroupInfo struct to evolve and include the concept of priorities and, to avoid mistakes, the core storage map there needs to be updated.
// That ripples out every bit as far as you'd expect, so for 1.7 we'll include the list here instead of building it up during storage.
var apiVersionPriorities = map[schema.GroupVersion]priority{
	{Group: "", Version: "v1"}: {group: 18000, version: 1},
	// to my knowledge, nothing below here collides
	// NOTE(review): apiextensions.k8s.io and admissionregistration.k8s.io share
	// group priority 16700, and batch v1beta1 and v2alpha1 share version
	// priority 9 — confirm these are intentional.
	{Group: "apps", Version: "v1"}:                               {group: 17800, version: 15},
	{Group: "events.k8s.io", Version: "v1"}:                      {group: 17750, version: 15},
	{Group: "events.k8s.io", Version: "v1beta1"}:                 {group: 17750, version: 5},
	{Group: "authentication.k8s.io", Version: "v1"}:              {group: 17700, version: 15},
	{Group: "authentication.k8s.io", Version: "v1beta1"}:         {group: 17700, version: 9},
	{Group: "authentication.k8s.io", Version: "v1alpha1"}:        {group: 17700, version: 1},
	{Group: "authorization.k8s.io", Version: "v1"}:               {group: 17600, version: 15},
	{Group: "autoscaling", Version: "v1"}:                        {group: 17500, version: 15},
	{Group: "autoscaling", Version: "v2"}:                        {group: 17500, version: 30},
	{Group: "autoscaling", Version: "v2beta1"}:                   {group: 17500, version: 9},
	{Group: "autoscaling", Version: "v2beta2"}:                   {group: 17500, version: 1},
	{Group: "batch", Version: "v1"}:                              {group: 17400, version: 15},
	{Group: "batch", Version: "v1beta1"}:                         {group: 17400, version: 9},
	{Group: "batch", Version: "v2alpha1"}:                        {group: 17400, version: 9},
	{Group: "certificates.k8s.io", Version: "v1"}:                {group: 17300, version: 15},
	{Group: "certificates.k8s.io", Version: "v1alpha1"}:          {group: 17300, version: 1},
	{Group: "networking.k8s.io", Version: "v1"}:                  {group: 17200, version: 15},
	{Group: "networking.k8s.io", Version: "v1alpha1"}:            {group: 17200, version: 1},
	{Group: "policy", Version: "v1"}:                             {group: 17100, version: 15},
	{Group: "policy", Version: "v1beta1"}:                        {group: 17100, version: 9},
	{Group: "rbac.authorization.k8s.io", Version: "v1"}:          {group: 17000, version: 15},
	{Group: "storage.k8s.io", Version: "v1"}:                     {group: 16800, version: 15},
	{Group: "storage.k8s.io", Version: "v1beta1"}:                {group: 16800, version: 9},
	{Group: "storage.k8s.io", Version: "v1alpha1"}:               {group: 16800, version: 1},
	{Group: "apiextensions.k8s.io", Version: "v1"}:               {group: 16700, version: 15},
	{Group: "admissionregistration.k8s.io", Version: "v1"}:       {group: 16700, version: 15},
	{Group: "admissionregistration.k8s.io", Version: "v1beta1"}:  {group: 16700, version: 12},
	{Group: "admissionregistration.k8s.io", Version: "v1alpha1"}: {group: 16700, version: 9},
	{Group: "scheduling.k8s.io", Version: "v1"}:                  {group: 16600, version: 15},
	{Group: "coordination.k8s.io", Version: "v1"}:                {group: 16500, version: 15},
	{Group: "node.k8s.io", Version: "v1"}:                        {group: 16300, version: 15},
	{Group: "node.k8s.io", Version: "v1alpha1"}:                  {group: 16300, version: 1},
	{Group: "node.k8s.io", Version: "v1beta1"}:                   {group: 16300, version: 9},
	{Group: "discovery.k8s.io", Version: "v1"}:                   {group: 16200, version: 15},
	{Group: "discovery.k8s.io", Version: "v1beta1"}:              {group: 16200, version: 12},
	{Group: "flowcontrol.apiserver.k8s.io", Version: "v1"}:       {group: 16100, version: 21},
	{Group: "flowcontrol.apiserver.k8s.io", Version: "v1beta3"}:  {group: 16100, version: 18},
	{Group: "flowcontrol.apiserver.k8s.io", Version: "v1beta2"}:  {group: 16100, version: 15},
	{Group: "flowcontrol.apiserver.k8s.io", Version: "v1beta1"}:  {group: 16100, version: 12},
	{Group: "flowcontrol.apiserver.k8s.io", Version: "v1alpha1"}: {group: 16100, version: 9},
	{Group: "internal.apiserver.k8s.io", Version: "v1alpha1"}:    {group: 16000, version: 9},
	{Group: "resource.k8s.io", Version: "v1alpha2"}:              {group: 15900, version: 9},
	{Group: "storagemigration.k8s.io", Version: "v1alpha1"}:      {group: 15800, version: 9},
	// Append a new group to the end of the list if unsure.
	// You can use min(existing group)-100 as the initial value for a group.
	// Version can be set to 9 (to have space around) for a new group.
}
   312  
   313  func apiServicesToRegister(delegateAPIServer genericapiserver.DelegationTarget, registration autoregister.AutoAPIServiceRegistration) []*v1.APIService {
   314  	apiServices := []*v1.APIService{}
   315  
   316  	for _, curr := range delegateAPIServer.ListedPaths() {
   317  		if curr == "/api/v1" {
   318  			apiService := makeAPIService(schema.GroupVersion{Group: "", Version: "v1"})
   319  			registration.AddAPIServiceToSyncOnStart(apiService)
   320  			apiServices = append(apiServices, apiService)
   321  			continue
   322  		}
   323  
   324  		if !strings.HasPrefix(curr, "/apis/") {
   325  			continue
   326  		}
   327  		// this comes back in a list that looks like /apis/rbac.authorization.k8s.io/v1alpha1
   328  		tokens := strings.Split(curr, "/")
   329  		if len(tokens) != 4 {
   330  			continue
   331  		}
   332  
   333  		apiService := makeAPIService(schema.GroupVersion{Group: tokens[2], Version: tokens[3]})
   334  		if apiService == nil {
   335  			continue
   336  		}
   337  		registration.AddAPIServiceToSyncOnStart(apiService)
   338  		apiServices = append(apiServices, apiService)
   339  	}
   340  
   341  	return apiServices
   342  }
   343  

View as plain text