...

Source file src/k8s.io/kubernetes/cmd/kube-scheduler/app/server.go

Documentation: k8s.io/kubernetes/cmd/kube-scheduler/app

     1  /*
     2  Copyright 2014 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  // Package app implements a Server object for running the scheduler.
    18  package app
    19  
    20  import (
    21  	"context"
    22  	"fmt"
    23  	"net/http"
    24  	"os"
    25  	goruntime "runtime"
    26  
    27  	"github.com/spf13/cobra"
    28  
    29  	utilerrors "k8s.io/apimachinery/pkg/util/errors"
    30  	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    31  	"k8s.io/apiserver/pkg/authentication/authenticator"
    32  	"k8s.io/apiserver/pkg/authorization/authorizer"
    33  	genericapifilters "k8s.io/apiserver/pkg/endpoints/filters"
    34  	apirequest "k8s.io/apiserver/pkg/endpoints/request"
    35  	"k8s.io/apiserver/pkg/server"
    36  	genericfilters "k8s.io/apiserver/pkg/server/filters"
    37  	"k8s.io/apiserver/pkg/server/healthz"
    38  	"k8s.io/apiserver/pkg/server/mux"
    39  	"k8s.io/apiserver/pkg/server/routes"
    40  	utilfeature "k8s.io/apiserver/pkg/util/feature"
    41  	"k8s.io/client-go/informers"
    42  	"k8s.io/client-go/kubernetes/scheme"
    43  	"k8s.io/client-go/tools/events"
    44  	"k8s.io/client-go/tools/leaderelection"
    45  	cliflag "k8s.io/component-base/cli/flag"
    46  	"k8s.io/component-base/cli/globalflag"
    47  	"k8s.io/component-base/configz"
    48  	"k8s.io/component-base/logs"
    49  	logsapi "k8s.io/component-base/logs/api/v1"
    50  	"k8s.io/component-base/metrics/features"
    51  	"k8s.io/component-base/metrics/legacyregistry"
    52  	"k8s.io/component-base/metrics/prometheus/slis"
    53  	"k8s.io/component-base/term"
    54  	"k8s.io/component-base/version"
    55  	"k8s.io/component-base/version/verflag"
    56  	"k8s.io/klog/v2"
    57  	schedulerserverconfig "k8s.io/kubernetes/cmd/kube-scheduler/app/config"
    58  	"k8s.io/kubernetes/cmd/kube-scheduler/app/options"
    59  	"k8s.io/kubernetes/pkg/scheduler"
    60  	kubeschedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
    61  	"k8s.io/kubernetes/pkg/scheduler/apis/config/latest"
    62  	"k8s.io/kubernetes/pkg/scheduler/framework/runtime"
    63  	"k8s.io/kubernetes/pkg/scheduler/metrics/resources"
    64  	"k8s.io/kubernetes/pkg/scheduler/profile"
    65  )
    66  
    67  func init() {
    68  	utilruntime.Must(logsapi.AddFeatureGates(utilfeature.DefaultMutableFeatureGate))
    69  	utilruntime.Must(features.AddFeatureGates(utilfeature.DefaultMutableFeatureGate))
    70  }
    71  
// Option configures a runtime.Registry, the scheduler framework's plugin
// registry. Setup applies each Option, in order, to an initially empty
// out-of-tree registry; a non-nil error aborts setup.
type Option func(runtime.Registry) error
    74  
    75  // NewSchedulerCommand creates a *cobra.Command object with default parameters and registryOptions
    76  func NewSchedulerCommand(registryOptions ...Option) *cobra.Command {
    77  	opts := options.NewOptions()
    78  
    79  	cmd := &cobra.Command{
    80  		Use: "kube-scheduler",
    81  		Long: `The Kubernetes scheduler is a control plane process which assigns
    82  Pods to Nodes. The scheduler determines which Nodes are valid placements for
    83  each Pod in the scheduling queue according to constraints and available
    84  resources. The scheduler then ranks each valid Node and binds the Pod to a
    85  suitable Node. Multiple different schedulers may be used within a cluster;
    86  kube-scheduler is the reference implementation.
    87  See [scheduling](https://kubernetes.io/docs/concepts/scheduling-eviction/)
    88  for more information about scheduling and the kube-scheduler component.`,
    89  		RunE: func(cmd *cobra.Command, args []string) error {
    90  			return runCommand(cmd, opts, registryOptions...)
    91  		},
    92  		Args: func(cmd *cobra.Command, args []string) error {
    93  			for _, arg := range args {
    94  				if len(arg) > 0 {
    95  					return fmt.Errorf("%q does not take any arguments, got %q", cmd.CommandPath(), args)
    96  				}
    97  			}
    98  			return nil
    99  		},
   100  	}
   101  
   102  	nfs := opts.Flags
   103  	verflag.AddFlags(nfs.FlagSet("global"))
   104  	globalflag.AddGlobalFlags(nfs.FlagSet("global"), cmd.Name(), logs.SkipLoggingConfigurationFlags())
   105  	fs := cmd.Flags()
   106  	for _, f := range nfs.FlagSets {
   107  		fs.AddFlagSet(f)
   108  	}
   109  
   110  	cols, _, _ := term.TerminalSize(cmd.OutOrStdout())
   111  	cliflag.SetUsageAndHelpFunc(cmd, *nfs, cols)
   112  
   113  	if err := cmd.MarkFlagFilename("config", "yaml", "yml", "json"); err != nil {
   114  		klog.Background().Error(err, "Failed to mark flag filename")
   115  	}
   116  
   117  	return cmd
   118  }
   119  
   120  // runCommand runs the scheduler.
   121  func runCommand(cmd *cobra.Command, opts *options.Options, registryOptions ...Option) error {
   122  	verflag.PrintAndExitIfRequested()
   123  
   124  	// Activate logging as soon as possible, after that
   125  	// show flags with the final logging configuration.
   126  	if err := logsapi.ValidateAndApply(opts.Logs, utilfeature.DefaultFeatureGate); err != nil {
   127  		fmt.Fprintf(os.Stderr, "%v\n", err)
   128  		os.Exit(1)
   129  	}
   130  	cliflag.PrintFlags(cmd.Flags())
   131  
   132  	ctx, cancel := context.WithCancel(context.Background())
   133  	defer cancel()
   134  	go func() {
   135  		stopCh := server.SetupSignalHandler()
   136  		<-stopCh
   137  		cancel()
   138  	}()
   139  
   140  	cc, sched, err := Setup(ctx, opts, registryOptions...)
   141  	if err != nil {
   142  		return err
   143  	}
   144  	// add feature enablement metrics
   145  	utilfeature.DefaultMutableFeatureGate.AddMetrics()
   146  	return Run(ctx, cc, sched)
   147  }
   148  
   149  // Run executes the scheduler based on the given configuration. It only returns on error or when context is done.
   150  func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched *scheduler.Scheduler) error {
   151  	logger := klog.FromContext(ctx)
   152  
   153  	// To help debugging, immediately log version
   154  	logger.Info("Starting Kubernetes Scheduler", "version", version.Get())
   155  
   156  	logger.Info("Golang settings", "GOGC", os.Getenv("GOGC"), "GOMAXPROCS", os.Getenv("GOMAXPROCS"), "GOTRACEBACK", os.Getenv("GOTRACEBACK"))
   157  
   158  	// Configz registration.
   159  	if cz, err := configz.New("componentconfig"); err == nil {
   160  		cz.Set(cc.ComponentConfig)
   161  	} else {
   162  		return fmt.Errorf("unable to register configz: %s", err)
   163  	}
   164  
   165  	// Start events processing pipeline.
   166  	cc.EventBroadcaster.StartRecordingToSink(ctx.Done())
   167  	defer cc.EventBroadcaster.Shutdown()
   168  
   169  	// Setup healthz checks.
   170  	var checks []healthz.HealthChecker
   171  	if cc.ComponentConfig.LeaderElection.LeaderElect {
   172  		checks = append(checks, cc.LeaderElection.WatchDog)
   173  	}
   174  
   175  	waitingForLeader := make(chan struct{})
   176  	isLeader := func() bool {
   177  		select {
   178  		case _, ok := <-waitingForLeader:
   179  			// if channel is closed, we are leading
   180  			return !ok
   181  		default:
   182  			// channel is open, we are waiting for a leader
   183  			return false
   184  		}
   185  	}
   186  
   187  	// Start up the healthz server.
   188  	if cc.SecureServing != nil {
   189  		handler := buildHandlerChain(newHealthzAndMetricsHandler(&cc.ComponentConfig, cc.InformerFactory, isLeader, checks...), cc.Authentication.Authenticator, cc.Authorization.Authorizer)
   190  		// TODO: handle stoppedCh and listenerStoppedCh returned by c.SecureServing.Serve
   191  		if _, _, err := cc.SecureServing.Serve(handler, 0, ctx.Done()); err != nil {
   192  			// fail early for secure handlers, removing the old error loop from above
   193  			return fmt.Errorf("failed to start secure server: %v", err)
   194  		}
   195  	}
   196  
   197  	startInformersAndWaitForSync := func(ctx context.Context) {
   198  		// Start all informers.
   199  		cc.InformerFactory.Start(ctx.Done())
   200  		// DynInformerFactory can be nil in tests.
   201  		if cc.DynInformerFactory != nil {
   202  			cc.DynInformerFactory.Start(ctx.Done())
   203  		}
   204  
   205  		// Wait for all caches to sync before scheduling.
   206  		cc.InformerFactory.WaitForCacheSync(ctx.Done())
   207  		// DynInformerFactory can be nil in tests.
   208  		if cc.DynInformerFactory != nil {
   209  			cc.DynInformerFactory.WaitForCacheSync(ctx.Done())
   210  		}
   211  
   212  		// Wait for all handlers to sync (all items in the initial list delivered) before scheduling.
   213  		if err := sched.WaitForHandlersSync(ctx); err != nil {
   214  			logger.Error(err, "waiting for handlers to sync")
   215  		}
   216  
   217  		logger.V(3).Info("Handlers synced")
   218  	}
   219  	if !cc.ComponentConfig.DelayCacheUntilActive || cc.LeaderElection == nil {
   220  		startInformersAndWaitForSync(ctx)
   221  	}
   222  	// If leader election is enabled, runCommand via LeaderElector until done and exit.
   223  	if cc.LeaderElection != nil {
   224  		cc.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{
   225  			OnStartedLeading: func(ctx context.Context) {
   226  				close(waitingForLeader)
   227  				if cc.ComponentConfig.DelayCacheUntilActive {
   228  					logger.Info("Starting informers and waiting for sync...")
   229  					startInformersAndWaitForSync(ctx)
   230  					logger.Info("Sync completed")
   231  				}
   232  				sched.Run(ctx)
   233  			},
   234  			OnStoppedLeading: func() {
   235  				select {
   236  				case <-ctx.Done():
   237  					// We were asked to terminate. Exit 0.
   238  					logger.Info("Requested to terminate, exiting")
   239  					os.Exit(0)
   240  				default:
   241  					// We lost the lock.
   242  					logger.Error(nil, "Leaderelection lost")
   243  					klog.FlushAndExit(klog.ExitFlushTimeout, 1)
   244  				}
   245  			},
   246  		}
   247  		leaderElector, err := leaderelection.NewLeaderElector(*cc.LeaderElection)
   248  		if err != nil {
   249  			return fmt.Errorf("couldn't create leader elector: %v", err)
   250  		}
   251  
   252  		leaderElector.Run(ctx)
   253  
   254  		return fmt.Errorf("lost lease")
   255  	}
   256  
   257  	// Leader election is disabled, so runCommand inline until done.
   258  	close(waitingForLeader)
   259  	sched.Run(ctx)
   260  	return fmt.Errorf("finished without leader elect")
   261  }
   262  
   263  // buildHandlerChain wraps the given handler with the standard filters.
   264  func buildHandlerChain(handler http.Handler, authn authenticator.Request, authz authorizer.Authorizer) http.Handler {
   265  	requestInfoResolver := &apirequest.RequestInfoFactory{}
   266  	failedHandler := genericapifilters.Unauthorized(scheme.Codecs)
   267  
   268  	handler = genericapifilters.WithAuthorization(handler, authz, scheme.Codecs)
   269  	handler = genericapifilters.WithAuthentication(handler, authn, failedHandler, nil, nil)
   270  	handler = genericapifilters.WithRequestInfo(handler, requestInfoResolver)
   271  	handler = genericapifilters.WithCacheControl(handler)
   272  	handler = genericfilters.WithHTTPLogging(handler)
   273  	handler = genericfilters.WithPanicRecovery(handler, requestInfoResolver)
   274  
   275  	return handler
   276  }
   277  
   278  func installMetricHandler(pathRecorderMux *mux.PathRecorderMux, informers informers.SharedInformerFactory, isLeader func() bool) {
   279  	configz.InstallHandler(pathRecorderMux)
   280  	pathRecorderMux.Handle("/metrics", legacyregistry.HandlerWithReset())
   281  
   282  	resourceMetricsHandler := resources.Handler(informers.Core().V1().Pods().Lister())
   283  	pathRecorderMux.HandleFunc("/metrics/resources", func(w http.ResponseWriter, req *http.Request) {
   284  		if !isLeader() {
   285  			return
   286  		}
   287  		resourceMetricsHandler.ServeHTTP(w, req)
   288  	})
   289  }
   290  
   291  // newHealthzAndMetricsHandler creates a healthz server from the config, and will also
   292  // embed the metrics handler.
   293  func newHealthzAndMetricsHandler(config *kubeschedulerconfig.KubeSchedulerConfiguration, informers informers.SharedInformerFactory, isLeader func() bool, checks ...healthz.HealthChecker) http.Handler {
   294  	pathRecorderMux := mux.NewPathRecorderMux("kube-scheduler")
   295  	healthz.InstallHandler(pathRecorderMux, checks...)
   296  	installMetricHandler(pathRecorderMux, informers, isLeader)
   297  	slis.SLIMetricsWithReset{}.Install(pathRecorderMux)
   298  
   299  	if config.EnableProfiling {
   300  		routes.Profiling{}.Install(pathRecorderMux)
   301  		if config.EnableContentionProfiling {
   302  			goruntime.SetBlockProfileRate(1)
   303  		}
   304  		routes.DebugFlags{}.Install(pathRecorderMux, "v", routes.StringFlagPutHandler(logs.GlogSetter))
   305  	}
   306  	return pathRecorderMux
   307  }
   308  
   309  func getRecorderFactory(cc *schedulerserverconfig.CompletedConfig) profile.RecorderFactory {
   310  	return func(name string) events.EventRecorder {
   311  		return cc.EventBroadcaster.NewRecorder(name)
   312  	}
   313  }
   314  
   315  // WithPlugin creates an Option based on plugin name and factory. Please don't remove this function: it is used to register out-of-tree plugins,
   316  // hence there are no references to it from the kubernetes scheduler code base.
   317  func WithPlugin(name string, factory runtime.PluginFactory) Option {
   318  	return func(registry runtime.Registry) error {
   319  		return registry.Register(name, factory)
   320  	}
   321  }
   322  
   323  // Setup creates a completed config and a scheduler based on the command args and options
   324  func Setup(ctx context.Context, opts *options.Options, outOfTreeRegistryOptions ...Option) (*schedulerserverconfig.CompletedConfig, *scheduler.Scheduler, error) {
   325  	if cfg, err := latest.Default(); err != nil {
   326  		return nil, nil, err
   327  	} else {
   328  		opts.ComponentConfig = cfg
   329  	}
   330  
   331  	if errs := opts.Validate(); len(errs) > 0 {
   332  		return nil, nil, utilerrors.NewAggregate(errs)
   333  	}
   334  
   335  	c, err := opts.Config(ctx)
   336  	if err != nil {
   337  		return nil, nil, err
   338  	}
   339  
   340  	// Get the completed config
   341  	cc := c.Complete()
   342  
   343  	outOfTreeRegistry := make(runtime.Registry)
   344  	for _, option := range outOfTreeRegistryOptions {
   345  		if err := option(outOfTreeRegistry); err != nil {
   346  			return nil, nil, err
   347  		}
   348  	}
   349  
   350  	recorderFactory := getRecorderFactory(&cc)
   351  	completedProfiles := make([]kubeschedulerconfig.KubeSchedulerProfile, 0)
   352  	// Create the scheduler.
   353  	sched, err := scheduler.New(ctx,
   354  		cc.Client,
   355  		cc.InformerFactory,
   356  		cc.DynInformerFactory,
   357  		recorderFactory,
   358  		scheduler.WithComponentConfigVersion(cc.ComponentConfig.TypeMeta.APIVersion),
   359  		scheduler.WithKubeConfig(cc.KubeConfig),
   360  		scheduler.WithProfiles(cc.ComponentConfig.Profiles...),
   361  		scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore),
   362  		scheduler.WithFrameworkOutOfTreeRegistry(outOfTreeRegistry),
   363  		scheduler.WithPodMaxBackoffSeconds(cc.ComponentConfig.PodMaxBackoffSeconds),
   364  		scheduler.WithPodInitialBackoffSeconds(cc.ComponentConfig.PodInitialBackoffSeconds),
   365  		scheduler.WithPodMaxInUnschedulablePodsDuration(cc.PodMaxInUnschedulablePodsDuration),
   366  		scheduler.WithExtenders(cc.ComponentConfig.Extenders...),
   367  		scheduler.WithParallelism(cc.ComponentConfig.Parallelism),
   368  		scheduler.WithBuildFrameworkCapturer(func(profile kubeschedulerconfig.KubeSchedulerProfile) {
   369  			// Profiles are processed during Framework instantiation to set default plugins and configurations. Capturing them for logging
   370  			completedProfiles = append(completedProfiles, profile)
   371  		}),
   372  	)
   373  	if err != nil {
   374  		return nil, nil, err
   375  	}
   376  	if err := options.LogOrWriteConfig(klog.FromContext(ctx), opts.WriteConfigTo, &cc.ComponentConfig, completedProfiles); err != nil {
   377  		return nil, nil, err
   378  	}
   379  
   380  	return &cc, sched, nil
   381  }
   382  

View as plain text