1
16
17 package scheduler
18
19 import (
20 "context"
21 "fmt"
22 "testing"
23 "time"
24
25 v1 "k8s.io/api/core/v1"
26 apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
27 apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
28 "k8s.io/apimachinery/pkg/api/errors"
29 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
30 "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
31 "k8s.io/apimachinery/pkg/runtime"
32 "k8s.io/apimachinery/pkg/runtime/schema"
33 "k8s.io/apimachinery/pkg/types"
34 "k8s.io/apimachinery/pkg/util/sets"
35 "k8s.io/apimachinery/pkg/util/uuid"
36 "k8s.io/apimachinery/pkg/util/wait"
37 utilfeature "k8s.io/apiserver/pkg/util/feature"
38 "k8s.io/client-go/dynamic"
39 "k8s.io/client-go/kubernetes"
40 featuregatetesting "k8s.io/component-base/featuregate/testing"
41 "k8s.io/klog/v2"
42 configv1 "k8s.io/kube-scheduler/config/v1"
43 apiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
44 "k8s.io/kubernetes/pkg/features"
45 "k8s.io/kubernetes/pkg/scheduler"
46 configtesting "k8s.io/kubernetes/pkg/scheduler/apis/config/testing"
47 "k8s.io/kubernetes/pkg/scheduler/framework"
48 "k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultbinder"
49 "k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
50 frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
51 st "k8s.io/kubernetes/pkg/scheduler/testing"
52 testfwk "k8s.io/kubernetes/test/integration/framework"
53 testutils "k8s.io/kubernetes/test/integration/util"
54 imageutils "k8s.io/kubernetes/test/utils/image"
55 "k8s.io/utils/pointer"
56 )
57
58 func TestSchedulingGates(t *testing.T) {
59 tests := []struct {
60 name string
61 pods []*v1.Pod
62 want []string
63 rmPodsSchedulingGates []int
64 wantPostGatesRemoval []string
65 }{
66 {
67 name: "regular pods",
68 pods: []*v1.Pod{
69 st.MakePod().Name("p1").Container("pause").Obj(),
70 st.MakePod().Name("p2").Container("pause").Obj(),
71 },
72 want: []string{"p1", "p2"},
73 },
74 {
75 name: "one pod carrying scheduling gates",
76 pods: []*v1.Pod{
77 st.MakePod().Name("p1").SchedulingGates([]string{"foo"}).Container("pause").Obj(),
78 st.MakePod().Name("p2").Container("pause").Obj(),
79 },
80 want: []string{"p2"},
81 },
82 {
83 name: "two pod carrying scheduling gates, and remove gates of one pod",
84 pods: []*v1.Pod{
85 st.MakePod().Name("p1").SchedulingGates([]string{"foo"}).Container("pause").Obj(),
86 st.MakePod().Name("p2").SchedulingGates([]string{"bar"}).Container("pause").Obj(),
87 st.MakePod().Name("p3").Container("pause").Obj(),
88 },
89 want: []string{"p3"},
90 rmPodsSchedulingGates: []int{1},
91 wantPostGatesRemoval: []string{"p2"},
92 },
93 }
94
95 for _, tt := range tests {
96 t.Run(tt.name, func(t *testing.T) {
97
98
99
100 testCtx := testutils.InitTestSchedulerWithOptions(
101 t,
102 testutils.InitTestAPIServer(t, "pod-scheduling-gates", nil),
103 0,
104 scheduler.WithPodInitialBackoffSeconds(0),
105 scheduler.WithPodMaxBackoffSeconds(0),
106 )
107 testutils.SyncSchedulerInformerFactory(testCtx)
108
109 cs, ns, ctx := testCtx.ClientSet, testCtx.NS.Name, testCtx.Ctx
110 for _, p := range tt.pods {
111 p.Namespace = ns
112 if _, err := cs.CoreV1().Pods(ns).Create(ctx, p, metav1.CreateOptions{}); err != nil {
113 t.Fatalf("Failed to create Pod %q: %v", p.Name, err)
114 }
115 }
116
117
118 if err := wait.PollUntilContextTimeout(ctx, time.Millisecond*200, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) {
119 pendingPods, _ := testCtx.Scheduler.SchedulingQueue.PendingPods()
120 return len(pendingPods) == len(tt.pods), nil
121 }); err != nil {
122 t.Fatal(err)
123 }
124
125
126 for _, wantPod := range tt.want {
127 podInfo := testutils.NextPodOrDie(t, testCtx)
128 if got := podInfo.Pod.Name; got != wantPod {
129 t.Errorf("Want %v to be popped out, but got %v", wantPod, got)
130 }
131 }
132
133 if len(tt.rmPodsSchedulingGates) == 0 {
134 return
135 }
136
137 for _, idx := range tt.rmPodsSchedulingGates {
138 patch := `{"spec": {"schedulingGates": null}}`
139 podName := tt.pods[idx].Name
140 if _, err := cs.CoreV1().Pods(ns).Patch(ctx, podName, types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{}); err != nil {
141 t.Fatalf("Failed to patch pod %v: %v", podName, err)
142 }
143 }
144
145 for _, wantPod := range tt.wantPostGatesRemoval {
146 podInfo := testutils.NextPodOrDie(t, testCtx)
147 if got := podInfo.Pod.Name; got != wantPod {
148 t.Errorf("Want %v to be popped out, but got %v", wantPod, got)
149 }
150 }
151 })
152 }
153 }
154
155
156
157 func TestCoreResourceEnqueue(t *testing.T) {
158 tests := []struct {
159 name string
160
161 initialNode *v1.Node
162
163 initialPod *v1.Pod
164
165
166 pods []*v1.Pod
167
168 triggerFn func(testCtx *testutils.TestContext) error
169
170 wantRequeuedPods sets.Set[string]
171 }{
172 {
173 name: "Pod without a required toleration to a node isn't requeued to activeQ",
174 initialNode: st.MakeNode().Name("fake-node").Capacity(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Taints([]v1.Taint{{Key: v1.TaintNodeNotReady, Effect: v1.TaintEffectNoSchedule}}).Obj(),
175 pods: []*v1.Pod{
176
177
178
179 st.MakePod().Name("pod1").Req(map[v1.ResourceName]string{v1.ResourceCPU: "4"}).Container("image").Obj(),
180 st.MakePod().Name("pod2").Toleration(v1.TaintNodeNotReady).Req(map[v1.ResourceName]string{v1.ResourceCPU: "4"}).Container("image").Obj(),
181 },
182 triggerFn: func(testCtx *testutils.TestContext) error {
183
184
185
186 if _, err := testCtx.ClientSet.CoreV1().Nodes().UpdateStatus(testCtx.Ctx, st.MakeNode().Name("fake-node").Capacity(map[v1.ResourceName]string{v1.ResourceCPU: "4"}).Taints([]v1.Taint{{Key: v1.TaintNodeNotReady, Effect: v1.TaintEffectNoSchedule}}).Obj(), metav1.UpdateOptions{}); err != nil {
187 return fmt.Errorf("failed to update the node: %w", err)
188 }
189 return nil
190 },
191 wantRequeuedPods: sets.New("pod2"),
192 },
193 {
194 name: "Pod rejected by the PodAffinity plugin is requeued when a new Node is created and turned to ready",
195 initialNode: st.MakeNode().Name("fake-node").Label("node", "fake-node").Capacity(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Obj(),
196 initialPod: st.MakePod().Label("anti", "anti").Name("pod1").PodAntiAffinityExists("anti", "node", st.PodAntiAffinityWithRequiredReq).Container("image").Node("fake-node").Obj(),
197 pods: []*v1.Pod{
198
199 st.MakePod().Label("anti", "anti").Name("pod2").PodAntiAffinityExists("anti", "node", st.PodAntiAffinityWithRequiredReq).Container("image").Obj(),
200 },
201 triggerFn: func(testCtx *testutils.TestContext) error {
202
203
204
205
206
207 node := st.MakeNode().Name("fake-node2").Capacity(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Taints([]v1.Taint{{Key: v1.TaintNodeNotReady, Effect: v1.TaintEffectNoSchedule}}).Obj()
208 if _, err := testCtx.ClientSet.CoreV1().Nodes().Create(testCtx.Ctx, st.MakeNode().Name("fake-node2").Capacity(map[v1.ResourceName]string{v1.ResourceCPU: "2"}).Taints([]v1.Taint{{Key: "foo", Effect: v1.TaintEffectNoSchedule}}).Obj(), metav1.CreateOptions{}); err != nil {
209 return fmt.Errorf("failed to create a newnode: %w", err)
210 }
211
212
213
214 node.Spec.Taints = nil
215 if _, err := testCtx.ClientSet.CoreV1().Nodes().Update(testCtx.Ctx, node, metav1.UpdateOptions{}); err != nil {
216 return fmt.Errorf("failed to remove taints off the node: %w", err)
217 }
218 return nil
219 },
220 wantRequeuedPods: sets.New("pod2"),
221 },
222 }
223
224 for _, featureEnabled := range []bool{false, true} {
225 for _, tt := range tests {
226 t.Run(fmt.Sprintf("%s [SchedulerQueueingHints enabled: %v]", tt.name, featureEnabled), func(t *testing.T) {
227 defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SchedulerQueueingHints, featureEnabled)()
228
229
230
231
232 testCtx := testutils.InitTestSchedulerWithOptions(
233 t,
234 testutils.InitTestAPIServer(t, "core-res-enqueue", nil),
235 0,
236 scheduler.WithPodInitialBackoffSeconds(0),
237 scheduler.WithPodMaxBackoffSeconds(0),
238 )
239 testutils.SyncSchedulerInformerFactory(testCtx)
240
241 defer testCtx.Scheduler.SchedulingQueue.Close()
242
243 cs, ns, ctx := testCtx.ClientSet, testCtx.NS.Name, testCtx.Ctx
244
245 if _, err := cs.CoreV1().Nodes().Create(ctx, tt.initialNode, metav1.CreateOptions{}); err != nil {
246 t.Fatalf("Failed to create an initial Node %q: %v", tt.initialNode.Name, err)
247 }
248
249 if tt.initialPod != nil {
250 if _, err := cs.CoreV1().Pods(ns).Create(ctx, tt.initialPod, metav1.CreateOptions{}); err != nil {
251 t.Fatalf("Failed to create an initial Pod %q: %v", tt.initialPod.Name, err)
252 }
253 }
254
255 for _, pod := range tt.pods {
256 if _, err := cs.CoreV1().Pods(ns).Create(ctx, pod, metav1.CreateOptions{}); err != nil {
257 t.Fatalf("Failed to create Pod %q: %v", pod.Name, err)
258 }
259 }
260
261
262 if err := wait.PollUntilContextTimeout(ctx, time.Millisecond*200, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) {
263 pendingPods, _ := testCtx.Scheduler.SchedulingQueue.PendingPods()
264 return len(pendingPods) == len(tt.pods), nil
265 }); err != nil {
266 t.Fatal(err)
267 }
268
269 t.Log("Confirmed Pods in the scheduling queue, starting to schedule them")
270
271
272 for i := 0; i < len(tt.pods); i++ {
273 testCtx.Scheduler.ScheduleOne(testCtx.Ctx)
274 }
275
276 if err := wait.PollUntilContextTimeout(ctx, time.Millisecond*200, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) {
277 pendingPods, _ := testCtx.Scheduler.SchedulingQueue.PendingPods()
278 return len(pendingPods) == len(tt.pods), nil
279 }); err != nil {
280 t.Fatal(err)
281 }
282
283 t.Log("finished initial schedulings for all Pods, will trigger triggerFn")
284
285 err := tt.triggerFn(testCtx)
286 if err != nil {
287 t.Fatalf("Failed to trigger the event: %v", err)
288 }
289
290 t.Log("triggered tt.triggerFn, will check if tt.requeuedPods are requeued")
291
292
293 var requeuedPods sets.Set[string]
294 if err := wait.PollUntilContextTimeout(ctx, time.Millisecond*200, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) {
295 requeuedPods = sets.Set[string]{}
296 for _, requeuedPod := range testCtx.Scheduler.SchedulingQueue.PodsInActiveQ() {
297 requeuedPods.Insert(requeuedPod.Name)
298 }
299
300 return requeuedPods.Equal(tt.wantRequeuedPods), nil
301 }); err != nil {
302 t.Fatalf("Expect Pods %v to be requeued, but %v are requeued actually", tt.wantRequeuedPods, requeuedPods)
303 }
304 })
305 }
306 }
307 }
308
309 var _ framework.FilterPlugin = &fakeCRPlugin{}
310 var _ framework.EnqueueExtensions = &fakeCRPlugin{}
311
312 type fakeCRPlugin struct{}
313
314 func (f *fakeCRPlugin) Name() string {
315 return "fakeCRPlugin"
316 }
317
318 func (f *fakeCRPlugin) Filter(_ context.Context, _ *framework.CycleState, _ *v1.Pod, _ *framework.NodeInfo) *framework.Status {
319 return framework.NewStatus(framework.Unschedulable, "always fail")
320 }
321
322
323
324 func (f *fakeCRPlugin) EventsToRegister() []framework.ClusterEventWithHint {
325 return []framework.ClusterEventWithHint{
326 {Event: framework.ClusterEvent{Resource: "foos.v1.example.com", ActionType: framework.All}},
327 }
328 }
329
330
331
332 func TestCustomResourceEnqueue(t *testing.T) {
333
334 server := apiservertesting.StartTestServerOrDie(
335 t, apiservertesting.NewDefaultTestServerOptions(),
336 []string{"--disable-admission-plugins=ServiceAccount,TaintNodesByCondition", "--runtime-config=api/all=true"},
337 testfwk.SharedEtcd(),
338 )
339 testCtx := &testutils.TestContext{}
340 ctx, cancel := context.WithCancel(context.Background())
341 testCtx.Ctx = ctx
342 testCtx.CloseFn = func() {
343 cancel()
344 server.TearDownFn()
345 }
346
347 apiExtensionClient := apiextensionsclient.NewForConfigOrDie(server.ClientConfig)
348 dynamicClient := dynamic.NewForConfigOrDie(server.ClientConfig)
349
350
351 fooCRD := &apiextensionsv1.CustomResourceDefinition{
352 ObjectMeta: metav1.ObjectMeta{
353 Name: "foos.example.com",
354 },
355 Spec: apiextensionsv1.CustomResourceDefinitionSpec{
356 Group: "example.com",
357 Scope: apiextensionsv1.NamespaceScoped,
358 Names: apiextensionsv1.CustomResourceDefinitionNames{
359 Plural: "foos",
360 Kind: "Foo",
361 },
362 Versions: []apiextensionsv1.CustomResourceDefinitionVersion{
363 {
364 Name: "v1",
365 Served: true,
366 Storage: true,
367 Schema: &apiextensionsv1.CustomResourceValidation{
368 OpenAPIV3Schema: &apiextensionsv1.JSONSchemaProps{
369 Type: "object",
370 Properties: map[string]apiextensionsv1.JSONSchemaProps{
371 "field": {Type: "string"},
372 },
373 },
374 },
375 },
376 },
377 },
378 }
379 var err error
380 fooCRD, err = apiExtensionClient.ApiextensionsV1().CustomResourceDefinitions().Create(testCtx.Ctx, fooCRD, metav1.CreateOptions{})
381 if err != nil {
382 t.Fatal(err)
383 }
384
385 registry := frameworkruntime.Registry{
386 "fakeCRPlugin": func(_ context.Context, _ runtime.Object, fh framework.Handle) (framework.Plugin, error) {
387 return &fakeCRPlugin{}, nil
388 },
389 }
390 cfg := configtesting.V1ToInternalWithDefaults(t, configv1.KubeSchedulerConfiguration{
391 Profiles: []configv1.KubeSchedulerProfile{{
392 SchedulerName: pointer.String(v1.DefaultSchedulerName),
393 Plugins: &configv1.Plugins{
394 Filter: configv1.PluginSet{
395 Enabled: []configv1.Plugin{
396 {Name: "fakeCRPlugin"},
397 },
398 },
399 },
400 }}})
401
402 testCtx.KubeConfig = server.ClientConfig
403 testCtx.ClientSet = kubernetes.NewForConfigOrDie(server.ClientConfig)
404 testCtx.NS, err = testCtx.ClientSet.CoreV1().Namespaces().Create(testCtx.Ctx, &v1.Namespace{
405 ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("cr-enqueue-%v", string(uuid.NewUUID()))}}, metav1.CreateOptions{})
406 if err != nil && !errors.IsAlreadyExists(err) {
407 t.Fatalf("Failed to integration test ns: %v", err)
408 }
409
410
411
412
413 testCtx = testutils.InitTestSchedulerWithOptions(
414 t,
415 testCtx,
416 0,
417 scheduler.WithProfiles(cfg.Profiles...),
418 scheduler.WithFrameworkOutOfTreeRegistry(registry),
419 scheduler.WithPodInitialBackoffSeconds(0),
420 scheduler.WithPodMaxBackoffSeconds(0),
421 )
422 testutils.SyncSchedulerInformerFactory(testCtx)
423
424 defer testutils.CleanupTest(t, testCtx)
425
426 cs, ns, ctx := testCtx.ClientSet, testCtx.NS.Name, testCtx.Ctx
427 logger := klog.FromContext(ctx)
428
429 node := st.MakeNode().Name("fake-node").Obj()
430 if _, err := cs.CoreV1().Nodes().Create(ctx, node, metav1.CreateOptions{}); err != nil {
431 t.Fatalf("Failed to create Node %q: %v", node.Name, err)
432 }
433
434
435 pause := imageutils.GetPauseImageName()
436 pod := st.MakePod().Namespace(ns).Name("fake-pod").Container(pause).Obj()
437 if _, err := cs.CoreV1().Pods(ns).Create(ctx, pod, metav1.CreateOptions{}); err != nil {
438 t.Fatalf("Failed to create Pod %q: %v", pod.Name, err)
439 }
440
441
442 if err := wait.PollUntilContextTimeout(ctx, time.Millisecond*200, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) {
443 pendingPods, _ := testCtx.Scheduler.SchedulingQueue.PendingPods()
444 return len(pendingPods) == 1, nil
445 }); err != nil {
446 t.Fatal(err)
447 }
448
449
450 podInfo := testutils.NextPodOrDie(t, testCtx)
451 fwk, ok := testCtx.Scheduler.Profiles[podInfo.Pod.Spec.SchedulerName]
452 if !ok {
453 t.Fatalf("Cannot find the profile for Pod %v", podInfo.Pod.Name)
454 }
455
456 _, fitError := testCtx.Scheduler.SchedulePod(ctx, fwk, framework.NewCycleState(), podInfo.Pod)
457
458 if fitError == nil {
459 t.Fatalf("Expect Pod %v to fail at scheduling.", podInfo.Pod.Name)
460 }
461 testCtx.Scheduler.FailureHandler(ctx, fwk, podInfo, framework.NewStatus(framework.Unschedulable).WithError(fitError), nil, time.Now())
462
463
464
465 testCtx.Scheduler.SchedulingQueue.AddUnschedulableIfNotPresent(logger, podInfo, 10)
466
467
468
469 crdGVR := schema.GroupVersionResource{Group: fooCRD.Spec.Group, Version: fooCRD.Spec.Versions[0].Name, Resource: "foos"}
470 crClient := dynamicClient.Resource(crdGVR).Namespace(ns)
471 if _, err := crClient.Create(ctx, &unstructured.Unstructured{
472 Object: map[string]interface{}{
473 "apiVersion": "example.com/v1",
474 "kind": "Foo",
475 "metadata": map[string]interface{}{"name": "foo1"},
476 },
477 }, metav1.CreateOptions{}); err != nil {
478 t.Fatalf("Unable to create cr: %v", err)
479 }
480
481
482 podInfo = testutils.NextPodOrDie(t, testCtx)
483 if podInfo.Attempts != 2 {
484 t.Errorf("Expected the Pod to be attempted 2 times, but got %v", podInfo.Attempts)
485 }
486 }
487
488
489
490 func TestRequeueByBindFailure(t *testing.T) {
491 fakeBind := &firstFailBindPlugin{}
492 registry := frameworkruntime.Registry{
493 "firstFailBindPlugin": func(ctx context.Context, o runtime.Object, fh framework.Handle) (framework.Plugin, error) {
494 binder, err := defaultbinder.New(ctx, nil, fh)
495 if err != nil {
496 return nil, err
497 }
498
499 fakeBind.defaultBinderPlugin = binder.(framework.BindPlugin)
500 return fakeBind, nil
501 },
502 }
503
504 cfg := configtesting.V1ToInternalWithDefaults(t, configv1.KubeSchedulerConfiguration{
505 Profiles: []configv1.KubeSchedulerProfile{{
506 SchedulerName: pointer.String(v1.DefaultSchedulerName),
507 Plugins: &configv1.Plugins{
508 MultiPoint: configv1.PluginSet{
509 Enabled: []configv1.Plugin{
510 {Name: "firstFailBindPlugin"},
511 },
512 Disabled: []configv1.Plugin{
513 {Name: names.DefaultBinder},
514 },
515 },
516 },
517 }}})
518
519
520 testCtx := testutils.InitTestSchedulerWithOptions(
521 t,
522 testutils.InitTestAPIServer(t, "core-res-enqueue", nil),
523 0,
524 scheduler.WithPodInitialBackoffSeconds(0),
525 scheduler.WithPodMaxBackoffSeconds(0),
526 scheduler.WithProfiles(cfg.Profiles...),
527 scheduler.WithFrameworkOutOfTreeRegistry(registry),
528 )
529 testutils.SyncSchedulerInformerFactory(testCtx)
530
531 go testCtx.Scheduler.Run(testCtx.Ctx)
532
533 cs, ns, ctx := testCtx.ClientSet, testCtx.NS.Name, testCtx.Ctx
534 node := st.MakeNode().Name("fake-node").Obj()
535 if _, err := cs.CoreV1().Nodes().Create(ctx, node, metav1.CreateOptions{}); err != nil {
536 t.Fatalf("Failed to create Node %q: %v", node.Name, err)
537 }
538
539 pod := st.MakePod().Namespace(ns).Name("pod-1").Container(imageutils.GetPauseImageName()).Obj()
540 if _, err := cs.CoreV1().Pods(ns).Create(ctx, pod, metav1.CreateOptions{}); err != nil {
541 t.Fatalf("Failed to create Pod %q: %v", pod.Name, err)
542 }
543
544
545
546
547
548 err := wait.PollUntilContextTimeout(ctx, 200*time.Millisecond, wait.ForeverTestTimeout, false, testutils.PodScheduled(cs, ns, pod.Name))
549 if err != nil {
550 t.Fatalf("Expect pod-1 to be scheduled by the bind plugin: %v", err)
551 }
552
553
554 if fakeBind.counter != 1 {
555 t.Fatalf("Expect pod-1 to be scheduled by the bind plugin in the second binding try: %v", err)
556 }
557 }
558
559
560 type firstFailBindPlugin struct {
561 counter int
562 defaultBinderPlugin framework.BindPlugin
563 }
564
565 func (*firstFailBindPlugin) Name() string {
566 return "firstFailBindPlugin"
567 }
568
569 func (p *firstFailBindPlugin) Bind(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodename string) *framework.Status {
570 if p.counter == 0 {
571
572 p.counter++
573 return framework.NewStatus(framework.Error, "firstFailBindPlugin rejects the Pod")
574 }
575
576 return p.defaultBinderPlugin.Bind(ctx, state, pod, nodename)
577 }
578
579
580
581 func TestRequeueByPermitRejection(t *testing.T) {
582 defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.SchedulerQueueingHints, true)()
583 queueingHintCalledCounter := 0
584 fakePermit := &fakePermitPlugin{}
585 registry := frameworkruntime.Registry{
586 fakePermitPluginName: func(ctx context.Context, o runtime.Object, fh framework.Handle) (framework.Plugin, error) {
587 fakePermit = &fakePermitPlugin{
588 frameworkHandler: fh,
589 schedulingHint: func(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
590 queueingHintCalledCounter++
591 return framework.Queue, nil
592 },
593 }
594 return fakePermit, nil
595 },
596 }
597 cfg := configtesting.V1ToInternalWithDefaults(t, configv1.KubeSchedulerConfiguration{
598 Profiles: []configv1.KubeSchedulerProfile{{
599 SchedulerName: pointer.String(v1.DefaultSchedulerName),
600 Plugins: &configv1.Plugins{
601 MultiPoint: configv1.PluginSet{
602 Enabled: []configv1.Plugin{
603 {Name: fakePermitPluginName},
604 },
605 },
606 },
607 }}})
608
609
610 testCtx := testutils.InitTestSchedulerWithOptions(
611 t,
612 testutils.InitTestAPIServer(t, "core-res-enqueue", nil),
613 0,
614 scheduler.WithPodInitialBackoffSeconds(0),
615 scheduler.WithPodMaxBackoffSeconds(0),
616 scheduler.WithProfiles(cfg.Profiles...),
617 scheduler.WithFrameworkOutOfTreeRegistry(registry),
618 )
619 testutils.SyncSchedulerInformerFactory(testCtx)
620
621 go testCtx.Scheduler.Run(testCtx.Ctx)
622
623 cs, ns, ctx := testCtx.ClientSet, testCtx.NS.Name, testCtx.Ctx
624 node := st.MakeNode().Name("fake-node").Obj()
625 if _, err := cs.CoreV1().Nodes().Create(ctx, node, metav1.CreateOptions{}); err != nil {
626 t.Fatalf("Failed to create Node %q: %v", node.Name, err)
627 }
628
629 pod := st.MakePod().Namespace(ns).Name("pod-1").Container(imageutils.GetPauseImageName()).Obj()
630 if _, err := cs.CoreV1().Pods(ns).Create(ctx, pod, metav1.CreateOptions{}); err != nil {
631 t.Fatalf("Failed to create Pod %q: %v", pod.Name, err)
632 }
633
634
635 node.Labels = map[string]string{"updated": ""}
636 if _, err := cs.CoreV1().Nodes().Update(ctx, node, metav1.UpdateOptions{}); err != nil {
637 t.Fatalf("Failed to add labels to the node: %v", err)
638 }
639
640
641
642 pod = st.MakePod().Namespace(ns).Name("pod-2").Container(imageutils.GetPauseImageName()).Obj()
643 if _, err := cs.CoreV1().Pods(ns).Create(ctx, pod, metav1.CreateOptions{}); err != nil {
644 t.Fatalf("Failed to create Pod %q: %v", pod.Name, err)
645 }
646
647
648
649 fakePermit.frameworkHandler.IterateOverWaitingPods(func(wp framework.WaitingPod) {
650 if wp.GetPod().Name == "pod-1" {
651 wp.Reject(fakePermitPluginName, "fakePermitPlugin rejects the Pod")
652 return
653 }
654 })
655
656
657 err := wait.PollUntilContextTimeout(ctx, 200*time.Millisecond, wait.ForeverTestTimeout, false, func(ctx context.Context) (done bool, err error) {
658 fakePermit.frameworkHandler.IterateOverWaitingPods(func(wp framework.WaitingPod) {
659 if wp.GetPod().Name == "pod-2" {
660 wp.Allow(fakePermitPluginName)
661 }
662 })
663
664 return testutils.PodScheduled(cs, ns, "pod-2")(ctx)
665 })
666 if err != nil {
667 t.Fatalf("Expect pod-2 to be scheduled")
668 }
669
670 err = wait.PollUntilContextTimeout(ctx, 200*time.Millisecond, wait.ForeverTestTimeout, false, func(ctx context.Context) (done bool, err error) {
671 pod1Found := false
672 fakePermit.frameworkHandler.IterateOverWaitingPods(func(wp framework.WaitingPod) {
673 if wp.GetPod().Name == "pod-1" {
674 pod1Found = true
675 wp.Allow(fakePermitPluginName)
676 }
677 })
678 return pod1Found, nil
679 })
680 if err != nil {
681 t.Fatal("Expect pod-1 to be scheduled again")
682 }
683
684 if queueingHintCalledCounter != 1 {
685 t.Fatalf("Expected the scheduling hint to be called 1 time, but %v", queueingHintCalledCounter)
686 }
687 }
688
689 type fakePermitPlugin struct {
690 frameworkHandler framework.Handle
691 schedulingHint framework.QueueingHintFn
692 }
693
694 const fakePermitPluginName = "fakePermitPlugin"
695
696 func (p *fakePermitPlugin) Name() string {
697 return fakePermitPluginName
698 }
699
700 func (p *fakePermitPlugin) Permit(ctx context.Context, state *framework.CycleState, _ *v1.Pod, _ string) (*framework.Status, time.Duration) {
701 return framework.NewStatus(framework.Wait), wait.ForeverTestTimeout
702 }
703
704 func (p *fakePermitPlugin) EventsToRegister() []framework.ClusterEventWithHint {
705 return []framework.ClusterEventWithHint{
706 {Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.UpdateNodeLabel}, QueueingHintFn: p.schedulingHint},
707 }
708 }
709
View as plain text