package dynamicresources

import (
	"context"
	"errors"
	"fmt"
	"sort"
	"sync"
	"testing"
	"time"

	"github.com/google/go-cmp/cmp"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"

	v1 "k8s.io/api/core/v1"
	resourcev1alpha2 "k8s.io/api/resource/v1alpha2"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	apiruntime "k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes/fake"
	cgotesting "k8s.io/client-go/testing"
	"k8s.io/client-go/tools/cache"
	"k8s.io/klog/v2/ktesting"
	_ "k8s.io/klog/v2/ktesting/init" // activate ktesting command line flags
	"k8s.io/kubernetes/pkg/scheduler/framework"
	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
	"k8s.io/kubernetes/pkg/scheduler/framework/runtime"
	st "k8s.io/kubernetes/pkg/scheduler/testing"
)

var (
	podKind = v1.SchemeGroupVersion.WithKind("Pod")

	podName       = "my-pod"
	podUID        = "1234"
	resourceName  = "my-resource"
	resourceName2 = resourceName + "-2"
	claimName     = podName + "-" + resourceName
	claimName2    = podName + "-" + resourceName + "-2"
	className     = "my-resource-class"
	namespace     = "default"

	resourceClass = &resourcev1alpha2.ResourceClass{
		ObjectMeta: metav1.ObjectMeta{
			Name: className,
		},
		DriverName: "some-driver",
	}

	podWithClaimName = st.MakePod().Name(podName).Namespace(namespace).
				UID(podUID).
				PodResourceClaims(v1.PodResourceClaim{Name: resourceName, Source: v1.ClaimSource{ResourceClaimName: &claimName}}).
				Obj()
	otherPodWithClaimName = st.MakePod().Name(podName).Namespace(namespace).
				UID(podUID + "-II").
				PodResourceClaims(v1.PodResourceClaim{Name: resourceName, Source: v1.ClaimSource{ResourceClaimName: &claimName}}).
				Obj()
	podWithClaimTemplate = st.MakePod().Name(podName).Namespace(namespace).
				UID(podUID).
				PodResourceClaims(v1.PodResourceClaim{Name: resourceName, Source: v1.ClaimSource{ResourceClaimTemplateName: &claimName}}).
				Obj()
	podWithClaimTemplateInStatus = func() *v1.Pod {
		pod := podWithClaimTemplate.DeepCopy()
		pod.Status.ResourceClaimStatuses = []v1.PodResourceClaimStatus{
			{
				Name:              pod.Spec.ResourceClaims[0].Name,
				ResourceClaimName: &claimName,
			},
		}
		return pod
	}()
	podWithTwoClaimNames = st.MakePod().Name(podName).Namespace(namespace).
				UID(podUID).
				PodResourceClaims(v1.PodResourceClaim{Name: resourceName, Source: v1.ClaimSource{ResourceClaimName: &claimName}}).
				PodResourceClaims(v1.PodResourceClaim{Name: resourceName2, Source: v1.ClaimSource{ResourceClaimName: &claimName2}}).
				Obj()

	workerNode = &st.MakeNode().Name("worker").Label("nodename", "worker").Node

	claim = st.MakeResourceClaim().
		Name(claimName).
		Namespace(namespace).
		ResourceClassName(className).
		Obj()
	pendingImmediateClaim = st.FromResourceClaim(claim).
				AllocationMode(resourcev1alpha2.AllocationModeImmediate).
				Obj()
	pendingDelayedClaim = st.FromResourceClaim(claim).
				OwnerReference(podName, podUID, podKind).
				AllocationMode(resourcev1alpha2.AllocationModeWaitForFirstConsumer).
				Obj()
	pendingDelayedClaim2 = st.FromResourceClaim(pendingDelayedClaim).
				Name(claimName2).
				Obj()
	deallocatingClaim = st.FromResourceClaim(pendingImmediateClaim).
				Allocation(&resourcev1alpha2.AllocationResult{}).
				DeallocationRequested(true).
				Obj()
	inUseClaim = st.FromResourceClaim(pendingImmediateClaim).
			Allocation(&resourcev1alpha2.AllocationResult{}).
			ReservedFor(resourcev1alpha2.ResourceClaimConsumerReference{Resource: "pods", Name: podName, UID: types.UID(podUID)}).
			Obj()
	allocatedClaim = st.FromResourceClaim(pendingDelayedClaim).
			Allocation(&resourcev1alpha2.AllocationResult{}).
			Obj()
	allocatedDelayedClaimWithWrongTopology = st.FromResourceClaim(allocatedClaim).
						Allocation(&resourcev1alpha2.AllocationResult{AvailableOnNodes: st.MakeNodeSelector().In("no-such-label", []string{"no-such-value"}).Obj()}).
						Obj()
	allocatedImmediateClaimWithWrongTopology = st.FromResourceClaim(allocatedDelayedClaimWithWrongTopology).
							AllocationMode(resourcev1alpha2.AllocationModeImmediate).
							Obj()
	allocatedClaimWithGoodTopology = st.FromResourceClaim(allocatedClaim).
					Allocation(&resourcev1alpha2.AllocationResult{AvailableOnNodes: st.MakeNodeSelector().In("nodename", []string{"worker"}).Obj()}).
					Obj()
	otherClaim = st.MakeResourceClaim().
			Name("not-my-claim").
			Namespace(namespace).
			ResourceClassName(className).
			Obj()

	scheduling = st.MakePodSchedulingContexts().Name(podName).Namespace(namespace).
			OwnerReference(podName, podUID, podKind).
			Obj()
	schedulingPotential = st.FromPodSchedulingContexts(scheduling).
				PotentialNodes(workerNode.Name).
				Obj()
	schedulingSelectedPotential = st.FromPodSchedulingContexts(schedulingPotential).
					SelectedNode(workerNode.Name).
					Obj()
	schedulingInfo = st.FromPodSchedulingContexts(schedulingPotential).
			ResourceClaims(resourcev1alpha2.ResourceClaimSchedulingStatus{Name: resourceName},
				resourcev1alpha2.ResourceClaimSchedulingStatus{Name: resourceName2}).
			Obj()
)

// result defines the expected outcome of some operation: the returned status
// and the state of the world (= objects in the API server) afterwards.
type result struct {
	status *framework.Status

	// changes contains update functions per object type. They get applied
	// to the initial objects before comparing against the current state
	// of the API server.
	changes change

	// added contains objects that the operation is expected to create.
	added []metav1.Object

	// removed contains objects that the operation is expected to delete.
	removed []metav1.Object
}

// change contains functions for modifying objects of a certain type. These
// functions get called for all objects of that type. If they need to make
// changes only to a particular instance, then they must check the name.
type change struct {
	scheduling func(*resourcev1alpha2.PodSchedulingContext) *resourcev1alpha2.PodSchedulingContext
	claim      func(*resourcev1alpha2.ResourceClaim) *resourcev1alpha2.ResourceClaim
}
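
// perNodeResult maps a node name to the result expected for the Filter call
// on that node. A nil map or missing entry means "success, no changes".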
type perNodeResult map[string]result

func (p perNodeResult) forNode(nodeName string) result {
	if p == nil {
		return result{}
	}
	return p[nodeName]
}

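// want collects the expected results for every extension point that
// TestPlugin may invoke for a test case.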
type want struct {
	preenqueue       result
	preFilterResult  *framework.PreFilterResult
	prefilter        result
	filter           perNodeResult
	prescore         result
	reserve          result
	unreserve        result
	prebind          result
	postbind         result
	postFilterResult *framework.PostFilterResult
	postfilter       result

	// unreserveAfterBindFailure, if set, triggers a call to Unreserve
	// after PreBind, as if the actual Bind had failed.
	unreserveAfterBindFailure *result
}

// prepare contains changes for objects in the API server. Those changes are
// applied before running the corresponding step. This can be used to simulate
// concurrent changes by some other entity, like a resource driver.
type prepare struct {
	filter     change
	prescore   change
	reserve    change
	unreserve  change
	prebind    change
	postbind   change
	postfilter change
}

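// TestPlugin walks a pod through the plugin's extension points in the same
// order as the scheduler framework would: PreEnqueue, PreFilter, Filter,
// PreScore, Reserve, then either PreBind and PostBind on success or
// Unreserve and PostFilter on failure.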
func TestPlugin(t *testing.T) {
	testcases := map[string]struct {
		nodes       []*v1.Node // default if unset: workerNode
		pod         *v1.Pod
		claims      []*resourcev1alpha2.ResourceClaim
		classes     []*resourcev1alpha2.ResourceClass
		schedulings []*resourcev1alpha2.PodSchedulingContext

		prepare prepare
		want    want
		disable bool
	}{
231 "empty": {
232 pod: st.MakePod().Name("foo").Namespace("default").Obj(),
233 want: want{
234 prefilter: result{
235 status: framework.NewStatus(framework.Skip),
236 },
237 postfilter: result{
238 status: framework.NewStatus(framework.Unschedulable, `no new claims to deallocate`),
239 },
240 },
241 },
242 "claim-reference": {
243 pod: podWithClaimName,
244 claims: []*resourcev1alpha2.ResourceClaim{allocatedClaim, otherClaim},
245 want: want{
246 prebind: result{
247 changes: change{
248 claim: func(claim *resourcev1alpha2.ResourceClaim) *resourcev1alpha2.ResourceClaim {
249 if claim.Name == claimName {
250 claim = claim.DeepCopy()
251 claim.Status.ReservedFor = inUseClaim.Status.ReservedFor
252 }
253 return claim
254 },
255 },
256 },
257 },
258 },
259 "claim-template": {
260 pod: podWithClaimTemplateInStatus,
261 claims: []*resourcev1alpha2.ResourceClaim{allocatedClaim, otherClaim},
262 want: want{
263 prebind: result{
264 changes: change{
265 claim: func(claim *resourcev1alpha2.ResourceClaim) *resourcev1alpha2.ResourceClaim {
266 if claim.Name == claimName {
267 claim = claim.DeepCopy()
268 claim.Status.ReservedFor = inUseClaim.Status.ReservedFor
269 }
270 return claim
271 },
272 },
273 },
274 },
275 },
276 "missing-claim": {
277 pod: podWithClaimTemplate,
278 claims: []*resourcev1alpha2.ResourceClaim{allocatedClaim, otherClaim},
279 want: want{
280 preenqueue: result{
281 status: framework.NewStatus(framework.UnschedulableAndUnresolvable, `pod "default/my-pod": ResourceClaim not created yet`),
282 },
283 },
284 },
285 "deleted-claim": {
286 pod: podWithClaimTemplateInStatus,
287 claims: func() []*resourcev1alpha2.ResourceClaim {
288 claim := allocatedClaim.DeepCopy()
289 claim.DeletionTimestamp = &metav1.Time{Time: time.Now()}
290 return []*resourcev1alpha2.ResourceClaim{claim}
291 }(),
292 want: want{
293 preenqueue: result{
294 status: framework.NewStatus(framework.UnschedulableAndUnresolvable, `resourceclaim "my-pod-my-resource" is being deleted`),
295 },
296 },
297 },
298 "wrong-claim": {
299 pod: podWithClaimTemplateInStatus,
300 claims: func() []*resourcev1alpha2.ResourceClaim {
301 claim := allocatedClaim.DeepCopy()
302 claim.OwnerReferences[0].UID += "123"
303 return []*resourcev1alpha2.ResourceClaim{claim}
304 }(),
305 want: want{
306 preenqueue: result{
307 status: framework.NewStatus(framework.UnschedulableAndUnresolvable, `ResourceClaim default/my-pod-my-resource was not created for pod default/my-pod (pod is not owner)`),
308 },
309 },
310 },
311 "waiting-for-immediate-allocation": {
312 pod: podWithClaimName,
313 claims: []*resourcev1alpha2.ResourceClaim{pendingImmediateClaim},
314 classes: []*resourcev1alpha2.ResourceClass{resourceClass},
315 want: want{
316 prefilter: result{
317 status: framework.NewStatus(framework.UnschedulableAndUnresolvable, `unallocated immediate resourceclaim`),
318 },
319 postfilter: result{
320 status: framework.NewStatus(framework.Unschedulable, `no new claims to deallocate`),
321 },
322 },
323 },
324 "waiting-for-deallocation": {
325 pod: podWithClaimName,
326 claims: []*resourcev1alpha2.ResourceClaim{deallocatingClaim},
327 want: want{
328 prefilter: result{
329 status: framework.NewStatus(framework.UnschedulableAndUnresolvable, `resourceclaim must be reallocated`),
330 },
331 postfilter: result{
332 status: framework.NewStatus(framework.Unschedulable, `no new claims to deallocate`),
333 },
334 },
335 },
336 "delayed-allocation-missing-class": {
337 pod: podWithClaimName,
338 claims: []*resourcev1alpha2.ResourceClaim{pendingDelayedClaim},
339 want: want{
340 prefilter: result{
341 status: framework.NewStatus(framework.UnschedulableAndUnresolvable, fmt.Sprintf("resource class %s does not exist", className)),
342 },
343 postfilter: result{
344 status: framework.NewStatus(framework.Unschedulable, `no new claims to deallocate`),
345 },
346 },
347 },
348 "delayed-allocation-scheduling-select-immediately": {
349
350
351 pod: podWithClaimName,
352 claims: []*resourcev1alpha2.ResourceClaim{pendingDelayedClaim},
353 classes: []*resourcev1alpha2.ResourceClass{resourceClass},
354 want: want{
355 prebind: result{
356 status: framework.NewStatus(framework.Pending, `waiting for resource driver`),
357 added: []metav1.Object{schedulingSelectedPotential},
358 },
359 },
360 },
361 "delayed-allocation-scheduling-ask": {
362
363
364
365 pod: podWithTwoClaimNames,
366 claims: []*resourcev1alpha2.ResourceClaim{pendingDelayedClaim, pendingDelayedClaim2},
367 classes: []*resourcev1alpha2.ResourceClass{resourceClass},
368 want: want{
369 prebind: result{
370 status: framework.NewStatus(framework.Pending, `waiting for resource driver`),
371 added: []metav1.Object{schedulingPotential},
372 },
373 },
374 },
375 "delayed-allocation-scheduling-finish": {
376
377
378 pod: podWithClaimName,
379 claims: []*resourcev1alpha2.ResourceClaim{pendingDelayedClaim},
380 schedulings: []*resourcev1alpha2.PodSchedulingContext{schedulingInfo},
381 classes: []*resourcev1alpha2.ResourceClass{resourceClass},
382 want: want{
383 prebind: result{
384 status: framework.NewStatus(framework.Pending, `waiting for resource driver`),
385 changes: change{
386 scheduling: func(in *resourcev1alpha2.PodSchedulingContext) *resourcev1alpha2.PodSchedulingContext {
387 return st.FromPodSchedulingContexts(in).
388 SelectedNode(workerNode.Name).
389 Obj()
390 },
391 },
392 },
393 },
394 },
395 "delayed-allocation-scheduling-finish-concurrent-label-update": {
396
397
398 pod: podWithClaimName,
399 claims: []*resourcev1alpha2.ResourceClaim{pendingDelayedClaim},
400 schedulings: []*resourcev1alpha2.PodSchedulingContext{schedulingInfo},
401 classes: []*resourcev1alpha2.ResourceClass{resourceClass},
402 prepare: prepare{
403 prebind: change{
404 scheduling: func(in *resourcev1alpha2.PodSchedulingContext) *resourcev1alpha2.PodSchedulingContext {
405
406
407
408 return st.FromPodSchedulingContexts(in).
409 Label("hello", "world").
410 Obj()
411 },
412 },
413 },
414 want: want{
415 prebind: result{
416 status: framework.AsStatus(errors.New(`ResourceVersion must match the object that gets updated`)),
417 },
418 },
419 },
420 "delayed-allocation-scheduling-completed": {
421
422 pod: podWithClaimName,
423 claims: []*resourcev1alpha2.ResourceClaim{allocatedClaim},
424 schedulings: []*resourcev1alpha2.PodSchedulingContext{schedulingInfo},
425 classes: []*resourcev1alpha2.ResourceClass{resourceClass},
426 want: want{
427 prebind: result{
428 changes: change{
429 claim: func(in *resourcev1alpha2.ResourceClaim) *resourcev1alpha2.ResourceClaim {
430 return st.FromResourceClaim(in).
431 ReservedFor(resourcev1alpha2.ResourceClaimConsumerReference{Resource: "pods", Name: podName, UID: types.UID(podUID)}).
432 Obj()
433 },
434 },
435 },
436 postbind: result{
437 removed: []metav1.Object{schedulingInfo},
438 },
439 },
440 },
441 "in-use-by-other": {
442 nodes: []*v1.Node{},
443 pod: otherPodWithClaimName,
444 claims: []*resourcev1alpha2.ResourceClaim{inUseClaim},
445 classes: []*resourcev1alpha2.ResourceClass{},
446 schedulings: []*resourcev1alpha2.PodSchedulingContext{},
447 prepare: prepare{},
448 want: want{
449 prefilter: result{
450 status: framework.NewStatus(framework.UnschedulableAndUnresolvable, `resourceclaim in use`),
451 },
452 postfilter: result{
453 status: framework.NewStatus(framework.Unschedulable, `no new claims to deallocate`),
454 },
455 },
456 },
457 "wrong-topology-delayed-allocation": {
458
459
460 pod: podWithClaimName,
461 claims: []*resourcev1alpha2.ResourceClaim{allocatedDelayedClaimWithWrongTopology},
462 want: want{
463 filter: perNodeResult{
464 workerNode.Name: {
465 status: framework.NewStatus(framework.UnschedulableAndUnresolvable, `resourceclaim not available on the node`),
466 },
467 },
468 postfilter: result{
469
470 changes: change{
471 claim: func(in *resourcev1alpha2.ResourceClaim) *resourcev1alpha2.ResourceClaim {
472 return st.FromResourceClaim(in).
473 DeallocationRequested(true).
474 Obj()
475 },
476 },
477 status: framework.NewStatus(framework.Unschedulable, `deallocation of ResourceClaim completed`),
478 },
479 },
480 },
481 "wrong-topology-immediate-allocation": {
482
483
484 pod: podWithClaimName,
485 claims: []*resourcev1alpha2.ResourceClaim{allocatedImmediateClaimWithWrongTopology},
486 want: want{
487 filter: perNodeResult{
488 workerNode.Name: {
489 status: framework.NewStatus(framework.UnschedulableAndUnresolvable, `resourceclaim not available on the node`),
490 },
491 },
492 postfilter: result{
493
494
495 status: framework.NewStatus(framework.Unschedulable, `still not schedulable`),
496 },
497 },
498 },
499 "good-topology": {
500 pod: podWithClaimName,
501 claims: []*resourcev1alpha2.ResourceClaim{allocatedClaimWithGoodTopology},
502 want: want{
503 prebind: result{
504 changes: change{
505 claim: func(in *resourcev1alpha2.ResourceClaim) *resourcev1alpha2.ResourceClaim {
506 return st.FromResourceClaim(in).
507 ReservedFor(resourcev1alpha2.ResourceClaimConsumerReference{Resource: "pods", Name: podName, UID: types.UID(podUID)}).
508 Obj()
509 },
510 },
511 },
512 },
513 },
514 "bind-failure": {
515 pod: podWithClaimName,
516 claims: []*resourcev1alpha2.ResourceClaim{allocatedClaimWithGoodTopology},
517 want: want{
518 prebind: result{
519 changes: change{
520 claim: func(in *resourcev1alpha2.ResourceClaim) *resourcev1alpha2.ResourceClaim {
521 return st.FromResourceClaim(in).
522 ReservedFor(resourcev1alpha2.ResourceClaimConsumerReference{Resource: "pods", Name: podName, UID: types.UID(podUID)}).
523 Obj()
524 },
525 },
526 },
527 unreserveAfterBindFailure: &result{
528 changes: change{
529 claim: func(in *resourcev1alpha2.ResourceClaim) *resourcev1alpha2.ResourceClaim {
530 out := in.DeepCopy()
531 out.Status.ReservedFor = []resourcev1alpha2.ResourceClaimConsumerReference{}
532 return out
533 },
534 },
535 },
536 },
537 },
538 "reserved-okay": {
539 pod: podWithClaimName,
540 claims: []*resourcev1alpha2.ResourceClaim{inUseClaim},
541 },
542 "disable": {
543 pod: podWithClaimName,
544 claims: []*resourcev1alpha2.ResourceClaim{inUseClaim},
545 want: want{
546 prefilter: result{
547 status: framework.NewStatus(framework.Skip),
548 },
549 },
550 disable: true,
551 },
	}

	for name, tc := range testcases {
		// We can run in parallel because logging is per-test.
		tc := tc
		t.Run(name, func(t *testing.T) {
			t.Parallel()
			nodes := tc.nodes
			if nodes == nil {
				nodes = []*v1.Node{workerNode}
			}
			testCtx := setup(t, nodes, tc.claims, tc.classes, tc.schedulings)
			testCtx.p.enabled = !tc.disable
			initialObjects := testCtx.listAll(t)

			status := testCtx.p.PreEnqueue(testCtx.ctx, tc.pod)
			t.Run("PreEnqueue", func(t *testing.T) {
				testCtx.verify(t, tc.want.preenqueue, initialObjects, nil, status)
			})
			if !status.IsSuccess() {
				return
			}

			result, status := testCtx.p.PreFilter(testCtx.ctx, testCtx.state, tc.pod)
			t.Run("prefilter", func(t *testing.T) {
				assert.Equal(t, tc.want.preFilterResult, result)
				testCtx.verify(t, tc.want.prefilter, initialObjects, result, status)
			})
			if status.IsSkip() {
				return
			}
			unschedulable := status.Code() != framework.Success

			var potentialNodes []*framework.NodeInfo

			initialObjects = testCtx.listAll(t)
			testCtx.updateAPIServer(t, initialObjects, tc.prepare.filter)
			if !unschedulable {
				for _, nodeInfo := range testCtx.nodeInfos {
					initialObjects = testCtx.listAll(t)
					status := testCtx.p.Filter(testCtx.ctx, testCtx.state, tc.pod, nodeInfo)
					nodeName := nodeInfo.Node().Name
					t.Run(fmt.Sprintf("filter/%s", nodeName), func(t *testing.T) {
						testCtx.verify(t, tc.want.filter.forNode(nodeName), initialObjects, nil, status)
					})
					if status.Code() != framework.Success {
						unschedulable = true
					} else {
						potentialNodes = append(potentialNodes, nodeInfo)
					}
				}
			}

			if !unschedulable && len(potentialNodes) > 0 {
				initialObjects = testCtx.listAll(t)
				initialObjects = testCtx.updateAPIServer(t, initialObjects, tc.prepare.prescore)
				status := testCtx.p.PreScore(testCtx.ctx, testCtx.state, tc.pod, potentialNodes)
				t.Run("prescore", func(t *testing.T) {
					testCtx.verify(t, tc.want.prescore, initialObjects, nil, status)
				})
				if status.Code() != framework.Success {
					unschedulable = true
				}
			}

			var selectedNode *framework.NodeInfo
			if !unschedulable && len(potentialNodes) > 0 {
				selectedNode = potentialNodes[0]

				initialObjects = testCtx.listAll(t)
				initialObjects = testCtx.updateAPIServer(t, initialObjects, tc.prepare.reserve)
				status := testCtx.p.Reserve(testCtx.ctx, testCtx.state, tc.pod, selectedNode.Node().Name)
				t.Run("reserve", func(t *testing.T) {
					testCtx.verify(t, tc.want.reserve, initialObjects, nil, status)
				})
				if status.Code() != framework.Success {
					unschedulable = true
				}
			}

			if selectedNode != nil {
				if unschedulable {
					initialObjects = testCtx.listAll(t)
					initialObjects = testCtx.updateAPIServer(t, initialObjects, tc.prepare.unreserve)
					testCtx.p.Unreserve(testCtx.ctx, testCtx.state, tc.pod, selectedNode.Node().Name)
					t.Run("unreserve", func(t *testing.T) {
						testCtx.verify(t, tc.want.unreserve, initialObjects, nil, status)
					})
				} else {
					initialObjects = testCtx.listAll(t)
					initialObjects = testCtx.updateAPIServer(t, initialObjects, tc.prepare.prebind)
					status := testCtx.p.PreBind(testCtx.ctx, testCtx.state, tc.pod, selectedNode.Node().Name)
					t.Run("prebind", func(t *testing.T) {
						testCtx.verify(t, tc.want.prebind, initialObjects, nil, status)
					})

					if tc.want.unreserveAfterBindFailure != nil {
						initialObjects = testCtx.listAll(t)
						testCtx.p.Unreserve(testCtx.ctx, testCtx.state, tc.pod, selectedNode.Node().Name)
						t.Run("unreserveAfterBindFailure", func(t *testing.T) {
							testCtx.verify(t, *tc.want.unreserveAfterBindFailure, initialObjects, nil, status)
						})
					} else if status.IsSuccess() {
						initialObjects = testCtx.listAll(t)
						initialObjects = testCtx.updateAPIServer(t, initialObjects, tc.prepare.postbind)
						testCtx.p.PostBind(testCtx.ctx, testCtx.state, tc.pod, selectedNode.Node().Name)
						t.Run("postbind", func(t *testing.T) {
							testCtx.verify(t, tc.want.postbind, initialObjects, nil, nil)
						})
					}
				}
			} else {
				initialObjects = testCtx.listAll(t)
				initialObjects = testCtx.updateAPIServer(t, initialObjects, tc.prepare.postfilter)
				result, status := testCtx.p.PostFilter(testCtx.ctx, testCtx.state, tc.pod, nil /* filteredNodeStatusMap not used by the plugin */)
				t.Run("postfilter", func(t *testing.T) {
					assert.Equal(t, tc.want.postFilterResult, result)
					testCtx.verify(t, tc.want.postfilter, initialObjects, nil, status)
				})
			}
		})
	}
}

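// testContext bundles the fake client, the informers, and the plugin
// instance that all test steps operate on.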
type testContext struct {
	ctx             context.Context
	client          *fake.Clientset
	informerFactory informers.SharedInformerFactory
	p               *dynamicResources
	nodeInfos       []*framework.NodeInfo
	state           *framework.CycleState
}

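// verify checks the returned status and compares the current state of the
// API server against the expected state: the initial objects plus all
// expected changes, additions, and removals.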
func (tc *testContext) verify(t *testing.T, expected result, initialObjects []metav1.Object, result interface{}, status *framework.Status) {
	t.Helper()
	assert.Equal(t, expected.status, status)
	objects := tc.listAll(t)
	wantObjects := update(t, initialObjects, expected.changes)
	wantObjects = append(wantObjects, expected.added...)
	for _, remove := range expected.removed {
		for i, obj := range wantObjects {
			// This is a bit relaxed: it ignores the object type and
			// matches only on namespace and name, which is sufficient
			// for the objects used in these tests.
			if obj.GetName() == remove.GetName() && obj.GetNamespace() == remove.GetNamespace() {
				wantObjects = append(wantObjects[0:i], wantObjects[i+1:]...)
				break
			}
		}
	}
	sortObjects(wantObjects)
	stripObjects(wantObjects)
	stripObjects(objects)
	assert.Equal(t, wantObjects, objects)
}

// setGVK is implemented by metav1.TypeMeta and thus by all API objects.
type setGVK interface {
	SetGroupVersionKind(gvk schema.GroupVersionKind)
}

// stripObjects removes fields like UID and ResourceVersion which would
// otherwise make comparing objects unnecessarily complicated.
func stripObjects(objects []metav1.Object) {
	for _, obj := range objects {
		obj.SetResourceVersion("")
		obj.SetUID("")
		if objType, ok := obj.(setGVK); ok {
			objType.SetGroupVersionKind(schema.GroupVersionKind{})
		}
	}
}

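// listAll returns all claims and scheduling contexts that currently exist
// in the fake API server, sorted by namespace and name.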
func (tc *testContext) listAll(t *testing.T) (objects []metav1.Object) {
	t.Helper()
	claims, err := tc.client.ResourceV1alpha2().ResourceClaims("").List(tc.ctx, metav1.ListOptions{})
	require.NoError(t, err, "list claims")
	for _, claim := range claims.Items {
		claim := claim
		objects = append(objects, &claim)
	}
	schedulings, err := tc.client.ResourceV1alpha2().PodSchedulingContexts("").List(tc.ctx, metav1.ListOptions{})
	require.NoError(t, err, "list pod scheduling")
	for _, scheduling := range schedulings.Items {
		scheduling := scheduling
		objects = append(objects, &scheduling)
	}

	sortObjects(objects)
	return
}

// updateAPIServer modifies objects via the given update functions and stores
// any changed object in the API server. It returns the modified objects so
// that the caller can use them as the new baseline for comparisons.
func (tc *testContext) updateAPIServer(t *testing.T, objects []metav1.Object, updates change) []metav1.Object {
	modified := update(t, objects, updates)
	for i := range modified {
		obj := modified[i]
		if diff := cmp.Diff(objects[i], obj); diff != "" {
			t.Logf("Updating %T %q, diff (-old, +new):\n%s", obj, obj.GetName(), diff)
			switch obj := obj.(type) {
			case *resourcev1alpha2.ResourceClaim:
				obj, err := tc.client.ResourceV1alpha2().ResourceClaims(obj.Namespace).Update(tc.ctx, obj, metav1.UpdateOptions{})
				if err != nil {
					t.Fatalf("unexpected error during prepare update: %v", err)
				}
				modified[i] = obj
			case *resourcev1alpha2.PodSchedulingContext:
				obj, err := tc.client.ResourceV1alpha2().PodSchedulingContexts(obj.Namespace).Update(tc.ctx, obj, metav1.UpdateOptions{})
				if err != nil {
					t.Fatalf("unexpected error during prepare update: %v", err)
				}
				modified[i] = obj
			default:
				t.Fatalf("unsupported object type %T", obj)
			}
		}
	}
	return modified
}

func sortObjects(objects []metav1.Object) {
	sort.Slice(objects, func(i, j int) bool {
		if objects[i].GetNamespace() != objects[j].GetNamespace() {
			return objects[i].GetNamespace() < objects[j].GetNamespace()
		}
		return objects[i].GetName() < objects[j].GetName()
	})
}

// update walks through all existing objects, picks the update function that
// matches the object's type, and replaces the object with the function's
// result. Objects without a matching update function are kept unchanged.
func update(t *testing.T, objects []metav1.Object, updates change) []metav1.Object {
	var updated []metav1.Object

	for _, obj := range objects {
		switch in := obj.(type) {
		case *resourcev1alpha2.ResourceClaim:
			if updates.claim != nil {
				obj = updates.claim(in)
			}
		case *resourcev1alpha2.PodSchedulingContext:
			if updates.scheduling != nil {
				obj = updates.scheduling(in)
			}
		}
		updated = append(updated, obj)
	}

	return updated
}

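// setup creates a fake client populated with the given objects, starts the
// informers, and instantiates the plugin with dynamic resource allocation
// enabled.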
func setup(t *testing.T, nodes []*v1.Node, claims []*resourcev1alpha2.ResourceClaim, classes []*resourcev1alpha2.ResourceClass, schedulings []*resourcev1alpha2.PodSchedulingContext) (result *testContext) {
	t.Helper()

	tc := &testContext{}
	_, ctx := ktesting.NewTestContext(t)
	ctx, cancel := context.WithCancel(ctx)
	t.Cleanup(cancel)
	tc.ctx = ctx

	tc.client = fake.NewSimpleClientset()
	reactor := createReactor(tc.client.Tracker())
	tc.client.PrependReactor("*", "*", reactor)
	tc.informerFactory = informers.NewSharedInformerFactory(tc.client, 0)

	opts := []runtime.Option{
		runtime.WithClientSet(tc.client),
		runtime.WithInformerFactory(tc.informerFactory),
	}
	fh, err := runtime.NewFramework(ctx, nil, nil, opts...)
	if err != nil {
		t.Fatal(err)
	}

	pl, err := New(ctx, nil, fh, feature.Features{EnableDynamicResourceAllocation: true})
	if err != nil {
		t.Fatal(err)
	}
	tc.p = pl.(*dynamicResources)

	// Create the initial objects through the client so that the reactor
	// assigns UIDs and ResourceVersions and the informers see the objects.
	for _, claim := range claims {
		_, err := tc.client.ResourceV1alpha2().ResourceClaims(claim.Namespace).Create(tc.ctx, claim, metav1.CreateOptions{})
		require.NoError(t, err, "create resource claim")
	}
	for _, class := range classes {
		_, err := tc.client.ResourceV1alpha2().ResourceClasses().Create(tc.ctx, class, metav1.CreateOptions{})
		require.NoError(t, err, "create resource class")
	}
	for _, scheduling := range schedulings {
		_, err := tc.client.ResourceV1alpha2().PodSchedulingContexts(scheduling.Namespace).Create(tc.ctx, scheduling, metav1.CreateOptions{})
		require.NoError(t, err, "create pod scheduling")
	}

	tc.informerFactory.Start(tc.ctx.Done())
	t.Cleanup(func() {
		// Need to cancel before waiting for the shutdown.
		cancel()
		// Now we can wait for all goroutines to stop.
		tc.informerFactory.Shutdown()
	})

	tc.informerFactory.WaitForCacheSync(tc.ctx.Done())

	for _, node := range nodes {
		nodeInfo := framework.NewNodeInfo()
		nodeInfo.SetNode(node)
		tc.nodeInfos = append(tc.nodeInfos, nodeInfo)
	}
	tc.state = framework.NewCycleState()

	return tc
}

// createReactor returns a reactor which implements the logic required for
// the UID and ResourceVersion fields to work when using the fake client:
// on create it assigns fresh values, on update it checks that the incoming
// ResourceVersion matches the stored object and then bumps it.
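//
// Typical installation, as done in setup above:
//
//	client := fake.NewSimpleClientset()
//	client.PrependReactor("*", "*", createReactor(client.Tracker()))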
func createReactor(tracker cgotesting.ObjectTracker) func(action cgotesting.Action) (handled bool, ret apiruntime.Object, err error) {
	var uidCounter int
	var resourceVersionCounter int
	var mutex sync.Mutex

	return func(action cgotesting.Action) (handled bool, ret apiruntime.Object, err error) {
		createAction, ok := action.(cgotesting.CreateAction)
		if !ok {
			return false, nil, nil
		}
		obj, ok := createAction.GetObject().(metav1.Object)
		if !ok {
			return false, nil, nil
		}

		mutex.Lock()
		defer mutex.Unlock()
		switch action.GetVerb() {
		case "create":
			if obj.GetUID() != "" {
				return true, nil, errors.New("UID must not be set on create")
			}
			if obj.GetResourceVersion() != "" {
				return true, nil, errors.New("ResourceVersion must not be set on create")
			}
			obj.SetUID(types.UID(fmt.Sprintf("UID-%d", uidCounter)))
			uidCounter++
			obj.SetResourceVersion(fmt.Sprintf("REV-%d", resourceVersionCounter))
			resourceVersionCounter++
		case "update":
			uid := obj.GetUID()
			resourceVersion := obj.GetResourceVersion()
			if uid == "" {
				return true, nil, errors.New("UID must be set on update")
			}
			if resourceVersion == "" {
				return true, nil, errors.New("ResourceVersion must be set on update")
			}

			oldObj, err := tracker.Get(action.GetResource(), obj.GetNamespace(), obj.GetName())
			if err != nil {
				return true, nil, err
			}
			oldObjMeta, ok := oldObj.(metav1.Object)
			if !ok {
				return true, nil, errors.New("internal error: unexpected old object type")
			}
			if oldObjMeta.GetResourceVersion() != resourceVersion {
				return true, nil, errors.New("ResourceVersion must match the object that gets updated")
			}

			obj.SetResourceVersion(fmt.Sprintf("REV-%d", resourceVersionCounter))
			resourceVersionCounter++
		}
		return false, nil, nil
	}
}

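// Test_isSchedulableAfterClaimChange checks the queueing hints that the
// plugin emits for add, update, and delete events of ResourceClaim objects.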
func Test_isSchedulableAfterClaimChange(t *testing.T) {
	testcases := map[string]struct {
		pod            *v1.Pod
		claims         []*resourcev1alpha2.ResourceClaim
		oldObj, newObj interface{}
		expectedHint   framework.QueueingHint
		expectedErr    bool
	}{
		"skip-deletes": {
			pod:          podWithClaimTemplate,
			oldObj:       allocatedClaim,
			newObj:       nil,
			expectedHint: framework.QueueSkip,
		},
		"backoff-wrong-new-object": {
			pod:         podWithClaimTemplate,
			newObj:      "not-a-claim",
			expectedErr: true,
		},
		"skip-wrong-claim": {
			pod: podWithClaimTemplate,
			newObj: func() *resourcev1alpha2.ResourceClaim {
				claim := allocatedClaim.DeepCopy()
				claim.OwnerReferences[0].UID += "123"
				return claim
			}(),
			expectedHint: framework.QueueSkip,
		},
		"skip-unrelated-claim": {
			pod:    podWithClaimTemplate,
			claims: []*resourcev1alpha2.ResourceClaim{allocatedClaim},
			newObj: func() *resourcev1alpha2.ResourceClaim {
				claim := allocatedClaim.DeepCopy()
				claim.Name += "-foo"
				claim.UID += "123"
				return claim
			}(),
			expectedHint: framework.QueueSkip,
		},
		"queue-on-add": {
			pod:          podWithClaimName,
			newObj:       pendingImmediateClaim,
			expectedHint: framework.Queue,
		},
		"backoff-wrong-old-object": {
			pod:         podWithClaimName,
			oldObj:      "not-a-claim",
			newObj:      pendingImmediateClaim,
			expectedErr: true,
		},
		"skip-adding-finalizer": {
			pod:    podWithClaimName,
			claims: []*resourcev1alpha2.ResourceClaim{pendingImmediateClaim},
			oldObj: pendingImmediateClaim,
			newObj: func() *resourcev1alpha2.ResourceClaim {
				claim := pendingImmediateClaim.DeepCopy()
				claim.Finalizers = append(claim.Finalizers, "foo")
				return claim
			}(),
			expectedHint: framework.QueueSkip,
		},
		"queue-on-status-change": {
			pod:    podWithClaimName,
			claims: []*resourcev1alpha2.ResourceClaim{pendingImmediateClaim},
			oldObj: pendingImmediateClaim,
			newObj: func() *resourcev1alpha2.ResourceClaim {
				claim := pendingImmediateClaim.DeepCopy()
				claim.Status.Allocation = &resourcev1alpha2.AllocationResult{}
				return claim
			}(),
			expectedHint: framework.Queue,
		},
	}

	for name, tc := range testcases {
		t.Run(name, func(t *testing.T) {
			logger, _ := ktesting.NewTestContext(t)
			testCtx := setup(t, nil, tc.claims, nil, nil)
			if claim, ok := tc.newObj.(*resourcev1alpha2.ResourceClaim); ok {
				// Update the informer store because the plugin's lister
				// must see the latest claim.
				store := testCtx.informerFactory.Resource().V1alpha2().ResourceClaims().Informer().GetStore()
				if tc.oldObj == nil {
					require.NoError(t, store.Add(claim))
				} else {
					require.NoError(t, store.Update(claim))
				}
			}
			actualHint, err := testCtx.p.isSchedulableAfterClaimChange(logger, tc.pod, tc.oldObj, tc.newObj)
			if tc.expectedErr {
				require.Error(t, err)
				return
			}

			require.NoError(t, err)
			require.Equal(t, tc.expectedHint, actualHint)
		})
	}
}

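// Test_isSchedulableAfterPodSchedulingContextChange checks the queueing hints
// that the plugin emits for add, update, and delete events of
// PodSchedulingContext objects.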
func Test_isSchedulableAfterPodSchedulingContextChange(t *testing.T) {
	testcases := map[string]struct {
		pod            *v1.Pod
		schedulings    []*resourcev1alpha2.PodSchedulingContext
		claims         []*resourcev1alpha2.ResourceClaim
		oldObj, newObj interface{}
		expectedHint   framework.QueueingHint
		expectedErr    bool
	}{
		"skip-deleted": {
			pod:          podWithClaimTemplate,
			oldObj:       scheduling,
			expectedHint: framework.QueueSkip,
		},
		"skip-missed-deleted": {
			pod: podWithClaimTemplate,
			oldObj: cache.DeletedFinalStateUnknown{
				Obj: scheduling,
			},
			expectedHint: framework.QueueSkip,
		},
		"backoff-wrong-old-object": {
			pod:         podWithClaimTemplate,
			oldObj:      "not-a-scheduling-context",
			newObj:      scheduling,
			expectedErr: true,
		},
		"backoff-missed-wrong-old-object": {
			pod: podWithClaimTemplate,
			oldObj: cache.DeletedFinalStateUnknown{
				Obj: "not-a-scheduling-context",
			},
			newObj:      scheduling,
			expectedErr: true,
		},
		"skip-unrelated-object": {
			pod:    podWithClaimTemplate,
			claims: []*resourcev1alpha2.ResourceClaim{pendingDelayedClaim},
			newObj: func() *resourcev1alpha2.PodSchedulingContext {
				scheduling := scheduling.DeepCopy()
				scheduling.Name += "-foo"
				return scheduling
			}(),
			expectedHint: framework.QueueSkip,
		},
		"backoff-wrong-new-object": {
			pod:         podWithClaimTemplate,
			oldObj:      scheduling,
			newObj:      "not-a-scheduling-context",
			expectedErr: true,
		},
		"skip-missing-claim": {
			pod:          podWithClaimTemplate,
			oldObj:       scheduling,
			newObj:       schedulingInfo,
			expectedHint: framework.QueueSkip,
		},
		"skip-missing-infos": {
			pod:          podWithClaimTemplateInStatus,
			claims:       []*resourcev1alpha2.ResourceClaim{pendingDelayedClaim},
			oldObj:       scheduling,
			newObj:       scheduling,
			expectedHint: framework.QueueSkip,
		},
		"queue-new-infos": {
			pod:          podWithClaimTemplateInStatus,
			claims:       []*resourcev1alpha2.ResourceClaim{pendingDelayedClaim},
			oldObj:       scheduling,
			newObj:       schedulingInfo,
			expectedHint: framework.Queue,
		},
		"queue-bad-selected-node": {
			pod:    podWithClaimTemplateInStatus,
			claims: []*resourcev1alpha2.ResourceClaim{pendingDelayedClaim},
			oldObj: func() *resourcev1alpha2.PodSchedulingContext {
				scheduling := schedulingInfo.DeepCopy()
				scheduling.Spec.SelectedNode = workerNode.Name
				return scheduling
			}(),
			newObj: func() *resourcev1alpha2.PodSchedulingContext {
				scheduling := schedulingInfo.DeepCopy()
				scheduling.Spec.SelectedNode = workerNode.Name
				scheduling.Status.ResourceClaims[0].UnsuitableNodes = append(scheduling.Status.ResourceClaims[0].UnsuitableNodes, scheduling.Spec.SelectedNode)
				return scheduling
			}(),
			expectedHint: framework.Queue,
		},
		"skip-spec-changes": {
			pod:    podWithClaimTemplateInStatus,
			claims: []*resourcev1alpha2.ResourceClaim{pendingDelayedClaim},
			oldObj: schedulingInfo,
			newObj: func() *resourcev1alpha2.PodSchedulingContext {
				scheduling := schedulingInfo.DeepCopy()
				scheduling.Spec.SelectedNode = workerNode.Name
				return scheduling
			}(),
			expectedHint: framework.QueueSkip,
		},
		"backoff-other-changes": {
			pod:    podWithClaimTemplateInStatus,
			claims: []*resourcev1alpha2.ResourceClaim{pendingDelayedClaim},
			oldObj: schedulingInfo,
			newObj: func() *resourcev1alpha2.PodSchedulingContext {
				scheduling := schedulingInfo.DeepCopy()
				scheduling.Finalizers = append(scheduling.Finalizers, "foo")
				return scheduling
			}(),
			expectedHint: framework.Queue,
		},
	}

	for name, tc := range testcases {
		tc := tc
		t.Run(name, func(t *testing.T) {
			t.Parallel()
			logger, _ := ktesting.NewTestContext(t)
			testCtx := setup(t, nil, tc.claims, nil, tc.schedulings)
			actualHint, err := testCtx.p.isSchedulableAfterPodSchedulingContextChange(logger, tc.pod, tc.oldObj, tc.newObj)
			if tc.expectedErr {
				require.Error(t, err)
				return
			}

			require.NoError(t, err)
			require.Equal(t, tc.expectedHint, actualHint)
		})
	}
}