1
16
17 package scheduler
18
19 import (
20 "context"
21 "errors"
22 "fmt"
23 "time"
24
25 v1 "k8s.io/api/core/v1"
26 "k8s.io/apimachinery/pkg/api/meta"
27 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
28 "k8s.io/apimachinery/pkg/util/wait"
29 utilfeature "k8s.io/apiserver/pkg/util/feature"
30 "k8s.io/client-go/dynamic/dynamicinformer"
31 "k8s.io/client-go/informers"
32 coreinformers "k8s.io/client-go/informers/core/v1"
33 clientset "k8s.io/client-go/kubernetes"
34 restclient "k8s.io/client-go/rest"
35 "k8s.io/client-go/tools/cache"
36 "k8s.io/klog/v2"
37 configv1 "k8s.io/kube-scheduler/config/v1"
38 "k8s.io/kubernetes/pkg/features"
39 schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
40 "k8s.io/kubernetes/pkg/scheduler/apis/config/scheme"
41 "k8s.io/kubernetes/pkg/scheduler/framework"
42 "k8s.io/kubernetes/pkg/scheduler/framework/parallelize"
43 frameworkplugins "k8s.io/kubernetes/pkg/scheduler/framework/plugins"
44 "k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources"
45 frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
46 internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
47 cachedebugger "k8s.io/kubernetes/pkg/scheduler/internal/cache/debugger"
48 internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
49 "k8s.io/kubernetes/pkg/scheduler/metrics"
50 "k8s.io/kubernetes/pkg/scheduler/profile"
51 )
52
// durationToExpireAssumedPod is the expiry duration New passes to
// internalcache.New for pods the cache holds as "assumed".
// NOTE(review): a zero value presumably disables expiry of assumed pods —
// confirm against internalcache.New.
const durationToExpireAssumedPod = time.Duration(0)
58
59
// ErrNoNodesAvailable is the sentinel error reporting that there are no nodes
// available to schedule pods on. Callers should match it with errors.Is, since
// it may be wrapped. It carries no formatted data, so it is built with
// errors.New rather than fmt.Errorf.
var ErrNoNodesAvailable = errors.New("no nodes available to schedule pods")
61
62
63
// Scheduler holds everything New assembles to schedule pods: the scheduler
// cache, the scheduling queue, one framework per profile, optional extenders,
// and the pluggable hooks used by the scheduling loop. Run drives that loop by
// repeatedly calling ScheduleOne until its context is cancelled.
type Scheduler struct {
	// Cache is the scheduler's internal cache. New creates it with
	// internalcache.New and also hands it to the cache debugger.
	Cache internalcache.Cache

	// Extenders are the HTTP extenders built by buildExtenders. Non-ignorable
	// extenders come first, followed by ignorable ones.
	Extenders []framework.Extender

	// NextPod returns the next pod to attempt to schedule. New sets it to the
	// scheduling queue's Pop method.
	// NOTE(review): presumably blocks until a pod is available or the queue is
	// closed — confirm against internalqueue.SchedulingQueue.Pop.
	NextPod func(logger klog.Logger) (*framework.QueuedPodInfo, error)

	// FailureHandler is the hook invoked for pods whose scheduling attempt
	// failed (presumably; confirm at its call sites). applyDefaultHandlers sets
	// it to handleSchedulingFailure.
	FailureHandler FailureHandlerFn

	// SchedulePod selects a node for pod using the given framework and cycle
	// state. applyDefaultHandlers sets it to schedulePod. It is a field rather
	// than a method, presumably so the implementation can be swapped (e.g. in
	// tests) — confirm.
	SchedulePod func(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (ScheduleResult, error)

	// StopEverything is the Done channel of the context passed to New, so it is
	// closed when that context is cancelled.
	StopEverything <-chan struct{}

	// SchedulingQueue is the queue of pods awaiting scheduling. New builds it
	// with the queue-sort function of the first configured profile.
	SchedulingQueue internalqueue.SchedulingQueue

	// Profiles maps each profile's scheduler name to its framework.
	Profiles profile.Map

	// client is the Kubernetes API client passed to New.
	client clientset.Interface

	// nodeInfoSnapshot is the snapshot New shares with every framework through
	// frameworkruntime.WithSnapshotSharedLister.
	nodeInfoSnapshot *internalcache.Snapshot

	// percentageOfNodesToScore is copied from the options given to New.
	percentageOfNodesToScore int32

	// nextStartNodeIndex is not set by New.
	// NOTE(review): presumably the node index at which the next node search
	// starts, maintained elsewhere in this package — confirm.
	nextStartNodeIndex int

	// logger is the logger taken from the context passed to New.
	logger klog.Logger

	// registeredHandlers is not set directly by New.
	// NOTE(review): presumably populated by addAllEventHandlers with the
	// informer handler registrations it creates — confirm.
	registeredHandlers []cache.ResourceEventHandlerRegistration
}
110
111 func (sched *Scheduler) applyDefaultHandlers() {
112 sched.SchedulePod = sched.schedulePod
113 sched.FailureHandler = sched.handleSchedulingFailure
114 }
115
// schedulerOptions collects the settings New uses to build a Scheduler. New
// starts from a copy of defaultSchedulerOptions and applies each Option to it.
type schedulerOptions struct {
	// componentConfigVersion is forwarded to every framework via
	// frameworkruntime.WithComponentConfigVersion.
	componentConfigVersion string
	// kubeConfig is forwarded to every framework via
	// frameworkruntime.WithKubeConfig.
	kubeConfig *restclient.Config

	// percentageOfNodesToScore is copied into the Scheduler.
	percentageOfNodesToScore int32
	// podInitialBackoffSeconds is in seconds; New converts it to a
	// time.Duration for the scheduling queue.
	podInitialBackoffSeconds int64
	// podMaxBackoffSeconds is in seconds; New converts it to a time.Duration
	// for the scheduling queue.
	podMaxBackoffSeconds int64
	// podMaxInUnschedulablePodsDuration is passed to the scheduling queue.
	podMaxInUnschedulablePodsDuration time.Duration

	// frameworkOutOfTreeRegistry is merged into the in-tree plugin registry.
	frameworkOutOfTreeRegistry frameworkruntime.Registry
	// profiles are the scheduler profiles. New replaces them with the defaults
	// when applyDefaultProfile is true.
	profiles []schedulerapi.KubeSchedulerProfile
	// extenders are the extender configs handed to buildExtenders.
	extenders []schedulerapi.Extender
	// frameworkCapturer is forwarded to every framework via
	// frameworkruntime.WithCaptureProfile.
	frameworkCapturer FrameworkCapturer
	// parallelism is forwarded to every framework via
	// frameworkruntime.WithParallelism.
	parallelism int32
	// applyDefaultProfile makes New derive profiles from the defaulted
	// component config. WithProfiles clears it.
	applyDefaultProfile bool
}

// Option configures a schedulerOptions. New applies Options in the order they
// are given.
type Option func(*schedulerOptions)
135
136
// ScheduleResult is the result type of Scheduler.SchedulePod.
// NOTE(review): the field semantics below are inferred from names; the values
// are populated elsewhere in this package — confirm there.
type ScheduleResult struct {
	// SuggestedHost is, presumably, the name of the node selected for the pod.
	SuggestedHost string

	// EvaluatedNodes is, presumably, the number of nodes examined while
	// scheduling the pod.
	EvaluatedNodes int

	// FeasibleNodes is, presumably, how many of the evaluated nodes could host
	// the pod.
	FeasibleNodes int

	// nominatingInfo carries nomination information for the pod, if any.
	nominatingInfo *framework.NominatingInfo
}
149
150
151
152
153
154 func WithComponentConfigVersion(apiVersion string) Option {
155 return func(o *schedulerOptions) {
156 o.componentConfigVersion = apiVersion
157 }
158 }
159
160
161 func WithKubeConfig(cfg *restclient.Config) Option {
162 return func(o *schedulerOptions) {
163 o.kubeConfig = cfg
164 }
165 }
166
167
168
169 func WithProfiles(p ...schedulerapi.KubeSchedulerProfile) Option {
170 return func(o *schedulerOptions) {
171 o.profiles = p
172 o.applyDefaultProfile = false
173 }
174 }
175
176
177 func WithParallelism(threads int32) Option {
178 return func(o *schedulerOptions) {
179 o.parallelism = threads
180 }
181 }
182
183
184
185 func WithPercentageOfNodesToScore(percentageOfNodesToScore *int32) Option {
186 return func(o *schedulerOptions) {
187 if percentageOfNodesToScore != nil {
188 o.percentageOfNodesToScore = *percentageOfNodesToScore
189 }
190 }
191 }
192
193
194
195 func WithFrameworkOutOfTreeRegistry(registry frameworkruntime.Registry) Option {
196 return func(o *schedulerOptions) {
197 o.frameworkOutOfTreeRegistry = registry
198 }
199 }
200
201
202 func WithPodInitialBackoffSeconds(podInitialBackoffSeconds int64) Option {
203 return func(o *schedulerOptions) {
204 o.podInitialBackoffSeconds = podInitialBackoffSeconds
205 }
206 }
207
208
209 func WithPodMaxBackoffSeconds(podMaxBackoffSeconds int64) Option {
210 return func(o *schedulerOptions) {
211 o.podMaxBackoffSeconds = podMaxBackoffSeconds
212 }
213 }
214
215
216 func WithPodMaxInUnschedulablePodsDuration(duration time.Duration) Option {
217 return func(o *schedulerOptions) {
218 o.podMaxInUnschedulablePodsDuration = duration
219 }
220 }
221
222
223 func WithExtenders(e ...schedulerapi.Extender) Option {
224 return func(o *schedulerOptions) {
225 o.extenders = e
226 }
227 }
228
229
230 type FrameworkCapturer func(schedulerapi.KubeSchedulerProfile)
231
232
233 func WithBuildFrameworkCapturer(fc FrameworkCapturer) Option {
234 return func(o *schedulerOptions) {
235 o.frameworkCapturer = fc
236 }
237 }
238
239 var defaultSchedulerOptions = schedulerOptions{
240 percentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore,
241 podInitialBackoffSeconds: int64(internalqueue.DefaultPodInitialBackoffDuration.Seconds()),
242 podMaxBackoffSeconds: int64(internalqueue.DefaultPodMaxBackoffDuration.Seconds()),
243 podMaxInUnschedulablePodsDuration: internalqueue.DefaultPodMaxInUnschedulablePodsDuration,
244 parallelism: int32(parallelize.DefaultParallelism),
245
246
247
248
249 applyDefaultProfile: true,
250 }
251
252
253 func New(ctx context.Context,
254 client clientset.Interface,
255 informerFactory informers.SharedInformerFactory,
256 dynInformerFactory dynamicinformer.DynamicSharedInformerFactory,
257 recorderFactory profile.RecorderFactory,
258 opts ...Option) (*Scheduler, error) {
259
260 logger := klog.FromContext(ctx)
261 stopEverything := ctx.Done()
262
263 options := defaultSchedulerOptions
264 for _, opt := range opts {
265 opt(&options)
266 }
267
268 if options.applyDefaultProfile {
269 var versionedCfg configv1.KubeSchedulerConfiguration
270 scheme.Scheme.Default(&versionedCfg)
271 cfg := schedulerapi.KubeSchedulerConfiguration{}
272 if err := scheme.Scheme.Convert(&versionedCfg, &cfg, nil); err != nil {
273 return nil, err
274 }
275 options.profiles = cfg.Profiles
276 }
277
278 registry := frameworkplugins.NewInTreeRegistry()
279 if err := registry.Merge(options.frameworkOutOfTreeRegistry); err != nil {
280 return nil, err
281 }
282
283 metrics.Register()
284
285 extenders, err := buildExtenders(logger, options.extenders, options.profiles)
286 if err != nil {
287 return nil, fmt.Errorf("couldn't build extenders: %w", err)
288 }
289
290 podLister := informerFactory.Core().V1().Pods().Lister()
291 nodeLister := informerFactory.Core().V1().Nodes().Lister()
292
293 snapshot := internalcache.NewEmptySnapshot()
294 metricsRecorder := metrics.NewMetricsAsyncRecorder(1000, time.Second, stopEverything)
295
296 profiles, err := profile.NewMap(ctx, options.profiles, registry, recorderFactory,
297 frameworkruntime.WithComponentConfigVersion(options.componentConfigVersion),
298 frameworkruntime.WithClientSet(client),
299 frameworkruntime.WithKubeConfig(options.kubeConfig),
300 frameworkruntime.WithInformerFactory(informerFactory),
301 frameworkruntime.WithSnapshotSharedLister(snapshot),
302 frameworkruntime.WithCaptureProfile(frameworkruntime.CaptureProfile(options.frameworkCapturer)),
303 frameworkruntime.WithParallelism(int(options.parallelism)),
304 frameworkruntime.WithExtenders(extenders),
305 frameworkruntime.WithMetricsRecorder(metricsRecorder),
306 )
307 if err != nil {
308 return nil, fmt.Errorf("initializing profiles: %v", err)
309 }
310
311 if len(profiles) == 0 {
312 return nil, errors.New("at least one profile is required")
313 }
314
315 preEnqueuePluginMap := make(map[string][]framework.PreEnqueuePlugin)
316 queueingHintsPerProfile := make(internalqueue.QueueingHintMapPerProfile)
317 for profileName, profile := range profiles {
318 preEnqueuePluginMap[profileName] = profile.PreEnqueuePlugins()
319 queueingHintsPerProfile[profileName] = buildQueueingHintMap(profile.EnqueueExtensions())
320 }
321
322 podQueue := internalqueue.NewSchedulingQueue(
323 profiles[options.profiles[0].SchedulerName].QueueSortFunc(),
324 informerFactory,
325 internalqueue.WithPodInitialBackoffDuration(time.Duration(options.podInitialBackoffSeconds)*time.Second),
326 internalqueue.WithPodMaxBackoffDuration(time.Duration(options.podMaxBackoffSeconds)*time.Second),
327 internalqueue.WithPodLister(podLister),
328 internalqueue.WithPodMaxInUnschedulablePodsDuration(options.podMaxInUnschedulablePodsDuration),
329 internalqueue.WithPreEnqueuePluginMap(preEnqueuePluginMap),
330 internalqueue.WithQueueingHintMapPerProfile(queueingHintsPerProfile),
331 internalqueue.WithPluginMetricsSamplePercent(pluginMetricsSamplePercent),
332 internalqueue.WithMetricsRecorder(*metricsRecorder),
333 )
334
335 for _, fwk := range profiles {
336 fwk.SetPodNominator(podQueue)
337 }
338
339 schedulerCache := internalcache.New(ctx, durationToExpireAssumedPod)
340
341
342 debugger := cachedebugger.New(nodeLister, podLister, schedulerCache, podQueue)
343 debugger.ListenForSignal(ctx)
344
345 sched := &Scheduler{
346 Cache: schedulerCache,
347 client: client,
348 nodeInfoSnapshot: snapshot,
349 percentageOfNodesToScore: options.percentageOfNodesToScore,
350 Extenders: extenders,
351 StopEverything: stopEverything,
352 SchedulingQueue: podQueue,
353 Profiles: profiles,
354 logger: logger,
355 }
356 sched.NextPod = podQueue.Pop
357 sched.applyDefaultHandlers()
358
359 if err = addAllEventHandlers(sched, informerFactory, dynInformerFactory, unionedGVKs(queueingHintsPerProfile)); err != nil {
360 return nil, fmt.Errorf("adding event handlers: %w", err)
361 }
362
363 return sched, nil
364 }
365
366
367
368 var defaultQueueingHintFn = func(_ klog.Logger, _ *v1.Pod, _, _ interface{}) (framework.QueueingHint, error) {
369 return framework.Queue, nil
370 }
371
372 func buildQueueingHintMap(es []framework.EnqueueExtensions) internalqueue.QueueingHintMap {
373 queueingHintMap := make(internalqueue.QueueingHintMap)
374 for _, e := range es {
375 events := e.EventsToRegister()
376
377
378
379
380 if len(events) == 0 {
381 continue
382 }
383
384
385
386
387
388
389 registerNodeAdded := false
390 registerNodeTaintUpdated := false
391 for _, event := range events {
392 fn := event.QueueingHintFn
393 if fn == nil || !utilfeature.DefaultFeatureGate.Enabled(features.SchedulerQueueingHints) {
394 fn = defaultQueueingHintFn
395 }
396
397 if event.Event.Resource == framework.Node {
398 if event.Event.ActionType&framework.Add != 0 {
399 registerNodeAdded = true
400 }
401 if event.Event.ActionType&framework.UpdateNodeTaint != 0 {
402 registerNodeTaintUpdated = true
403 }
404 }
405
406 queueingHintMap[event.Event] = append(queueingHintMap[event.Event], &internalqueue.QueueingHintFunction{
407 PluginName: e.Name(),
408 QueueingHintFn: fn,
409 })
410 }
411 if registerNodeAdded && !registerNodeTaintUpdated {
412
413
414
415
416
417
418
419
420
421
422 queueingHintMap[framework.ClusterEvent{Resource: framework.Node, ActionType: framework.UpdateNodeTaint}] =
423 append(queueingHintMap[framework.ClusterEvent{Resource: framework.Node, ActionType: framework.UpdateNodeTaint}],
424 &internalqueue.QueueingHintFunction{
425 PluginName: e.Name(),
426 QueueingHintFn: defaultQueueingHintFn,
427 },
428 )
429 }
430 }
431 return queueingHintMap
432 }
433
434
435 func (sched *Scheduler) Run(ctx context.Context) {
436 logger := klog.FromContext(ctx)
437 sched.SchedulingQueue.Run(logger)
438
439
440
441
442
443
444
445 go wait.UntilWithContext(ctx, sched.ScheduleOne, 0)
446
447 <-ctx.Done()
448 sched.SchedulingQueue.Close()
449
450
451 err := sched.Profiles.Close()
452 if err != nil {
453 logger.Error(err, "Failed to close plugins")
454 }
455 }
456
457
458
459 func NewInformerFactory(cs clientset.Interface, resyncPeriod time.Duration) informers.SharedInformerFactory {
460 informerFactory := informers.NewSharedInformerFactory(cs, resyncPeriod)
461 informerFactory.InformerFor(&v1.Pod{}, newPodInformer)
462 return informerFactory
463 }
464
465 func buildExtenders(logger klog.Logger, extenders []schedulerapi.Extender, profiles []schedulerapi.KubeSchedulerProfile) ([]framework.Extender, error) {
466 var fExtenders []framework.Extender
467 if len(extenders) == 0 {
468 return nil, nil
469 }
470
471 var ignoredExtendedResources []string
472 var ignorableExtenders []framework.Extender
473 for i := range extenders {
474 logger.V(2).Info("Creating extender", "extender", extenders[i])
475 extender, err := NewHTTPExtender(&extenders[i])
476 if err != nil {
477 return nil, err
478 }
479 if !extender.IsIgnorable() {
480 fExtenders = append(fExtenders, extender)
481 } else {
482 ignorableExtenders = append(ignorableExtenders, extender)
483 }
484 for _, r := range extenders[i].ManagedResources {
485 if r.IgnoredByScheduler {
486 ignoredExtendedResources = append(ignoredExtendedResources, r.Name)
487 }
488 }
489 }
490
491 fExtenders = append(fExtenders, ignorableExtenders...)
492
493
494
495
496 if len(ignoredExtendedResources) == 0 {
497 return fExtenders, nil
498 }
499
500 for i := range profiles {
501 prof := &profiles[i]
502 var found = false
503 for k := range prof.PluginConfig {
504 if prof.PluginConfig[k].Name == noderesources.Name {
505
506 pc := &prof.PluginConfig[k]
507 args, ok := pc.Args.(*schedulerapi.NodeResourcesFitArgs)
508 if !ok {
509 return nil, fmt.Errorf("want args to be of type NodeResourcesFitArgs, got %T", pc.Args)
510 }
511 args.IgnoredResources = ignoredExtendedResources
512 found = true
513 break
514 }
515 }
516 if !found {
517 return nil, fmt.Errorf("can't find NodeResourcesFitArgs in plugin config")
518 }
519 }
520 return fExtenders, nil
521 }
522
// FailureHandlerFn is the type of Scheduler.FailureHandler, which
// applyDefaultHandlers sets to handleSchedulingFailure. It receives the
// framework, the queued pod, the failure status, optional nominating
// information, and a start time.
// NOTE(review): start is presumably when the scheduling attempt began —
// confirm at the call sites.
type FailureHandlerFn func(ctx context.Context, fwk framework.Framework, podInfo *framework.QueuedPodInfo, status *framework.Status, nominatingInfo *framework.NominatingInfo, start time.Time)
524
525 func unionedGVKs(queueingHintsPerProfile internalqueue.QueueingHintMapPerProfile) map[framework.GVK]framework.ActionType {
526 gvkMap := make(map[framework.GVK]framework.ActionType)
527 for _, queueingHints := range queueingHintsPerProfile {
528 for evt := range queueingHints {
529 if _, ok := gvkMap[evt.Resource]; ok {
530 gvkMap[evt.Resource] |= evt.ActionType
531 } else {
532 gvkMap[evt.Resource] = evt.ActionType
533 }
534 }
535 }
536 return gvkMap
537 }
538
539
540
541 func newPodInformer(cs clientset.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
542 selector := fmt.Sprintf("status.phase!=%v,status.phase!=%v", v1.PodSucceeded, v1.PodFailed)
543 tweakListOptions := func(options *metav1.ListOptions) {
544 options.FieldSelector = selector
545 }
546 informer := coreinformers.NewFilteredPodInformer(cs, metav1.NamespaceAll, resyncPeriod, cache.Indexers{}, tweakListOptions)
547
548
549
550 trim := func(obj interface{}) (interface{}, error) {
551 if accessor, err := meta.Accessor(obj); err == nil {
552 if accessor.GetManagedFields() != nil {
553 accessor.SetManagedFields(nil)
554 }
555 }
556 return obj, nil
557 }
558 informer.SetTransform(trim)
559 return informer
560 }
561
View as plain text