    17  // Package app implements a Server object for running the scheduler.
    18  package app
    20  import (
    21  	"context"
    22  	"fmt"
    23  	"net/http"
    24  	"os"
    25  	goruntime "runtime"
    27  	"github.com/spf13/cobra"
    29  	utilerrors "k8s.io/apimachinery/pkg/util/errors"
    30  	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    31  	"k8s.io/apiserver/pkg/authentication/authenticator"
    32  	"k8s.io/apiserver/pkg/authorization/authorizer"
    33  	genericapifilters "k8s.io/apiserver/pkg/endpoints/filters"
    34  	apirequest "k8s.io/apiserver/pkg/endpoints/request"
    35  	"k8s.io/apiserver/pkg/server"
    36  	genericfilters "k8s.io/apiserver/pkg/server/filters"
    37  	"k8s.io/apiserver/pkg/server/healthz"
    38  	"k8s.io/apiserver/pkg/server/mux"
    39  	"k8s.io/apiserver/pkg/server/routes"
    40  	utilfeature "k8s.io/apiserver/pkg/util/feature"
    41  	"k8s.io/client-go/informers"
    42  	"k8s.io/client-go/kubernetes/scheme"
    43  	"k8s.io/client-go/tools/events"
    44  	"k8s.io/client-go/tools/leaderelection"
    45  	cliflag "k8s.io/component-base/cli/flag"
    46  	"k8s.io/component-base/cli/globalflag"
    47  	"k8s.io/component-base/configz"
    48  	"k8s.io/component-base/logs"
    49  	logsapi "k8s.io/component-base/logs/api/v1"
    50  	"k8s.io/component-base/metrics/features"
    51  	"k8s.io/component-base/metrics/legacyregistry"
    52  	"k8s.io/component-base/metrics/prometheus/slis"
    53  	"k8s.io/component-base/term"
    54  	"k8s.io/component-base/version"
    55  	"k8s.io/component-base/version/verflag"
    56  	"k8s.io/klog/v2"
    57  	schedulerserverconfig "k8s.io/kubernetes/cmd/kube-scheduler/app/config"
    58  	"k8s.io/kubernetes/cmd/kube-scheduler/app/options"
    59  	"k8s.io/kubernetes/pkg/scheduler"
    60  	kubeschedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
    61  	"k8s.io/kubernetes/pkg/scheduler/apis/config/latest"
    62  	"k8s.io/kubernetes/pkg/scheduler/framework/runtime"
    63  	"k8s.io/kubernetes/pkg/scheduler/metrics/resources"
    64  	"k8s.io/kubernetes/pkg/scheduler/profile"
    65  )
    67  func init() {
    68  	utilruntime.Must(logsapi.AddFeatureGates(utilfeature.DefaultMutableFeatureGate))
    69  	utilruntime.Must(features.AddFeatureGates(utilfeature.DefaultMutableFeatureGate))
    70  }
