1
16
17
18
19
20 package app
21
22 import (
23 "crypto/tls"
24 "fmt"
25 "net/http"
26 "net/url"
27 "os"
28
29 "github.com/spf13/cobra"
30
31 apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
32 extensionsapiserver "k8s.io/apiextensions-apiserver/pkg/apiserver"
33 "k8s.io/apimachinery/pkg/runtime"
34 utilerrors "k8s.io/apimachinery/pkg/util/errors"
35 utilnet "k8s.io/apimachinery/pkg/util/net"
36 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
37 "k8s.io/apiserver/pkg/admission"
38 genericapifilters "k8s.io/apiserver/pkg/endpoints/filters"
39 genericapiserver "k8s.io/apiserver/pkg/server"
40 "k8s.io/apiserver/pkg/server/egressselector"
41 utilfeature "k8s.io/apiserver/pkg/util/feature"
42 "k8s.io/apiserver/pkg/util/notfoundhandler"
43 "k8s.io/apiserver/pkg/util/webhook"
44 "k8s.io/client-go/dynamic"
45 clientgoinformers "k8s.io/client-go/informers"
46 clientset "k8s.io/client-go/kubernetes"
47 "k8s.io/client-go/rest"
48 "k8s.io/client-go/util/keyutil"
49 cliflag "k8s.io/component-base/cli/flag"
50 "k8s.io/component-base/cli/globalflag"
51 "k8s.io/component-base/logs"
52 logsapi "k8s.io/component-base/logs/api/v1"
53 _ "k8s.io/component-base/metrics/prometheus/workqueue"
54 "k8s.io/component-base/term"
55 "k8s.io/component-base/version"
56 "k8s.io/component-base/version/verflag"
57 "k8s.io/klog/v2"
58 aggregatorapiserver "k8s.io/kube-aggregator/pkg/apiserver"
59 aggregatorscheme "k8s.io/kube-aggregator/pkg/apiserver/scheme"
60
61 "k8s.io/kubernetes/cmd/kube-apiserver/app/options"
62 "k8s.io/kubernetes/pkg/api/legacyscheme"
63 "k8s.io/kubernetes/pkg/capabilities"
64 "k8s.io/kubernetes/pkg/controlplane"
65 controlplaneapiserver "k8s.io/kubernetes/pkg/controlplane/apiserver"
66 "k8s.io/kubernetes/pkg/controlplane/reconcilers"
67 "k8s.io/kubernetes/pkg/features"
68 generatedopenapi "k8s.io/kubernetes/pkg/generated/openapi"
69 kubeapiserveradmission "k8s.io/kubernetes/pkg/kubeapiserver/admission"
70 "k8s.io/kubernetes/pkg/serviceaccount"
71 )
72
73 func init() {
74 utilruntime.Must(logsapi.AddFeatureGates(utilfeature.DefaultMutableFeatureGate))
75 }
76
77
78 func NewAPIServerCommand() *cobra.Command {
79 s := options.NewServerRunOptions()
80 cmd := &cobra.Command{
81 Use: "kube-apiserver",
82 Long: `The Kubernetes API server validates and configures data
83 for the api objects which include pods, services, replicationcontrollers, and
84 others. The API Server services REST operations and provides the frontend to the
85 cluster's shared state through which all other components interact.`,
86
87
88 SilenceUsage: true,
89 PersistentPreRunE: func(*cobra.Command, []string) error {
90
91
92 rest.SetDefaultWarningHandler(rest.NoWarnings{})
93 return nil
94 },
95 RunE: func(cmd *cobra.Command, args []string) error {
96 verflag.PrintAndExitIfRequested()
97 fs := cmd.Flags()
98
99
100
101 if err := logsapi.ValidateAndApply(s.Logs, utilfeature.DefaultFeatureGate); err != nil {
102 return err
103 }
104 cliflag.PrintFlags(fs)
105
106
107 completedOptions, err := s.Complete()
108 if err != nil {
109 return err
110 }
111
112
113 if errs := completedOptions.Validate(); len(errs) != 0 {
114 return utilerrors.NewAggregate(errs)
115 }
116
117 utilfeature.DefaultMutableFeatureGate.AddMetrics()
118 return Run(completedOptions, genericapiserver.SetupSignalHandler())
119 },
120 Args: func(cmd *cobra.Command, args []string) error {
121 for _, arg := range args {
122 if len(arg) > 0 {
123 return fmt.Errorf("%q does not take any arguments, got %q", cmd.CommandPath(), args)
124 }
125 }
126 return nil
127 },
128 }
129
130 fs := cmd.Flags()
131 namedFlagSets := s.Flags()
132 verflag.AddFlags(namedFlagSets.FlagSet("global"))
133 globalflag.AddGlobalFlags(namedFlagSets.FlagSet("global"), cmd.Name(), logs.SkipLoggingConfigurationFlags())
134 options.AddCustomGlobalFlags(namedFlagSets.FlagSet("generic"))
135 for _, f := range namedFlagSets.FlagSets {
136 fs.AddFlagSet(f)
137 }
138
139 cols, _, _ := term.TerminalSize(cmd.OutOrStdout())
140 cliflag.SetUsageAndHelpFunc(cmd, namedFlagSets, cols)
141
142 return cmd
143 }
144
145
146 func Run(opts options.CompletedOptions, stopCh <-chan struct{}) error {
147
148 klog.Infof("Version: %+v", version.Get())
149
150 klog.InfoS("Golang settings", "GOGC", os.Getenv("GOGC"), "GOMAXPROCS", os.Getenv("GOMAXPROCS"), "GOTRACEBACK", os.Getenv("GOTRACEBACK"))
151
152 config, err := NewConfig(opts)
153 if err != nil {
154 return err
155 }
156 completed, err := config.Complete()
157 if err != nil {
158 return err
159 }
160 server, err := CreateServerChain(completed)
161 if err != nil {
162 return err
163 }
164
165 prepared, err := server.PrepareRun()
166 if err != nil {
167 return err
168 }
169
170 return prepared.Run(stopCh)
171 }
172
173
174 func CreateServerChain(config CompletedConfig) (*aggregatorapiserver.APIAggregator, error) {
175 notFoundHandler := notfoundhandler.New(config.ControlPlane.GenericConfig.Serializer, genericapifilters.NoMuxAndDiscoveryIncompleteKey)
176 apiExtensionsServer, err := config.ApiExtensions.New(genericapiserver.NewEmptyDelegateWithCustomHandler(notFoundHandler))
177 if err != nil {
178 return nil, err
179 }
180 crdAPIEnabled := config.ApiExtensions.GenericConfig.MergedResourceConfig.ResourceEnabled(apiextensionsv1.SchemeGroupVersion.WithResource("customresourcedefinitions"))
181
182 kubeAPIServer, err := config.ControlPlane.New(apiExtensionsServer.GenericAPIServer)
183 if err != nil {
184 return nil, err
185 }
186
187
188 aggregatorServer, err := createAggregatorServer(config.Aggregator, kubeAPIServer.GenericAPIServer, apiExtensionsServer.Informers, crdAPIEnabled)
189 if err != nil {
190
191 return nil, err
192 }
193
194 return aggregatorServer, nil
195 }
196
197
198 func CreateProxyTransport() *http.Transport {
199 var proxyDialerFn utilnet.DialFunc
200
201 proxyTLSClientConfig := &tls.Config{InsecureSkipVerify: true}
202 proxyTransport := utilnet.SetTransportDefaults(&http.Transport{
203 DialContext: proxyDialerFn,
204 TLSClientConfig: proxyTLSClientConfig,
205 })
206 return proxyTransport
207 }
208
209
210 func CreateKubeAPIServerConfig(opts options.CompletedOptions) (
211 *controlplane.Config,
212 aggregatorapiserver.ServiceResolver,
213 []admission.PluginInitializer,
214 error,
215 ) {
216 proxyTransport := CreateProxyTransport()
217
218 genericConfig, versionedInformers, storageFactory, err := controlplaneapiserver.BuildGenericConfig(
219 opts.CompletedOptions,
220 []*runtime.Scheme{legacyscheme.Scheme, extensionsapiserver.Scheme, aggregatorscheme.Scheme},
221 generatedopenapi.GetOpenAPIDefinitions,
222 )
223 if err != nil {
224 return nil, nil, nil, err
225 }
226
227 capabilities.Setup(opts.AllowPrivileged, opts.MaxConnectionBytesPerSec)
228
229 opts.Metrics.Apply()
230 serviceaccount.RegisterMetrics()
231
232 config := &controlplane.Config{
233 GenericConfig: genericConfig,
234 ExtraConfig: controlplane.ExtraConfig{
235 APIResourceConfigSource: storageFactory.APIResourceConfigSource,
236 StorageFactory: storageFactory,
237 EventTTL: opts.EventTTL,
238 KubeletClientConfig: opts.KubeletConfig,
239 EnableLogsSupport: opts.EnableLogsHandler,
240 ProxyTransport: proxyTransport,
241
242 ServiceIPRange: opts.PrimaryServiceClusterIPRange,
243 APIServerServiceIP: opts.APIServerServiceIP,
244 SecondaryServiceIPRange: opts.SecondaryServiceClusterIPRange,
245
246 APIServerServicePort: 443,
247
248 ServiceNodePortRange: opts.ServiceNodePortRange,
249 KubernetesServiceNodePort: opts.KubernetesServiceNodePort,
250
251 EndpointReconcilerType: reconcilers.Type(opts.EndpointReconcilerType),
252 MasterCount: opts.MasterCount,
253
254 ServiceAccountIssuer: opts.ServiceAccountIssuer,
255 ServiceAccountMaxExpiration: opts.ServiceAccountTokenMaxExpiration,
256 ExtendExpiration: opts.Authentication.ServiceAccounts.ExtendExpiration,
257
258 VersionedInformers: versionedInformers,
259 },
260 }
261
262 if utilfeature.DefaultFeatureGate.Enabled(features.UnknownVersionInteroperabilityProxy) {
263 config.ExtraConfig.PeerEndpointLeaseReconciler, err = controlplaneapiserver.CreatePeerEndpointLeaseReconciler(*genericConfig, storageFactory)
264 if err != nil {
265 return nil, nil, nil, err
266 }
267
268 if opts.PeerCAFile != "" {
269 config.ExtraConfig.PeerProxy, err = controlplaneapiserver.BuildPeerProxy(versionedInformers, genericConfig.StorageVersionManager, opts.ProxyClientCertFile,
270 opts.ProxyClientKeyFile, opts.PeerCAFile, opts.PeerAdvertiseAddress, genericConfig.APIServerID, config.ExtraConfig.PeerEndpointLeaseReconciler, config.GenericConfig.Serializer)
271 if err != nil {
272 return nil, nil, nil, err
273 }
274 }
275 }
276
277 clientCAProvider, err := opts.Authentication.ClientCert.GetClientCAContentProvider()
278 if err != nil {
279 return nil, nil, nil, err
280 }
281 config.ExtraConfig.ClusterAuthenticationInfo.ClientCA = clientCAProvider
282
283 requestHeaderConfig, err := opts.Authentication.RequestHeader.ToAuthenticationRequestHeaderConfig()
284 if err != nil {
285 return nil, nil, nil, err
286 }
287 if requestHeaderConfig != nil {
288 config.ExtraConfig.ClusterAuthenticationInfo.RequestHeaderCA = requestHeaderConfig.CAContentProvider
289 config.ExtraConfig.ClusterAuthenticationInfo.RequestHeaderAllowedNames = requestHeaderConfig.AllowedClientNames
290 config.ExtraConfig.ClusterAuthenticationInfo.RequestHeaderExtraHeaderPrefixes = requestHeaderConfig.ExtraHeaderPrefixes
291 config.ExtraConfig.ClusterAuthenticationInfo.RequestHeaderGroupHeaders = requestHeaderConfig.GroupHeaders
292 config.ExtraConfig.ClusterAuthenticationInfo.RequestHeaderUsernameHeaders = requestHeaderConfig.UsernameHeaders
293 }
294
295
296 admissionConfig := &kubeapiserveradmission.Config{
297 ExternalInformers: versionedInformers,
298 LoopbackClientConfig: genericConfig.LoopbackClientConfig,
299 CloudConfigFile: opts.CloudProvider.CloudConfigFile,
300 }
301 serviceResolver := buildServiceResolver(opts.EnableAggregatorRouting, genericConfig.LoopbackClientConfig.Host, versionedInformers)
302 pluginInitializers, err := admissionConfig.New(proxyTransport, genericConfig.EgressSelector, serviceResolver, genericConfig.TracerProvider)
303 if err != nil {
304 return nil, nil, nil, fmt.Errorf("failed to create admission plugin initializer: %v", err)
305 }
306 clientgoExternalClient, err := clientset.NewForConfig(genericConfig.LoopbackClientConfig)
307 if err != nil {
308 return nil, nil, nil, fmt.Errorf("failed to create real client-go external client: %w", err)
309 }
310 dynamicExternalClient, err := dynamic.NewForConfig(genericConfig.LoopbackClientConfig)
311 if err != nil {
312 return nil, nil, nil, fmt.Errorf("failed to create real dynamic external client: %w", err)
313 }
314 err = opts.Admission.ApplyTo(
315 genericConfig,
316 versionedInformers,
317 clientgoExternalClient,
318 dynamicExternalClient,
319 utilfeature.DefaultFeatureGate,
320 pluginInitializers...)
321 if err != nil {
322 return nil, nil, nil, fmt.Errorf("failed to apply admission: %w", err)
323 }
324
325 if config.GenericConfig.EgressSelector != nil {
326
327 config.ExtraConfig.KubeletClientConfig.Lookup = config.GenericConfig.EgressSelector.Lookup
328
329
330 networkContext := egressselector.Cluster.AsNetworkContext()
331 dialer, err := config.GenericConfig.EgressSelector.Lookup(networkContext)
332 if err != nil {
333 return nil, nil, nil, err
334 }
335 c := proxyTransport.Clone()
336 c.DialContext = dialer
337 config.ExtraConfig.ProxyTransport = c
338 }
339
340
341 var pubKeys []interface{}
342 for _, f := range opts.Authentication.ServiceAccounts.KeyFiles {
343 keys, err := keyutil.PublicKeysFromFile(f)
344 if err != nil {
345 return nil, nil, nil, fmt.Errorf("failed to parse key file %q: %v", f, err)
346 }
347 pubKeys = append(pubKeys, keys...)
348 }
349 config.ExtraConfig.ServiceAccountIssuerURL = opts.Authentication.ServiceAccounts.Issuers[0]
350 config.ExtraConfig.ServiceAccountJWKSURI = opts.Authentication.ServiceAccounts.JWKSURI
351 config.ExtraConfig.ServiceAccountPublicKeys = pubKeys
352
353 return config, serviceResolver, pluginInitializers, nil
354 }
355
356 var testServiceResolver webhook.ServiceResolver
357
358
359
360 func SetServiceResolverForTests(resolver webhook.ServiceResolver) func() {
361 if testServiceResolver != nil {
362 panic("test service resolver is set: tests are either running concurrently or clean up was skipped")
363 }
364
365 testServiceResolver = resolver
366
367 return func() {
368 testServiceResolver = nil
369 }
370 }
371
372 func buildServiceResolver(enabledAggregatorRouting bool, hostname string, informer clientgoinformers.SharedInformerFactory) webhook.ServiceResolver {
373 if testServiceResolver != nil {
374 return testServiceResolver
375 }
376
377 var serviceResolver webhook.ServiceResolver
378 if enabledAggregatorRouting {
379 serviceResolver = aggregatorapiserver.NewEndpointServiceResolver(
380 informer.Core().V1().Services().Lister(),
381 informer.Core().V1().Endpoints().Lister(),
382 )
383 } else {
384 serviceResolver = aggregatorapiserver.NewClusterIPServiceResolver(
385 informer.Core().V1().Services().Lister(),
386 )
387 }
388
389
390 if localHost, err := url.Parse(hostname); err == nil {
391 serviceResolver = aggregatorapiserver.NewLoopbackServiceResolver(serviceResolver, localHost)
392 }
393 return serviceResolver
394 }
395
View as plain text