...

Source file src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go

Documentation: k8s.io/kube-aggregator/pkg/apiserver

     1  /*
     2  Copyright 2016 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package apiserver
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"net/http"
    23  	"time"
    24  
    25  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    26  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    27  	"k8s.io/apimachinery/pkg/runtime/schema"
    28  	"k8s.io/apimachinery/pkg/util/sets"
    29  	"k8s.io/apimachinery/pkg/util/wait"
    30  	"k8s.io/apiserver/pkg/endpoints/discovery/aggregated"
    31  	genericfeatures "k8s.io/apiserver/pkg/features"
    32  	peerreconcilers "k8s.io/apiserver/pkg/reconcilers"
    33  	genericapiserver "k8s.io/apiserver/pkg/server"
    34  	"k8s.io/apiserver/pkg/server/dynamiccertificates"
    35  	"k8s.io/apiserver/pkg/server/egressselector"
    36  	serverstorage "k8s.io/apiserver/pkg/server/storage"
    37  	utilfeature "k8s.io/apiserver/pkg/util/feature"
    38  	"k8s.io/client-go/kubernetes"
    39  	"k8s.io/client-go/transport"
    40  	"k8s.io/component-base/version"
    41  	v1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
    42  	v1helper "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1/helper"
    43  	"k8s.io/kube-aggregator/pkg/apis/apiregistration/v1beta1"
    44  	aggregatorscheme "k8s.io/kube-aggregator/pkg/apiserver/scheme"
    45  	"k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset"
    46  	informers "k8s.io/kube-aggregator/pkg/client/informers/externalversions"
    47  	listers "k8s.io/kube-aggregator/pkg/client/listers/apiregistration/v1"
    48  	openapicontroller "k8s.io/kube-aggregator/pkg/controllers/openapi"
    49  	openapiaggregator "k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator"
    50  	openapiv3controller "k8s.io/kube-aggregator/pkg/controllers/openapiv3"
    51  	openapiv3aggregator "k8s.io/kube-aggregator/pkg/controllers/openapiv3/aggregator"
    52  	statuscontrollers "k8s.io/kube-aggregator/pkg/controllers/status"
    53  	apiservicerest "k8s.io/kube-aggregator/pkg/registry/apiservice/rest"
    54  	openapicommon "k8s.io/kube-openapi/pkg/common"
    55  )
    56  
    57  func init() {
    58  	// we need to add the options (like ListOptions) to empty v1
    59  	metav1.AddToGroupVersion(aggregatorscheme.Scheme, schema.GroupVersion{Group: "", Version: "v1"})
    60  
    61  	unversioned := schema.GroupVersion{Group: "", Version: "v1"}
    62  	aggregatorscheme.Scheme.AddUnversionedTypes(unversioned,
    63  		&metav1.Status{},
    64  		&metav1.APIVersions{},
    65  		&metav1.APIGroupList{},
    66  		&metav1.APIGroup{},
    67  		&metav1.APIResourceList{},
    68  	)
    69  }
    70  
    71  const (
    72  	// legacyAPIServiceName is the fixed name of the only non-groupified API version
    73  	legacyAPIServiceName = "v1."
    74  	// StorageVersionPostStartHookName is the name of the storage version updater post start hook.
    75  	StorageVersionPostStartHookName = "built-in-resources-storage-version-updater"
    76  )
    77  
    78  // ExtraConfig represents APIServices-specific configuration
    79  type ExtraConfig struct {
    80  	// PeerCAFile is the ca bundle used by this kube-apiserver to verify peer apiservers'
    81  	// serving certs when routing a request to the peer in the case the request can not be served
    82  	// locally due to version skew.
    83  	PeerCAFile string
    84  
    85  	// PeerAdvertiseAddress is the IP for this kube-apiserver which is used by peer apiservers to route a request
    86  	// to this apiserver. This happens in cases where the peer is not able to serve the request due to
    87  	// version skew. If unset, AdvertiseAddress/BindAddress will be used.
    88  	PeerAdvertiseAddress peerreconcilers.PeerAdvertiseAddress
    89  
    90  	// ProxyClientCert/Key are the client cert used to identify this proxy. Backing APIServices use
    91  	// this to confirm the proxy's identity
    92  	ProxyClientCertFile string
    93  	ProxyClientKeyFile  string
    94  
    95  	// If present, the Dial method will be used for dialing out to delegate
    96  	// apiservers.
    97  	ProxyTransport *http.Transport
    98  
    99  	// Mechanism by which the Aggregator will resolve services. Required.
   100  	ServiceResolver ServiceResolver
   101  
   102  	RejectForwardingRedirects bool
   103  }
   104  
   105  // Config represents the configuration needed to create an APIAggregator.
   106  type Config struct {
   107  	GenericConfig *genericapiserver.RecommendedConfig
   108  	ExtraConfig   ExtraConfig
   109  }
   110  
   111  type completedConfig struct {
   112  	GenericConfig genericapiserver.CompletedConfig
   113  	ExtraConfig   *ExtraConfig
   114  }
   115  
   116  // CompletedConfig same as Config, just to swap private object.
   117  type CompletedConfig struct {
   118  	// Embed a private pointer that cannot be instantiated outside of this package.
   119  	*completedConfig
   120  }
   121  
   122  type runnable interface {
   123  	Run(stopCh <-chan struct{}) error
   124  }
   125  
   126  // preparedGenericAPIServer is a private wrapper that enforces a call of PrepareRun() before Run can be invoked.
   127  type preparedAPIAggregator struct {
   128  	*APIAggregator
   129  	runnable runnable
   130  }
   131  
   132  // APIAggregator contains state for a Kubernetes cluster master/api server.
   133  type APIAggregator struct {
   134  	GenericAPIServer *genericapiserver.GenericAPIServer
   135  
   136  	// provided for easier embedding
   137  	APIRegistrationInformers informers.SharedInformerFactory
   138  
   139  	delegateHandler http.Handler
   140  
   141  	// proxyCurrentCertKeyContent holds he client cert used to identify this proxy. Backing APIServices use this to confirm the proxy's identity
   142  	proxyCurrentCertKeyContent certKeyFunc
   143  	proxyTransportDial         *transport.DialHolder
   144  
   145  	// proxyHandlers are the proxy handlers that are currently registered, keyed by apiservice.name
   146  	proxyHandlers map[string]*proxyHandler
   147  	// handledGroupVersions contain the groups that already have routes. The key is the name of the group and the value
   148  	// is the versions for the group.
   149  	handledGroupVersions map[string]sets.Set[string]
   150  
   151  	// lister is used to add group handling for /apis/<group> aggregator lookups based on
   152  	// controller state
   153  	lister listers.APIServiceLister
   154  
   155  	// Information needed to determine routing for the aggregator
   156  	serviceResolver ServiceResolver
   157  
   158  	// Enable swagger and/or OpenAPI if these configs are non-nil.
   159  	openAPIConfig *openapicommon.Config
   160  
   161  	// Enable OpenAPI V3 if these configs are non-nil
   162  	openAPIV3Config *openapicommon.OpenAPIV3Config
   163  
   164  	// openAPIAggregationController downloads and merges OpenAPI v2 specs.
   165  	openAPIAggregationController *openapicontroller.AggregationController
   166  
   167  	// openAPIV3AggregationController downloads and caches OpenAPI v3 specs.
   168  	openAPIV3AggregationController *openapiv3controller.AggregationController
   169  
   170  	// discoveryAggregationController downloads and caches discovery documents
   171  	// from all aggregated apiservices so they are available from /apis endpoint
   172  	// when discovery with resources are requested
   173  	discoveryAggregationController DiscoveryAggregationController
   174  
   175  	// rejectForwardingRedirects is whether to allow to forward redirect response
   176  	rejectForwardingRedirects bool
   177  }
   178  
   179  // Complete fills in any fields not set that are required to have valid data. It's mutating the receiver.
   180  func (cfg *Config) Complete() CompletedConfig {
   181  	c := completedConfig{
   182  		cfg.GenericConfig.Complete(),
   183  		&cfg.ExtraConfig,
   184  	}
   185  
   186  	// the kube aggregator wires its own discovery mechanism
   187  	// TODO eventually collapse this by extracting all of the discovery out
   188  	c.GenericConfig.EnableDiscovery = false
   189  	version := version.Get()
   190  	c.GenericConfig.Version = &version
   191  
   192  	return CompletedConfig{&c}
   193  }
   194  
   195  // NewWithDelegate returns a new instance of APIAggregator from the given config.
   196  func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.DelegationTarget) (*APIAggregator, error) {
   197  	genericServer, err := c.GenericConfig.New("kube-aggregator", delegationTarget)
   198  	if err != nil {
   199  		return nil, err
   200  	}
   201  
   202  	apiregistrationClient, err := clientset.NewForConfig(c.GenericConfig.LoopbackClientConfig)
   203  	if err != nil {
   204  		return nil, err
   205  	}
   206  	informerFactory := informers.NewSharedInformerFactory(
   207  		apiregistrationClient,
   208  		5*time.Minute, // this is effectively used as a refresh interval right now.  Might want to do something nicer later on.
   209  	)
   210  
   211  	// apiServiceRegistrationControllerInitiated is closed when APIServiceRegistrationController has finished "installing" all known APIServices.
   212  	// At this point we know that the proxy handler knows about APIServices and can handle client requests.
   213  	// Before it might have resulted in a 404 response which could have serious consequences for some controllers like  GC and NS
   214  	//
   215  	// Note that the APIServiceRegistrationController waits for APIServiceInformer to synced before doing its work.
   216  	apiServiceRegistrationControllerInitiated := make(chan struct{})
   217  	if err := genericServer.RegisterMuxAndDiscoveryCompleteSignal("APIServiceRegistrationControllerInitiated", apiServiceRegistrationControllerInitiated); err != nil {
   218  		return nil, err
   219  	}
   220  
   221  	var proxyTransportDial *transport.DialHolder
   222  	if c.GenericConfig.EgressSelector != nil {
   223  		egressDialer, err := c.GenericConfig.EgressSelector.Lookup(egressselector.Cluster.AsNetworkContext())
   224  		if err != nil {
   225  			return nil, err
   226  		}
   227  		if egressDialer != nil {
   228  			proxyTransportDial = &transport.DialHolder{Dial: egressDialer}
   229  		}
   230  	} else if c.ExtraConfig.ProxyTransport != nil && c.ExtraConfig.ProxyTransport.DialContext != nil {
   231  		proxyTransportDial = &transport.DialHolder{Dial: c.ExtraConfig.ProxyTransport.DialContext}
   232  	}
   233  
   234  	s := &APIAggregator{
   235  		GenericAPIServer:           genericServer,
   236  		delegateHandler:            delegationTarget.UnprotectedHandler(),
   237  		proxyTransportDial:         proxyTransportDial,
   238  		proxyHandlers:              map[string]*proxyHandler{},
   239  		handledGroupVersions:       map[string]sets.Set[string]{},
   240  		lister:                     informerFactory.Apiregistration().V1().APIServices().Lister(),
   241  		APIRegistrationInformers:   informerFactory,
   242  		serviceResolver:            c.ExtraConfig.ServiceResolver,
   243  		openAPIConfig:              c.GenericConfig.OpenAPIConfig,
   244  		openAPIV3Config:            c.GenericConfig.OpenAPIV3Config,
   245  		proxyCurrentCertKeyContent: func() (bytes []byte, bytes2 []byte) { return nil, nil },
   246  		rejectForwardingRedirects:  c.ExtraConfig.RejectForwardingRedirects,
   247  	}
   248  
   249  	// used later  to filter the served resource by those that have expired.
   250  	resourceExpirationEvaluator, err := genericapiserver.NewResourceExpirationEvaluator(*c.GenericConfig.Version)
   251  	if err != nil {
   252  		return nil, err
   253  	}
   254  
   255  	apiGroupInfo := apiservicerest.NewRESTStorage(c.GenericConfig.MergedResourceConfig, c.GenericConfig.RESTOptionsGetter, resourceExpirationEvaluator.ShouldServeForVersion(1, 22))
   256  	if err := s.GenericAPIServer.InstallAPIGroup(&apiGroupInfo); err != nil {
   257  		return nil, err
   258  	}
   259  
   260  	enabledVersions := sets.NewString()
   261  	for v := range apiGroupInfo.VersionedResourcesStorageMap {
   262  		enabledVersions.Insert(v)
   263  	}
   264  	if !enabledVersions.Has(v1.SchemeGroupVersion.Version) {
   265  		return nil, fmt.Errorf("API group/version %s must be enabled", v1.SchemeGroupVersion.String())
   266  	}
   267  
   268  	apisHandler := &apisHandler{
   269  		codecs:         aggregatorscheme.Codecs,
   270  		lister:         s.lister,
   271  		discoveryGroup: discoveryGroup(enabledVersions),
   272  	}
   273  
   274  	if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.AggregatedDiscoveryEndpoint) {
   275  		apisHandlerWithAggregationSupport := aggregated.WrapAggregatedDiscoveryToHandler(apisHandler, s.GenericAPIServer.AggregatedDiscoveryGroupManager)
   276  		s.GenericAPIServer.Handler.NonGoRestfulMux.Handle("/apis", apisHandlerWithAggregationSupport)
   277  	} else {
   278  		s.GenericAPIServer.Handler.NonGoRestfulMux.Handle("/apis", apisHandler)
   279  	}
   280  	s.GenericAPIServer.Handler.NonGoRestfulMux.UnlistedHandle("/apis/", apisHandler)
   281  
   282  	apiserviceRegistrationController := NewAPIServiceRegistrationController(informerFactory.Apiregistration().V1().APIServices(), s)
   283  	if len(c.ExtraConfig.ProxyClientCertFile) > 0 && len(c.ExtraConfig.ProxyClientKeyFile) > 0 {
   284  		aggregatorProxyCerts, err := dynamiccertificates.NewDynamicServingContentFromFiles("aggregator-proxy-cert", c.ExtraConfig.ProxyClientCertFile, c.ExtraConfig.ProxyClientKeyFile)
   285  		if err != nil {
   286  			return nil, err
   287  		}
   288  		// We are passing the context to ProxyCerts.RunOnce as it needs to implement RunOnce(ctx) however the
   289  		// context is not used at all. So passing a empty context shouldn't be a problem
   290  		ctx := context.TODO()
   291  		if err := aggregatorProxyCerts.RunOnce(ctx); err != nil {
   292  			return nil, err
   293  		}
   294  		aggregatorProxyCerts.AddListener(apiserviceRegistrationController)
   295  		s.proxyCurrentCertKeyContent = aggregatorProxyCerts.CurrentCertKeyContent
   296  
   297  		s.GenericAPIServer.AddPostStartHookOrDie("aggregator-reload-proxy-client-cert", func(postStartHookContext genericapiserver.PostStartHookContext) error {
   298  			// generate a context  from stopCh. This is to avoid modifying files which are relying on apiserver
   299  			// TODO: See if we can pass ctx to the current method
   300  			ctx, cancel := context.WithCancel(context.Background())
   301  			go func() {
   302  				select {
   303  				case <-postStartHookContext.StopCh:
   304  					cancel() // stopCh closed, so cancel our context
   305  				case <-ctx.Done():
   306  				}
   307  			}()
   308  			go aggregatorProxyCerts.Run(ctx, 1)
   309  			return nil
   310  		})
   311  	}
   312  
   313  	availableController, err := statuscontrollers.NewAvailableConditionController(
   314  		informerFactory.Apiregistration().V1().APIServices(),
   315  		c.GenericConfig.SharedInformerFactory.Core().V1().Services(),
   316  		c.GenericConfig.SharedInformerFactory.Core().V1().Endpoints(),
   317  		apiregistrationClient.ApiregistrationV1(),
   318  		proxyTransportDial,
   319  		(func() ([]byte, []byte))(s.proxyCurrentCertKeyContent),
   320  		s.serviceResolver,
   321  	)
   322  	if err != nil {
   323  		return nil, err
   324  	}
   325  
   326  	s.GenericAPIServer.AddPostStartHookOrDie("start-kube-aggregator-informers", func(context genericapiserver.PostStartHookContext) error {
   327  		informerFactory.Start(context.StopCh)
   328  		c.GenericConfig.SharedInformerFactory.Start(context.StopCh)
   329  		return nil
   330  	})
   331  	s.GenericAPIServer.AddPostStartHookOrDie("apiservice-registration-controller", func(context genericapiserver.PostStartHookContext) error {
   332  		go apiserviceRegistrationController.Run(context.StopCh, apiServiceRegistrationControllerInitiated)
   333  		select {
   334  		case <-context.StopCh:
   335  		case <-apiServiceRegistrationControllerInitiated:
   336  		}
   337  
   338  		return nil
   339  	})
   340  	s.GenericAPIServer.AddPostStartHookOrDie("apiservice-status-available-controller", func(context genericapiserver.PostStartHookContext) error {
   341  		// if we end up blocking for long periods of time, we may need to increase workers.
   342  		go availableController.Run(5, context.StopCh)
   343  		return nil
   344  	})
   345  
   346  	if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.AggregatedDiscoveryEndpoint) {
   347  		s.discoveryAggregationController = NewDiscoveryManager(
   348  			// Use aggregator as the source name to avoid overwriting native/CRD
   349  			// groups
   350  			s.GenericAPIServer.AggregatedDiscoveryGroupManager.WithSource(aggregated.AggregatorSource),
   351  		)
   352  
   353  		// Setup discovery endpoint
   354  		s.GenericAPIServer.AddPostStartHookOrDie("apiservice-discovery-controller", func(context genericapiserver.PostStartHookContext) error {
   355  			// Discovery aggregation depends on the apiservice registration controller
   356  			// having the full list of APIServices already synced
   357  			select {
   358  			case <-context.StopCh:
   359  				return nil
   360  			// Context cancelled, should abort/clean goroutines
   361  			case <-apiServiceRegistrationControllerInitiated:
   362  			}
   363  
   364  			// Run discovery manager's worker to watch for new/removed/updated
   365  			// APIServices to the discovery document can be updated at runtime
   366  			// When discovery is ready, all APIServices will be present, with APIServices
   367  			// that have not successfully synced discovery to be present but marked as Stale.
   368  			discoverySyncedCh := make(chan struct{})
   369  			go s.discoveryAggregationController.Run(context.StopCh, discoverySyncedCh)
   370  
   371  			select {
   372  			case <-context.StopCh:
   373  				return nil
   374  			// Context cancelled, should abort/clean goroutines
   375  			case <-discoverySyncedCh:
   376  				// API services successfully sync
   377  			}
   378  			return nil
   379  		})
   380  	}
   381  
   382  	if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.StorageVersionAPI) &&
   383  		utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerIdentity) {
   384  		// Spawn a goroutine in aggregator apiserver to update storage version for
   385  		// all built-in resources
   386  		s.GenericAPIServer.AddPostStartHookOrDie(StorageVersionPostStartHookName, func(hookContext genericapiserver.PostStartHookContext) error {
   387  			// Wait for apiserver-identity to exist first before updating storage
   388  			// versions, to avoid storage version GC accidentally garbage-collecting
   389  			// storage versions.
   390  			kubeClient, err := kubernetes.NewForConfig(hookContext.LoopbackClientConfig)
   391  			if err != nil {
   392  				return err
   393  			}
   394  			if err := wait.PollImmediateUntil(100*time.Millisecond, func() (bool, error) {
   395  				_, err := kubeClient.CoordinationV1().Leases(metav1.NamespaceSystem).Get(
   396  					context.TODO(), s.GenericAPIServer.APIServerID, metav1.GetOptions{})
   397  				if apierrors.IsNotFound(err) {
   398  					return false, nil
   399  				}
   400  				if err != nil {
   401  					return false, err
   402  				}
   403  				return true, nil
   404  			}, hookContext.StopCh); err != nil {
   405  				return fmt.Errorf("failed to wait for apiserver-identity lease %s to be created: %v",
   406  					s.GenericAPIServer.APIServerID, err)
   407  			}
   408  			// Technically an apiserver only needs to update storage version once during bootstrap.
   409  			// Reconcile StorageVersion objects every 10 minutes will help in the case that the
   410  			// StorageVersion objects get accidentally modified/deleted by a different agent. In that
   411  			// case, the reconciliation ensures future storage migration still works. If nothing gets
   412  			// changed, the reconciliation update is a noop and gets short-circuited by the apiserver,
   413  			// therefore won't change the resource version and trigger storage migration.
   414  			go wait.PollImmediateUntil(10*time.Minute, func() (bool, error) {
   415  				// All apiservers (aggregator-apiserver, kube-apiserver, apiextensions-apiserver)
   416  				// share the same generic apiserver config. The same StorageVersion manager is used
   417  				// to register all built-in resources when the generic apiservers install APIs.
   418  				s.GenericAPIServer.StorageVersionManager.UpdateStorageVersions(hookContext.LoopbackClientConfig, s.GenericAPIServer.APIServerID)
   419  				return false, nil
   420  			}, hookContext.StopCh)
   421  			// Once the storage version updater finishes the first round of update,
   422  			// the PostStartHook will return to unblock /healthz. The handler chain
   423  			// won't block write requests anymore. Check every second since it's not
   424  			// expensive.
   425  			wait.PollImmediateUntil(1*time.Second, func() (bool, error) {
   426  				return s.GenericAPIServer.StorageVersionManager.Completed(), nil
   427  			}, hookContext.StopCh)
   428  			return nil
   429  		})
   430  	}
   431  
   432  	return s, nil
   433  }
   434  
   435  // PrepareRun prepares the aggregator to run, by setting up the OpenAPI spec &
   436  // aggregated discovery document and calling the generic PrepareRun.
   437  func (s *APIAggregator) PrepareRun() (preparedAPIAggregator, error) {
   438  	// add post start hook before generic PrepareRun in order to be before /healthz installation
   439  	if s.openAPIConfig != nil {
   440  		s.GenericAPIServer.AddPostStartHookOrDie("apiservice-openapi-controller", func(context genericapiserver.PostStartHookContext) error {
   441  			go s.openAPIAggregationController.Run(context.StopCh)
   442  			return nil
   443  		})
   444  	}
   445  
   446  	if s.openAPIV3Config != nil {
   447  		s.GenericAPIServer.AddPostStartHookOrDie("apiservice-openapiv3-controller", func(context genericapiserver.PostStartHookContext) error {
   448  			go s.openAPIV3AggregationController.Run(context.StopCh)
   449  			return nil
   450  		})
   451  	}
   452  
   453  	prepared := s.GenericAPIServer.PrepareRun()
   454  
   455  	// delay OpenAPI setup until the delegate had a chance to setup their OpenAPI handlers
   456  	if s.openAPIConfig != nil {
   457  		specDownloader := openapiaggregator.NewDownloader()
   458  		openAPIAggregator, err := openapiaggregator.BuildAndRegisterAggregator(
   459  			&specDownloader,
   460  			s.GenericAPIServer.NextDelegate(),
   461  			s.GenericAPIServer.Handler.GoRestfulContainer.RegisteredWebServices(),
   462  			s.openAPIConfig,
   463  			s.GenericAPIServer.Handler.NonGoRestfulMux)
   464  		if err != nil {
   465  			return preparedAPIAggregator{}, err
   466  		}
   467  		s.openAPIAggregationController = openapicontroller.NewAggregationController(&specDownloader, openAPIAggregator)
   468  	}
   469  
   470  	if s.openAPIV3Config != nil {
   471  		specDownloaderV3 := openapiv3aggregator.NewDownloader()
   472  		openAPIV3Aggregator, err := openapiv3aggregator.BuildAndRegisterAggregator(
   473  			specDownloaderV3,
   474  			s.GenericAPIServer.NextDelegate(),
   475  			s.GenericAPIServer.Handler.GoRestfulContainer,
   476  			s.openAPIV3Config,
   477  			s.GenericAPIServer.Handler.NonGoRestfulMux)
   478  		if err != nil {
   479  			return preparedAPIAggregator{}, err
   480  		}
   481  		s.openAPIV3AggregationController = openapiv3controller.NewAggregationController(openAPIV3Aggregator)
   482  	}
   483  
   484  	return preparedAPIAggregator{APIAggregator: s, runnable: prepared}, nil
   485  }
   486  
   487  func (s preparedAPIAggregator) Run(stopCh <-chan struct{}) error {
   488  	return s.runnable.Run(stopCh)
   489  }
   490  
   491  // AddAPIService adds an API service.  It is not thread-safe, so only call it on one thread at a time please.
   492  // It's a slow moving API, so its ok to run the controller on a single thread
   493  func (s *APIAggregator) AddAPIService(apiService *v1.APIService) error {
   494  	// if the proxyHandler already exists, it needs to be updated. The aggregation bits do not
   495  	// since they are wired against listers because they require multiple resources to respond
   496  	if proxyHandler, exists := s.proxyHandlers[apiService.Name]; exists {
   497  		proxyHandler.updateAPIService(apiService)
   498  		if s.openAPIAggregationController != nil {
   499  			s.openAPIAggregationController.UpdateAPIService(proxyHandler, apiService)
   500  		}
   501  		if s.openAPIV3AggregationController != nil {
   502  			s.openAPIV3AggregationController.UpdateAPIService(proxyHandler, apiService)
   503  		}
   504  		// Forward calls to discovery manager to update discovery document
   505  		if s.discoveryAggregationController != nil {
   506  			handlerCopy := *proxyHandler
   507  			handlerCopy.setServiceAvailable()
   508  			s.discoveryAggregationController.AddAPIService(apiService, &handlerCopy)
   509  		}
   510  		return nil
   511  	}
   512  
   513  	proxyPath := "/apis/" + apiService.Spec.Group + "/" + apiService.Spec.Version
   514  	// v1. is a special case for the legacy API.  It proxies to a wider set of endpoints.
   515  	if apiService.Name == legacyAPIServiceName {
   516  		proxyPath = "/api"
   517  	}
   518  
   519  	// register the proxy handler
   520  	proxyHandler := &proxyHandler{
   521  		localDelegate:              s.delegateHandler,
   522  		proxyCurrentCertKeyContent: s.proxyCurrentCertKeyContent,
   523  		proxyTransportDial:         s.proxyTransportDial,
   524  		serviceResolver:            s.serviceResolver,
   525  		rejectForwardingRedirects:  s.rejectForwardingRedirects,
   526  	}
   527  	proxyHandler.updateAPIService(apiService)
   528  	if s.openAPIAggregationController != nil {
   529  		s.openAPIAggregationController.AddAPIService(proxyHandler, apiService)
   530  	}
   531  	if s.openAPIV3AggregationController != nil {
   532  		s.openAPIV3AggregationController.AddAPIService(proxyHandler, apiService)
   533  	}
   534  	if s.discoveryAggregationController != nil {
   535  		s.discoveryAggregationController.AddAPIService(apiService, proxyHandler)
   536  	}
   537  
   538  	s.proxyHandlers[apiService.Name] = proxyHandler
   539  	s.GenericAPIServer.Handler.NonGoRestfulMux.Handle(proxyPath, proxyHandler)
   540  	s.GenericAPIServer.Handler.NonGoRestfulMux.UnlistedHandlePrefix(proxyPath+"/", proxyHandler)
   541  
   542  	// if we're dealing with the legacy group, we're done here
   543  	if apiService.Name == legacyAPIServiceName {
   544  		return nil
   545  	}
   546  
   547  	// if we've already registered the path with the handler, we don't want to do it again.
   548  	versions, exist := s.handledGroupVersions[apiService.Spec.Group]
   549  	if exist {
   550  		versions.Insert(apiService.Spec.Version)
   551  		return nil
   552  	}
   553  
   554  	// it's time to register the group aggregation endpoint
   555  	groupPath := "/apis/" + apiService.Spec.Group
   556  	groupDiscoveryHandler := &apiGroupHandler{
   557  		codecs:    aggregatorscheme.Codecs,
   558  		groupName: apiService.Spec.Group,
   559  		lister:    s.lister,
   560  		delegate:  s.delegateHandler,
   561  	}
   562  	// aggregation is protected
   563  	s.GenericAPIServer.Handler.NonGoRestfulMux.Handle(groupPath, groupDiscoveryHandler)
   564  	s.GenericAPIServer.Handler.NonGoRestfulMux.UnlistedHandle(groupPath+"/", groupDiscoveryHandler)
   565  	s.handledGroupVersions[apiService.Spec.Group] = sets.New[string](apiService.Spec.Version)
   566  	return nil
   567  }
   568  
   569  // RemoveAPIService removes the APIService from being handled.  It is not thread-safe, so only call it on one thread at a time please.
   570  // It's a slow moving API, so it's ok to run the controller on a single thread.
   571  func (s *APIAggregator) RemoveAPIService(apiServiceName string) {
   572  	// Forward calls to discovery manager to update discovery document
   573  	if s.discoveryAggregationController != nil {
   574  		s.discoveryAggregationController.RemoveAPIService(apiServiceName)
   575  	}
   576  
   577  	version := v1helper.APIServiceNameToGroupVersion(apiServiceName)
   578  
   579  	proxyPath := "/apis/" + version.Group + "/" + version.Version
   580  	// v1. is a special case for the legacy API.  It proxies to a wider set of endpoints.
   581  	if apiServiceName == legacyAPIServiceName {
   582  		proxyPath = "/api"
   583  	}
   584  	s.GenericAPIServer.Handler.NonGoRestfulMux.Unregister(proxyPath)
   585  	s.GenericAPIServer.Handler.NonGoRestfulMux.Unregister(proxyPath + "/")
   586  	if s.openAPIAggregationController != nil {
   587  		s.openAPIAggregationController.RemoveAPIService(apiServiceName)
   588  	}
   589  	if s.openAPIV3AggregationController != nil {
   590  		s.openAPIV3AggregationController.RemoveAPIService(apiServiceName)
   591  	}
   592  	delete(s.proxyHandlers, apiServiceName)
   593  
   594  	versions, exist := s.handledGroupVersions[version.Group]
   595  	if !exist {
   596  		return
   597  	}
   598  	versions.Delete(version.Version)
   599  	if versions.Len() > 0 {
   600  		return
   601  	}
   602  	delete(s.handledGroupVersions, version.Group)
   603  	groupPath := "/apis/" + version.Group
   604  	s.GenericAPIServer.Handler.NonGoRestfulMux.Unregister(groupPath)
   605  	s.GenericAPIServer.Handler.NonGoRestfulMux.Unregister(groupPath + "/")
   606  }
   607  
   608  // DefaultAPIResourceConfigSource returns default configuration for an APIResource.
   609  func DefaultAPIResourceConfigSource() *serverstorage.ResourceConfig {
   610  	ret := serverstorage.NewResourceConfig()
   611  	// NOTE: GroupVersions listed here will be enabled by default. Don't put alpha versions in the list.
   612  	ret.EnableVersions(
   613  		v1.SchemeGroupVersion,
   614  		v1beta1.SchemeGroupVersion,
   615  	)
   616  
   617  	return ret
   618  }
   619  

View as plain text