...

Source file src/k8s.io/kubernetes/pkg/controlplane/instance.go

Documentation: k8s.io/kubernetes/pkg/controlplane

     1  /*
     2  Copyright 2014 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package controlplane
    18  
    19  import (
    20  	"fmt"
    21  	"net"
    22  	"net/http"
    23  	"os"
    24  	"reflect"
    25  	"strconv"
    26  	"time"
    27  
    28  	admissionregistrationv1 "k8s.io/api/admissionregistration/v1"
    29  	admissionregistrationv1alpha1 "k8s.io/api/admissionregistration/v1alpha1"
    30  	admissionregistrationv1beta1 "k8s.io/api/admissionregistration/v1beta1"
    31  	apiserverinternalv1alpha1 "k8s.io/api/apiserverinternal/v1alpha1"
    32  	appsv1 "k8s.io/api/apps/v1"
    33  	authenticationv1 "k8s.io/api/authentication/v1"
    34  	authenticationv1alpha1 "k8s.io/api/authentication/v1alpha1"
    35  	authenticationv1beta1 "k8s.io/api/authentication/v1beta1"
    36  	authorizationapiv1 "k8s.io/api/authorization/v1"
    37  	autoscalingapiv1 "k8s.io/api/autoscaling/v1"
    38  	autoscalingapiv2 "k8s.io/api/autoscaling/v2"
    39  	batchapiv1 "k8s.io/api/batch/v1"
    40  	certificatesapiv1 "k8s.io/api/certificates/v1"
    41  	certificatesv1alpha1 "k8s.io/api/certificates/v1alpha1"
    42  	coordinationapiv1 "k8s.io/api/coordination/v1"
    43  	apiv1 "k8s.io/api/core/v1"
    44  	discoveryv1 "k8s.io/api/discovery/v1"
    45  	eventsv1 "k8s.io/api/events/v1"
    46  	networkingapiv1 "k8s.io/api/networking/v1"
    47  	networkingapiv1alpha1 "k8s.io/api/networking/v1alpha1"
    48  	nodev1 "k8s.io/api/node/v1"
    49  	policyapiv1 "k8s.io/api/policy/v1"
    50  	rbacv1 "k8s.io/api/rbac/v1"
    51  	resourcev1alpha2 "k8s.io/api/resource/v1alpha2"
    52  	schedulingapiv1 "k8s.io/api/scheduling/v1"
    53  	storageapiv1 "k8s.io/api/storage/v1"
    54  	storageapiv1alpha1 "k8s.io/api/storage/v1alpha1"
    55  	storageapiv1beta1 "k8s.io/api/storage/v1beta1"
    56  	svmv1alpha1 "k8s.io/api/storagemigration/v1alpha1"
    57  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    58  	"k8s.io/apimachinery/pkg/runtime/schema"
    59  	utilnet "k8s.io/apimachinery/pkg/util/net"
    60  	"k8s.io/apimachinery/pkg/util/runtime"
    61  	"k8s.io/apimachinery/pkg/util/uuid"
    62  	"k8s.io/apimachinery/pkg/util/wait"
    63  	"k8s.io/apiserver/pkg/endpoints/discovery"
    64  	apiserverfeatures "k8s.io/apiserver/pkg/features"
    65  	peerreconcilers "k8s.io/apiserver/pkg/reconcilers"
    66  	"k8s.io/apiserver/pkg/registry/generic"
    67  	genericapiserver "k8s.io/apiserver/pkg/server"
    68  	"k8s.io/apiserver/pkg/server/dynamiccertificates"
    69  	serverstorage "k8s.io/apiserver/pkg/server/storage"
    70  	utilfeature "k8s.io/apiserver/pkg/util/feature"
    71  	utilpeerproxy "k8s.io/apiserver/pkg/util/peerproxy"
    72  	"k8s.io/client-go/informers"
    73  	"k8s.io/client-go/kubernetes"
    74  	corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
    75  	discoveryclient "k8s.io/client-go/kubernetes/typed/discovery/v1"
    76  	"k8s.io/component-helpers/apimachinery/lease"
    77  	"k8s.io/klog/v2"
    78  	api "k8s.io/kubernetes/pkg/apis/core"
    79  	flowcontrolv1 "k8s.io/kubernetes/pkg/apis/flowcontrol/v1"
    80  	flowcontrolv1beta1 "k8s.io/kubernetes/pkg/apis/flowcontrol/v1beta1"
    81  	flowcontrolv1beta2 "k8s.io/kubernetes/pkg/apis/flowcontrol/v1beta2"
    82  	flowcontrolv1beta3 "k8s.io/kubernetes/pkg/apis/flowcontrol/v1beta3"
    83  	"k8s.io/kubernetes/pkg/controlplane/apiserver/options"
    84  	"k8s.io/kubernetes/pkg/controlplane/controller/apiserverleasegc"
    85  	"k8s.io/kubernetes/pkg/controlplane/controller/clusterauthenticationtrust"
    86  	"k8s.io/kubernetes/pkg/controlplane/controller/defaultservicecidr"
    87  	"k8s.io/kubernetes/pkg/controlplane/controller/kubernetesservice"
    88  	"k8s.io/kubernetes/pkg/controlplane/controller/legacytokentracking"
    89  	"k8s.io/kubernetes/pkg/controlplane/controller/systemnamespaces"
    90  	"k8s.io/kubernetes/pkg/controlplane/reconcilers"
    91  	"k8s.io/kubernetes/pkg/features"
    92  	kubeoptions "k8s.io/kubernetes/pkg/kubeapiserver/options"
    93  	kubeletclient "k8s.io/kubernetes/pkg/kubelet/client"
    94  	"k8s.io/kubernetes/pkg/routes"
    95  	"k8s.io/kubernetes/pkg/serviceaccount"
    96  	"k8s.io/utils/clock"
    97  
    98  	// RESTStorage installers
    99  	admissionregistrationrest "k8s.io/kubernetes/pkg/registry/admissionregistration/rest"
   100  	apiserverinternalrest "k8s.io/kubernetes/pkg/registry/apiserverinternal/rest"
   101  	appsrest "k8s.io/kubernetes/pkg/registry/apps/rest"
   102  	authenticationrest "k8s.io/kubernetes/pkg/registry/authentication/rest"
   103  	authorizationrest "k8s.io/kubernetes/pkg/registry/authorization/rest"
   104  	autoscalingrest "k8s.io/kubernetes/pkg/registry/autoscaling/rest"
   105  	batchrest "k8s.io/kubernetes/pkg/registry/batch/rest"
   106  	certificatesrest "k8s.io/kubernetes/pkg/registry/certificates/rest"
   107  	coordinationrest "k8s.io/kubernetes/pkg/registry/coordination/rest"
   108  	corerest "k8s.io/kubernetes/pkg/registry/core/rest"
   109  	discoveryrest "k8s.io/kubernetes/pkg/registry/discovery/rest"
   110  	eventsrest "k8s.io/kubernetes/pkg/registry/events/rest"
   111  	flowcontrolrest "k8s.io/kubernetes/pkg/registry/flowcontrol/rest"
   112  	networkingrest "k8s.io/kubernetes/pkg/registry/networking/rest"
   113  	noderest "k8s.io/kubernetes/pkg/registry/node/rest"
   114  	policyrest "k8s.io/kubernetes/pkg/registry/policy/rest"
   115  	rbacrest "k8s.io/kubernetes/pkg/registry/rbac/rest"
   116  	resourcerest "k8s.io/kubernetes/pkg/registry/resource/rest"
   117  	schedulingrest "k8s.io/kubernetes/pkg/registry/scheduling/rest"
   118  	storagerest "k8s.io/kubernetes/pkg/registry/storage/rest"
   119  	svmrest "k8s.io/kubernetes/pkg/registry/storagemigration/rest"
   120  )
   121  
   122  const (
   123  	// DefaultEndpointReconcilerInterval is the default amount of time for how often the endpoints for
   124  	// the kubernetes Service are reconciled.
   125  	DefaultEndpointReconcilerInterval = 10 * time.Second
   126  	// DefaultEndpointReconcilerTTL is the default TTL timeout for the storage layer
   127  	DefaultEndpointReconcilerTTL = 15 * time.Second
   128  	// IdentityLeaseComponentLabelKey is used to apply a component label to identity lease objects, indicating:
   129  	//   1. the lease is an identity lease (different from leader election leases)
   130  	//   2. which component owns this lease
   131  	IdentityLeaseComponentLabelKey = "apiserver.kubernetes.io/identity"
   132  	// KubeAPIServer defines variable used internally when referring to kube-apiserver component
   133  	KubeAPIServer = "kube-apiserver"
   134  	// KubeAPIServerIdentityLeaseLabelSelector selects kube-apiserver identity leases
   135  	KubeAPIServerIdentityLeaseLabelSelector = IdentityLeaseComponentLabelKey + "=" + KubeAPIServer
   136  	// repairLoopInterval defines the interval used to run the Services ClusterIP and NodePort repair loops
   137  	repairLoopInterval = 3 * time.Minute
   138  )
   139  
   140  var (
   141  	// IdentityLeaseGCPeriod is the interval which the lease GC controller checks for expired leases
   142  	// IdentityLeaseGCPeriod is exposed so integration tests can tune this value.
   143  	IdentityLeaseGCPeriod = 3600 * time.Second
   144  	// IdentityLeaseDurationSeconds is the duration of kube-apiserver lease in seconds
   145  	// IdentityLeaseDurationSeconds is exposed so integration tests can tune this value.
   146  	IdentityLeaseDurationSeconds = 3600
   147  	// IdentityLeaseRenewIntervalSeconds is the interval of kube-apiserver renewing its lease in seconds
   148  	// IdentityLeaseRenewIntervalSeconds is exposed so integration tests can tune this value.
   149  	IdentityLeaseRenewIntervalPeriod = 10 * time.Second
   150  )
   151  
   152  // ExtraConfig defines extra configuration for the master
   153  type ExtraConfig struct {
   154  	ClusterAuthenticationInfo clusterauthenticationtrust.ClusterAuthenticationInfo
   155  
   156  	APIResourceConfigSource  serverstorage.APIResourceConfigSource
   157  	StorageFactory           serverstorage.StorageFactory
   158  	EndpointReconcilerConfig EndpointReconcilerConfig
   159  	EventTTL                 time.Duration
   160  	KubeletClientConfig      kubeletclient.KubeletClientConfig
   161  
   162  	EnableLogsSupport bool
   163  	ProxyTransport    *http.Transport
   164  
   165  	// PeerProxy, if not nil, sets proxy transport between kube-apiserver peers for requests
   166  	// that can not be served locally
   167  	PeerProxy utilpeerproxy.Interface
   168  
   169  	// PeerEndpointLeaseReconciler updates the peer endpoint leases
   170  	PeerEndpointLeaseReconciler peerreconcilers.PeerEndpointLeaseReconciler
   171  
   172  	// PeerCAFile is the ca bundle used by this kube-apiserver to verify peer apiservers'
   173  	// serving certs when routing a request to the peer in the case the request can not be served
   174  	// locally due to version skew.
   175  	PeerCAFile string
   176  
   177  	// PeerAdvertiseAddress is the IP for this kube-apiserver which is used by peer apiservers to route a request
   178  	// to this apiserver. This happens in cases where the peer is not able to serve the request due to
   179  	// version skew. If unset, AdvertiseAddress/BindAddress will be used.
   180  	PeerAdvertiseAddress peerreconcilers.PeerAdvertiseAddress
   181  
   182  	// Values to build the IP addresses used by discovery
   183  	// The range of IPs to be assigned to services with type=ClusterIP or greater
   184  	ServiceIPRange net.IPNet
   185  	// The IP address for the GenericAPIServer service (must be inside ServiceIPRange)
   186  	APIServerServiceIP net.IP
   187  
   188  	// dual stack services, the range represents an alternative IP range for service IP
   189  	// must be of different family than primary (ServiceIPRange)
   190  	SecondaryServiceIPRange net.IPNet
   191  	// the secondary IP address the GenericAPIServer service (must be inside SecondaryServiceIPRange)
   192  	SecondaryAPIServerServiceIP net.IP
   193  
   194  	// Port for the apiserver service.
   195  	APIServerServicePort int
   196  
   197  	// TODO, we can probably group service related items into a substruct to make it easier to configure
   198  	// the API server items and `Extra*` fields likely fit nicely together.
   199  
   200  	// The range of ports to be assigned to services with type=NodePort or greater
   201  	ServiceNodePortRange utilnet.PortRange
   202  	// If non-zero, the "kubernetes" services uses this port as NodePort.
   203  	KubernetesServiceNodePort int
   204  
   205  	// Number of masters running; all masters must be started with the
   206  	// same value for this field. (Numbers > 1 currently untested.)
   207  	MasterCount int
   208  
   209  	// MasterEndpointReconcileTTL sets the time to live in seconds of an
   210  	// endpoint record recorded by each master. The endpoints are checked at an
   211  	// interval that is 2/3 of this value and this value defaults to 15s if
   212  	// unset. In very large clusters, this value may be increased to reduce the
   213  	// possibility that the master endpoint record expires (due to other load
   214  	// on the etcd server) and causes masters to drop in and out of the
   215  	// kubernetes service record. It is not recommended to set this value below
   216  	// 15s.
   217  	MasterEndpointReconcileTTL time.Duration
   218  
   219  	// Selects which reconciler to use
   220  	EndpointReconcilerType reconcilers.Type
   221  
   222  	ServiceAccountIssuer        serviceaccount.TokenGenerator
   223  	ServiceAccountMaxExpiration time.Duration
   224  	ExtendExpiration            bool
   225  
   226  	// ServiceAccountIssuerDiscovery
   227  	ServiceAccountIssuerURL  string
   228  	ServiceAccountJWKSURI    string
   229  	ServiceAccountPublicKeys []interface{}
   230  
   231  	VersionedInformers informers.SharedInformerFactory
   232  
   233  	// RepairServicesInterval interval used by the repair loops for
   234  	// the Services NodePort and ClusterIP resources
   235  	RepairServicesInterval time.Duration
   236  }
   237  
   238  // Config defines configuration for the master
   239  type Config struct {
   240  	GenericConfig *genericapiserver.Config
   241  	ExtraConfig   ExtraConfig
   242  }
   243  
   244  type completedConfig struct {
   245  	GenericConfig genericapiserver.CompletedConfig
   246  	ExtraConfig   *ExtraConfig
   247  }
   248  
   249  // CompletedConfig embeds a private pointer that cannot be instantiated outside of this package
   250  type CompletedConfig struct {
   251  	*completedConfig
   252  }
   253  
   254  // EndpointReconcilerConfig holds the endpoint reconciler and endpoint reconciliation interval to be
   255  // used by the master.
   256  type EndpointReconcilerConfig struct {
   257  	Reconciler reconcilers.EndpointReconciler
   258  	Interval   time.Duration
   259  }
   260  
   261  // Instance contains state for a Kubernetes cluster api server instance.
   262  type Instance struct {
   263  	GenericAPIServer *genericapiserver.GenericAPIServer
   264  
   265  	ClusterAuthenticationInfo clusterauthenticationtrust.ClusterAuthenticationInfo
   266  }
   267  
   268  func (c *Config) createMasterCountReconciler() reconcilers.EndpointReconciler {
   269  	endpointClient := corev1client.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig)
   270  	endpointSliceClient := discoveryclient.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig)
   271  	endpointsAdapter := reconcilers.NewEndpointsAdapter(endpointClient, endpointSliceClient)
   272  
   273  	return reconcilers.NewMasterCountEndpointReconciler(c.ExtraConfig.MasterCount, endpointsAdapter)
   274  }
   275  
   276  func (c *Config) createNoneReconciler() reconcilers.EndpointReconciler {
   277  	return reconcilers.NewNoneEndpointReconciler()
   278  }
   279  
   280  func (c *Config) createLeaseReconciler() reconcilers.EndpointReconciler {
   281  	endpointClient := corev1client.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig)
   282  	endpointSliceClient := discoveryclient.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig)
   283  	endpointsAdapter := reconcilers.NewEndpointsAdapter(endpointClient, endpointSliceClient)
   284  
   285  	ttl := c.ExtraConfig.MasterEndpointReconcileTTL
   286  	config, err := c.ExtraConfig.StorageFactory.NewConfig(api.Resource("apiServerIPInfo"))
   287  	if err != nil {
   288  		klog.Fatalf("Error creating storage factory config: %v", err)
   289  	}
   290  	masterLeases, err := reconcilers.NewLeases(config, "/masterleases/", ttl)
   291  	if err != nil {
   292  		klog.Fatalf("Error creating leases: %v", err)
   293  	}
   294  
   295  	return reconcilers.NewLeaseEndpointReconciler(endpointsAdapter, masterLeases)
   296  }
   297  
   298  func (c *Config) createEndpointReconciler() reconcilers.EndpointReconciler {
   299  	klog.Infof("Using reconciler: %v", c.ExtraConfig.EndpointReconcilerType)
   300  	switch c.ExtraConfig.EndpointReconcilerType {
   301  	// there are numerous test dependencies that depend on a default controller
   302  	case reconcilers.MasterCountReconcilerType:
   303  		return c.createMasterCountReconciler()
   304  	case "", reconcilers.LeaseEndpointReconcilerType:
   305  		return c.createLeaseReconciler()
   306  	case reconcilers.NoneEndpointReconcilerType:
   307  		return c.createNoneReconciler()
   308  	default:
   309  		klog.Fatalf("Reconciler not implemented: %v", c.ExtraConfig.EndpointReconcilerType)
   310  	}
   311  	return nil
   312  }
   313  
   314  // Complete fills in any fields not set that are required to have valid data. It's mutating the receiver.
   315  func (c *Config) Complete() CompletedConfig {
   316  	cfg := completedConfig{
   317  		c.GenericConfig.Complete(c.ExtraConfig.VersionedInformers),
   318  		&c.ExtraConfig,
   319  	}
   320  
   321  	serviceIPRange, apiServerServiceIP, err := options.ServiceIPRange(cfg.ExtraConfig.ServiceIPRange)
   322  	if err != nil {
   323  		klog.Fatalf("Error determining service IP ranges: %v", err)
   324  	}
   325  	if cfg.ExtraConfig.ServiceIPRange.IP == nil {
   326  		cfg.ExtraConfig.ServiceIPRange = serviceIPRange
   327  	}
   328  	if cfg.ExtraConfig.APIServerServiceIP == nil {
   329  		cfg.ExtraConfig.APIServerServiceIP = apiServerServiceIP
   330  	}
   331  
   332  	discoveryAddresses := discovery.DefaultAddresses{DefaultAddress: cfg.GenericConfig.ExternalAddress}
   333  	discoveryAddresses.CIDRRules = append(discoveryAddresses.CIDRRules,
   334  		discovery.CIDRRule{IPRange: cfg.ExtraConfig.ServiceIPRange, Address: net.JoinHostPort(cfg.ExtraConfig.APIServerServiceIP.String(), strconv.Itoa(cfg.ExtraConfig.APIServerServicePort))})
   335  	cfg.GenericConfig.DiscoveryAddresses = discoveryAddresses
   336  
   337  	if cfg.ExtraConfig.ServiceNodePortRange.Size == 0 {
   338  		// TODO: Currently no way to specify an empty range (do we need to allow this?)
   339  		// We should probably allow this for clouds that don't require NodePort to do load-balancing (GCE)
   340  		// but then that breaks the strict nestedness of ServiceType.
   341  		// Review post-v1
   342  		cfg.ExtraConfig.ServiceNodePortRange = kubeoptions.DefaultServiceNodePortRange
   343  		klog.Infof("Node port range unspecified. Defaulting to %v.", cfg.ExtraConfig.ServiceNodePortRange)
   344  	}
   345  
   346  	if cfg.ExtraConfig.EndpointReconcilerConfig.Interval == 0 {
   347  		cfg.ExtraConfig.EndpointReconcilerConfig.Interval = DefaultEndpointReconcilerInterval
   348  	}
   349  
   350  	if cfg.ExtraConfig.MasterEndpointReconcileTTL == 0 {
   351  		cfg.ExtraConfig.MasterEndpointReconcileTTL = DefaultEndpointReconcilerTTL
   352  	}
   353  
   354  	if cfg.ExtraConfig.EndpointReconcilerConfig.Reconciler == nil {
   355  		cfg.ExtraConfig.EndpointReconcilerConfig.Reconciler = c.createEndpointReconciler()
   356  	}
   357  
   358  	if cfg.ExtraConfig.RepairServicesInterval == 0 {
   359  		cfg.ExtraConfig.RepairServicesInterval = repairLoopInterval
   360  	}
   361  
   362  	return CompletedConfig{&cfg}
   363  }
   364  
   365  // New returns a new instance of Master from the given config.
   366  // Certain config fields will be set to a default value if unset.
   367  // Certain config fields must be specified, including:
   368  // KubeletClientConfig
   369  func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*Instance, error) {
   370  	if reflect.DeepEqual(c.ExtraConfig.KubeletClientConfig, kubeletclient.KubeletClientConfig{}) {
   371  		return nil, fmt.Errorf("Master.New() called with empty config.KubeletClientConfig")
   372  	}
   373  
   374  	s, err := c.GenericConfig.New("kube-apiserver", delegationTarget)
   375  	if err != nil {
   376  		return nil, err
   377  	}
   378  
   379  	if c.ExtraConfig.EnableLogsSupport {
   380  		routes.Logs{}.Install(s.Handler.GoRestfulContainer)
   381  	}
   382  
   383  	// Metadata and keys are expected to only change across restarts at present,
   384  	// so we just marshal immediately and serve the cached JSON bytes.
   385  	md, err := serviceaccount.NewOpenIDMetadata(
   386  		c.ExtraConfig.ServiceAccountIssuerURL,
   387  		c.ExtraConfig.ServiceAccountJWKSURI,
   388  		c.GenericConfig.ExternalAddress,
   389  		c.ExtraConfig.ServiceAccountPublicKeys,
   390  	)
   391  	if err != nil {
   392  		// If there was an error, skip installing the endpoints and log the
   393  		// error, but continue on. We don't return the error because the
   394  		// metadata responses require additional, backwards incompatible
   395  		// validation of command-line options.
   396  		msg := fmt.Sprintf("Could not construct pre-rendered responses for"+
   397  			" ServiceAccountIssuerDiscovery endpoints. Endpoints will not be"+
   398  			" enabled. Error: %v", err)
   399  		if c.ExtraConfig.ServiceAccountIssuerURL != "" {
   400  			// The user likely expects this feature to be enabled if issuer URL is
   401  			// set and the feature gate is enabled. In the future, if there is no
   402  			// longer a feature gate and issuer URL is not set, the user may not
   403  			// expect this feature to be enabled. We log the former case as an Error
   404  			// and the latter case as an Info.
   405  			klog.Error(msg)
   406  		} else {
   407  			klog.Info(msg)
   408  		}
   409  	} else {
   410  		routes.NewOpenIDMetadataServer(md.ConfigJSON, md.PublicKeysetJSON).
   411  			Install(s.Handler.GoRestfulContainer)
   412  	}
   413  
   414  	m := &Instance{
   415  		GenericAPIServer:          s,
   416  		ClusterAuthenticationInfo: c.ExtraConfig.ClusterAuthenticationInfo,
   417  	}
   418  
   419  	clientset, err := kubernetes.NewForConfig(c.GenericConfig.LoopbackClientConfig)
   420  	if err != nil {
   421  		return nil, err
   422  	}
   423  
   424  	// TODO: update to a version that caches success but will recheck on failure, unlike memcache discovery
   425  	discoveryClientForAdmissionRegistration := clientset.Discovery()
   426  
   427  	legacyRESTStorageProvider, err := corerest.New(corerest.Config{
   428  		GenericConfig: corerest.GenericConfig{
   429  			StorageFactory:              c.ExtraConfig.StorageFactory,
   430  			EventTTL:                    c.ExtraConfig.EventTTL,
   431  			LoopbackClientConfig:        c.GenericConfig.LoopbackClientConfig,
   432  			ServiceAccountIssuer:        c.ExtraConfig.ServiceAccountIssuer,
   433  			ExtendExpiration:            c.ExtraConfig.ExtendExpiration,
   434  			ServiceAccountMaxExpiration: c.ExtraConfig.ServiceAccountMaxExpiration,
   435  			APIAudiences:                c.GenericConfig.Authentication.APIAudiences,
   436  			Informers:                   c.ExtraConfig.VersionedInformers,
   437  		},
   438  		Proxy: corerest.ProxyConfig{
   439  			Transport:           c.ExtraConfig.ProxyTransport,
   440  			KubeletClientConfig: c.ExtraConfig.KubeletClientConfig,
   441  		},
   442  		Services: corerest.ServicesConfig{
   443  			ClusterIPRange:          c.ExtraConfig.ServiceIPRange,
   444  			SecondaryClusterIPRange: c.ExtraConfig.SecondaryServiceIPRange,
   445  			NodePortRange:           c.ExtraConfig.ServiceNodePortRange,
   446  			IPRepairInterval:        c.ExtraConfig.RepairServicesInterval,
   447  		},
   448  	})
   449  	if err != nil {
   450  		return nil, err
   451  	}
   452  
   453  	// The order here is preserved in discovery.
   454  	// If resources with identical names exist in more than one of these groups (e.g. "deployments.apps"" and "deployments.extensions"),
   455  	// the order of this list determines which group an unqualified resource name (e.g. "deployments") should prefer.
   456  	// This priority order is used for local discovery, but it ends up aggregated in `k8s.io/kubernetes/cmd/kube-apiserver/app/aggregator.go
   457  	// with specific priorities.
   458  	// TODO: describe the priority all the way down in the RESTStorageProviders and plumb it back through the various discovery
   459  	// handlers that we have.
   460  	restStorageProviders := []RESTStorageProvider{
   461  		legacyRESTStorageProvider,
   462  		apiserverinternalrest.StorageProvider{},
   463  		authenticationrest.RESTStorageProvider{Authenticator: c.GenericConfig.Authentication.Authenticator, APIAudiences: c.GenericConfig.Authentication.APIAudiences},
   464  		authorizationrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer, RuleResolver: c.GenericConfig.RuleResolver},
   465  		autoscalingrest.RESTStorageProvider{},
   466  		batchrest.RESTStorageProvider{},
   467  		certificatesrest.RESTStorageProvider{},
   468  		coordinationrest.RESTStorageProvider{},
   469  		discoveryrest.StorageProvider{},
   470  		networkingrest.RESTStorageProvider{},
   471  		noderest.RESTStorageProvider{},
   472  		policyrest.RESTStorageProvider{},
   473  		rbacrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer},
   474  		schedulingrest.RESTStorageProvider{},
   475  		storagerest.RESTStorageProvider{},
   476  		svmrest.RESTStorageProvider{},
   477  		flowcontrolrest.RESTStorageProvider{InformerFactory: c.GenericConfig.SharedInformerFactory},
   478  		// keep apps after extensions so legacy clients resolve the extensions versions of shared resource names.
   479  		// See https://github.com/kubernetes/kubernetes/issues/42392
   480  		appsrest.StorageProvider{},
   481  		admissionregistrationrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer, DiscoveryClient: discoveryClientForAdmissionRegistration},
   482  		eventsrest.RESTStorageProvider{TTL: c.ExtraConfig.EventTTL},
   483  		resourcerest.RESTStorageProvider{},
   484  	}
   485  	if err := m.InstallAPIs(c.ExtraConfig.APIResourceConfigSource, c.GenericConfig.RESTOptionsGetter, restStorageProviders...); err != nil {
   486  		return nil, err
   487  	}
   488  
   489  	m.GenericAPIServer.AddPostStartHookOrDie("start-system-namespaces-controller", func(hookContext genericapiserver.PostStartHookContext) error {
   490  		go systemnamespaces.NewController(clientset, c.ExtraConfig.VersionedInformers.Core().V1().Namespaces()).Run(hookContext.StopCh)
   491  		return nil
   492  	})
   493  
   494  	_, publicServicePort, err := c.GenericConfig.SecureServing.HostPort()
   495  	if err != nil {
   496  		return nil, fmt.Errorf("failed to get listener address: %w", err)
   497  	}
   498  	kubernetesServiceCtrl := kubernetesservice.New(kubernetesservice.Config{
   499  		PublicIP: c.GenericConfig.PublicAddress,
   500  
   501  		EndpointReconciler: c.ExtraConfig.EndpointReconcilerConfig.Reconciler,
   502  		EndpointInterval:   c.ExtraConfig.EndpointReconcilerConfig.Interval,
   503  
   504  		ServiceIP:                 c.ExtraConfig.APIServerServiceIP,
   505  		ServicePort:               c.ExtraConfig.APIServerServicePort,
   506  		PublicServicePort:         publicServicePort,
   507  		KubernetesServiceNodePort: c.ExtraConfig.KubernetesServiceNodePort,
   508  	}, clientset, c.ExtraConfig.VersionedInformers.Core().V1().Services())
   509  	m.GenericAPIServer.AddPostStartHookOrDie("bootstrap-controller", func(hookContext genericapiserver.PostStartHookContext) error {
   510  		kubernetesServiceCtrl.Start(hookContext.StopCh)
   511  		return nil
   512  	})
   513  	m.GenericAPIServer.AddPreShutdownHookOrDie("stop-kubernetes-service-controller", func() error {
   514  		kubernetesServiceCtrl.Stop()
   515  		return nil
   516  	})
   517  
   518  	if utilfeature.DefaultFeatureGate.Enabled(features.MultiCIDRServiceAllocator) {
   519  		m.GenericAPIServer.AddPostStartHookOrDie("start-kubernetes-service-cidr-controller", func(hookContext genericapiserver.PostStartHookContext) error {
   520  			controller := defaultservicecidr.NewController(
   521  				c.ExtraConfig.ServiceIPRange,
   522  				c.ExtraConfig.SecondaryServiceIPRange,
   523  				clientset,
   524  			)
   525  			// The default serviceCIDR must exist before the apiserver is healthy
   526  			// otherwise the allocators for Services will not work.
   527  			controller.Start(hookContext.StopCh)
   528  			return nil
   529  		})
   530  	}
   531  
   532  	if utilfeature.DefaultFeatureGate.Enabled(features.UnknownVersionInteroperabilityProxy) {
   533  		peeraddress := getPeerAddress(c.ExtraConfig.PeerAdvertiseAddress, c.GenericConfig.PublicAddress, publicServicePort)
   534  		peerEndpointCtrl := peerreconcilers.New(
   535  			c.GenericConfig.APIServerID,
   536  			peeraddress,
   537  			c.ExtraConfig.PeerEndpointLeaseReconciler,
   538  			c.ExtraConfig.EndpointReconcilerConfig.Interval,
   539  			clientset)
   540  		if err != nil {
   541  			return nil, fmt.Errorf("failed to create peer endpoint lease controller: %w", err)
   542  		}
   543  		m.GenericAPIServer.AddPostStartHookOrDie("peer-endpoint-reconciler-controller",
   544  			func(hookContext genericapiserver.PostStartHookContext) error {
   545  				peerEndpointCtrl.Start(hookContext.StopCh)
   546  				return nil
   547  			})
   548  		m.GenericAPIServer.AddPreShutdownHookOrDie("peer-endpoint-reconciler-controller",
   549  			func() error {
   550  				peerEndpointCtrl.Stop()
   551  				return nil
   552  			})
   553  		// Add PostStartHooks for Unknown Version Proxy filter.
   554  		if c.ExtraConfig.PeerProxy != nil {
   555  			m.GenericAPIServer.AddPostStartHookOrDie("unknown-version-proxy-filter", func(context genericapiserver.PostStartHookContext) error {
   556  				err := c.ExtraConfig.PeerProxy.WaitForCacheSync(context.StopCh)
   557  				return err
   558  			})
   559  		}
   560  	}
   561  
   562  	m.GenericAPIServer.AddPostStartHookOrDie("start-cluster-authentication-info-controller", func(hookContext genericapiserver.PostStartHookContext) error {
   563  		controller := clusterauthenticationtrust.NewClusterAuthenticationTrustController(m.ClusterAuthenticationInfo, clientset)
   564  
   565  		// generate a context  from stopCh. This is to avoid modifying files which are relying on apiserver
   566  		// TODO: See if we can pass ctx to the current method
   567  		ctx := wait.ContextForChannel(hookContext.StopCh)
   568  
   569  		// prime values and start listeners
   570  		if m.ClusterAuthenticationInfo.ClientCA != nil {
   571  			m.ClusterAuthenticationInfo.ClientCA.AddListener(controller)
   572  			if controller, ok := m.ClusterAuthenticationInfo.ClientCA.(dynamiccertificates.ControllerRunner); ok {
   573  				// runonce to be sure that we have a value.
   574  				if err := controller.RunOnce(ctx); err != nil {
   575  					runtime.HandleError(err)
   576  				}
   577  				go controller.Run(ctx, 1)
   578  			}
   579  		}
   580  		if m.ClusterAuthenticationInfo.RequestHeaderCA != nil {
   581  			m.ClusterAuthenticationInfo.RequestHeaderCA.AddListener(controller)
   582  			if controller, ok := m.ClusterAuthenticationInfo.RequestHeaderCA.(dynamiccertificates.ControllerRunner); ok {
   583  				// runonce to be sure that we have a value.
   584  				if err := controller.RunOnce(ctx); err != nil {
   585  					runtime.HandleError(err)
   586  				}
   587  				go controller.Run(ctx, 1)
   588  			}
   589  		}
   590  
   591  		go controller.Run(ctx, 1)
   592  		return nil
   593  	})
   594  
   595  	if utilfeature.DefaultFeatureGate.Enabled(apiserverfeatures.APIServerIdentity) {
   596  		m.GenericAPIServer.AddPostStartHookOrDie("start-kube-apiserver-identity-lease-controller", func(hookContext genericapiserver.PostStartHookContext) error {
   597  			// generate a context  from stopCh. This is to avoid modifying files which are relying on apiserver
   598  			// TODO: See if we can pass ctx to the current method
   599  			ctx := wait.ContextForChannel(hookContext.StopCh)
   600  
   601  			leaseName := m.GenericAPIServer.APIServerID
   602  			holderIdentity := m.GenericAPIServer.APIServerID + "_" + string(uuid.NewUUID())
   603  
   604  			peeraddress := getPeerAddress(c.ExtraConfig.PeerAdvertiseAddress, c.GenericConfig.PublicAddress, publicServicePort)
   605  			// must replace ':,[]' in [ip:port] to be able to store this as a valid label value
   606  			controller := lease.NewController(
   607  				clock.RealClock{},
   608  				clientset,
   609  				holderIdentity,
   610  				int32(IdentityLeaseDurationSeconds),
   611  				nil,
   612  				IdentityLeaseRenewIntervalPeriod,
   613  				leaseName,
   614  				metav1.NamespaceSystem,
   615  				// TODO: receive identity label value as a parameter when post start hook is moved to generic apiserver.
   616  				labelAPIServerHeartbeatFunc(KubeAPIServer, peeraddress))
   617  			go controller.Run(ctx)
   618  			return nil
   619  		})
   620  		// TODO: move this into generic apiserver and make the lease identity value configurable
   621  		m.GenericAPIServer.AddPostStartHookOrDie("start-kube-apiserver-identity-lease-garbage-collector", func(hookContext genericapiserver.PostStartHookContext) error {
   622  			go apiserverleasegc.NewAPIServerLeaseGC(
   623  				clientset,
   624  				IdentityLeaseGCPeriod,
   625  				metav1.NamespaceSystem,
   626  				KubeAPIServerIdentityLeaseLabelSelector,
   627  			).Run(hookContext.StopCh)
   628  			return nil
   629  		})
   630  	}
   631  
   632  	m.GenericAPIServer.AddPostStartHookOrDie("start-legacy-token-tracking-controller", func(hookContext genericapiserver.PostStartHookContext) error {
   633  		go legacytokentracking.NewController(clientset).Run(hookContext.StopCh)
   634  		return nil
   635  	})
   636  
   637  	return m, nil
   638  }
   639  
   640  func labelAPIServerHeartbeatFunc(identity string, peeraddress string) lease.ProcessLeaseFunc {
   641  	return func(lease *coordinationapiv1.Lease) error {
   642  		if lease.Labels == nil {
   643  			lease.Labels = map[string]string{}
   644  		}
   645  
   646  		if lease.Annotations == nil {
   647  			lease.Annotations = map[string]string{}
   648  		}
   649  
   650  		// This label indiciates the identity of the lease object.
   651  		lease.Labels[IdentityLeaseComponentLabelKey] = identity
   652  
   653  		hostname, err := os.Hostname()
   654  		if err != nil {
   655  			return err
   656  		}
   657  
   658  		// convenience label to easily map a lease object to a specific apiserver
   659  		lease.Labels[apiv1.LabelHostname] = hostname
   660  
   661  		// Include apiserver network location <ip_port> used by peers to proxy requests between kube-apiservers
   662  		if utilfeature.DefaultFeatureGate.Enabled(features.UnknownVersionInteroperabilityProxy) {
   663  			if peeraddress != "" {
   664  				lease.Annotations[apiv1.AnnotationPeerAdvertiseAddress] = peeraddress
   665  			}
   666  		}
   667  		return nil
   668  	}
   669  }
   670  
   671  // RESTStorageProvider is a factory type for REST storage.
   672  type RESTStorageProvider interface {
   673  	GroupName() string
   674  	NewRESTStorage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter) (genericapiserver.APIGroupInfo, error)
   675  }
   676  
   677  // InstallAPIs will install the APIs for the restStorageProviders if they are enabled.
   678  func (m *Instance) InstallAPIs(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter, restStorageProviders ...RESTStorageProvider) error {
   679  	nonLegacy := []*genericapiserver.APIGroupInfo{}
   680  
   681  	// used later in the loop to filter the served resource by those that have expired.
   682  	resourceExpirationEvaluator, err := genericapiserver.NewResourceExpirationEvaluator(*m.GenericAPIServer.Version)
   683  	if err != nil {
   684  		return err
   685  	}
   686  
   687  	for _, restStorageBuilder := range restStorageProviders {
   688  		groupName := restStorageBuilder.GroupName()
   689  		apiGroupInfo, err := restStorageBuilder.NewRESTStorage(apiResourceConfigSource, restOptionsGetter)
   690  		if err != nil {
   691  			return fmt.Errorf("problem initializing API group %q : %v", groupName, err)
   692  		}
   693  		if len(apiGroupInfo.VersionedResourcesStorageMap) == 0 {
   694  			// If we have no storage for any resource configured, this API group is effectively disabled.
   695  			// This can happen when an entire API group, version, or development-stage (alpha, beta, GA) is disabled.
   696  			klog.Infof("API group %q is not enabled, skipping.", groupName)
   697  			continue
   698  		}
   699  
   700  		// Remove resources that serving kinds that are removed.
   701  		// We do this here so that we don't accidentally serve versions without resources or openapi information that for kinds we don't serve.
   702  		// This is a spot above the construction of individual storage handlers so that no sig accidentally forgets to check.
   703  		resourceExpirationEvaluator.RemoveDeletedKinds(groupName, apiGroupInfo.Scheme, apiGroupInfo.VersionedResourcesStorageMap)
   704  		if len(apiGroupInfo.VersionedResourcesStorageMap) == 0 {
   705  			klog.V(1).Infof("Removing API group %v because it is time to stop serving it because it has no versions per APILifecycle.", groupName)
   706  			continue
   707  		}
   708  
   709  		klog.V(1).Infof("Enabling API group %q.", groupName)
   710  
   711  		if postHookProvider, ok := restStorageBuilder.(genericapiserver.PostStartHookProvider); ok {
   712  			name, hook, err := postHookProvider.PostStartHook()
   713  			if err != nil {
   714  				klog.Fatalf("Error building PostStartHook: %v", err)
   715  			}
   716  			m.GenericAPIServer.AddPostStartHookOrDie(name, hook)
   717  		}
   718  
   719  		if len(groupName) == 0 {
   720  			// the legacy group for core APIs is special that it is installed into /api via this special install method.
   721  			if err := m.GenericAPIServer.InstallLegacyAPIGroup(genericapiserver.DefaultLegacyAPIPrefix, &apiGroupInfo); err != nil {
   722  				return fmt.Errorf("error in registering legacy API: %w", err)
   723  			}
   724  		} else {
   725  			// everything else goes to /apis
   726  			nonLegacy = append(nonLegacy, &apiGroupInfo)
   727  		}
   728  	}
   729  
   730  	if err := m.GenericAPIServer.InstallAPIGroups(nonLegacy...); err != nil {
   731  		return fmt.Errorf("error in registering group versions: %v", err)
   732  	}
   733  	return nil
   734  }
   735  
var (
	// stableAPIGroupVersionsEnabledByDefault is the list of stable (GA) GroupVersions.
	// DefaultAPIResourceConfigSource enables every entry in this list; never add
	// alpha or beta GroupVersions here.
	stableAPIGroupVersionsEnabledByDefault = []schema.GroupVersion{
		admissionregistrationv1.SchemeGroupVersion,
		apiv1.SchemeGroupVersion,
		appsv1.SchemeGroupVersion,
		authenticationv1.SchemeGroupVersion,
		authorizationapiv1.SchemeGroupVersion,
		autoscalingapiv1.SchemeGroupVersion,
		autoscalingapiv2.SchemeGroupVersion,
		batchapiv1.SchemeGroupVersion,
		certificatesapiv1.SchemeGroupVersion,
		coordinationapiv1.SchemeGroupVersion,
		discoveryv1.SchemeGroupVersion,
		eventsv1.SchemeGroupVersion,
		networkingapiv1.SchemeGroupVersion,
		nodev1.SchemeGroupVersion,
		policyapiv1.SchemeGroupVersion,
		rbacv1.SchemeGroupVersion,
		storageapiv1.SchemeGroupVersion,
		schedulingapiv1.SchemeGroupVersion,
		flowcontrolv1.SchemeGroupVersion,
	}

	// legacyBetaEnabledByDefaultResources is the list of individual beta resources we still enable by default,
	// even though their GroupVersion is listed in betaAPIGroupVersionsDisabledByDefault.
	// DefaultAPIResourceConfigSource applies these after disabling the beta GroupVersions. You may only add
	// to this list if your resource is already enabled by default in a beta level we still serve AND there
	// is no stable API for it.
	// see https://github.com/kubernetes/enhancements/tree/master/keps/sig-architecture/3136-beta-apis-off-by-default
	// for more details.
	// NOTE(review): flowcontrolv1 is already listed in stableAPIGroupVersionsEnabledByDefault, so these entries
	// appear to remain only until the removal noted on each line — confirm before extending this list.
	legacyBetaEnabledByDefaultResources = []schema.GroupVersionResource{
		flowcontrolv1beta3.SchemeGroupVersion.WithResource("flowschemas"),                 // deprecate in 1.29, remove in 1.32
		flowcontrolv1beta3.SchemeGroupVersion.WithResource("prioritylevelconfigurations"), // deprecate in 1.29, remove in 1.32
	}
	// betaAPIGroupVersionsDisabledByDefault lists beta GroupVersions, which are off by default.
	// All future beta GroupVersions belong here; DefaultAPIResourceConfigSource disables each entry explicitly.
	betaAPIGroupVersionsDisabledByDefault = []schema.GroupVersion{
		admissionregistrationv1beta1.SchemeGroupVersion,
		authenticationv1beta1.SchemeGroupVersion,
		storageapiv1beta1.SchemeGroupVersion,
		flowcontrolv1beta1.SchemeGroupVersion,
		flowcontrolv1beta2.SchemeGroupVersion,
		flowcontrolv1beta3.SchemeGroupVersion,
	}

	// alphaAPIGroupVersionsDisabledByDefault holds the alpha GroupVersions we have. They are always
	// disabled by default; DefaultAPIResourceConfigSource disables each entry explicitly.
	alphaAPIGroupVersionsDisabledByDefault = []schema.GroupVersion{
		admissionregistrationv1alpha1.SchemeGroupVersion,
		apiserverinternalv1alpha1.SchemeGroupVersion,
		authenticationv1alpha1.SchemeGroupVersion,
		resourcev1alpha2.SchemeGroupVersion,
		certificatesv1alpha1.SchemeGroupVersion,
		networkingapiv1alpha1.SchemeGroupVersion,
		storageapiv1alpha1.SchemeGroupVersion,
		svmv1alpha1.SchemeGroupVersion,
	}
)
   790  
   791  // DefaultAPIResourceConfigSource returns default configuration for an APIResource.
   792  func DefaultAPIResourceConfigSource() *serverstorage.ResourceConfig {
   793  	ret := serverstorage.NewResourceConfig()
   794  	// NOTE: GroupVersions listed here will be enabled by default. Don't put alpha or beta versions in the list.
   795  	ret.EnableVersions(stableAPIGroupVersionsEnabledByDefault...)
   796  
   797  	// disable alpha and beta versions explicitly so we have a full list of what's possible to serve
   798  	ret.DisableVersions(betaAPIGroupVersionsDisabledByDefault...)
   799  	ret.DisableVersions(alphaAPIGroupVersionsDisabledByDefault...)
   800  
   801  	// enable the legacy beta resources that were present before stopped serving new beta APIs by default.
   802  	ret.EnableResources(legacyBetaEnabledByDefaultResources...)
   803  
   804  	return ret
   805  }
   806  
   807  // utility function to get the apiserver address that is used by peer apiservers to proxy
   808  // requests to this apiserver in case the peer is incapable of serving the request
   809  func getPeerAddress(peerAdvertiseAddress peerreconcilers.PeerAdvertiseAddress, publicAddress net.IP, publicServicePort int) string {
   810  	if peerAdvertiseAddress.PeerAdvertiseIP != "" && peerAdvertiseAddress.PeerAdvertisePort != "" {
   811  		return net.JoinHostPort(peerAdvertiseAddress.PeerAdvertiseIP, peerAdvertiseAddress.PeerAdvertisePort)
   812  	} else {
   813  		return net.JoinHostPort(publicAddress.String(), strconv.Itoa(publicServicePort))
   814  	}
   815  }
   816  

View as plain text