1
16
17 package runtime
18
19 import (
20 "context"
21 "errors"
22 "fmt"
23 "io"
24 "reflect"
25 "sort"
26 "time"
27
28 v1 "k8s.io/api/core/v1"
29 "k8s.io/apimachinery/pkg/runtime"
30 "k8s.io/apimachinery/pkg/types"
31 "k8s.io/apimachinery/pkg/util/sets"
32 "k8s.io/client-go/informers"
33 clientset "k8s.io/client-go/kubernetes"
34 restclient "k8s.io/client-go/rest"
35 "k8s.io/client-go/tools/events"
36 "k8s.io/component-helpers/scheduling/corev1"
37 "k8s.io/klog/v2"
38 "k8s.io/kubernetes/pkg/scheduler/apis/config"
39 "k8s.io/kubernetes/pkg/scheduler/framework"
40 "k8s.io/kubernetes/pkg/scheduler/framework/parallelize"
41 "k8s.io/kubernetes/pkg/scheduler/metrics"
42 "k8s.io/kubernetes/pkg/util/slice"
43 )
44
45 const (
46
47 maxTimeout = 15 * time.Minute
48 )
49
50
51
52 type frameworkImpl struct {
53 registry Registry
54 snapshotSharedLister framework.SharedLister
55 waitingPods *waitingPodsMap
56 scorePluginWeight map[string]int
57 preEnqueuePlugins []framework.PreEnqueuePlugin
58 enqueueExtensions []framework.EnqueueExtensions
59 queueSortPlugins []framework.QueueSortPlugin
60 preFilterPlugins []framework.PreFilterPlugin
61 filterPlugins []framework.FilterPlugin
62 postFilterPlugins []framework.PostFilterPlugin
63 preScorePlugins []framework.PreScorePlugin
64 scorePlugins []framework.ScorePlugin
65 reservePlugins []framework.ReservePlugin
66 preBindPlugins []framework.PreBindPlugin
67 bindPlugins []framework.BindPlugin
68 postBindPlugins []framework.PostBindPlugin
69 permitPlugins []framework.PermitPlugin
70
71
72 pluginsMap map[string]framework.Plugin
73
74 clientSet clientset.Interface
75 kubeConfig *restclient.Config
76 eventRecorder events.EventRecorder
77 informerFactory informers.SharedInformerFactory
78 logger klog.Logger
79
80 metricsRecorder *metrics.MetricAsyncRecorder
81 profileName string
82 percentageOfNodesToScore *int32
83
84 extenders []framework.Extender
85 framework.PodNominator
86
87 parallelizer parallelize.Parallelizer
88 }
89
90
91
92
93 type extensionPoint struct {
94
95 plugins *config.PluginSet
96
97
98 slicePtr interface{}
99 }
100
101 func (f *frameworkImpl) getExtensionPoints(plugins *config.Plugins) []extensionPoint {
102 return []extensionPoint{
103 {&plugins.PreFilter, &f.preFilterPlugins},
104 {&plugins.Filter, &f.filterPlugins},
105 {&plugins.PostFilter, &f.postFilterPlugins},
106 {&plugins.Reserve, &f.reservePlugins},
107 {&plugins.PreScore, &f.preScorePlugins},
108 {&plugins.Score, &f.scorePlugins},
109 {&plugins.PreBind, &f.preBindPlugins},
110 {&plugins.Bind, &f.bindPlugins},
111 {&plugins.PostBind, &f.postBindPlugins},
112 {&plugins.Permit, &f.permitPlugins},
113 {&plugins.PreEnqueue, &f.preEnqueuePlugins},
114 {&plugins.QueueSort, &f.queueSortPlugins},
115 }
116 }
117
118
119 func (f *frameworkImpl) Extenders() []framework.Extender {
120 return f.extenders
121 }
122
123 type frameworkOptions struct {
124 componentConfigVersion string
125 clientSet clientset.Interface
126 kubeConfig *restclient.Config
127 eventRecorder events.EventRecorder
128 informerFactory informers.SharedInformerFactory
129 snapshotSharedLister framework.SharedLister
130 metricsRecorder *metrics.MetricAsyncRecorder
131 podNominator framework.PodNominator
132 extenders []framework.Extender
133 captureProfile CaptureProfile
134 parallelizer parallelize.Parallelizer
135 logger *klog.Logger
136 }
137
138
139 type Option func(*frameworkOptions)
140
141
142
143
144
145 func WithComponentConfigVersion(componentConfigVersion string) Option {
146 return func(o *frameworkOptions) {
147 o.componentConfigVersion = componentConfigVersion
148 }
149 }
150
151
152 func WithClientSet(clientSet clientset.Interface) Option {
153 return func(o *frameworkOptions) {
154 o.clientSet = clientSet
155 }
156 }
157
158
159 func WithKubeConfig(kubeConfig *restclient.Config) Option {
160 return func(o *frameworkOptions) {
161 o.kubeConfig = kubeConfig
162 }
163 }
164
165
166 func WithEventRecorder(recorder events.EventRecorder) Option {
167 return func(o *frameworkOptions) {
168 o.eventRecorder = recorder
169 }
170 }
171
172
173 func WithInformerFactory(informerFactory informers.SharedInformerFactory) Option {
174 return func(o *frameworkOptions) {
175 o.informerFactory = informerFactory
176 }
177 }
178
179
180 func WithSnapshotSharedLister(snapshotSharedLister framework.SharedLister) Option {
181 return func(o *frameworkOptions) {
182 o.snapshotSharedLister = snapshotSharedLister
183 }
184 }
185
186
187 func WithPodNominator(nominator framework.PodNominator) Option {
188 return func(o *frameworkOptions) {
189 o.podNominator = nominator
190 }
191 }
192
193
194 func WithExtenders(extenders []framework.Extender) Option {
195 return func(o *frameworkOptions) {
196 o.extenders = extenders
197 }
198 }
199
200
201 func WithParallelism(parallelism int) Option {
202 return func(o *frameworkOptions) {
203 o.parallelizer = parallelize.NewParallelizer(parallelism)
204 }
205 }
206
207
208 type CaptureProfile func(config.KubeSchedulerProfile)
209
210
211 func WithCaptureProfile(c CaptureProfile) Option {
212 return func(o *frameworkOptions) {
213 o.captureProfile = c
214 }
215 }
216
217
218 func WithMetricsRecorder(r *metrics.MetricAsyncRecorder) Option {
219 return func(o *frameworkOptions) {
220 o.metricsRecorder = r
221 }
222 }
223
224
225 func WithLogger(logger klog.Logger) Option {
226 return func(o *frameworkOptions) {
227 o.logger = &logger
228 }
229 }
230
231
232 func defaultFrameworkOptions(stopCh <-chan struct{}) frameworkOptions {
233 return frameworkOptions{
234 metricsRecorder: metrics.NewMetricsAsyncRecorder(1000, time.Second, stopCh),
235 parallelizer: parallelize.NewParallelizer(parallelize.DefaultParallelism),
236 }
237 }
238
239 var _ framework.Framework = &frameworkImpl{}
240
241
242 func NewFramework(ctx context.Context, r Registry, profile *config.KubeSchedulerProfile, opts ...Option) (framework.Framework, error) {
243 options := defaultFrameworkOptions(ctx.Done())
244 for _, opt := range opts {
245 opt(&options)
246 }
247
248 logger := klog.FromContext(ctx)
249 if options.logger != nil {
250 logger = *options.logger
251 }
252
253 f := &frameworkImpl{
254 registry: r,
255 snapshotSharedLister: options.snapshotSharedLister,
256 scorePluginWeight: make(map[string]int),
257 waitingPods: newWaitingPodsMap(),
258 clientSet: options.clientSet,
259 kubeConfig: options.kubeConfig,
260 eventRecorder: options.eventRecorder,
261 informerFactory: options.informerFactory,
262 metricsRecorder: options.metricsRecorder,
263 extenders: options.extenders,
264 PodNominator: options.podNominator,
265 parallelizer: options.parallelizer,
266 logger: logger,
267 }
268
269 if len(f.extenders) > 0 {
270
271
272
273
274 f.enqueueExtensions = []framework.EnqueueExtensions{&defaultEnqueueExtension{pluginName: framework.ExtenderName}}
275 }
276
277 if profile == nil {
278 return f, nil
279 }
280
281 f.profileName = profile.SchedulerName
282 f.percentageOfNodesToScore = profile.PercentageOfNodesToScore
283 if profile.Plugins == nil {
284 return f, nil
285 }
286
287
288 pg := f.pluginsNeeded(profile.Plugins)
289
290 pluginConfig := make(map[string]runtime.Object, len(profile.PluginConfig))
291 for i := range profile.PluginConfig {
292 name := profile.PluginConfig[i].Name
293 if _, ok := pluginConfig[name]; ok {
294 return nil, fmt.Errorf("repeated config for plugin %s", name)
295 }
296 pluginConfig[name] = profile.PluginConfig[i].Args
297 }
298 outputProfile := config.KubeSchedulerProfile{
299 SchedulerName: f.profileName,
300 PercentageOfNodesToScore: f.percentageOfNodesToScore,
301 Plugins: profile.Plugins,
302 PluginConfig: make([]config.PluginConfig, 0, len(pg)),
303 }
304
305 f.pluginsMap = make(map[string]framework.Plugin)
306 for name, factory := range r {
307
308 if !pg.Has(name) {
309 continue
310 }
311
312 args := pluginConfig[name]
313 if args != nil {
314 outputProfile.PluginConfig = append(outputProfile.PluginConfig, config.PluginConfig{
315 Name: name,
316 Args: args,
317 })
318 }
319 p, err := factory(ctx, args, f)
320 if err != nil {
321 return nil, fmt.Errorf("initializing plugin %q: %w", name, err)
322 }
323 f.pluginsMap[name] = p
324
325 f.fillEnqueueExtensions(p)
326 }
327
328
329 for _, e := range f.getExtensionPoints(profile.Plugins) {
330 if err := updatePluginList(e.slicePtr, *e.plugins, f.pluginsMap); err != nil {
331 return nil, err
332 }
333 }
334
335
336 if len(profile.Plugins.MultiPoint.Enabled) > 0 {
337 if err := f.expandMultiPointPlugins(logger, profile); err != nil {
338 return nil, err
339 }
340 }
341
342 if len(f.queueSortPlugins) != 1 {
343 return nil, fmt.Errorf("only one queue sort plugin required for profile with scheduler name %q, but got %d", profile.SchedulerName, len(f.queueSortPlugins))
344 }
345 if len(f.bindPlugins) == 0 {
346 return nil, fmt.Errorf("at least one bind plugin is needed for profile with scheduler name %q", profile.SchedulerName)
347 }
348
349 if err := getScoreWeights(f, append(profile.Plugins.Score.Enabled, profile.Plugins.MultiPoint.Enabled...)); err != nil {
350 return nil, err
351 }
352
353
354
355 for _, scorePlugin := range f.scorePlugins {
356 if f.scorePluginWeight[scorePlugin.Name()] == 0 {
357 return nil, fmt.Errorf("score plugin %q is not configured with weight", scorePlugin.Name())
358 }
359 }
360
361 if options.captureProfile != nil {
362 if len(outputProfile.PluginConfig) != 0 {
363 sort.Slice(outputProfile.PluginConfig, func(i, j int) bool {
364 return outputProfile.PluginConfig[i].Name < outputProfile.PluginConfig[j].Name
365 })
366 } else {
367 outputProfile.PluginConfig = nil
368 }
369 options.captureProfile(outputProfile)
370 }
371
372
373 logger.V(2).Info("the scheduler starts to work with those plugins", "Plugins", *f.ListPlugins())
374 f.setInstrumentedPlugins()
375 return f, nil
376 }
377
378
379 func (f *frameworkImpl) setInstrumentedPlugins() {
380
381 for i, pl := range f.preFilterPlugins {
382 f.preFilterPlugins[i] = &instrumentedPreFilterPlugin{
383 PreFilterPlugin: f.preFilterPlugins[i],
384 metric: metrics.PluginEvaluationTotal.WithLabelValues(pl.Name(), metrics.PreFilter, f.profileName),
385 }
386 }
387 for i, pl := range f.filterPlugins {
388 f.filterPlugins[i] = &instrumentedFilterPlugin{
389 FilterPlugin: f.filterPlugins[i],
390 metric: metrics.PluginEvaluationTotal.WithLabelValues(pl.Name(), metrics.Filter, f.profileName),
391 }
392 }
393
394
395 for i, pl := range f.preScorePlugins {
396 f.preScorePlugins[i] = &instrumentedPreScorePlugin{
397 PreScorePlugin: f.preScorePlugins[i],
398 metric: metrics.PluginEvaluationTotal.WithLabelValues(pl.Name(), metrics.PreScore, f.profileName),
399 }
400 }
401 for i, pl := range f.scorePlugins {
402 f.scorePlugins[i] = &instrumentedScorePlugin{
403 ScorePlugin: f.scorePlugins[i],
404 metric: metrics.PluginEvaluationTotal.WithLabelValues(pl.Name(), metrics.Score, f.profileName),
405 }
406 }
407 }
408
409 func (f *frameworkImpl) SetPodNominator(n framework.PodNominator) {
410 f.PodNominator = n
411 }
412
413
414 func (f *frameworkImpl) Close() error {
415 var errs []error
416 for name, plugin := range f.pluginsMap {
417 if closer, ok := plugin.(io.Closer); ok {
418 err := closer.Close()
419 if err != nil {
420 errs = append(errs, fmt.Errorf("%s failed to close: %w", name, err))
421
422 }
423 }
424 }
425 return errors.Join(errs...)
426 }
427
428
429
430 func getScoreWeights(f *frameworkImpl, plugins []config.Plugin) error {
431 var totalPriority int64
432 scorePlugins := reflect.ValueOf(&f.scorePlugins).Elem()
433 pluginType := scorePlugins.Type().Elem()
434 for _, e := range plugins {
435 pg := f.pluginsMap[e.Name]
436 if !reflect.TypeOf(pg).Implements(pluginType) {
437 continue
438 }
439
440
441
442 if _, ok := f.scorePluginWeight[e.Name]; ok {
443 continue
444 }
445
446
447 f.scorePluginWeight[e.Name] = int(e.Weight)
448 if f.scorePluginWeight[e.Name] == 0 {
449 f.scorePluginWeight[e.Name] = 1
450 }
451
452
453 if int64(f.scorePluginWeight[e.Name])*framework.MaxNodeScore > framework.MaxTotalScore-totalPriority {
454 return fmt.Errorf("total score of Score plugins could overflow")
455 }
456 totalPriority += int64(f.scorePluginWeight[e.Name]) * framework.MaxNodeScore
457 }
458 return nil
459 }
460
461 type orderedSet struct {
462 set map[string]int
463 list []string
464 deletionCnt int
465 }
466
467 func newOrderedSet() *orderedSet {
468 return &orderedSet{set: make(map[string]int)}
469 }
470
471 func (os *orderedSet) insert(s string) {
472 if os.has(s) {
473 return
474 }
475 os.set[s] = len(os.list)
476 os.list = append(os.list, s)
477 }
478
479 func (os *orderedSet) has(s string) bool {
480 _, found := os.set[s]
481 return found
482 }
483
484 func (os *orderedSet) delete(s string) {
485 if i, found := os.set[s]; found {
486 delete(os.set, s)
487 os.list = append(os.list[:i-os.deletionCnt], os.list[i+1-os.deletionCnt:]...)
488 os.deletionCnt++
489 }
490 }
491
492 func (f *frameworkImpl) expandMultiPointPlugins(logger klog.Logger, profile *config.KubeSchedulerProfile) error {
493
494 for _, e := range f.getExtensionPoints(profile.Plugins) {
495 plugins := reflect.ValueOf(e.slicePtr).Elem()
496 pluginType := plugins.Type().Elem()
497
498
499 enabledSet := newOrderedSet()
500 for _, plugin := range e.plugins.Enabled {
501 enabledSet.insert(plugin.Name)
502 }
503
504 disabledSet := sets.New[string]()
505 for _, disabledPlugin := range e.plugins.Disabled {
506 disabledSet.Insert(disabledPlugin.Name)
507 }
508 if disabledSet.Has("*") {
509 logger.V(4).Info("Skipped MultiPoint expansion because all plugins are disabled for extension point", "extension", pluginType)
510 continue
511 }
512
513
514
515 multiPointEnabled := newOrderedSet()
516 overridePlugins := newOrderedSet()
517 for _, ep := range profile.Plugins.MultiPoint.Enabled {
518 pg, ok := f.pluginsMap[ep.Name]
519 if !ok {
520 return fmt.Errorf("%s %q does not exist", pluginType.Name(), ep.Name)
521 }
522
523
524 if !reflect.TypeOf(pg).Implements(pluginType) {
525 continue
526 }
527
528
529 if disabledSet.Has(ep.Name) {
530 logger.V(4).Info("Skipped disabled plugin for extension point", "plugin", ep.Name, "extension", pluginType)
531 continue
532 }
533
534
535
536
537
538 if enabledSet.has(ep.Name) {
539 overridePlugins.insert(ep.Name)
540 logger.Info("MultiPoint plugin is explicitly re-configured; overriding", "plugin", ep.Name)
541 continue
542 }
543
544
545
546 if multiPointEnabled.has(ep.Name) {
547 return fmt.Errorf("plugin %q already registered as %q", ep.Name, pluginType.Name())
548 }
549
550
551 multiPointEnabled.insert(ep.Name)
552 }
553
554
555
556
557
558 newPlugins := reflect.New(reflect.TypeOf(e.slicePtr).Elem()).Elem()
559
560 for _, name := range slice.CopyStrings(enabledSet.list) {
561 if overridePlugins.has(name) {
562 newPlugins = reflect.Append(newPlugins, reflect.ValueOf(f.pluginsMap[name]))
563 enabledSet.delete(name)
564 }
565 }
566
567 for _, name := range multiPointEnabled.list {
568 newPlugins = reflect.Append(newPlugins, reflect.ValueOf(f.pluginsMap[name]))
569 }
570
571 for _, name := range enabledSet.list {
572 newPlugins = reflect.Append(newPlugins, reflect.ValueOf(f.pluginsMap[name]))
573 }
574 plugins.Set(newPlugins)
575 }
576 return nil
577 }
578
579 func shouldHaveEnqueueExtensions(p framework.Plugin) bool {
580 switch p.(type) {
581
582
583 case framework.PreEnqueuePlugin, framework.PreFilterPlugin, framework.FilterPlugin, framework.ReservePlugin, framework.PermitPlugin:
584 return true
585 }
586 return false
587 }
588
589 func (f *frameworkImpl) fillEnqueueExtensions(p framework.Plugin) {
590 if !shouldHaveEnqueueExtensions(p) {
591
592 return
593 }
594
595 ext, ok := p.(framework.EnqueueExtensions)
596 if !ok {
597
598
599
600 f.enqueueExtensions = append(f.enqueueExtensions, &defaultEnqueueExtension{pluginName: p.Name()})
601 return
602 }
603
604 f.enqueueExtensions = append(f.enqueueExtensions, ext)
605 }
606
607
608 type defaultEnqueueExtension struct {
609 pluginName string
610 }
611
612 func (p *defaultEnqueueExtension) Name() string { return p.pluginName }
613 func (p *defaultEnqueueExtension) EventsToRegister() []framework.ClusterEventWithHint {
614
615
616
617
618 return framework.UnrollWildCardResource()
619 }
620
621 func updatePluginList(pluginList interface{}, pluginSet config.PluginSet, pluginsMap map[string]framework.Plugin) error {
622 plugins := reflect.ValueOf(pluginList).Elem()
623 pluginType := plugins.Type().Elem()
624 set := sets.New[string]()
625 for _, ep := range pluginSet.Enabled {
626 pg, ok := pluginsMap[ep.Name]
627 if !ok {
628 return fmt.Errorf("%s %q does not exist", pluginType.Name(), ep.Name)
629 }
630
631 if !reflect.TypeOf(pg).Implements(pluginType) {
632 return fmt.Errorf("plugin %q does not extend %s plugin", ep.Name, pluginType.Name())
633 }
634
635 if set.Has(ep.Name) {
636 return fmt.Errorf("plugin %q already registered as %q", ep.Name, pluginType.Name())
637 }
638
639 set.Insert(ep.Name)
640
641 newPlugins := reflect.Append(plugins, reflect.ValueOf(pg))
642 plugins.Set(newPlugins)
643 }
644 return nil
645 }
646
647
648 func (f *frameworkImpl) PreEnqueuePlugins() []framework.PreEnqueuePlugin {
649 return f.preEnqueuePlugins
650 }
651
652
653 func (f *frameworkImpl) EnqueueExtensions() []framework.EnqueueExtensions {
654 return f.enqueueExtensions
655 }
656
657
658 func (f *frameworkImpl) QueueSortFunc() framework.LessFunc {
659 if f == nil {
660
661
662 return func(_, _ *framework.QueuedPodInfo) bool { return false }
663 }
664
665 if len(f.queueSortPlugins) == 0 {
666 panic("No QueueSort plugin is registered in the frameworkImpl.")
667 }
668
669
670 return f.queueSortPlugins[0].Less
671 }
672
673
674
675
676
677
678
679 func (f *frameworkImpl) RunPreFilterPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod) (_ *framework.PreFilterResult, status *framework.Status) {
680 startTime := time.Now()
681 skipPlugins := sets.New[string]()
682 defer func() {
683 state.SkipFilterPlugins = skipPlugins
684 metrics.FrameworkExtensionPointDuration.WithLabelValues(metrics.PreFilter, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime))
685 }()
686 var result *framework.PreFilterResult
687 var pluginsWithNodes []string
688 logger := klog.FromContext(ctx)
689 verboseLogs := logger.V(4).Enabled()
690 if verboseLogs {
691 logger = klog.LoggerWithName(logger, "PreFilter")
692 }
693 var returnStatus *framework.Status
694 for _, pl := range f.preFilterPlugins {
695 ctx := ctx
696 if verboseLogs {
697 logger := klog.LoggerWithName(logger, pl.Name())
698 ctx = klog.NewContext(ctx, logger)
699 }
700 r, s := f.runPreFilterPlugin(ctx, pl, state, pod)
701 if s.IsSkip() {
702 skipPlugins.Insert(pl.Name())
703 continue
704 }
705 if !s.IsSuccess() {
706 s.SetPlugin(pl.Name())
707 if s.Code() == framework.UnschedulableAndUnresolvable {
708
709
710 return nil, s
711 }
712 if s.Code() == framework.Unschedulable {
713
714
715
716 returnStatus = s
717 continue
718 }
719 return nil, framework.AsStatus(fmt.Errorf("running PreFilter plugin %q: %w", pl.Name(), s.AsError())).WithPlugin(pl.Name())
720 }
721 if !r.AllNodes() {
722 pluginsWithNodes = append(pluginsWithNodes, pl.Name())
723 }
724 result = result.Merge(r)
725 if !result.AllNodes() && len(result.NodeNames) == 0 {
726 msg := fmt.Sprintf("node(s) didn't satisfy plugin(s) %v simultaneously", pluginsWithNodes)
727 if len(pluginsWithNodes) == 1 {
728 msg = fmt.Sprintf("node(s) didn't satisfy plugin %v", pluginsWithNodes[0])
729 }
730
731
732 return result, framework.NewStatus(framework.UnschedulableAndUnresolvable, msg)
733 }
734 }
735 return result, returnStatus
736 }
737
738 func (f *frameworkImpl) runPreFilterPlugin(ctx context.Context, pl framework.PreFilterPlugin, state *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
739 if !state.ShouldRecordPluginMetrics() {
740 return pl.PreFilter(ctx, state, pod)
741 }
742 startTime := time.Now()
743 result, status := pl.PreFilter(ctx, state, pod)
744 f.metricsRecorder.ObservePluginDurationAsync(metrics.PreFilter, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime))
745 return result, status
746 }
747
748
749
750
751 func (f *frameworkImpl) RunPreFilterExtensionAddPod(
752 ctx context.Context,
753 state *framework.CycleState,
754 podToSchedule *v1.Pod,
755 podInfoToAdd *framework.PodInfo,
756 nodeInfo *framework.NodeInfo,
757 ) (status *framework.Status) {
758 logger := klog.FromContext(ctx)
759 verboseLogs := logger.V(4).Enabled()
760 if verboseLogs {
761 logger = klog.LoggerWithName(logger, "PreFilterExtension")
762 }
763 for _, pl := range f.preFilterPlugins {
764 if pl.PreFilterExtensions() == nil || state.SkipFilterPlugins.Has(pl.Name()) {
765 continue
766 }
767 ctx := ctx
768 if verboseLogs {
769 logger := klog.LoggerWithName(logger, pl.Name())
770 ctx = klog.NewContext(ctx, logger)
771 }
772 status = f.runPreFilterExtensionAddPod(ctx, pl, state, podToSchedule, podInfoToAdd, nodeInfo)
773 if !status.IsSuccess() {
774 err := status.AsError()
775 logger.Error(err, "Plugin failed", "pod", klog.KObj(podToSchedule), "node", klog.KObj(nodeInfo.Node()), "operation", "addPod", "plugin", pl.Name())
776 return framework.AsStatus(fmt.Errorf("running AddPod on PreFilter plugin %q: %w", pl.Name(), err))
777 }
778 }
779
780 return nil
781 }
782
783 func (f *frameworkImpl) runPreFilterExtensionAddPod(ctx context.Context, pl framework.PreFilterPlugin, state *framework.CycleState, podToSchedule *v1.Pod, podInfoToAdd *framework.PodInfo, nodeInfo *framework.NodeInfo) *framework.Status {
784 if !state.ShouldRecordPluginMetrics() {
785 return pl.PreFilterExtensions().AddPod(ctx, state, podToSchedule, podInfoToAdd, nodeInfo)
786 }
787 startTime := time.Now()
788 status := pl.PreFilterExtensions().AddPod(ctx, state, podToSchedule, podInfoToAdd, nodeInfo)
789 f.metricsRecorder.ObservePluginDurationAsync(metrics.PreFilterExtensionAddPod, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime))
790 return status
791 }
792
793
794
795
796 func (f *frameworkImpl) RunPreFilterExtensionRemovePod(
797 ctx context.Context,
798 state *framework.CycleState,
799 podToSchedule *v1.Pod,
800 podInfoToRemove *framework.PodInfo,
801 nodeInfo *framework.NodeInfo,
802 ) (status *framework.Status) {
803 logger := klog.FromContext(ctx)
804 verboseLogs := logger.V(4).Enabled()
805 if verboseLogs {
806 logger = klog.LoggerWithName(logger, "PreFilterExtension")
807 }
808 for _, pl := range f.preFilterPlugins {
809 if pl.PreFilterExtensions() == nil || state.SkipFilterPlugins.Has(pl.Name()) {
810 continue
811 }
812 ctx := ctx
813 if verboseLogs {
814 logger := klog.LoggerWithName(logger, pl.Name())
815 ctx = klog.NewContext(ctx, logger)
816 }
817 status = f.runPreFilterExtensionRemovePod(ctx, pl, state, podToSchedule, podInfoToRemove, nodeInfo)
818 if !status.IsSuccess() {
819 err := status.AsError()
820 logger.Error(err, "Plugin failed", "node", klog.KObj(nodeInfo.Node()), "operation", "removePod", "plugin", pl.Name(), "pod", klog.KObj(podToSchedule))
821 return framework.AsStatus(fmt.Errorf("running RemovePod on PreFilter plugin %q: %w", pl.Name(), err))
822 }
823 }
824
825 return nil
826 }
827
828 func (f *frameworkImpl) runPreFilterExtensionRemovePod(ctx context.Context, pl framework.PreFilterPlugin, state *framework.CycleState, podToSchedule *v1.Pod, podInfoToRemove *framework.PodInfo, nodeInfo *framework.NodeInfo) *framework.Status {
829 if !state.ShouldRecordPluginMetrics() {
830 return pl.PreFilterExtensions().RemovePod(ctx, state, podToSchedule, podInfoToRemove, nodeInfo)
831 }
832 startTime := time.Now()
833 status := pl.PreFilterExtensions().RemovePod(ctx, state, podToSchedule, podInfoToRemove, nodeInfo)
834 f.metricsRecorder.ObservePluginDurationAsync(metrics.PreFilterExtensionRemovePod, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime))
835 return status
836 }
837
838
839
840
841
842 func (f *frameworkImpl) RunFilterPlugins(
843 ctx context.Context,
844 state *framework.CycleState,
845 pod *v1.Pod,
846 nodeInfo *framework.NodeInfo,
847 ) *framework.Status {
848 logger := klog.FromContext(ctx)
849 verboseLogs := logger.V(4).Enabled()
850 if verboseLogs {
851 logger = klog.LoggerWithName(logger, "Filter")
852 }
853
854 for _, pl := range f.filterPlugins {
855 if state.SkipFilterPlugins.Has(pl.Name()) {
856 continue
857 }
858 ctx := ctx
859 if verboseLogs {
860 logger := klog.LoggerWithName(logger, pl.Name())
861 ctx = klog.NewContext(ctx, logger)
862 }
863 if status := f.runFilterPlugin(ctx, pl, state, pod, nodeInfo); !status.IsSuccess() {
864 if !status.IsRejected() {
865
866
867 status = framework.AsStatus(fmt.Errorf("running %q filter plugin: %w", pl.Name(), status.AsError()))
868 }
869 status.SetPlugin(pl.Name())
870 return status
871 }
872 }
873
874 return nil
875 }
876
877 func (f *frameworkImpl) runFilterPlugin(ctx context.Context, pl framework.FilterPlugin, state *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
878 if !state.ShouldRecordPluginMetrics() {
879 return pl.Filter(ctx, state, pod, nodeInfo)
880 }
881 startTime := time.Now()
882 status := pl.Filter(ctx, state, pod, nodeInfo)
883 f.metricsRecorder.ObservePluginDurationAsync(metrics.Filter, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime))
884 return status
885 }
886
887
888
889 func (f *frameworkImpl) RunPostFilterPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, filteredNodeStatusMap framework.NodeToStatusMap) (_ *framework.PostFilterResult, status *framework.Status) {
890 startTime := time.Now()
891 defer func() {
892 metrics.FrameworkExtensionPointDuration.WithLabelValues(metrics.PostFilter, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime))
893 }()
894
895 logger := klog.FromContext(ctx)
896 verboseLogs := logger.V(4).Enabled()
897 if verboseLogs {
898 logger = klog.LoggerWithName(logger, "PostFilter")
899 }
900
901
902 var result *framework.PostFilterResult
903 var reasons []string
904 var rejectorPlugin string
905 for _, pl := range f.postFilterPlugins {
906 ctx := ctx
907 if verboseLogs {
908 logger := klog.LoggerWithName(logger, pl.Name())
909 ctx = klog.NewContext(ctx, logger)
910 }
911 r, s := f.runPostFilterPlugin(ctx, pl, state, pod, filteredNodeStatusMap)
912 if s.IsSuccess() {
913 return r, s
914 } else if s.Code() == framework.UnschedulableAndUnresolvable {
915 return r, s.WithPlugin(pl.Name())
916 } else if !s.IsRejected() {
917
918 return nil, framework.AsStatus(s.AsError()).WithPlugin(pl.Name())
919 } else if r != nil && r.Mode() != framework.ModeNoop {
920 result = r
921 }
922
923 reasons = append(reasons, s.Reasons()...)
924
925
926 if len(rejectorPlugin) == 0 {
927 rejectorPlugin = pl.Name()
928 }
929 }
930
931 return result, framework.NewStatus(framework.Unschedulable, reasons...).WithPlugin(rejectorPlugin)
932 }
933
934 func (f *frameworkImpl) runPostFilterPlugin(ctx context.Context, pl framework.PostFilterPlugin, state *framework.CycleState, pod *v1.Pod, filteredNodeStatusMap framework.NodeToStatusMap) (*framework.PostFilterResult, *framework.Status) {
935 if !state.ShouldRecordPluginMetrics() {
936 return pl.PostFilter(ctx, state, pod, filteredNodeStatusMap)
937 }
938 startTime := time.Now()
939 r, s := pl.PostFilter(ctx, state, pod, filteredNodeStatusMap)
940 f.metricsRecorder.ObservePluginDurationAsync(metrics.PostFilter, pl.Name(), s.Code().String(), metrics.SinceInSeconds(startTime))
941 return r, s
942 }
943
944
945
946
947
948
949
950
951
952
953
954 func (f *frameworkImpl) RunFilterPluginsWithNominatedPods(ctx context.Context, state *framework.CycleState, pod *v1.Pod, info *framework.NodeInfo) *framework.Status {
955 var status *framework.Status
956
957 podsAdded := false
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976 logger := klog.FromContext(ctx)
977 logger = klog.LoggerWithName(logger, "FilterWithNominatedPods")
978 ctx = klog.NewContext(ctx, logger)
979 for i := 0; i < 2; i++ {
980 stateToUse := state
981 nodeInfoToUse := info
982 if i == 0 {
983 var err error
984 podsAdded, stateToUse, nodeInfoToUse, err = addNominatedPods(ctx, f, pod, state, info)
985 if err != nil {
986 return framework.AsStatus(err)
987 }
988 } else if !podsAdded || !status.IsSuccess() {
989 break
990 }
991
992 status = f.RunFilterPlugins(ctx, stateToUse, pod, nodeInfoToUse)
993 if !status.IsSuccess() && !status.IsRejected() {
994 return status
995 }
996 }
997
998 return status
999 }
1000
1001
1002
1003
1004 func addNominatedPods(ctx context.Context, fh framework.Handle, pod *v1.Pod, state *framework.CycleState, nodeInfo *framework.NodeInfo) (bool, *framework.CycleState, *framework.NodeInfo, error) {
1005 if fh == nil {
1006
1007 return false, state, nodeInfo, nil
1008 }
1009 nominatedPodInfos := fh.NominatedPodsForNode(nodeInfo.Node().Name)
1010 if len(nominatedPodInfos) == 0 {
1011 return false, state, nodeInfo, nil
1012 }
1013 nodeInfoOut := nodeInfo.Snapshot()
1014 stateOut := state.Clone()
1015 podsAdded := false
1016 for _, pi := range nominatedPodInfos {
1017 if corev1.PodPriority(pi.Pod) >= corev1.PodPriority(pod) && pi.Pod.UID != pod.UID {
1018 nodeInfoOut.AddPodInfo(pi)
1019 status := fh.RunPreFilterExtensionAddPod(ctx, stateOut, pod, pi, nodeInfoOut)
1020 if !status.IsSuccess() {
1021 return false, state, nodeInfo, status.AsError()
1022 }
1023 podsAdded = true
1024 }
1025 }
1026 return podsAdded, stateOut, nodeInfoOut, nil
1027 }
1028
1029
1030
1031
1032
1033 func (f *frameworkImpl) RunPreScorePlugins(
1034 ctx context.Context,
1035 state *framework.CycleState,
1036 pod *v1.Pod,
1037 nodes []*framework.NodeInfo,
1038 ) (status *framework.Status) {
1039 startTime := time.Now()
1040 skipPlugins := sets.New[string]()
1041 defer func() {
1042 state.SkipScorePlugins = skipPlugins
1043 metrics.FrameworkExtensionPointDuration.WithLabelValues(metrics.PreScore, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime))
1044 }()
1045 logger := klog.FromContext(ctx)
1046 verboseLogs := logger.V(4).Enabled()
1047 if verboseLogs {
1048 logger = klog.LoggerWithName(logger, "PreScore")
1049 }
1050 for _, pl := range f.preScorePlugins {
1051 ctx := ctx
1052 if verboseLogs {
1053 logger := klog.LoggerWithName(logger, pl.Name())
1054 ctx = klog.NewContext(ctx, logger)
1055 }
1056 status = f.runPreScorePlugin(ctx, pl, state, pod, nodes)
1057 if status.IsSkip() {
1058 skipPlugins.Insert(pl.Name())
1059 continue
1060 }
1061 if !status.IsSuccess() {
1062 return framework.AsStatus(fmt.Errorf("running PreScore plugin %q: %w", pl.Name(), status.AsError()))
1063 }
1064 }
1065 return nil
1066 }
1067
1068 func (f *frameworkImpl) runPreScorePlugin(ctx context.Context, pl framework.PreScorePlugin, state *framework.CycleState, pod *v1.Pod, nodes []*framework.NodeInfo) *framework.Status {
1069 if !state.ShouldRecordPluginMetrics() {
1070 return pl.PreScore(ctx, state, pod, nodes)
1071 }
1072 startTime := time.Now()
1073 status := pl.PreScore(ctx, state, pod, nodes)
1074 f.metricsRecorder.ObservePluginDurationAsync(metrics.PreScore, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime))
1075 return status
1076 }
1077
1078
1079
1080
1081
1082 func (f *frameworkImpl) RunScorePlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodes []*framework.NodeInfo) (ns []framework.NodePluginScores, status *framework.Status) {
1083 startTime := time.Now()
1084 defer func() {
1085 metrics.FrameworkExtensionPointDuration.WithLabelValues(metrics.Score, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime))
1086 }()
1087 allNodePluginScores := make([]framework.NodePluginScores, len(nodes))
1088 numPlugins := len(f.scorePlugins)
1089 plugins := make([]framework.ScorePlugin, 0, numPlugins)
1090 pluginToNodeScores := make(map[string]framework.NodeScoreList, numPlugins)
1091 for _, pl := range f.scorePlugins {
1092 if state.SkipScorePlugins.Has(pl.Name()) {
1093 continue
1094 }
1095 plugins = append(plugins, pl)
1096 pluginToNodeScores[pl.Name()] = make(framework.NodeScoreList, len(nodes))
1097 }
1098 ctx, cancel := context.WithCancel(ctx)
1099 defer cancel()
1100 errCh := parallelize.NewErrorChannel()
1101
1102 if len(plugins) > 0 {
1103 logger := klog.FromContext(ctx)
1104 verboseLogs := logger.V(4).Enabled()
1105 if verboseLogs {
1106 logger = klog.LoggerWithName(logger, "Score")
1107 }
1108
1109 f.Parallelizer().Until(ctx, len(nodes), func(index int) {
1110 nodeName := nodes[index].Node().Name
1111 logger := logger
1112 if verboseLogs {
1113 logger = klog.LoggerWithValues(logger, "node", klog.ObjectRef{Name: nodeName})
1114 }
1115 for _, pl := range plugins {
1116 ctx := ctx
1117 if verboseLogs {
1118 logger := klog.LoggerWithName(logger, pl.Name())
1119 ctx = klog.NewContext(ctx, logger)
1120 }
1121 s, status := f.runScorePlugin(ctx, pl, state, pod, nodeName)
1122 if !status.IsSuccess() {
1123 err := fmt.Errorf("plugin %q failed with: %w", pl.Name(), status.AsError())
1124 errCh.SendErrorWithCancel(err, cancel)
1125 return
1126 }
1127 pluginToNodeScores[pl.Name()][index] = framework.NodeScore{
1128 Name: nodeName,
1129 Score: s,
1130 }
1131 }
1132 }, metrics.Score)
1133 if err := errCh.ReceiveError(); err != nil {
1134 return nil, framework.AsStatus(fmt.Errorf("running Score plugins: %w", err))
1135 }
1136 }
1137
1138
1139 f.Parallelizer().Until(ctx, len(plugins), func(index int) {
1140 pl := plugins[index]
1141 if pl.ScoreExtensions() == nil {
1142 return
1143 }
1144 nodeScoreList := pluginToNodeScores[pl.Name()]
1145 status := f.runScoreExtension(ctx, pl, state, pod, nodeScoreList)
1146 if !status.IsSuccess() {
1147 err := fmt.Errorf("plugin %q failed with: %w", pl.Name(), status.AsError())
1148 errCh.SendErrorWithCancel(err, cancel)
1149 return
1150 }
1151 }, metrics.Score)
1152 if err := errCh.ReceiveError(); err != nil {
1153 return nil, framework.AsStatus(fmt.Errorf("running Normalize on Score plugins: %w", err))
1154 }
1155
1156
1157
1158 f.Parallelizer().Until(ctx, len(nodes), func(index int) {
1159 nodePluginScores := framework.NodePluginScores{
1160 Name: nodes[index].Node().Name,
1161 Scores: make([]framework.PluginScore, len(plugins)),
1162 }
1163
1164 for i, pl := range plugins {
1165 weight := f.scorePluginWeight[pl.Name()]
1166 nodeScoreList := pluginToNodeScores[pl.Name()]
1167 score := nodeScoreList[index].Score
1168
1169 if score > framework.MaxNodeScore || score < framework.MinNodeScore {
1170 err := fmt.Errorf("plugin %q returns an invalid score %v, it should in the range of [%v, %v] after normalizing", pl.Name(), score, framework.MinNodeScore, framework.MaxNodeScore)
1171 errCh.SendErrorWithCancel(err, cancel)
1172 return
1173 }
1174 weightedScore := score * int64(weight)
1175 nodePluginScores.Scores[i] = framework.PluginScore{
1176 Name: pl.Name(),
1177 Score: weightedScore,
1178 }
1179 nodePluginScores.TotalScore += weightedScore
1180 }
1181 allNodePluginScores[index] = nodePluginScores
1182 }, metrics.Score)
1183 if err := errCh.ReceiveError(); err != nil {
1184 return nil, framework.AsStatus(fmt.Errorf("applying score defaultWeights on Score plugins: %w", err))
1185 }
1186
1187 return allNodePluginScores, nil
1188 }
1189
1190 func (f *frameworkImpl) runScorePlugin(ctx context.Context, pl framework.ScorePlugin, state *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
1191 if !state.ShouldRecordPluginMetrics() {
1192 return pl.Score(ctx, state, pod, nodeName)
1193 }
1194 startTime := time.Now()
1195 s, status := pl.Score(ctx, state, pod, nodeName)
1196 f.metricsRecorder.ObservePluginDurationAsync(metrics.Score, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime))
1197 return s, status
1198 }
1199
1200 func (f *frameworkImpl) runScoreExtension(ctx context.Context, pl framework.ScorePlugin, state *framework.CycleState, pod *v1.Pod, nodeScoreList framework.NodeScoreList) *framework.Status {
1201 if !state.ShouldRecordPluginMetrics() {
1202 return pl.ScoreExtensions().NormalizeScore(ctx, state, pod, nodeScoreList)
1203 }
1204 startTime := time.Now()
1205 status := pl.ScoreExtensions().NormalizeScore(ctx, state, pod, nodeScoreList)
1206 f.metricsRecorder.ObservePluginDurationAsync(metrics.ScoreExtensionNormalize, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime))
1207 return status
1208 }
1209
1210
1211
1212
1213 func (f *frameworkImpl) RunPreBindPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (status *framework.Status) {
1214 startTime := time.Now()
1215 defer func() {
1216 metrics.FrameworkExtensionPointDuration.WithLabelValues(metrics.PreBind, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime))
1217 }()
1218 logger := klog.FromContext(ctx)
1219 verboseLogs := logger.V(4).Enabled()
1220 if verboseLogs {
1221 logger = klog.LoggerWithName(logger, "PreBind")
1222 logger = klog.LoggerWithValues(logger, "node", klog.ObjectRef{Name: nodeName})
1223 }
1224 for _, pl := range f.preBindPlugins {
1225 ctx := ctx
1226 if verboseLogs {
1227 logger := klog.LoggerWithName(logger, pl.Name())
1228 ctx = klog.NewContext(ctx, logger)
1229 }
1230 status = f.runPreBindPlugin(ctx, pl, state, pod, nodeName)
1231 if !status.IsSuccess() {
1232 if status.IsRejected() {
1233 logger.V(4).Info("Pod rejected by PreBind plugin", "pod", klog.KObj(pod), "node", nodeName, "plugin", pl.Name(), "status", status.Message())
1234 status.SetPlugin(pl.Name())
1235 return status
1236 }
1237 err := status.AsError()
1238 logger.Error(err, "Plugin failed", "plugin", pl.Name(), "pod", klog.KObj(pod), "node", nodeName)
1239 return framework.AsStatus(fmt.Errorf("running PreBind plugin %q: %w", pl.Name(), err))
1240 }
1241 }
1242 return nil
1243 }
1244
1245 func (f *frameworkImpl) runPreBindPlugin(ctx context.Context, pl framework.PreBindPlugin, state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
1246 if !state.ShouldRecordPluginMetrics() {
1247 return pl.PreBind(ctx, state, pod, nodeName)
1248 }
1249 startTime := time.Now()
1250 status := pl.PreBind(ctx, state, pod, nodeName)
1251 f.metricsRecorder.ObservePluginDurationAsync(metrics.PreBind, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime))
1252 return status
1253 }
1254
1255
1256 func (f *frameworkImpl) RunBindPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (status *framework.Status) {
1257 startTime := time.Now()
1258 defer func() {
1259 metrics.FrameworkExtensionPointDuration.WithLabelValues(metrics.Bind, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime))
1260 }()
1261 if len(f.bindPlugins) == 0 {
1262 return framework.NewStatus(framework.Skip, "")
1263 }
1264 logger := klog.FromContext(ctx)
1265 verboseLogs := logger.V(4).Enabled()
1266 if verboseLogs {
1267 logger = klog.LoggerWithName(logger, "Bind")
1268 }
1269 for _, pl := range f.bindPlugins {
1270 ctx := ctx
1271 if verboseLogs {
1272 logger := klog.LoggerWithName(logger, pl.Name())
1273 ctx = klog.NewContext(ctx, logger)
1274 }
1275 status = f.runBindPlugin(ctx, pl, state, pod, nodeName)
1276 if status.IsSkip() {
1277 continue
1278 }
1279 if !status.IsSuccess() {
1280 if status.IsRejected() {
1281 logger.V(4).Info("Pod rejected by Bind plugin", "pod", klog.KObj(pod), "node", nodeName, "plugin", pl.Name(), "status", status.Message())
1282 status.SetPlugin(pl.Name())
1283 return status
1284 }
1285 err := status.AsError()
1286 logger.Error(err, "Plugin Failed", "plugin", pl.Name(), "pod", klog.KObj(pod), "node", nodeName)
1287 return framework.AsStatus(fmt.Errorf("running Bind plugin %q: %w", pl.Name(), err))
1288 }
1289 return status
1290 }
1291 return status
1292 }
1293
1294 func (f *frameworkImpl) runBindPlugin(ctx context.Context, bp framework.BindPlugin, state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
1295 if !state.ShouldRecordPluginMetrics() {
1296 return bp.Bind(ctx, state, pod, nodeName)
1297 }
1298 startTime := time.Now()
1299 status := bp.Bind(ctx, state, pod, nodeName)
1300 f.metricsRecorder.ObservePluginDurationAsync(metrics.Bind, bp.Name(), status.Code().String(), metrics.SinceInSeconds(startTime))
1301 return status
1302 }
1303
1304
1305 func (f *frameworkImpl) RunPostBindPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) {
1306 startTime := time.Now()
1307 defer func() {
1308 metrics.FrameworkExtensionPointDuration.WithLabelValues(metrics.PostBind, framework.Success.String(), f.profileName).Observe(metrics.SinceInSeconds(startTime))
1309 }()
1310 logger := klog.FromContext(ctx)
1311 verboseLogs := logger.V(4).Enabled()
1312 if verboseLogs {
1313 logger = klog.LoggerWithName(logger, "PostBind")
1314 }
1315 for _, pl := range f.postBindPlugins {
1316 ctx := ctx
1317 if verboseLogs {
1318 logger := klog.LoggerWithName(logger, pl.Name())
1319 ctx = klog.NewContext(ctx, logger)
1320 }
1321 f.runPostBindPlugin(ctx, pl, state, pod, nodeName)
1322 }
1323 }
1324
1325 func (f *frameworkImpl) runPostBindPlugin(ctx context.Context, pl framework.PostBindPlugin, state *framework.CycleState, pod *v1.Pod, nodeName string) {
1326 if !state.ShouldRecordPluginMetrics() {
1327 pl.PostBind(ctx, state, pod, nodeName)
1328 return
1329 }
1330 startTime := time.Now()
1331 pl.PostBind(ctx, state, pod, nodeName)
1332 f.metricsRecorder.ObservePluginDurationAsync(metrics.PostBind, pl.Name(), framework.Success.String(), metrics.SinceInSeconds(startTime))
1333 }
1334
1335
1336
1337
1338
1339
1340 func (f *frameworkImpl) RunReservePluginsReserve(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (status *framework.Status) {
1341 startTime := time.Now()
1342 defer func() {
1343 metrics.FrameworkExtensionPointDuration.WithLabelValues(metrics.Reserve, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime))
1344 }()
1345 logger := klog.FromContext(ctx)
1346 verboseLogs := logger.V(4).Enabled()
1347 if verboseLogs {
1348 logger = klog.LoggerWithName(logger, "Reserve")
1349 logger = klog.LoggerWithValues(logger, "node", klog.ObjectRef{Name: nodeName})
1350 }
1351 for _, pl := range f.reservePlugins {
1352 ctx := ctx
1353 if verboseLogs {
1354 logger := klog.LoggerWithName(logger, pl.Name())
1355 ctx = klog.NewContext(ctx, logger)
1356 }
1357 status = f.runReservePluginReserve(ctx, pl, state, pod, nodeName)
1358 if !status.IsSuccess() {
1359 if status.IsRejected() {
1360 logger.V(4).Info("Pod rejected by plugin", "pod", klog.KObj(pod), "plugin", pl.Name(), "status", status.Message())
1361 status.SetPlugin(pl.Name())
1362 return status
1363 }
1364 err := status.AsError()
1365 logger.Error(err, "Plugin failed", "plugin", pl.Name(), "pod", klog.KObj(pod))
1366 return framework.AsStatus(fmt.Errorf("running Reserve plugin %q: %w", pl.Name(), err))
1367 }
1368 }
1369 return nil
1370 }
1371
1372 func (f *frameworkImpl) runReservePluginReserve(ctx context.Context, pl framework.ReservePlugin, state *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
1373 if !state.ShouldRecordPluginMetrics() {
1374 return pl.Reserve(ctx, state, pod, nodeName)
1375 }
1376 startTime := time.Now()
1377 status := pl.Reserve(ctx, state, pod, nodeName)
1378 f.metricsRecorder.ObservePluginDurationAsync(metrics.Reserve, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime))
1379 return status
1380 }
1381
1382
1383
1384 func (f *frameworkImpl) RunReservePluginsUnreserve(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) {
1385 startTime := time.Now()
1386 defer func() {
1387 metrics.FrameworkExtensionPointDuration.WithLabelValues(metrics.Unreserve, framework.Success.String(), f.profileName).Observe(metrics.SinceInSeconds(startTime))
1388 }()
1389
1390
1391 logger := klog.FromContext(ctx)
1392 verboseLogs := logger.V(4).Enabled()
1393 if verboseLogs {
1394 logger = klog.LoggerWithName(logger, "Unreserve")
1395 logger = klog.LoggerWithValues(logger, "node", klog.ObjectRef{Name: nodeName})
1396 }
1397 for i := len(f.reservePlugins) - 1; i >= 0; i-- {
1398 pl := f.reservePlugins[i]
1399 ctx := ctx
1400 if verboseLogs {
1401 logger := klog.LoggerWithName(logger, pl.Name())
1402 ctx = klog.NewContext(ctx, logger)
1403 }
1404 f.runReservePluginUnreserve(ctx, pl, state, pod, nodeName)
1405 }
1406 }
1407
1408 func (f *frameworkImpl) runReservePluginUnreserve(ctx context.Context, pl framework.ReservePlugin, state *framework.CycleState, pod *v1.Pod, nodeName string) {
1409 if !state.ShouldRecordPluginMetrics() {
1410 pl.Unreserve(ctx, state, pod, nodeName)
1411 return
1412 }
1413 startTime := time.Now()
1414 pl.Unreserve(ctx, state, pod, nodeName)
1415 f.metricsRecorder.ObservePluginDurationAsync(metrics.Unreserve, pl.Name(), framework.Success.String(), metrics.SinceInSeconds(startTime))
1416 }
1417
1418
1419
1420
1421
1422
1423
1424 func (f *frameworkImpl) RunPermitPlugins(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (status *framework.Status) {
1425 startTime := time.Now()
1426 defer func() {
1427 metrics.FrameworkExtensionPointDuration.WithLabelValues(metrics.Permit, status.Code().String(), f.profileName).Observe(metrics.SinceInSeconds(startTime))
1428 }()
1429 pluginsWaitTime := make(map[string]time.Duration)
1430 statusCode := framework.Success
1431 logger := klog.FromContext(ctx)
1432 verboseLogs := logger.V(4).Enabled()
1433 if verboseLogs {
1434 logger = klog.LoggerWithName(logger, "Permit")
1435 logger = klog.LoggerWithValues(logger, "node", klog.ObjectRef{Name: nodeName})
1436 }
1437 for _, pl := range f.permitPlugins {
1438 ctx := ctx
1439 if verboseLogs {
1440 logger := klog.LoggerWithName(logger, pl.Name())
1441 ctx = klog.NewContext(ctx, logger)
1442 }
1443 status, timeout := f.runPermitPlugin(ctx, pl, state, pod, nodeName)
1444 if !status.IsSuccess() {
1445 if status.IsRejected() {
1446 logger.V(4).Info("Pod rejected by plugin", "pod", klog.KObj(pod), "plugin", pl.Name(), "status", status.Message())
1447 return status.WithPlugin(pl.Name())
1448 }
1449 if status.IsWait() {
1450
1451 if timeout > maxTimeout {
1452 timeout = maxTimeout
1453 }
1454 pluginsWaitTime[pl.Name()] = timeout
1455 statusCode = framework.Wait
1456 } else {
1457 err := status.AsError()
1458 logger.Error(err, "Plugin failed", "plugin", pl.Name(), "pod", klog.KObj(pod))
1459 return framework.AsStatus(fmt.Errorf("running Permit plugin %q: %w", pl.Name(), err)).WithPlugin(pl.Name())
1460 }
1461 }
1462 }
1463 if statusCode == framework.Wait {
1464 waitingPod := newWaitingPod(pod, pluginsWaitTime)
1465 f.waitingPods.add(waitingPod)
1466 msg := fmt.Sprintf("one or more plugins asked to wait and no plugin rejected pod %q", pod.Name)
1467 logger.V(4).Info("One or more plugins asked to wait and no plugin rejected pod", "pod", klog.KObj(pod))
1468 return framework.NewStatus(framework.Wait, msg)
1469 }
1470 return nil
1471 }
1472
1473 func (f *frameworkImpl) runPermitPlugin(ctx context.Context, pl framework.PermitPlugin, state *framework.CycleState, pod *v1.Pod, nodeName string) (*framework.Status, time.Duration) {
1474 if !state.ShouldRecordPluginMetrics() {
1475 return pl.Permit(ctx, state, pod, nodeName)
1476 }
1477 startTime := time.Now()
1478 status, timeout := pl.Permit(ctx, state, pod, nodeName)
1479 f.metricsRecorder.ObservePluginDurationAsync(metrics.Permit, pl.Name(), status.Code().String(), metrics.SinceInSeconds(startTime))
1480 return status, timeout
1481 }
1482
1483
1484 func (f *frameworkImpl) WaitOnPermit(ctx context.Context, pod *v1.Pod) *framework.Status {
1485 waitingPod := f.waitingPods.get(pod.UID)
1486 if waitingPod == nil {
1487 return nil
1488 }
1489 defer f.waitingPods.remove(pod.UID)
1490
1491 logger := klog.FromContext(ctx)
1492 logger.V(4).Info("Pod waiting on permit", "pod", klog.KObj(pod))
1493
1494 startTime := time.Now()
1495 s := <-waitingPod.s
1496 metrics.PermitWaitDuration.WithLabelValues(s.Code().String()).Observe(metrics.SinceInSeconds(startTime))
1497
1498 if !s.IsSuccess() {
1499 if s.IsRejected() {
1500 logger.V(4).Info("Pod rejected while waiting on permit", "pod", klog.KObj(pod), "status", s.Message())
1501 return s
1502 }
1503 err := s.AsError()
1504 logger.Error(err, "Failed waiting on permit for pod", "pod", klog.KObj(pod))
1505 return framework.AsStatus(fmt.Errorf("waiting on permit for pod: %w", err)).WithPlugin(s.Plugin())
1506 }
1507 return nil
1508 }
1509
1510
1511
1512
1513
1514 func (f *frameworkImpl) SnapshotSharedLister() framework.SharedLister {
1515 return f.snapshotSharedLister
1516 }
1517
1518
1519 func (f *frameworkImpl) IterateOverWaitingPods(callback func(framework.WaitingPod)) {
1520 f.waitingPods.iterate(callback)
1521 }
1522
1523
1524 func (f *frameworkImpl) GetWaitingPod(uid types.UID) framework.WaitingPod {
1525 if wp := f.waitingPods.get(uid); wp != nil {
1526 return wp
1527 }
1528 return nil
1529 }
1530
1531
1532
1533 func (f *frameworkImpl) RejectWaitingPod(uid types.UID) bool {
1534 if waitingPod := f.waitingPods.get(uid); waitingPod != nil {
1535 waitingPod.Reject("", "removed")
1536 return true
1537 }
1538 return false
1539 }
1540
1541
1542 func (f *frameworkImpl) HasFilterPlugins() bool {
1543 return len(f.filterPlugins) > 0
1544 }
1545
1546
1547 func (f *frameworkImpl) HasPostFilterPlugins() bool {
1548 return len(f.postFilterPlugins) > 0
1549 }
1550
1551
1552 func (f *frameworkImpl) HasScorePlugins() bool {
1553 return len(f.scorePlugins) > 0
1554 }
1555
1556
1557
1558 func (f *frameworkImpl) ListPlugins() *config.Plugins {
1559 m := config.Plugins{}
1560
1561 for _, e := range f.getExtensionPoints(&m) {
1562 plugins := reflect.ValueOf(e.slicePtr).Elem()
1563 extName := plugins.Type().Elem().Name()
1564 var cfgs []config.Plugin
1565 for i := 0; i < plugins.Len(); i++ {
1566 name := plugins.Index(i).Interface().(framework.Plugin).Name()
1567 p := config.Plugin{Name: name}
1568 if extName == "ScorePlugin" {
1569
1570 p.Weight = int32(f.scorePluginWeight[name])
1571 }
1572 cfgs = append(cfgs, p)
1573 }
1574 if len(cfgs) > 0 {
1575 e.plugins.Enabled = cfgs
1576 }
1577 }
1578 return &m
1579 }
1580
1581
1582 func (f *frameworkImpl) ClientSet() clientset.Interface {
1583 return f.clientSet
1584 }
1585
1586
1587 func (f *frameworkImpl) KubeConfig() *restclient.Config {
1588 return f.kubeConfig
1589 }
1590
1591
1592 func (f *frameworkImpl) EventRecorder() events.EventRecorder {
1593 return f.eventRecorder
1594 }
1595
1596
1597 func (f *frameworkImpl) SharedInformerFactory() informers.SharedInformerFactory {
1598 return f.informerFactory
1599 }
1600
1601 func (f *frameworkImpl) pluginsNeeded(plugins *config.Plugins) sets.Set[string] {
1602 pgSet := sets.Set[string]{}
1603
1604 if plugins == nil {
1605 return pgSet
1606 }
1607
1608 find := func(pgs *config.PluginSet) {
1609 for _, pg := range pgs.Enabled {
1610 pgSet.Insert(pg.Name)
1611 }
1612 }
1613
1614 for _, e := range f.getExtensionPoints(plugins) {
1615 find(e.plugins)
1616 }
1617
1618 find(&plugins.MultiPoint)
1619
1620 return pgSet
1621 }
1622
1623
1624 func (f *frameworkImpl) ProfileName() string {
1625 return f.profileName
1626 }
1627
1628
1629 func (f *frameworkImpl) PercentageOfNodesToScore() *int32 {
1630 return f.percentageOfNodesToScore
1631 }
1632
1633
1634 func (f *frameworkImpl) Parallelizer() parallelize.Parallelizer {
1635 return f.parallelizer
1636 }
1637
View as plain text