...

Source file src/k8s.io/kubernetes/cmd/kube-controller-manager/app/controllermanager.go

Documentation: k8s.io/kubernetes/cmd/kube-controller-manager/app

     1  /*
     2  Copyright 2014 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  // Package app implements a server that runs a set of active
    18  // components.  This includes replication controllers, service endpoints and
    19  // nodes.
    20  package app
    21  
    22  import (
    23  	"context"
    24  	"fmt"
    25  	"k8s.io/apimachinery/pkg/runtime/schema"
    26  	"math/rand"
    27  	"net/http"
    28  	"os"
    29  	"sort"
    30  	"time"
    31  
    32  	"github.com/spf13/cobra"
    33  
    34  	"k8s.io/apimachinery/pkg/api/meta"
    35  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    36  	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    37  	"k8s.io/apimachinery/pkg/util/sets"
    38  	"k8s.io/apimachinery/pkg/util/uuid"
    39  	"k8s.io/apimachinery/pkg/util/wait"
    40  	"k8s.io/apiserver/pkg/server/healthz"
    41  	"k8s.io/apiserver/pkg/server/mux"
    42  	utilfeature "k8s.io/apiserver/pkg/util/feature"
    43  	cacheddiscovery "k8s.io/client-go/discovery/cached/memory"
    44  	"k8s.io/client-go/informers"
    45  	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
    46  	"k8s.io/client-go/metadata"
    47  	"k8s.io/client-go/metadata/metadatainformer"
    48  	restclient "k8s.io/client-go/rest"
    49  	"k8s.io/client-go/restmapper"
    50  	"k8s.io/client-go/tools/leaderelection"
    51  	"k8s.io/client-go/tools/leaderelection/resourcelock"
    52  	certutil "k8s.io/client-go/util/cert"
    53  	"k8s.io/client-go/util/keyutil"
    54  	cloudprovider "k8s.io/cloud-provider"
    55  	cliflag "k8s.io/component-base/cli/flag"
    56  	"k8s.io/component-base/cli/globalflag"
    57  	"k8s.io/component-base/configz"
    58  	"k8s.io/component-base/featuregate"
    59  	"k8s.io/component-base/logs"
    60  	logsapi "k8s.io/component-base/logs/api/v1"
    61  	metricsfeatures "k8s.io/component-base/metrics/features"
    62  	controllersmetrics "k8s.io/component-base/metrics/prometheus/controllers"
    63  	"k8s.io/component-base/metrics/prometheus/slis"
    64  	"k8s.io/component-base/term"
    65  	"k8s.io/component-base/version"
    66  	"k8s.io/component-base/version/verflag"
    67  	genericcontrollermanager "k8s.io/controller-manager/app"
    68  	"k8s.io/controller-manager/controller"
    69  	"k8s.io/controller-manager/pkg/clientbuilder"
    70  	controllerhealthz "k8s.io/controller-manager/pkg/healthz"
    71  	"k8s.io/controller-manager/pkg/informerfactory"
    72  	"k8s.io/controller-manager/pkg/leadermigration"
    73  	"k8s.io/klog/v2"
    74  	"k8s.io/kubernetes/cmd/kube-controller-manager/app/config"
    75  	"k8s.io/kubernetes/cmd/kube-controller-manager/app/options"
    76  	"k8s.io/kubernetes/cmd/kube-controller-manager/names"
    77  	kubectrlmgrconfig "k8s.io/kubernetes/pkg/controller/apis/config"
    78  	garbagecollector "k8s.io/kubernetes/pkg/controller/garbagecollector"
    79  	serviceaccountcontroller "k8s.io/kubernetes/pkg/controller/serviceaccount"
    80  	"k8s.io/kubernetes/pkg/serviceaccount"
    81  )
    82  
    83  func init() {
    84  	utilruntime.Must(logsapi.AddFeatureGates(utilfeature.DefaultMutableFeatureGate))
    85  	utilruntime.Must(metricsfeatures.AddFeatureGates(utilfeature.DefaultMutableFeatureGate))
    86  }
    87  
    88  const (
    89  	// ControllerStartJitter is the Jitter used when starting controller managers
    90  	ControllerStartJitter = 1.0
    91  	// ConfigzName is the name used for register kube-controller manager /configz, same with GroupName.
    92  	ConfigzName = "kubecontrollermanager.config.k8s.io"
    93  )
    94  
    95  // ControllerLoopMode is the kube-controller-manager's mode of running controller loops that are cloud provider dependent
    96  type ControllerLoopMode int
    97  
    98  const (
    99  	// IncludeCloudLoops means the kube-controller-manager include the controller loops that are cloud provider dependent
   100  	IncludeCloudLoops ControllerLoopMode = iota
   101  	// ExternalLoops means the kube-controller-manager exclude the controller loops that are cloud provider dependent
   102  	ExternalLoops
   103  )
   104  
   105  // NewControllerManagerCommand creates a *cobra.Command object with default parameters
   106  func NewControllerManagerCommand() *cobra.Command {
   107  	s, err := options.NewKubeControllerManagerOptions()
   108  	if err != nil {
   109  		klog.Background().Error(err, "Unable to initialize command options")
   110  		klog.FlushAndExit(klog.ExitFlushTimeout, 1)
   111  	}
   112  
   113  	cmd := &cobra.Command{
   114  		Use: "kube-controller-manager",
   115  		Long: `The Kubernetes controller manager is a daemon that embeds
   116  the core control loops shipped with Kubernetes. In applications of robotics and
   117  automation, a control loop is a non-terminating loop that regulates the state of
   118  the system. In Kubernetes, a controller is a control loop that watches the shared
   119  state of the cluster through the apiserver and makes changes attempting to move the
   120  current state towards the desired state. Examples of controllers that ship with
   121  Kubernetes today are the replication controller, endpoints controller, namespace
   122  controller, and serviceaccounts controller.`,
   123  		PersistentPreRunE: func(*cobra.Command, []string) error {
   124  			// silence client-go warnings.
   125  			// kube-controller-manager generically watches APIs (including deprecated ones),
   126  			// and CI ensures it works properly against matching kube-apiserver versions.
   127  			restclient.SetDefaultWarningHandler(restclient.NoWarnings{})
   128  			return nil
   129  		},
   130  		RunE: func(cmd *cobra.Command, args []string) error {
   131  			verflag.PrintAndExitIfRequested()
   132  
   133  			// Activate logging as soon as possible, after that
   134  			// show flags with the final logging configuration.
   135  			if err := logsapi.ValidateAndApply(s.Logs, utilfeature.DefaultFeatureGate); err != nil {
   136  				return err
   137  			}
   138  			cliflag.PrintFlags(cmd.Flags())
   139  
   140  			c, err := s.Config(KnownControllers(), ControllersDisabledByDefault(), ControllerAliases())
   141  			if err != nil {
   142  				return err
   143  			}
   144  			// add feature enablement metrics
   145  			utilfeature.DefaultMutableFeatureGate.AddMetrics()
   146  			return Run(context.Background(), c.Complete())
   147  		},
   148  		Args: func(cmd *cobra.Command, args []string) error {
   149  			for _, arg := range args {
   150  				if len(arg) > 0 {
   151  					return fmt.Errorf("%q does not take any arguments, got %q", cmd.CommandPath(), args)
   152  				}
   153  			}
   154  			return nil
   155  		},
   156  	}
   157  
   158  	fs := cmd.Flags()
   159  	namedFlagSets := s.Flags(KnownControllers(), ControllersDisabledByDefault(), ControllerAliases())
   160  	verflag.AddFlags(namedFlagSets.FlagSet("global"))
   161  	globalflag.AddGlobalFlags(namedFlagSets.FlagSet("global"), cmd.Name(), logs.SkipLoggingConfigurationFlags())
   162  	registerLegacyGlobalFlags(namedFlagSets)
   163  	for _, f := range namedFlagSets.FlagSets {
   164  		fs.AddFlagSet(f)
   165  	}
   166  
   167  	cols, _, _ := term.TerminalSize(cmd.OutOrStdout())
   168  	cliflag.SetUsageAndHelpFunc(cmd, namedFlagSets, cols)
   169  
   170  	return cmd
   171  }
   172  
   173  // ResyncPeriod returns a function which generates a duration each time it is
   174  // invoked; this is so that multiple controllers don't get into lock-step and all
   175  // hammer the apiserver with list requests simultaneously.
   176  func ResyncPeriod(c *config.CompletedConfig) func() time.Duration {
   177  	return func() time.Duration {
   178  		factor := rand.Float64() + 1
   179  		return time.Duration(float64(c.ComponentConfig.Generic.MinResyncPeriod.Nanoseconds()) * factor)
   180  	}
   181  }
   182  
   183  // Run runs the KubeControllerManagerOptions.
   184  func Run(ctx context.Context, c *config.CompletedConfig) error {
   185  	logger := klog.FromContext(ctx)
   186  	stopCh := ctx.Done()
   187  
   188  	// To help debugging, immediately log version
   189  	logger.Info("Starting", "version", version.Get())
   190  
   191  	logger.Info("Golang settings", "GOGC", os.Getenv("GOGC"), "GOMAXPROCS", os.Getenv("GOMAXPROCS"), "GOTRACEBACK", os.Getenv("GOTRACEBACK"))
   192  
   193  	// Start events processing pipeline.
   194  	c.EventBroadcaster.StartStructuredLogging(0)
   195  	c.EventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: c.Client.CoreV1().Events("")})
   196  	defer c.EventBroadcaster.Shutdown()
   197  
   198  	if cfgz, err := configz.New(ConfigzName); err == nil {
   199  		cfgz.Set(c.ComponentConfig)
   200  	} else {
   201  		logger.Error(err, "Unable to register configz")
   202  	}
   203  
   204  	// Setup any healthz checks we will want to use.
   205  	var checks []healthz.HealthChecker
   206  	var electionChecker *leaderelection.HealthzAdaptor
   207  	if c.ComponentConfig.Generic.LeaderElection.LeaderElect {
   208  		electionChecker = leaderelection.NewLeaderHealthzAdaptor(time.Second * 20)
   209  		checks = append(checks, electionChecker)
   210  	}
   211  	healthzHandler := controllerhealthz.NewMutableHealthzHandler(checks...)
   212  
   213  	// Start the controller manager HTTP server
   214  	// unsecuredMux is the handler for these controller *after* authn/authz filters have been applied
   215  	var unsecuredMux *mux.PathRecorderMux
   216  	if c.SecureServing != nil {
   217  		unsecuredMux = genericcontrollermanager.NewBaseHandler(&c.ComponentConfig.Generic.Debugging, healthzHandler)
   218  		slis.SLIMetricsWithReset{}.Install(unsecuredMux)
   219  
   220  		handler := genericcontrollermanager.BuildHandlerChain(unsecuredMux, &c.Authorization, &c.Authentication)
   221  		// TODO: handle stoppedCh and listenerStoppedCh returned by c.SecureServing.Serve
   222  		if _, _, err := c.SecureServing.Serve(handler, 0, stopCh); err != nil {
   223  			return err
   224  		}
   225  	}
   226  
   227  	clientBuilder, rootClientBuilder := createClientBuilders(logger, c)
   228  
   229  	saTokenControllerDescriptor := newServiceAccountTokenControllerDescriptor(rootClientBuilder)
   230  
   231  	run := func(ctx context.Context, controllerDescriptors map[string]*ControllerDescriptor) {
   232  		controllerContext, err := CreateControllerContext(ctx, c, rootClientBuilder, clientBuilder)
   233  		if err != nil {
   234  			logger.Error(err, "Error building controller context")
   235  			klog.FlushAndExit(klog.ExitFlushTimeout, 1)
   236  		}
   237  
   238  		if err := StartControllers(ctx, controllerContext, controllerDescriptors, unsecuredMux, healthzHandler); err != nil {
   239  			logger.Error(err, "Error starting controllers")
   240  			klog.FlushAndExit(klog.ExitFlushTimeout, 1)
   241  		}
   242  
   243  		controllerContext.InformerFactory.Start(stopCh)
   244  		controllerContext.ObjectOrMetadataInformerFactory.Start(stopCh)
   245  		close(controllerContext.InformersStarted)
   246  
   247  		<-ctx.Done()
   248  	}
   249  
   250  	// No leader election, run directly
   251  	if !c.ComponentConfig.Generic.LeaderElection.LeaderElect {
   252  		controllerDescriptors := NewControllerDescriptors()
   253  		controllerDescriptors[names.ServiceAccountTokenController] = saTokenControllerDescriptor
   254  		run(ctx, controllerDescriptors)
   255  		return nil
   256  	}
   257  
   258  	id, err := os.Hostname()
   259  	if err != nil {
   260  		return err
   261  	}
   262  
   263  	// add a uniquifier so that two processes on the same host don't accidentally both become active
   264  	id = id + "_" + string(uuid.NewUUID())
   265  
   266  	// leaderMigrator will be non-nil if and only if Leader Migration is enabled.
   267  	var leaderMigrator *leadermigration.LeaderMigrator = nil
   268  
   269  	// If leader migration is enabled, create the LeaderMigrator and prepare for migration
   270  	if leadermigration.Enabled(&c.ComponentConfig.Generic) {
   271  		logger.Info("starting leader migration")
   272  
   273  		leaderMigrator = leadermigration.NewLeaderMigrator(&c.ComponentConfig.Generic.LeaderMigration,
   274  			"kube-controller-manager")
   275  
   276  		// startSATokenControllerInit is the original InitFunc.
   277  		startSATokenControllerInit := saTokenControllerDescriptor.GetInitFunc()
   278  
   279  		// Wrap saTokenControllerDescriptor to signal readiness for migration after starting
   280  		//  the controller.
   281  		saTokenControllerDescriptor.initFunc = func(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
   282  			defer close(leaderMigrator.MigrationReady)
   283  			return startSATokenControllerInit(ctx, controllerContext, controllerName)
   284  		}
   285  	}
   286  
   287  	// Start the main lock
   288  	go leaderElectAndRun(ctx, c, id, electionChecker,
   289  		c.ComponentConfig.Generic.LeaderElection.ResourceLock,
   290  		c.ComponentConfig.Generic.LeaderElection.ResourceName,
   291  		leaderelection.LeaderCallbacks{
   292  			OnStartedLeading: func(ctx context.Context) {
   293  				controllerDescriptors := NewControllerDescriptors()
   294  				if leaderMigrator != nil {
   295  					// If leader migration is enabled, we should start only non-migrated controllers
   296  					//  for the main lock.
   297  					controllerDescriptors = filteredControllerDescriptors(controllerDescriptors, leaderMigrator.FilterFunc, leadermigration.ControllerNonMigrated)
   298  					logger.Info("leader migration: starting main controllers.")
   299  				}
   300  				controllerDescriptors[names.ServiceAccountTokenController] = saTokenControllerDescriptor
   301  				run(ctx, controllerDescriptors)
   302  			},
   303  			OnStoppedLeading: func() {
   304  				logger.Error(nil, "leaderelection lost")
   305  				klog.FlushAndExit(klog.ExitFlushTimeout, 1)
   306  			},
   307  		})
   308  
   309  	// If Leader Migration is enabled, proceed to attempt the migration lock.
   310  	if leaderMigrator != nil {
   311  		// Wait for Service Account Token Controller to start before acquiring the migration lock.
   312  		// At this point, the main lock must have already been acquired, or the KCM process already exited.
   313  		// We wait for the main lock before acquiring the migration lock to prevent the situation
   314  		//  where KCM instance A holds the main lock while KCM instance B holds the migration lock.
   315  		<-leaderMigrator.MigrationReady
   316  
   317  		// Start the migration lock.
   318  		go leaderElectAndRun(ctx, c, id, electionChecker,
   319  			c.ComponentConfig.Generic.LeaderMigration.ResourceLock,
   320  			c.ComponentConfig.Generic.LeaderMigration.LeaderName,
   321  			leaderelection.LeaderCallbacks{
   322  				OnStartedLeading: func(ctx context.Context) {
   323  					logger.Info("leader migration: starting migrated controllers.")
   324  					controllerDescriptors := NewControllerDescriptors()
   325  					controllerDescriptors = filteredControllerDescriptors(controllerDescriptors, leaderMigrator.FilterFunc, leadermigration.ControllerMigrated)
   326  					// DO NOT start saTokenController under migration lock
   327  					delete(controllerDescriptors, names.ServiceAccountTokenController)
   328  					run(ctx, controllerDescriptors)
   329  				},
   330  				OnStoppedLeading: func() {
   331  					logger.Error(nil, "migration leaderelection lost")
   332  					klog.FlushAndExit(klog.ExitFlushTimeout, 1)
   333  				},
   334  			})
   335  	}
   336  
   337  	<-stopCh
   338  	return nil
   339  }
   340  
// ControllerContext defines the context object for controllers: the clients,
// informer factories, configuration and other shared resources that every
// InitFunc receives. CreateControllerContext builds it.
type ControllerContext struct {
	// ClientBuilder will provide a client for this controller to use.
	ClientBuilder clientbuilder.ControllerClientBuilder

	// InformerFactory gives access to informers for the controller.
	InformerFactory informers.SharedInformerFactory

	// ObjectOrMetadataInformerFactory gives access to informers for typed resources
	// and dynamic resources by their metadata. All generic controllers currently use
	// object metadata - if a future controller needs access to the full object this
	// would become GenericInformerFactory and take a dynamic client.
	ObjectOrMetadataInformerFactory informerfactory.InformerFactory

	// ComponentConfig provides access to init options for a given controller.
	ComponentConfig kubectrlmgrconfig.KubeControllerManagerConfiguration

	// RESTMapper is a DeferredDiscoveryRESTMapper, which defers
	// initialization of the RESTMapper until the first mapping is
	// requested.
	RESTMapper *restmapper.DeferredDiscoveryRESTMapper

	// Cloud is the cloud provider interface for the controllers to use.
	// It must be initialized and ready to use.
	Cloud cloudprovider.Interface

	// LoopMode controls which control loops are run:
	// IncludeCloudLoops is for a kube-controller-manager running all loops;
	// ExternalLoops is for a kube-controller-manager running alongside a cloud-controller-manager.
	LoopMode ControllerLoopMode

	// InformersStarted is closed after all of the controllers have been initialized and are running. After this point it is safe
	// for an individual controller to start the shared informers. Before it is closed, they should not.
	InformersStarted chan struct{}

	// ResyncPeriod generates a duration each time it is invoked; this is so that
	// multiple controllers don't get into lock-step and all hammer the apiserver
	// with list requests simultaneously.
	ResyncPeriod func() time.Duration

	// ControllerManagerMetrics provides a proxy to set controller manager specific metrics.
	ControllerManagerMetrics *controllersmetrics.ControllerManagerMetrics

	// GraphBuilder gives access to the garbage collector's dependency graph builder, which keeps track
	// of resources in the cluster. CreateControllerContext only sets it when the garbage collector
	// is enabled; otherwise it is nil.
	GraphBuilder *garbagecollector.GraphBuilder
}
   387  
   388  // IsControllerEnabled checks if the context's controllers enabled or not
   389  func (c ControllerContext) IsControllerEnabled(controllerDescriptor *ControllerDescriptor) bool {
   390  	controllersDisabledByDefault := sets.NewString()
   391  	if controllerDescriptor.IsDisabledByDefault() {
   392  		controllersDisabledByDefault.Insert(controllerDescriptor.Name())
   393  	}
   394  	return genericcontrollermanager.IsControllerEnabled(controllerDescriptor.Name(), controllersDisabledByDefault, c.ComponentConfig.Generic.Controllers)
   395  }
   396  
   397  // InitFunc is used to launch a particular controller. It returns a controller
   398  // that can optionally implement other interfaces so that the controller manager
   399  // can support the requested features.
   400  // The returned controller may be nil, which will be considered an anonymous controller
   401  // that requests no additional features from the controller manager.
   402  // Any error returned will cause the controller process to `Fatal`
   403  // The bool indicates whether the controller was enabled.
   404  type InitFunc func(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller controller.Interface, enabled bool, err error)
   405  
// ControllerDescriptor describes a controller known to the kube-controller-manager:
// its canonical name, the InitFunc that starts it, and metadata (required feature
// gates, aliases and several flags) exposed through its accessor methods.
// NewControllerDescriptors rejects descriptors with an empty name or a nil init
// function, as well as duplicate names or aliases.
type ControllerDescriptor struct {
	name                      string
	initFunc                  InitFunc
	requiredFeatureGates      []featuregate.Feature
	aliases                   []string
	isDisabledByDefault       bool
	isCloudProviderController bool
	requiresSpecialHandling   bool
}
   415  
   416  func (r *ControllerDescriptor) Name() string {
   417  	return r.name
   418  }
   419  
   420  func (r *ControllerDescriptor) GetInitFunc() InitFunc {
   421  	return r.initFunc
   422  }
   423  
   424  func (r *ControllerDescriptor) GetRequiredFeatureGates() []featuregate.Feature {
   425  	return append([]featuregate.Feature(nil), r.requiredFeatureGates...)
   426  }
   427  
   428  // GetAliases returns aliases to ensure backwards compatibility and should never be removed!
   429  // Only addition of new aliases is allowed, and only when a canonical name is changed (please see CHANGE POLICY of controller names)
   430  func (r *ControllerDescriptor) GetAliases() []string {
   431  	return append([]string(nil), r.aliases...)
   432  }
   433  
   434  func (r *ControllerDescriptor) IsDisabledByDefault() bool {
   435  	return r.isDisabledByDefault
   436  }
   437  
   438  func (r *ControllerDescriptor) IsCloudProviderController() bool {
   439  	return r.isCloudProviderController
   440  }
   441  
   442  // RequiresSpecialHandling should return true only in a special non-generic controllers like ServiceAccountTokenController
   443  func (r *ControllerDescriptor) RequiresSpecialHandling() bool {
   444  	return r.requiresSpecialHandling
   445  }
   446  
   447  // KnownControllers returns all known controllers's name
   448  func KnownControllers() []string {
   449  	return sets.StringKeySet(NewControllerDescriptors()).List()
   450  }
   451  
   452  // ControllerAliases returns a mapping of aliases to canonical controller names
   453  func ControllerAliases() map[string]string {
   454  	aliases := map[string]string{}
   455  	for name, c := range NewControllerDescriptors() {
   456  		for _, alias := range c.GetAliases() {
   457  			aliases[alias] = name
   458  		}
   459  	}
   460  	return aliases
   461  }
   462  
   463  func ControllersDisabledByDefault() []string {
   464  	var controllersDisabledByDefault []string
   465  
   466  	for name, c := range NewControllerDescriptors() {
   467  		if c.IsDisabledByDefault() {
   468  			controllersDisabledByDefault = append(controllersDisabledByDefault, name)
   469  		}
   470  	}
   471  
   472  	sort.Strings(controllersDisabledByDefault)
   473  
   474  	return controllersDisabledByDefault
   475  }
   476  
   477  // NewControllerDescriptors is a public map of named controller groups (you can start more than one in an init func)
   478  // paired to their ControllerDescriptor wrapper object that includes InitFunc.
   479  // This allows for structured downstream composition and subdivision.
   480  func NewControllerDescriptors() map[string]*ControllerDescriptor {
   481  	controllers := map[string]*ControllerDescriptor{}
   482  	aliases := sets.NewString()
   483  
   484  	// All the controllers must fulfil common constraints, or else we will explode.
   485  	register := func(controllerDesc *ControllerDescriptor) {
   486  		if controllerDesc == nil {
   487  			panic("received nil controller for a registration")
   488  		}
   489  		name := controllerDesc.Name()
   490  		if len(name) == 0 {
   491  			panic("received controller without a name for a registration")
   492  		}
   493  		if _, found := controllers[name]; found {
   494  			panic(fmt.Sprintf("controller name %q was registered twice", name))
   495  		}
   496  		if controllerDesc.GetInitFunc() == nil {
   497  			panic(fmt.Sprintf("controller %q does not have an init function", name))
   498  		}
   499  
   500  		for _, alias := range controllerDesc.GetAliases() {
   501  			if aliases.Has(alias) {
   502  				panic(fmt.Sprintf("controller %q has a duplicate alias %q", name, alias))
   503  			}
   504  			aliases.Insert(alias)
   505  		}
   506  
   507  		controllers[name] = controllerDesc
   508  	}
   509  
   510  	// First add "special" controllers that aren't initialized normally. These controllers cannot be initialized
   511  	// in the main controller loop initialization, so we add them here only for the metadata and duplication detection.
   512  	// app.ControllerDescriptor#RequiresSpecialHandling should return true for such controllers
   513  	// The only known special case is the ServiceAccountTokenController which *must* be started
   514  	// first to ensure that the SA tokens for future controllers will exist. Think very carefully before adding new
   515  	// special controllers.
   516  	register(newServiceAccountTokenControllerDescriptor(nil))
   517  
   518  	register(newEndpointsControllerDescriptor())
   519  	register(newEndpointSliceControllerDescriptor())
   520  	register(newEndpointSliceMirroringControllerDescriptor())
   521  	register(newReplicationControllerDescriptor())
   522  	register(newPodGarbageCollectorControllerDescriptor())
   523  	register(newResourceQuotaControllerDescriptor())
   524  	register(newNamespaceControllerDescriptor())
   525  	register(newServiceAccountControllerDescriptor())
   526  	register(newGarbageCollectorControllerDescriptor())
   527  	register(newDaemonSetControllerDescriptor())
   528  	register(newJobControllerDescriptor())
   529  	register(newDeploymentControllerDescriptor())
   530  	register(newReplicaSetControllerDescriptor())
   531  	register(newHorizontalPodAutoscalerControllerDescriptor())
   532  	register(newDisruptionControllerDescriptor())
   533  	register(newStatefulSetControllerDescriptor())
   534  	register(newCronJobControllerDescriptor())
   535  	register(newCertificateSigningRequestSigningControllerDescriptor())
   536  	register(newCertificateSigningRequestApprovingControllerDescriptor())
   537  	register(newCertificateSigningRequestCleanerControllerDescriptor())
   538  	register(newTTLControllerDescriptor())
   539  	register(newBootstrapSignerControllerDescriptor())
   540  	register(newTokenCleanerControllerDescriptor())
   541  	register(newNodeIpamControllerDescriptor())
   542  	register(newNodeLifecycleControllerDescriptor())
   543  
   544  	register(newServiceLBControllerDescriptor())          // cloud provider controller
   545  	register(newNodeRouteControllerDescriptor())          // cloud provider controller
   546  	register(newCloudNodeLifecycleControllerDescriptor()) // cloud provider controller
   547  	// TODO: persistent volume controllers into the IncludeCloudLoops only set as a cloud provider controller.
   548  
   549  	register(newPersistentVolumeBinderControllerDescriptor())
   550  	register(newPersistentVolumeAttachDetachControllerDescriptor())
   551  	register(newPersistentVolumeExpanderControllerDescriptor())
   552  	register(newClusterRoleAggregrationControllerDescriptor())
   553  	register(newPersistentVolumeClaimProtectionControllerDescriptor())
   554  	register(newPersistentVolumeProtectionControllerDescriptor())
   555  	register(newTTLAfterFinishedControllerDescriptor())
   556  	register(newRootCACertificatePublisherControllerDescriptor())
   557  	register(newEphemeralVolumeControllerDescriptor())
   558  
   559  	// feature gated
   560  	register(newStorageVersionGarbageCollectorControllerDescriptor())
   561  	register(newResourceClaimControllerDescriptor())
   562  	register(newLegacyServiceAccountTokenCleanerControllerDescriptor())
   563  	register(newValidatingAdmissionPolicyStatusControllerDescriptor())
   564  	register(newTaintEvictionControllerDescriptor())
   565  	register(newServiceCIDRsControllerDescriptor())
   566  	register(newStorageVersionMigratorControllerDescriptor())
   567  
   568  	for _, alias := range aliases.UnsortedList() {
   569  		if _, ok := controllers[alias]; ok {
   570  			panic(fmt.Sprintf("alias %q conflicts with a controller name", alias))
   571  		}
   572  	}
   573  
   574  	return controllers
   575  }
   576  
   577  // CreateControllerContext creates a context struct containing references to resources needed by the
   578  // controllers such as the cloud provider and clientBuilder. rootClientBuilder is only used for
   579  // the shared-informers client and token controller.
   580  func CreateControllerContext(ctx context.Context, s *config.CompletedConfig, rootClientBuilder, clientBuilder clientbuilder.ControllerClientBuilder) (ControllerContext, error) {
   581  	// Informer transform to trim ManagedFields for memory efficiency.
   582  	trim := func(obj interface{}) (interface{}, error) {
   583  		if accessor, err := meta.Accessor(obj); err == nil {
   584  			if accessor.GetManagedFields() != nil {
   585  				accessor.SetManagedFields(nil)
   586  			}
   587  		}
   588  		return obj, nil
   589  	}
   590  
   591  	versionedClient := rootClientBuilder.ClientOrDie("shared-informers")
   592  	sharedInformers := informers.NewSharedInformerFactoryWithOptions(versionedClient, ResyncPeriod(s)(), informers.WithTransform(trim))
   593  
   594  	metadataClient := metadata.NewForConfigOrDie(rootClientBuilder.ConfigOrDie("metadata-informers"))
   595  	metadataInformers := metadatainformer.NewSharedInformerFactoryWithOptions(metadataClient, ResyncPeriod(s)(), metadatainformer.WithTransform(trim))
   596  
   597  	// If apiserver is not running we should wait for some time and fail only then. This is particularly
   598  	// important when we start apiserver and controller manager at the same time.
   599  	if err := genericcontrollermanager.WaitForAPIServer(versionedClient, 10*time.Second); err != nil {
   600  		return ControllerContext{}, fmt.Errorf("failed to wait for apiserver being healthy: %v", err)
   601  	}
   602  
   603  	// Use a discovery client capable of being refreshed.
   604  	discoveryClient := rootClientBuilder.DiscoveryClientOrDie("controller-discovery")
   605  	cachedClient := cacheddiscovery.NewMemCacheClient(discoveryClient)
   606  	restMapper := restmapper.NewDeferredDiscoveryRESTMapper(cachedClient)
   607  	go wait.Until(func() {
   608  		restMapper.Reset()
   609  	}, 30*time.Second, ctx.Done())
   610  
   611  	cloud, loopMode, err := createCloudProvider(klog.FromContext(ctx), s.ComponentConfig.KubeCloudShared.CloudProvider.Name, s.ComponentConfig.KubeCloudShared.ExternalCloudVolumePlugin,
   612  		s.ComponentConfig.KubeCloudShared.CloudProvider.CloudConfigFile, s.ComponentConfig.KubeCloudShared.AllowUntaggedCloud, sharedInformers)
   613  	if err != nil {
   614  		return ControllerContext{}, err
   615  	}
   616  
   617  	controllerContext := ControllerContext{
   618  		ClientBuilder:                   clientBuilder,
   619  		InformerFactory:                 sharedInformers,
   620  		ObjectOrMetadataInformerFactory: informerfactory.NewInformerFactory(sharedInformers, metadataInformers),
   621  		ComponentConfig:                 s.ComponentConfig,
   622  		RESTMapper:                      restMapper,
   623  		Cloud:                           cloud,
   624  		LoopMode:                        loopMode,
   625  		InformersStarted:                make(chan struct{}),
   626  		ResyncPeriod:                    ResyncPeriod(s),
   627  		ControllerManagerMetrics:        controllersmetrics.NewControllerManagerMetrics("kube-controller-manager"),
   628  	}
   629  
   630  	if controllerContext.ComponentConfig.GarbageCollectorController.EnableGarbageCollector &&
   631  		controllerContext.IsControllerEnabled(NewControllerDescriptors()[names.GarbageCollectorController]) {
   632  		ignoredResources := make(map[schema.GroupResource]struct{})
   633  		for _, r := range controllerContext.ComponentConfig.GarbageCollectorController.GCIgnoredResources {
   634  			ignoredResources[schema.GroupResource{Group: r.Group, Resource: r.Resource}] = struct{}{}
   635  		}
   636  
   637  		controllerContext.GraphBuilder = garbagecollector.NewDependencyGraphBuilder(
   638  			ctx,
   639  			metadataClient,
   640  			controllerContext.RESTMapper,
   641  			ignoredResources,
   642  			controllerContext.ObjectOrMetadataInformerFactory,
   643  			controllerContext.InformersStarted,
   644  		)
   645  	}
   646  
   647  	controllersmetrics.Register()
   648  	return controllerContext, nil
   649  }
   650  
   651  // StartControllers starts a set of controllers with a specified ControllerContext
   652  func StartControllers(ctx context.Context, controllerCtx ControllerContext, controllerDescriptors map[string]*ControllerDescriptor,
   653  	unsecuredMux *mux.PathRecorderMux, healthzHandler *controllerhealthz.MutableHealthzHandler) error {
   654  	var controllerChecks []healthz.HealthChecker
   655  
   656  	// Always start the SA token controller first using a full-power client, since it needs to mint tokens for the rest
   657  	// If this fails, just return here and fail since other controllers won't be able to get credentials.
   658  	if serviceAccountTokenControllerDescriptor, ok := controllerDescriptors[names.ServiceAccountTokenController]; ok {
   659  		check, err := StartController(ctx, controllerCtx, serviceAccountTokenControllerDescriptor, unsecuredMux)
   660  		if err != nil {
   661  			return err
   662  		}
   663  		if check != nil {
   664  			// HealthChecker should be present when controller has started
   665  			controllerChecks = append(controllerChecks, check)
   666  		}
   667  	}
   668  
   669  	// Initialize the cloud provider with a reference to the clientBuilder only after token controller
   670  	// has started in case the cloud provider uses the client builder.
   671  	if controllerCtx.Cloud != nil {
   672  		controllerCtx.Cloud.Initialize(controllerCtx.ClientBuilder, ctx.Done())
   673  	}
   674  
   675  	// Each controller is passed a context where the logger has the name of
   676  	// the controller set through WithName. That name then becomes the prefix of
   677  	// of all log messages emitted by that controller.
   678  	//
   679  	// In StartController, an explicit "controller" key is used instead, for two reasons:
   680  	// - while contextual logging is alpha, klog.LoggerWithName is still a no-op,
   681  	//   so we cannot rely on it yet to add the name
   682  	// - it allows distinguishing between log entries emitted by the controller
   683  	//   and those emitted for it - this is a bit debatable and could be revised.
   684  	for _, controllerDesc := range controllerDescriptors {
   685  		if controllerDesc.RequiresSpecialHandling() {
   686  			continue
   687  		}
   688  
   689  		check, err := StartController(ctx, controllerCtx, controllerDesc, unsecuredMux)
   690  		if err != nil {
   691  			return err
   692  		}
   693  		if check != nil {
   694  			// HealthChecker should be present when controller has started
   695  			controllerChecks = append(controllerChecks, check)
   696  		}
   697  	}
   698  
   699  	healthzHandler.AddHealthChecker(controllerChecks...)
   700  
   701  	return nil
   702  }
   703  
   704  // StartController starts a controller with a specified ControllerContext
   705  // and performs required pre- and post- checks/actions
   706  func StartController(ctx context.Context, controllerCtx ControllerContext, controllerDescriptor *ControllerDescriptor,
   707  	unsecuredMux *mux.PathRecorderMux) (healthz.HealthChecker, error) {
   708  	logger := klog.FromContext(ctx)
   709  	controllerName := controllerDescriptor.Name()
   710  
   711  	for _, featureGate := range controllerDescriptor.GetRequiredFeatureGates() {
   712  		if !utilfeature.DefaultFeatureGate.Enabled(featureGate) {
   713  			logger.Info("Controller is disabled by a feature gate", "controller", controllerName, "requiredFeatureGates", controllerDescriptor.GetRequiredFeatureGates())
   714  			return nil, nil
   715  		}
   716  	}
   717  
   718  	if controllerDescriptor.IsCloudProviderController() && controllerCtx.LoopMode != IncludeCloudLoops {
   719  		logger.Info("Skipping a cloud provider controller", "controller", controllerName, "loopMode", controllerCtx.LoopMode)
   720  		return nil, nil
   721  	}
   722  
   723  	if !controllerCtx.IsControllerEnabled(controllerDescriptor) {
   724  		logger.Info("Warning: controller is disabled", "controller", controllerName)
   725  		return nil, nil
   726  	}
   727  
   728  	time.Sleep(wait.Jitter(controllerCtx.ComponentConfig.Generic.ControllerStartInterval.Duration, ControllerStartJitter))
   729  
   730  	logger.V(1).Info("Starting controller", "controller", controllerName)
   731  
   732  	initFunc := controllerDescriptor.GetInitFunc()
   733  	ctrl, started, err := initFunc(klog.NewContext(ctx, klog.LoggerWithName(logger, controllerName)), controllerCtx, controllerName)
   734  	if err != nil {
   735  		logger.Error(err, "Error starting controller", "controller", controllerName)
   736  		return nil, err
   737  	}
   738  	if !started {
   739  		logger.Info("Warning: skipping controller", "controller", controllerName)
   740  		return nil, nil
   741  	}
   742  
   743  	check := controllerhealthz.NamedPingChecker(controllerName)
   744  	if ctrl != nil {
   745  		// check if the controller supports and requests a debugHandler
   746  		// and it needs the unsecuredMux to mount the handler onto.
   747  		if debuggable, ok := ctrl.(controller.Debuggable); ok && unsecuredMux != nil {
   748  			if debugHandler := debuggable.DebuggingHandler(); debugHandler != nil {
   749  				basePath := "/debug/controllers/" + controllerName
   750  				unsecuredMux.UnlistedHandle(basePath, http.StripPrefix(basePath, debugHandler))
   751  				unsecuredMux.UnlistedHandlePrefix(basePath+"/", http.StripPrefix(basePath, debugHandler))
   752  			}
   753  		}
   754  		if healthCheckable, ok := ctrl.(controller.HealthCheckable); ok {
   755  			if realCheck := healthCheckable.HealthChecker(); realCheck != nil {
   756  				check = controllerhealthz.NamedHealthChecker(controllerName, realCheck)
   757  			}
   758  		}
   759  	}
   760  
   761  	logger.Info("Started controller", "controller", controllerName)
   762  	return check, nil
   763  }
   764  
   765  // serviceAccountTokenControllerStarter is special because it must run first to set up permissions for other controllers.
   766  // It cannot use the "normal" client builder, so it tracks its own.
   767  func newServiceAccountTokenControllerDescriptor(rootClientBuilder clientbuilder.ControllerClientBuilder) *ControllerDescriptor {
   768  	return &ControllerDescriptor{
   769  		name:    names.ServiceAccountTokenController,
   770  		aliases: []string{"serviceaccount-token"},
   771  		initFunc: func(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
   772  			return startServiceAccountTokenController(ctx, controllerContext, controllerName, rootClientBuilder)
   773  		},
   774  		// will make sure it runs first before other controllers
   775  		requiresSpecialHandling: true,
   776  	}
   777  }
   778  
   779  func startServiceAccountTokenController(ctx context.Context, controllerContext ControllerContext, controllerName string, rootClientBuilder clientbuilder.ControllerClientBuilder) (controller.Interface, bool, error) {
   780  	logger := klog.FromContext(ctx)
   781  	if len(controllerContext.ComponentConfig.SAController.ServiceAccountKeyFile) == 0 {
   782  		logger.Info("Controller is disabled because there is no private key", "controller", controllerName)
   783  		return nil, false, nil
   784  	}
   785  	privateKey, err := keyutil.PrivateKeyFromFile(controllerContext.ComponentConfig.SAController.ServiceAccountKeyFile)
   786  	if err != nil {
   787  		return nil, true, fmt.Errorf("error reading key for service account token controller: %v", err)
   788  	}
   789  
   790  	var rootCA []byte
   791  	if controllerContext.ComponentConfig.SAController.RootCAFile != "" {
   792  		if rootCA, err = readCA(controllerContext.ComponentConfig.SAController.RootCAFile); err != nil {
   793  			return nil, true, fmt.Errorf("error parsing root-ca-file at %s: %v", controllerContext.ComponentConfig.SAController.RootCAFile, err)
   794  		}
   795  	} else {
   796  		rootCA = rootClientBuilder.ConfigOrDie("tokens-controller").CAData
   797  	}
   798  
   799  	tokenGenerator, err := serviceaccount.JWTTokenGenerator(serviceaccount.LegacyIssuer, privateKey)
   800  	if err != nil {
   801  		return nil, false, fmt.Errorf("failed to build token generator: %v", err)
   802  	}
   803  	tokenController, err := serviceaccountcontroller.NewTokensController(
   804  		controllerContext.InformerFactory.Core().V1().ServiceAccounts(),
   805  		controllerContext.InformerFactory.Core().V1().Secrets(),
   806  		rootClientBuilder.ClientOrDie("tokens-controller"),
   807  		serviceaccountcontroller.TokensControllerOptions{
   808  			TokenGenerator: tokenGenerator,
   809  			RootCA:         rootCA,
   810  		},
   811  	)
   812  	if err != nil {
   813  		return nil, true, fmt.Errorf("error creating Tokens controller: %v", err)
   814  	}
   815  	go tokenController.Run(ctx, int(controllerContext.ComponentConfig.SAController.ConcurrentSATokenSyncs))
   816  
   817  	// start the first set of informers now so that other controllers can start
   818  	controllerContext.InformerFactory.Start(ctx.Done())
   819  
   820  	return nil, true, nil
   821  }
   822  
   823  func readCA(file string) ([]byte, error) {
   824  	rootCA, err := os.ReadFile(file)
   825  	if err != nil {
   826  		return nil, err
   827  	}
   828  	if _, err := certutil.ParseCertsPEM(rootCA); err != nil {
   829  		return nil, err
   830  	}
   831  
   832  	return rootCA, err
   833  }
   834  
   835  // createClientBuilders creates clientBuilder and rootClientBuilder from the given configuration
   836  func createClientBuilders(logger klog.Logger, c *config.CompletedConfig) (clientBuilder clientbuilder.ControllerClientBuilder, rootClientBuilder clientbuilder.ControllerClientBuilder) {
   837  	rootClientBuilder = clientbuilder.SimpleControllerClientBuilder{
   838  		ClientConfig: c.Kubeconfig,
   839  	}
   840  	if c.ComponentConfig.KubeCloudShared.UseServiceAccountCredentials {
   841  		if len(c.ComponentConfig.SAController.ServiceAccountKeyFile) == 0 {
   842  			// It's possible another controller process is creating the tokens for us.
   843  			// If one isn't, we'll timeout and exit when our client builder is unable to create the tokens.
   844  			logger.Info("Warning: --use-service-account-credentials was specified without providing a --service-account-private-key-file")
   845  		}
   846  
   847  		clientBuilder = clientbuilder.NewDynamicClientBuilder(
   848  			restclient.AnonymousClientConfig(c.Kubeconfig),
   849  			c.Client.CoreV1(),
   850  			metav1.NamespaceSystem)
   851  	} else {
   852  		clientBuilder = rootClientBuilder
   853  	}
   854  	return
   855  }
   856  
   857  // leaderElectAndRun runs the leader election, and runs the callbacks once the leader lease is acquired.
   858  // TODO: extract this function into staging/controller-manager
   859  func leaderElectAndRun(ctx context.Context, c *config.CompletedConfig, lockIdentity string, electionChecker *leaderelection.HealthzAdaptor, resourceLock string, leaseName string, callbacks leaderelection.LeaderCallbacks) {
   860  	logger := klog.FromContext(ctx)
   861  	rl, err := resourcelock.NewFromKubeconfig(resourceLock,
   862  		c.ComponentConfig.Generic.LeaderElection.ResourceNamespace,
   863  		leaseName,
   864  		resourcelock.ResourceLockConfig{
   865  			Identity:      lockIdentity,
   866  			EventRecorder: c.EventRecorder,
   867  		},
   868  		c.Kubeconfig,
   869  		c.ComponentConfig.Generic.LeaderElection.RenewDeadline.Duration)
   870  	if err != nil {
   871  		logger.Error(err, "Error creating lock")
   872  		klog.FlushAndExit(klog.ExitFlushTimeout, 1)
   873  	}
   874  
   875  	leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
   876  		Lock:          rl,
   877  		LeaseDuration: c.ComponentConfig.Generic.LeaderElection.LeaseDuration.Duration,
   878  		RenewDeadline: c.ComponentConfig.Generic.LeaderElection.RenewDeadline.Duration,
   879  		RetryPeriod:   c.ComponentConfig.Generic.LeaderElection.RetryPeriod.Duration,
   880  		Callbacks:     callbacks,
   881  		WatchDog:      electionChecker,
   882  		Name:          leaseName,
   883  	})
   884  
   885  	panic("unreachable")
   886  }
   887  
   888  // filteredControllerDescriptors returns all controllerDescriptors after filtering through filterFunc.
   889  func filteredControllerDescriptors(controllerDescriptors map[string]*ControllerDescriptor, filterFunc leadermigration.FilterFunc, expected leadermigration.FilterResult) map[string]*ControllerDescriptor {
   890  	resultControllers := make(map[string]*ControllerDescriptor)
   891  	for name, controllerDesc := range controllerDescriptors {
   892  		if filterFunc(name) == expected {
   893  			resultControllers[name] = controllerDesc
   894  		}
   895  	}
   896  	return resultControllers
   897  }
   898  

View as plain text