...

Source file src/k8s.io/kubernetes/cmd/kube-apiserver/app/server.go

Documentation: k8s.io/kubernetes/cmd/kube-apiserver/app

     1  /*
     2  Copyright 2014 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  // Package app does all of the work necessary to create a Kubernetes
    18  // APIServer by binding together the API, master and APIServer infrastructure.
    19  // It can be configured and called directly or via the hyperkube framework.
    20  package app
    21  
    22  import (
    23  	"crypto/tls"
    24  	"fmt"
    25  	"net/http"
    26  	"net/url"
    27  	"os"
    28  
    29  	"github.com/spf13/cobra"
    30  
    31  	apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
    32  	extensionsapiserver "k8s.io/apiextensions-apiserver/pkg/apiserver"
    33  	"k8s.io/apimachinery/pkg/runtime"
    34  	utilerrors "k8s.io/apimachinery/pkg/util/errors"
    35  	utilnet "k8s.io/apimachinery/pkg/util/net"
    36  	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    37  	"k8s.io/apiserver/pkg/admission"
    38  	genericapifilters "k8s.io/apiserver/pkg/endpoints/filters"
    39  	genericapiserver "k8s.io/apiserver/pkg/server"
    40  	"k8s.io/apiserver/pkg/server/egressselector"
    41  	utilfeature "k8s.io/apiserver/pkg/util/feature"
    42  	"k8s.io/apiserver/pkg/util/notfoundhandler"
    43  	"k8s.io/apiserver/pkg/util/webhook"
    44  	"k8s.io/client-go/dynamic"
    45  	clientgoinformers "k8s.io/client-go/informers"
    46  	clientset "k8s.io/client-go/kubernetes"
    47  	"k8s.io/client-go/rest"
    48  	"k8s.io/client-go/util/keyutil"
    49  	cliflag "k8s.io/component-base/cli/flag"
    50  	"k8s.io/component-base/cli/globalflag"
    51  	"k8s.io/component-base/logs"
    52  	logsapi "k8s.io/component-base/logs/api/v1"
    53  	_ "k8s.io/component-base/metrics/prometheus/workqueue"
    54  	"k8s.io/component-base/term"
    55  	"k8s.io/component-base/version"
    56  	"k8s.io/component-base/version/verflag"
    57  	"k8s.io/klog/v2"
    58  	aggregatorapiserver "k8s.io/kube-aggregator/pkg/apiserver"
    59  	aggregatorscheme "k8s.io/kube-aggregator/pkg/apiserver/scheme"
    60  
    61  	"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
    62  	"k8s.io/kubernetes/pkg/api/legacyscheme"
    63  	"k8s.io/kubernetes/pkg/capabilities"
    64  	"k8s.io/kubernetes/pkg/controlplane"
    65  	controlplaneapiserver "k8s.io/kubernetes/pkg/controlplane/apiserver"
    66  	"k8s.io/kubernetes/pkg/controlplane/reconcilers"
    67  	"k8s.io/kubernetes/pkg/features"
    68  	generatedopenapi "k8s.io/kubernetes/pkg/generated/openapi"
    69  	kubeapiserveradmission "k8s.io/kubernetes/pkg/kubeapiserver/admission"
    70  	"k8s.io/kubernetes/pkg/serviceaccount"
    71  )
    72  
    73  func init() {
    74  	utilruntime.Must(logsapi.AddFeatureGates(utilfeature.DefaultMutableFeatureGate))
    75  }
    76  
    77  // NewAPIServerCommand creates a *cobra.Command object with default parameters
    78  func NewAPIServerCommand() *cobra.Command {
    79  	s := options.NewServerRunOptions()
    80  	cmd := &cobra.Command{
    81  		Use: "kube-apiserver",
    82  		Long: `The Kubernetes API server validates and configures data
    83  for the api objects which include pods, services, replicationcontrollers, and
    84  others. The API Server services REST operations and provides the frontend to the
    85  cluster's shared state through which all other components interact.`,
    86  
    87  		// stop printing usage when the command errors
    88  		SilenceUsage: true,
    89  		PersistentPreRunE: func(*cobra.Command, []string) error {
    90  			// silence client-go warnings.
    91  			// kube-apiserver loopback clients should not log self-issued warnings.
    92  			rest.SetDefaultWarningHandler(rest.NoWarnings{})
    93  			return nil
    94  		},
    95  		RunE: func(cmd *cobra.Command, args []string) error {
    96  			verflag.PrintAndExitIfRequested()
    97  			fs := cmd.Flags()
    98  
    99  			// Activate logging as soon as possible, after that
   100  			// show flags with the final logging configuration.
   101  			if err := logsapi.ValidateAndApply(s.Logs, utilfeature.DefaultFeatureGate); err != nil {
   102  				return err
   103  			}
   104  			cliflag.PrintFlags(fs)
   105  
   106  			// set default options
   107  			completedOptions, err := s.Complete()
   108  			if err != nil {
   109  				return err
   110  			}
   111  
   112  			// validate options
   113  			if errs := completedOptions.Validate(); len(errs) != 0 {
   114  				return utilerrors.NewAggregate(errs)
   115  			}
   116  			// add feature enablement metrics
   117  			utilfeature.DefaultMutableFeatureGate.AddMetrics()
   118  			return Run(completedOptions, genericapiserver.SetupSignalHandler())
   119  		},
   120  		Args: func(cmd *cobra.Command, args []string) error {
   121  			for _, arg := range args {
   122  				if len(arg) > 0 {
   123  					return fmt.Errorf("%q does not take any arguments, got %q", cmd.CommandPath(), args)
   124  				}
   125  			}
   126  			return nil
   127  		},
   128  	}
   129  
   130  	fs := cmd.Flags()
   131  	namedFlagSets := s.Flags()
   132  	verflag.AddFlags(namedFlagSets.FlagSet("global"))
   133  	globalflag.AddGlobalFlags(namedFlagSets.FlagSet("global"), cmd.Name(), logs.SkipLoggingConfigurationFlags())
   134  	options.AddCustomGlobalFlags(namedFlagSets.FlagSet("generic"))
   135  	for _, f := range namedFlagSets.FlagSets {
   136  		fs.AddFlagSet(f)
   137  	}
   138  
   139  	cols, _, _ := term.TerminalSize(cmd.OutOrStdout())
   140  	cliflag.SetUsageAndHelpFunc(cmd, namedFlagSets, cols)
   141  
   142  	return cmd
   143  }
   144  
   145  // Run runs the specified APIServer.  This should never exit.
   146  func Run(opts options.CompletedOptions, stopCh <-chan struct{}) error {
   147  	// To help debugging, immediately log version
   148  	klog.Infof("Version: %+v", version.Get())
   149  
   150  	klog.InfoS("Golang settings", "GOGC", os.Getenv("GOGC"), "GOMAXPROCS", os.Getenv("GOMAXPROCS"), "GOTRACEBACK", os.Getenv("GOTRACEBACK"))
   151  
   152  	config, err := NewConfig(opts)
   153  	if err != nil {
   154  		return err
   155  	}
   156  	completed, err := config.Complete()
   157  	if err != nil {
   158  		return err
   159  	}
   160  	server, err := CreateServerChain(completed)
   161  	if err != nil {
   162  		return err
   163  	}
   164  
   165  	prepared, err := server.PrepareRun()
   166  	if err != nil {
   167  		return err
   168  	}
   169  
   170  	return prepared.Run(stopCh)
   171  }
   172  
   173  // CreateServerChain creates the apiservers connected via delegation.
   174  func CreateServerChain(config CompletedConfig) (*aggregatorapiserver.APIAggregator, error) {
   175  	notFoundHandler := notfoundhandler.New(config.ControlPlane.GenericConfig.Serializer, genericapifilters.NoMuxAndDiscoveryIncompleteKey)
   176  	apiExtensionsServer, err := config.ApiExtensions.New(genericapiserver.NewEmptyDelegateWithCustomHandler(notFoundHandler))
   177  	if err != nil {
   178  		return nil, err
   179  	}
   180  	crdAPIEnabled := config.ApiExtensions.GenericConfig.MergedResourceConfig.ResourceEnabled(apiextensionsv1.SchemeGroupVersion.WithResource("customresourcedefinitions"))
   181  
   182  	kubeAPIServer, err := config.ControlPlane.New(apiExtensionsServer.GenericAPIServer)
   183  	if err != nil {
   184  		return nil, err
   185  	}
   186  
   187  	// aggregator comes last in the chain
   188  	aggregatorServer, err := createAggregatorServer(config.Aggregator, kubeAPIServer.GenericAPIServer, apiExtensionsServer.Informers, crdAPIEnabled)
   189  	if err != nil {
   190  		// we don't need special handling for innerStopCh because the aggregator server doesn't create any go routines
   191  		return nil, err
   192  	}
   193  
   194  	return aggregatorServer, nil
   195  }
   196  
   197  // CreateProxyTransport creates the dialer infrastructure to connect to the nodes.
   198  func CreateProxyTransport() *http.Transport {
   199  	var proxyDialerFn utilnet.DialFunc
   200  	// Proxying to pods and services is IP-based... don't expect to be able to verify the hostname
   201  	proxyTLSClientConfig := &tls.Config{InsecureSkipVerify: true}
   202  	proxyTransport := utilnet.SetTransportDefaults(&http.Transport{
   203  		DialContext:     proxyDialerFn,
   204  		TLSClientConfig: proxyTLSClientConfig,
   205  	})
   206  	return proxyTransport
   207  }
   208  
   209  // CreateKubeAPIServerConfig creates all the resources for running the API server, but runs none of them
   210  func CreateKubeAPIServerConfig(opts options.CompletedOptions) (
   211  	*controlplane.Config,
   212  	aggregatorapiserver.ServiceResolver,
   213  	[]admission.PluginInitializer,
   214  	error,
   215  ) {
   216  	proxyTransport := CreateProxyTransport()
   217  
   218  	genericConfig, versionedInformers, storageFactory, err := controlplaneapiserver.BuildGenericConfig(
   219  		opts.CompletedOptions,
   220  		[]*runtime.Scheme{legacyscheme.Scheme, extensionsapiserver.Scheme, aggregatorscheme.Scheme},
   221  		generatedopenapi.GetOpenAPIDefinitions,
   222  	)
   223  	if err != nil {
   224  		return nil, nil, nil, err
   225  	}
   226  
   227  	capabilities.Setup(opts.AllowPrivileged, opts.MaxConnectionBytesPerSec)
   228  
   229  	opts.Metrics.Apply()
   230  	serviceaccount.RegisterMetrics()
   231  
   232  	config := &controlplane.Config{
   233  		GenericConfig: genericConfig,
   234  		ExtraConfig: controlplane.ExtraConfig{
   235  			APIResourceConfigSource: storageFactory.APIResourceConfigSource,
   236  			StorageFactory:          storageFactory,
   237  			EventTTL:                opts.EventTTL,
   238  			KubeletClientConfig:     opts.KubeletConfig,
   239  			EnableLogsSupport:       opts.EnableLogsHandler,
   240  			ProxyTransport:          proxyTransport,
   241  
   242  			ServiceIPRange:          opts.PrimaryServiceClusterIPRange,
   243  			APIServerServiceIP:      opts.APIServerServiceIP,
   244  			SecondaryServiceIPRange: opts.SecondaryServiceClusterIPRange,
   245  
   246  			APIServerServicePort: 443,
   247  
   248  			ServiceNodePortRange:      opts.ServiceNodePortRange,
   249  			KubernetesServiceNodePort: opts.KubernetesServiceNodePort,
   250  
   251  			EndpointReconcilerType: reconcilers.Type(opts.EndpointReconcilerType),
   252  			MasterCount:            opts.MasterCount,
   253  
   254  			ServiceAccountIssuer:        opts.ServiceAccountIssuer,
   255  			ServiceAccountMaxExpiration: opts.ServiceAccountTokenMaxExpiration,
   256  			ExtendExpiration:            opts.Authentication.ServiceAccounts.ExtendExpiration,
   257  
   258  			VersionedInformers: versionedInformers,
   259  		},
   260  	}
   261  
   262  	if utilfeature.DefaultFeatureGate.Enabled(features.UnknownVersionInteroperabilityProxy) {
   263  		config.ExtraConfig.PeerEndpointLeaseReconciler, err = controlplaneapiserver.CreatePeerEndpointLeaseReconciler(*genericConfig, storageFactory)
   264  		if err != nil {
   265  			return nil, nil, nil, err
   266  		}
   267  		// build peer proxy config only if peer ca file exists
   268  		if opts.PeerCAFile != "" {
   269  			config.ExtraConfig.PeerProxy, err = controlplaneapiserver.BuildPeerProxy(versionedInformers, genericConfig.StorageVersionManager, opts.ProxyClientCertFile,
   270  				opts.ProxyClientKeyFile, opts.PeerCAFile, opts.PeerAdvertiseAddress, genericConfig.APIServerID, config.ExtraConfig.PeerEndpointLeaseReconciler, config.GenericConfig.Serializer)
   271  			if err != nil {
   272  				return nil, nil, nil, err
   273  			}
   274  		}
   275  	}
   276  
   277  	clientCAProvider, err := opts.Authentication.ClientCert.GetClientCAContentProvider()
   278  	if err != nil {
   279  		return nil, nil, nil, err
   280  	}
   281  	config.ExtraConfig.ClusterAuthenticationInfo.ClientCA = clientCAProvider
   282  
   283  	requestHeaderConfig, err := opts.Authentication.RequestHeader.ToAuthenticationRequestHeaderConfig()
   284  	if err != nil {
   285  		return nil, nil, nil, err
   286  	}
   287  	if requestHeaderConfig != nil {
   288  		config.ExtraConfig.ClusterAuthenticationInfo.RequestHeaderCA = requestHeaderConfig.CAContentProvider
   289  		config.ExtraConfig.ClusterAuthenticationInfo.RequestHeaderAllowedNames = requestHeaderConfig.AllowedClientNames
   290  		config.ExtraConfig.ClusterAuthenticationInfo.RequestHeaderExtraHeaderPrefixes = requestHeaderConfig.ExtraHeaderPrefixes
   291  		config.ExtraConfig.ClusterAuthenticationInfo.RequestHeaderGroupHeaders = requestHeaderConfig.GroupHeaders
   292  		config.ExtraConfig.ClusterAuthenticationInfo.RequestHeaderUsernameHeaders = requestHeaderConfig.UsernameHeaders
   293  	}
   294  
   295  	// setup admission
   296  	admissionConfig := &kubeapiserveradmission.Config{
   297  		ExternalInformers:    versionedInformers,
   298  		LoopbackClientConfig: genericConfig.LoopbackClientConfig,
   299  		CloudConfigFile:      opts.CloudProvider.CloudConfigFile,
   300  	}
   301  	serviceResolver := buildServiceResolver(opts.EnableAggregatorRouting, genericConfig.LoopbackClientConfig.Host, versionedInformers)
   302  	pluginInitializers, err := admissionConfig.New(proxyTransport, genericConfig.EgressSelector, serviceResolver, genericConfig.TracerProvider)
   303  	if err != nil {
   304  		return nil, nil, nil, fmt.Errorf("failed to create admission plugin initializer: %v", err)
   305  	}
   306  	clientgoExternalClient, err := clientset.NewForConfig(genericConfig.LoopbackClientConfig)
   307  	if err != nil {
   308  		return nil, nil, nil, fmt.Errorf("failed to create real client-go external client: %w", err)
   309  	}
   310  	dynamicExternalClient, err := dynamic.NewForConfig(genericConfig.LoopbackClientConfig)
   311  	if err != nil {
   312  		return nil, nil, nil, fmt.Errorf("failed to create real dynamic external client: %w", err)
   313  	}
   314  	err = opts.Admission.ApplyTo(
   315  		genericConfig,
   316  		versionedInformers,
   317  		clientgoExternalClient,
   318  		dynamicExternalClient,
   319  		utilfeature.DefaultFeatureGate,
   320  		pluginInitializers...)
   321  	if err != nil {
   322  		return nil, nil, nil, fmt.Errorf("failed to apply admission: %w", err)
   323  	}
   324  
   325  	if config.GenericConfig.EgressSelector != nil {
   326  		// Use the config.GenericConfig.EgressSelector lookup to find the dialer to connect to the kubelet
   327  		config.ExtraConfig.KubeletClientConfig.Lookup = config.GenericConfig.EgressSelector.Lookup
   328  
   329  		// Use the config.GenericConfig.EgressSelector lookup as the transport used by the "proxy" subresources.
   330  		networkContext := egressselector.Cluster.AsNetworkContext()
   331  		dialer, err := config.GenericConfig.EgressSelector.Lookup(networkContext)
   332  		if err != nil {
   333  			return nil, nil, nil, err
   334  		}
   335  		c := proxyTransport.Clone()
   336  		c.DialContext = dialer
   337  		config.ExtraConfig.ProxyTransport = c
   338  	}
   339  
   340  	// Load and set the public keys.
   341  	var pubKeys []interface{}
   342  	for _, f := range opts.Authentication.ServiceAccounts.KeyFiles {
   343  		keys, err := keyutil.PublicKeysFromFile(f)
   344  		if err != nil {
   345  			return nil, nil, nil, fmt.Errorf("failed to parse key file %q: %v", f, err)
   346  		}
   347  		pubKeys = append(pubKeys, keys...)
   348  	}
   349  	config.ExtraConfig.ServiceAccountIssuerURL = opts.Authentication.ServiceAccounts.Issuers[0]
   350  	config.ExtraConfig.ServiceAccountJWKSURI = opts.Authentication.ServiceAccounts.JWKSURI
   351  	config.ExtraConfig.ServiceAccountPublicKeys = pubKeys
   352  
   353  	return config, serviceResolver, pluginInitializers, nil
   354  }
   355  
   356  var testServiceResolver webhook.ServiceResolver
   357  
   358  // SetServiceResolverForTests allows the service resolver to be overridden during tests.
   359  // Tests using this function must run serially as this function is not safe to call concurrently with server start.
   360  func SetServiceResolverForTests(resolver webhook.ServiceResolver) func() {
   361  	if testServiceResolver != nil {
   362  		panic("test service resolver is set: tests are either running concurrently or clean up was skipped")
   363  	}
   364  
   365  	testServiceResolver = resolver
   366  
   367  	return func() {
   368  		testServiceResolver = nil
   369  	}
   370  }
   371  
   372  func buildServiceResolver(enabledAggregatorRouting bool, hostname string, informer clientgoinformers.SharedInformerFactory) webhook.ServiceResolver {
   373  	if testServiceResolver != nil {
   374  		return testServiceResolver
   375  	}
   376  
   377  	var serviceResolver webhook.ServiceResolver
   378  	if enabledAggregatorRouting {
   379  		serviceResolver = aggregatorapiserver.NewEndpointServiceResolver(
   380  			informer.Core().V1().Services().Lister(),
   381  			informer.Core().V1().Endpoints().Lister(),
   382  		)
   383  	} else {
   384  		serviceResolver = aggregatorapiserver.NewClusterIPServiceResolver(
   385  			informer.Core().V1().Services().Lister(),
   386  		)
   387  	}
   388  
   389  	// resolve kubernetes.default.svc locally
   390  	if localHost, err := url.Parse(hostname); err == nil {
   391  		serviceResolver = aggregatorapiserver.NewLoopbackServiceResolver(serviceResolver, localHost)
   392  	}
   393  	return serviceResolver
   394  }
   395  

View as plain text