1
16
17
18
19
20 package app
21
22 import (
23 "context"
24 "fmt"
25 "k8s.io/apimachinery/pkg/runtime/schema"
26 "math/rand"
27 "net/http"
28 "os"
29 "sort"
30 "time"
31
32 "github.com/spf13/cobra"
33
34 "k8s.io/apimachinery/pkg/api/meta"
35 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
36 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
37 "k8s.io/apimachinery/pkg/util/sets"
38 "k8s.io/apimachinery/pkg/util/uuid"
39 "k8s.io/apimachinery/pkg/util/wait"
40 "k8s.io/apiserver/pkg/server/healthz"
41 "k8s.io/apiserver/pkg/server/mux"
42 utilfeature "k8s.io/apiserver/pkg/util/feature"
43 cacheddiscovery "k8s.io/client-go/discovery/cached/memory"
44 "k8s.io/client-go/informers"
45 v1core "k8s.io/client-go/kubernetes/typed/core/v1"
46 "k8s.io/client-go/metadata"
47 "k8s.io/client-go/metadata/metadatainformer"
48 restclient "k8s.io/client-go/rest"
49 "k8s.io/client-go/restmapper"
50 "k8s.io/client-go/tools/leaderelection"
51 "k8s.io/client-go/tools/leaderelection/resourcelock"
52 certutil "k8s.io/client-go/util/cert"
53 "k8s.io/client-go/util/keyutil"
54 cloudprovider "k8s.io/cloud-provider"
55 cliflag "k8s.io/component-base/cli/flag"
56 "k8s.io/component-base/cli/globalflag"
57 "k8s.io/component-base/configz"
58 "k8s.io/component-base/featuregate"
59 "k8s.io/component-base/logs"
60 logsapi "k8s.io/component-base/logs/api/v1"
61 metricsfeatures "k8s.io/component-base/metrics/features"
62 controllersmetrics "k8s.io/component-base/metrics/prometheus/controllers"
63 "k8s.io/component-base/metrics/prometheus/slis"
64 "k8s.io/component-base/term"
65 "k8s.io/component-base/version"
66 "k8s.io/component-base/version/verflag"
67 genericcontrollermanager "k8s.io/controller-manager/app"
68 "k8s.io/controller-manager/controller"
69 "k8s.io/controller-manager/pkg/clientbuilder"
70 controllerhealthz "k8s.io/controller-manager/pkg/healthz"
71 "k8s.io/controller-manager/pkg/informerfactory"
72 "k8s.io/controller-manager/pkg/leadermigration"
73 "k8s.io/klog/v2"
74 "k8s.io/kubernetes/cmd/kube-controller-manager/app/config"
75 "k8s.io/kubernetes/cmd/kube-controller-manager/app/options"
76 "k8s.io/kubernetes/cmd/kube-controller-manager/names"
77 kubectrlmgrconfig "k8s.io/kubernetes/pkg/controller/apis/config"
78 garbagecollector "k8s.io/kubernetes/pkg/controller/garbagecollector"
79 serviceaccountcontroller "k8s.io/kubernetes/pkg/controller/serviceaccount"
80 "k8s.io/kubernetes/pkg/serviceaccount"
81 )
82
83 func init() {
84 utilruntime.Must(logsapi.AddFeatureGates(utilfeature.DefaultMutableFeatureGate))
85 utilruntime.Must(metricsfeatures.AddFeatureGates(utilfeature.DefaultMutableFeatureGate))
86 }
87
88 const (
89
90 ControllerStartJitter = 1.0
91
92 ConfigzName = "kubecontrollermanager.config.k8s.io"
93 )
94
95
96 type ControllerLoopMode int
97
98 const (
99
100 IncludeCloudLoops ControllerLoopMode = iota
101
102 ExternalLoops
103 )
104
105
106 func NewControllerManagerCommand() *cobra.Command {
107 s, err := options.NewKubeControllerManagerOptions()
108 if err != nil {
109 klog.Background().Error(err, "Unable to initialize command options")
110 klog.FlushAndExit(klog.ExitFlushTimeout, 1)
111 }
112
113 cmd := &cobra.Command{
114 Use: "kube-controller-manager",
115 Long: `The Kubernetes controller manager is a daemon that embeds
116 the core control loops shipped with Kubernetes. In applications of robotics and
117 automation, a control loop is a non-terminating loop that regulates the state of
118 the system. In Kubernetes, a controller is a control loop that watches the shared
119 state of the cluster through the apiserver and makes changes attempting to move the
120 current state towards the desired state. Examples of controllers that ship with
121 Kubernetes today are the replication controller, endpoints controller, namespace
122 controller, and serviceaccounts controller.`,
123 PersistentPreRunE: func(*cobra.Command, []string) error {
124
125
126
127 restclient.SetDefaultWarningHandler(restclient.NoWarnings{})
128 return nil
129 },
130 RunE: func(cmd *cobra.Command, args []string) error {
131 verflag.PrintAndExitIfRequested()
132
133
134
135 if err := logsapi.ValidateAndApply(s.Logs, utilfeature.DefaultFeatureGate); err != nil {
136 return err
137 }
138 cliflag.PrintFlags(cmd.Flags())
139
140 c, err := s.Config(KnownControllers(), ControllersDisabledByDefault(), ControllerAliases())
141 if err != nil {
142 return err
143 }
144
145 utilfeature.DefaultMutableFeatureGate.AddMetrics()
146 return Run(context.Background(), c.Complete())
147 },
148 Args: func(cmd *cobra.Command, args []string) error {
149 for _, arg := range args {
150 if len(arg) > 0 {
151 return fmt.Errorf("%q does not take any arguments, got %q", cmd.CommandPath(), args)
152 }
153 }
154 return nil
155 },
156 }
157
158 fs := cmd.Flags()
159 namedFlagSets := s.Flags(KnownControllers(), ControllersDisabledByDefault(), ControllerAliases())
160 verflag.AddFlags(namedFlagSets.FlagSet("global"))
161 globalflag.AddGlobalFlags(namedFlagSets.FlagSet("global"), cmd.Name(), logs.SkipLoggingConfigurationFlags())
162 registerLegacyGlobalFlags(namedFlagSets)
163 for _, f := range namedFlagSets.FlagSets {
164 fs.AddFlagSet(f)
165 }
166
167 cols, _, _ := term.TerminalSize(cmd.OutOrStdout())
168 cliflag.SetUsageAndHelpFunc(cmd, namedFlagSets, cols)
169
170 return cmd
171 }
172
173
174
175
176 func ResyncPeriod(c *config.CompletedConfig) func() time.Duration {
177 return func() time.Duration {
178 factor := rand.Float64() + 1
179 return time.Duration(float64(c.ComponentConfig.Generic.MinResyncPeriod.Nanoseconds()) * factor)
180 }
181 }
182
183
184 func Run(ctx context.Context, c *config.CompletedConfig) error {
185 logger := klog.FromContext(ctx)
186 stopCh := ctx.Done()
187
188
189 logger.Info("Starting", "version", version.Get())
190
191 logger.Info("Golang settings", "GOGC", os.Getenv("GOGC"), "GOMAXPROCS", os.Getenv("GOMAXPROCS"), "GOTRACEBACK", os.Getenv("GOTRACEBACK"))
192
193
194 c.EventBroadcaster.StartStructuredLogging(0)
195 c.EventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: c.Client.CoreV1().Events("")})
196 defer c.EventBroadcaster.Shutdown()
197
198 if cfgz, err := configz.New(ConfigzName); err == nil {
199 cfgz.Set(c.ComponentConfig)
200 } else {
201 logger.Error(err, "Unable to register configz")
202 }
203
204
205 var checks []healthz.HealthChecker
206 var electionChecker *leaderelection.HealthzAdaptor
207 if c.ComponentConfig.Generic.LeaderElection.LeaderElect {
208 electionChecker = leaderelection.NewLeaderHealthzAdaptor(time.Second * 20)
209 checks = append(checks, electionChecker)
210 }
211 healthzHandler := controllerhealthz.NewMutableHealthzHandler(checks...)
212
213
214
215 var unsecuredMux *mux.PathRecorderMux
216 if c.SecureServing != nil {
217 unsecuredMux = genericcontrollermanager.NewBaseHandler(&c.ComponentConfig.Generic.Debugging, healthzHandler)
218 slis.SLIMetricsWithReset{}.Install(unsecuredMux)
219
220 handler := genericcontrollermanager.BuildHandlerChain(unsecuredMux, &c.Authorization, &c.Authentication)
221
222 if _, _, err := c.SecureServing.Serve(handler, 0, stopCh); err != nil {
223 return err
224 }
225 }
226
227 clientBuilder, rootClientBuilder := createClientBuilders(logger, c)
228
229 saTokenControllerDescriptor := newServiceAccountTokenControllerDescriptor(rootClientBuilder)
230
231 run := func(ctx context.Context, controllerDescriptors map[string]*ControllerDescriptor) {
232 controllerContext, err := CreateControllerContext(ctx, c, rootClientBuilder, clientBuilder)
233 if err != nil {
234 logger.Error(err, "Error building controller context")
235 klog.FlushAndExit(klog.ExitFlushTimeout, 1)
236 }
237
238 if err := StartControllers(ctx, controllerContext, controllerDescriptors, unsecuredMux, healthzHandler); err != nil {
239 logger.Error(err, "Error starting controllers")
240 klog.FlushAndExit(klog.ExitFlushTimeout, 1)
241 }
242
243 controllerContext.InformerFactory.Start(stopCh)
244 controllerContext.ObjectOrMetadataInformerFactory.Start(stopCh)
245 close(controllerContext.InformersStarted)
246
247 <-ctx.Done()
248 }
249
250
251 if !c.ComponentConfig.Generic.LeaderElection.LeaderElect {
252 controllerDescriptors := NewControllerDescriptors()
253 controllerDescriptors[names.ServiceAccountTokenController] = saTokenControllerDescriptor
254 run(ctx, controllerDescriptors)
255 return nil
256 }
257
258 id, err := os.Hostname()
259 if err != nil {
260 return err
261 }
262
263
264 id = id + "_" + string(uuid.NewUUID())
265
266
267 var leaderMigrator *leadermigration.LeaderMigrator = nil
268
269
270 if leadermigration.Enabled(&c.ComponentConfig.Generic) {
271 logger.Info("starting leader migration")
272
273 leaderMigrator = leadermigration.NewLeaderMigrator(&c.ComponentConfig.Generic.LeaderMigration,
274 "kube-controller-manager")
275
276
277 startSATokenControllerInit := saTokenControllerDescriptor.GetInitFunc()
278
279
280
281 saTokenControllerDescriptor.initFunc = func(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
282 defer close(leaderMigrator.MigrationReady)
283 return startSATokenControllerInit(ctx, controllerContext, controllerName)
284 }
285 }
286
287
288 go leaderElectAndRun(ctx, c, id, electionChecker,
289 c.ComponentConfig.Generic.LeaderElection.ResourceLock,
290 c.ComponentConfig.Generic.LeaderElection.ResourceName,
291 leaderelection.LeaderCallbacks{
292 OnStartedLeading: func(ctx context.Context) {
293 controllerDescriptors := NewControllerDescriptors()
294 if leaderMigrator != nil {
295
296
297 controllerDescriptors = filteredControllerDescriptors(controllerDescriptors, leaderMigrator.FilterFunc, leadermigration.ControllerNonMigrated)
298 logger.Info("leader migration: starting main controllers.")
299 }
300 controllerDescriptors[names.ServiceAccountTokenController] = saTokenControllerDescriptor
301 run(ctx, controllerDescriptors)
302 },
303 OnStoppedLeading: func() {
304 logger.Error(nil, "leaderelection lost")
305 klog.FlushAndExit(klog.ExitFlushTimeout, 1)
306 },
307 })
308
309
310 if leaderMigrator != nil {
311
312
313
314
315 <-leaderMigrator.MigrationReady
316
317
318 go leaderElectAndRun(ctx, c, id, electionChecker,
319 c.ComponentConfig.Generic.LeaderMigration.ResourceLock,
320 c.ComponentConfig.Generic.LeaderMigration.LeaderName,
321 leaderelection.LeaderCallbacks{
322 OnStartedLeading: func(ctx context.Context) {
323 logger.Info("leader migration: starting migrated controllers.")
324 controllerDescriptors := NewControllerDescriptors()
325 controllerDescriptors = filteredControllerDescriptors(controllerDescriptors, leaderMigrator.FilterFunc, leadermigration.ControllerMigrated)
326
327 delete(controllerDescriptors, names.ServiceAccountTokenController)
328 run(ctx, controllerDescriptors)
329 },
330 OnStoppedLeading: func() {
331 logger.Error(nil, "migration leaderelection lost")
332 klog.FlushAndExit(klog.ExitFlushTimeout, 1)
333 },
334 })
335 }
336
337 <-stopCh
338 return nil
339 }
340
341
342 type ControllerContext struct {
343
344 ClientBuilder clientbuilder.ControllerClientBuilder
345
346
347 InformerFactory informers.SharedInformerFactory
348
349
350
351
352
353 ObjectOrMetadataInformerFactory informerfactory.InformerFactory
354
355
356 ComponentConfig kubectrlmgrconfig.KubeControllerManagerConfiguration
357
358
359
360
361 RESTMapper *restmapper.DeferredDiscoveryRESTMapper
362
363
364
365 Cloud cloudprovider.Interface
366
367
368
369
370 LoopMode ControllerLoopMode
371
372
373
374 InformersStarted chan struct{}
375
376
377
378
379 ResyncPeriod func() time.Duration
380
381
382 ControllerManagerMetrics *controllersmetrics.ControllerManagerMetrics
383
384
385 GraphBuilder *garbagecollector.GraphBuilder
386 }
387
388
389 func (c ControllerContext) IsControllerEnabled(controllerDescriptor *ControllerDescriptor) bool {
390 controllersDisabledByDefault := sets.NewString()
391 if controllerDescriptor.IsDisabledByDefault() {
392 controllersDisabledByDefault.Insert(controllerDescriptor.Name())
393 }
394 return genericcontrollermanager.IsControllerEnabled(controllerDescriptor.Name(), controllersDisabledByDefault, c.ComponentConfig.Generic.Controllers)
395 }
396
397
398
399
400
401
402
403
404 type InitFunc func(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller controller.Interface, enabled bool, err error)
405
406 type ControllerDescriptor struct {
407 name string
408 initFunc InitFunc
409 requiredFeatureGates []featuregate.Feature
410 aliases []string
411 isDisabledByDefault bool
412 isCloudProviderController bool
413 requiresSpecialHandling bool
414 }
415
416 func (r *ControllerDescriptor) Name() string {
417 return r.name
418 }
419
420 func (r *ControllerDescriptor) GetInitFunc() InitFunc {
421 return r.initFunc
422 }
423
424 func (r *ControllerDescriptor) GetRequiredFeatureGates() []featuregate.Feature {
425 return append([]featuregate.Feature(nil), r.requiredFeatureGates...)
426 }
427
428
429
430 func (r *ControllerDescriptor) GetAliases() []string {
431 return append([]string(nil), r.aliases...)
432 }
433
434 func (r *ControllerDescriptor) IsDisabledByDefault() bool {
435 return r.isDisabledByDefault
436 }
437
438 func (r *ControllerDescriptor) IsCloudProviderController() bool {
439 return r.isCloudProviderController
440 }
441
442
443 func (r *ControllerDescriptor) RequiresSpecialHandling() bool {
444 return r.requiresSpecialHandling
445 }
446
447
448 func KnownControllers() []string {
449 return sets.StringKeySet(NewControllerDescriptors()).List()
450 }
451
452
453 func ControllerAliases() map[string]string {
454 aliases := map[string]string{}
455 for name, c := range NewControllerDescriptors() {
456 for _, alias := range c.GetAliases() {
457 aliases[alias] = name
458 }
459 }
460 return aliases
461 }
462
463 func ControllersDisabledByDefault() []string {
464 var controllersDisabledByDefault []string
465
466 for name, c := range NewControllerDescriptors() {
467 if c.IsDisabledByDefault() {
468 controllersDisabledByDefault = append(controllersDisabledByDefault, name)
469 }
470 }
471
472 sort.Strings(controllersDisabledByDefault)
473
474 return controllersDisabledByDefault
475 }
476
477
478
479
480 func NewControllerDescriptors() map[string]*ControllerDescriptor {
481 controllers := map[string]*ControllerDescriptor{}
482 aliases := sets.NewString()
483
484
485 register := func(controllerDesc *ControllerDescriptor) {
486 if controllerDesc == nil {
487 panic("received nil controller for a registration")
488 }
489 name := controllerDesc.Name()
490 if len(name) == 0 {
491 panic("received controller without a name for a registration")
492 }
493 if _, found := controllers[name]; found {
494 panic(fmt.Sprintf("controller name %q was registered twice", name))
495 }
496 if controllerDesc.GetInitFunc() == nil {
497 panic(fmt.Sprintf("controller %q does not have an init function", name))
498 }
499
500 for _, alias := range controllerDesc.GetAliases() {
501 if aliases.Has(alias) {
502 panic(fmt.Sprintf("controller %q has a duplicate alias %q", name, alias))
503 }
504 aliases.Insert(alias)
505 }
506
507 controllers[name] = controllerDesc
508 }
509
510
511
512
513
514
515
516 register(newServiceAccountTokenControllerDescriptor(nil))
517
518 register(newEndpointsControllerDescriptor())
519 register(newEndpointSliceControllerDescriptor())
520 register(newEndpointSliceMirroringControllerDescriptor())
521 register(newReplicationControllerDescriptor())
522 register(newPodGarbageCollectorControllerDescriptor())
523 register(newResourceQuotaControllerDescriptor())
524 register(newNamespaceControllerDescriptor())
525 register(newServiceAccountControllerDescriptor())
526 register(newGarbageCollectorControllerDescriptor())
527 register(newDaemonSetControllerDescriptor())
528 register(newJobControllerDescriptor())
529 register(newDeploymentControllerDescriptor())
530 register(newReplicaSetControllerDescriptor())
531 register(newHorizontalPodAutoscalerControllerDescriptor())
532 register(newDisruptionControllerDescriptor())
533 register(newStatefulSetControllerDescriptor())
534 register(newCronJobControllerDescriptor())
535 register(newCertificateSigningRequestSigningControllerDescriptor())
536 register(newCertificateSigningRequestApprovingControllerDescriptor())
537 register(newCertificateSigningRequestCleanerControllerDescriptor())
538 register(newTTLControllerDescriptor())
539 register(newBootstrapSignerControllerDescriptor())
540 register(newTokenCleanerControllerDescriptor())
541 register(newNodeIpamControllerDescriptor())
542 register(newNodeLifecycleControllerDescriptor())
543
544 register(newServiceLBControllerDescriptor())
545 register(newNodeRouteControllerDescriptor())
546 register(newCloudNodeLifecycleControllerDescriptor())
547
548
549 register(newPersistentVolumeBinderControllerDescriptor())
550 register(newPersistentVolumeAttachDetachControllerDescriptor())
551 register(newPersistentVolumeExpanderControllerDescriptor())
552 register(newClusterRoleAggregrationControllerDescriptor())
553 register(newPersistentVolumeClaimProtectionControllerDescriptor())
554 register(newPersistentVolumeProtectionControllerDescriptor())
555 register(newTTLAfterFinishedControllerDescriptor())
556 register(newRootCACertificatePublisherControllerDescriptor())
557 register(newEphemeralVolumeControllerDescriptor())
558
559
560 register(newStorageVersionGarbageCollectorControllerDescriptor())
561 register(newResourceClaimControllerDescriptor())
562 register(newLegacyServiceAccountTokenCleanerControllerDescriptor())
563 register(newValidatingAdmissionPolicyStatusControllerDescriptor())
564 register(newTaintEvictionControllerDescriptor())
565 register(newServiceCIDRsControllerDescriptor())
566 register(newStorageVersionMigratorControllerDescriptor())
567
568 for _, alias := range aliases.UnsortedList() {
569 if _, ok := controllers[alias]; ok {
570 panic(fmt.Sprintf("alias %q conflicts with a controller name", alias))
571 }
572 }
573
574 return controllers
575 }
576
577
578
579
580 func CreateControllerContext(ctx context.Context, s *config.CompletedConfig, rootClientBuilder, clientBuilder clientbuilder.ControllerClientBuilder) (ControllerContext, error) {
581
582 trim := func(obj interface{}) (interface{}, error) {
583 if accessor, err := meta.Accessor(obj); err == nil {
584 if accessor.GetManagedFields() != nil {
585 accessor.SetManagedFields(nil)
586 }
587 }
588 return obj, nil
589 }
590
591 versionedClient := rootClientBuilder.ClientOrDie("shared-informers")
592 sharedInformers := informers.NewSharedInformerFactoryWithOptions(versionedClient, ResyncPeriod(s)(), informers.WithTransform(trim))
593
594 metadataClient := metadata.NewForConfigOrDie(rootClientBuilder.ConfigOrDie("metadata-informers"))
595 metadataInformers := metadatainformer.NewSharedInformerFactoryWithOptions(metadataClient, ResyncPeriod(s)(), metadatainformer.WithTransform(trim))
596
597
598
599 if err := genericcontrollermanager.WaitForAPIServer(versionedClient, 10*time.Second); err != nil {
600 return ControllerContext{}, fmt.Errorf("failed to wait for apiserver being healthy: %v", err)
601 }
602
603
604 discoveryClient := rootClientBuilder.DiscoveryClientOrDie("controller-discovery")
605 cachedClient := cacheddiscovery.NewMemCacheClient(discoveryClient)
606 restMapper := restmapper.NewDeferredDiscoveryRESTMapper(cachedClient)
607 go wait.Until(func() {
608 restMapper.Reset()
609 }, 30*time.Second, ctx.Done())
610
611 cloud, loopMode, err := createCloudProvider(klog.FromContext(ctx), s.ComponentConfig.KubeCloudShared.CloudProvider.Name, s.ComponentConfig.KubeCloudShared.ExternalCloudVolumePlugin,
612 s.ComponentConfig.KubeCloudShared.CloudProvider.CloudConfigFile, s.ComponentConfig.KubeCloudShared.AllowUntaggedCloud, sharedInformers)
613 if err != nil {
614 return ControllerContext{}, err
615 }
616
617 controllerContext := ControllerContext{
618 ClientBuilder: clientBuilder,
619 InformerFactory: sharedInformers,
620 ObjectOrMetadataInformerFactory: informerfactory.NewInformerFactory(sharedInformers, metadataInformers),
621 ComponentConfig: s.ComponentConfig,
622 RESTMapper: restMapper,
623 Cloud: cloud,
624 LoopMode: loopMode,
625 InformersStarted: make(chan struct{}),
626 ResyncPeriod: ResyncPeriod(s),
627 ControllerManagerMetrics: controllersmetrics.NewControllerManagerMetrics("kube-controller-manager"),
628 }
629
630 if controllerContext.ComponentConfig.GarbageCollectorController.EnableGarbageCollector &&
631 controllerContext.IsControllerEnabled(NewControllerDescriptors()[names.GarbageCollectorController]) {
632 ignoredResources := make(map[schema.GroupResource]struct{})
633 for _, r := range controllerContext.ComponentConfig.GarbageCollectorController.GCIgnoredResources {
634 ignoredResources[schema.GroupResource{Group: r.Group, Resource: r.Resource}] = struct{}{}
635 }
636
637 controllerContext.GraphBuilder = garbagecollector.NewDependencyGraphBuilder(
638 ctx,
639 metadataClient,
640 controllerContext.RESTMapper,
641 ignoredResources,
642 controllerContext.ObjectOrMetadataInformerFactory,
643 controllerContext.InformersStarted,
644 )
645 }
646
647 controllersmetrics.Register()
648 return controllerContext, nil
649 }
650
651
652 func StartControllers(ctx context.Context, controllerCtx ControllerContext, controllerDescriptors map[string]*ControllerDescriptor,
653 unsecuredMux *mux.PathRecorderMux, healthzHandler *controllerhealthz.MutableHealthzHandler) error {
654 var controllerChecks []healthz.HealthChecker
655
656
657
658 if serviceAccountTokenControllerDescriptor, ok := controllerDescriptors[names.ServiceAccountTokenController]; ok {
659 check, err := StartController(ctx, controllerCtx, serviceAccountTokenControllerDescriptor, unsecuredMux)
660 if err != nil {
661 return err
662 }
663 if check != nil {
664
665 controllerChecks = append(controllerChecks, check)
666 }
667 }
668
669
670
671 if controllerCtx.Cloud != nil {
672 controllerCtx.Cloud.Initialize(controllerCtx.ClientBuilder, ctx.Done())
673 }
674
675
676
677
678
679
680
681
682
683
684 for _, controllerDesc := range controllerDescriptors {
685 if controllerDesc.RequiresSpecialHandling() {
686 continue
687 }
688
689 check, err := StartController(ctx, controllerCtx, controllerDesc, unsecuredMux)
690 if err != nil {
691 return err
692 }
693 if check != nil {
694
695 controllerChecks = append(controllerChecks, check)
696 }
697 }
698
699 healthzHandler.AddHealthChecker(controllerChecks...)
700
701 return nil
702 }
703
704
705
706 func StartController(ctx context.Context, controllerCtx ControllerContext, controllerDescriptor *ControllerDescriptor,
707 unsecuredMux *mux.PathRecorderMux) (healthz.HealthChecker, error) {
708 logger := klog.FromContext(ctx)
709 controllerName := controllerDescriptor.Name()
710
711 for _, featureGate := range controllerDescriptor.GetRequiredFeatureGates() {
712 if !utilfeature.DefaultFeatureGate.Enabled(featureGate) {
713 logger.Info("Controller is disabled by a feature gate", "controller", controllerName, "requiredFeatureGates", controllerDescriptor.GetRequiredFeatureGates())
714 return nil, nil
715 }
716 }
717
718 if controllerDescriptor.IsCloudProviderController() && controllerCtx.LoopMode != IncludeCloudLoops {
719 logger.Info("Skipping a cloud provider controller", "controller", controllerName, "loopMode", controllerCtx.LoopMode)
720 return nil, nil
721 }
722
723 if !controllerCtx.IsControllerEnabled(controllerDescriptor) {
724 logger.Info("Warning: controller is disabled", "controller", controllerName)
725 return nil, nil
726 }
727
728 time.Sleep(wait.Jitter(controllerCtx.ComponentConfig.Generic.ControllerStartInterval.Duration, ControllerStartJitter))
729
730 logger.V(1).Info("Starting controller", "controller", controllerName)
731
732 initFunc := controllerDescriptor.GetInitFunc()
733 ctrl, started, err := initFunc(klog.NewContext(ctx, klog.LoggerWithName(logger, controllerName)), controllerCtx, controllerName)
734 if err != nil {
735 logger.Error(err, "Error starting controller", "controller", controllerName)
736 return nil, err
737 }
738 if !started {
739 logger.Info("Warning: skipping controller", "controller", controllerName)
740 return nil, nil
741 }
742
743 check := controllerhealthz.NamedPingChecker(controllerName)
744 if ctrl != nil {
745
746
747 if debuggable, ok := ctrl.(controller.Debuggable); ok && unsecuredMux != nil {
748 if debugHandler := debuggable.DebuggingHandler(); debugHandler != nil {
749 basePath := "/debug/controllers/" + controllerName
750 unsecuredMux.UnlistedHandle(basePath, http.StripPrefix(basePath, debugHandler))
751 unsecuredMux.UnlistedHandlePrefix(basePath+"/", http.StripPrefix(basePath, debugHandler))
752 }
753 }
754 if healthCheckable, ok := ctrl.(controller.HealthCheckable); ok {
755 if realCheck := healthCheckable.HealthChecker(); realCheck != nil {
756 check = controllerhealthz.NamedHealthChecker(controllerName, realCheck)
757 }
758 }
759 }
760
761 logger.Info("Started controller", "controller", controllerName)
762 return check, nil
763 }
764
765
766
767 func newServiceAccountTokenControllerDescriptor(rootClientBuilder clientbuilder.ControllerClientBuilder) *ControllerDescriptor {
768 return &ControllerDescriptor{
769 name: names.ServiceAccountTokenController,
770 aliases: []string{"serviceaccount-token"},
771 initFunc: func(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
772 return startServiceAccountTokenController(ctx, controllerContext, controllerName, rootClientBuilder)
773 },
774
775 requiresSpecialHandling: true,
776 }
777 }
778
779 func startServiceAccountTokenController(ctx context.Context, controllerContext ControllerContext, controllerName string, rootClientBuilder clientbuilder.ControllerClientBuilder) (controller.Interface, bool, error) {
780 logger := klog.FromContext(ctx)
781 if len(controllerContext.ComponentConfig.SAController.ServiceAccountKeyFile) == 0 {
782 logger.Info("Controller is disabled because there is no private key", "controller", controllerName)
783 return nil, false, nil
784 }
785 privateKey, err := keyutil.PrivateKeyFromFile(controllerContext.ComponentConfig.SAController.ServiceAccountKeyFile)
786 if err != nil {
787 return nil, true, fmt.Errorf("error reading key for service account token controller: %v", err)
788 }
789
790 var rootCA []byte
791 if controllerContext.ComponentConfig.SAController.RootCAFile != "" {
792 if rootCA, err = readCA(controllerContext.ComponentConfig.SAController.RootCAFile); err != nil {
793 return nil, true, fmt.Errorf("error parsing root-ca-file at %s: %v", controllerContext.ComponentConfig.SAController.RootCAFile, err)
794 }
795 } else {
796 rootCA = rootClientBuilder.ConfigOrDie("tokens-controller").CAData
797 }
798
799 tokenGenerator, err := serviceaccount.JWTTokenGenerator(serviceaccount.LegacyIssuer, privateKey)
800 if err != nil {
801 return nil, false, fmt.Errorf("failed to build token generator: %v", err)
802 }
803 tokenController, err := serviceaccountcontroller.NewTokensController(
804 controllerContext.InformerFactory.Core().V1().ServiceAccounts(),
805 controllerContext.InformerFactory.Core().V1().Secrets(),
806 rootClientBuilder.ClientOrDie("tokens-controller"),
807 serviceaccountcontroller.TokensControllerOptions{
808 TokenGenerator: tokenGenerator,
809 RootCA: rootCA,
810 },
811 )
812 if err != nil {
813 return nil, true, fmt.Errorf("error creating Tokens controller: %v", err)
814 }
815 go tokenController.Run(ctx, int(controllerContext.ComponentConfig.SAController.ConcurrentSATokenSyncs))
816
817
818 controllerContext.InformerFactory.Start(ctx.Done())
819
820 return nil, true, nil
821 }
822
823 func readCA(file string) ([]byte, error) {
824 rootCA, err := os.ReadFile(file)
825 if err != nil {
826 return nil, err
827 }
828 if _, err := certutil.ParseCertsPEM(rootCA); err != nil {
829 return nil, err
830 }
831
832 return rootCA, err
833 }
834
835
836 func createClientBuilders(logger klog.Logger, c *config.CompletedConfig) (clientBuilder clientbuilder.ControllerClientBuilder, rootClientBuilder clientbuilder.ControllerClientBuilder) {
837 rootClientBuilder = clientbuilder.SimpleControllerClientBuilder{
838 ClientConfig: c.Kubeconfig,
839 }
840 if c.ComponentConfig.KubeCloudShared.UseServiceAccountCredentials {
841 if len(c.ComponentConfig.SAController.ServiceAccountKeyFile) == 0 {
842
843
844 logger.Info("Warning: --use-service-account-credentials was specified without providing a --service-account-private-key-file")
845 }
846
847 clientBuilder = clientbuilder.NewDynamicClientBuilder(
848 restclient.AnonymousClientConfig(c.Kubeconfig),
849 c.Client.CoreV1(),
850 metav1.NamespaceSystem)
851 } else {
852 clientBuilder = rootClientBuilder
853 }
854 return
855 }
856
857
858
859 func leaderElectAndRun(ctx context.Context, c *config.CompletedConfig, lockIdentity string, electionChecker *leaderelection.HealthzAdaptor, resourceLock string, leaseName string, callbacks leaderelection.LeaderCallbacks) {
860 logger := klog.FromContext(ctx)
861 rl, err := resourcelock.NewFromKubeconfig(resourceLock,
862 c.ComponentConfig.Generic.LeaderElection.ResourceNamespace,
863 leaseName,
864 resourcelock.ResourceLockConfig{
865 Identity: lockIdentity,
866 EventRecorder: c.EventRecorder,
867 },
868 c.Kubeconfig,
869 c.ComponentConfig.Generic.LeaderElection.RenewDeadline.Duration)
870 if err != nil {
871 logger.Error(err, "Error creating lock")
872 klog.FlushAndExit(klog.ExitFlushTimeout, 1)
873 }
874
875 leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
876 Lock: rl,
877 LeaseDuration: c.ComponentConfig.Generic.LeaderElection.LeaseDuration.Duration,
878 RenewDeadline: c.ComponentConfig.Generic.LeaderElection.RenewDeadline.Duration,
879 RetryPeriod: c.ComponentConfig.Generic.LeaderElection.RetryPeriod.Duration,
880 Callbacks: callbacks,
881 WatchDog: electionChecker,
882 Name: leaseName,
883 })
884
885 panic("unreachable")
886 }
887
888
889 func filteredControllerDescriptors(controllerDescriptors map[string]*ControllerDescriptor, filterFunc leadermigration.FilterFunc, expected leadermigration.FilterResult) map[string]*ControllerDescriptor {
890 resultControllers := make(map[string]*ControllerDescriptor)
891 for name, controllerDesc := range controllerDescriptors {
892 if filterFunc(name) == expected {
893 resultControllers[name] = controllerDesc
894 }
895 }
896 return resultControllers
897 }
898
View as plain text