1
16
17 package apiserver
18
19 import (
20 "context"
21 "fmt"
22 "net/http"
23 "time"
24
25 apierrors "k8s.io/apimachinery/pkg/api/errors"
26 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
27 "k8s.io/apimachinery/pkg/runtime/schema"
28 "k8s.io/apimachinery/pkg/util/sets"
29 "k8s.io/apimachinery/pkg/util/wait"
30 "k8s.io/apiserver/pkg/endpoints/discovery/aggregated"
31 genericfeatures "k8s.io/apiserver/pkg/features"
32 peerreconcilers "k8s.io/apiserver/pkg/reconcilers"
33 genericapiserver "k8s.io/apiserver/pkg/server"
34 "k8s.io/apiserver/pkg/server/dynamiccertificates"
35 "k8s.io/apiserver/pkg/server/egressselector"
36 serverstorage "k8s.io/apiserver/pkg/server/storage"
37 utilfeature "k8s.io/apiserver/pkg/util/feature"
38 "k8s.io/client-go/kubernetes"
39 "k8s.io/client-go/transport"
40 "k8s.io/component-base/version"
41 v1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
42 v1helper "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1/helper"
43 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1beta1"
44 aggregatorscheme "k8s.io/kube-aggregator/pkg/apiserver/scheme"
45 "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset"
46 informers "k8s.io/kube-aggregator/pkg/client/informers/externalversions"
47 listers "k8s.io/kube-aggregator/pkg/client/listers/apiregistration/v1"
48 openapicontroller "k8s.io/kube-aggregator/pkg/controllers/openapi"
49 openapiaggregator "k8s.io/kube-aggregator/pkg/controllers/openapi/aggregator"
50 openapiv3controller "k8s.io/kube-aggregator/pkg/controllers/openapiv3"
51 openapiv3aggregator "k8s.io/kube-aggregator/pkg/controllers/openapiv3/aggregator"
52 statuscontrollers "k8s.io/kube-aggregator/pkg/controllers/status"
53 apiservicerest "k8s.io/kube-aggregator/pkg/registry/apiservice/rest"
54 openapicommon "k8s.io/kube-openapi/pkg/common"
55 )
56
57 func init() {
58
59 metav1.AddToGroupVersion(aggregatorscheme.Scheme, schema.GroupVersion{Group: "", Version: "v1"})
60
61 unversioned := schema.GroupVersion{Group: "", Version: "v1"}
62 aggregatorscheme.Scheme.AddUnversionedTypes(unversioned,
63 &metav1.Status{},
64 &metav1.APIVersions{},
65 &metav1.APIGroupList{},
66 &metav1.APIGroup{},
67 &metav1.APIResourceList{},
68 )
69 }
70
const (
	// legacyAPIServiceName is the APIService name that AddAPIService and
	// RemoveAPIService map to the "/api" path instead of "/apis/<group>/<version>".
	legacyAPIServiceName = "v1."

	// StorageVersionPostStartHookName is the name under which the post-start
	// hook that updates storage versions is registered.
	StorageVersionPostStartHookName = "built-in-resources-storage-version-updater"
)
77
78
79 type ExtraConfig struct {
80
81
82
83 PeerCAFile string
84
85
86
87
88 PeerAdvertiseAddress peerreconcilers.PeerAdvertiseAddress
89
90
91
92 ProxyClientCertFile string
93 ProxyClientKeyFile string
94
95
96
97 ProxyTransport *http.Transport
98
99
100 ServiceResolver ServiceResolver
101
102 RejectForwardingRedirects bool
103 }
104
105
106 type Config struct {
107 GenericConfig *genericapiserver.RecommendedConfig
108 ExtraConfig ExtraConfig
109 }
110
111 type completedConfig struct {
112 GenericConfig genericapiserver.CompletedConfig
113 ExtraConfig *ExtraConfig
114 }
115
116
117 type CompletedConfig struct {
118
119 *completedConfig
120 }
121
// runnable is anything that can be run until a stop channel is closed.
// PrepareRun stores the prepared generic API server behind this interface.
type runnable interface {
	// Run runs until stopCh is closed or an error occurs.
	Run(stopCh <-chan struct{}) error
}
125
126
127 type preparedAPIAggregator struct {
128 *APIAggregator
129 runnable runnable
130 }
131
132
133 type APIAggregator struct {
134 GenericAPIServer *genericapiserver.GenericAPIServer
135
136
137 APIRegistrationInformers informers.SharedInformerFactory
138
139 delegateHandler http.Handler
140
141
142 proxyCurrentCertKeyContent certKeyFunc
143 proxyTransportDial *transport.DialHolder
144
145
146 proxyHandlers map[string]*proxyHandler
147
148
149 handledGroupVersions map[string]sets.Set[string]
150
151
152
153 lister listers.APIServiceLister
154
155
156 serviceResolver ServiceResolver
157
158
159 openAPIConfig *openapicommon.Config
160
161
162 openAPIV3Config *openapicommon.OpenAPIV3Config
163
164
165 openAPIAggregationController *openapicontroller.AggregationController
166
167
168 openAPIV3AggregationController *openapiv3controller.AggregationController
169
170
171
172
173 discoveryAggregationController DiscoveryAggregationController
174
175
176 rejectForwardingRedirects bool
177 }
178
179
180 func (cfg *Config) Complete() CompletedConfig {
181 c := completedConfig{
182 cfg.GenericConfig.Complete(),
183 &cfg.ExtraConfig,
184 }
185
186
187
188 c.GenericConfig.EnableDiscovery = false
189 version := version.Get()
190 c.GenericConfig.Version = &version
191
192 return CompletedConfig{&c}
193 }
194
195
196 func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.DelegationTarget) (*APIAggregator, error) {
197 genericServer, err := c.GenericConfig.New("kube-aggregator", delegationTarget)
198 if err != nil {
199 return nil, err
200 }
201
202 apiregistrationClient, err := clientset.NewForConfig(c.GenericConfig.LoopbackClientConfig)
203 if err != nil {
204 return nil, err
205 }
206 informerFactory := informers.NewSharedInformerFactory(
207 apiregistrationClient,
208 5*time.Minute,
209 )
210
211
212
213
214
215
216 apiServiceRegistrationControllerInitiated := make(chan struct{})
217 if err := genericServer.RegisterMuxAndDiscoveryCompleteSignal("APIServiceRegistrationControllerInitiated", apiServiceRegistrationControllerInitiated); err != nil {
218 return nil, err
219 }
220
221 var proxyTransportDial *transport.DialHolder
222 if c.GenericConfig.EgressSelector != nil {
223 egressDialer, err := c.GenericConfig.EgressSelector.Lookup(egressselector.Cluster.AsNetworkContext())
224 if err != nil {
225 return nil, err
226 }
227 if egressDialer != nil {
228 proxyTransportDial = &transport.DialHolder{Dial: egressDialer}
229 }
230 } else if c.ExtraConfig.ProxyTransport != nil && c.ExtraConfig.ProxyTransport.DialContext != nil {
231 proxyTransportDial = &transport.DialHolder{Dial: c.ExtraConfig.ProxyTransport.DialContext}
232 }
233
234 s := &APIAggregator{
235 GenericAPIServer: genericServer,
236 delegateHandler: delegationTarget.UnprotectedHandler(),
237 proxyTransportDial: proxyTransportDial,
238 proxyHandlers: map[string]*proxyHandler{},
239 handledGroupVersions: map[string]sets.Set[string]{},
240 lister: informerFactory.Apiregistration().V1().APIServices().Lister(),
241 APIRegistrationInformers: informerFactory,
242 serviceResolver: c.ExtraConfig.ServiceResolver,
243 openAPIConfig: c.GenericConfig.OpenAPIConfig,
244 openAPIV3Config: c.GenericConfig.OpenAPIV3Config,
245 proxyCurrentCertKeyContent: func() (bytes []byte, bytes2 []byte) { return nil, nil },
246 rejectForwardingRedirects: c.ExtraConfig.RejectForwardingRedirects,
247 }
248
249
250 resourceExpirationEvaluator, err := genericapiserver.NewResourceExpirationEvaluator(*c.GenericConfig.Version)
251 if err != nil {
252 return nil, err
253 }
254
255 apiGroupInfo := apiservicerest.NewRESTStorage(c.GenericConfig.MergedResourceConfig, c.GenericConfig.RESTOptionsGetter, resourceExpirationEvaluator.ShouldServeForVersion(1, 22))
256 if err := s.GenericAPIServer.InstallAPIGroup(&apiGroupInfo); err != nil {
257 return nil, err
258 }
259
260 enabledVersions := sets.NewString()
261 for v := range apiGroupInfo.VersionedResourcesStorageMap {
262 enabledVersions.Insert(v)
263 }
264 if !enabledVersions.Has(v1.SchemeGroupVersion.Version) {
265 return nil, fmt.Errorf("API group/version %s must be enabled", v1.SchemeGroupVersion.String())
266 }
267
268 apisHandler := &apisHandler{
269 codecs: aggregatorscheme.Codecs,
270 lister: s.lister,
271 discoveryGroup: discoveryGroup(enabledVersions),
272 }
273
274 if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.AggregatedDiscoveryEndpoint) {
275 apisHandlerWithAggregationSupport := aggregated.WrapAggregatedDiscoveryToHandler(apisHandler, s.GenericAPIServer.AggregatedDiscoveryGroupManager)
276 s.GenericAPIServer.Handler.NonGoRestfulMux.Handle("/apis", apisHandlerWithAggregationSupport)
277 } else {
278 s.GenericAPIServer.Handler.NonGoRestfulMux.Handle("/apis", apisHandler)
279 }
280 s.GenericAPIServer.Handler.NonGoRestfulMux.UnlistedHandle("/apis/", apisHandler)
281
282 apiserviceRegistrationController := NewAPIServiceRegistrationController(informerFactory.Apiregistration().V1().APIServices(), s)
283 if len(c.ExtraConfig.ProxyClientCertFile) > 0 && len(c.ExtraConfig.ProxyClientKeyFile) > 0 {
284 aggregatorProxyCerts, err := dynamiccertificates.NewDynamicServingContentFromFiles("aggregator-proxy-cert", c.ExtraConfig.ProxyClientCertFile, c.ExtraConfig.ProxyClientKeyFile)
285 if err != nil {
286 return nil, err
287 }
288
289
290 ctx := context.TODO()
291 if err := aggregatorProxyCerts.RunOnce(ctx); err != nil {
292 return nil, err
293 }
294 aggregatorProxyCerts.AddListener(apiserviceRegistrationController)
295 s.proxyCurrentCertKeyContent = aggregatorProxyCerts.CurrentCertKeyContent
296
297 s.GenericAPIServer.AddPostStartHookOrDie("aggregator-reload-proxy-client-cert", func(postStartHookContext genericapiserver.PostStartHookContext) error {
298
299
300 ctx, cancel := context.WithCancel(context.Background())
301 go func() {
302 select {
303 case <-postStartHookContext.StopCh:
304 cancel()
305 case <-ctx.Done():
306 }
307 }()
308 go aggregatorProxyCerts.Run(ctx, 1)
309 return nil
310 })
311 }
312
313 availableController, err := statuscontrollers.NewAvailableConditionController(
314 informerFactory.Apiregistration().V1().APIServices(),
315 c.GenericConfig.SharedInformerFactory.Core().V1().Services(),
316 c.GenericConfig.SharedInformerFactory.Core().V1().Endpoints(),
317 apiregistrationClient.ApiregistrationV1(),
318 proxyTransportDial,
319 (func() ([]byte, []byte))(s.proxyCurrentCertKeyContent),
320 s.serviceResolver,
321 )
322 if err != nil {
323 return nil, err
324 }
325
326 s.GenericAPIServer.AddPostStartHookOrDie("start-kube-aggregator-informers", func(context genericapiserver.PostStartHookContext) error {
327 informerFactory.Start(context.StopCh)
328 c.GenericConfig.SharedInformerFactory.Start(context.StopCh)
329 return nil
330 })
331 s.GenericAPIServer.AddPostStartHookOrDie("apiservice-registration-controller", func(context genericapiserver.PostStartHookContext) error {
332 go apiserviceRegistrationController.Run(context.StopCh, apiServiceRegistrationControllerInitiated)
333 select {
334 case <-context.StopCh:
335 case <-apiServiceRegistrationControllerInitiated:
336 }
337
338 return nil
339 })
340 s.GenericAPIServer.AddPostStartHookOrDie("apiservice-status-available-controller", func(context genericapiserver.PostStartHookContext) error {
341
342 go availableController.Run(5, context.StopCh)
343 return nil
344 })
345
346 if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.AggregatedDiscoveryEndpoint) {
347 s.discoveryAggregationController = NewDiscoveryManager(
348
349
350 s.GenericAPIServer.AggregatedDiscoveryGroupManager.WithSource(aggregated.AggregatorSource),
351 )
352
353
354 s.GenericAPIServer.AddPostStartHookOrDie("apiservice-discovery-controller", func(context genericapiserver.PostStartHookContext) error {
355
356
357 select {
358 case <-context.StopCh:
359 return nil
360
361 case <-apiServiceRegistrationControllerInitiated:
362 }
363
364
365
366
367
368 discoverySyncedCh := make(chan struct{})
369 go s.discoveryAggregationController.Run(context.StopCh, discoverySyncedCh)
370
371 select {
372 case <-context.StopCh:
373 return nil
374
375 case <-discoverySyncedCh:
376
377 }
378 return nil
379 })
380 }
381
382 if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.StorageVersionAPI) &&
383 utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerIdentity) {
384
385
386 s.GenericAPIServer.AddPostStartHookOrDie(StorageVersionPostStartHookName, func(hookContext genericapiserver.PostStartHookContext) error {
387
388
389
390 kubeClient, err := kubernetes.NewForConfig(hookContext.LoopbackClientConfig)
391 if err != nil {
392 return err
393 }
394 if err := wait.PollImmediateUntil(100*time.Millisecond, func() (bool, error) {
395 _, err := kubeClient.CoordinationV1().Leases(metav1.NamespaceSystem).Get(
396 context.TODO(), s.GenericAPIServer.APIServerID, metav1.GetOptions{})
397 if apierrors.IsNotFound(err) {
398 return false, nil
399 }
400 if err != nil {
401 return false, err
402 }
403 return true, nil
404 }, hookContext.StopCh); err != nil {
405 return fmt.Errorf("failed to wait for apiserver-identity lease %s to be created: %v",
406 s.GenericAPIServer.APIServerID, err)
407 }
408
409
410
411
412
413
414 go wait.PollImmediateUntil(10*time.Minute, func() (bool, error) {
415
416
417
418 s.GenericAPIServer.StorageVersionManager.UpdateStorageVersions(hookContext.LoopbackClientConfig, s.GenericAPIServer.APIServerID)
419 return false, nil
420 }, hookContext.StopCh)
421
422
423
424
425 wait.PollImmediateUntil(1*time.Second, func() (bool, error) {
426 return s.GenericAPIServer.StorageVersionManager.Completed(), nil
427 }, hookContext.StopCh)
428 return nil
429 })
430 }
431
432 return s, nil
433 }
434
435
436
437 func (s *APIAggregator) PrepareRun() (preparedAPIAggregator, error) {
438
439 if s.openAPIConfig != nil {
440 s.GenericAPIServer.AddPostStartHookOrDie("apiservice-openapi-controller", func(context genericapiserver.PostStartHookContext) error {
441 go s.openAPIAggregationController.Run(context.StopCh)
442 return nil
443 })
444 }
445
446 if s.openAPIV3Config != nil {
447 s.GenericAPIServer.AddPostStartHookOrDie("apiservice-openapiv3-controller", func(context genericapiserver.PostStartHookContext) error {
448 go s.openAPIV3AggregationController.Run(context.StopCh)
449 return nil
450 })
451 }
452
453 prepared := s.GenericAPIServer.PrepareRun()
454
455
456 if s.openAPIConfig != nil {
457 specDownloader := openapiaggregator.NewDownloader()
458 openAPIAggregator, err := openapiaggregator.BuildAndRegisterAggregator(
459 &specDownloader,
460 s.GenericAPIServer.NextDelegate(),
461 s.GenericAPIServer.Handler.GoRestfulContainer.RegisteredWebServices(),
462 s.openAPIConfig,
463 s.GenericAPIServer.Handler.NonGoRestfulMux)
464 if err != nil {
465 return preparedAPIAggregator{}, err
466 }
467 s.openAPIAggregationController = openapicontroller.NewAggregationController(&specDownloader, openAPIAggregator)
468 }
469
470 if s.openAPIV3Config != nil {
471 specDownloaderV3 := openapiv3aggregator.NewDownloader()
472 openAPIV3Aggregator, err := openapiv3aggregator.BuildAndRegisterAggregator(
473 specDownloaderV3,
474 s.GenericAPIServer.NextDelegate(),
475 s.GenericAPIServer.Handler.GoRestfulContainer,
476 s.openAPIV3Config,
477 s.GenericAPIServer.Handler.NonGoRestfulMux)
478 if err != nil {
479 return preparedAPIAggregator{}, err
480 }
481 s.openAPIV3AggregationController = openapiv3controller.NewAggregationController(openAPIV3Aggregator)
482 }
483
484 return preparedAPIAggregator{APIAggregator: s, runnable: prepared}, nil
485 }
486
487 func (s preparedAPIAggregator) Run(stopCh <-chan struct{}) error {
488 return s.runnable.Run(stopCh)
489 }
490
491
492
493 func (s *APIAggregator) AddAPIService(apiService *v1.APIService) error {
494
495
496 if proxyHandler, exists := s.proxyHandlers[apiService.Name]; exists {
497 proxyHandler.updateAPIService(apiService)
498 if s.openAPIAggregationController != nil {
499 s.openAPIAggregationController.UpdateAPIService(proxyHandler, apiService)
500 }
501 if s.openAPIV3AggregationController != nil {
502 s.openAPIV3AggregationController.UpdateAPIService(proxyHandler, apiService)
503 }
504
505 if s.discoveryAggregationController != nil {
506 handlerCopy := *proxyHandler
507 handlerCopy.setServiceAvailable()
508 s.discoveryAggregationController.AddAPIService(apiService, &handlerCopy)
509 }
510 return nil
511 }
512
513 proxyPath := "/apis/" + apiService.Spec.Group + "/" + apiService.Spec.Version
514
515 if apiService.Name == legacyAPIServiceName {
516 proxyPath = "/api"
517 }
518
519
520 proxyHandler := &proxyHandler{
521 localDelegate: s.delegateHandler,
522 proxyCurrentCertKeyContent: s.proxyCurrentCertKeyContent,
523 proxyTransportDial: s.proxyTransportDial,
524 serviceResolver: s.serviceResolver,
525 rejectForwardingRedirects: s.rejectForwardingRedirects,
526 }
527 proxyHandler.updateAPIService(apiService)
528 if s.openAPIAggregationController != nil {
529 s.openAPIAggregationController.AddAPIService(proxyHandler, apiService)
530 }
531 if s.openAPIV3AggregationController != nil {
532 s.openAPIV3AggregationController.AddAPIService(proxyHandler, apiService)
533 }
534 if s.discoveryAggregationController != nil {
535 s.discoveryAggregationController.AddAPIService(apiService, proxyHandler)
536 }
537
538 s.proxyHandlers[apiService.Name] = proxyHandler
539 s.GenericAPIServer.Handler.NonGoRestfulMux.Handle(proxyPath, proxyHandler)
540 s.GenericAPIServer.Handler.NonGoRestfulMux.UnlistedHandlePrefix(proxyPath+"/", proxyHandler)
541
542
543 if apiService.Name == legacyAPIServiceName {
544 return nil
545 }
546
547
548 versions, exist := s.handledGroupVersions[apiService.Spec.Group]
549 if exist {
550 versions.Insert(apiService.Spec.Version)
551 return nil
552 }
553
554
555 groupPath := "/apis/" + apiService.Spec.Group
556 groupDiscoveryHandler := &apiGroupHandler{
557 codecs: aggregatorscheme.Codecs,
558 groupName: apiService.Spec.Group,
559 lister: s.lister,
560 delegate: s.delegateHandler,
561 }
562
563 s.GenericAPIServer.Handler.NonGoRestfulMux.Handle(groupPath, groupDiscoveryHandler)
564 s.GenericAPIServer.Handler.NonGoRestfulMux.UnlistedHandle(groupPath+"/", groupDiscoveryHandler)
565 s.handledGroupVersions[apiService.Spec.Group] = sets.New[string](apiService.Spec.Version)
566 return nil
567 }
568
569
570
571 func (s *APIAggregator) RemoveAPIService(apiServiceName string) {
572
573 if s.discoveryAggregationController != nil {
574 s.discoveryAggregationController.RemoveAPIService(apiServiceName)
575 }
576
577 version := v1helper.APIServiceNameToGroupVersion(apiServiceName)
578
579 proxyPath := "/apis/" + version.Group + "/" + version.Version
580
581 if apiServiceName == legacyAPIServiceName {
582 proxyPath = "/api"
583 }
584 s.GenericAPIServer.Handler.NonGoRestfulMux.Unregister(proxyPath)
585 s.GenericAPIServer.Handler.NonGoRestfulMux.Unregister(proxyPath + "/")
586 if s.openAPIAggregationController != nil {
587 s.openAPIAggregationController.RemoveAPIService(apiServiceName)
588 }
589 if s.openAPIV3AggregationController != nil {
590 s.openAPIV3AggregationController.RemoveAPIService(apiServiceName)
591 }
592 delete(s.proxyHandlers, apiServiceName)
593
594 versions, exist := s.handledGroupVersions[version.Group]
595 if !exist {
596 return
597 }
598 versions.Delete(version.Version)
599 if versions.Len() > 0 {
600 return
601 }
602 delete(s.handledGroupVersions, version.Group)
603 groupPath := "/apis/" + version.Group
604 s.GenericAPIServer.Handler.NonGoRestfulMux.Unregister(groupPath)
605 s.GenericAPIServer.Handler.NonGoRestfulMux.Unregister(groupPath + "/")
606 }
607
608
609 func DefaultAPIResourceConfigSource() *serverstorage.ResourceConfig {
610 ret := serverstorage.NewResourceConfig()
611
612 ret.EnableVersions(
613 v1.SchemeGroupVersion,
614 v1beta1.SchemeGroupVersion,
615 )
616
617 return ret
618 }
619
View as plain text