1
16
17 package scheduler
18
19 import (
20 "context"
21 "errors"
22 "fmt"
23 "math"
24 "math/rand"
25 "reflect"
26 "regexp"
27 "sort"
28 "strconv"
29 "sync"
30 "testing"
31 "time"
32
33 "github.com/google/go-cmp/cmp"
34 v1 "k8s.io/api/core/v1"
35 eventsv1 "k8s.io/api/events/v1"
36 "k8s.io/apimachinery/pkg/api/resource"
37 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
38 "k8s.io/apimachinery/pkg/runtime"
39 "k8s.io/apimachinery/pkg/types"
40 "k8s.io/apimachinery/pkg/util/sets"
41 "k8s.io/apimachinery/pkg/util/wait"
42 "k8s.io/client-go/informers"
43 clientsetfake "k8s.io/client-go/kubernetes/fake"
44 "k8s.io/client-go/kubernetes/scheme"
45 clienttesting "k8s.io/client-go/testing"
46 clientcache "k8s.io/client-go/tools/cache"
47 "k8s.io/client-go/tools/events"
48 "k8s.io/component-helpers/storage/volume"
49 "k8s.io/klog/v2"
50 "k8s.io/klog/v2/ktesting"
51 extenderv1 "k8s.io/kube-scheduler/extender/v1"
52 schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
53 "k8s.io/kubernetes/pkg/scheduler/framework"
54 "k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultbinder"
55 "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
56 "k8s.io/kubernetes/pkg/scheduler/framework/plugins/imagelocality"
57 "k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeports"
58 "k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources"
59 "k8s.io/kubernetes/pkg/scheduler/framework/plugins/podtopologyspread"
60 "k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort"
61 "k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding"
62 frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
63 internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
64 fakecache "k8s.io/kubernetes/pkg/scheduler/internal/cache/fake"
65 internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
66 "k8s.io/kubernetes/pkg/scheduler/profile"
67 st "k8s.io/kubernetes/pkg/scheduler/testing"
68 tf "k8s.io/kubernetes/pkg/scheduler/testing/framework"
69 schedutil "k8s.io/kubernetes/pkg/scheduler/util"
70 "k8s.io/utils/ptr"
71 )
72
// Constants shared by the scheduler tests in this file.
const (
	// testSchedulerName is the scheduler/profile name under which the tests
	// build and register their frameworks.
	testSchedulerName = "test-scheduler"

	// mb equals 2^20 (1024 * 1024).
	mb int64 = 1 << 20
)
77
78 var (
79 emptySnapshot = internalcache.NewEmptySnapshot()
80 podTopologySpreadFunc = frameworkruntime.FactoryAdapter(feature.Features{}, podtopologyspread.New)
81 errPrioritize = fmt.Errorf("priority map encounters an error")
82 )
83
84 type mockScheduleResult struct {
85 result ScheduleResult
86 err error
87 }
88
89 type fakeExtender struct {
90 isBinder bool
91 interestedPodName string
92 ignorable bool
93 gotBind bool
94 errBind bool
95 isPrioritizer bool
96 isFilter bool
97 }
98
99 func (f *fakeExtender) Name() string {
100 return "fakeExtender"
101 }
102
103 func (f *fakeExtender) IsIgnorable() bool {
104 return f.ignorable
105 }
106
107 func (f *fakeExtender) ProcessPreemption(
108 _ *v1.Pod,
109 _ map[string]*extenderv1.Victims,
110 _ framework.NodeInfoLister,
111 ) (map[string]*extenderv1.Victims, error) {
112 return nil, nil
113 }
114
115 func (f *fakeExtender) SupportsPreemption() bool {
116 return false
117 }
118
119 func (f *fakeExtender) Filter(pod *v1.Pod, nodes []*framework.NodeInfo) ([]*framework.NodeInfo, extenderv1.FailedNodesMap, extenderv1.FailedNodesMap, error) {
120 return nil, nil, nil, nil
121 }
122
123 func (f *fakeExtender) Prioritize(
124 _ *v1.Pod,
125 _ []*framework.NodeInfo,
126 ) (hostPriorities *extenderv1.HostPriorityList, weight int64, err error) {
127 return nil, 0, nil
128 }
129
130 func (f *fakeExtender) Bind(binding *v1.Binding) error {
131 if f.isBinder {
132 if f.errBind {
133 return errors.New("bind error")
134 }
135 f.gotBind = true
136 return nil
137 }
138 return errors.New("not a binder")
139 }
140
141 func (f *fakeExtender) IsBinder() bool {
142 return f.isBinder
143 }
144
145 func (f *fakeExtender) IsInterested(pod *v1.Pod) bool {
146 return pod != nil && pod.Name == f.interestedPodName
147 }
148
149 func (f *fakeExtender) IsPrioritizer() bool {
150 return f.isPrioritizer
151 }
152
153 func (f *fakeExtender) IsFilter() bool {
154 return f.isFilter
155 }
156
157 type falseMapPlugin struct{}
158
159 func newFalseMapPlugin() frameworkruntime.PluginFactory {
160 return func(_ context.Context, _ runtime.Object, _ framework.Handle) (framework.Plugin, error) {
161 return &falseMapPlugin{}, nil
162 }
163 }
164
165 func (pl *falseMapPlugin) Name() string {
166 return "FalseMap"
167 }
168
169 func (pl *falseMapPlugin) Score(_ context.Context, _ *framework.CycleState, _ *v1.Pod, _ string) (int64, *framework.Status) {
170 return 0, framework.AsStatus(errPrioritize)
171 }
172
173 func (pl *falseMapPlugin) ScoreExtensions() framework.ScoreExtensions {
174 return nil
175 }
176
177 type numericMapPlugin struct{}
178
179 func newNumericMapPlugin() frameworkruntime.PluginFactory {
180 return func(_ context.Context, _ runtime.Object, _ framework.Handle) (framework.Plugin, error) {
181 return &numericMapPlugin{}, nil
182 }
183 }
184
185 func (pl *numericMapPlugin) Name() string {
186 return "NumericMap"
187 }
188
189 func (pl *numericMapPlugin) Score(_ context.Context, _ *framework.CycleState, _ *v1.Pod, nodeName string) (int64, *framework.Status) {
190 score, err := strconv.Atoi(nodeName)
191 if err != nil {
192 return 0, framework.NewStatus(framework.Error, fmt.Sprintf("Error converting nodename to int: %+v", nodeName))
193 }
194 return int64(score), nil
195 }
196
197 func (pl *numericMapPlugin) ScoreExtensions() framework.ScoreExtensions {
198 return nil
199 }
200
201
202 func NewNoPodsFilterPlugin(_ context.Context, _ runtime.Object, _ framework.Handle) (framework.Plugin, error) {
203 return &noPodsFilterPlugin{}, nil
204 }
205
206 type reverseNumericMapPlugin struct{}
207
208 func (pl *reverseNumericMapPlugin) Name() string {
209 return "ReverseNumericMap"
210 }
211
212 func (pl *reverseNumericMapPlugin) Score(_ context.Context, _ *framework.CycleState, _ *v1.Pod, nodeName string) (int64, *framework.Status) {
213 score, err := strconv.Atoi(nodeName)
214 if err != nil {
215 return 0, framework.NewStatus(framework.Error, fmt.Sprintf("Error converting nodename to int: %+v", nodeName))
216 }
217 return int64(score), nil
218 }
219
220 func (pl *reverseNumericMapPlugin) ScoreExtensions() framework.ScoreExtensions {
221 return pl
222 }
223
224 func (pl *reverseNumericMapPlugin) NormalizeScore(_ context.Context, _ *framework.CycleState, _ *v1.Pod, nodeScores framework.NodeScoreList) *framework.Status {
225 var maxScore float64
226 minScore := math.MaxFloat64
227
228 for _, hostPriority := range nodeScores {
229 maxScore = math.Max(maxScore, float64(hostPriority.Score))
230 minScore = math.Min(minScore, float64(hostPriority.Score))
231 }
232 for i, hostPriority := range nodeScores {
233 nodeScores[i] = framework.NodeScore{
234 Name: hostPriority.Name,
235 Score: int64(maxScore + minScore - float64(hostPriority.Score)),
236 }
237 }
238 return nil
239 }
240
241 func newReverseNumericMapPlugin() frameworkruntime.PluginFactory {
242 return func(_ context.Context, _ runtime.Object, _ framework.Handle) (framework.Plugin, error) {
243 return &reverseNumericMapPlugin{}, nil
244 }
245 }
246
247 type trueMapPlugin struct{}
248
249 func (pl *trueMapPlugin) Name() string {
250 return "TrueMap"
251 }
252
253 func (pl *trueMapPlugin) Score(_ context.Context, _ *framework.CycleState, _ *v1.Pod, _ string) (int64, *framework.Status) {
254 return 1, nil
255 }
256
257 func (pl *trueMapPlugin) ScoreExtensions() framework.ScoreExtensions {
258 return pl
259 }
260
261 func (pl *trueMapPlugin) NormalizeScore(_ context.Context, _ *framework.CycleState, _ *v1.Pod, nodeScores framework.NodeScoreList) *framework.Status {
262 for _, host := range nodeScores {
263 if host.Name == "" {
264 return framework.NewStatus(framework.Error, "unexpected empty host name")
265 }
266 }
267 return nil
268 }
269
270 func newTrueMapPlugin() frameworkruntime.PluginFactory {
271 return func(_ context.Context, _ runtime.Object, _ framework.Handle) (framework.Plugin, error) {
272 return &trueMapPlugin{}, nil
273 }
274 }
275
276 type noPodsFilterPlugin struct{}
277
278
279 func (pl *noPodsFilterPlugin) Name() string {
280 return "NoPodsFilter"
281 }
282
283
284 func (pl *noPodsFilterPlugin) Filter(_ context.Context, _ *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
285 if len(nodeInfo.Pods) == 0 {
286 return nil
287 }
288 return framework.NewStatus(framework.Unschedulable, tf.ErrReasonFake)
289 }
290
291 type fakeNodeSelectorArgs struct {
292 NodeName string `json:"nodeName"`
293 }
294
295 type fakeNodeSelector struct {
296 fakeNodeSelectorArgs
297 }
298
299 func (s *fakeNodeSelector) Name() string {
300 return "FakeNodeSelector"
301 }
302
303 func (s *fakeNodeSelector) Filter(_ context.Context, _ *framework.CycleState, _ *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
304 if nodeInfo.Node().Name != s.NodeName {
305 return framework.NewStatus(framework.UnschedulableAndUnresolvable)
306 }
307 return nil
308 }
309
310 func newFakeNodeSelector(_ context.Context, args runtime.Object, _ framework.Handle) (framework.Plugin, error) {
311 pl := &fakeNodeSelector{}
312 if err := frameworkruntime.DecodeInto(args, &pl.fakeNodeSelectorArgs); err != nil {
313 return nil, err
314 }
315 return pl, nil
316 }
317
// fakeSpecifiedNodeNameAnnotation is the pod annotation key from which
// fakeNodeSelectorDependOnPodAnnotation reads the only node the pod may be
// placed on.
const fakeSpecifiedNodeNameAnnotation = "fake-specified-node-name"
321
322
323 type fakeNodeSelectorDependOnPodAnnotation struct{}
324
325 func (f *fakeNodeSelectorDependOnPodAnnotation) Name() string {
326 return "FakeNodeSelectorDependOnPodAnnotation"
327 }
328
329
330 func (f *fakeNodeSelectorDependOnPodAnnotation) Filter(_ context.Context, _ *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
331 resolveNodeNameFromPodAnnotation := func(pod *v1.Pod) (string, error) {
332 if pod == nil {
333 return "", fmt.Errorf("empty pod")
334 }
335 nodeName, ok := pod.Annotations[fakeSpecifiedNodeNameAnnotation]
336 if !ok {
337 return "", fmt.Errorf("no specified node name on pod %s/%s annotation", pod.Namespace, pod.Name)
338 }
339 return nodeName, nil
340 }
341
342 nodeName, err := resolveNodeNameFromPodAnnotation(pod)
343 if err != nil {
344 return framework.AsStatus(err)
345 }
346 if nodeInfo.Node().Name != nodeName {
347 return framework.NewStatus(framework.UnschedulableAndUnresolvable)
348 }
349 return nil
350 }
351
352 func newFakeNodeSelectorDependOnPodAnnotation(_ context.Context, _ runtime.Object, _ framework.Handle) (framework.Plugin, error) {
353 return &fakeNodeSelectorDependOnPodAnnotation{}, nil
354 }
355
356 type TestPlugin struct {
357 name string
358 }
359
360 var _ framework.ScorePlugin = &TestPlugin{}
361 var _ framework.FilterPlugin = &TestPlugin{}
362
363 func (t *TestPlugin) Name() string {
364 return t.name
365 }
366
367 func (t *TestPlugin) Score(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) (int64, *framework.Status) {
368 return 1, nil
369 }
370
371 func (t *TestPlugin) ScoreExtensions() framework.ScoreExtensions {
372 return nil
373 }
374
375 func (t *TestPlugin) Filter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
376 return nil
377 }
378
379 func TestSchedulerMultipleProfilesScheduling(t *testing.T) {
380 nodes := []runtime.Object{
381 st.MakeNode().Name("node1").UID("node1").Obj(),
382 st.MakeNode().Name("node2").UID("node2").Obj(),
383 st.MakeNode().Name("node3").UID("node3").Obj(),
384 }
385 pods := []*v1.Pod{
386 st.MakePod().Name("pod1").UID("pod1").SchedulerName("match-node3").Obj(),
387 st.MakePod().Name("pod2").UID("pod2").SchedulerName("match-node2").Obj(),
388 st.MakePod().Name("pod3").UID("pod3").SchedulerName("match-node2").Obj(),
389 st.MakePod().Name("pod4").UID("pod4").SchedulerName("match-node3").Obj(),
390 }
391 wantBindings := map[string]string{
392 "pod1": "node3",
393 "pod2": "node2",
394 "pod3": "node2",
395 "pod4": "node3",
396 }
397 wantControllers := map[string]string{
398 "pod1": "match-node3",
399 "pod2": "match-node2",
400 "pod3": "match-node2",
401 "pod4": "match-node3",
402 }
403
404
405
406
407 objs := append([]runtime.Object{
408 &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: ""}}}, nodes...)
409 client := clientsetfake.NewSimpleClientset(objs...)
410 broadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()})
411 ctx, cancel := context.WithCancel(context.Background())
412 defer cancel()
413
414 informerFactory := informers.NewSharedInformerFactory(client, 0)
415 sched, err := New(
416 ctx,
417 client,
418 informerFactory,
419 nil,
420 profile.NewRecorderFactory(broadcaster),
421 WithProfiles(
422 schedulerapi.KubeSchedulerProfile{SchedulerName: "match-node2",
423 Plugins: &schedulerapi.Plugins{
424 Filter: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "FakeNodeSelector"}}},
425 QueueSort: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "PrioritySort"}}},
426 Bind: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "DefaultBinder"}}},
427 },
428 PluginConfig: []schedulerapi.PluginConfig{
429 {
430 Name: "FakeNodeSelector",
431 Args: &runtime.Unknown{Raw: []byte(`{"nodeName":"node2"}`)},
432 },
433 },
434 },
435 schedulerapi.KubeSchedulerProfile{
436 SchedulerName: "match-node3",
437 Plugins: &schedulerapi.Plugins{
438 Filter: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "FakeNodeSelector"}}},
439 QueueSort: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "PrioritySort"}}},
440 Bind: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "DefaultBinder"}}},
441 },
442 PluginConfig: []schedulerapi.PluginConfig{
443 {
444 Name: "FakeNodeSelector",
445 Args: &runtime.Unknown{Raw: []byte(`{"nodeName":"node3"}`)},
446 },
447 },
448 },
449 ),
450 WithFrameworkOutOfTreeRegistry(frameworkruntime.Registry{
451 "FakeNodeSelector": newFakeNodeSelector,
452 }),
453 )
454 if err != nil {
455 t.Fatal(err)
456 }
457
458
459 var wg sync.WaitGroup
460 wg.Add(2 * len(pods))
461 bindings := make(map[string]string)
462 client.PrependReactor("create", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) {
463 if action.GetSubresource() != "binding" {
464 return false, nil, nil
465 }
466 binding := action.(clienttesting.CreateAction).GetObject().(*v1.Binding)
467 bindings[binding.Name] = binding.Target.Name
468 wg.Done()
469 return true, binding, nil
470 })
471 controllers := make(map[string]string)
472 stopFn, err := broadcaster.StartEventWatcher(func(obj runtime.Object) {
473 e, ok := obj.(*eventsv1.Event)
474 if !ok || e.Reason != "Scheduled" {
475 return
476 }
477 controllers[e.Regarding.Name] = e.ReportingController
478 wg.Done()
479 })
480 if err != nil {
481 t.Fatal(err)
482 }
483 defer stopFn()
484
485
486 informerFactory.Start(ctx.Done())
487 informerFactory.WaitForCacheSync(ctx.Done())
488 if err = sched.WaitForHandlersSync(ctx); err != nil {
489 t.Fatalf("Handlers failed to sync: %v: ", err)
490 }
491 go sched.Run(ctx)
492
493
494 for _, p := range pods {
495 _, err := client.CoreV1().Pods("").Create(ctx, p, metav1.CreateOptions{})
496 if err != nil {
497 t.Fatal(err)
498 }
499 }
500 wg.Wait()
501
502
503 if diff := cmp.Diff(wantBindings, bindings); diff != "" {
504 t.Errorf("pods were scheduled incorrectly (-want, +got):\n%s", diff)
505 }
506 if diff := cmp.Diff(wantControllers, controllers); diff != "" {
507 t.Errorf("events were reported with wrong controllers (-want, +got):\n%s", diff)
508 }
509 }
510
511
512 func TestSchedulerGuaranteeNonNilNodeInSchedulingCycle(t *testing.T) {
513 random := rand.New(rand.NewSource(time.Now().UnixNano()))
514 ctx, cancel := context.WithCancel(context.Background())
515 defer cancel()
516
517 var (
518 initialNodeNumber = 1000
519 initialPodNumber = 500
520 waitSchedulingPodNumber = 200
521 deleteNodeNumberPerRound = 20
522 createPodNumberPerRound = 50
523
524 fakeSchedulerName = "fake-scheduler"
525 fakeNamespace = "fake-namespace"
526
527 initialNodes []runtime.Object
528 initialPods []runtime.Object
529 )
530
531 for i := 0; i < initialNodeNumber; i++ {
532 nodeName := fmt.Sprintf("node%d", i)
533 initialNodes = append(initialNodes, st.MakeNode().Name(nodeName).UID(nodeName).Obj())
534 }
535
536 for i := 0; i < initialPodNumber; i++ {
537 podName := fmt.Sprintf("scheduled-pod%d", i)
538 assignedNodeName := fmt.Sprintf("node%d", random.Intn(initialNodeNumber))
539 initialPods = append(initialPods, st.MakePod().Name(podName).UID(podName).Node(assignedNodeName).Obj())
540 }
541
542 objs := []runtime.Object{&v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: fakeNamespace}}}
543 objs = append(objs, initialNodes...)
544 objs = append(objs, initialPods...)
545 client := clientsetfake.NewSimpleClientset(objs...)
546 broadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()})
547
548 informerFactory := informers.NewSharedInformerFactory(client, 0)
549 sched, err := New(
550 ctx,
551 client,
552 informerFactory,
553 nil,
554 profile.NewRecorderFactory(broadcaster),
555 WithProfiles(
556 schedulerapi.KubeSchedulerProfile{SchedulerName: fakeSchedulerName,
557 Plugins: &schedulerapi.Plugins{
558 Filter: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "FakeNodeSelectorDependOnPodAnnotation"}}},
559 QueueSort: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "PrioritySort"}}},
560 Bind: schedulerapi.PluginSet{Enabled: []schedulerapi.Plugin{{Name: "DefaultBinder"}}},
561 },
562 },
563 ),
564 WithFrameworkOutOfTreeRegistry(frameworkruntime.Registry{
565 "FakeNodeSelectorDependOnPodAnnotation": newFakeNodeSelectorDependOnPodAnnotation,
566 }),
567 )
568 if err != nil {
569 t.Fatal(err)
570 }
571
572
573 informerFactory.Start(ctx.Done())
574 informerFactory.WaitForCacheSync(ctx.Done())
575 go sched.Run(ctx)
576
577 var deleteNodeIndex int
578 deleteNodesOneRound := func() {
579 for i := 0; i < deleteNodeNumberPerRound; i++ {
580 if deleteNodeIndex >= initialNodeNumber {
581
582 return
583 }
584 deleteNodeName := fmt.Sprintf("node%d", deleteNodeIndex)
585 if err := client.CoreV1().Nodes().Delete(ctx, deleteNodeName, metav1.DeleteOptions{}); err != nil {
586 t.Fatal(err)
587 }
588 deleteNodeIndex++
589 }
590 }
591 var createPodIndex int
592 createPodsOneRound := func() {
593 if createPodIndex > waitSchedulingPodNumber {
594 return
595 }
596 for i := 0; i < createPodNumberPerRound; i++ {
597 podName := fmt.Sprintf("pod%d", createPodIndex)
598
599 specifiedNodeName := fmt.Sprintf("node%d", random.Intn(initialNodeNumber))
600
601 waitSchedulingPod := st.MakePod().Namespace(fakeNamespace).Name(podName).UID(podName).Annotation(fakeSpecifiedNodeNameAnnotation, specifiedNodeName).SchedulerName(fakeSchedulerName).Obj()
602 if _, err := client.CoreV1().Pods(fakeNamespace).Create(ctx, waitSchedulingPod, metav1.CreateOptions{}); err != nil {
603 t.Fatal(err)
604 }
605 createPodIndex++
606 }
607 }
608
609
610
611
612
613 go wait.Until(deleteNodesOneRound, 10*time.Millisecond, ctx.Done())
614 go wait.Until(createPodsOneRound, 9*time.Millisecond, ctx.Done())
615
616
617 allWaitSchedulingPods := sets.New[string]()
618 for i := 0; i < waitSchedulingPodNumber; i++ {
619 allWaitSchedulingPods.Insert(fmt.Sprintf("pod%d", i))
620 }
621 var wg sync.WaitGroup
622 wg.Add(waitSchedulingPodNumber)
623 stopFn, err := broadcaster.StartEventWatcher(func(obj runtime.Object) {
624 e, ok := obj.(*eventsv1.Event)
625 if !ok || (e.Reason != "Scheduled" && e.Reason != "FailedScheduling") {
626 return
627 }
628 if allWaitSchedulingPods.Has(e.Regarding.Name) {
629 wg.Done()
630 allWaitSchedulingPods.Delete(e.Regarding.Name)
631 }
632 })
633 if err != nil {
634 t.Fatal(err)
635 }
636 defer stopFn()
637
638 wg.Wait()
639 }
640
641 func TestSchedulerScheduleOne(t *testing.T) {
642 testNode := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node1", UID: types.UID("node1")}}
643 client := clientsetfake.NewSimpleClientset(&testNode)
644 eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()})
645 errS := errors.New("scheduler")
646 errB := errors.New("binder")
647 preBindErr := errors.New("on PreBind")
648
649 table := []struct {
650 name string
651 injectBindError error
652 sendPod *v1.Pod
653 registerPluginFuncs []tf.RegisterPluginFunc
654 expectErrorPod *v1.Pod
655 expectForgetPod *v1.Pod
656 expectAssumedPod *v1.Pod
657 expectError error
658 expectBind *v1.Binding
659 eventReason string
660 mockResult mockScheduleResult
661 }{
662 {
663 name: "error reserve pod",
664 sendPod: podWithID("foo", ""),
665 mockResult: mockScheduleResult{ScheduleResult{SuggestedHost: testNode.Name, EvaluatedNodes: 1, FeasibleNodes: 1}, nil},
666 registerPluginFuncs: []tf.RegisterPluginFunc{
667 tf.RegisterReservePlugin("FakeReserve", tf.NewFakeReservePlugin(framework.NewStatus(framework.Error, "reserve error"))),
668 },
669 expectErrorPod: podWithID("foo", testNode.Name),
670 expectForgetPod: podWithID("foo", testNode.Name),
671 expectAssumedPod: podWithID("foo", testNode.Name),
672 expectError: fmt.Errorf(`running Reserve plugin "FakeReserve": %w`, errors.New("reserve error")),
673 eventReason: "FailedScheduling",
674 },
675 {
676 name: "error permit pod",
677 sendPod: podWithID("foo", ""),
678 mockResult: mockScheduleResult{ScheduleResult{SuggestedHost: testNode.Name, EvaluatedNodes: 1, FeasibleNodes: 1}, nil},
679 registerPluginFuncs: []tf.RegisterPluginFunc{
680 tf.RegisterPermitPlugin("FakePermit", tf.NewFakePermitPlugin(framework.NewStatus(framework.Error, "permit error"), time.Minute)),
681 },
682 expectErrorPod: podWithID("foo", testNode.Name),
683 expectForgetPod: podWithID("foo", testNode.Name),
684 expectAssumedPod: podWithID("foo", testNode.Name),
685 expectError: fmt.Errorf(`running Permit plugin "FakePermit": %w`, errors.New("permit error")),
686 eventReason: "FailedScheduling",
687 },
688 {
689 name: "error prebind pod",
690 sendPod: podWithID("foo", ""),
691 mockResult: mockScheduleResult{ScheduleResult{SuggestedHost: testNode.Name, EvaluatedNodes: 1, FeasibleNodes: 1}, nil},
692 registerPluginFuncs: []tf.RegisterPluginFunc{
693 tf.RegisterPreBindPlugin("FakePreBind", tf.NewFakePreBindPlugin(framework.AsStatus(preBindErr))),
694 },
695 expectErrorPod: podWithID("foo", testNode.Name),
696 expectForgetPod: podWithID("foo", testNode.Name),
697 expectAssumedPod: podWithID("foo", testNode.Name),
698 expectError: fmt.Errorf(`running PreBind plugin "FakePreBind": %w`, preBindErr),
699 eventReason: "FailedScheduling",
700 },
701 {
702 name: "bind assumed pod scheduled",
703 sendPod: podWithID("foo", ""),
704 mockResult: mockScheduleResult{ScheduleResult{SuggestedHost: testNode.Name, EvaluatedNodes: 1, FeasibleNodes: 1}, nil},
705 expectBind: &v1.Binding{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: types.UID("foo")}, Target: v1.ObjectReference{Kind: "Node", Name: testNode.Name}},
706 expectAssumedPod: podWithID("foo", testNode.Name),
707 eventReason: "Scheduled",
708 },
709 {
710 name: "error pod failed scheduling",
711 sendPod: podWithID("foo", ""),
712 mockResult: mockScheduleResult{ScheduleResult{SuggestedHost: testNode.Name, EvaluatedNodes: 1, FeasibleNodes: 1}, errS},
713 expectError: errS,
714 expectErrorPod: podWithID("foo", ""),
715 eventReason: "FailedScheduling",
716 },
717 {
718 name: "error bind forget pod failed scheduling",
719 sendPod: podWithID("foo", ""),
720 mockResult: mockScheduleResult{ScheduleResult{SuggestedHost: testNode.Name, EvaluatedNodes: 1, FeasibleNodes: 1}, nil},
721 expectBind: &v1.Binding{ObjectMeta: metav1.ObjectMeta{Name: "foo", UID: types.UID("foo")}, Target: v1.ObjectReference{Kind: "Node", Name: testNode.Name}},
722 expectAssumedPod: podWithID("foo", testNode.Name),
723 injectBindError: errB,
724 expectError: fmt.Errorf("running Bind plugin %q: %w", "DefaultBinder", errors.New("binder")),
725 expectErrorPod: podWithID("foo", testNode.Name),
726 expectForgetPod: podWithID("foo", testNode.Name),
727 eventReason: "FailedScheduling",
728 },
729 {
730 name: "deleting pod",
731 sendPod: deletingPod("foo"),
732 mockResult: mockScheduleResult{ScheduleResult{}, nil},
733 eventReason: "FailedScheduling",
734 },
735 }
736
737 for _, item := range table {
738 t.Run(item.name, func(t *testing.T) {
739 var gotError error
740 var gotPod *v1.Pod
741 var gotForgetPod *v1.Pod
742 var gotAssumedPod *v1.Pod
743 var gotBinding *v1.Binding
744 cache := &fakecache.Cache{
745 ForgetFunc: func(pod *v1.Pod) {
746 gotForgetPod = pod
747 },
748 AssumeFunc: func(pod *v1.Pod) {
749 gotAssumedPod = pod
750 },
751 IsAssumedPodFunc: func(pod *v1.Pod) bool {
752 if pod == nil || gotAssumedPod == nil {
753 return false
754 }
755 return pod.UID == gotAssumedPod.UID
756 },
757 }
758 client := clientsetfake.NewSimpleClientset(item.sendPod)
759 client.PrependReactor("create", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) {
760 if action.GetSubresource() != "binding" {
761 return false, nil, nil
762 }
763 gotBinding = action.(clienttesting.CreateAction).GetObject().(*v1.Binding)
764 return true, gotBinding, item.injectBindError
765 })
766 registerPluginFuncs := append(item.registerPluginFuncs,
767 tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
768 tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
769 )
770 ctx, cancel := context.WithCancel(context.Background())
771 defer cancel()
772 fwk, err := tf.NewFramework(ctx,
773 registerPluginFuncs,
774 testSchedulerName,
775 frameworkruntime.WithClientSet(client),
776 frameworkruntime.WithEventRecorder(eventBroadcaster.NewRecorder(scheme.Scheme, testSchedulerName)))
777 if err != nil {
778 t.Fatal(err)
779 }
780
781 sched := &Scheduler{
782 Cache: cache,
783 client: client,
784 NextPod: func(logger klog.Logger) (*framework.QueuedPodInfo, error) {
785 return &framework.QueuedPodInfo{PodInfo: mustNewPodInfo(t, item.sendPod)}, nil
786 },
787 SchedulingQueue: internalqueue.NewTestQueue(ctx, nil),
788 Profiles: profile.Map{testSchedulerName: fwk},
789 }
790
791 sched.SchedulePod = func(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (ScheduleResult, error) {
792 return item.mockResult.result, item.mockResult.err
793 }
794 sched.FailureHandler = func(_ context.Context, fwk framework.Framework, p *framework.QueuedPodInfo, status *framework.Status, _ *framework.NominatingInfo, _ time.Time) {
795 gotPod = p.Pod
796 gotError = status.AsError()
797
798 msg := truncateMessage(gotError.Error())
799 fwk.EventRecorder().Eventf(p.Pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", msg)
800 }
801 called := make(chan struct{})
802 stopFunc, err := eventBroadcaster.StartEventWatcher(func(obj runtime.Object) {
803 e, _ := obj.(*eventsv1.Event)
804 if e.Reason != item.eventReason {
805 t.Errorf("got event %v, want %v", e.Reason, item.eventReason)
806 }
807 close(called)
808 })
809 if err != nil {
810 t.Fatal(err)
811 }
812 sched.ScheduleOne(ctx)
813 <-called
814 if e, a := item.expectAssumedPod, gotAssumedPod; !reflect.DeepEqual(e, a) {
815 t.Errorf("assumed pod: wanted %v, got %v", e, a)
816 }
817 if e, a := item.expectErrorPod, gotPod; !reflect.DeepEqual(e, a) {
818 t.Errorf("error pod: wanted %v, got %v", e, a)
819 }
820 if e, a := item.expectForgetPod, gotForgetPod; !reflect.DeepEqual(e, a) {
821 t.Errorf("forget pod: wanted %v, got %v", e, a)
822 }
823 if e, a := item.expectError, gotError; !reflect.DeepEqual(e, a) {
824 t.Errorf("error: wanted %v, got %v", e, a)
825 }
826 if diff := cmp.Diff(item.expectBind, gotBinding); diff != "" {
827 t.Errorf("got binding diff (-want, +got): %s", diff)
828 }
829 stopFunc()
830 })
831 }
832 }
833
834 func TestSchedulerNoPhantomPodAfterExpire(t *testing.T) {
835 logger, ctx := ktesting.NewTestContext(t)
836 ctx, cancel := context.WithCancel(ctx)
837 defer cancel()
838 queuedPodStore := clientcache.NewFIFO(clientcache.MetaNamespaceKeyFunc)
839 scache := internalcache.New(ctx, 100*time.Millisecond)
840 pod := podWithPort("pod.Name", "", 8080)
841 node := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node1", UID: types.UID("node1")}}
842 scache.AddNode(logger, &node)
843
844 fns := []tf.RegisterPluginFunc{
845 tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
846 tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
847 tf.RegisterPluginAsExtensions(nodeports.Name, nodeports.New, "Filter", "PreFilter"),
848 }
849 scheduler, bindingChan, errChan := setupTestSchedulerWithOnePodOnNode(ctx, t, queuedPodStore, scache, pod, &node, fns...)
850
851 waitPodExpireChan := make(chan struct{})
852 timeout := make(chan struct{})
853 go func() {
854 for {
855 select {
856 case <-timeout:
857 return
858 default:
859 }
860 pods, err := scache.PodCount()
861 if err != nil {
862 errChan <- fmt.Errorf("cache.List failed: %v", err)
863 return
864 }
865 if pods == 0 {
866 close(waitPodExpireChan)
867 return
868 }
869 time.Sleep(100 * time.Millisecond)
870 }
871 }()
872
873 select {
874 case err := <-errChan:
875 t.Fatal(err)
876 case <-waitPodExpireChan:
877 case <-time.After(wait.ForeverTestTimeout):
878 close(timeout)
879 t.Fatalf("timeout timeout in waiting pod expire after %v", wait.ForeverTestTimeout)
880 }
881
882
883 secondPod := podWithPort("bar", "", 8080)
884 queuedPodStore.Add(secondPod)
885 scheduler.ScheduleOne(ctx)
886 select {
887 case b := <-bindingChan:
888 expectBinding := &v1.Binding{
889 ObjectMeta: metav1.ObjectMeta{Name: "bar", UID: types.UID("bar")},
890 Target: v1.ObjectReference{Kind: "Node", Name: node.Name},
891 }
892 if !reflect.DeepEqual(expectBinding, b) {
893 t.Errorf("binding want=%v, get=%v", expectBinding, b)
894 }
895 case <-time.After(wait.ForeverTestTimeout):
896 t.Fatalf("timeout in binding after %v", wait.ForeverTestTimeout)
897 }
898 }
899
900 func TestSchedulerNoPhantomPodAfterDelete(t *testing.T) {
901 logger, ctx := ktesting.NewTestContext(t)
902 ctx, cancel := context.WithCancel(ctx)
903 defer cancel()
904 queuedPodStore := clientcache.NewFIFO(clientcache.MetaNamespaceKeyFunc)
905 scache := internalcache.New(ctx, 10*time.Minute)
906 firstPod := podWithPort("pod.Name", "", 8080)
907 node := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node1", UID: types.UID("node1")}}
908 scache.AddNode(logger, &node)
909 fns := []tf.RegisterPluginFunc{
910 tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
911 tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
912 tf.RegisterPluginAsExtensions(nodeports.Name, nodeports.New, "Filter", "PreFilter"),
913 }
914 scheduler, bindingChan, errChan := setupTestSchedulerWithOnePodOnNode(ctx, t, queuedPodStore, scache, firstPod, &node, fns...)
915
916
917 secondPod := podWithPort("bar", "", 8080)
918 queuedPodStore.Add(secondPod)
919
920
921
922 scheduler.ScheduleOne(ctx)
923 select {
924 case err := <-errChan:
925 expectErr := &framework.FitError{
926 Pod: secondPod,
927 NumAllNodes: 1,
928 Diagnosis: framework.Diagnosis{
929 NodeToStatusMap: framework.NodeToStatusMap{
930 node.Name: framework.NewStatus(framework.Unschedulable, nodeports.ErrReason).WithPlugin(nodeports.Name),
931 },
932 UnschedulablePlugins: sets.New(nodeports.Name),
933 },
934 }
935 if !reflect.DeepEqual(expectErr, err) {
936 t.Errorf("err want=%v, get=%v", expectErr, err)
937 }
938 case <-time.After(wait.ForeverTestTimeout):
939 t.Fatalf("timeout in fitting after %v", wait.ForeverTestTimeout)
940 }
941
942
943
944
945
946 firstPod.Spec.NodeName = node.Name
947 if err := scache.AddPod(logger, firstPod); err != nil {
948 t.Fatalf("err: %v", err)
949 }
950 if err := scache.RemovePod(logger, firstPod); err != nil {
951 t.Fatalf("err: %v", err)
952 }
953
954 queuedPodStore.Add(secondPod)
955 scheduler.ScheduleOne(ctx)
956 select {
957 case b := <-bindingChan:
958 expectBinding := &v1.Binding{
959 ObjectMeta: metav1.ObjectMeta{Name: "bar", UID: types.UID("bar")},
960 Target: v1.ObjectReference{Kind: "Node", Name: node.Name},
961 }
962 if !reflect.DeepEqual(expectBinding, b) {
963 t.Errorf("binding want=%v, get=%v", expectBinding, b)
964 }
965 case <-time.After(wait.ForeverTestTimeout):
966 t.Fatalf("timeout in binding after %v", wait.ForeverTestTimeout)
967 }
968 }
969
970 func TestSchedulerFailedSchedulingReasons(t *testing.T) {
971 logger, ctx := ktesting.NewTestContext(t)
972 ctx, cancel := context.WithCancel(ctx)
973 defer cancel()
974 queuedPodStore := clientcache.NewFIFO(clientcache.MetaNamespaceKeyFunc)
975 scache := internalcache.New(ctx, 10*time.Minute)
976
977
978 var cpu = int64(4)
979 var mem = int64(500)
980 podWithTooBigResourceRequests := podWithResources("bar", "", v1.ResourceList{
981 v1.ResourceCPU: *(resource.NewQuantity(cpu, resource.DecimalSI)),
982 v1.ResourceMemory: *(resource.NewQuantity(mem, resource.DecimalSI)),
983 }, v1.ResourceList{
984 v1.ResourceCPU: *(resource.NewQuantity(cpu, resource.DecimalSI)),
985 v1.ResourceMemory: *(resource.NewQuantity(mem, resource.DecimalSI)),
986 })
987
988
989 var nodes []*v1.Node
990 var objects []runtime.Object
991 for i := 0; i < 100; i++ {
992 uid := fmt.Sprintf("node%v", i)
993 node := v1.Node{
994 ObjectMeta: metav1.ObjectMeta{Name: uid, UID: types.UID(uid)},
995 Status: v1.NodeStatus{
996 Capacity: v1.ResourceList{
997 v1.ResourceCPU: *(resource.NewQuantity(cpu/2, resource.DecimalSI)),
998 v1.ResourceMemory: *(resource.NewQuantity(mem/5, resource.DecimalSI)),
999 v1.ResourcePods: *(resource.NewQuantity(10, resource.DecimalSI)),
1000 },
1001 Allocatable: v1.ResourceList{
1002 v1.ResourceCPU: *(resource.NewQuantity(cpu/2, resource.DecimalSI)),
1003 v1.ResourceMemory: *(resource.NewQuantity(mem/5, resource.DecimalSI)),
1004 v1.ResourcePods: *(resource.NewQuantity(10, resource.DecimalSI)),
1005 }},
1006 }
1007 scache.AddNode(logger, &node)
1008 nodes = append(nodes, &node)
1009 objects = append(objects, &node)
1010 }
1011
1012
1013 failedNodeStatues := framework.NodeToStatusMap{}
1014 for _, node := range nodes {
1015 failedNodeStatues[node.Name] = framework.NewStatus(
1016 framework.Unschedulable,
1017 fmt.Sprintf("Insufficient %v", v1.ResourceCPU),
1018 fmt.Sprintf("Insufficient %v", v1.ResourceMemory),
1019 ).WithPlugin(noderesources.Name)
1020 }
1021 fns := []tf.RegisterPluginFunc{
1022 tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
1023 tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
1024 tf.RegisterPluginAsExtensions(noderesources.Name, frameworkruntime.FactoryAdapter(feature.Features{}, noderesources.NewFit), "Filter", "PreFilter"),
1025 }
1026
1027 informerFactory := informers.NewSharedInformerFactory(clientsetfake.NewSimpleClientset(objects...), 0)
1028 scheduler, _, errChan := setupTestScheduler(ctx, t, queuedPodStore, scache, informerFactory, nil, fns...)
1029
1030 queuedPodStore.Add(podWithTooBigResourceRequests)
1031 scheduler.ScheduleOne(ctx)
1032 select {
1033 case err := <-errChan:
1034 expectErr := &framework.FitError{
1035 Pod: podWithTooBigResourceRequests,
1036 NumAllNodes: len(nodes),
1037 Diagnosis: framework.Diagnosis{
1038 NodeToStatusMap: failedNodeStatues,
1039 UnschedulablePlugins: sets.New(noderesources.Name),
1040 },
1041 }
1042 if len(fmt.Sprint(expectErr)) > 150 {
1043 t.Errorf("message is too spammy ! %v ", len(fmt.Sprint(expectErr)))
1044 }
1045 if !reflect.DeepEqual(expectErr, err) {
1046 t.Errorf("\n err \nWANT=%+v,\nGOT=%+v", expectErr, err)
1047 }
1048 case <-time.After(wait.ForeverTestTimeout):
1049 t.Fatalf("timeout after %v", wait.ForeverTestTimeout)
1050 }
1051 }
1052
1053 func TestSchedulerWithVolumeBinding(t *testing.T) {
1054 findErr := fmt.Errorf("find err")
1055 assumeErr := fmt.Errorf("assume err")
1056 bindErr := fmt.Errorf("bind err")
1057 client := clientsetfake.NewSimpleClientset()
1058
1059 eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()})
1060
1061
1062 chanTimeout := 2 * time.Second
1063
1064 table := []struct {
1065 name string
1066 expectError error
1067 expectPodBind *v1.Binding
1068 expectAssumeCalled bool
1069 expectBindCalled bool
1070 eventReason string
1071 volumeBinderConfig *volumebinding.FakeVolumeBinderConfig
1072 }{
1073 {
1074 name: "all bound",
1075 volumeBinderConfig: &volumebinding.FakeVolumeBinderConfig{
1076 AllBound: true,
1077 },
1078 expectAssumeCalled: true,
1079 expectPodBind: &v1.Binding{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "foo-ns", UID: types.UID("foo")}, Target: v1.ObjectReference{Kind: "Node", Name: "node1"}},
1080 eventReason: "Scheduled",
1081 },
1082 {
1083 name: "bound/invalid pv affinity",
1084 volumeBinderConfig: &volumebinding.FakeVolumeBinderConfig{
1085 AllBound: true,
1086 FindReasons: volumebinding.ConflictReasons{volumebinding.ErrReasonNodeConflict},
1087 },
1088 eventReason: "FailedScheduling",
1089 expectError: makePredicateError("1 node(s) had volume node affinity conflict"),
1090 },
1091 {
1092 name: "unbound/no matches",
1093 volumeBinderConfig: &volumebinding.FakeVolumeBinderConfig{
1094 FindReasons: volumebinding.ConflictReasons{volumebinding.ErrReasonBindConflict},
1095 },
1096 eventReason: "FailedScheduling",
1097 expectError: makePredicateError("1 node(s) didn't find available persistent volumes to bind"),
1098 },
1099 {
1100 name: "bound and unbound unsatisfied",
1101 volumeBinderConfig: &volumebinding.FakeVolumeBinderConfig{
1102 FindReasons: volumebinding.ConflictReasons{volumebinding.ErrReasonBindConflict, volumebinding.ErrReasonNodeConflict},
1103 },
1104 eventReason: "FailedScheduling",
1105 expectError: makePredicateError("1 node(s) didn't find available persistent volumes to bind, 1 node(s) had volume node affinity conflict"),
1106 },
1107 {
1108 name: "unbound/found matches/bind succeeds",
1109 volumeBinderConfig: &volumebinding.FakeVolumeBinderConfig{},
1110 expectAssumeCalled: true,
1111 expectBindCalled: true,
1112 expectPodBind: &v1.Binding{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "foo-ns", UID: types.UID("foo")}, Target: v1.ObjectReference{Kind: "Node", Name: "node1"}},
1113 eventReason: "Scheduled",
1114 },
1115 {
1116 name: "predicate error",
1117 volumeBinderConfig: &volumebinding.FakeVolumeBinderConfig{
1118 FindErr: findErr,
1119 },
1120 eventReason: "FailedScheduling",
1121 expectError: fmt.Errorf("running %q filter plugin: %v", volumebinding.Name, findErr),
1122 },
1123 {
1124 name: "assume error",
1125 volumeBinderConfig: &volumebinding.FakeVolumeBinderConfig{
1126 AssumeErr: assumeErr,
1127 },
1128 expectAssumeCalled: true,
1129 eventReason: "FailedScheduling",
1130 expectError: fmt.Errorf("running Reserve plugin %q: %w", volumebinding.Name, assumeErr),
1131 },
1132 {
1133 name: "bind error",
1134 volumeBinderConfig: &volumebinding.FakeVolumeBinderConfig{
1135 BindErr: bindErr,
1136 },
1137 expectAssumeCalled: true,
1138 expectBindCalled: true,
1139 eventReason: "FailedScheduling",
1140 expectError: fmt.Errorf("running PreBind plugin %q: %w", volumebinding.Name, bindErr),
1141 },
1142 }
1143
1144 for _, item := range table {
1145 t.Run(item.name, func(t *testing.T) {
1146 ctx, cancel := context.WithCancel(context.Background())
1147 defer cancel()
1148 fakeVolumeBinder := volumebinding.NewFakeVolumeBinder(item.volumeBinderConfig)
1149 s, bindingChan, errChan := setupTestSchedulerWithVolumeBinding(ctx, t, fakeVolumeBinder, eventBroadcaster)
1150 eventChan := make(chan struct{})
1151 stopFunc, err := eventBroadcaster.StartEventWatcher(func(obj runtime.Object) {
1152 e, _ := obj.(*eventsv1.Event)
1153 if e, a := item.eventReason, e.Reason; e != a {
1154 t.Errorf("expected %v, got %v", e, a)
1155 }
1156 close(eventChan)
1157 })
1158 if err != nil {
1159 t.Fatal(err)
1160 }
1161 s.ScheduleOne(ctx)
1162
1163 select {
1164 case <-eventChan:
1165 case <-time.After(wait.ForeverTestTimeout):
1166 t.Fatalf("scheduling timeout after %v", wait.ForeverTestTimeout)
1167 }
1168 stopFunc()
1169
1170 var (
1171 gotErr error
1172 gotBind *v1.Binding
1173 )
1174 select {
1175 case gotErr = <-errChan:
1176 case gotBind = <-bindingChan:
1177 case <-time.After(chanTimeout):
1178 t.Fatalf("did not receive pod binding or error after %v", chanTimeout)
1179 }
1180 if item.expectError != nil {
1181 if gotErr == nil || item.expectError.Error() != gotErr.Error() {
1182 t.Errorf("err \nWANT=%+v,\nGOT=%+v", item.expectError, gotErr)
1183 }
1184 } else if gotErr != nil {
1185 t.Errorf("err \nWANT=%+v,\nGOT=%+v", item.expectError, gotErr)
1186 }
1187 if !cmp.Equal(item.expectPodBind, gotBind) {
1188 t.Errorf("err \nWANT=%+v,\nGOT=%+v", item.expectPodBind, gotBind)
1189 }
1190
1191 if item.expectAssumeCalled != fakeVolumeBinder.AssumeCalled {
1192 t.Errorf("expectedAssumeCall %v", item.expectAssumeCalled)
1193 }
1194
1195 if item.expectBindCalled != fakeVolumeBinder.BindCalled {
1196 t.Errorf("expectedBindCall %v", item.expectBindCalled)
1197 }
1198 })
1199 }
1200 }
1201
1202 func TestSchedulerBinding(t *testing.T) {
1203 table := []struct {
1204 podName string
1205 extenders []framework.Extender
1206 wantBinderID int
1207 name string
1208 }{
1209 {
1210 name: "the extender is not a binder",
1211 podName: "pod0",
1212 extenders: []framework.Extender{
1213 &fakeExtender{isBinder: false, interestedPodName: "pod0"},
1214 },
1215 wantBinderID: -1,
1216 },
1217 {
1218 name: "one of the extenders is a binder and interested in pod",
1219 podName: "pod0",
1220 extenders: []framework.Extender{
1221 &fakeExtender{isBinder: false, interestedPodName: "pod0"},
1222 &fakeExtender{isBinder: true, interestedPodName: "pod0"},
1223 },
1224 wantBinderID: 1,
1225 },
1226 {
1227 name: "one of the extenders is a binder, but not interested in pod",
1228 podName: "pod1",
1229 extenders: []framework.Extender{
1230 &fakeExtender{isBinder: false, interestedPodName: "pod1"},
1231 &fakeExtender{isBinder: true, interestedPodName: "pod0"},
1232 },
1233 wantBinderID: -1,
1234 },
1235 {
1236 name: "ignore when extender bind failed",
1237 podName: "pod1",
1238 extenders: []framework.Extender{
1239 &fakeExtender{isBinder: true, errBind: true, interestedPodName: "pod1", ignorable: true},
1240 },
1241 wantBinderID: -1,
1242 },
1243 }
1244
1245 for _, test := range table {
1246 t.Run(test.name, func(t *testing.T) {
1247 pod := st.MakePod().Name(test.podName).Obj()
1248 defaultBound := false
1249 client := clientsetfake.NewSimpleClientset(pod)
1250 client.PrependReactor("create", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) {
1251 if action.GetSubresource() == "binding" {
1252 defaultBound = true
1253 }
1254 return false, nil, nil
1255 })
1256 _, ctx := ktesting.NewTestContext(t)
1257 ctx, cancel := context.WithCancel(ctx)
1258 defer cancel()
1259 fwk, err := tf.NewFramework(ctx,
1260 []tf.RegisterPluginFunc{
1261 tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
1262 tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
1263 }, "", frameworkruntime.WithClientSet(client), frameworkruntime.WithEventRecorder(&events.FakeRecorder{}))
1264 if err != nil {
1265 t.Fatal(err)
1266 }
1267 sched := &Scheduler{
1268 Extenders: test.extenders,
1269 Cache: internalcache.New(ctx, 100*time.Millisecond),
1270 nodeInfoSnapshot: nil,
1271 percentageOfNodesToScore: 0,
1272 }
1273 status := sched.bind(ctx, fwk, pod, "node", nil)
1274 if !status.IsSuccess() {
1275 t.Error(status.AsError())
1276 }
1277
1278
1279 if wantBound := test.wantBinderID == -1; defaultBound != wantBound {
1280 t.Errorf("got bound with default binding: %v, want %v", defaultBound, wantBound)
1281 }
1282
1283
1284 for i, ext := range test.extenders {
1285 wantBound := i == test.wantBinderID
1286 if gotBound := ext.(*fakeExtender).gotBind; gotBound != wantBound {
1287 t.Errorf("got bound with extender #%d: %v, want %v", i, gotBound, wantBound)
1288 }
1289 }
1290
1291 })
1292 }
1293 }
1294
1295 func TestUpdatePod(t *testing.T) {
1296 tests := []struct {
1297 name string
1298 currentPodConditions []v1.PodCondition
1299 newPodCondition *v1.PodCondition
1300 currentNominatedNodeName string
1301 newNominatingInfo *framework.NominatingInfo
1302 expectedPatchRequests int
1303 expectedPatchDataPattern string
1304 }{
1305 {
1306 name: "Should make patch request to add pod condition when there are none currently",
1307 currentPodConditions: []v1.PodCondition{},
1308 newPodCondition: &v1.PodCondition{
1309 Type: "newType",
1310 Status: "newStatus",
1311 LastProbeTime: metav1.NewTime(time.Date(2020, 5, 13, 1, 1, 1, 1, time.UTC)),
1312 LastTransitionTime: metav1.NewTime(time.Date(2020, 5, 12, 1, 1, 1, 1, time.UTC)),
1313 Reason: "newReason",
1314 Message: "newMessage",
1315 },
1316 expectedPatchRequests: 1,
1317 expectedPatchDataPattern: `{"status":{"conditions":\[{"lastProbeTime":"2020-05-13T01:01:01Z","lastTransitionTime":".*","message":"newMessage","reason":"newReason","status":"newStatus","type":"newType"}]}}`,
1318 },
1319 {
1320 name: "Should make patch request to add a new pod condition when there is already one with another type",
1321 currentPodConditions: []v1.PodCondition{
1322 {
1323 Type: "someOtherType",
1324 Status: "someOtherTypeStatus",
1325 LastProbeTime: metav1.NewTime(time.Date(2020, 5, 11, 0, 0, 0, 0, time.UTC)),
1326 LastTransitionTime: metav1.NewTime(time.Date(2020, 5, 10, 0, 0, 0, 0, time.UTC)),
1327 Reason: "someOtherTypeReason",
1328 Message: "someOtherTypeMessage",
1329 },
1330 },
1331 newPodCondition: &v1.PodCondition{
1332 Type: "newType",
1333 Status: "newStatus",
1334 LastProbeTime: metav1.NewTime(time.Date(2020, 5, 13, 1, 1, 1, 1, time.UTC)),
1335 LastTransitionTime: metav1.NewTime(time.Date(2020, 5, 12, 1, 1, 1, 1, time.UTC)),
1336 Reason: "newReason",
1337 Message: "newMessage",
1338 },
1339 expectedPatchRequests: 1,
1340 expectedPatchDataPattern: `{"status":{"\$setElementOrder/conditions":\[{"type":"someOtherType"},{"type":"newType"}],"conditions":\[{"lastProbeTime":"2020-05-13T01:01:01Z","lastTransitionTime":".*","message":"newMessage","reason":"newReason","status":"newStatus","type":"newType"}]}}`,
1341 },
1342 {
1343 name: "Should make patch request to update an existing pod condition",
1344 currentPodConditions: []v1.PodCondition{
1345 {
1346 Type: "currentType",
1347 Status: "currentStatus",
1348 LastProbeTime: metav1.NewTime(time.Date(2020, 5, 13, 0, 0, 0, 0, time.UTC)),
1349 LastTransitionTime: metav1.NewTime(time.Date(2020, 5, 12, 0, 0, 0, 0, time.UTC)),
1350 Reason: "currentReason",
1351 Message: "currentMessage",
1352 },
1353 },
1354 newPodCondition: &v1.PodCondition{
1355 Type: "currentType",
1356 Status: "newStatus",
1357 LastProbeTime: metav1.NewTime(time.Date(2020, 5, 13, 1, 1, 1, 1, time.UTC)),
1358 LastTransitionTime: metav1.NewTime(time.Date(2020, 5, 12, 1, 1, 1, 1, time.UTC)),
1359 Reason: "newReason",
1360 Message: "newMessage",
1361 },
1362 expectedPatchRequests: 1,
1363 expectedPatchDataPattern: `{"status":{"\$setElementOrder/conditions":\[{"type":"currentType"}],"conditions":\[{"lastProbeTime":"2020-05-13T01:01:01Z","lastTransitionTime":".*","message":"newMessage","reason":"newReason","status":"newStatus","type":"currentType"}]}}`,
1364 },
1365 {
1366 name: "Should make patch request to update an existing pod condition, but the transition time should remain unchanged because the status is the same",
1367 currentPodConditions: []v1.PodCondition{
1368 {
1369 Type: "currentType",
1370 Status: "currentStatus",
1371 LastProbeTime: metav1.NewTime(time.Date(2020, 5, 13, 0, 0, 0, 0, time.UTC)),
1372 LastTransitionTime: metav1.NewTime(time.Date(2020, 5, 12, 0, 0, 0, 0, time.UTC)),
1373 Reason: "currentReason",
1374 Message: "currentMessage",
1375 },
1376 },
1377 newPodCondition: &v1.PodCondition{
1378 Type: "currentType",
1379 Status: "currentStatus",
1380 LastProbeTime: metav1.NewTime(time.Date(2020, 5, 13, 1, 1, 1, 1, time.UTC)),
1381 LastTransitionTime: metav1.NewTime(time.Date(2020, 5, 12, 0, 0, 0, 0, time.UTC)),
1382 Reason: "newReason",
1383 Message: "newMessage",
1384 },
1385 expectedPatchRequests: 1,
1386 expectedPatchDataPattern: `{"status":{"\$setElementOrder/conditions":\[{"type":"currentType"}],"conditions":\[{"lastProbeTime":"2020-05-13T01:01:01Z","message":"newMessage","reason":"newReason","type":"currentType"}]}}`,
1387 },
1388 {
1389 name: "Should not make patch request if pod condition already exists and is identical and nominated node name is not set",
1390 currentPodConditions: []v1.PodCondition{
1391 {
1392 Type: "currentType",
1393 Status: "currentStatus",
1394 LastProbeTime: metav1.NewTime(time.Date(2020, 5, 13, 0, 0, 0, 0, time.UTC)),
1395 LastTransitionTime: metav1.NewTime(time.Date(2020, 5, 12, 0, 0, 0, 0, time.UTC)),
1396 Reason: "currentReason",
1397 Message: "currentMessage",
1398 },
1399 },
1400 newPodCondition: &v1.PodCondition{
1401 Type: "currentType",
1402 Status: "currentStatus",
1403 LastProbeTime: metav1.NewTime(time.Date(2020, 5, 13, 0, 0, 0, 0, time.UTC)),
1404 LastTransitionTime: metav1.NewTime(time.Date(2020, 5, 12, 0, 0, 0, 0, time.UTC)),
1405 Reason: "currentReason",
1406 Message: "currentMessage",
1407 },
1408 currentNominatedNodeName: "node1",
1409 expectedPatchRequests: 0,
1410 },
1411 {
1412 name: "Should make patch request if pod condition already exists and is identical but nominated node name is set and different",
1413 currentPodConditions: []v1.PodCondition{
1414 {
1415 Type: "currentType",
1416 Status: "currentStatus",
1417 LastProbeTime: metav1.NewTime(time.Date(2020, 5, 13, 0, 0, 0, 0, time.UTC)),
1418 LastTransitionTime: metav1.NewTime(time.Date(2020, 5, 12, 0, 0, 0, 0, time.UTC)),
1419 Reason: "currentReason",
1420 Message: "currentMessage",
1421 },
1422 },
1423 newPodCondition: &v1.PodCondition{
1424 Type: "currentType",
1425 Status: "currentStatus",
1426 LastProbeTime: metav1.NewTime(time.Date(2020, 5, 13, 0, 0, 0, 0, time.UTC)),
1427 LastTransitionTime: metav1.NewTime(time.Date(2020, 5, 12, 0, 0, 0, 0, time.UTC)),
1428 Reason: "currentReason",
1429 Message: "currentMessage",
1430 },
1431 newNominatingInfo: &framework.NominatingInfo{NominatingMode: framework.ModeOverride, NominatedNodeName: "node1"},
1432 expectedPatchRequests: 1,
1433 expectedPatchDataPattern: `{"status":{"nominatedNodeName":"node1"}}`,
1434 },
1435 }
1436 for _, test := range tests {
1437 t.Run(test.name, func(t *testing.T) {
1438 actualPatchRequests := 0
1439 var actualPatchData string
1440 cs := &clientsetfake.Clientset{}
1441 cs.AddReactor("patch", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) {
1442 actualPatchRequests++
1443 patch := action.(clienttesting.PatchAction)
1444 actualPatchData = string(patch.GetPatch())
1445
1446
1447 return true, &v1.Pod{}, nil
1448 })
1449
1450 pod := st.MakePod().Name("foo").NominatedNodeName(test.currentNominatedNodeName).Conditions(test.currentPodConditions).Obj()
1451
1452 ctx, cancel := context.WithCancel(context.Background())
1453 defer cancel()
1454 if err := updatePod(ctx, cs, pod, test.newPodCondition, test.newNominatingInfo); err != nil {
1455 t.Fatalf("Error calling update: %v", err)
1456 }
1457
1458 if actualPatchRequests != test.expectedPatchRequests {
1459 t.Fatalf("Actual patch requests (%d) does not equal expected patch requests (%d), actual patch data: %v", actualPatchRequests, test.expectedPatchRequests, actualPatchData)
1460 }
1461
1462 regex, err := regexp.Compile(test.expectedPatchDataPattern)
1463 if err != nil {
1464 t.Fatalf("Error compiling regexp for %v: %v", test.expectedPatchDataPattern, err)
1465 }
1466
1467 if test.expectedPatchRequests > 0 && !regex.MatchString(actualPatchData) {
1468 t.Fatalf("Patch data mismatch: Actual was %v, but expected to match regexp %v", actualPatchData, test.expectedPatchDataPattern)
1469 }
1470 })
1471 }
1472 }
1473
1474 func Test_SelectHost(t *testing.T) {
1475 tests := []struct {
1476 name string
1477 list []framework.NodePluginScores
1478 topNodesCnt int
1479 possibleNodes sets.Set[string]
1480 possibleNodeLists [][]framework.NodePluginScores
1481 wantError error
1482 }{
1483 {
1484 name: "unique properly ordered scores",
1485 list: []framework.NodePluginScores{
1486 {Name: "node1", TotalScore: 1},
1487 {Name: "node2", TotalScore: 2},
1488 },
1489 topNodesCnt: 2,
1490 possibleNodes: sets.New("node2"),
1491 possibleNodeLists: [][]framework.NodePluginScores{
1492 {
1493 {Name: "node2", TotalScore: 2},
1494 {Name: "node1", TotalScore: 1},
1495 },
1496 },
1497 },
1498 {
1499 name: "numberOfNodeScoresToReturn > len(list)",
1500 list: []framework.NodePluginScores{
1501 {Name: "node1", TotalScore: 1},
1502 {Name: "node2", TotalScore: 2},
1503 },
1504 topNodesCnt: 100,
1505 possibleNodes: sets.New("node2"),
1506 possibleNodeLists: [][]framework.NodePluginScores{
1507 {
1508 {Name: "node2", TotalScore: 2},
1509 {Name: "node1", TotalScore: 1},
1510 },
1511 },
1512 },
1513 {
1514 name: "equal scores",
1515 list: []framework.NodePluginScores{
1516 {Name: "node2.1", TotalScore: 2},
1517 {Name: "node2.2", TotalScore: 2},
1518 {Name: "node2.3", TotalScore: 2},
1519 },
1520 topNodesCnt: 2,
1521 possibleNodes: sets.New("node2.1", "node2.2", "node2.3"),
1522 possibleNodeLists: [][]framework.NodePluginScores{
1523 {
1524 {Name: "node2.1", TotalScore: 2},
1525 {Name: "node2.2", TotalScore: 2},
1526 },
1527 {
1528 {Name: "node2.1", TotalScore: 2},
1529 {Name: "node2.3", TotalScore: 2},
1530 },
1531 {
1532 {Name: "node2.2", TotalScore: 2},
1533 {Name: "node2.1", TotalScore: 2},
1534 },
1535 {
1536 {Name: "node2.2", TotalScore: 2},
1537 {Name: "node2.3", TotalScore: 2},
1538 },
1539 {
1540 {Name: "node2.3", TotalScore: 2},
1541 {Name: "node2.1", TotalScore: 2},
1542 },
1543 {
1544 {Name: "node2.3", TotalScore: 2},
1545 {Name: "node2.2", TotalScore: 2},
1546 },
1547 },
1548 },
1549 {
1550 name: "out of order scores",
1551 list: []framework.NodePluginScores{
1552 {Name: "node3.1", TotalScore: 3},
1553 {Name: "node2.1", TotalScore: 2},
1554 {Name: "node1.1", TotalScore: 1},
1555 {Name: "node3.2", TotalScore: 3},
1556 },
1557 topNodesCnt: 3,
1558 possibleNodes: sets.New("node3.1", "node3.2"),
1559 possibleNodeLists: [][]framework.NodePluginScores{
1560 {
1561 {Name: "node3.1", TotalScore: 3},
1562 {Name: "node3.2", TotalScore: 3},
1563 {Name: "node2.1", TotalScore: 2},
1564 },
1565 {
1566 {Name: "node3.2", TotalScore: 3},
1567 {Name: "node3.1", TotalScore: 3},
1568 {Name: "node2.1", TotalScore: 2},
1569 },
1570 },
1571 },
1572 {
1573 name: "empty priority list",
1574 list: []framework.NodePluginScores{},
1575 possibleNodes: sets.Set[string]{},
1576 wantError: errEmptyPriorityList,
1577 },
1578 }
1579
1580 for _, test := range tests {
1581 t.Run(test.name, func(t *testing.T) {
1582
1583 for i := 0; i < 10; i++ {
1584 got, scoreList, err := selectHost(test.list, test.topNodesCnt)
1585 if err != test.wantError {
1586 t.Fatalf("unexpected error is returned from selectHost: got: %v want: %v", err, test.wantError)
1587 }
1588 if test.possibleNodes.Len() == 0 {
1589 if got != "" {
1590 t.Fatalf("expected nothing returned as selected Node, but actually %s is returned from selectHost", got)
1591 }
1592 return
1593 }
1594 if !test.possibleNodes.Has(got) {
1595 t.Errorf("got %s is not in the possible map %v", got, test.possibleNodes)
1596 }
1597 if got != scoreList[0].Name {
1598 t.Errorf("The head of list should be the selected Node's score: got: %v, expected: %v", scoreList[0], got)
1599 }
1600 for _, list := range test.possibleNodeLists {
1601 if cmp.Equal(list, scoreList) {
1602 return
1603 }
1604 }
1605 t.Errorf("Unexpected scoreList: %v", scoreList)
1606 }
1607 })
1608 }
1609 }
1610
1611 func TestFindNodesThatPassExtenders(t *testing.T) {
1612 tests := []struct {
1613 name string
1614 extenders []tf.FakeExtender
1615 nodes []*v1.Node
1616 filteredNodesStatuses framework.NodeToStatusMap
1617 expectsErr bool
1618 expectedNodes []*v1.Node
1619 expectedStatuses framework.NodeToStatusMap
1620 }{
1621 {
1622 name: "error",
1623 extenders: []tf.FakeExtender{
1624 {
1625 ExtenderName: "FakeExtender1",
1626 Predicates: []tf.FitPredicate{tf.ErrorPredicateExtender},
1627 },
1628 },
1629 nodes: makeNodeList([]string{"a"}),
1630 filteredNodesStatuses: make(framework.NodeToStatusMap),
1631 expectsErr: true,
1632 },
1633 {
1634 name: "success",
1635 extenders: []tf.FakeExtender{
1636 {
1637 ExtenderName: "FakeExtender1",
1638 Predicates: []tf.FitPredicate{tf.TruePredicateExtender},
1639 },
1640 },
1641 nodes: makeNodeList([]string{"a"}),
1642 filteredNodesStatuses: make(framework.NodeToStatusMap),
1643 expectsErr: false,
1644 expectedNodes: makeNodeList([]string{"a"}),
1645 expectedStatuses: make(framework.NodeToStatusMap),
1646 },
1647 {
1648 name: "unschedulable",
1649 extenders: []tf.FakeExtender{
1650 {
1651 ExtenderName: "FakeExtender1",
1652 Predicates: []tf.FitPredicate{func(pod *v1.Pod, node *framework.NodeInfo) *framework.Status {
1653 if node.Node().Name == "a" {
1654 return framework.NewStatus(framework.Success)
1655 }
1656 return framework.NewStatus(framework.Unschedulable, fmt.Sprintf("node %q is not allowed", node.Node().Name))
1657 }},
1658 },
1659 },
1660 nodes: makeNodeList([]string{"a", "b"}),
1661 filteredNodesStatuses: make(framework.NodeToStatusMap),
1662 expectsErr: false,
1663 expectedNodes: makeNodeList([]string{"a"}),
1664 expectedStatuses: framework.NodeToStatusMap{
1665 "b": framework.NewStatus(framework.Unschedulable, fmt.Sprintf("FakeExtender: node %q failed", "b")),
1666 },
1667 },
1668 {
1669 name: "unschedulable and unresolvable",
1670 extenders: []tf.FakeExtender{
1671 {
1672 ExtenderName: "FakeExtender1",
1673 Predicates: []tf.FitPredicate{func(pod *v1.Pod, node *framework.NodeInfo) *framework.Status {
1674 if node.Node().Name == "a" {
1675 return framework.NewStatus(framework.Success)
1676 }
1677 if node.Node().Name == "b" {
1678 return framework.NewStatus(framework.Unschedulable, fmt.Sprintf("node %q is not allowed", node.Node().Name))
1679 }
1680 return framework.NewStatus(framework.UnschedulableAndUnresolvable, fmt.Sprintf("node %q is not allowed", node.Node().Name))
1681 }},
1682 },
1683 },
1684 nodes: makeNodeList([]string{"a", "b", "c"}),
1685 filteredNodesStatuses: make(framework.NodeToStatusMap),
1686 expectsErr: false,
1687 expectedNodes: makeNodeList([]string{"a"}),
1688 expectedStatuses: framework.NodeToStatusMap{
1689 "b": framework.NewStatus(framework.Unschedulable, fmt.Sprintf("FakeExtender: node %q failed", "b")),
1690 "c": framework.NewStatus(framework.UnschedulableAndUnresolvable, fmt.Sprintf("FakeExtender: node %q failed and unresolvable", "c")),
1691 },
1692 },
1693 {
1694 name: "extender may overwrite the statuses",
1695 extenders: []tf.FakeExtender{
1696 {
1697 ExtenderName: "FakeExtender1",
1698 Predicates: []tf.FitPredicate{func(pod *v1.Pod, node *framework.NodeInfo) *framework.Status {
1699 if node.Node().Name == "a" {
1700 return framework.NewStatus(framework.Success)
1701 }
1702 if node.Node().Name == "b" {
1703 return framework.NewStatus(framework.Unschedulable, fmt.Sprintf("node %q is not allowed", node.Node().Name))
1704 }
1705 return framework.NewStatus(framework.UnschedulableAndUnresolvable, fmt.Sprintf("node %q is not allowed", node.Node().Name))
1706 }},
1707 },
1708 },
1709 nodes: makeNodeList([]string{"a", "b", "c"}),
1710 filteredNodesStatuses: framework.NodeToStatusMap{
1711 "c": framework.NewStatus(framework.Unschedulable, fmt.Sprintf("FakeFilterPlugin: node %q failed", "c")),
1712 },
1713 expectsErr: false,
1714 expectedNodes: makeNodeList([]string{"a"}),
1715 expectedStatuses: framework.NodeToStatusMap{
1716 "b": framework.NewStatus(framework.Unschedulable, fmt.Sprintf("FakeExtender: node %q failed", "b")),
1717 "c": framework.NewStatus(framework.UnschedulableAndUnresolvable, fmt.Sprintf("FakeFilterPlugin: node %q failed", "c"), fmt.Sprintf("FakeExtender: node %q failed and unresolvable", "c")),
1718 },
1719 },
1720 {
1721 name: "multiple extenders",
1722 extenders: []tf.FakeExtender{
1723 {
1724 ExtenderName: "FakeExtender1",
1725 Predicates: []tf.FitPredicate{func(pod *v1.Pod, node *framework.NodeInfo) *framework.Status {
1726 if node.Node().Name == "a" {
1727 return framework.NewStatus(framework.Success)
1728 }
1729 if node.Node().Name == "b" {
1730 return framework.NewStatus(framework.Unschedulable, fmt.Sprintf("node %q is not allowed", node.Node().Name))
1731 }
1732 return framework.NewStatus(framework.UnschedulableAndUnresolvable, fmt.Sprintf("node %q is not allowed", node.Node().Name))
1733 }},
1734 },
1735 {
1736 ExtenderName: "FakeExtender1",
1737 Predicates: []tf.FitPredicate{func(pod *v1.Pod, node *framework.NodeInfo) *framework.Status {
1738 if node.Node().Name == "a" {
1739 return framework.NewStatus(framework.Success)
1740 }
1741 return framework.NewStatus(framework.Unschedulable, fmt.Sprintf("node %q is not allowed", node.Node().Name))
1742 }},
1743 },
1744 },
1745 nodes: makeNodeList([]string{"a", "b", "c"}),
1746 filteredNodesStatuses: make(framework.NodeToStatusMap),
1747 expectsErr: false,
1748 expectedNodes: makeNodeList([]string{"a"}),
1749 expectedStatuses: framework.NodeToStatusMap{
1750 "b": framework.NewStatus(framework.Unschedulable, fmt.Sprintf("FakeExtender: node %q failed", "b")),
1751 "c": framework.NewStatus(framework.UnschedulableAndUnresolvable, fmt.Sprintf("FakeExtender: node %q failed and unresolvable", "c")),
1752 },
1753 },
1754 }
1755
1756 cmpOpts := []cmp.Option{
1757 cmp.Comparer(func(s1 framework.Status, s2 framework.Status) bool {
1758 return s1.Code() == s2.Code() && reflect.DeepEqual(s1.Reasons(), s2.Reasons())
1759 }),
1760 }
1761
1762 for _, tt := range tests {
1763 t.Run(tt.name, func(t *testing.T) {
1764 _, ctx := ktesting.NewTestContext(t)
1765 var extenders []framework.Extender
1766 for ii := range tt.extenders {
1767 extenders = append(extenders, &tt.extenders[ii])
1768 }
1769
1770 pod := st.MakePod().Name("1").UID("1").Obj()
1771 got, err := findNodesThatPassExtenders(ctx, extenders, pod, tf.BuildNodeInfos(tt.nodes), tt.filteredNodesStatuses)
1772 nodes := make([]*v1.Node, len(got))
1773 for i := 0; i < len(got); i++ {
1774 nodes[i] = got[i].Node()
1775 }
1776 if tt.expectsErr {
1777 if err == nil {
1778 t.Error("Unexpected non-error")
1779 }
1780 } else {
1781 if err != nil {
1782 t.Errorf("Unexpected error: %v", err)
1783 }
1784 if diff := cmp.Diff(tt.expectedNodes, nodes); diff != "" {
1785 t.Errorf("filtered nodes (-want,+got):\n%s", diff)
1786 }
1787 if diff := cmp.Diff(tt.expectedStatuses, tt.filteredNodesStatuses, cmpOpts...); diff != "" {
1788 t.Errorf("filtered statuses (-want,+got):\n%s", diff)
1789 }
1790 }
1791 })
1792 }
1793 }
1794
1795 func TestSchedulerSchedulePod(t *testing.T) {
1796 fts := feature.Features{}
1797 tests := []struct {
1798 name string
1799 registerPlugins []tf.RegisterPluginFunc
1800 extenders []tf.FakeExtender
1801 nodes []string
1802 pvcs []v1.PersistentVolumeClaim
1803 pod *v1.Pod
1804 pods []*v1.Pod
1805 wantNodes sets.Set[string]
1806 wantEvaluatedNodes *int32
1807 wErr error
1808 }{
1809 {
1810 registerPlugins: []tf.RegisterPluginFunc{
1811 tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
1812 tf.RegisterFilterPlugin("FalseFilter", tf.NewFalseFilterPlugin),
1813 tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
1814 },
1815 nodes: []string{"node1", "node2"},
1816 pod: st.MakePod().Name("2").UID("2").Obj(),
1817 name: "test 1",
1818 wErr: &framework.FitError{
1819 Pod: st.MakePod().Name("2").UID("2").Obj(),
1820 NumAllNodes: 2,
1821 Diagnosis: framework.Diagnosis{
1822 NodeToStatusMap: framework.NodeToStatusMap{
1823 "node1": framework.NewStatus(framework.Unschedulable, tf.ErrReasonFake).WithPlugin("FalseFilter"),
1824 "node2": framework.NewStatus(framework.Unschedulable, tf.ErrReasonFake).WithPlugin("FalseFilter"),
1825 },
1826 UnschedulablePlugins: sets.New("FalseFilter"),
1827 },
1828 },
1829 },
1830 {
1831 registerPlugins: []tf.RegisterPluginFunc{
1832 tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
1833 tf.RegisterFilterPlugin("TrueFilter", tf.NewTrueFilterPlugin),
1834 tf.RegisterScorePlugin("EqualPrioritizerPlugin", tf.NewEqualPrioritizerPlugin(), 1),
1835 tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
1836 },
1837 nodes: []string{"node1", "node2"},
1838 pod: st.MakePod().Name("ignore").UID("ignore").Obj(),
1839 wantNodes: sets.New("node1", "node2"),
1840 name: "test 2",
1841 wErr: nil,
1842 },
1843 {
1844
1845 registerPlugins: []tf.RegisterPluginFunc{
1846 tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
1847 tf.RegisterFilterPlugin("MatchFilter", tf.NewMatchFilterPlugin),
1848 tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
1849 },
1850 nodes: []string{"node1", "node2"},
1851 pod: st.MakePod().Name("node2").UID("node2").Obj(),
1852 wantNodes: sets.New("node2"),
1853 name: "test 3",
1854 wErr: nil,
1855 },
1856 {
1857 registerPlugins: []tf.RegisterPluginFunc{
1858 tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
1859 tf.RegisterFilterPlugin("TrueFilter", tf.NewTrueFilterPlugin),
1860 tf.RegisterScorePlugin("NumericMap", newNumericMapPlugin(), 1),
1861 tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
1862 },
1863 nodes: []string{"3", "2", "1"},
1864 pod: st.MakePod().Name("ignore").UID("ignore").Obj(),
1865 wantNodes: sets.New("3"),
1866 name: "test 4",
1867 wErr: nil,
1868 },
1869 {
1870 registerPlugins: []tf.RegisterPluginFunc{
1871 tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
1872 tf.RegisterFilterPlugin("MatchFilter", tf.NewMatchFilterPlugin),
1873 tf.RegisterScorePlugin("NumericMap", newNumericMapPlugin(), 1),
1874 tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
1875 },
1876 nodes: []string{"3", "2", "1"},
1877 pod: st.MakePod().Name("2").UID("2").Obj(),
1878 wantNodes: sets.New("2"),
1879 name: "test 5",
1880 wErr: nil,
1881 },
1882 {
1883 registerPlugins: []tf.RegisterPluginFunc{
1884 tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
1885 tf.RegisterFilterPlugin("TrueFilter", tf.NewTrueFilterPlugin),
1886 tf.RegisterScorePlugin("NumericMap", newNumericMapPlugin(), 1),
1887 tf.RegisterScorePlugin("ReverseNumericMap", newReverseNumericMapPlugin(), 2),
1888 tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
1889 },
1890 nodes: []string{"3", "2", "1"},
1891 pod: st.MakePod().Name("2").UID("2").Obj(),
1892 wantNodes: sets.New("1"),
1893 name: "test 6",
1894 wErr: nil,
1895 },
1896 {
1897 registerPlugins: []tf.RegisterPluginFunc{
1898 tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
1899 tf.RegisterFilterPlugin("TrueFilter", tf.NewTrueFilterPlugin),
1900 tf.RegisterFilterPlugin("FalseFilter", tf.NewFalseFilterPlugin),
1901 tf.RegisterScorePlugin("NumericMap", newNumericMapPlugin(), 1),
1902 tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
1903 },
1904 nodes: []string{"3", "2", "1"},
1905 pod: st.MakePod().Name("2").UID("2").Obj(),
1906 name: "test 7",
1907 wErr: &framework.FitError{
1908 Pod: st.MakePod().Name("2").UID("2").Obj(),
1909 NumAllNodes: 3,
1910 Diagnosis: framework.Diagnosis{
1911 NodeToStatusMap: framework.NodeToStatusMap{
1912 "3": framework.NewStatus(framework.Unschedulable, tf.ErrReasonFake).WithPlugin("FalseFilter"),
1913 "2": framework.NewStatus(framework.Unschedulable, tf.ErrReasonFake).WithPlugin("FalseFilter"),
1914 "1": framework.NewStatus(framework.Unschedulable, tf.ErrReasonFake).WithPlugin("FalseFilter"),
1915 },
1916 UnschedulablePlugins: sets.New("FalseFilter"),
1917 },
1918 },
1919 },
1920 {
1921 registerPlugins: []tf.RegisterPluginFunc{
1922 tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
1923 tf.RegisterFilterPlugin("NoPodsFilter", NewNoPodsFilterPlugin),
1924 tf.RegisterFilterPlugin("MatchFilter", tf.NewMatchFilterPlugin),
1925 tf.RegisterScorePlugin("NumericMap", newNumericMapPlugin(), 1),
1926 tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
1927 },
1928 pods: []*v1.Pod{
1929 st.MakePod().Name("2").UID("2").Node("2").Phase(v1.PodRunning).Obj(),
1930 },
1931 pod: st.MakePod().Name("2").UID("2").Obj(),
1932 nodes: []string{"1", "2"},
1933 name: "test 8",
1934 wErr: &framework.FitError{
1935 Pod: st.MakePod().Name("2").UID("2").Obj(),
1936 NumAllNodes: 2,
1937 Diagnosis: framework.Diagnosis{
1938 NodeToStatusMap: framework.NodeToStatusMap{
1939 "1": framework.NewStatus(framework.Unschedulable, tf.ErrReasonFake).WithPlugin("MatchFilter"),
1940 "2": framework.NewStatus(framework.Unschedulable, tf.ErrReasonFake).WithPlugin("NoPodsFilter"),
1941 },
1942 UnschedulablePlugins: sets.New("MatchFilter", "NoPodsFilter"),
1943 },
1944 },
1945 },
1946 {
1947
1948 registerPlugins: []tf.RegisterPluginFunc{
1949 tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
1950 tf.RegisterPreFilterPlugin(volumebinding.Name, frameworkruntime.FactoryAdapter(fts, volumebinding.New)),
1951 tf.RegisterFilterPlugin("TrueFilter", tf.NewTrueFilterPlugin),
1952 tf.RegisterScorePlugin("EqualPrioritizerPlugin", tf.NewEqualPrioritizerPlugin(), 1),
1953 tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
1954 },
1955 nodes: []string{"node1", "node2"},
1956 pvcs: []v1.PersistentVolumeClaim{
1957 {
1958 ObjectMeta: metav1.ObjectMeta{Name: "existingPVC", UID: types.UID("existingPVC"), Namespace: v1.NamespaceDefault},
1959 Spec: v1.PersistentVolumeClaimSpec{VolumeName: "existingPV"},
1960 },
1961 },
1962 pod: st.MakePod().Name("ignore").UID("ignore").Namespace(v1.NamespaceDefault).PVC("existingPVC").Obj(),
1963 wantNodes: sets.New("node1", "node2"),
1964 name: "existing PVC",
1965 wErr: nil,
1966 },
1967 {
1968
1969 registerPlugins: []tf.RegisterPluginFunc{
1970 tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
1971 tf.RegisterPreFilterPlugin(volumebinding.Name, frameworkruntime.FactoryAdapter(fts, volumebinding.New)),
1972 tf.RegisterFilterPlugin("TrueFilter", tf.NewTrueFilterPlugin),
1973 tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
1974 },
1975 nodes: []string{"node1", "node2"},
1976 pod: st.MakePod().Name("ignore").UID("ignore").PVC("unknownPVC").Obj(),
1977 name: "unknown PVC",
1978 wErr: &framework.FitError{
1979 Pod: st.MakePod().Name("ignore").UID("ignore").PVC("unknownPVC").Obj(),
1980 NumAllNodes: 2,
1981 Diagnosis: framework.Diagnosis{
1982 NodeToStatusMap: framework.NodeToStatusMap{
1983 "node1": framework.NewStatus(framework.UnschedulableAndUnresolvable, `persistentvolumeclaim "unknownPVC" not found`).WithPlugin("VolumeBinding"),
1984 "node2": framework.NewStatus(framework.UnschedulableAndUnresolvable, `persistentvolumeclaim "unknownPVC" not found`).WithPlugin("VolumeBinding"),
1985 },
1986 PreFilterMsg: `persistentvolumeclaim "unknownPVC" not found`,
1987 UnschedulablePlugins: sets.New(volumebinding.Name),
1988 },
1989 },
1990 },
1991 {
1992
1993 registerPlugins: []tf.RegisterPluginFunc{
1994 tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
1995 tf.RegisterPreFilterPlugin(volumebinding.Name, frameworkruntime.FactoryAdapter(fts, volumebinding.New)),
1996 tf.RegisterFilterPlugin("TrueFilter", tf.NewTrueFilterPlugin),
1997 tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
1998 },
1999 nodes: []string{"node1", "node2"},
2000 pvcs: []v1.PersistentVolumeClaim{{ObjectMeta: metav1.ObjectMeta{Name: "existingPVC", UID: types.UID("existingPVC"), Namespace: v1.NamespaceDefault, DeletionTimestamp: &metav1.Time{}}}},
2001 pod: st.MakePod().Name("ignore").UID("ignore").Namespace(v1.NamespaceDefault).PVC("existingPVC").Obj(),
2002 name: "deleted PVC",
2003 wErr: &framework.FitError{
2004 Pod: st.MakePod().Name("ignore").UID("ignore").Namespace(v1.NamespaceDefault).PVC("existingPVC").Obj(),
2005 NumAllNodes: 2,
2006 Diagnosis: framework.Diagnosis{
2007 NodeToStatusMap: framework.NodeToStatusMap{
2008 "node1": framework.NewStatus(framework.UnschedulableAndUnresolvable, `persistentvolumeclaim "existingPVC" is being deleted`).WithPlugin("VolumeBinding"),
2009 "node2": framework.NewStatus(framework.UnschedulableAndUnresolvable, `persistentvolumeclaim "existingPVC" is being deleted`).WithPlugin("VolumeBinding"),
2010 },
2011 PreFilterMsg: `persistentvolumeclaim "existingPVC" is being deleted`,
2012 UnschedulablePlugins: sets.New(volumebinding.Name),
2013 },
2014 },
2015 },
2016 {
2017 registerPlugins: []tf.RegisterPluginFunc{
2018 tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
2019 tf.RegisterFilterPlugin("TrueFilter", tf.NewTrueFilterPlugin),
2020 tf.RegisterScorePlugin("FalseMap", newFalseMapPlugin(), 1),
2021 tf.RegisterScorePlugin("TrueMap", newTrueMapPlugin(), 2),
2022 tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
2023 },
2024 nodes: []string{"2", "1"},
2025 pod: st.MakePod().Name("2").Obj(),
2026 name: "test error with priority map",
2027 wErr: fmt.Errorf("running Score plugins: %w", fmt.Errorf(`plugin "FalseMap" failed with: %w`, errPrioritize)),
2028 },
2029 {
2030 name: "test podtopologyspread plugin - 2 nodes with maxskew=1",
2031 registerPlugins: []tf.RegisterPluginFunc{
2032 tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
2033 tf.RegisterPluginAsExtensions(
2034 podtopologyspread.Name,
2035 podTopologySpreadFunc,
2036 "PreFilter",
2037 "Filter",
2038 ),
2039 tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
2040 },
2041 nodes: []string{"node1", "node2"},
2042 pod: st.MakePod().Name("p").UID("p").Label("foo", "").SpreadConstraint(1, "hostname", v1.DoNotSchedule, &metav1.LabelSelector{
2043 MatchExpressions: []metav1.LabelSelectorRequirement{
2044 {
2045 Key: "foo",
2046 Operator: metav1.LabelSelectorOpExists,
2047 },
2048 },
2049 }, nil, nil, nil, nil).Obj(),
2050 pods: []*v1.Pod{
2051 st.MakePod().Name("pod1").UID("pod1").Label("foo", "").Node("node1").Phase(v1.PodRunning).Obj(),
2052 },
2053 wantNodes: sets.New("node2"),
2054 wErr: nil,
2055 },
2056 {
2057 name: "test podtopologyspread plugin - 3 nodes with maxskew=2",
2058 registerPlugins: []tf.RegisterPluginFunc{
2059 tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
2060 tf.RegisterPluginAsExtensions(
2061 podtopologyspread.Name,
2062 podTopologySpreadFunc,
2063 "PreFilter",
2064 "Filter",
2065 ),
2066 tf.RegisterScorePlugin("EqualPrioritizerPlugin", tf.NewEqualPrioritizerPlugin(), 1),
2067 tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
2068 },
2069 nodes: []string{"node1", "node2", "node3"},
2070 pod: st.MakePod().Name("p").UID("p").Label("foo", "").SpreadConstraint(2, "hostname", v1.DoNotSchedule, &metav1.LabelSelector{
2071 MatchExpressions: []metav1.LabelSelectorRequirement{
2072 {
2073 Key: "foo",
2074 Operator: metav1.LabelSelectorOpExists,
2075 },
2076 },
2077 }, nil, nil, nil, nil).Obj(),
2078 pods: []*v1.Pod{
2079 st.MakePod().Name("pod1a").UID("pod1a").Label("foo", "").Node("node1").Phase(v1.PodRunning).Obj(),
2080 st.MakePod().Name("pod1b").UID("pod1b").Label("foo", "").Node("node1").Phase(v1.PodRunning).Obj(),
2081 st.MakePod().Name("pod2").UID("pod2").Label("foo", "").Node("node2").Phase(v1.PodRunning).Obj(),
2082 },
2083 wantNodes: sets.New("node2", "node3"),
2084 wErr: nil,
2085 },
2086 {
2087 name: "test with filter plugin returning Unschedulable status",
2088 registerPlugins: []tf.RegisterPluginFunc{
2089 tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
2090 tf.RegisterFilterPlugin(
2091 "FakeFilter",
2092 tf.NewFakeFilterPlugin(map[string]framework.Code{"3": framework.Unschedulable}),
2093 ),
2094 tf.RegisterScorePlugin("NumericMap", newNumericMapPlugin(), 1),
2095 tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
2096 },
2097 nodes: []string{"3"},
2098 pod: st.MakePod().Name("test-filter").UID("test-filter").Obj(),
2099 wantNodes: nil,
2100 wErr: &framework.FitError{
2101 Pod: st.MakePod().Name("test-filter").UID("test-filter").Obj(),
2102 NumAllNodes: 1,
2103 Diagnosis: framework.Diagnosis{
2104 NodeToStatusMap: framework.NodeToStatusMap{
2105 "3": framework.NewStatus(framework.Unschedulable, "injecting failure for pod test-filter").WithPlugin("FakeFilter"),
2106 },
2107 UnschedulablePlugins: sets.New("FakeFilter"),
2108 },
2109 },
2110 },
2111 {
2112 name: "test with extender which filters out some Nodes",
2113 registerPlugins: []tf.RegisterPluginFunc{
2114 tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
2115 tf.RegisterFilterPlugin(
2116 "FakeFilter",
2117 tf.NewFakeFilterPlugin(map[string]framework.Code{"3": framework.Unschedulable}),
2118 ),
2119 tf.RegisterScorePlugin("NumericMap", newNumericMapPlugin(), 1),
2120 tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
2121 },
2122 extenders: []tf.FakeExtender{
2123 {
2124 ExtenderName: "FakeExtender1",
2125 Predicates: []tf.FitPredicate{tf.FalsePredicateExtender},
2126 },
2127 },
2128 nodes: []string{"1", "2", "3"},
2129 pod: st.MakePod().Name("test-filter").UID("test-filter").Obj(),
2130 wantNodes: nil,
2131 wErr: &framework.FitError{
2132 Pod: st.MakePod().Name("test-filter").UID("test-filter").Obj(),
2133 NumAllNodes: 3,
2134 Diagnosis: framework.Diagnosis{
2135 NodeToStatusMap: framework.NodeToStatusMap{
2136 "1": framework.NewStatus(framework.Unschedulable, `FakeExtender: node "1" failed`),
2137 "2": framework.NewStatus(framework.Unschedulable, `FakeExtender: node "2" failed`),
2138 "3": framework.NewStatus(framework.Unschedulable, "injecting failure for pod test-filter").WithPlugin("FakeFilter"),
2139 },
2140 UnschedulablePlugins: sets.New("FakeFilter", framework.ExtenderName),
2141 },
2142 },
2143 },
2144 {
2145 name: "test with filter plugin returning UnschedulableAndUnresolvable status",
2146 registerPlugins: []tf.RegisterPluginFunc{
2147 tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
2148 tf.RegisterFilterPlugin(
2149 "FakeFilter",
2150 tf.NewFakeFilterPlugin(map[string]framework.Code{"3": framework.UnschedulableAndUnresolvable}),
2151 ),
2152 tf.RegisterScorePlugin("NumericMap", newNumericMapPlugin(), 1),
2153 tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
2154 },
2155 nodes: []string{"3"},
2156 pod: st.MakePod().Name("test-filter").UID("test-filter").Obj(),
2157 wantNodes: nil,
2158 wErr: &framework.FitError{
2159 Pod: st.MakePod().Name("test-filter").UID("test-filter").Obj(),
2160 NumAllNodes: 1,
2161 Diagnosis: framework.Diagnosis{
2162 NodeToStatusMap: framework.NodeToStatusMap{
2163 "3": framework.NewStatus(framework.UnschedulableAndUnresolvable, "injecting failure for pod test-filter").WithPlugin("FakeFilter"),
2164 },
2165 UnschedulablePlugins: sets.New("FakeFilter"),
2166 },
2167 },
2168 },
2169 {
2170 name: "test with partial failed filter plugin",
2171 registerPlugins: []tf.RegisterPluginFunc{
2172 tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
2173 tf.RegisterFilterPlugin(
2174 "FakeFilter",
2175 tf.NewFakeFilterPlugin(map[string]framework.Code{"1": framework.Unschedulable}),
2176 ),
2177 tf.RegisterScorePlugin("NumericMap", newNumericMapPlugin(), 1),
2178 tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
2179 },
2180 nodes: []string{"1", "2"},
2181 pod: st.MakePod().Name("test-filter").UID("test-filter").Obj(),
2182 wantNodes: nil,
2183 wErr: nil,
2184 },
2185 {
2186 name: "test prefilter plugin returning Unschedulable status",
2187 registerPlugins: []tf.RegisterPluginFunc{
2188 tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
2189 tf.RegisterPreFilterPlugin(
2190 "FakePreFilter",
2191 tf.NewFakePreFilterPlugin("FakePreFilter", nil, framework.NewStatus(framework.UnschedulableAndUnresolvable, "injected unschedulable status")),
2192 ),
2193 tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
2194 },
2195 nodes: []string{"1", "2"},
2196 pod: st.MakePod().Name("test-prefilter").UID("test-prefilter").Obj(),
2197 wantNodes: nil,
2198 wErr: &framework.FitError{
2199 Pod: st.MakePod().Name("test-prefilter").UID("test-prefilter").Obj(),
2200 NumAllNodes: 2,
2201 Diagnosis: framework.Diagnosis{
2202 NodeToStatusMap: framework.NodeToStatusMap{
2203 "1": framework.NewStatus(framework.UnschedulableAndUnresolvable, "injected unschedulable status").WithPlugin("FakePreFilter"),
2204 "2": framework.NewStatus(framework.UnschedulableAndUnresolvable, "injected unschedulable status").WithPlugin("FakePreFilter"),
2205 },
2206 PreFilterMsg: "injected unschedulable status",
2207 UnschedulablePlugins: sets.New("FakePreFilter"),
2208 },
2209 },
2210 },
2211 {
2212 name: "test prefilter plugin returning error status",
2213 registerPlugins: []tf.RegisterPluginFunc{
2214 tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
2215 tf.RegisterPreFilterPlugin(
2216 "FakePreFilter",
2217 tf.NewFakePreFilterPlugin("FakePreFilter", nil, framework.NewStatus(framework.Error, "injected error status")),
2218 ),
2219 tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
2220 },
2221 nodes: []string{"1", "2"},
2222 pod: st.MakePod().Name("test-prefilter").UID("test-prefilter").Obj(),
2223 wantNodes: nil,
2224 wErr: fmt.Errorf(`running PreFilter plugin "FakePreFilter": %w`, errors.New("injected error status")),
2225 },
2226 {
2227 name: "test prefilter plugin returning node",
2228 registerPlugins: []tf.RegisterPluginFunc{
2229 tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
2230 tf.RegisterPreFilterPlugin(
2231 "FakePreFilter1",
2232 tf.NewFakePreFilterPlugin("FakePreFilter1", nil, nil),
2233 ),
2234 tf.RegisterPreFilterPlugin(
2235 "FakePreFilter2",
2236 tf.NewFakePreFilterPlugin("FakePreFilter2", &framework.PreFilterResult{NodeNames: sets.New("node2")}, nil),
2237 ),
2238 tf.RegisterPreFilterPlugin(
2239 "FakePreFilter3",
2240 tf.NewFakePreFilterPlugin("FakePreFilter3", &framework.PreFilterResult{NodeNames: sets.New("node1", "node2")}, nil),
2241 ),
2242 tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
2243 },
2244 nodes: []string{"node1", "node2", "node3"},
2245 pod: st.MakePod().Name("test-prefilter").UID("test-prefilter").Obj(),
2246 wantNodes: sets.New("node2"),
2247 wantEvaluatedNodes: ptr.To[int32](3),
2248 },
2249 {
2250 name: "test prefilter plugin returning non-intersecting nodes",
2251 registerPlugins: []tf.RegisterPluginFunc{
2252 tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
2253 tf.RegisterPreFilterPlugin(
2254 "FakePreFilter1",
2255 tf.NewFakePreFilterPlugin("FakePreFilter1", nil, nil),
2256 ),
2257 tf.RegisterPreFilterPlugin(
2258 "FakePreFilter2",
2259 tf.NewFakePreFilterPlugin("FakePreFilter2", &framework.PreFilterResult{NodeNames: sets.New("node2")}, nil),
2260 ),
2261 tf.RegisterPreFilterPlugin(
2262 "FakePreFilter3",
2263 tf.NewFakePreFilterPlugin("FakePreFilter3", &framework.PreFilterResult{NodeNames: sets.New("node1")}, nil),
2264 ),
2265 tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
2266 },
2267 nodes: []string{"node1", "node2", "node3"},
2268 pod: st.MakePod().Name("test-prefilter").UID("test-prefilter").Obj(),
2269 wErr: &framework.FitError{
2270 Pod: st.MakePod().Name("test-prefilter").UID("test-prefilter").Obj(),
2271 NumAllNodes: 3,
2272 Diagnosis: framework.Diagnosis{
2273 NodeToStatusMap: framework.NodeToStatusMap{
2274 "node1": framework.NewStatus(framework.UnschedulableAndUnresolvable, "node(s) didn't satisfy plugin(s) [FakePreFilter2 FakePreFilter3] simultaneously"),
2275 "node2": framework.NewStatus(framework.UnschedulableAndUnresolvable, "node(s) didn't satisfy plugin(s) [FakePreFilter2 FakePreFilter3] simultaneously"),
2276 "node3": framework.NewStatus(framework.UnschedulableAndUnresolvable, "node(s) didn't satisfy plugin(s) [FakePreFilter2 FakePreFilter3] simultaneously"),
2277 },
2278 UnschedulablePlugins: sets.Set[string]{},
2279 PreFilterMsg: "node(s) didn't satisfy plugin(s) [FakePreFilter2 FakePreFilter3] simultaneously",
2280 },
2281 },
2282 },
2283 {
2284 name: "test prefilter plugin returning empty node set",
2285 registerPlugins: []tf.RegisterPluginFunc{
2286 tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
2287 tf.RegisterPreFilterPlugin(
2288 "FakePreFilter1",
2289 tf.NewFakePreFilterPlugin("FakePreFilter1", nil, nil),
2290 ),
2291 tf.RegisterPreFilterPlugin(
2292 "FakePreFilter2",
2293 tf.NewFakePreFilterPlugin("FakePreFilter2", &framework.PreFilterResult{NodeNames: sets.New[string]()}, nil),
2294 ),
2295 tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
2296 },
2297 nodes: []string{"node1"},
2298 pod: st.MakePod().Name("test-prefilter").UID("test-prefilter").Obj(),
2299 wErr: &framework.FitError{
2300 Pod: st.MakePod().Name("test-prefilter").UID("test-prefilter").Obj(),
2301 NumAllNodes: 1,
2302 Diagnosis: framework.Diagnosis{
2303 NodeToStatusMap: framework.NodeToStatusMap{
2304 "node1": framework.NewStatus(framework.UnschedulableAndUnresolvable, "node(s) didn't satisfy plugin FakePreFilter2"),
2305 },
2306 UnschedulablePlugins: sets.Set[string]{},
2307 PreFilterMsg: "node(s) didn't satisfy plugin FakePreFilter2",
2308 },
2309 },
2310 },
2311 {
2312 name: "test some nodes are filtered out by prefilter plugin and other are filtered out by filter plugin",
2313 registerPlugins: []tf.RegisterPluginFunc{
2314 tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
2315 tf.RegisterPreFilterPlugin(
2316 "FakePreFilter",
2317 tf.NewFakePreFilterPlugin("FakePreFilter", &framework.PreFilterResult{NodeNames: sets.New[string]("node2")}, nil),
2318 ),
2319 tf.RegisterFilterPlugin(
2320 "FakeFilter",
2321 tf.NewFakeFilterPlugin(map[string]framework.Code{"node2": framework.Unschedulable}),
2322 ),
2323 tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
2324 },
2325 nodes: []string{"node1", "node2"},
2326 pod: st.MakePod().Name("test-prefilter").UID("test-prefilter").Obj(),
2327 wErr: &framework.FitError{
2328 Pod: st.MakePod().Name("test-prefilter").UID("test-prefilter").Obj(),
2329 NumAllNodes: 2,
2330 Diagnosis: framework.Diagnosis{
2331 NodeToStatusMap: framework.NodeToStatusMap{
2332 "node1": framework.NewStatus(framework.UnschedulableAndUnresolvable, "node is filtered out by the prefilter result"),
2333 "node2": framework.NewStatus(framework.Unschedulable, "injecting failure for pod test-prefilter").WithPlugin("FakeFilter"),
2334 },
2335 UnschedulablePlugins: sets.New("FakeFilter"),
2336 PreFilterMsg: "",
2337 },
2338 },
2339 },
2340 {
2341 name: "test prefilter plugin returning skip",
2342 registerPlugins: []tf.RegisterPluginFunc{
2343 tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
2344 tf.RegisterPreFilterPlugin(
2345 "FakePreFilter1",
2346 tf.NewFakePreFilterPlugin("FakeFilter1", nil, nil),
2347 ),
2348 tf.RegisterFilterPlugin(
2349 "FakeFilter1",
2350 tf.NewFakeFilterPlugin(map[string]framework.Code{
2351 "node1": framework.Unschedulable,
2352 }),
2353 ),
2354 tf.RegisterPluginAsExtensions("FakeFilter2", func(_ context.Context, configuration runtime.Object, f framework.Handle) (framework.Plugin, error) {
2355 return tf.FakePreFilterAndFilterPlugin{
2356 FakePreFilterPlugin: &tf.FakePreFilterPlugin{
2357 Result: nil,
2358 Status: framework.NewStatus(framework.Skip),
2359 },
2360 FakeFilterPlugin: &tf.FakeFilterPlugin{
2361
2362
2363 FailedNodeReturnCodeMap: map[string]framework.Code{
2364 "node1": framework.Error, "node2": framework.Error, "node3": framework.Error,
2365 },
2366 },
2367 }, nil
2368 }, "PreFilter", "Filter"),
2369 tf.RegisterScorePlugin("EqualPrioritizerPlugin", tf.NewEqualPrioritizerPlugin(), 1),
2370 tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
2371 },
2372 nodes: []string{"node1", "node2", "node3"},
2373 pod: st.MakePod().Name("test-prefilter").UID("test-prefilter").Obj(),
2374 wantNodes: sets.New("node2", "node3"),
2375 wantEvaluatedNodes: ptr.To[int32](3),
2376 },
2377 {
2378 name: "test all prescore plugins return skip",
2379 registerPlugins: []tf.RegisterPluginFunc{
2380 tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
2381 tf.RegisterFilterPlugin("TrueFilter", tf.NewTrueFilterPlugin),
2382 tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
2383 tf.RegisterPluginAsExtensions("FakePreScoreAndScorePlugin", tf.NewFakePreScoreAndScorePlugin("FakePreScoreAndScorePlugin", 0,
2384 framework.NewStatus(framework.Skip, "fake skip"),
2385 framework.NewStatus(framework.Error, "this score function shouldn't be executed because this plugin returned Skip in the PreScore"),
2386 ), "PreScore", "Score"),
2387 },
2388 nodes: []string{"node1", "node2"},
2389 pod: st.MakePod().Name("ignore").UID("ignore").Obj(),
2390 wantNodes: sets.New("node1", "node2"),
2391 },
2392 {
2393 name: "test without score plugin no extra nodes are evaluated",
2394 registerPlugins: []tf.RegisterPluginFunc{
2395 tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
2396 tf.RegisterFilterPlugin("TrueFilter", tf.NewTrueFilterPlugin),
2397 tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
2398 },
2399 nodes: []string{"node1", "node2", "node3"},
2400 pod: st.MakePod().Name("pod1").UID("pod1").Obj(),
2401 wantNodes: sets.New("node1", "node2", "node3"),
2402 wantEvaluatedNodes: ptr.To[int32](1),
2403 },
2404 {
2405 name: "test no score plugin, prefilter plugin returning 2 nodes",
2406 registerPlugins: []tf.RegisterPluginFunc{
2407 tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
2408 tf.RegisterPreFilterPlugin(
2409 "FakePreFilter",
2410 tf.NewFakePreFilterPlugin("FakePreFilter", &framework.PreFilterResult{NodeNames: sets.New("node1", "node2")}, nil),
2411 ),
2412 tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
2413 },
2414 nodes: []string{"node1", "node2", "node3"},
2415 pod: st.MakePod().Name("test-prefilter").UID("test-prefilter").Obj(),
2416 wantNodes: sets.New("node1", "node2"),
2417 wantEvaluatedNodes: ptr.To[int32](2),
2418 },
2419 }
2420 for _, test := range tests {
2421 t.Run(test.name, func(t *testing.T) {
2422 logger, ctx := ktesting.NewTestContext(t)
2423 ctx, cancel := context.WithCancel(ctx)
2424 defer cancel()
2425
2426 cache := internalcache.New(ctx, time.Duration(0))
2427 for _, pod := range test.pods {
2428 cache.AddPod(logger, pod)
2429 }
2430 var nodes []*v1.Node
2431 for _, name := range test.nodes {
2432 node := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: name, Labels: map[string]string{"hostname": name}}}
2433 nodes = append(nodes, node)
2434 cache.AddNode(logger, node)
2435 }
2436
2437 cs := clientsetfake.NewSimpleClientset()
2438 informerFactory := informers.NewSharedInformerFactory(cs, 0)
2439 for _, pvc := range test.pvcs {
2440 metav1.SetMetaDataAnnotation(&pvc.ObjectMeta, volume.AnnBindCompleted, "true")
2441 cs.CoreV1().PersistentVolumeClaims(pvc.Namespace).Create(ctx, &pvc, metav1.CreateOptions{})
2442 if pvName := pvc.Spec.VolumeName; pvName != "" {
2443 pv := v1.PersistentVolume{ObjectMeta: metav1.ObjectMeta{Name: pvName}}
2444 cs.CoreV1().PersistentVolumes().Create(ctx, &pv, metav1.CreateOptions{})
2445 }
2446 }
2447 snapshot := internalcache.NewSnapshot(test.pods, nodes)
2448 fwk, err := tf.NewFramework(
2449 ctx,
2450 test.registerPlugins, "",
2451 frameworkruntime.WithSnapshotSharedLister(snapshot),
2452 frameworkruntime.WithInformerFactory(informerFactory),
2453 frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())),
2454 )
2455 if err != nil {
2456 t.Fatal(err)
2457 }
2458
2459 var extenders []framework.Extender
2460 for ii := range test.extenders {
2461 extenders = append(extenders, &test.extenders[ii])
2462 }
2463 sched := &Scheduler{
2464 Cache: cache,
2465 nodeInfoSnapshot: snapshot,
2466 percentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore,
2467 Extenders: extenders,
2468 }
2469 sched.applyDefaultHandlers()
2470
2471 informerFactory.Start(ctx.Done())
2472 informerFactory.WaitForCacheSync(ctx.Done())
2473
2474 result, err := sched.SchedulePod(ctx, fwk, framework.NewCycleState(), test.pod)
2475 if err != test.wErr {
2476 gotFitErr, gotOK := err.(*framework.FitError)
2477 wantFitErr, wantOK := test.wErr.(*framework.FitError)
2478 if gotOK != wantOK {
2479 t.Errorf("Expected err to be FitError: %v, but got %v (error: %v)", wantOK, gotOK, err)
2480 } else if gotOK {
2481 if diff := cmp.Diff(gotFitErr, wantFitErr); diff != "" {
2482 t.Errorf("Unexpected fitErr: (-want, +got): %s", diff)
2483 }
2484 }
2485 }
2486 if test.wantNodes != nil && !test.wantNodes.Has(result.SuggestedHost) {
2487 t.Errorf("Expected: %s, got: %s", test.wantNodes, result.SuggestedHost)
2488 }
2489 wantEvaluatedNodes := len(test.nodes)
2490 if test.wantEvaluatedNodes != nil {
2491 wantEvaluatedNodes = int(*test.wantEvaluatedNodes)
2492 }
2493 if test.wErr == nil && wantEvaluatedNodes != result.EvaluatedNodes {
2494 t.Errorf("Expected EvaluatedNodes: %d, got: %d", wantEvaluatedNodes, result.EvaluatedNodes)
2495 }
2496 })
2497 }
2498 }
2499
2500 func TestFindFitAllError(t *testing.T) {
2501 ctx, cancel := context.WithCancel(context.Background())
2502 defer cancel()
2503
2504 nodes := makeNodeList([]string{"3", "2", "1"})
2505 scheduler := makeScheduler(ctx, nodes)
2506
2507 fwk, err := tf.NewFramework(
2508 ctx,
2509 []tf.RegisterPluginFunc{
2510 tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
2511 tf.RegisterFilterPlugin("TrueFilter", tf.NewTrueFilterPlugin),
2512 tf.RegisterFilterPlugin("MatchFilter", tf.NewMatchFilterPlugin),
2513 tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
2514 },
2515 "",
2516 frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(nil)),
2517 )
2518 if err != nil {
2519 t.Fatal(err)
2520 }
2521
2522 _, diagnosis, err := scheduler.findNodesThatFitPod(ctx, fwk, framework.NewCycleState(), &v1.Pod{})
2523 if err != nil {
2524 t.Errorf("unexpected error: %v", err)
2525 }
2526
2527 expected := framework.Diagnosis{
2528 NodeToStatusMap: framework.NodeToStatusMap{
2529 "1": framework.NewStatus(framework.Unschedulable, tf.ErrReasonFake).WithPlugin("MatchFilter"),
2530 "2": framework.NewStatus(framework.Unschedulable, tf.ErrReasonFake).WithPlugin("MatchFilter"),
2531 "3": framework.NewStatus(framework.Unschedulable, tf.ErrReasonFake).WithPlugin("MatchFilter"),
2532 },
2533 UnschedulablePlugins: sets.New("MatchFilter"),
2534 }
2535 if diff := cmp.Diff(diagnosis, expected); diff != "" {
2536 t.Errorf("Unexpected diagnosis: (-want, +got): %s", diff)
2537 }
2538 }
2539
2540 func TestFindFitSomeError(t *testing.T) {
2541 ctx, cancel := context.WithCancel(context.Background())
2542 defer cancel()
2543
2544 nodes := makeNodeList([]string{"3", "2", "1"})
2545 scheduler := makeScheduler(ctx, nodes)
2546
2547 fwk, err := tf.NewFramework(
2548 ctx,
2549 []tf.RegisterPluginFunc{
2550 tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
2551 tf.RegisterFilterPlugin("TrueFilter", tf.NewTrueFilterPlugin),
2552 tf.RegisterFilterPlugin("MatchFilter", tf.NewMatchFilterPlugin),
2553 tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
2554 },
2555 "",
2556 frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(nil)),
2557 )
2558 if err != nil {
2559 t.Fatal(err)
2560 }
2561
2562 pod := st.MakePod().Name("1").UID("1").Obj()
2563 _, diagnosis, err := scheduler.findNodesThatFitPod(ctx, fwk, framework.NewCycleState(), pod)
2564 if err != nil {
2565 t.Errorf("unexpected error: %v", err)
2566 }
2567
2568 if len(diagnosis.NodeToStatusMap) != len(nodes)-1 {
2569 t.Errorf("unexpected failed status map: %v", diagnosis.NodeToStatusMap)
2570 }
2571
2572 if diff := cmp.Diff(sets.New("MatchFilter"), diagnosis.UnschedulablePlugins); diff != "" {
2573 t.Errorf("Unexpected unschedulablePlugins: (-want, +got): %s", diagnosis.UnschedulablePlugins)
2574 }
2575
2576 for _, node := range nodes {
2577 if node.Name == pod.Name {
2578 continue
2579 }
2580 t.Run(node.Name, func(t *testing.T) {
2581 status, found := diagnosis.NodeToStatusMap[node.Name]
2582 if !found {
2583 t.Errorf("failed to find node %v in %v", node.Name, diagnosis.NodeToStatusMap)
2584 }
2585 reasons := status.Reasons()
2586 if len(reasons) != 1 || reasons[0] != tf.ErrReasonFake {
2587 t.Errorf("unexpected failures: %v", reasons)
2588 }
2589 })
2590 }
2591 }
2592
2593 func TestFindFitPredicateCallCounts(t *testing.T) {
2594 tests := []struct {
2595 name string
2596 pod *v1.Pod
2597 expectedCount int32
2598 }{
2599 {
2600 name: "nominated pods have lower priority, predicate is called once",
2601 pod: st.MakePod().Name("1").UID("1").Priority(highPriority).Obj(),
2602 expectedCount: 1,
2603 },
2604 {
2605 name: "nominated pods have higher priority, predicate is called twice",
2606 pod: st.MakePod().Name("1").UID("1").Priority(lowPriority).Obj(),
2607 expectedCount: 2,
2608 },
2609 }
2610
2611 for _, test := range tests {
2612 t.Run(test.name, func(t *testing.T) {
2613 nodes := makeNodeList([]string{"1"})
2614
2615 plugin := tf.FakeFilterPlugin{}
2616 registerFakeFilterFunc := tf.RegisterFilterPlugin(
2617 "FakeFilter",
2618 func(_ context.Context, _ runtime.Object, fh framework.Handle) (framework.Plugin, error) {
2619 return &plugin, nil
2620 },
2621 )
2622 registerPlugins := []tf.RegisterPluginFunc{
2623 tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
2624 registerFakeFilterFunc,
2625 tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
2626 }
2627 logger, ctx := ktesting.NewTestContext(t)
2628 ctx, cancel := context.WithCancel(ctx)
2629 defer cancel()
2630 fwk, err := tf.NewFramework(
2631 ctx,
2632 registerPlugins, "",
2633 frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(nil)),
2634 )
2635 if err != nil {
2636 t.Fatal(err)
2637 }
2638
2639 scheduler := makeScheduler(ctx, nodes)
2640 if err := scheduler.Cache.UpdateSnapshot(logger, scheduler.nodeInfoSnapshot); err != nil {
2641 t.Fatal(err)
2642 }
2643 podinfo, err := framework.NewPodInfo(st.MakePod().UID("nominated").Priority(midPriority).Obj())
2644 if err != nil {
2645 t.Fatal(err)
2646 }
2647 fwk.AddNominatedPod(logger, podinfo, &framework.NominatingInfo{NominatingMode: framework.ModeOverride, NominatedNodeName: "1"})
2648
2649 _, _, err = scheduler.findNodesThatFitPod(ctx, fwk, framework.NewCycleState(), test.pod)
2650 if err != nil {
2651 t.Errorf("unexpected error: %v", err)
2652 }
2653 if test.expectedCount != plugin.NumFilterCalled {
2654 t.Errorf("predicate was called %d times, expected is %d", plugin.NumFilterCalled, test.expectedCount)
2655 }
2656 })
2657 }
2658 }
2659
2660
2661
2662
2663
2664
2665 func TestZeroRequest(t *testing.T) {
2666
2667 noResources := v1.PodSpec{
2668 Containers: []v1.Container{
2669 {},
2670 },
2671 }
2672 noResources1 := noResources
2673 noResources1.NodeName = "node1"
2674
2675 small := v1.PodSpec{
2676 Containers: []v1.Container{
2677 {
2678 Resources: v1.ResourceRequirements{
2679 Requests: v1.ResourceList{
2680 v1.ResourceCPU: resource.MustParse(
2681 strconv.FormatInt(schedutil.DefaultMilliCPURequest, 10) + "m"),
2682 v1.ResourceMemory: resource.MustParse(
2683 strconv.FormatInt(schedutil.DefaultMemoryRequest, 10)),
2684 },
2685 },
2686 },
2687 },
2688 }
2689 small2 := small
2690 small2.NodeName = "node2"
2691
2692 large := v1.PodSpec{
2693 Containers: []v1.Container{
2694 {
2695 Resources: v1.ResourceRequirements{
2696 Requests: v1.ResourceList{
2697 v1.ResourceCPU: resource.MustParse(
2698 strconv.FormatInt(schedutil.DefaultMilliCPURequest*3, 10) + "m"),
2699 v1.ResourceMemory: resource.MustParse(
2700 strconv.FormatInt(schedutil.DefaultMemoryRequest*3, 10)),
2701 },
2702 },
2703 },
2704 },
2705 }
2706 large1 := large
2707 large1.NodeName = "node1"
2708 large2 := large
2709 large2.NodeName = "node2"
2710 tests := []struct {
2711 pod *v1.Pod
2712 pods []*v1.Pod
2713 nodes []*v1.Node
2714 name string
2715 expectedScore int64
2716 }{
2717
2718
2719
2720 {
2721 pod: &v1.Pod{Spec: noResources},
2722 nodes: []*v1.Node{makeNode("node1", 1000, schedutil.DefaultMemoryRequest*10), makeNode("node2", 1000, schedutil.DefaultMemoryRequest*10)},
2723 name: "test priority of zero-request pod with node with zero-request pod",
2724 pods: []*v1.Pod{
2725 {Spec: large1}, {Spec: noResources1},
2726 {Spec: large2}, {Spec: small2},
2727 },
2728 expectedScore: 150,
2729 },
2730 {
2731 pod: &v1.Pod{Spec: small},
2732 nodes: []*v1.Node{makeNode("node1", 1000, schedutil.DefaultMemoryRequest*10), makeNode("node2", 1000, schedutil.DefaultMemoryRequest*10)},
2733 name: "test priority of nonzero-request pod with node with zero-request pod",
2734 pods: []*v1.Pod{
2735 {Spec: large1}, {Spec: noResources1},
2736 {Spec: large2}, {Spec: small2},
2737 },
2738 expectedScore: 150,
2739 },
2740
2741 {
2742 pod: &v1.Pod{Spec: large},
2743 nodes: []*v1.Node{makeNode("node1", 1000, schedutil.DefaultMemoryRequest*10), makeNode("node2", 1000, schedutil.DefaultMemoryRequest*10)},
2744 name: "test priority of larger pod with node with zero-request pod",
2745 pods: []*v1.Pod{
2746 {Spec: large1}, {Spec: noResources1},
2747 {Spec: large2}, {Spec: small2},
2748 },
2749 expectedScore: 130,
2750 },
2751 }
2752
2753 for _, test := range tests {
2754 t.Run(test.name, func(t *testing.T) {
2755 client := clientsetfake.NewSimpleClientset()
2756 informerFactory := informers.NewSharedInformerFactory(client, 0)
2757
2758 snapshot := internalcache.NewSnapshot(test.pods, test.nodes)
2759 fts := feature.Features{}
2760 pluginRegistrations := []tf.RegisterPluginFunc{
2761 tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
2762 tf.RegisterScorePlugin(noderesources.Name, frameworkruntime.FactoryAdapter(fts, noderesources.NewFit), 1),
2763 tf.RegisterScorePlugin(noderesources.BalancedAllocationName, frameworkruntime.FactoryAdapter(fts, noderesources.NewBalancedAllocation), 1),
2764 tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
2765 }
2766 ctx, cancel := context.WithCancel(context.Background())
2767 defer cancel()
2768 fwk, err := tf.NewFramework(
2769 ctx,
2770 pluginRegistrations, "",
2771 frameworkruntime.WithInformerFactory(informerFactory),
2772 frameworkruntime.WithSnapshotSharedLister(snapshot),
2773 frameworkruntime.WithClientSet(client),
2774 frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())),
2775 )
2776 if err != nil {
2777 t.Fatalf("error creating framework: %+v", err)
2778 }
2779
2780 sched := &Scheduler{
2781 nodeInfoSnapshot: snapshot,
2782 percentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore,
2783 }
2784 sched.applyDefaultHandlers()
2785
2786 state := framework.NewCycleState()
2787 _, _, err = sched.findNodesThatFitPod(ctx, fwk, state, test.pod)
2788 if err != nil {
2789 t.Fatalf("error filtering nodes: %+v", err)
2790 }
2791 fwk.RunPreScorePlugins(ctx, state, test.pod, tf.BuildNodeInfos(test.nodes))
2792 list, err := prioritizeNodes(ctx, nil, fwk, state, test.pod, tf.BuildNodeInfos(test.nodes))
2793 if err != nil {
2794 t.Errorf("unexpected error: %v", err)
2795 }
2796 for _, hp := range list {
2797 if hp.TotalScore != test.expectedScore {
2798 t.Errorf("expected %d for all priorities, got list %#v", test.expectedScore, list)
2799 }
2800 }
2801 })
2802 }
2803 }
2804
2805 func Test_prioritizeNodes(t *testing.T) {
2806 imageStatus1 := []v1.ContainerImage{
2807 {
2808 Names: []string{
2809 "gcr.io/40:latest",
2810 "gcr.io/40:v1",
2811 },
2812 SizeBytes: int64(80 * mb),
2813 },
2814 {
2815 Names: []string{
2816 "gcr.io/300:latest",
2817 "gcr.io/300:v1",
2818 },
2819 SizeBytes: int64(300 * mb),
2820 },
2821 }
2822
2823 imageStatus2 := []v1.ContainerImage{
2824 {
2825 Names: []string{
2826 "gcr.io/300:latest",
2827 },
2828 SizeBytes: int64(300 * mb),
2829 },
2830 {
2831 Names: []string{
2832 "gcr.io/40:latest",
2833 "gcr.io/40:v1",
2834 },
2835 SizeBytes: int64(80 * mb),
2836 },
2837 }
2838
2839 imageStatus3 := []v1.ContainerImage{
2840 {
2841 Names: []string{
2842 "gcr.io/600:latest",
2843 },
2844 SizeBytes: int64(600 * mb),
2845 },
2846 {
2847 Names: []string{
2848 "gcr.io/40:latest",
2849 },
2850 SizeBytes: int64(80 * mb),
2851 },
2852 {
2853 Names: []string{
2854 "gcr.io/900:latest",
2855 },
2856 SizeBytes: int64(900 * mb),
2857 },
2858 }
2859 tests := []struct {
2860 name string
2861 pod *v1.Pod
2862 pods []*v1.Pod
2863 nodes []*v1.Node
2864 pluginRegistrations []tf.RegisterPluginFunc
2865 extenders []tf.FakeExtender
2866 want []framework.NodePluginScores
2867 }{
2868 {
2869 name: "the score from all plugins should be recorded in PluginToNodeScores",
2870 pod: &v1.Pod{},
2871 nodes: []*v1.Node{makeNode("node1", 1000, schedutil.DefaultMemoryRequest*10), makeNode("node2", 1000, schedutil.DefaultMemoryRequest*10)},
2872 pluginRegistrations: []tf.RegisterPluginFunc{
2873 tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
2874 tf.RegisterScorePlugin(noderesources.BalancedAllocationName, frameworkruntime.FactoryAdapter(feature.Features{}, noderesources.NewBalancedAllocation), 1),
2875 tf.RegisterScorePlugin("Node2Prioritizer", tf.NewNode2PrioritizerPlugin(), 1),
2876 tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
2877 },
2878 extenders: nil,
2879 want: []framework.NodePluginScores{
2880 {
2881 Name: "node1",
2882 Scores: []framework.PluginScore{
2883 {
2884 Name: "Node2Prioritizer",
2885 Score: 10,
2886 },
2887 {
2888 Name: "NodeResourcesBalancedAllocation",
2889 Score: 100,
2890 },
2891 },
2892 TotalScore: 110,
2893 },
2894 {
2895 Name: "node2",
2896 Scores: []framework.PluginScore{
2897 {
2898 Name: "Node2Prioritizer",
2899 Score: 100,
2900 },
2901 {
2902 Name: "NodeResourcesBalancedAllocation",
2903 Score: 100,
2904 },
2905 },
2906 TotalScore: 200,
2907 },
2908 },
2909 },
2910 {
2911 name: "the score from extender should also be recorded in PluginToNodeScores with plugin scores",
2912 pod: &v1.Pod{},
2913 nodes: []*v1.Node{makeNode("node1", 1000, schedutil.DefaultMemoryRequest*10), makeNode("node2", 1000, schedutil.DefaultMemoryRequest*10)},
2914 pluginRegistrations: []tf.RegisterPluginFunc{
2915 tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
2916 tf.RegisterScorePlugin(noderesources.BalancedAllocationName, frameworkruntime.FactoryAdapter(feature.Features{}, noderesources.NewBalancedAllocation), 1),
2917 tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
2918 },
2919 extenders: []tf.FakeExtender{
2920 {
2921 ExtenderName: "FakeExtender1",
2922 Weight: 1,
2923 Prioritizers: []tf.PriorityConfig{
2924 {
2925 Weight: 3,
2926 Function: tf.Node1PrioritizerExtender,
2927 },
2928 },
2929 },
2930 {
2931 ExtenderName: "FakeExtender2",
2932 Weight: 1,
2933 Prioritizers: []tf.PriorityConfig{
2934 {
2935 Weight: 2,
2936 Function: tf.Node2PrioritizerExtender,
2937 },
2938 },
2939 },
2940 },
2941 want: []framework.NodePluginScores{
2942 {
2943 Name: "node1",
2944 Scores: []framework.PluginScore{
2945
2946 {
2947 Name: "FakeExtender1",
2948 Score: 300,
2949 },
2950 {
2951 Name: "FakeExtender2",
2952 Score: 20,
2953 },
2954 {
2955 Name: "NodeResourcesBalancedAllocation",
2956 Score: 100,
2957 },
2958 },
2959 TotalScore: 420,
2960 },
2961 {
2962 Name: "node2",
2963 Scores: []framework.PluginScore{
2964 {
2965 Name: "FakeExtender1",
2966 Score: 30,
2967 },
2968 {
2969 Name: "FakeExtender2",
2970 Score: 200,
2971 },
2972 {
2973 Name: "NodeResourcesBalancedAllocation",
2974 Score: 100,
2975 },
2976 },
2977 TotalScore: 330,
2978 },
2979 },
2980 },
2981 {
2982 name: "plugin which returned skip in preScore shouldn't be executed in the score phase",
2983 pod: &v1.Pod{},
2984 nodes: []*v1.Node{makeNode("node1", 1000, schedutil.DefaultMemoryRequest*10), makeNode("node2", 1000, schedutil.DefaultMemoryRequest*10)},
2985 pluginRegistrations: []tf.RegisterPluginFunc{
2986 tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
2987 tf.RegisterScorePlugin(noderesources.BalancedAllocationName, frameworkruntime.FactoryAdapter(feature.Features{}, noderesources.NewBalancedAllocation), 1),
2988 tf.RegisterScorePlugin("Node2Prioritizer", tf.NewNode2PrioritizerPlugin(), 1),
2989 tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
2990 tf.RegisterPluginAsExtensions("FakePreScoreAndScorePlugin", tf.NewFakePreScoreAndScorePlugin("FakePreScoreAndScorePlugin", 0,
2991 framework.NewStatus(framework.Skip, "fake skip"),
2992 framework.NewStatus(framework.Error, "this score function shouldn't be executed because this plugin returned Skip in the PreScore"),
2993 ), "PreScore", "Score"),
2994 },
2995 extenders: nil,
2996 want: []framework.NodePluginScores{
2997 {
2998 Name: "node1",
2999 Scores: []framework.PluginScore{
3000 {
3001 Name: "Node2Prioritizer",
3002 Score: 10,
3003 },
3004 {
3005 Name: "NodeResourcesBalancedAllocation",
3006 Score: 100,
3007 },
3008 },
3009 TotalScore: 110,
3010 },
3011 {
3012 Name: "node2",
3013 Scores: []framework.PluginScore{
3014 {
3015 Name: "Node2Prioritizer",
3016 Score: 100,
3017 },
3018 {
3019 Name: "NodeResourcesBalancedAllocation",
3020 Score: 100,
3021 },
3022 },
3023 TotalScore: 200,
3024 },
3025 },
3026 },
3027 {
3028 name: "all score plugins are skipped",
3029 pod: &v1.Pod{},
3030 nodes: []*v1.Node{makeNode("node1", 1000, schedutil.DefaultMemoryRequest*10), makeNode("node2", 1000, schedutil.DefaultMemoryRequest*10)},
3031 pluginRegistrations: []tf.RegisterPluginFunc{
3032 tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
3033 tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
3034 tf.RegisterPluginAsExtensions("FakePreScoreAndScorePlugin", tf.NewFakePreScoreAndScorePlugin("FakePreScoreAndScorePlugin", 0,
3035 framework.NewStatus(framework.Skip, "fake skip"),
3036 framework.NewStatus(framework.Error, "this score function shouldn't be executed because this plugin returned Skip in the PreScore"),
3037 ), "PreScore", "Score"),
3038 },
3039 extenders: nil,
3040 want: []framework.NodePluginScores{
3041 {Name: "node1", Scores: []framework.PluginScore{}},
3042 {Name: "node2", Scores: []framework.PluginScore{}},
3043 },
3044 },
3045 {
3046 name: "the score from Image Locality plugin with image in all nodes",
3047 pod: &v1.Pod{
3048 Spec: v1.PodSpec{
3049 Containers: []v1.Container{
3050 {
3051 Image: "gcr.io/40",
3052 },
3053 },
3054 },
3055 },
3056 nodes: []*v1.Node{
3057 makeNode("node1", 1000, schedutil.DefaultMemoryRequest*10, imageStatus1...),
3058 makeNode("node2", 1000, schedutil.DefaultMemoryRequest*10, imageStatus2...),
3059 makeNode("node3", 1000, schedutil.DefaultMemoryRequest*10, imageStatus3...),
3060 },
3061 pluginRegistrations: []tf.RegisterPluginFunc{
3062 tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
3063 tf.RegisterScorePlugin(imagelocality.Name, imagelocality.New, 1),
3064 tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
3065 },
3066 extenders: nil,
3067 want: []framework.NodePluginScores{
3068 {
3069 Name: "node1",
3070 Scores: []framework.PluginScore{
3071 {
3072 Name: "ImageLocality",
3073 Score: 5,
3074 },
3075 },
3076 TotalScore: 5,
3077 },
3078 {
3079 Name: "node2",
3080 Scores: []framework.PluginScore{
3081 {
3082 Name: "ImageLocality",
3083 Score: 5,
3084 },
3085 },
3086 TotalScore: 5,
3087 },
3088 {
3089 Name: "node3",
3090 Scores: []framework.PluginScore{
3091 {
3092 Name: "ImageLocality",
3093 Score: 5,
3094 },
3095 },
3096 TotalScore: 5,
3097 },
3098 },
3099 },
3100 {
3101 name: "the score from Image Locality plugin with image in partial nodes",
3102 pod: &v1.Pod{
3103 Spec: v1.PodSpec{
3104 Containers: []v1.Container{
3105 {
3106 Image: "gcr.io/300",
3107 },
3108 },
3109 },
3110 },
3111 nodes: []*v1.Node{makeNode("node1", 1000, schedutil.DefaultMemoryRequest*10, imageStatus1...),
3112 makeNode("node2", 1000, schedutil.DefaultMemoryRequest*10, imageStatus2...),
3113 makeNode("node3", 1000, schedutil.DefaultMemoryRequest*10, imageStatus3...),
3114 },
3115 pluginRegistrations: []tf.RegisterPluginFunc{
3116 tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
3117 tf.RegisterScorePlugin(imagelocality.Name, imagelocality.New, 1),
3118 tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
3119 },
3120 extenders: nil,
3121 want: []framework.NodePluginScores{
3122 {
3123 Name: "node1",
3124 Scores: []framework.PluginScore{
3125 {
3126 Name: "ImageLocality",
3127 Score: 18,
3128 },
3129 },
3130 TotalScore: 18,
3131 },
3132 {
3133 Name: "node2",
3134 Scores: []framework.PluginScore{
3135 {
3136 Name: "ImageLocality",
3137 Score: 18,
3138 },
3139 },
3140 TotalScore: 18,
3141 },
3142 {
3143 Name: "node3",
3144 Scores: []framework.PluginScore{
3145 {
3146 Name: "ImageLocality",
3147 Score: 0,
3148 },
3149 },
3150 TotalScore: 0,
3151 },
3152 },
3153 },
3154 }
3155
3156 for _, test := range tests {
3157 t.Run(test.name, func(t *testing.T) {
3158 client := clientsetfake.NewSimpleClientset()
3159 informerFactory := informers.NewSharedInformerFactory(client, 0)
3160
3161 ctx, cancel := context.WithCancel(context.Background())
3162 defer cancel()
3163 cache := internalcache.New(ctx, time.Duration(0))
3164 for _, node := range test.nodes {
3165 cache.AddNode(klog.FromContext(ctx), node)
3166 }
3167 snapshot := internalcache.NewEmptySnapshot()
3168 if err := cache.UpdateSnapshot(klog.FromContext(ctx), snapshot); err != nil {
3169 t.Fatal(err)
3170 }
3171 fwk, err := tf.NewFramework(
3172 ctx,
3173 test.pluginRegistrations, "",
3174 frameworkruntime.WithInformerFactory(informerFactory),
3175 frameworkruntime.WithSnapshotSharedLister(snapshot),
3176 frameworkruntime.WithClientSet(client),
3177 frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())),
3178 )
3179 if err != nil {
3180 t.Fatalf("error creating framework: %+v", err)
3181 }
3182
3183 state := framework.NewCycleState()
3184 var extenders []framework.Extender
3185 for ii := range test.extenders {
3186 extenders = append(extenders, &test.extenders[ii])
3187 }
3188 nodesscores, err := prioritizeNodes(ctx, extenders, fwk, state, test.pod, tf.BuildNodeInfos(test.nodes))
3189 if err != nil {
3190 t.Errorf("unexpected error: %v", err)
3191 }
3192 for i := range nodesscores {
3193 sort.Slice(nodesscores[i].Scores, func(j, k int) bool {
3194 return nodesscores[i].Scores[j].Name < nodesscores[i].Scores[k].Name
3195 })
3196 }
3197
3198 if diff := cmp.Diff(test.want, nodesscores); diff != "" {
3199 t.Errorf("returned nodes scores (-want,+got):\n%s", diff)
3200 }
3201 })
3202 }
3203 }
3204
// Pod priority values used by the tests in this file, in increasing order:
// lowPriority < midPriority < highPriority.
var (
	lowPriority  int32 = 0
	midPriority  int32 = 100
	highPriority int32 = 1000
)
3206
3207 func TestNumFeasibleNodesToFind(t *testing.T) {
3208 tests := []struct {
3209 name string
3210 globalPercentage int32
3211 profilePercentage *int32
3212 numAllNodes int32
3213 wantNumNodes int32
3214 }{
3215 {
3216 name: "not set percentageOfNodesToScore and nodes number not more than 50",
3217 numAllNodes: 10,
3218 wantNumNodes: 10,
3219 },
3220 {
3221 name: "set profile percentageOfNodesToScore and nodes number not more than 50",
3222 profilePercentage: ptr.To[int32](40),
3223 numAllNodes: 10,
3224 wantNumNodes: 10,
3225 },
3226 {
3227 name: "not set percentageOfNodesToScore and nodes number more than 50",
3228 numAllNodes: 1000,
3229 wantNumNodes: 420,
3230 },
3231 {
3232 name: "set profile percentageOfNodesToScore and nodes number more than 50",
3233 profilePercentage: ptr.To[int32](40),
3234 numAllNodes: 1000,
3235 wantNumNodes: 400,
3236 },
3237 {
3238 name: "set global and profile percentageOfNodesToScore and nodes number more than 50",
3239 globalPercentage: 100,
3240 profilePercentage: ptr.To[int32](40),
3241 numAllNodes: 1000,
3242 wantNumNodes: 400,
3243 },
3244 {
3245 name: "set global percentageOfNodesToScore and nodes number more than 50",
3246 globalPercentage: 40,
3247 numAllNodes: 1000,
3248 wantNumNodes: 400,
3249 },
3250 {
3251 name: "not set profile percentageOfNodesToScore and nodes number more than 50*125",
3252 numAllNodes: 6000,
3253 wantNumNodes: 300,
3254 },
3255 {
3256 name: "set profile percentageOfNodesToScore and nodes number more than 50*125",
3257 profilePercentage: ptr.To[int32](40),
3258 numAllNodes: 6000,
3259 wantNumNodes: 2400,
3260 },
3261 }
3262
3263 for _, tt := range tests {
3264 t.Run(tt.name, func(t *testing.T) {
3265 sched := &Scheduler{
3266 percentageOfNodesToScore: tt.globalPercentage,
3267 }
3268 if gotNumNodes := sched.numFeasibleNodesToFind(tt.profilePercentage, tt.numAllNodes); gotNumNodes != tt.wantNumNodes {
3269 t.Errorf("Scheduler.numFeasibleNodesToFind() = %v, want %v", gotNumNodes, tt.wantNumNodes)
3270 }
3271 })
3272 }
3273 }
3274
3275 func TestFairEvaluationForNodes(t *testing.T) {
3276 numAllNodes := 500
3277 nodeNames := make([]string, 0, numAllNodes)
3278 for i := 0; i < numAllNodes; i++ {
3279 nodeNames = append(nodeNames, strconv.Itoa(i))
3280 }
3281 nodes := makeNodeList(nodeNames)
3282 ctx, cancel := context.WithCancel(context.Background())
3283 defer cancel()
3284 sched := makeScheduler(ctx, nodes)
3285
3286 fwk, err := tf.NewFramework(
3287 ctx,
3288 []tf.RegisterPluginFunc{
3289 tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
3290 tf.RegisterFilterPlugin("TrueFilter", tf.NewTrueFilterPlugin),
3291 tf.RegisterScorePlugin("EqualPrioritizerPlugin", tf.NewEqualPrioritizerPlugin(), 1),
3292 tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
3293 },
3294 "",
3295 frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(nil)),
3296 )
3297 if err != nil {
3298 t.Fatal(err)
3299 }
3300
3301
3302 sched.percentageOfNodesToScore = 30
3303 nodesToFind := int(sched.numFeasibleNodesToFind(fwk.PercentageOfNodesToScore(), int32(numAllNodes)))
3304
3305
3306 for i := 0; i < 2*(numAllNodes/nodesToFind+1); i++ {
3307 nodesThatFit, _, err := sched.findNodesThatFitPod(ctx, fwk, framework.NewCycleState(), &v1.Pod{})
3308 if err != nil {
3309 t.Errorf("unexpected error: %v", err)
3310 }
3311 if len(nodesThatFit) != nodesToFind {
3312 t.Errorf("got %d nodes filtered, want %d", len(nodesThatFit), nodesToFind)
3313 }
3314 if sched.nextStartNodeIndex != (i+1)*nodesToFind%numAllNodes {
3315 t.Errorf("got %d lastProcessedNodeIndex, want %d", sched.nextStartNodeIndex, (i+1)*nodesToFind%numAllNodes)
3316 }
3317 }
3318 }
3319
3320 func TestPreferNominatedNodeFilterCallCounts(t *testing.T) {
3321 tests := []struct {
3322 name string
3323 pod *v1.Pod
3324 nodeReturnCodeMap map[string]framework.Code
3325 expectedCount int32
3326 expectedPatchRequests int
3327 }{
3328 {
3329 name: "pod has the nominated node set, filter is called only once",
3330 pod: st.MakePod().Name("p_with_nominated_node").UID("p").Priority(highPriority).NominatedNodeName("node1").Obj(),
3331 expectedCount: 1,
3332 },
3333 {
3334 name: "pod without the nominated pod, filter is called for each node",
3335 pod: st.MakePod().Name("p_without_nominated_node").UID("p").Priority(highPriority).Obj(),
3336 expectedCount: 3,
3337 },
3338 {
3339 name: "nominated pod cannot pass the filter, filter is called for each node",
3340 pod: st.MakePod().Name("p_with_nominated_node").UID("p").Priority(highPriority).NominatedNodeName("node1").Obj(),
3341 nodeReturnCodeMap: map[string]framework.Code{"node1": framework.Unschedulable},
3342 expectedCount: 4,
3343 },
3344 }
3345
3346 for _, test := range tests {
3347 t.Run(test.name, func(t *testing.T) {
3348 logger, ctx := ktesting.NewTestContext(t)
3349 ctx, cancel := context.WithCancel(ctx)
3350 defer cancel()
3351
3352
3353 nodes := makeNodeList([]string{"node1", "node2", "node3"})
3354 client := clientsetfake.NewSimpleClientset(test.pod)
3355 informerFactory := informers.NewSharedInformerFactory(client, 0)
3356 cache := internalcache.New(ctx, time.Duration(0))
3357 for _, n := range nodes {
3358 cache.AddNode(logger, n)
3359 }
3360 plugin := tf.FakeFilterPlugin{FailedNodeReturnCodeMap: test.nodeReturnCodeMap}
3361 registerFakeFilterFunc := tf.RegisterFilterPlugin(
3362 "FakeFilter",
3363 func(_ context.Context, _ runtime.Object, fh framework.Handle) (framework.Plugin, error) {
3364 return &plugin, nil
3365 },
3366 )
3367 registerPlugins := []tf.RegisterPluginFunc{
3368 tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
3369 registerFakeFilterFunc,
3370 tf.RegisterScorePlugin("EqualPrioritizerPlugin", tf.NewEqualPrioritizerPlugin(), 1),
3371 tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
3372 }
3373 fwk, err := tf.NewFramework(
3374 ctx,
3375 registerPlugins, "",
3376 frameworkruntime.WithClientSet(client),
3377 frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())),
3378 )
3379 if err != nil {
3380 t.Fatal(err)
3381 }
3382 snapshot := internalcache.NewSnapshot(nil, nodes)
3383
3384 sched := &Scheduler{
3385 Cache: cache,
3386 nodeInfoSnapshot: snapshot,
3387 percentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore,
3388 }
3389 sched.applyDefaultHandlers()
3390
3391 _, _, err = sched.findNodesThatFitPod(ctx, fwk, framework.NewCycleState(), test.pod)
3392 if err != nil {
3393 t.Errorf("unexpected error: %v", err)
3394 }
3395 if test.expectedCount != plugin.NumFilterCalled {
3396 t.Errorf("predicate was called %d times, expected is %d", plugin.NumFilterCalled, test.expectedCount)
3397 }
3398 })
3399 }
3400 }
3401
3402 func podWithID(id, desiredHost string) *v1.Pod {
3403 return st.MakePod().Name(id).UID(id).Node(desiredHost).SchedulerName(testSchedulerName).Obj()
3404 }
3405
3406 func deletingPod(id string) *v1.Pod {
3407 return st.MakePod().Name(id).UID(id).Terminating().Node("").SchedulerName(testSchedulerName).Obj()
3408 }
3409
3410 func podWithPort(id, desiredHost string, port int) *v1.Pod {
3411 pod := podWithID(id, desiredHost)
3412 pod.Spec.Containers = []v1.Container{
3413 {Name: "ctr", Ports: []v1.ContainerPort{{HostPort: int32(port)}}},
3414 }
3415 return pod
3416 }
3417
3418 func podWithResources(id, desiredHost string, limits v1.ResourceList, requests v1.ResourceList) *v1.Pod {
3419 pod := podWithID(id, desiredHost)
3420 pod.Spec.Containers = []v1.Container{
3421 {Name: "ctr", Resources: v1.ResourceRequirements{Limits: limits, Requests: requests}},
3422 }
3423 return pod
3424 }
3425
3426 func makeNodeList(nodeNames []string) []*v1.Node {
3427 result := make([]*v1.Node, 0, len(nodeNames))
3428 for _, nodeName := range nodeNames {
3429 result = append(result, &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: nodeName}})
3430 }
3431 return result
3432 }
3433
3434
3435 func makeScheduler(ctx context.Context, nodes []*v1.Node) *Scheduler {
3436 logger := klog.FromContext(ctx)
3437 cache := internalcache.New(ctx, time.Duration(0))
3438 for _, n := range nodes {
3439 cache.AddNode(logger, n)
3440 }
3441
3442 sched := &Scheduler{
3443 Cache: cache,
3444 nodeInfoSnapshot: emptySnapshot,
3445 percentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore,
3446 }
3447 sched.applyDefaultHandlers()
3448 cache.UpdateSnapshot(logger, sched.nodeInfoSnapshot)
3449 return sched
3450 }
3451
3452 func makeNode(node string, milliCPU, memory int64, images ...v1.ContainerImage) *v1.Node {
3453 return &v1.Node{
3454 ObjectMeta: metav1.ObjectMeta{Name: node},
3455 Status: v1.NodeStatus{
3456 Capacity: v1.ResourceList{
3457 v1.ResourceCPU: *resource.NewMilliQuantity(milliCPU, resource.DecimalSI),
3458 v1.ResourceMemory: *resource.NewQuantity(memory, resource.BinarySI),
3459 "pods": *resource.NewQuantity(100, resource.DecimalSI),
3460 },
3461 Allocatable: v1.ResourceList{
3462
3463 v1.ResourceCPU: *resource.NewMilliQuantity(milliCPU, resource.DecimalSI),
3464 v1.ResourceMemory: *resource.NewQuantity(memory, resource.BinarySI),
3465 "pods": *resource.NewQuantity(100, resource.DecimalSI),
3466 },
3467 Images: images,
3468 },
3469 }
3470 }
3471
3472
3473
3474 func setupTestSchedulerWithOnePodOnNode(ctx context.Context, t *testing.T, queuedPodStore *clientcache.FIFO, scache internalcache.Cache,
3475 pod *v1.Pod, node *v1.Node, fns ...tf.RegisterPluginFunc) (*Scheduler, chan *v1.Binding, chan error) {
3476 scheduler, bindingChan, errChan := setupTestScheduler(ctx, t, queuedPodStore, scache, nil, nil, fns...)
3477
3478 queuedPodStore.Add(pod)
3479
3480
3481
3482 scheduler.ScheduleOne(ctx)
3483
3484
3485
3486 select {
3487 case b := <-bindingChan:
3488 expectBinding := &v1.Binding{
3489 ObjectMeta: metav1.ObjectMeta{Name: pod.Name, UID: types.UID(pod.Name)},
3490 Target: v1.ObjectReference{Kind: "Node", Name: node.Name},
3491 }
3492 if !reflect.DeepEqual(expectBinding, b) {
3493 t.Errorf("binding want=%v, get=%v", expectBinding, b)
3494 }
3495 case <-time.After(wait.ForeverTestTimeout):
3496 t.Fatalf("timeout after %v", wait.ForeverTestTimeout)
3497 }
3498 return scheduler, bindingChan, errChan
3499 }
3500
3501
3502
3503 func setupTestScheduler(ctx context.Context, t *testing.T, queuedPodStore *clientcache.FIFO, cache internalcache.Cache, informerFactory informers.SharedInformerFactory, broadcaster events.EventBroadcaster, fns ...tf.RegisterPluginFunc) (*Scheduler, chan *v1.Binding, chan error) {
3504 bindingChan := make(chan *v1.Binding, 1)
3505 client := clientsetfake.NewSimpleClientset()
3506 client.PrependReactor("create", "pods", func(action clienttesting.Action) (bool, runtime.Object, error) {
3507 var b *v1.Binding
3508 if action.GetSubresource() == "binding" {
3509 b := action.(clienttesting.CreateAction).GetObject().(*v1.Binding)
3510 bindingChan <- b
3511 }
3512 return true, b, nil
3513 })
3514
3515 var recorder events.EventRecorder
3516 if broadcaster != nil {
3517 recorder = broadcaster.NewRecorder(scheme.Scheme, testSchedulerName)
3518 } else {
3519 recorder = &events.FakeRecorder{}
3520 }
3521
3522 if informerFactory == nil {
3523 informerFactory = informers.NewSharedInformerFactory(clientsetfake.NewSimpleClientset(), 0)
3524 }
3525 schedulingQueue := internalqueue.NewTestQueueWithInformerFactory(ctx, nil, informerFactory)
3526
3527 fwk, _ := tf.NewFramework(
3528 ctx,
3529 fns,
3530 testSchedulerName,
3531 frameworkruntime.WithClientSet(client),
3532 frameworkruntime.WithEventRecorder(recorder),
3533 frameworkruntime.WithInformerFactory(informerFactory),
3534 frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())),
3535 )
3536
3537 errChan := make(chan error, 1)
3538 sched := &Scheduler{
3539 Cache: cache,
3540 client: client,
3541 nodeInfoSnapshot: internalcache.NewEmptySnapshot(),
3542 percentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore,
3543 NextPod: func(logger klog.Logger) (*framework.QueuedPodInfo, error) {
3544 return &framework.QueuedPodInfo{PodInfo: mustNewPodInfo(t, clientcache.Pop(queuedPodStore).(*v1.Pod))}, nil
3545 },
3546 SchedulingQueue: schedulingQueue,
3547 Profiles: profile.Map{testSchedulerName: fwk},
3548 }
3549
3550 sched.SchedulePod = sched.schedulePod
3551 sched.FailureHandler = func(_ context.Context, _ framework.Framework, p *framework.QueuedPodInfo, status *framework.Status, _ *framework.NominatingInfo, _ time.Time) {
3552 err := status.AsError()
3553 errChan <- err
3554
3555 msg := truncateMessage(err.Error())
3556 fwk.EventRecorder().Eventf(p.Pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", msg)
3557 }
3558 return sched, bindingChan, errChan
3559 }
3560
3561 func setupTestSchedulerWithVolumeBinding(ctx context.Context, t *testing.T, volumeBinder volumebinding.SchedulerVolumeBinder, broadcaster events.EventBroadcaster) (*Scheduler, chan *v1.Binding, chan error) {
3562 logger := klog.FromContext(ctx)
3563 testNode := v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node1", UID: types.UID("node1")}}
3564 queuedPodStore := clientcache.NewFIFO(clientcache.MetaNamespaceKeyFunc)
3565 pod := podWithID("foo", "")
3566 pod.Namespace = "foo-ns"
3567 pod.Spec.Volumes = append(pod.Spec.Volumes, v1.Volume{Name: "testVol",
3568 VolumeSource: v1.VolumeSource{PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ClaimName: "testPVC"}}})
3569 queuedPodStore.Add(pod)
3570 scache := internalcache.New(ctx, 10*time.Minute)
3571 scache.AddNode(logger, &testNode)
3572 testPVC := v1.PersistentVolumeClaim{ObjectMeta: metav1.ObjectMeta{Name: "testPVC", Namespace: pod.Namespace, UID: types.UID("testPVC")}}
3573 client := clientsetfake.NewSimpleClientset(&testNode, &testPVC)
3574 informerFactory := informers.NewSharedInformerFactory(client, 0)
3575 pvcInformer := informerFactory.Core().V1().PersistentVolumeClaims()
3576 pvcInformer.Informer().GetStore().Add(&testPVC)
3577
3578 fns := []tf.RegisterPluginFunc{
3579 tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
3580 tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
3581 tf.RegisterPluginAsExtensions(volumebinding.Name, func(ctx context.Context, plArgs runtime.Object, handle framework.Handle) (framework.Plugin, error) {
3582 return &volumebinding.VolumeBinding{Binder: volumeBinder, PVCLister: pvcInformer.Lister()}, nil
3583 }, "PreFilter", "Filter", "Reserve", "PreBind"),
3584 }
3585 s, bindingChan, errChan := setupTestScheduler(ctx, t, queuedPodStore, scache, informerFactory, broadcaster, fns...)
3586 return s, bindingChan, errChan
3587 }
3588
3589
3590
3591
// makePredicateError returns an error whose message is
// "0/1 nodes are available: <failReason>.".
//
// failReason is inserted as data, so any '%' it contains appears verbatim in
// the message instead of being interpreted as a formatting verb.
func makePredicateError(failReason string) error {
	return fmt.Errorf("0/1 nodes are available: %v.", failReason)
}
3596
3597 func mustNewPodInfo(t *testing.T, pod *v1.Pod) *framework.PodInfo {
3598 podInfo, err := framework.NewPodInfo(pod)
3599 if err != nil {
3600 t.Fatal(err)
3601 }
3602 return podInfo
3603 }
3604
View as plain text