1
16
17 package scheduler
18
19 import (
20 "context"
21 "fmt"
22 "sort"
23 "strings"
24 "testing"
25 "time"
26
27 "github.com/google/go-cmp/cmp"
28 v1 "k8s.io/api/core/v1"
29 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
30 "k8s.io/apimachinery/pkg/runtime"
31 utilfeature "k8s.io/apiserver/pkg/util/feature"
32 "k8s.io/client-go/informers"
33 "k8s.io/client-go/kubernetes"
34 "k8s.io/client-go/kubernetes/fake"
35 "k8s.io/client-go/kubernetes/scheme"
36 "k8s.io/client-go/tools/cache"
37 "k8s.io/client-go/tools/events"
38 featuregatetesting "k8s.io/component-base/featuregate/testing"
39 "k8s.io/klog/v2"
40 "k8s.io/klog/v2/ktesting"
41 "k8s.io/kubernetes/pkg/features"
42 schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
43 "k8s.io/kubernetes/pkg/scheduler/apis/config/testing/defaults"
44 "k8s.io/kubernetes/pkg/scheduler/framework"
45 "k8s.io/kubernetes/pkg/scheduler/framework/plugins"
46 "k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultbinder"
47 "k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort"
48 frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
49 internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
50 internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
51 "k8s.io/kubernetes/pkg/scheduler/profile"
52 st "k8s.io/kubernetes/pkg/scheduler/testing"
53 tf "k8s.io/kubernetes/pkg/scheduler/testing/framework"
54 testingclock "k8s.io/utils/clock/testing"
55 "k8s.io/utils/ptr"
56 )
57
58 func TestSchedulerCreation(t *testing.T) {
59 invalidRegistry := map[string]frameworkruntime.PluginFactory{
60 defaultbinder.Name: defaultbinder.New,
61 }
62 validRegistry := map[string]frameworkruntime.PluginFactory{
63 "Foo": defaultbinder.New,
64 }
65 cases := []struct {
66 name string
67 opts []Option
68 wantErr string
69 wantProfiles []string
70 wantExtenders []string
71 }{
72 {
73 name: "valid out-of-tree registry",
74 opts: []Option{
75 WithFrameworkOutOfTreeRegistry(validRegistry),
76 WithProfiles(
77 schedulerapi.KubeSchedulerProfile{
78 SchedulerName: "default-scheduler",
79 Plugins: &schedulerapi.Plugins{
80 QueueSort: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "PrioritySort"}}},
81 Bind: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "DefaultBinder"}}},
82 },
83 },
84 )},
85 wantProfiles: []string{"default-scheduler"},
86 },
87 {
88 name: "repeated plugin name in out-of-tree plugin",
89 opts: []Option{
90 WithFrameworkOutOfTreeRegistry(invalidRegistry),
91 WithProfiles(
92 schedulerapi.KubeSchedulerProfile{
93 SchedulerName: "default-scheduler",
94 Plugins: &schedulerapi.Plugins{
95 QueueSort: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "PrioritySort"}}},
96 Bind: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "DefaultBinder"}}},
97 },
98 },
99 )},
100 wantProfiles: []string{"default-scheduler"},
101 wantErr: "a plugin named DefaultBinder already exists",
102 },
103 {
104 name: "multiple profiles",
105 opts: []Option{
106 WithProfiles(
107 schedulerapi.KubeSchedulerProfile{
108 SchedulerName: "foo",
109 Plugins: &schedulerapi.Plugins{
110 QueueSort: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "PrioritySort"}}},
111 Bind: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "DefaultBinder"}}},
112 },
113 },
114 schedulerapi.KubeSchedulerProfile{
115 SchedulerName: "bar",
116 Plugins: &schedulerapi.Plugins{
117 QueueSort: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "PrioritySort"}}},
118 Bind: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "DefaultBinder"}}},
119 },
120 },
121 )},
122 wantProfiles: []string{"bar", "foo"},
123 },
124 {
125 name: "Repeated profiles",
126 opts: []Option{
127 WithProfiles(
128 schedulerapi.KubeSchedulerProfile{
129 SchedulerName: "foo",
130 Plugins: &schedulerapi.Plugins{
131 QueueSort: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "PrioritySort"}}},
132 Bind: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "DefaultBinder"}}},
133 },
134 },
135 schedulerapi.KubeSchedulerProfile{
136 SchedulerName: "bar",
137 Plugins: &schedulerapi.Plugins{
138 QueueSort: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "PrioritySort"}}},
139 Bind: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "DefaultBinder"}}},
140 },
141 },
142 schedulerapi.KubeSchedulerProfile{
143 SchedulerName: "foo",
144 Plugins: &schedulerapi.Plugins{
145 QueueSort: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "PrioritySort"}}},
146 Bind: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "DefaultBinder"}}},
147 },
148 },
149 )},
150 wantErr: "duplicate profile with scheduler name \"foo\"",
151 },
152 {
153 name: "With extenders",
154 opts: []Option{
155 WithProfiles(
156 schedulerapi.KubeSchedulerProfile{
157 SchedulerName: "default-scheduler",
158 Plugins: &schedulerapi.Plugins{
159 QueueSort: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "PrioritySort"}}},
160 Bind: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "DefaultBinder"}}},
161 },
162 },
163 ),
164 WithExtenders(
165 schedulerapi.Extender{
166 URLPrefix: "http://extender.kube-system/",
167 },
168 ),
169 },
170 wantProfiles: []string{"default-scheduler"},
171 wantExtenders: []string{"http://extender.kube-system/"},
172 },
173 }
174
175 for _, tc := range cases {
176 t.Run(tc.name, func(t *testing.T) {
177 client := fake.NewSimpleClientset()
178 informerFactory := informers.NewSharedInformerFactory(client, 0)
179
180 eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()})
181
182 _, ctx := ktesting.NewTestContext(t)
183 ctx, cancel := context.WithCancel(ctx)
184 defer cancel()
185 s, err := New(
186 ctx,
187 client,
188 informerFactory,
189 nil,
190 profile.NewRecorderFactory(eventBroadcaster),
191 tc.opts...,
192 )
193
194
195 if len(tc.wantErr) != 0 {
196 if err == nil || !strings.Contains(err.Error(), tc.wantErr) {
197 t.Errorf("got error %q, want %q", err, tc.wantErr)
198 }
199 return
200 }
201 if err != nil {
202 t.Fatalf("Failed to create scheduler: %v", err)
203 }
204
205
206 profiles := make([]string, 0, len(s.Profiles))
207 for name := range s.Profiles {
208 profiles = append(profiles, name)
209 }
210 sort.Strings(profiles)
211 if diff := cmp.Diff(tc.wantProfiles, profiles); diff != "" {
212 t.Errorf("unexpected profiles (-want, +got):\n%s", diff)
213 }
214
215
216 if len(tc.wantExtenders) != 0 {
217
218 extenders := make([]string, 0, len(s.Extenders))
219 for _, e := range s.Extenders {
220 extenders = append(extenders, e.Name())
221 }
222 if diff := cmp.Diff(tc.wantExtenders, extenders); diff != "" {
223 t.Errorf("unexpected extenders (-want, +got):\n%s", diff)
224 }
225
226
227 for _, p := range s.Profiles {
228 extenders := make([]string, 0, len(p.Extenders()))
229 for _, e := range p.Extenders() {
230 extenders = append(extenders, e.Name())
231 }
232 if diff := cmp.Diff(tc.wantExtenders, extenders); diff != "" {
233 t.Errorf("unexpected extenders (-want, +got):\n%s", diff)
234 }
235 }
236 }
237 })
238 }
239 }
240
241 func TestFailureHandler(t *testing.T) {
242 testPod := st.MakePod().Name("test-pod").Namespace(v1.NamespaceDefault).Obj()
243 testPodUpdated := testPod.DeepCopy()
244 testPodUpdated.Labels = map[string]string{"foo": ""}
245
246 tests := []struct {
247 name string
248 podUpdatedDuringScheduling bool
249 podDeletedDuringScheduling bool
250 expect *v1.Pod
251 }{
252 {
253 name: "pod is updated during a scheduling cycle",
254 podUpdatedDuringScheduling: true,
255 expect: testPodUpdated,
256 },
257 {
258 name: "pod is not updated during a scheduling cycle",
259 expect: testPod,
260 },
261 {
262 name: "pod is deleted during a scheduling cycle",
263 podDeletedDuringScheduling: true,
264 expect: nil,
265 },
266 }
267
268 for _, tt := range tests {
269 t.Run(tt.name, func(t *testing.T) {
270 logger, ctx := ktesting.NewTestContext(t)
271 ctx, cancel := context.WithCancel(ctx)
272 defer cancel()
273
274 client := fake.NewSimpleClientset(&v1.PodList{Items: []v1.Pod{*testPod}})
275 informerFactory := informers.NewSharedInformerFactory(client, 0)
276 podInformer := informerFactory.Core().V1().Pods()
277
278 podInformer.Informer().GetStore().Add(testPod)
279
280 queue := internalqueue.NewPriorityQueue(nil, informerFactory, internalqueue.WithClock(testingclock.NewFakeClock(time.Now())))
281 schedulerCache := internalcache.New(ctx, 30*time.Second)
282
283 if err := queue.Add(logger, testPod); err != nil {
284 t.Fatalf("Add failed: %v", err)
285 }
286
287 if _, err := queue.Pop(logger); err != nil {
288 t.Fatalf("Pop failed: %v", err)
289 }
290
291 if tt.podUpdatedDuringScheduling {
292 podInformer.Informer().GetStore().Update(testPodUpdated)
293 queue.Update(logger, testPod, testPodUpdated)
294 }
295 if tt.podDeletedDuringScheduling {
296 podInformer.Informer().GetStore().Delete(testPod)
297 queue.Delete(testPod)
298 }
299
300 s, fwk, err := initScheduler(ctx, schedulerCache, queue, client, informerFactory)
301 if err != nil {
302 t.Fatal(err)
303 }
304
305 testPodInfo := &framework.QueuedPodInfo{PodInfo: mustNewPodInfo(t, testPod)}
306 s.FailureHandler(ctx, fwk, testPodInfo, framework.NewStatus(framework.Unschedulable), nil, time.Now())
307
308 var got *v1.Pod
309 if tt.podUpdatedDuringScheduling {
310 head, e := queue.Pop(logger)
311 if e != nil {
312 t.Fatalf("Cannot pop pod from the activeQ: %v", e)
313 }
314 got = head.Pod
315 } else {
316 got = getPodFromPriorityQueue(queue, testPod)
317 }
318
319 if diff := cmp.Diff(tt.expect, got); diff != "" {
320 t.Errorf("Unexpected pod (-want, +got): %s", diff)
321 }
322 })
323 }
324 }
325
326 func TestFailureHandler_PodAlreadyBound(t *testing.T) {
327 logger, ctx := ktesting.NewTestContext(t)
328 ctx, cancel := context.WithCancel(ctx)
329 defer cancel()
330
331 nodeFoo := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}
332 testPod := st.MakePod().Name("test-pod").Namespace(v1.NamespaceDefault).Node("foo").Obj()
333
334 client := fake.NewSimpleClientset(&v1.PodList{Items: []v1.Pod{*testPod}}, &v1.NodeList{Items: []v1.Node{nodeFoo}})
335 informerFactory := informers.NewSharedInformerFactory(client, 0)
336 podInformer := informerFactory.Core().V1().Pods()
337
338 podInformer.Informer().GetStore().Add(testPod)
339
340 queue := internalqueue.NewPriorityQueue(nil, informerFactory, internalqueue.WithClock(testingclock.NewFakeClock(time.Now())))
341 schedulerCache := internalcache.New(ctx, 30*time.Second)
342
343
344 schedulerCache.AddNode(logger, &nodeFoo)
345
346 s, fwk, err := initScheduler(ctx, schedulerCache, queue, client, informerFactory)
347 if err != nil {
348 t.Fatal(err)
349 }
350
351 testPodInfo := &framework.QueuedPodInfo{PodInfo: mustNewPodInfo(t, testPod)}
352 s.FailureHandler(ctx, fwk, testPodInfo, framework.NewStatus(framework.Unschedulable).WithError(fmt.Errorf("binding rejected: timeout")), nil, time.Now())
353
354 pod := getPodFromPriorityQueue(queue, testPod)
355 if pod != nil {
356 t.Fatalf("Unexpected pod: %v should not be in PriorityQueue when the NodeName of pod is not empty", pod.Name)
357 }
358 }
359
360
361 func TestWithPercentageOfNodesToScore(t *testing.T) {
362 tests := []struct {
363 name string
364 percentageOfNodesToScoreConfig *int32
365 wantedPercentageOfNodesToScore int32
366 }{
367 {
368 name: "percentageOfNodesScore is nil",
369 percentageOfNodesToScoreConfig: nil,
370 wantedPercentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore,
371 },
372 {
373 name: "percentageOfNodesScore is not nil",
374 percentageOfNodesToScoreConfig: ptr.To[int32](10),
375 wantedPercentageOfNodesToScore: 10,
376 },
377 }
378
379 for _, tt := range tests {
380 t.Run(tt.name, func(t *testing.T) {
381 client := fake.NewSimpleClientset()
382 informerFactory := informers.NewSharedInformerFactory(client, 0)
383 eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()})
384 _, ctx := ktesting.NewTestContext(t)
385 ctx, cancel := context.WithCancel(ctx)
386 defer cancel()
387 sched, err := New(
388 ctx,
389 client,
390 informerFactory,
391 nil,
392 profile.NewRecorderFactory(eventBroadcaster),
393 WithPercentageOfNodesToScore(tt.percentageOfNodesToScoreConfig),
394 )
395 if err != nil {
396 t.Fatalf("Failed to create scheduler: %v", err)
397 }
398 if sched.percentageOfNodesToScore != tt.wantedPercentageOfNodesToScore {
399 t.Errorf("scheduler.percercentageOfNodesToScore = %v, want %v", sched.percentageOfNodesToScore, tt.wantedPercentageOfNodesToScore)
400 }
401 })
402 }
403 }
404
405
406
407 func getPodFromPriorityQueue(queue *internalqueue.PriorityQueue, pod *v1.Pod) *v1.Pod {
408 podList, _ := queue.PendingPods()
409 if len(podList) == 0 {
410 return nil
411 }
412
413 queryPodKey, err := cache.MetaNamespaceKeyFunc(pod)
414 if err != nil {
415 return nil
416 }
417
418 for _, foundPod := range podList {
419 foundPodKey, err := cache.MetaNamespaceKeyFunc(foundPod)
420 if err != nil {
421 return nil
422 }
423
424 if foundPodKey == queryPodKey {
425 return foundPod
426 }
427 }
428
429 return nil
430 }
431
432 func initScheduler(ctx context.Context, cache internalcache.Cache, queue internalqueue.SchedulingQueue,
433 client kubernetes.Interface, informerFactory informers.SharedInformerFactory) (*Scheduler, framework.Framework, error) {
434 logger := klog.FromContext(ctx)
435 registerPluginFuncs := []tf.RegisterPluginFunc{
436 tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
437 tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
438 }
439 eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()})
440 fwk, err := tf.NewFramework(ctx,
441 registerPluginFuncs,
442 testSchedulerName,
443 frameworkruntime.WithClientSet(client),
444 frameworkruntime.WithInformerFactory(informerFactory),
445 frameworkruntime.WithEventRecorder(eventBroadcaster.NewRecorder(scheme.Scheme, testSchedulerName)),
446 )
447 if err != nil {
448 return nil, nil, err
449 }
450
451 s := &Scheduler{
452 Cache: cache,
453 client: client,
454 StopEverything: ctx.Done(),
455 SchedulingQueue: queue,
456 Profiles: profile.Map{testSchedulerName: fwk},
457 logger: logger,
458 }
459 s.applyDefaultHandlers()
460
461 return s, fwk, nil
462 }
463
464 func TestInitPluginsWithIndexers(t *testing.T) {
465 tests := []struct {
466 name string
467
468 entrypoints map[string]frameworkruntime.PluginFactory
469 wantErr string
470 }{
471 {
472 name: "register indexer, no conflicts",
473 entrypoints: map[string]frameworkruntime.PluginFactory{
474 "AddIndexer": func(ctx context.Context, obj runtime.Object, handle framework.Handle) (framework.Plugin, error) {
475 podInformer := handle.SharedInformerFactory().Core().V1().Pods()
476 err := podInformer.Informer().AddIndexers(cache.Indexers{
477 "nodeName": indexByPodSpecNodeName,
478 })
479 return &TestPlugin{name: "AddIndexer"}, err
480 },
481 },
482 },
483 {
484 name: "register the same indexer name multiple times, conflict",
485
486 entrypoints: map[string]frameworkruntime.PluginFactory{
487 "AddIndexer1": func(ctx context.Context, obj runtime.Object, handle framework.Handle) (framework.Plugin, error) {
488 podInformer := handle.SharedInformerFactory().Core().V1().Pods()
489 err := podInformer.Informer().AddIndexers(cache.Indexers{
490 "nodeName": indexByPodSpecNodeName,
491 })
492 return &TestPlugin{name: "AddIndexer1"}, err
493 },
494 "AddIndexer2": func(ctx context.Context, obj runtime.Object, handle framework.Handle) (framework.Plugin, error) {
495 podInformer := handle.SharedInformerFactory().Core().V1().Pods()
496 err := podInformer.Informer().AddIndexers(cache.Indexers{
497 "nodeName": indexByPodAnnotationNodeName,
498 })
499 return &TestPlugin{name: "AddIndexer1"}, err
500 },
501 },
502 wantErr: "indexer conflict",
503 },
504 {
505 name: "register the same indexer body with different names, no conflicts",
506
507 entrypoints: map[string]frameworkruntime.PluginFactory{
508 "AddIndexer1": func(ctx context.Context, obj runtime.Object, handle framework.Handle) (framework.Plugin, error) {
509 podInformer := handle.SharedInformerFactory().Core().V1().Pods()
510 err := podInformer.Informer().AddIndexers(cache.Indexers{
511 "nodeName1": indexByPodSpecNodeName,
512 })
513 return &TestPlugin{name: "AddIndexer1"}, err
514 },
515 "AddIndexer2": func(ctx context.Context, obj runtime.Object, handle framework.Handle) (framework.Plugin, error) {
516 podInformer := handle.SharedInformerFactory().Core().V1().Pods()
517 err := podInformer.Informer().AddIndexers(cache.Indexers{
518 "nodeName2": indexByPodAnnotationNodeName,
519 })
520 return &TestPlugin{name: "AddIndexer2"}, err
521 },
522 },
523 },
524 }
525
526 for _, tt := range tests {
527 t.Run(tt.name, func(t *testing.T) {
528 fakeInformerFactory := NewInformerFactory(&fake.Clientset{}, 0*time.Second)
529
530 var registerPluginFuncs []tf.RegisterPluginFunc
531 for name, entrypoint := range tt.entrypoints {
532 registerPluginFuncs = append(registerPluginFuncs,
533
534 tf.RegisterFilterPlugin(name, entrypoint),
535 )
536 }
537
538 registerPluginFuncs = append(registerPluginFuncs,
539 tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
540 tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
541 )
542 _, ctx := ktesting.NewTestContext(t)
543 ctx, cancel := context.WithCancel(ctx)
544 defer cancel()
545 _, err := tf.NewFramework(ctx, registerPluginFuncs, "test", frameworkruntime.WithInformerFactory(fakeInformerFactory))
546
547 if len(tt.wantErr) > 0 {
548 if err == nil || !strings.Contains(err.Error(), tt.wantErr) {
549 t.Errorf("got error %q, want %q", err, tt.wantErr)
550 }
551 return
552 }
553 if err != nil {
554 t.Fatalf("Failed to create scheduler: %v", err)
555 }
556 })
557 }
558 }
559
560 func indexByPodSpecNodeName(obj interface{}) ([]string, error) {
561 pod, ok := obj.(*v1.Pod)
562 if !ok {
563 return []string{}, nil
564 }
565 if len(pod.Spec.NodeName) == 0 {
566 return []string{}, nil
567 }
568 return []string{pod.Spec.NodeName}, nil
569 }
570
571 func indexByPodAnnotationNodeName(obj interface{}) ([]string, error) {
572 pod, ok := obj.(*v1.Pod)
573 if !ok {
574 return []string{}, nil
575 }
576 if len(pod.Annotations) == 0 {
577 return []string{}, nil
578 }
579 nodeName, ok := pod.Annotations["node-name"]
580 if !ok {
581 return []string{}, nil
582 }
583 return []string{nodeName}, nil
584 }
585
// Names reported by the fake plugins defined in this file.
const (
	// filterWithoutEnqueueExtensions is the name of filterWithoutEnqueueExtensionsPlugin.
	filterWithoutEnqueueExtensions = "filterWithoutEnqueueExtensions"
	// fakeNode is the name of fakeNodePlugin.
	fakeNode = "fakeNode"
	// fakePod is the name of fakePodPlugin.
	fakePod = "fakePod"
	// emptyEventsToRegister is the name of emptyEventsToRegisterPlugin.
	emptyEventsToRegister = "emptyEventsToRegister"
	// queueSort is the name of fakeQueueSortPlugin.
	queueSort = "no-op-queue-sort-plugin"
	// fakeBind is the name of fakebindPlugin.
	fakeBind = "bind-plugin"
	// emptyEventExtensions is the name of emptyEventPlugin.
	emptyEventExtensions = "emptyEventExtensions"
)
595
596 func Test_buildQueueingHintMap(t *testing.T) {
597 tests := []struct {
598 name string
599 plugins []framework.Plugin
600 want map[framework.ClusterEvent][]*internalqueue.QueueingHintFunction
601 featuregateDisabled bool
602 }{
603 {
604 name: "filter without EnqueueExtensions plugin",
605 plugins: []framework.Plugin{&filterWithoutEnqueueExtensionsPlugin{}},
606 want: map[framework.ClusterEvent][]*internalqueue.QueueingHintFunction{
607 {Resource: framework.Pod, ActionType: framework.All}: {
608 {PluginName: filterWithoutEnqueueExtensions, QueueingHintFn: defaultQueueingHintFn},
609 },
610 {Resource: framework.Node, ActionType: framework.All}: {
611 {PluginName: filterWithoutEnqueueExtensions, QueueingHintFn: defaultQueueingHintFn},
612 },
613 {Resource: framework.CSINode, ActionType: framework.All}: {
614 {PluginName: filterWithoutEnqueueExtensions, QueueingHintFn: defaultQueueingHintFn},
615 },
616 {Resource: framework.CSIDriver, ActionType: framework.All}: {
617 {PluginName: filterWithoutEnqueueExtensions, QueueingHintFn: defaultQueueingHintFn},
618 },
619 {Resource: framework.CSIStorageCapacity, ActionType: framework.All}: {
620 {PluginName: filterWithoutEnqueueExtensions, QueueingHintFn: defaultQueueingHintFn},
621 },
622 {Resource: framework.PersistentVolume, ActionType: framework.All}: {
623 {PluginName: filterWithoutEnqueueExtensions, QueueingHintFn: defaultQueueingHintFn},
624 },
625 {Resource: framework.StorageClass, ActionType: framework.All}: {
626 {PluginName: filterWithoutEnqueueExtensions, QueueingHintFn: defaultQueueingHintFn},
627 },
628 {Resource: framework.PersistentVolumeClaim, ActionType: framework.All}: {
629 {PluginName: filterWithoutEnqueueExtensions, QueueingHintFn: defaultQueueingHintFn},
630 },
631 {Resource: framework.PodSchedulingContext, ActionType: framework.All}: {
632 {PluginName: filterWithoutEnqueueExtensions, QueueingHintFn: defaultQueueingHintFn},
633 },
634 {Resource: framework.ResourceClaim, ActionType: framework.All}: {
635 {PluginName: filterWithoutEnqueueExtensions, QueueingHintFn: defaultQueueingHintFn},
636 },
637 {Resource: framework.ResourceClass, ActionType: framework.All}: {
638 {PluginName: filterWithoutEnqueueExtensions, QueueingHintFn: defaultQueueingHintFn},
639 },
640 {Resource: framework.ResourceClaimParameters, ActionType: framework.All}: {
641 {PluginName: filterWithoutEnqueueExtensions, QueueingHintFn: defaultQueueingHintFn},
642 },
643 {Resource: framework.ResourceClassParameters, ActionType: framework.All}: {
644 {PluginName: filterWithoutEnqueueExtensions, QueueingHintFn: defaultQueueingHintFn},
645 },
646 },
647 },
648 {
649 name: "node and pod plugin",
650 plugins: []framework.Plugin{&fakeNodePlugin{}, &fakePodPlugin{}},
651 want: map[framework.ClusterEvent][]*internalqueue.QueueingHintFunction{
652 {Resource: framework.Pod, ActionType: framework.Add}: {
653 {PluginName: fakePod, QueueingHintFn: fakePodPluginQueueingFn},
654 },
655 {Resource: framework.Node, ActionType: framework.Add}: {
656 {PluginName: fakeNode, QueueingHintFn: fakeNodePluginQueueingFn},
657 },
658 {Resource: framework.Node, ActionType: framework.UpdateNodeTaint}: {
659 {PluginName: fakeNode, QueueingHintFn: defaultQueueingHintFn},
660 },
661 },
662 },
663 {
664 name: "node and pod plugin (featuregate is disabled)",
665 plugins: []framework.Plugin{&fakeNodePlugin{}, &fakePodPlugin{}},
666 featuregateDisabled: true,
667 want: map[framework.ClusterEvent][]*internalqueue.QueueingHintFunction{
668 {Resource: framework.Pod, ActionType: framework.Add}: {
669 {PluginName: fakePod, QueueingHintFn: defaultQueueingHintFn},
670 },
671 {Resource: framework.Node, ActionType: framework.Add}: {
672 {PluginName: fakeNode, QueueingHintFn: defaultQueueingHintFn},
673 },
674 {Resource: framework.Node, ActionType: framework.UpdateNodeTaint}: {
675 {PluginName: fakeNode, QueueingHintFn: defaultQueueingHintFn},
676 },
677 },
678 },
679 {
680 name: "register plugin with empty event",
681 plugins: []framework.Plugin{&emptyEventPlugin{}},
682 want: map[framework.ClusterEvent][]*internalqueue.QueueingHintFunction{},
683 },
684 {
685 name: "register plugins including emptyEventPlugin",
686 plugins: []framework.Plugin{&emptyEventPlugin{}, &fakeNodePlugin{}},
687 want: map[framework.ClusterEvent][]*internalqueue.QueueingHintFunction{
688 {Resource: framework.Pod, ActionType: framework.Add}: {
689 {PluginName: fakePod, QueueingHintFn: fakePodPluginQueueingFn},
690 },
691 {Resource: framework.Node, ActionType: framework.Add}: {
692 {PluginName: fakeNode, QueueingHintFn: fakeNodePluginQueueingFn},
693 },
694 {Resource: framework.Node, ActionType: framework.UpdateNodeTaint}: {
695 {PluginName: fakeNode, QueueingHintFn: defaultQueueingHintFn},
696 },
697 },
698 },
699 }
700
701 for _, tt := range tests {
702 t.Run(tt.name, func(t *testing.T) {
703 defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SchedulerQueueingHints, !tt.featuregateDisabled)()
704 logger, ctx := ktesting.NewTestContext(t)
705 ctx, cancel := context.WithCancel(ctx)
706 defer cancel()
707 registry := frameworkruntime.Registry{}
708 cfgPls := &schedulerapi.Plugins{}
709 plugins := append(tt.plugins, &fakebindPlugin{}, &fakeQueueSortPlugin{})
710 for _, pl := range plugins {
711 tmpPl := pl
712 if err := registry.Register(pl.Name(), func(_ context.Context, _ runtime.Object, _ framework.Handle) (framework.Plugin, error) {
713 return tmpPl, nil
714 }); err != nil {
715 t.Fatalf("fail to register filter plugin (%s)", pl.Name())
716 }
717 cfgPls.MultiPoint.Enabled = append(cfgPls.MultiPoint.Enabled, schedulerapi.Plugin{Name: pl.Name()})
718 }
719
720 profile := schedulerapi.KubeSchedulerProfile{Plugins: cfgPls}
721 fwk, err := newFramework(ctx, registry, profile)
722 if err != nil {
723 t.Fatal(err)
724 }
725
726 exts := fwk.EnqueueExtensions()
727
728 sort.Slice(exts, func(i, j int) bool {
729 return exts[i].Name() < exts[j].Name()
730 })
731
732 got := buildQueueingHintMap(exts)
733
734 for e, fns := range got {
735 wantfns, ok := tt.want[e]
736 if !ok {
737 t.Errorf("got unexpected event %v", e)
738 continue
739 }
740 if len(fns) != len(wantfns) {
741 t.Errorf("got %v queueing hint functions, want %v", len(fns), len(wantfns))
742 continue
743 }
744 for i, fn := range fns {
745 if fn.PluginName != wantfns[i].PluginName {
746 t.Errorf("got plugin name %v, want %v", fn.PluginName, wantfns[i].PluginName)
747 continue
748 }
749 got, gotErr := fn.QueueingHintFn(logger, nil, nil, nil)
750 want, wantErr := wantfns[i].QueueingHintFn(logger, nil, nil, nil)
751 if got != want || gotErr != wantErr {
752 t.Errorf("got queueing hint function (%v) returning (%v, %v), expect it to return (%v, %v)", fn.PluginName, got, gotErr, want, wantErr)
753 continue
754 }
755 }
756 }
757 })
758 }
759 }
760
761
762 func Test_UnionedGVKs(t *testing.T) {
763 tests := []struct {
764 name string
765 plugins schedulerapi.PluginSet
766 want map[framework.GVK]framework.ActionType
767 }{
768 {
769 name: "filter without EnqueueExtensions plugin",
770 plugins: schedulerapi.PluginSet{
771 Enabled: []schedulerapi.Plugin{
772 {Name: filterWithoutEnqueueExtensions},
773 {Name: queueSort},
774 {Name: fakeBind},
775 },
776 Disabled: []schedulerapi.Plugin{{Name: "*"}},
777 },
778 want: map[framework.GVK]framework.ActionType{
779 framework.Pod: framework.All,
780 framework.Node: framework.All,
781 framework.CSINode: framework.All,
782 framework.CSIDriver: framework.All,
783 framework.CSIStorageCapacity: framework.All,
784 framework.PersistentVolume: framework.All,
785 framework.PersistentVolumeClaim: framework.All,
786 framework.StorageClass: framework.All,
787 framework.PodSchedulingContext: framework.All,
788 framework.ResourceClaim: framework.All,
789 framework.ResourceClass: framework.All,
790 framework.ResourceClaimParameters: framework.All,
791 framework.ResourceClassParameters: framework.All,
792 },
793 },
794 {
795 name: "node plugin",
796 plugins: schedulerapi.PluginSet{
797 Enabled: []schedulerapi.Plugin{
798 {Name: fakeNode},
799 {Name: queueSort},
800 {Name: fakeBind},
801 },
802 Disabled: []schedulerapi.Plugin{{Name: "*"}},
803 },
804 want: map[framework.GVK]framework.ActionType{
805 framework.Node: framework.Add | framework.UpdateNodeTaint,
806 },
807 },
808 {
809 name: "pod plugin",
810 plugins: schedulerapi.PluginSet{
811 Enabled: []schedulerapi.Plugin{
812 {Name: fakePod},
813 {Name: queueSort},
814 {Name: fakeBind},
815 },
816 Disabled: []schedulerapi.Plugin{{Name: "*"}},
817 },
818 want: map[framework.GVK]framework.ActionType{
819 framework.Pod: framework.Add,
820 },
821 },
822 {
823 name: "node and pod plugin",
824 plugins: schedulerapi.PluginSet{
825 Enabled: []schedulerapi.Plugin{
826 {Name: fakePod},
827 {Name: fakeNode},
828 {Name: queueSort},
829 {Name: fakeBind},
830 },
831 Disabled: []schedulerapi.Plugin{{Name: "*"}},
832 },
833 want: map[framework.GVK]framework.ActionType{
834 framework.Pod: framework.Add,
835 framework.Node: framework.Add | framework.UpdateNodeTaint,
836 },
837 },
838 {
839 name: "empty EventsToRegister plugin",
840 plugins: schedulerapi.PluginSet{
841 Enabled: []schedulerapi.Plugin{
842 {Name: emptyEventsToRegister},
843 {Name: queueSort},
844 {Name: fakeBind},
845 },
846 Disabled: []schedulerapi.Plugin{{Name: "*"}},
847 },
848 want: map[framework.GVK]framework.ActionType{},
849 },
850 {
851 name: "plugins with default profile",
852 plugins: schedulerapi.PluginSet{Enabled: defaults.PluginsV1.MultiPoint.Enabled},
853 want: map[framework.GVK]framework.ActionType{
854 framework.Pod: framework.All,
855 framework.Node: framework.All,
856 framework.CSINode: framework.All - framework.Delete,
857 framework.CSIDriver: framework.All - framework.Delete,
858 framework.CSIStorageCapacity: framework.All - framework.Delete,
859 framework.PersistentVolume: framework.All - framework.Delete,
860 framework.PersistentVolumeClaim: framework.All - framework.Delete,
861 framework.StorageClass: framework.All - framework.Delete,
862 },
863 },
864 }
865 for _, tt := range tests {
866 t.Run(tt.name, func(t *testing.T) {
867 _, ctx := ktesting.NewTestContext(t)
868 ctx, cancel := context.WithCancel(ctx)
869 defer cancel()
870 registry := plugins.NewInTreeRegistry()
871
872 cfgPls := &schedulerapi.Plugins{MultiPoint: tt.plugins}
873 plugins := []framework.Plugin{&fakeNodePlugin{}, &fakePodPlugin{}, &filterWithoutEnqueueExtensionsPlugin{}, &emptyEventsToRegisterPlugin{}, &fakeQueueSortPlugin{}, &fakebindPlugin{}}
874 for _, pl := range plugins {
875 tmpPl := pl
876 if err := registry.Register(pl.Name(), func(_ context.Context, _ runtime.Object, _ framework.Handle) (framework.Plugin, error) {
877 return tmpPl, nil
878 }); err != nil {
879 t.Fatalf("fail to register filter plugin (%s)", pl.Name())
880 }
881 }
882
883 profile := schedulerapi.KubeSchedulerProfile{Plugins: cfgPls, PluginConfig: defaults.PluginConfigsV1}
884 fwk, err := newFramework(ctx, registry, profile)
885 if err != nil {
886 t.Fatal(err)
887 }
888
889 queueingHintsPerProfile := internalqueue.QueueingHintMapPerProfile{
890 "default": buildQueueingHintMap(fwk.EnqueueExtensions()),
891 }
892 got := unionedGVKs(queueingHintsPerProfile)
893
894 if diff := cmp.Diff(tt.want, got); diff != "" {
895 t.Errorf("Unexpected eventToPlugin map (-want,+got):%s", diff)
896 }
897 })
898 }
899 }
900
901 func newFramework(ctx context.Context, r frameworkruntime.Registry, profile schedulerapi.KubeSchedulerProfile) (framework.Framework, error) {
902 return frameworkruntime.NewFramework(ctx, r, &profile,
903 frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(nil, nil)),
904 frameworkruntime.WithInformerFactory(informers.NewSharedInformerFactory(fake.NewSimpleClientset(), 0)),
905 )
906 }
907
908 var _ framework.QueueSortPlugin = &fakeQueueSortPlugin{}
909
910
911 type fakeQueueSortPlugin struct{}
912
913 func (pl *fakeQueueSortPlugin) Name() string {
914 return queueSort
915 }
916
917 func (pl *fakeQueueSortPlugin) Less(_, _ *framework.QueuedPodInfo) bool {
918 return false
919 }
920
921 var _ framework.BindPlugin = &fakebindPlugin{}
922
923
924 type fakebindPlugin struct{}
925
926 func (t *fakebindPlugin) Name() string {
927 return fakeBind
928 }
929
930 func (t *fakebindPlugin) Bind(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) *framework.Status {
931 return nil
932 }
933
934
935 type filterWithoutEnqueueExtensionsPlugin struct{}
936
937 func (*filterWithoutEnqueueExtensionsPlugin) Name() string { return filterWithoutEnqueueExtensions }
938
939 func (*filterWithoutEnqueueExtensionsPlugin) Filter(_ context.Context, _ *framework.CycleState, _ *v1.Pod, _ *framework.NodeInfo) *framework.Status {
940 return nil
941 }
942
943 var hintFromFakeNode = framework.QueueingHint(100)
944
945 type fakeNodePlugin struct{}
946
947 var fakeNodePluginQueueingFn = func(_ klog.Logger, _ *v1.Pod, _, _ interface{}) (framework.QueueingHint, error) {
948 return hintFromFakeNode, nil
949 }
950
951 func (*fakeNodePlugin) Name() string { return fakeNode }
952
953 func (*fakeNodePlugin) Filter(_ context.Context, _ *framework.CycleState, _ *v1.Pod, _ *framework.NodeInfo) *framework.Status {
954 return nil
955 }
956
957 func (pl *fakeNodePlugin) EventsToRegister() []framework.ClusterEventWithHint {
958 return []framework.ClusterEventWithHint{
959 {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add}, QueueingHintFn: fakeNodePluginQueueingFn},
960 }
961 }
962
963 var hintFromFakePod = framework.QueueingHint(101)
964
965 type fakePodPlugin struct{}
966
967 var fakePodPluginQueueingFn = func(_ klog.Logger, _ *v1.Pod, _, _ interface{}) (framework.QueueingHint, error) {
968 return hintFromFakePod, nil
969 }
970
971 func (*fakePodPlugin) Name() string { return fakePod }
972
973 func (*fakePodPlugin) Filter(_ context.Context, _ *framework.CycleState, _ *v1.Pod, _ *framework.NodeInfo) *framework.Status {
974 return nil
975 }
976
977 func (pl *fakePodPlugin) EventsToRegister() []framework.ClusterEventWithHint {
978 return []framework.ClusterEventWithHint{
979 {Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Add}, QueueingHintFn: fakePodPluginQueueingFn},
980 }
981 }
982
983 type emptyEventPlugin struct{}
984
985 func (*emptyEventPlugin) Name() string { return emptyEventExtensions }
986
987 func (*emptyEventPlugin) Filter(_ context.Context, _ *framework.CycleState, _ *v1.Pod, _ *framework.NodeInfo) *framework.Status {
988 return nil
989 }
990
991 func (pl *emptyEventPlugin) EventsToRegister() []framework.ClusterEventWithHint {
992 return nil
993 }
994
995
996
997
998 type emptyEventsToRegisterPlugin struct{}
999
1000 func (*emptyEventsToRegisterPlugin) Name() string { return emptyEventsToRegister }
1001
1002 func (*emptyEventsToRegisterPlugin) Filter(_ context.Context, _ *framework.CycleState, _ *v1.Pod, _ *framework.NodeInfo) *framework.Status {
1003 return nil
1004 }
1005
1006 func (*emptyEventsToRegisterPlugin) EventsToRegister() []framework.ClusterEventWithHint { return nil }
1007
View as plain text