...
1
16
17
18 package profile
19
20 import (
21 "context"
22 "errors"
23 "fmt"
24
25 "github.com/google/go-cmp/cmp"
26 "k8s.io/apimachinery/pkg/runtime"
27 "k8s.io/client-go/kubernetes/scheme"
28 "k8s.io/client-go/tools/events"
29 "k8s.io/kubernetes/pkg/scheduler/apis/config"
30 "k8s.io/kubernetes/pkg/scheduler/framework"
31 frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
32 )
33
34
// RecorderFactory builds an events.EventRecorder for the given name. Within
// this package it is invoked with a profile's scheduler name.
type RecorderFactory func(string) events.EventRecorder
36
37
38 func newProfile(ctx context.Context, cfg config.KubeSchedulerProfile, r frameworkruntime.Registry, recorderFact RecorderFactory,
39 opts ...frameworkruntime.Option) (framework.Framework, error) {
40 recorder := recorderFact(cfg.SchedulerName)
41 opts = append(opts, frameworkruntime.WithEventRecorder(recorder))
42 return frameworkruntime.NewFramework(ctx, r, &cfg, opts...)
43 }
44
45
// Map holds frameworks indexed by scheduler name.
type Map map[string]framework.Framework
47
48
49 func NewMap(ctx context.Context, cfgs []config.KubeSchedulerProfile, r frameworkruntime.Registry, recorderFact RecorderFactory,
50 opts ...frameworkruntime.Option) (Map, error) {
51 m := make(Map)
52 v := cfgValidator{m: m}
53
54 for _, cfg := range cfgs {
55 p, err := newProfile(ctx, cfg, r, recorderFact, opts...)
56 if err != nil {
57 return nil, fmt.Errorf("creating profile for scheduler name %s: %v", cfg.SchedulerName, err)
58 }
59 if err := v.validate(cfg, p); err != nil {
60 return nil, err
61 }
62 m[cfg.SchedulerName] = p
63 }
64 return m, nil
65 }
66
67
68 func (m Map) HandlesSchedulerName(name string) bool {
69 _, ok := m[name]
70 return ok
71 }
72
73
74 func (m Map) Close() error {
75 var errs []error
76 for name, f := range m {
77 err := f.Close()
78 if err != nil {
79 errs = append(errs, fmt.Errorf("framework %s failed to close: %w", name, err))
80 }
81 }
82 return errors.Join(errs...)
83 }
84
85
86 func NewRecorderFactory(b events.EventBroadcaster) RecorderFactory {
87 return func(name string) events.EventRecorder {
88 return b.NewRecorder(scheme.Scheme, name)
89 }
90 }
91
// cfgValidator carries state across successive validate calls so that each
// profile can be checked against the ones seen before it.
type cfgValidator struct {
	// m is the map of frameworks accepted so far; validate consults it to
	// detect duplicate profile names.
	m Map
	// queueSort is the queue sort plugin name recorded from the first
	// validated profile; empty until then.
	queueSort string
	// queueSortArgs holds the args found for that plugin in the first
	// profile's PluginConfig, or nil when no matching entry was listed.
	queueSortArgs runtime.Object
}
97
98 func (v *cfgValidator) validate(cfg config.KubeSchedulerProfile, f framework.Framework) error {
99 if len(f.ProfileName()) == 0 {
100 return errors.New("scheduler name is needed")
101 }
102 if cfg.Plugins == nil {
103 return fmt.Errorf("plugins required for profile with scheduler name %q", f.ProfileName())
104 }
105 if v.m[f.ProfileName()] != nil {
106 return fmt.Errorf("duplicate profile with scheduler name %q", f.ProfileName())
107 }
108
109 queueSort := f.ListPlugins().QueueSort.Enabled[0].Name
110 var queueSortArgs runtime.Object
111 for _, plCfg := range cfg.PluginConfig {
112 if plCfg.Name == queueSort {
113 queueSortArgs = plCfg.Args
114 break
115 }
116 }
117 if len(v.queueSort) == 0 {
118 v.queueSort = queueSort
119 v.queueSortArgs = queueSortArgs
120 return nil
121 }
122 if v.queueSort != queueSort {
123 return fmt.Errorf("different queue sort plugins for profile %q: %q, first: %q", cfg.SchedulerName, queueSort, v.queueSort)
124 }
125 if !cmp.Equal(v.queueSortArgs, queueSortArgs) {
126 return fmt.Errorf("different queue sort plugin args for profile %q", cfg.SchedulerName)
127 }
128 return nil
129 }
130
View as plain text