// Option mutates a scheduler framework runtime.Registry, for example to
// register an out-of-tree plugin (see WithPlugin). Setup applies each Option,
// in order, to a fresh registry before building the scheduler. The first
// Option that returns a non-nil error aborts Setup with that error.
type Option func(runtime.Registry) error
    75  // NewSchedulerCommand creates a *cobra.Command object with default parameters and registryOptions
    76  func NewSchedulerCommand(registryOptions ...Option) *cobra.Command {
    77  	opts := options.NewOptions()
    79  	cmd := &cobra.Command{
    80  		Use: "kube-scheduler",
    81  		Long: `The Kubernetes scheduler is a control plane process which assigns
    82  Pods to Nodes. The scheduler determines which Nodes are valid placements for
    83  each Pod in the scheduling queue according to constraints and available
    84  resources. The scheduler then ranks each valid Node and binds the Pod to a
    85  suitable Node. Multiple different schedulers may be used within a cluster;
    86  kube-scheduler is the reference implementation.
    87  See [scheduling](https://kubernetes.io/docs/concepts/scheduling-eviction/)
    88  for more information about scheduling and the kube-scheduler component.`,
    89  		RunE: func(cmd *cobra.Command, args []string) error {
    90  			return runCommand(cmd, opts, registryOptions...)
    91  		},
    92  		Args: func(cmd *cobra.Command, args []string) error {
    93  			for _, arg := range args {
    94  				if len(arg) > 0 {
    95  					return fmt.Errorf("%q does not take any arguments, got %q", cmd.CommandPath(), args)
    96  				}
    97  			}
    98  			return nil
    99  		},
   100  	}
   102  	nfs := opts.Flags
   103  	verflag.AddFlags(nfs.FlagSet("global"))
   104  	globalflag.AddGlobalFlags(nfs.FlagSet("global"), cmd.Name(), logs.SkipLoggingConfigurationFlags())
   105  	fs := cmd.Flags()
   106  	for _, f := range nfs.FlagSets {
   107  		fs.AddFlagSet(f)
   108  	}
   110  	cols, _, _ := term.TerminalSize(cmd.OutOrStdout())
   111  	cliflag.SetUsageAndHelpFunc(cmd, *nfs, cols)
   113  	if err := cmd.MarkFlagFilename("config", "yaml", "yml", "json"); err != nil {
   114  		klog.Background().Error(err, "Failed to mark flag filename")
   115  	}
   117  	return cmd
   118  }
   120  // runCommand runs the scheduler.
   121  func runCommand(cmd *cobra.Command, opts *options.Options, registryOptions ...Option) error {
   122  	verflag.PrintAndExitIfRequested()
   124  	// Activate logging as soon as possible, after that
   125  	// show flags with the final logging configuration.
   126  	if err := logsapi.ValidateAndApply(opts.Logs, utilfeature.DefaultFeatureGate); err != nil {
   127  		fmt.Fprintf(os.Stderr, "%v\n", err)
   128  		os.Exit(1)
   129  	}
   130  	cliflag.PrintFlags(cmd.Flags())
   132  	ctx, cancel := context.WithCancel(context.Background())
   133  	defer cancel()
   134  	go func() {
   135  		stopCh := server.SetupSignalHandler()
   136  		<-stopCh
   137  		cancel()
   138  	}()
   140  	cc, sched, err := Setup(ctx, opts, registryOptions...)
   141  	if err != nil {
   142  		return err
   143  	}
   144  	// add feature enablement metrics
   145  	utilfeature.DefaultMutableFeatureGate.AddMetrics()
   146  	return Run(ctx, cc, sched)
   147  }
   149  // Run executes the scheduler based on the given configuration. It only returns on error or when context is done.
   150  func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched *scheduler.Scheduler) error {
   151  	logger := klog.FromContext(ctx)
   153  	// To help debugging, immediately log version
   154  	logger.Info("Starting Kubernetes Scheduler", "version", version.Get())
   156  	logger.Info("Golang settings", "GOGC", os.Getenv("GOGC"), "GOMAXPROCS", os.Getenv("GOMAXPROCS"), "GOTRACEBACK", os.Getenv("GOTRACEBACK"))
   158  	// Configz registration.
   159  	if cz, err := configz.New("componentconfig"); err == nil {
   160  		cz.Set(cc.ComponentConfig)
   161  	} else {
   162  		return fmt.Errorf("unable to register configz: %s", err)
   163  	}
   165  	// Start events processing pipeline.
   166  	cc.EventBroadcaster.StartRecordingToSink(ctx.Done())
   167  	defer cc.EventBroadcaster.Shutdown()
   169  	// Setup healthz checks.
   170  	var checks []healthz.HealthChecker
   171  	if cc.ComponentConfig.LeaderElection.LeaderElect {
   172  		checks = append(checks, cc.LeaderElection.WatchDog)
   173  	}
   175  	waitingForLeader := make(chan struct{})
   176  	isLeader := func() bool {
   177  		select {
   178  		case _, ok := <-waitingForLeader:
   179  			// if channel is closed, we are leading
   180  			return !ok
   181  		default:
   182  			// channel is open, we are waiting for a leader
   183  			return false
   184  		}
   185  	}
   187  	// Start up the healthz server.
   188  	if cc.SecureServing != nil {
   189  		handler := buildHandlerChain(newHealthzAndMetricsHandler(&cc.ComponentConfig, cc.InformerFactory, isLeader, checks...), cc.Authentication.Authenticator, cc.Authorization.Authorizer)
   190  		// TODO: handle stoppedCh and listenerStoppedCh returned by c.SecureServing.Serve
   191  		if _, _, err := cc.SecureServing.Serve(handler, 0, ctx.Done()); err != nil {
   192  			// fail early for secure handlers, removing the old error loop from above
   193  			return fmt.Errorf("failed to start secure server: %v", err)
   194  		}
   195  	}
   197  	startInformersAndWaitForSync := func(ctx context.Context) {
   198  		// Start all informers.
   199  		cc.InformerFactory.Start(ctx.Done())
   200  		// DynInformerFactory can be nil in tests.
   201  		if cc.DynInformerFactory != nil {
   202  			cc.DynInformerFactory.Start(ctx.Done())
   203  		}
   205  		// Wait for all caches to sync before scheduling.
   206  		cc.InformerFactory.WaitForCacheSync(ctx.Done())
   207  		// DynInformerFactory can be nil in tests.
   208  		if cc.DynInformerFactory != nil {
   209  			cc.DynInformerFactory.WaitForCacheSync(ctx.Done())
   210  		}
   212  		// Wait for all handlers to sync (all items in the initial list delivered) before scheduling.
   213  		if err := sched.WaitForHandlersSync(ctx); err != nil {
   214  			logger.Error(err, "waiting for handlers to sync")
   215  		}
   217  		logger.V(3).Info("Handlers synced")
   218  	}
   219  	if !cc.ComponentConfig.DelayCacheUntilActive || cc.LeaderElection == nil {
   220  		startInformersAndWaitForSync(ctx)
   221  	}
   222  	// If leader election is enabled, runCommand via LeaderElector until done and exit.
   223  	if cc.LeaderElection != nil {
   224  		cc.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{
   225  			OnStartedLeading: func(ctx context.Context) {
   226  				close(waitingForLeader)
   227  				if cc.ComponentConfig.DelayCacheUntilActive {
   228  					logger.Info("Starting informers and waiting for sync...")
   229  					startInformersAndWaitForSync(ctx)
   230  					logger.Info("Sync completed")
   231  				}
   232  				sched.Run(ctx)
   233  			},
   234  			OnStoppedLeading: func() {
   235  				select {
   236  				case <-ctx.Done():
   237  					// We were asked to terminate. Exit 0.
   238  					logger.Info("Requested to terminate, exiting")
   239  					os.Exit(0)
   240  				default:
   241  					// We lost the lock.
   242  					logger.Error(nil, "Leaderelection lost")
   243  					klog.FlushAndExit(klog.ExitFlushTimeout, 1)
   244  				}
   245  			},
   246  		}
   247  		leaderElector, err := leaderelection.NewLeaderElector(*cc.LeaderElection)
   248  		if err != nil {
   249  			return fmt.Errorf("couldn't create leader elector: %v", err)
   250  		}
   252  		leaderElector.Run(ctx)
   254  		return fmt.Errorf("lost lease")
   255  	}
   257  	// Leader election is disabled, so runCommand inline until done.
   258  	close(waitingForLeader)
   259  	sched.Run(ctx)
   260  	return fmt.Errorf("finished without leader elect")
   261  }
   263  // buildHandlerChain wraps the given handler with the standard filters.
   264  func buildHandlerChain(handler http.Handler, authn authenticator.Request, authz authorizer.Authorizer) http.Handler {
   265  	requestInfoResolver := &apirequest.RequestInfoFactory{}
   266  	failedHandler := genericapifilters.Unauthorized(scheme.Codecs)
   268  	handler = genericapifilters.WithAuthorization(handler, authz, scheme.Codecs)
   269  	handler = genericapifilters.WithAuthentication(handler, authn, failedHandler, nil, nil)
   270  	handler = genericapifilters.WithRequestInfo(handler, requestInfoResolver)
   271  	handler = genericapifilters.WithCacheControl(handler)
   272  	handler = genericfilters.WithHTTPLogging(handler)
   273  	handler = genericfilters.WithPanicRecovery(handler, requestInfoResolver)
   275  	return handler
   276  }
   278  func installMetricHandler(pathRecorderMux *mux.PathRecorderMux, informers informers.SharedInformerFactory, isLeader func() bool) {
   279  	configz.InstallHandler(pathRecorderMux)
   280  	pathRecorderMux.Handle("/metrics", legacyregistry.HandlerWithReset())
   282  	resourceMetricsHandler := resources.Handler(informers.Core().V1().Pods().Lister())
   283  	pathRecorderMux.HandleFunc("/metrics/resources", func(w http.ResponseWriter, req *http.Request) {
   284  		if !isLeader() {
   285  			return
   286  		}
   287  		resourceMetricsHandler.ServeHTTP(w, req)
   288  	})
   289  }
   291  // newHealthzAndMetricsHandler creates a healthz server from the config, and will also
   292  // embed the metrics handler.
   293  func newHealthzAndMetricsHandler(config *kubeschedulerconfig.KubeSchedulerConfiguration, informers informers.SharedInformerFactory, isLeader func() bool, checks ...healthz.HealthChecker) http.Handler {
   294  	pathRecorderMux := mux.NewPathRecorderMux("kube-scheduler")
   295  	healthz.InstallHandler(pathRecorderMux, checks...)
   296  	installMetricHandler(pathRecorderMux, informers, isLeader)
   297  	slis.SLIMetricsWithReset{}.Install(pathRecorderMux)
   299  	if config.EnableProfiling {
   300  		routes.Profiling{}.Install(pathRecorderMux)
   301  		if config.EnableContentionProfiling {
   302  			goruntime.SetBlockProfileRate(1)
   303  		}
   304  		routes.DebugFlags{}.Install(pathRecorderMux, "v", routes.StringFlagPutHandler(logs.GlogSetter))
   305  	}
   306  	return pathRecorderMux
   307  }
   309  func getRecorderFactory(cc *schedulerserverconfig.CompletedConfig) profile.RecorderFactory {
   310  	return func(name string) events.EventRecorder {
   311  		return cc.EventBroadcaster.NewRecorder(name)
   312  	}
   313  }
   315  // WithPlugin creates an Option based on plugin name and factory. Please don't remove this function: it is used to register out-of-tree plugins,
   316  // hence there are no references to it from the kubernetes scheduler code base.
   317  func WithPlugin(name string, factory runtime.PluginFactory) Option {
   318  	return func(registry runtime.Registry) error {
   319  		return registry.Register(name, factory)
   320  	}
   321  }
   323  // Setup creates a completed config and a scheduler based on the command args and options
   324  func Setup(ctx context.Context, opts *options.Options, outOfTreeRegistryOptions ...Option) (*schedulerserverconfig.CompletedConfig, *scheduler.Scheduler, error) {
   325  	if cfg, err := latest.Default(); err != nil {
   326  		return nil, nil, err
   327  	} else {
   328  		opts.ComponentConfig = cfg
   329  	}
   331  	if errs := opts.Validate(); len(errs) > 0 {
   332  		return nil, nil, utilerrors.NewAggregate(errs)
   333  	}
   335  	c, err := opts.Config(ctx)
   336  	if err != nil {
   337  		return nil, nil, err
   338  	}
   340  	// Get the completed config
   341  	cc := c.Complete()
   343  	outOfTreeRegistry := make(runtime.Registry)
   344  	for _, option := range outOfTreeRegistryOptions {
   345  		if err := option(outOfTreeRegistry); err != nil {
   346  			return nil, nil, err
   347  		}
   348  	}
   350  	recorderFactory := getRecorderFactory(&cc)
   351  	completedProfiles := make([]kubeschedulerconfig.KubeSchedulerProfile, 0)
   352  	// Create the scheduler.
   353  	sched, err := scheduler.New(ctx,
   354  		cc.Client,
   355  		cc.InformerFactory,
   356  		cc.DynInformerFactory,
   357  		recorderFactory,
   358  		scheduler.WithComponentConfigVersion(cc.ComponentConfig.TypeMeta.APIVersion),
   359  		scheduler.WithKubeConfig(cc.KubeConfig),
   360  		scheduler.WithProfiles(cc.ComponentConfig.Profiles...),
   361  		scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore),
   362  		scheduler.WithFrameworkOutOfTreeRegistry(outOfTreeRegistry),
   363  		scheduler.WithPodMaxBackoffSeconds(cc.ComponentConfig.PodMaxBackoffSeconds),
   364  		scheduler.WithPodInitialBackoffSeconds(cc.ComponentConfig.PodInitialBackoffSeconds),
   365  		scheduler.WithPodMaxInUnschedulablePodsDuration(cc.PodMaxInUnschedulablePodsDuration),
   366  		scheduler.WithExtenders(cc.ComponentConfig.Extenders...),
   367  		scheduler.WithParallelism(cc.ComponentConfig.Parallelism),
   368  		scheduler.WithBuildFrameworkCapturer(func(profile kubeschedulerconfig.KubeSchedulerProfile) {
   369  			// Profiles are processed during Framework instantiation to set default plugins and configurations. Capturing them for logging
   370  			completedProfiles = append(completedProfiles, profile)
   371  		}),
   372  	)
   373  	if err != nil {
   374  		return nil, nil, err
   375  	}
   376  	if err := options.LogOrWriteConfig(klog.FromContext(ctx), opts.WriteConfigTo, &cc.ComponentConfig, completedProfiles); err != nil {
   377  		return nil, nil, err
   378  	}
   380  	return &cc, sched, nil
   381  }

