// Package app implements a Server object for running the scheduler.
package app

import (
	"context"
	"fmt"
	"net/http"
	"os"
	goruntime "runtime"

	"github.com/spf13/cobra"

	utilerrors "k8s.io/apimachinery/pkg/util/errors"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apiserver/pkg/authentication/authenticator"
	"k8s.io/apiserver/pkg/authorization/authorizer"
	genericapifilters "k8s.io/apiserver/pkg/endpoints/filters"
	apirequest "k8s.io/apiserver/pkg/endpoints/request"
	"k8s.io/apiserver/pkg/server"
	genericfilters "k8s.io/apiserver/pkg/server/filters"
	"k8s.io/apiserver/pkg/server/healthz"
	"k8s.io/apiserver/pkg/server/mux"
	"k8s.io/apiserver/pkg/server/routes"
	utilfeature "k8s.io/apiserver/pkg/util/feature"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes/scheme"
	"k8s.io/client-go/tools/events"
	"k8s.io/client-go/tools/leaderelection"
	cliflag "k8s.io/component-base/cli/flag"
	"k8s.io/component-base/cli/globalflag"
	"k8s.io/component-base/configz"
	"k8s.io/component-base/logs"
	logsapi "k8s.io/component-base/logs/api/v1"
	"k8s.io/component-base/metrics/features"
	"k8s.io/component-base/metrics/legacyregistry"
	"k8s.io/component-base/metrics/prometheus/slis"
	"k8s.io/component-base/term"
	"k8s.io/component-base/version"
	"k8s.io/component-base/version/verflag"
	"k8s.io/klog/v2"
	schedulerserverconfig "k8s.io/kubernetes/cmd/kube-scheduler/app/config"
	"k8s.io/kubernetes/cmd/kube-scheduler/app/options"
	"k8s.io/kubernetes/pkg/scheduler"
	kubeschedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
	"k8s.io/kubernetes/pkg/scheduler/apis/config/latest"
	"k8s.io/kubernetes/pkg/scheduler/framework/runtime"
	"k8s.io/kubernetes/pkg/scheduler/metrics/resources"
	"k8s.io/kubernetes/pkg/scheduler/profile"
)

// init registers the logging and metrics feature gates with the
// process-wide mutable feature gate so they can be toggled via flags.
func init() {
	utilruntime.Must(logsapi.AddFeatureGates(utilfeature.DefaultMutableFeatureGate))
	utilruntime.Must(features.AddFeatureGates(utilfeature.DefaultMutableFeatureGate))
}

// Option configures a plugin registry. It is used to inject
// out-of-tree plugins into the scheduler at construction time.
type Option func(runtime.Registry) error

// NewSchedulerCommand creates a *cobra.Command object with default parameters and registryOptions.
func NewSchedulerCommand(registryOptions ...Option) *cobra.Command {
	opts := options.NewOptions()

	cmd := &cobra.Command{
		Use: "kube-scheduler",
		Long: `The Kubernetes scheduler is a control plane process which assigns
Pods to Nodes. The scheduler determines which Nodes are valid placements for
each Pod in the scheduling queue according to constraints and available
resources. The scheduler then ranks each valid Node and binds the Pod to a
suitable Node. Multiple different schedulers may be used within a cluster;
kube-scheduler is the reference implementation.
See [scheduling](https://kubernetes.io/docs/concepts/scheduling-eviction/)
for more information about scheduling and the kube-scheduler component.`,
		RunE: func(cmd *cobra.Command, args []string) error {
			return runCommand(cmd, opts, registryOptions...)
		},
		Args: func(cmd *cobra.Command, args []string) error {
			for _, arg := range args {
				if len(arg) > 0 {
					return fmt.Errorf("%q does not take any arguments, got %q", cmd.CommandPath(), args)
				}
			}
			return nil
		},
	}

	nfs := opts.Flags
	verflag.AddFlags(nfs.FlagSet("global"))
	globalflag.AddGlobalFlags(nfs.FlagSet("global"), cmd.Name(), logs.SkipLoggingConfigurationFlags())
	fs := cmd.Flags()
	for _, f := range nfs.FlagSets {
		fs.AddFlagSet(f)
	}

	cols, _, _ := term.TerminalSize(cmd.OutOrStdout())
	cliflag.SetUsageAndHelpFunc(cmd, *nfs, cols)

	if err := cmd.MarkFlagFilename("config", "yaml", "yml", "json"); err != nil {
		klog.Background().Error(err, "Failed to mark flag filename")
	}

	return cmd
}
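
// A minimal scheduler binary wraps this command with the component-base CLI
// runner, in the style of cmd/kube-scheduler. A sketch, assuming imports of
// os, k8s.io/component-base/cli, and this package as app:
//
//	func main() {
//		command := app.NewSchedulerCommand()
//		code := cli.Run(command)
//		os.Exit(code)
//	}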

// runCommand runs the scheduler.
func runCommand(cmd *cobra.Command, opts *options.Options, registryOptions ...Option) error {
	verflag.PrintAndExitIfRequested()

	// Activate logging as soon as possible, after that
	// show flags with the final logging configuration.
	if err := logsapi.ValidateAndApply(opts.Logs, utilfeature.DefaultFeatureGate); err != nil {
		fmt.Fprintf(os.Stderr, "%v\n", err)
		os.Exit(1)
	}
	cliflag.PrintFlags(cmd.Flags())

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	// Cancel the root context on the first termination signal.
	go func() {
		stopCh := server.SetupSignalHandler()
		<-stopCh
		cancel()
	}()

	cc, sched, err := Setup(ctx, opts, registryOptions...)
	if err != nil {
		return err
	}
	// add feature enablement metrics
	utilfeature.DefaultMutableFeatureGate.AddMetrics()
	return Run(ctx, cc, sched)
}
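
// Embedders can also drive the scheduler without cobra by pairing Setup with
// Run directly, mirroring runCommand above. A sketch, with flag parsing and
// signal handling elided:
//
//	opts := options.NewOptions()
//	cc, sched, err := app.Setup(ctx, opts)
//	if err != nil {
//		return err
//	}
//	return app.Run(ctx, cc, sched)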

// Run executes the scheduler based on the given configuration. It only returns on error or when context is done.
func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched *scheduler.Scheduler) error {
	logger := klog.FromContext(ctx)

	// To help debugging, immediately log version
	logger.Info("Starting Kubernetes Scheduler", "version", version.Get())

	logger.Info("Golang settings", "GOGC", os.Getenv("GOGC"), "GOMAXPROCS", os.Getenv("GOMAXPROCS"), "GOTRACEBACK", os.Getenv("GOTRACEBACK"))

	// Configz registration.
	if cz, err := configz.New("componentconfig"); err == nil {
		cz.Set(cc.ComponentConfig)
	} else {
		return fmt.Errorf("unable to register configz: %s", err)
	}

	// Start events processing pipeline.
	cc.EventBroadcaster.StartRecordingToSink(ctx.Done())
	defer cc.EventBroadcaster.Shutdown()

	// Setup healthz checks.
	var checks []healthz.HealthChecker
	if cc.ComponentConfig.LeaderElection.LeaderElect {
		checks = append(checks, cc.LeaderElection.WatchDog)
	}

	waitingForLeader := make(chan struct{})
	isLeader := func() bool {
		select {
		case _, ok := <-waitingForLeader:
			// if channel is closed, we are leading
			return !ok
		default:
			// channel is open, we are waiting for a leader
			return false
		}
	}

	// Start up the healthz server.
	if cc.SecureServing != nil {
		handler := buildHandlerChain(newHealthzAndMetricsHandler(&cc.ComponentConfig, cc.InformerFactory, isLeader, checks...), cc.Authentication.Authenticator, cc.Authorization.Authorizer)
		// TODO: handle stoppedCh and listenerStoppedCh returned by c.SecureServing.Serve
		if _, _, err := cc.SecureServing.Serve(handler, 0, ctx.Done()); err != nil {
			// fail early for secure handlers, removing the old error loop from above
			return fmt.Errorf("failed to start secure server: %v", err)
		}
	}

	startInformersAndWaitForSync := func(ctx context.Context) {
		// Start all informers.
		cc.InformerFactory.Start(ctx.Done())
		// DynInformerFactory can be nil in tests.
		if cc.DynInformerFactory != nil {
			cc.DynInformerFactory.Start(ctx.Done())
		}

		// Wait for all caches to sync before scheduling.
		cc.InformerFactory.WaitForCacheSync(ctx.Done())
		// DynInformerFactory can be nil in tests.
		if cc.DynInformerFactory != nil {
			cc.DynInformerFactory.WaitForCacheSync(ctx.Done())
		}

		// Wait for all handlers to sync (all items in the initial list delivered) before scheduling.
		if err := sched.WaitForHandlersSync(ctx); err != nil {
			logger.Error(err, "waiting for handlers to sync")
		}

		logger.V(3).Info("Handlers synced")
	}
	if !cc.ComponentConfig.DelayCacheUntilActive || cc.LeaderElection == nil {
		startInformersAndWaitForSync(ctx)
	}

	// If leader election is enabled, run via LeaderElector until done and exit.
	if cc.LeaderElection != nil {
		cc.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{
			OnStartedLeading: func(ctx context.Context) {
				close(waitingForLeader)
				if cc.ComponentConfig.DelayCacheUntilActive {
					logger.Info("Starting informers and waiting for sync...")
					startInformersAndWaitForSync(ctx)
					logger.Info("Sync completed")
				}
				sched.Run(ctx)
			},
			OnStoppedLeading: func() {
				select {
				case <-ctx.Done():
					// We were asked to terminate. Exit 0.
					logger.Info("Requested to terminate, exiting")
					os.Exit(0)
				default:
					// We lost the lock.
					logger.Error(nil, "Leaderelection lost")
					klog.FlushAndExit(klog.ExitFlushTimeout, 1)
				}
			},
		}
		leaderElector, err := leaderelection.NewLeaderElector(*cc.LeaderElection)
		if err != nil {
			return fmt.Errorf("couldn't create leader elector: %v", err)
		}

		leaderElector.Run(ctx)

		return fmt.Errorf("lost lease")
	}

	// Leader election is disabled, so run inline until done.
	close(waitingForLeader)
	sched.Run(ctx)
	return fmt.Errorf("finished without leader elect")
}

// buildHandlerChain wraps the given handler with authentication, authorization,
// and the standard generic filters (request info, cache control, HTTP logging,
// and panic recovery).
func buildHandlerChain(handler http.Handler, authn authenticator.Request, authz authorizer.Authorizer) http.Handler {
	requestInfoResolver := &apirequest.RequestInfoFactory{}
	failedHandler := genericapifilters.Unauthorized(scheme.Codecs)

	handler = genericapifilters.WithAuthorization(handler, authz, scheme.Codecs)
	handler = genericapifilters.WithAuthentication(handler, authn, failedHandler, nil, nil)
	handler = genericapifilters.WithRequestInfo(handler, requestInfoResolver)
	handler = genericapifilters.WithCacheControl(handler)
	handler = genericfilters.WithHTTPLogging(handler)
	handler = genericfilters.WithPanicRecovery(handler, requestInfoResolver)

	return handler
}

func installMetricHandler(pathRecorderMux *mux.PathRecorderMux, informers informers.SharedInformerFactory, isLeader func() bool) {
	configz.InstallHandler(pathRecorderMux)
	pathRecorderMux.Handle("/metrics", legacyregistry.HandlerWithReset())

	resourceMetricsHandler := resources.Handler(informers.Core().V1().Pods().Lister())
	pathRecorderMux.HandleFunc("/metrics/resources", func(w http.ResponseWriter, req *http.Request) {
		// Only the leader serves resource metrics; non-leaders return an empty body.
		if !isLeader() {
			return
		}
		resourceMetricsHandler.ServeHTTP(w, req)
	})
}

// newHealthzAndMetricsHandler creates a healthz server from the config, and will also
// embed the metrics handler.
func newHealthzAndMetricsHandler(config *kubeschedulerconfig.KubeSchedulerConfiguration, informers informers.SharedInformerFactory, isLeader func() bool, checks ...healthz.HealthChecker) http.Handler {
	pathRecorderMux := mux.NewPathRecorderMux("kube-scheduler")
	healthz.InstallHandler(pathRecorderMux, checks...)
	installMetricHandler(pathRecorderMux, informers, isLeader)
	slis.SLIMetricsWithReset{}.Install(pathRecorderMux)

	if config.EnableProfiling {
		routes.Profiling{}.Install(pathRecorderMux)
		if config.EnableContentionProfiling {
			goruntime.SetBlockProfileRate(1)
		}
		routes.DebugFlags{}.Install(pathRecorderMux, "v", routes.StringFlagPutHandler(logs.GlogSetter))
	}
	return pathRecorderMux
}

// getRecorderFactory returns a per-profile event recorder factory backed by
// the shared event broadcaster.
func getRecorderFactory(cc *schedulerserverconfig.CompletedConfig) profile.RecorderFactory {
	return func(name string) events.EventRecorder {
		return cc.EventBroadcaster.NewRecorder(name)
	}
}

// WithPlugin creates an Option based on plugin name and factory. Please don't remove this function: it is used to register out-of-tree plugins,
// hence there are no references to it from the kubernetes scheduler code base.
func WithPlugin(name string, factory runtime.PluginFactory) Option {
	return func(registry runtime.Registry) error {
		return registry.Register(name, factory)
	}
}
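
// For example, an out-of-tree plugin is registered by passing WithPlugin to
// NewSchedulerCommand. The plugin name "Example" and the exampleplugin.New
// factory here are hypothetical stand-ins for a real plugin package:
//
//	command := app.NewSchedulerCommand(
//		app.WithPlugin("Example", exampleplugin.New),
//	)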

// Setup creates a completed config and a scheduler based on the command args and options.
func Setup(ctx context.Context, opts *options.Options, outOfTreeRegistryOptions ...Option) (*schedulerserverconfig.CompletedConfig, *scheduler.Scheduler, error) {
	if cfg, err := latest.Default(); err != nil {
		return nil, nil, err
	} else {
		opts.ComponentConfig = cfg
	}

	if errs := opts.Validate(); len(errs) > 0 {
		return nil, nil, utilerrors.NewAggregate(errs)
	}

	c, err := opts.Config(ctx)
	if err != nil {
		return nil, nil, err
	}

	// Get the completed config
	cc := c.Complete()

	outOfTreeRegistry := make(runtime.Registry)
	for _, option := range outOfTreeRegistryOptions {
		if err := option(outOfTreeRegistry); err != nil {
			return nil, nil, err
		}
	}

	recorderFactory := getRecorderFactory(&cc)
	completedProfiles := make([]kubeschedulerconfig.KubeSchedulerProfile, 0)
	// Create the scheduler.
	sched, err := scheduler.New(ctx,
		cc.Client,
		cc.InformerFactory,
		cc.DynInformerFactory,
		recorderFactory,
		scheduler.WithComponentConfigVersion(cc.ComponentConfig.TypeMeta.APIVersion),
		scheduler.WithKubeConfig(cc.KubeConfig),
		scheduler.WithProfiles(cc.ComponentConfig.Profiles...),
		scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore),
		scheduler.WithFrameworkOutOfTreeRegistry(outOfTreeRegistry),
		scheduler.WithPodMaxBackoffSeconds(cc.ComponentConfig.PodMaxBackoffSeconds),
		scheduler.WithPodInitialBackoffSeconds(cc.ComponentConfig.PodInitialBackoffSeconds),
		scheduler.WithPodMaxInUnschedulablePodsDuration(cc.PodMaxInUnschedulablePodsDuration),
		scheduler.WithExtenders(cc.ComponentConfig.Extenders...),
		scheduler.WithParallelism(cc.ComponentConfig.Parallelism),
		scheduler.WithBuildFrameworkCapturer(func(profile kubeschedulerconfig.KubeSchedulerProfile) {
			// Profiles are processed during Framework instantiation to set default plugins and configurations. Capture them for logging.
			completedProfiles = append(completedProfiles, profile)
		}),
	)
	if err != nil {
		return nil, nil, err
	}
	if err := options.LogOrWriteConfig(klog.FromContext(ctx), opts.WriteConfigTo, &cc.ComponentConfig, completedProfiles); err != nil {
		return nil, nil, err
	}

	return &cc, sched, nil
}