package dynamicresources

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"slices"
	"sort"
	"sync"

	"github.com/google/go-cmp/cmp"

	v1 "k8s.io/api/core/v1"
	resourcev1alpha2 "k8s.io/api/resource/v1alpha2"
	apiequality "k8s.io/apimachinery/pkg/api/equality"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/sets"
	resourcev1alpha2apply "k8s.io/client-go/applyconfigurations/resource/v1alpha2"
	"k8s.io/client-go/kubernetes"
	resourcev1alpha2listers "k8s.io/client-go/listers/resource/v1alpha2"
	"k8s.io/component-helpers/scheduling/corev1/nodeaffinity"
	"k8s.io/dynamic-resource-allocation/resourceclaim"
	"k8s.io/klog/v2"
	"k8s.io/kubernetes/pkg/scheduler/framework"
	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding"
	schedutil "k8s.io/kubernetes/pkg/scheduler/util"
	"k8s.io/utils/ptr"
)

const (
	// Name is the name of the plugin used in Registry and configurations.
	Name = names.DynamicResources

	// stateKey is the key under which this plugin stores its stateData in
	// the framework.CycleState.
	stateKey framework.StateKey = Name
)
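// stateData is computed by PreFilter and stored in the framework.CycleState
// under stateKey. Because the pointer itself is stored, later phases can
// update fields directly after retrieving it, for example via:
//
//	state, err := getStateData(cs)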
type stateData struct {
	// preScored is true if PreScore was invoked.
	preScored bool

	// claims contains the claims of the pod, in the same order as
	// pod.Spec.ResourceClaims, with the status from the start of the
	// scheduling cycle. The entries come from the informer cache and must
	// be treated as read-only; an entry gets replaced when the plugin
	// itself successfully updates the claim.
	//
	// Empty if the pod has no claims.
	claims []*resourcev1alpha2.ResourceClaim

	// podSchedulingState tracks the PodSchedulingContext object, if any,
	// and the local changes that still need to be published for it.
	podSchedulingState podSchedulingState

	// resources models the available and allocated resources for claims
	// with structured parameters.
	resources resources

	// mutex must be locked while accessing the fields below, which may be
	// written concurrently during Filter.
	mutex sync.Mutex

	// unavailableClaims contains the indices of claims which are allocated
	// but not usable on at least one evaluated node. Written during Filter
	// (potentially in parallel), consumed by PostFilter.
	unavailableClaims sets.Set[int]

	informationsForClaim []informationForClaim
}

func (d *stateData) Clone() framework.StateData {
	// The pointer is shared intentionally: all phases of one scheduling
	// cycle operate on the same state, protected by the mutex above.
	return d
}
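// informationForClaim holds the per-claim data that PreFilter gathers for
// reuse in the later phases.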
type informationForClaim struct {
	// availableOnNode is the node filter for the claim (from the
	// allocation result or the resource class), converted once by
	// PreFilter so that Filter can evaluate it repeatedly. Nil if the
	// claim has no such filter.
	availableOnNode *nodeaffinity.NodeSelector

	// status is the scheduling status reported for the claim in the
	// PodSchedulingContext, looked up by PreFilter for repeated
	// evaluation in Filter. Nil if there is none.
	status *resourcev1alpha2.ResourceClaimSchedulingStatus

	// structuredParameters is true if the claim uses structured parameters
	// and thus gets allocated by the scheduler itself.
	structuredParameters bool
	controller           *claimController

	// Set by Reserve, published by PreBind.
	allocation           *resourcev1alpha2.AllocationResult
	allocationDriverName string
}
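// podSchedulingState tracks the PodSchedulingContext object for a pod and the
// local changes to it which have not been written to the API server yet.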
type podSchedulingState struct {
	// schedulingCtx is the PodSchedulingContext object for the pod, if one
	// exists in the API server.
	schedulingCtx *resourcev1alpha2.PodSchedulingContext

	// selectedNode is non-nil if (and only if) the selected node field
	// needs to be created or updated.
	selectedNode *string

	// potentialNodes is non-nil if (and only if) the potential nodes field
	// needs to be created or updated.
	potentialNodes *[]string
}
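// isDirty returns true if local changes still need to be published, either by
// updating the existing PodSchedulingContext object or by creating a new one.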
func (p *podSchedulingState) isDirty() bool {
	return p.selectedNode != nil ||
		p.potentialNodes != nil
}
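// init checks whether there already is a PodSchedulingContext object for the
// pod and, if so, that it is owned by the pod. Must not be called concurrently.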
func (p *podSchedulingState) init(ctx context.Context, pod *v1.Pod, podSchedulingContextLister resourcev1alpha2listers.PodSchedulingContextLister) error {
	schedulingCtx, err := podSchedulingContextLister.PodSchedulingContexts(pod.Namespace).Get(pod.Name)
	switch {
	case apierrors.IsNotFound(err):
		return nil
	case err != nil:
		return err
	default:
		// We have an object, but it might be obsolete, for example from a
		// deleted pod with the same name.
		if !metav1.IsControlledBy(schedulingCtx, pod) {
			return fmt.Errorf("PodSchedulingContext object with UID %s is not owned by Pod %s/%s", schedulingCtx.UID, pod.Namespace, pod.Name)
		}
	}
	p.schedulingCtx = schedulingCtx
	return nil
}
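// publish writes the pending changes to the API server, creating or updating
// the PodSchedulingContext object as necessary. Must not be called
// concurrently.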
func (p *podSchedulingState) publish(ctx context.Context, pod *v1.Pod, clientset kubernetes.Interface) error {
	if !p.isDirty() {
		return nil
	}

	var err error
	logger := klog.FromContext(ctx)
	if p.schedulingCtx != nil {
		// Update the existing object.
		schedulingCtx := p.schedulingCtx.DeepCopy()
		if p.selectedNode != nil {
			schedulingCtx.Spec.SelectedNode = *p.selectedNode
		}
		if p.potentialNodes != nil {
			schedulingCtx.Spec.PotentialNodes = *p.potentialNodes
		}
		if loggerV := logger.V(6); loggerV.Enabled() {
			// At a high enough log level, dump the entire object.
			loggerV.Info("Updating PodSchedulingContext", "podSchedulingCtx", klog.KObj(schedulingCtx), "podSchedulingCtxObject", klog.Format(schedulingCtx))
		} else {
			logger.V(5).Info("Updating PodSchedulingContext", "podSchedulingCtx", klog.KObj(schedulingCtx))
		}
		_, err = clientset.ResourceV1alpha2().PodSchedulingContexts(schedulingCtx.Namespace).Update(ctx, schedulingCtx, metav1.UpdateOptions{})
		if apierrors.IsConflict(err) {
			// A plain Update is tried first because it is faster in the
			// common, conflict-free case. Returning the conflict error
			// here would send the pod through backoff and retry, which
			// is expensive, so instead fall back to a server-side apply
			// which forcibly sets just the fields that this plugin owns.
			spec := resourcev1alpha2apply.PodSchedulingContextSpec()
			spec.SelectedNode = p.selectedNode
			if p.potentialNodes != nil {
				spec.PotentialNodes = *p.potentialNodes
			} else {
				// Unchanged, but it has to be sent anyway: the apply
				// configuration must describe the full desired spec,
				// otherwise the field would get removed.
				spec.PotentialNodes = p.schedulingCtx.Spec.PotentialNodes
			}
			schedulingCtxApply := resourcev1alpha2apply.PodSchedulingContext(pod.Name, pod.Namespace).WithSpec(spec)

			if loggerV := logger.V(6); loggerV.Enabled() {
				// At a high enough log level, dump the entire object.
				loggerV.Info("Patching PodSchedulingContext", "podSchedulingCtx", klog.KObj(pod), "podSchedulingCtxApply", klog.Format(schedulingCtxApply))
			} else {
				logger.V(5).Info("Patching PodSchedulingContext", "podSchedulingCtx", klog.KObj(pod))
			}
			_, err = clientset.ResourceV1alpha2().PodSchedulingContexts(pod.Namespace).Apply(ctx, schedulingCtxApply, metav1.ApplyOptions{FieldManager: "kube-scheduler", Force: true})
		}

	} else {
		// Create a new object, owned by the pod.
		schedulingCtx := &resourcev1alpha2.PodSchedulingContext{
			ObjectMeta: metav1.ObjectMeta{
				Name:            pod.Name,
				Namespace:       pod.Namespace,
				OwnerReferences: []metav1.OwnerReference{*metav1.NewControllerRef(pod, schema.GroupVersionKind{Version: "v1", Kind: "Pod"})},
			},
		}
		if p.selectedNode != nil {
			schedulingCtx.Spec.SelectedNode = *p.selectedNode
		}
		if p.potentialNodes != nil {
			schedulingCtx.Spec.PotentialNodes = *p.potentialNodes
		}
		if loggerV := logger.V(6); loggerV.Enabled() {
			// At a high enough log level, dump the entire object.
			loggerV.Info("Creating PodSchedulingContext", "podSchedulingCtx", klog.KObj(schedulingCtx), "podSchedulingCtxObject", klog.Format(schedulingCtx))
		} else {
			logger.V(5).Info("Creating PodSchedulingContext", "podSchedulingCtx", klog.KObj(schedulingCtx))
		}
		_, err = clientset.ResourceV1alpha2().PodSchedulingContexts(schedulingCtx.Namespace).Create(ctx, schedulingCtx, metav1.CreateOptions{})
	}
	if err != nil {
		return err
	}
	p.potentialNodes = nil
	p.selectedNode = nil
	return nil
}
// statusForClaim returns the scheduling status reported for the given claim of
// the pod, if any.
func statusForClaim(schedulingCtx *resourcev1alpha2.PodSchedulingContext, podClaimName string) *resourcev1alpha2.ResourceClaimSchedulingStatus {
	if schedulingCtx == nil {
		return nil
	}
	for _, status := range schedulingCtx.Status.ResourceClaims {
		if status.Name == podClaimName {
			return &status
		}
	}
	return nil
}
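// dynamicResources is a plugin that ensures that ResourceClaims are allocated
// and reserved for the pod.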
type dynamicResources struct {
	enabled                    bool
	fh                         framework.Handle
	clientset                  kubernetes.Interface
	claimLister                resourcev1alpha2listers.ResourceClaimLister
	classLister                resourcev1alpha2listers.ResourceClassLister
	podSchedulingContextLister resourcev1alpha2listers.PodSchedulingContextLister
	claimParametersLister      resourcev1alpha2listers.ResourceClaimParametersLister
	classParametersLister      resourcev1alpha2listers.ResourceClassParametersLister
	resourceSliceLister        resourcev1alpha2listers.ResourceSliceLister
	claimNameLookup            *resourceclaim.Lookup

	// claimAssumeCache temporarily stores a newer claim object while the
	// scheduler has allocated it and the corresponding update from the
	// API server has not been seen by the claim informer yet. Claims get
	// added in PreBind after a successful status update and removed again
	// either by the informer callback (once the informer has caught up)
	// or in Unreserve when an in-flight allocation has to be rolled back.
	//
	// Without it, reconstructing the resource usage at the start of the
	// next pod's scheduling cycle could hand out resources that were just
	// assigned to a claim. Object keys are "<namespace>/<name>".
	claimAssumeCache volumebinding.AssumeCache

	// inFlightAllocations maps claim UIDs to claim objects for claims
	// whose allocation was triggered during a scheduling cycle while the
	// corresponding claim status update in PreBind is still pending. If
	// another pod needs such a claim, that pod is treated as "not
	// schedulable yet"; the cluster event for the claim status update
	// will make it schedulable again.
	//
	// This prevents two pods which share a claim from both getting
	// scheduled based on a claim status that does not reflect the
	// in-flight allocation yet.
	inFlightAllocations sync.Map
}
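// New initializes a new plugin and returns it.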
func New(ctx context.Context, plArgs runtime.Object, fh framework.Handle, fts feature.Features) (framework.Plugin, error) {
	if !fts.EnableDynamicResourceAllocation {
		// Disabled, won't do anything.
		return &dynamicResources{}, nil
	}

	logger := klog.FromContext(ctx)
	pl := &dynamicResources{
		enabled:                    true,
		fh:                         fh,
		clientset:                  fh.ClientSet(),
		claimLister:                fh.SharedInformerFactory().Resource().V1alpha2().ResourceClaims().Lister(),
		classLister:                fh.SharedInformerFactory().Resource().V1alpha2().ResourceClasses().Lister(),
		podSchedulingContextLister: fh.SharedInformerFactory().Resource().V1alpha2().PodSchedulingContexts().Lister(),
		claimParametersLister:      fh.SharedInformerFactory().Resource().V1alpha2().ResourceClaimParameters().Lister(),
		classParametersLister:      fh.SharedInformerFactory().Resource().V1alpha2().ResourceClassParameters().Lister(),
		resourceSliceLister:        fh.SharedInformerFactory().Resource().V1alpha2().ResourceSlices().Lister(),
		claimNameLookup:            resourceclaim.NewNameLookup(fh.ClientSet()),
		claimAssumeCache:           volumebinding.NewAssumeCache(logger, fh.SharedInformerFactory().Resource().V1alpha2().ResourceClaims().Informer(), "claim", "", nil),
	}

	return pl, nil
}
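// Compile-time checks for the extension points this plugin implements.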
var _ framework.PreEnqueuePlugin = &dynamicResources{}
var _ framework.PreFilterPlugin = &dynamicResources{}
var _ framework.FilterPlugin = &dynamicResources{}
var _ framework.PostFilterPlugin = &dynamicResources{}
var _ framework.PreScorePlugin = &dynamicResources{}
var _ framework.ReservePlugin = &dynamicResources{}
var _ framework.EnqueueExtensions = &dynamicResources{}
var _ framework.PreBindPlugin = &dynamicResources{}
var _ framework.PostBindPlugin = &dynamicResources{}
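// Name returns the name of the plugin. It is used in logs and configurations.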
func (pl *dynamicResources) Name() string {
	return Name
}
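// EventsToRegister returns the cluster events that may make a pod which failed
// in this plugin schedulable again, together with hint functions that filter
// out irrelevant events.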
func (pl *dynamicResources) EventsToRegister() []framework.ClusterEventWithHint {
	if !pl.enabled {
		return nil
	}

	events := []framework.ClusterEventWithHint{
		// Changes for claim or class parameters creation may make pods
		// schedulable which depend on claims using those parameters.
		{Event: framework.ClusterEvent{Resource: framework.ResourceClaimParameters, ActionType: framework.Add | framework.Update}, QueueingHintFn: pl.isSchedulableAfterClaimParametersChange},
		{Event: framework.ClusterEvent{Resource: framework.ResourceClassParameters, ActionType: framework.Add | framework.Update}, QueueingHintFn: pl.isSchedulableAfterClassParametersChange},
		// Allocation is tracked in ResourceClaims, so any changes may make
		// the pods schedulable.
		{Event: framework.ClusterEvent{Resource: framework.ResourceClaim, ActionType: framework.Add | framework.Update}, QueueingHintFn: pl.isSchedulableAfterClaimChange},
		// When a driver has provided additional information, a pod waiting
		// for that information may be schedulable.
		{Event: framework.ClusterEvent{Resource: framework.PodSchedulingContext, ActionType: framework.Add | framework.Update}, QueueingHintFn: pl.isSchedulableAfterPodSchedulingContextChange},
		// A resource might depend on node labels for topology filtering,
		// so a new or updated node may make pods schedulable.
		// UpdateNodeTaint is included to compensate for the scheduler's
		// preCheck, which can filter out the Add event for nodes that are
		// not ready yet.
		{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.UpdateNodeLabel | framework.UpdateNodeTaint}},
		// A pod might be waiting for a class to get created or modified.
		{Event: framework.ClusterEvent{Resource: framework.ResourceClass, ActionType: framework.Add | framework.Update}},
	}
	return events
}
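// PreEnqueue checks if there are known reasons why a pod currently cannot be
// scheduled. When this fails, one of the registered events can trigger another
// attempt.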
func (pl *dynamicResources) PreEnqueue(ctx context.Context, pod *v1.Pod) (status *framework.Status) {
	if err := pl.foreachPodResourceClaim(pod, nil); err != nil {
		return statusUnschedulable(klog.FromContext(ctx), err.Error())
	}
	return nil
}
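// isSchedulableAfterClaimParametersChange is invoked for add and update claim
// parameters events reported by an informer. It checks whether that change
// made a previously unschedulable pod schedulable. It errs on the side of
// letting a pod scheduling attempt happen.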
func (pl *dynamicResources) isSchedulableAfterClaimParametersChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
	originalParameters, modifiedParameters, err := schedutil.As[*resourcev1alpha2.ResourceClaimParameters](oldObj, newObj)
	if err != nil {
		// Shouldn't happen.
		return framework.Queue, fmt.Errorf("unexpected object in isSchedulableAfterClaimParametersChange: %w", err)
	}

	usesParameters := false
	if err := pl.foreachPodResourceClaim(pod, func(_ string, claim *resourcev1alpha2.ResourceClaim) {
		ref := claim.Spec.ParametersRef
		if ref == nil {
			return
		}

		// Using in-tree parameters directly?
		if ref.APIGroup == resourcev1alpha2.SchemeGroupVersion.Group &&
			ref.Kind == "ResourceClaimParameters" {
			if modifiedParameters.Name == ref.Name {
				usesParameters = true
			}
			return
		}

		// Need to look for translated parameters.
		generatedFrom := modifiedParameters.GeneratedFrom
		if generatedFrom == nil {
			return
		}
		if generatedFrom.APIGroup == ref.APIGroup &&
			generatedFrom.Kind == ref.Kind &&
			generatedFrom.Name == ref.Name {
			usesParameters = true
		}
	}); err != nil {
		// This is not an unexpected error: we know that
		// foreachPodResourceClaim only returns errors for "not
		// schedulable".
		logger.V(4).Info("pod is not schedulable", "pod", klog.KObj(pod), "claimParameters", klog.KObj(modifiedParameters), "reason", err.Error())
		return framework.QueueSkip, nil
	}

	if !usesParameters {
		// These were not the parameters the pod was waiting for.
		logger.V(6).Info("unrelated claim parameters got modified", "pod", klog.KObj(pod), "claimParameters", klog.KObj(modifiedParameters))
		return framework.QueueSkip, nil
	}

	if originalParameters == nil {
		logger.V(4).Info("claim parameters for pod got created", "pod", klog.KObj(pod), "claimParameters", klog.KObj(modifiedParameters))
		return framework.Queue, nil
	}

	// Modifications may or may not be relevant. If the requests are
	// exactly as before, then something else must have changed and we
	// don't care.
	if apiequality.Semantic.DeepEqual(&originalParameters.DriverRequests, &modifiedParameters.DriverRequests) {
		logger.V(6).Info("claim parameters for pod got modified where the pod doesn't care", "pod", klog.KObj(pod), "claimParameters", klog.KObj(modifiedParameters))
		return framework.QueueSkip, nil
	}

	logger.V(4).Info("requests in claim parameters for pod got updated", "pod", klog.KObj(pod), "claimParameters", klog.KObj(modifiedParameters))
	return framework.Queue, nil
}
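// isSchedulableAfterClassParametersChange is invoked for add and update class
// parameters events reported by an informer. It checks whether that change
// made a previously unschedulable pod schedulable. It errs on the side of
// letting a pod scheduling attempt happen.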
func (pl *dynamicResources) isSchedulableAfterClassParametersChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
	originalParameters, modifiedParameters, err := schedutil.As[*resourcev1alpha2.ResourceClassParameters](oldObj, newObj)
	if err != nil {
		// Shouldn't happen.
		return framework.Queue, fmt.Errorf("unexpected object in isSchedulableAfterClassParametersChange: %w", err)
	}

	usesParameters := false
	if err := pl.foreachPodResourceClaim(pod, func(_ string, claim *resourcev1alpha2.ResourceClaim) {
		class, err := pl.classLister.Get(claim.Spec.ResourceClassName)
		if err != nil {
			if !apierrors.IsNotFound(err) {
				logger.Error(err, "look up resource class")
			}
			return
		}
		ref := class.ParametersRef
		if ref == nil {
			return
		}

		// Using in-tree parameters directly?
		if ref.APIGroup == resourcev1alpha2.SchemeGroupVersion.Group &&
			ref.Kind == "ResourceClassParameters" {
			if modifiedParameters.Name == ref.Name {
				usesParameters = true
			}
			return
		}

		// Need to look for translated parameters.
		generatedFrom := modifiedParameters.GeneratedFrom
		if generatedFrom == nil {
			return
		}
		if generatedFrom.APIGroup == ref.APIGroup &&
			generatedFrom.Kind == ref.Kind &&
			generatedFrom.Name == ref.Name {
			usesParameters = true
		}
	}); err != nil {
		// This is not an unexpected error: we know that
		// foreachPodResourceClaim only returns errors for "not
		// schedulable".
		logger.V(4).Info("pod is not schedulable", "pod", klog.KObj(pod), "classParameters", klog.KObj(modifiedParameters), "reason", err.Error())
		return framework.QueueSkip, nil
	}

	if !usesParameters {
		// These were not the parameters the pod was waiting for.
		logger.V(6).Info("unrelated class parameters got modified", "pod", klog.KObj(pod), "classParameters", klog.KObj(modifiedParameters))
		return framework.QueueSkip, nil
	}

	if originalParameters == nil {
		logger.V(4).Info("class parameters for pod got created", "pod", klog.KObj(pod), "classParameters", klog.KObj(modifiedParameters))
		return framework.Queue, nil
	}

	// Modifications may or may not be relevant. If the filters are exactly
	// as before, then something else must have changed and we don't care.
	if apiequality.Semantic.DeepEqual(&originalParameters.Filters, &modifiedParameters.Filters) {
		logger.V(6).Info("class parameters for pod got modified where the pod doesn't care", "pod", klog.KObj(pod), "classParameters", klog.KObj(modifiedParameters))
		return framework.QueueSkip, nil
	}

	logger.V(4).Info("filters in class parameters for pod got updated", "pod", klog.KObj(pod), "classParameters", klog.KObj(modifiedParameters))
	return framework.Queue, nil
}
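// isSchedulableAfterClaimChange is invoked for add and update claim events
// reported by an informer. It checks whether that change made a previously
// unschedulable pod schedulable. It errs on the side of letting a pod
// scheduling attempt happen.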
func (pl *dynamicResources) isSchedulableAfterClaimChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
	originalClaim, modifiedClaim, err := schedutil.As[*resourcev1alpha2.ResourceClaim](oldObj, newObj)
	if err != nil {
		// Shouldn't happen.
		return framework.Queue, fmt.Errorf("unexpected object in isSchedulableAfterClaimChange: %w", err)
	}

	usesClaim := false
	if err := pl.foreachPodResourceClaim(pod, func(_ string, claim *resourcev1alpha2.ResourceClaim) {
		if claim.UID == modifiedClaim.UID {
			usesClaim = true
		}
	}); err != nil {
		// This is not an unexpected error: we know that
		// foreachPodResourceClaim only returns errors for "not
		// schedulable".
		logger.V(4).Info("pod is not schedulable", "pod", klog.KObj(pod), "claim", klog.KObj(modifiedClaim), "reason", err.Error())
		return framework.QueueSkip, nil
	}

	if originalClaim != nil &&
		resourceclaim.IsAllocatedWithStructuredParameters(originalClaim) &&
		modifiedClaim.Status.Allocation == nil {
		// A claim with structured parameters was deallocated. This might
		// have made resources available for other pods, so a scheduling
		// attempt is worthwhile even if this pod does not use the claim
		// itself.
		logger.V(6).Info("claim with structured parameters got deallocated", "pod", klog.KObj(pod), "claim", klog.KObj(modifiedClaim))
		return framework.Queue, nil
	}

	if !usesClaim {
		// This was not the claim the pod was waiting for.
		logger.V(6).Info("unrelated claim got modified", "pod", klog.KObj(pod), "claim", klog.KObj(modifiedClaim))
		return framework.QueueSkip, nil
	}

	if originalClaim == nil {
		logger.V(4).Info("claim for pod got created", "pod", klog.KObj(pod), "claim", klog.KObj(modifiedClaim))
		return framework.Queue, nil
	}

	// Modifications may or may not be relevant. If the entire status is
	// exactly as before, then something else must have changed and we
	// don't care.
	if apiequality.Semantic.DeepEqual(&originalClaim.Status, &modifiedClaim.Status) {
		if loggerV := logger.V(7); loggerV.Enabled() {
			// Log more details.
			loggerV.Info("claim for pod got modified where the pod doesn't care", "pod", klog.KObj(pod), "claim", klog.KObj(modifiedClaim), "diff", cmp.Diff(originalClaim, modifiedClaim))
		} else {
			logger.V(6).Info("claim for pod got modified where the pod doesn't care", "pod", klog.KObj(pod), "claim", klog.KObj(modifiedClaim))
		}
		return framework.QueueSkip, nil
	}

	logger.V(4).Info("status of claim for pod got updated", "pod", klog.KObj(pod), "claim", klog.KObj(modifiedClaim))
	return framework.Queue, nil
}
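// isSchedulableAfterPodSchedulingContextChange is invoked for all
// PodSchedulingContext events reported by an informer. It checks whether that
// change made a previously unschedulable pod schedulable. It errs on the side
// of letting a pod scheduling attempt happen.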
func (pl *dynamicResources) isSchedulableAfterPodSchedulingContextChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
	// Deleted? That can happen because we ourselves delete the
	// PodSchedulingContext once the pod is bound. It can be ignored.
	if oldObj != nil && newObj == nil {
		logger.V(4).Info("PodSchedulingContext got deleted")
		return framework.QueueSkip, nil
	}

	oldPodScheduling, newPodScheduling, err := schedutil.As[*resourcev1alpha2.PodSchedulingContext](oldObj, newObj)
	if err != nil {
		// Shouldn't happen.
		return framework.Queue, fmt.Errorf("unexpected object in isSchedulableAfterPodSchedulingContextChange: %w", err)
	}
	podScheduling := newPodScheduling // Never nil because deletes are handled above.

	if podScheduling.Name != pod.Name || podScheduling.Namespace != pod.Namespace {
		logger.V(7).Info("PodSchedulingContext for unrelated pod got modified", "pod", klog.KObj(pod), "podScheduling", klog.KObj(podScheduling))
		return framework.QueueSkip, nil
	}

	// If the drivers have provided information about all unallocated
	// claims with delayed allocation, then the next scheduling attempt is
	// able to pick a node, so the pod can be scheduled again.
	pendingDelayedClaims := 0
	if err := pl.foreachPodResourceClaim(pod, func(podResourceName string, claim *resourcev1alpha2.ResourceClaim) {
		if claim.Spec.AllocationMode == resourcev1alpha2.AllocationModeWaitForFirstConsumer &&
			claim.Status.Allocation == nil &&
			!podSchedulingHasClaimInfo(podScheduling, podResourceName) {
			pendingDelayedClaims++
		}
	}); err != nil {
		// This is not an unexpected error: we know that
		// foreachPodResourceClaim only returns errors for "not
		// schedulable".
		logger.V(4).Info("pod is not schedulable, keep waiting", "pod", klog.KObj(pod), "reason", err.Error())
		return framework.QueueSkip, nil
	}

	// Some driver responses missing?
	if pendingDelayedClaims > 0 {
		// We could start a pod scheduling attempt to refresh the potential
		// nodes list. But pod scheduling attempts are expensive and doing
		// them too often causes the pod to enter backoff. Let's wait
		// instead for all drivers to reply.
		if loggerV := logger.V(6); loggerV.Enabled() {
			loggerV.Info("PodSchedulingContext with missing resource claim information, keep waiting", "pod", klog.KObj(pod), "podSchedulingDiff", cmp.Diff(oldPodScheduling, podScheduling))
		} else {
			logger.V(5).Info("PodSchedulingContext with missing resource claim information, keep waiting", "pod", klog.KObj(pod))
		}
		return framework.QueueSkip, nil
	}

	if oldPodScheduling == nil /* create */ ||
		len(oldPodScheduling.Status.ResourceClaims) < len(podScheduling.Status.ResourceClaims) /* new information, not incomplete (checked above) */ {
		// This definitely is new information for the scheduler. Try again immediately.
		logger.V(4).Info("PodSchedulingContext for pod has all required information, schedule immediately", "pod", klog.KObj(pod))
		return framework.Queue, nil
	}

	// The other situation where the scheduler needs to act immediately is
	// when a driver has marked the currently selected node as unsuitable:
	// waiting in the unschedulable queue would not help because no further
	// event is going to arrive for this pod.
	if podScheduling.Spec.SelectedNode != "" {
		for _, claimStatus := range podScheduling.Status.ResourceClaims {
			if sliceContains(claimStatus.UnsuitableNodes, podScheduling.Spec.SelectedNode) {
				logger.V(5).Info("PodSchedulingContext has unsuitable selected node, schedule immediately", "pod", klog.KObj(pod), "selectedNode", podScheduling.Spec.SelectedNode, "podResourceName", claimStatus.Name)
				return framework.Queue, nil
			}
		}
	}

	// Update with only the spec modified? That is our own earlier write.
	if oldPodScheduling != nil &&
		!apiequality.Semantic.DeepEqual(&oldPodScheduling.Spec, &podScheduling.Spec) &&
		apiequality.Semantic.DeepEqual(&oldPodScheduling.Status, &podScheduling.Status) {
		logger.V(5).Info("PodSchedulingContext has only the scheduler spec changes, ignore the update", "pod", klog.KObj(pod))
		return framework.QueueSkip, nil
	}

	// Once we get here, all changes which are known to require a special
	// response have been checked for. Whatever the change was, we don't
	// know exactly how to handle it and thus return Queue, which causes
	// the pod to be retried.
	if loggerV := logger.V(6); loggerV.Enabled() {
		loggerV.Info("PodSchedulingContext for pod with unknown changes, maybe schedule", "pod", klog.KObj(pod), "podSchedulingDiff", cmp.Diff(oldPodScheduling, podScheduling))
	} else {
		logger.V(5).Info("PodSchedulingContext for pod with unknown changes, maybe schedule", "pod", klog.KObj(pod))
	}
	return framework.Queue, nil
}

func podSchedulingHasClaimInfo(podScheduling *resourcev1alpha2.PodSchedulingContext, podResourceName string) bool {
	for _, claimStatus := range podScheduling.Status.ResourceClaims {
		if claimStatus.Name == podResourceName {
			return true
		}
	}
	return false
}

func sliceContains(hay []string, needle string) bool {
	for _, item := range hay {
		if item == needle {
			return true
		}
	}
	return false
}
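// podResourceClaims returns the ResourceClaims for all pod.Spec.ResourceClaims.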
func (pl *dynamicResources) podResourceClaims(pod *v1.Pod) ([]*resourcev1alpha2.ResourceClaim, error) {
	claims := make([]*resourcev1alpha2.ResourceClaim, 0, len(pod.Spec.ResourceClaims))
	if err := pl.foreachPodResourceClaim(pod, func(_ string, claim *resourcev1alpha2.ResourceClaim) {
		// We store the pointer as returned by the lister. The assumption
		// is that if a claim gets modified while our code runs, the cache
		// will store a new pointer, not mutate the existing object that
		// we point to here.
		claims = append(claims, claim)
	}); err != nil {
		return nil, err
	}
	return claims, nil
}
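// foreachPodResourceClaim checks that each ResourceClaim for the pod exists.
// It calls an optional handler for those that are valid.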
func (pl *dynamicResources) foreachPodResourceClaim(pod *v1.Pod, cb func(podResourceName string, claim *resourcev1alpha2.ResourceClaim)) error {
	for _, resource := range pod.Spec.ResourceClaims {
		claimName, mustCheckOwner, err := pl.claimNameLookup.Name(pod, &resource)
		if err != nil {
			return err
		}
		// The claim name might be nil if no underlying resource claim was
		// generated for the referenced claim. There are valid use cases
		// when this might happen, so we simply skip it.
		if claimName == nil {
			continue
		}
		claim, err := pl.claimLister.ResourceClaims(pod.Namespace).Get(*claimName)
		if err != nil {
			return err
		}

		if claim.DeletionTimestamp != nil {
			return fmt.Errorf("resourceclaim %q is being deleted", claim.Name)
		}

		if mustCheckOwner {
			if err := resourceclaim.IsForPod(pod, claim); err != nil {
				return err
			}
		}
		if cb != nil {
			cb(resource.Name, claim)
		}
	}
	return nil
}
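// PreFilter is invoked at the PreFilter extension point to check if the pod
// has all immediate claims bound. UnschedulableAndUnresolvable is returned if
// the pod cannot be scheduled at the moment on any node.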
func (pl *dynamicResources) PreFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
	if !pl.enabled {
		return nil, framework.NewStatus(framework.Skip)
	}
	logger := klog.FromContext(ctx)

	// If the pod does not reference any claim, we don't need to do
	// anything for it. We just initialize an empty state to record that
	// observation for the other functions. This gets updated below if we
	// get that far.
	s := &stateData{}
	state.Write(stateKey, s)

	claims, err := pl.podResourceClaims(pod)
	if err != nil {
		return nil, statusUnschedulable(logger, err.Error())
	}
	logger.V(5).Info("pod resource claims", "pod", klog.KObj(pod), "resourceclaims", klog.KObjSlice(claims))

	// If the pod does not reference any claim, this plugin has nothing to
	// do and can be skipped for the rest of the cycle.
	if len(claims) == 0 {
		return nil, framework.NewStatus(framework.Skip)
	}

	// Fetch the PodSchedulingContext, if there is one.
	if err := s.podSchedulingState.init(ctx, pod, pl.podSchedulingContextLister); err != nil {
		return nil, statusError(logger, err)
	}

	s.informationsForClaim = make([]informationForClaim, len(claims))
	needResourceInformation := false
	for index, claim := range claims {
		if claim.Status.DeallocationRequested {
			// This will get resolved by the resource driver.
			return nil, statusUnschedulable(logger, "resourceclaim must be reallocated", "pod", klog.KObj(pod), "resourceclaim", klog.KObj(claim))
		}
		if claim.Status.Allocation != nil &&
			!resourceclaim.CanBeReserved(claim) &&
			!resourceclaim.IsReservedForPod(pod, claim) {
			// Resource is in use. The pod has to wait.
			return nil, statusUnschedulable(logger, "resourceclaim in use", "pod", klog.KObj(pod), "resourceclaim", klog.KObj(claim))
		}

		if claim.Status.Allocation != nil {
			if claim.Status.Allocation.AvailableOnNodes != nil {
				nodeSelector, err := nodeaffinity.NewNodeSelector(claim.Status.Allocation.AvailableOnNodes)
				if err != nil {
					return nil, statusError(logger, err)
				}
				s.informationsForClaim[index].availableOnNode = nodeSelector
			}

			// The claim was allocated by the scheduler if it has the
			// finalizer that is reserved for Kubernetes.
			s.informationsForClaim[index].structuredParameters = slices.Contains(claim.Finalizers, resourcev1alpha2.Finalizer)
		} else {
			// The ResourceClass might have a node filter. This is useful
			// for trimming the initial set of potential nodes before we
			// ask the driver(s) for information about the specific pod.
			class, err := pl.classLister.Get(claim.Spec.ResourceClassName)
			if err != nil {
				// If the class cannot be retrieved, allocation cannot proceed.
				if apierrors.IsNotFound(err) {
					// Here we mark the pod as "unschedulable", so it'll
					// sleep in the unschedulable queue until a
					// ResourceClass event occurs.
					return nil, statusUnschedulable(logger, fmt.Sprintf("resource class %s does not exist", claim.Spec.ResourceClassName))
				}
				// Other error, retry with backoff.
				return nil, statusError(logger, fmt.Errorf("look up resource class: %v", err))
			}
			if class.SuitableNodes != nil {
				selector, err := nodeaffinity.NewNodeSelector(class.SuitableNodes)
				if err != nil {
					return nil, statusError(logger, err)
				}
				s.informationsForClaim[index].availableOnNode = selector
			}
			s.informationsForClaim[index].status = statusForClaim(s.podSchedulingState.schedulingCtx, pod.Spec.ResourceClaims[index].Name)

			if class.StructuredParameters != nil && *class.StructuredParameters {
				s.informationsForClaim[index].structuredParameters = true

				// Allocation in flight? Better wait for that to finish,
				// see the inFlightAllocations documentation for details.
				if _, found := pl.inFlightAllocations.Load(claim.UID); found {
					return nil, statusUnschedulable(logger, fmt.Sprintf("resource claim %s is in the process of being allocated", klog.KObj(claim)))
				}

				// We need the claim and class parameters. If they don't
				// exist yet, the pod has to wait.
				classParameters, claimParameters, status := pl.lookupParameters(logger, class, claim)
				if status != nil {
					return nil, status
				}
				controller, err := newClaimController(logger, class, classParameters, claimParameters)
				if err != nil {
					return nil, statusError(logger, err)
				}
				s.informationsForClaim[index].controller = controller
				needResourceInformation = true
			} else if claim.Spec.AllocationMode == resourcev1alpha2.AllocationModeImmediate {
				// This will get resolved by the resource driver.
				return nil, statusUnschedulable(logger, "unallocated immediate resourceclaim", "pod", klog.KObj(pod), "resourceclaim", klog.KObj(claim))
			}
		}
	}

	if needResourceInformation {
		// Building the resource model requires iterating over all
		// ResourceSlices and claims, so only do it when some claim
		// actually has to be allocated by the scheduler.
		resources, err := newResourceModel(logger, pl.resourceSliceLister, pl.claimAssumeCache, &pl.inFlightAllocations)
		logger.V(5).Info("Resource usage", "resources", klog.Format(resources))
		if err != nil {
			return nil, statusError(logger, err)
		}
		s.resources = resources
	}

	s.claims = claims
	return nil, nil
}
// lookupParameters retrieves the class and claim parameters for a claim with
// structured parameters.
func (pl *dynamicResources) lookupParameters(logger klog.Logger, class *resourcev1alpha2.ResourceClass, claim *resourcev1alpha2.ResourceClaim) (classParameters *resourcev1alpha2.ResourceClassParameters, claimParameters *resourcev1alpha2.ResourceClaimParameters, status *framework.Status) {
	classParameters, status = pl.lookupClassParameters(logger, class)
	if status != nil {
		return
	}
	claimParameters, status = pl.lookupClaimParameters(logger, class, claim)
	return
}
func (pl *dynamicResources) lookupClassParameters(logger klog.Logger, class *resourcev1alpha2.ResourceClass) (*resourcev1alpha2.ResourceClassParameters, *framework.Status) {
	defaultClassParameters := resourcev1alpha2.ResourceClassParameters{}

	if class.ParametersRef == nil {
		return &defaultClassParameters, nil
	}

	if class.ParametersRef.APIGroup == resourcev1alpha2.SchemeGroupVersion.Group &&
		class.ParametersRef.Kind == "ResourceClassParameters" {
		// Use the parameters which were referenced directly.
		parameters, err := pl.classParametersLister.ResourceClassParameters(class.ParametersRef.Namespace).Get(class.ParametersRef.Name)
		if err != nil {
			if apierrors.IsNotFound(err) {
				return nil, statusUnschedulable(logger, fmt.Sprintf("class parameters %s not found", klog.KRef(class.ParametersRef.Namespace, class.ParametersRef.Name)))
			}
			return nil, statusError(logger, fmt.Errorf("get class parameters %s: %v", klog.KRef(class.ParametersRef.Namespace, class.ParametersRef.Name), err))
		}
		return parameters, nil
	}

	// Parameters of some other type. Look for a ResourceClassParameters
	// object which was generated (translated) from them.
	allParameters, err := pl.classParametersLister.ResourceClassParameters(class.Namespace).List(labels.Everything())
	if err != nil {
		return nil, statusError(logger, fmt.Errorf("listing class parameters failed: %v", err))
	}
	for _, parameters := range allParameters {
		if parameters.GeneratedFrom == nil {
			continue
		}
		if parameters.GeneratedFrom.APIGroup == class.ParametersRef.APIGroup &&
			parameters.GeneratedFrom.Kind == class.ParametersRef.Kind &&
			parameters.GeneratedFrom.Name == class.ParametersRef.Name &&
			parameters.GeneratedFrom.Namespace == class.ParametersRef.Namespace {
			return parameters, nil
		}
	}
	return nil, statusUnschedulable(logger, fmt.Sprintf("generated class parameters for %s.%s %s not found", class.ParametersRef.Kind, class.ParametersRef.APIGroup, klog.KRef(class.ParametersRef.Namespace, class.ParametersRef.Name)))
}
func (pl *dynamicResources) lookupClaimParameters(logger klog.Logger, class *resourcev1alpha2.ResourceClass, claim *resourcev1alpha2.ResourceClaim) (*resourcev1alpha2.ResourceClaimParameters, *framework.Status) {
	defaultClaimParameters := resourcev1alpha2.ResourceClaimParameters{
		Shareable: true,
		DriverRequests: []resourcev1alpha2.DriverRequests{
			{
				DriverName: class.DriverName,
				Requests: []resourcev1alpha2.ResourceRequest{
					{
						ResourceRequestModel: resourcev1alpha2.ResourceRequestModel{
							// By default, a claim without parameters makes
							// one request against the class's driver. The
							// CEL selector "true" accepts any instance of
							// the named resources model.
							NamedResources: &resourcev1alpha2.NamedResourcesRequest{
								Selector: "true",
							},
						},
					},
				},
			},
		},
	}

	if claim.Spec.ParametersRef == nil {
		return &defaultClaimParameters, nil
	}
	if claim.Spec.ParametersRef.APIGroup == resourcev1alpha2.SchemeGroupVersion.Group &&
		claim.Spec.ParametersRef.Kind == "ResourceClaimParameters" {
		// Use the parameters which were referenced directly.
		parameters, err := pl.claimParametersLister.ResourceClaimParameters(claim.Namespace).Get(claim.Spec.ParametersRef.Name)
		if err != nil {
			if apierrors.IsNotFound(err) {
				return nil, statusUnschedulable(logger, fmt.Sprintf("claim parameters %s not found", klog.KRef(claim.Namespace, claim.Spec.ParametersRef.Name)))
			}
			return nil, statusError(logger, fmt.Errorf("get claim parameters %s: %v", klog.KRef(claim.Namespace, claim.Spec.ParametersRef.Name), err))
		}
		return parameters, nil
	}

	// Parameters of some other type. Look for a ResourceClaimParameters
	// object which was generated (translated) from them.
	allParameters, err := pl.claimParametersLister.ResourceClaimParameters(claim.Namespace).List(labels.Everything())
	if err != nil {
		return nil, statusError(logger, fmt.Errorf("listing claim parameters failed: %v", err))
	}
	for _, parameters := range allParameters {
		if parameters.GeneratedFrom == nil {
			continue
		}
		if parameters.GeneratedFrom.APIGroup == claim.Spec.ParametersRef.APIGroup &&
			parameters.GeneratedFrom.Kind == claim.Spec.ParametersRef.Kind &&
			parameters.GeneratedFrom.Name == claim.Spec.ParametersRef.Name {
			return parameters, nil
		}
	}
	return nil, statusUnschedulable(logger, fmt.Sprintf("generated claim parameters for %s.%s %s not found", claim.Spec.ParametersRef.Kind, claim.Spec.ParametersRef.APIGroup, klog.KRef(claim.Namespace, claim.Spec.ParametersRef.Name)))
}
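// PreFilterExtensions returns prefilter extensions, pod add and remove. This
// plugin does not implement any.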
func (pl *dynamicResources) PreFilterExtensions() framework.PreFilterExtensions {
	return nil
}

// getStateData retrieves the stateData that PreFilter stored in the cycle
// state.
func getStateData(cs *framework.CycleState) (*stateData, error) {
	state, err := cs.Read(stateKey)
	if err != nil {
		return nil, err
	}
	s, ok := state.(*stateData)
	if !ok {
		return nil, errors.New("unable to convert state into stateData")
	}
	return s, nil
}
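// Filter is invoked at the Filter extension point. It evaluates if a pod can
// fit due to the resources it requests, for both allocated and unallocated
// claims: for claims that are already allocated, it checks that the node
// affinity is satisfied by the given node; for claims that still need to be
// allocated, it checks whether allocation for the node is possible.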
func (pl *dynamicResources) Filter(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
	if !pl.enabled {
		return nil
	}
	state, err := getStateData(cs)
	if err != nil {
		return statusError(klog.FromContext(ctx), err)
	}
	if len(state.claims) == 0 {
		return nil
	}

	logger := klog.FromContext(ctx)
	node := nodeInfo.Node()

	var unavailableClaims []int
	for index, claim := range state.claims {
		logger.V(10).Info("filtering based on resource claims of the pod", "pod", klog.KObj(pod), "node", klog.KObj(node), "resourceclaim", klog.KObj(claim))
		switch {
		case claim.Status.Allocation != nil:
			if nodeSelector := state.informationsForClaim[index].availableOnNode; nodeSelector != nil {
				if !nodeSelector.Match(node) {
					logger.V(5).Info("AvailableOnNodes does not match", "pod", klog.KObj(pod), "node", klog.KObj(node), "resourceclaim", klog.KObj(claim))
					unavailableClaims = append(unavailableClaims, index)
				}
			}
		case claim.Status.DeallocationRequested:
			// We shouldn't get here. PreFilter already checked this.
			return statusUnschedulable(logger, "resourceclaim must be reallocated", "pod", klog.KObj(pod), "node", klog.KObj(node), "resourceclaim", klog.KObj(claim))
		case claim.Spec.AllocationMode == resourcev1alpha2.AllocationModeWaitForFirstConsumer ||
			state.informationsForClaim[index].structuredParameters:
			if selector := state.informationsForClaim[index].availableOnNode; selector != nil {
				if matches := selector.Match(node); !matches {
					return statusUnschedulable(logger, "excluded by resource class node filter", "pod", klog.KObj(pod), "node", klog.KObj(node), "resourceclassName", claim.Spec.ResourceClassName)
				}
			}
			// Can the builtin controller tell us whether the node is suitable?
			if state.informationsForClaim[index].structuredParameters {
				suitable, err := state.informationsForClaim[index].controller.nodeIsSuitable(ctx, node.Name, state.resources)
				if err != nil {
					// An error indicates that something wasn't configured
					// correctly, for example the claim or class
					// parameters. Treating this as "unschedulable" is
					// better than aborting the scheduling attempt,
					// because a fix might make the pod schedulable later.
					return statusUnschedulable(logger, fmt.Sprintf("checking structured parameters failed: %v", err), "pod", klog.KObj(pod), "node", klog.KObj(node), "resourceclaim", klog.KObj(claim))
				}
				if !suitable {
					return statusUnschedulable(logger, "resourceclaim cannot be allocated for the node (unsuitable)", "pod", klog.KObj(pod), "node", klog.KObj(node), "resourceclaim", klog.KObj(claim))
				}
			} else {
				if status := state.informationsForClaim[index].status; status != nil {
					for _, unsuitableNode := range status.UnsuitableNodes {
						if node.Name == unsuitableNode {
							return statusUnschedulable(logger, "resourceclaim cannot be allocated for the node (unsuitable)", "pod", klog.KObj(pod), "node", klog.KObj(node), "resourceclaim", klog.KObj(claim), "unsuitablenodes", status.UnsuitableNodes)
						}
					}
				}
			}
		default:
			// This should have been delayed allocation. Immediate
			// allocation was already checked for in PreFilter.
			return statusError(logger, fmt.Errorf("internal error, unexpected allocation mode %v", claim.Spec.AllocationMode))
		}
	}

	if len(unavailableClaims) > 0 {
		// Remember all unavailable claims. This might be observed
		// concurrently, so we have to lock the state before writing.
		state.mutex.Lock()
		defer state.mutex.Unlock()
		if state.unavailableClaims == nil {
			state.unavailableClaims = sets.New[int]()
		}

		for _, index := range unavailableClaims {
			claim := state.claims[index]
			// Deallocation makes more sense for claims with delayed
			// allocation. Claims with immediate allocation would just get
			// allocated again for a random node, which is unlikely to
			// help the pod. Claims with a builtin controller are handled
			// like claims with delayed allocation.
			if claim.Spec.AllocationMode == resourcev1alpha2.AllocationModeWaitForFirstConsumer ||
				state.informationsForClaim[index].controller != nil {
				state.unavailableClaims.Insert(index)
			}
		}
		return statusUnschedulable(logger, "resourceclaim not available on the node", "pod", klog.KObj(pod))
	}

	return nil
}
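// PostFilter checks whether there are allocated claims that could get
// deallocated to help get the pod schedulable. If yes, it picks one and
// requests its deallocation. This only gets called when filtering found no
// suitable node.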
func (pl *dynamicResources) PostFilter(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, filteredNodeStatusMap framework.NodeToStatusMap) (*framework.PostFilterResult, *framework.Status) {
	if !pl.enabled {
		return nil, framework.NewStatus(framework.Unschedulable, "plugin disabled")
	}
	logger := klog.FromContext(ctx)
	state, err := getStateData(cs)
	if err != nil {
		return nil, statusError(logger, err)
	}
	if len(state.claims) == 0 {
		return nil, framework.NewStatus(framework.Unschedulable, "no new claims to deallocate")
	}

	// Iterating over a set is random. This is intentional here: we want to
	// pick one claim randomly because there is no better heuristic.
	for index := range state.unavailableClaims {
		claim := state.claims[index]
		if len(claim.Status.ReservedFor) == 0 ||
			len(claim.Status.ReservedFor) == 1 && claim.Status.ReservedFor[0].UID == pod.UID {
			// Claims with structured parameters are allocated by the
			// scheduler, so their allocation can be cleared directly.
			// Claims handled by a control plane controller must be
			// deallocated by that controller instead.
			clearAllocation := state.informationsForClaim[index].controller != nil

			// Before requesting deallocation, withdraw our selected node:
			// otherwise the resource driver might continue allocating for
			// it while deallocation is in progress.
			if !clearAllocation &&
				state.podSchedulingState.schedulingCtx != nil &&
				state.podSchedulingState.schedulingCtx.Spec.SelectedNode != "" {
				state.podSchedulingState.selectedNode = ptr.To("")
				if err := state.podSchedulingState.publish(ctx, pod, pl.clientset); err != nil {
					return nil, statusError(logger, err)
				}
			}

			claim := claim.DeepCopy()
			claim.Status.ReservedFor = nil
			if clearAllocation {
				claim.Status.Allocation = nil
			} else {
				claim.Status.DeallocationRequested = true
			}
			logger.V(5).Info("Requesting deallocation of ResourceClaim", "pod", klog.KObj(pod), "resourceclaim", klog.KObj(claim))
			if _, err := pl.clientset.ResourceV1alpha2().ResourceClaims(claim.Namespace).UpdateStatus(ctx, claim, metav1.UpdateOptions{}); err != nil {
				return nil, statusError(logger, err)
			}
			return nil, framework.NewStatus(framework.Unschedulable, "deallocation of ResourceClaim completed")
		}
	}
	return nil, framework.NewStatus(framework.Unschedulable, "still not schedulable")
}
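// PreScore is passed a list of all nodes that passed filtering by this plugin
// and the others. If there are claims pending allocation by a control plane
// controller, it records those nodes as potential nodes so that the resource
// drivers can check them.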
func (pl *dynamicResources) PreScore(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodes []*framework.NodeInfo) *framework.Status {
	if !pl.enabled {
		return nil
	}
	state, err := getStateData(cs)
	if err != nil {
		return statusError(klog.FromContext(ctx), err)
	}
	defer func() {
		state.preScored = true
	}()
	if len(state.claims) == 0 {
		return nil
	}

	logger := klog.FromContext(ctx)
	pending := false
	for index, claim := range state.claims {
		if claim.Status.Allocation == nil &&
			state.informationsForClaim[index].controller == nil {
			pending = true
			break
		}
	}
	if !pending {
		logger.V(5).Info("no pending claims with control plane controller", "pod", klog.KObj(pod))
		return nil
	}

	if haveAllPotentialNodes(state.podSchedulingState.schedulingCtx, nodes) {
		logger.V(5).Info("all potential nodes already set", "pod", klog.KObj(pod), "potentialnodes", klog.KObjSlice(nodes))
		return nil
	}

	// Remember the potential nodes. The object will get created or updated
	// in Reserve. This is both an optimization and covers the case that
	// PreScore doesn't get called when there is only a single node.
	logger.V(5).Info("remembering potential nodes", "pod", klog.KObj(pod), "potentialnodes", klog.KObjSlice(nodes))
	numNodes := len(nodes)
	if numNodes > resourcev1alpha2.PodSchedulingNodeListMaxSize {
		numNodes = resourcev1alpha2.PodSchedulingNodeListMaxSize
	}
	potentialNodes := make([]string, 0, numNodes)
	if numNodes == len(nodes) {
		// Copy all node names.
		for _, node := range nodes {
			potentialNodes = append(potentialNodes, node.Node().Name)
		}
	} else {
		// Select a random subset of the nodes to comply with the
		// PotentialNodes length limit. Randomization is done for us by Go
		// which iterates over map entries randomly.
		nodeNames := map[string]struct{}{}
		for _, node := range nodes {
			nodeNames[node.Node().Name] = struct{}{}
		}
		for nodeName := range nodeNames {
			if len(potentialNodes) >= resourcev1alpha2.PodSchedulingNodeListMaxSize {
				break
			}
			potentialNodes = append(potentialNodes, nodeName)
		}
	}
	sort.Strings(potentialNodes)
	state.podSchedulingState.potentialNodes = &potentialNodes
	return nil
}

func haveAllPotentialNodes(schedulingCtx *resourcev1alpha2.PodSchedulingContext, nodes []*framework.NodeInfo) bool {
	if schedulingCtx == nil {
		return false
	}
	for _, node := range nodes {
		if !haveNode(schedulingCtx.Spec.PotentialNodes, node.Node().Name) {
			return false
		}
	}
	return true
}

func haveNode(nodeNames []string, nodeName string) bool {
	for _, n := range nodeNames {
		if n == nodeName {
			return true
		}
	}
	return false
}
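// Reserve reserves claims for the pod.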
func (pl *dynamicResources) Reserve(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeName string) (status *framework.Status) {
	if !pl.enabled {
		return nil
	}
	state, err := getStateData(cs)
	if err != nil {
		return statusError(klog.FromContext(ctx), err)
	}
	if len(state.claims) == 0 {
		return nil
	}

	numDelayedAllocationPending := 0
	numClaimsWithStatusInfo := 0
	claimsWithBuiltinController := make([]int, 0, len(state.claims))
	logger := klog.FromContext(ctx)
	for index, claim := range state.claims {
		if claim.Status.Allocation != nil {
			// Allocated, but perhaps not reserved yet. We checked in
			// PreFilter that the pod could reserve the claim. Instead of
			// reserving here by updating the ResourceClaim status, we
			// assume that reserving will work and only do it for real
			// during binding. If it fails at that time, some other pod
			// was faster and we have to try again.
			continue
		}

		// Claims handled by the builtin controller get allocated below.
		if state.informationsForClaim[index].controller != nil {
			claimsWithBuiltinController = append(claimsWithBuiltinController, index)
			continue
		}

		// Must be delayed allocation with a control plane controller.
		numDelayedAllocationPending++

		// Did the driver provide information that steered node selection
		// towards a node that it can support?
		if statusForClaim(state.podSchedulingState.schedulingCtx, pod.Spec.ResourceClaims[index].Name) != nil {
			numClaimsWithStatusInfo++
		}
	}

	if numDelayedAllocationPending == 0 && len(claimsWithBuiltinController) == 0 {
		// Nothing left to do.
		return nil
	}

	if !state.preScored && numDelayedAllocationPending > 0 {
		// There was only one candidate that passed the Filters and
		// therefore PreScore was not called.
		//
		// We need to ask whether that node is suitable, otherwise the
		// scheduler will pick it forever even when it cannot satisfy
		// the claim.
		if state.podSchedulingState.schedulingCtx == nil ||
			!containsNode(state.podSchedulingState.schedulingCtx.Spec.PotentialNodes, nodeName) {
			potentialNodes := []string{nodeName}
			state.podSchedulingState.potentialNodes = &potentialNodes
			logger.V(5).Info("asking for information about single potential node", "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName})
		}
	}

	// Prepare allocation of claims handled by the scheduler.
	for _, index := range claimsWithBuiltinController {
		claim := state.claims[index]
		driverName, allocation, err := state.informationsForClaim[index].controller.allocate(ctx, nodeName, state.resources)
		if err != nil {
			// We checked before that the node is suitable. This shouldn't
			// have failed, so treat this as an error.
			return statusError(logger, fmt.Errorf("claim allocation failed unexpectedly: %v", err))
		}
		state.informationsForClaim[index].allocation = allocation
		state.informationsForClaim[index].allocationDriverName = driverName
		claim = claim.DeepCopy()
		claim.Status.DriverName = driverName
		claim.Status.Allocation = allocation
		pl.inFlightAllocations.Store(claim.UID, claim)
		logger.V(5).Info("Reserved resource in allocation result", "claim", klog.KObj(claim), "driver", driverName, "allocation", klog.Format(allocation))
	}

	// When there is only one pending resource, we can go ahead with
	// requesting allocation even when we don't have the information from
	// the driver yet. Otherwise we wait for information before blindly
	// making a decision that might have to be reversed later. If all
	// pending claims are handled by the builtin controller, there is no
	// need for a PodSchedulingContext change.
	if numDelayedAllocationPending == 1 && len(claimsWithBuiltinController) == 0 ||
		numClaimsWithStatusInfo+len(claimsWithBuiltinController) == numDelayedAllocationPending && len(claimsWithBuiltinController) < numDelayedAllocationPending {
		// Setting the selected node asks the driver(s) to start
		// allocating for it.
		if state.podSchedulingState.schedulingCtx == nil ||
			state.podSchedulingState.schedulingCtx.Spec.SelectedNode != nodeName {
			state.podSchedulingState.selectedNode = &nodeName
			logger.V(5).Info("start allocation", "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName})
			// The actual publish happens in PreBind or Unreserve.
			return nil
		}
	}

	// May have been modified earlier in PreScore or above.
	if state.podSchedulingState.isDirty() {
		// The actual publish happens in PreBind or Unreserve.
		return nil
	}

	// If all pending claims are handled by the builtin controller, then
	// the pod can proceed. Allocating and reserving the claims will be
	// done in PreBind.
	if numDelayedAllocationPending == 0 {
		return nil
	}

	// More than one pending claim and not enough information about all of
	// them: block the pod until the drivers have replied.
	return statusPending(logger, "waiting for resource driver to provide information", "pod", klog.KObj(pod))
}

func containsNode(hay []string, needle string) bool {
	for _, node := range hay {
		if node == needle {
			return true
		}
	}
	return false
}
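// Unreserve rolls back the work done in Reserve: it publishes pending
// PodSchedulingContext changes, reverts in-flight allocations, and removes the
// pod from the ReservedFor field of its claims.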
func (pl *dynamicResources) Unreserve(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeName string) {
	if !pl.enabled {
		return
	}
	state, err := getStateData(cs)
	if err != nil {
		return
	}
	if len(state.claims) == 0 {
		return
	}

	logger := klog.FromContext(ctx)

	// Was publishing delayed? If yes, do it now. The most common scenario
	// is that a different set of potential nodes was identified. That
	// revised set needs to be published so that the drivers can provide
	// better guidance for future scheduling attempts.
	if state.podSchedulingState.isDirty() {
		if err := state.podSchedulingState.publish(ctx, pod, pl.clientset); err != nil {
			logger.Error(err, "publish PodSchedulingContext")
		}
	}

	for index, claim := range state.claims {
		// If allocation was in-flight, then it isn't anymore and the claim
		// object in the assume cache must be reverted to what it was
		// before.
		if state.informationsForClaim[index].controller != nil {
			if _, found := pl.inFlightAllocations.LoadAndDelete(state.claims[index].UID); found {
				pl.claimAssumeCache.Restore(claim.Namespace + "/" + claim.Name)
			}
		}

		if claim.Status.Allocation != nil &&
			resourceclaim.IsReservedForPod(pod, claim) {
			// Remove the pod from ReservedFor. A strategic merge patch is
			// used because it can delete an individual entry from the
			// list without replacing the entire list.
			patch := fmt.Sprintf(`{"metadata": {"uid": %q}, "status": { "reservedFor": [ {"$patch": "delete", "uid": %q} ] }}`,
				claim.UID,
				pod.UID,
			)
			logger.V(5).Info("unreserve", "resourceclaim", klog.KObj(claim), "pod", klog.KObj(pod))
			claim, err := pl.clientset.ResourceV1alpha2().ResourceClaims(claim.Namespace).Patch(ctx, claim.Name, types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{}, "status")
			if err != nil {
				// We will get here again when pod scheduling is retried.
				logger.Error(err, "unreserve", "resourceclaim", klog.KObj(claim))
			}
		}
	}
}
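// PreBind gets called in a separate goroutine after it has been determined
// that the pod should get bound to this node. Because Reserve did not actually
// reserve claims, that is done now in PreBind, together with publishing any
// pending PodSchedulingContext changes. If anything fails, the pod goes back
// into the backoff queue and the scheduler calls Unreserve as part of the
// error handling.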
func (pl *dynamicResources) PreBind(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
	if !pl.enabled {
		return nil
	}
	state, err := getStateData(cs)
	if err != nil {
		return statusError(klog.FromContext(ctx), err)
	}
	if len(state.claims) == 0 {
		return nil
	}

	logger := klog.FromContext(ctx)

	// Was publishing delayed? If yes, do it now and then cause binding to
	// stop. The pod will be retried once the drivers have replied.
	if state.podSchedulingState.isDirty() {
		if err := state.podSchedulingState.publish(ctx, pod, pl.clientset); err != nil {
			return statusError(logger, err)
		}
		return statusPending(logger, "waiting for resource driver", "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName})
	}

	for index, claim := range state.claims {
		if !resourceclaim.IsReservedForPod(pod, claim) {
			claim, err := pl.bindClaim(ctx, state, index, pod, nodeName)
			if err != nil {
				return statusError(logger, err)
			}
			state.claims[index] = claim
		}
	}

	// If we get here, we know that reserving the claims for the pod worked
	// and we can proceed with binding it.
	return nil
}
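// bindClaim gets called by PreBind for each claim which is not reserved for
// the pod yet. It might not even be allocated. bindClaim then ensures that the
// allocation and reservation are recorded. This finishes the work started in
// Reserve.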
func (pl *dynamicResources) bindClaim(ctx context.Context, state *stateData, index int, pod *v1.Pod, nodeName string) (patchedClaim *resourcev1alpha2.ResourceClaim, finalErr error) {
	logger := klog.FromContext(ctx)
	claim := state.claims[index]
	allocationPatch := ""

	allocation := state.informationsForClaim[index].allocation
	logger.V(5).Info("preparing claim status patch", "claim", klog.KObj(state.claims[index]), "allocation", klog.Format(allocation))

	// Do we need to store an allocation result from Reserve?
	if allocation != nil {
		buffer, err := json.Marshal(allocation)
		if err != nil {
			return nil, fmt.Errorf("marshaling AllocationResult failed: %v", err)
		}
		allocationPatch = fmt.Sprintf(`"driverName": %q, "allocation": %s, `, state.informationsForClaim[index].allocationDriverName, string(buffer))

		// The finalizer needs to be added in a normal update. Using a
		// patch is risky because the JSON merge logic replaces the entire
		// list of finalizers, which could wipe out entries added
		// concurrently by someone else.
		if !slices.Contains(claim.Finalizers, resourcev1alpha2.Finalizer) {
			claim := state.claims[index].DeepCopy()
			claim.Finalizers = append(claim.Finalizers, resourcev1alpha2.Finalizer)
			if _, err := pl.clientset.ResourceV1alpha2().ResourceClaims(claim.Namespace).Update(ctx, claim, metav1.UpdateOptions{}); err != nil {
				return nil, fmt.Errorf("add finalizer: %v", err)
			}
		}
	}

	// The claim might be stale, for example because the claim can get
	// shared and some other goroutine has updated it in the meantime. We
	// therefore cannot simply overwrite the entire status. Instead, a
	// strategic merge patch adds just our own entry to ReservedFor, plus
	// the allocation result prepared above, if any. Including the UID
	// ensures that the patch only applies to the object we expect.
	patch := fmt.Sprintf(`{"metadata": {"uid": %q}, "status": {%s "reservedFor": [ {"resource": "pods", "name": %q, "uid": %q} ] }}`,
		claim.UID,
		allocationPatch,
		pod.Name,
		pod.UID,
	)
	if loggerV := logger.V(6); loggerV.Enabled() {
		logger.V(5).Info("reserve", "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName}, "resourceclaim", klog.KObj(claim), "patch", patch)
	} else {
		logger.V(5).Info("reserve", "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName}, "resourceclaim", klog.KObj(claim))
	}
	claim, err := pl.clientset.ResourceV1alpha2().ResourceClaims(claim.Namespace).Patch(ctx, claim.Name, types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{}, "status")
	logger.V(5).Info("reserved", "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName}, "resourceclaim", klog.Format(claim), "err", err)
	if allocationPatch != "" {
		// The scheduler was handling allocation. Now that has completed,
		// either successfully or with a failure.
		if err == nil {
			// Let the next scheduling cycle see the updated claim even
			// before the informer catches up.
			if err := pl.claimAssumeCache.Assume(claim); err != nil {
				logger.V(5).Info("Claim not stored in assume cache", "err", err)
			}
		}
		pl.inFlightAllocations.Delete(claim.UID)
	}
	return claim, err
}
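// PostBind is called after a pod is successfully bound. Now we are sure that a
// PodSchedulingContext object, if it exists, is definitely not going to be
// needed anymore and can be deleted. This is a one-shot thing, there won't be
// any retries: if the deletion fails, the garbage collector will eventually
// clean up because the object is owned by the pod.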
func (pl *dynamicResources) PostBind(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeName string) {
	if !pl.enabled {
		return
	}
	state, err := getStateData(cs)
	if err != nil {
		return
	}
	if len(state.claims) == 0 {
		return
	}

	// We cannot know for sure whether the PodSchedulingContext object
	// exists. We could check first via the lister, but a delete attempt
	// which ignores "not found" is simpler.
	logger := klog.FromContext(ctx)
	err = pl.clientset.ResourceV1alpha2().PodSchedulingContexts(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{})
	switch {
	case apierrors.IsNotFound(err):
		logger.V(5).Info("no PodSchedulingContext object to delete")
	case err != nil:
		logger.Error(err, "delete PodSchedulingContext")
	default:
		logger.V(5).Info("PodSchedulingContext object deleted")
	}
}
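// statusUnschedulable ensures that there is a log message associated with the
// line where the status originated.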
func statusUnschedulable(logger klog.Logger, reason string, kv ...interface{}) *framework.Status {
	if loggerV := logger.V(5); loggerV.Enabled() {
		helper, loggerV := loggerV.WithCallStackHelper()
		helper()
		kv = append(kv, "reason", reason)
		loggerV.Info("pod unschedulable", kv...)
	}
	return framework.NewStatus(framework.UnschedulableAndUnresolvable, reason)
}
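// statusPending ensures that there is a log message associated with the line
// where the status originated.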
func statusPending(logger klog.Logger, reason string, kv ...interface{}) *framework.Status {
	if loggerV := logger.V(5); loggerV.Enabled() {
		helper, loggerV := loggerV.WithCallStackHelper()
		helper()
		kv = append(kv, "reason", reason)
		loggerV.Info("pod waiting for external component", kv...)
	}

	// The pod is kept in the scheduling queue while it waits for the
	// external component.
	return framework.NewStatus(framework.Pending, reason)
}
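// statusError ensures that there is a log message associated with the line
// where the error originated.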
func statusError(logger klog.Logger, err error, kv ...interface{}) *framework.Status {
	if loggerV := logger.V(5); loggerV.Enabled() {
		helper, loggerV := loggerV.WithCallStackHelper()
		helper()
		loggerV.Error(err, "dynamic resource plugin failed", kv...)
	}
	return framework.AsStatus(err)
}