package resourceclaim

import (
	"context"
	"errors"
	"fmt"
	"slices"
	"strings"
	"time"

	v1 "k8s.io/api/core/v1"
	resourcev1alpha2 "k8s.io/api/resource/v1alpha2"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	corev1apply "k8s.io/client-go/applyconfigurations/core/v1"
	v1informers "k8s.io/client-go/informers/core/v1"
	resourcev1alpha2informers "k8s.io/client-go/informers/resource/v1alpha2"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/scheme"
	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
	v1listers "k8s.io/client-go/listers/core/v1"
	resourcev1alpha2listers "k8s.io/client-go/listers/resource/v1alpha2"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/record"
	"k8s.io/client-go/util/workqueue"
	"k8s.io/dynamic-resource-allocation/resourceclaim"
	"k8s.io/klog/v2"
	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
	"k8s.io/kubernetes/pkg/controller/resourceclaim/metrics"
	"k8s.io/utils/pointer"
)

const (
	// podResourceClaimIndex is the lookup name for the index function
	// which indexes pods by the ResourceClaims they reference.
	podResourceClaimIndex = "pod-resource-claim-index"

	// podResourceClaimAnnotation is the annotation that generated
	// ResourceClaims get. Its value is the name of the
	// pod.spec.resourceClaims entry for which the claim was generated.
	// It is used only inside this controller.
	podResourceClaimAnnotation = "resource.kubernetes.io/pod-claim-name"

	// claimPodOwnerIndex is used to find ResourceClaims which have
	// a specific pod as owner. Values for this index are pod UIDs.
	claimPodOwnerIndex = "claim-pod-owner-index"

	// fieldManager is the field manager used for pod status updates.
	fieldManager = "ResourceClaimController"

	maxUIDCacheEntries = 500
)

// Controller creates ResourceClaims for ResourceClaimTemplates in a pod spec.
type Controller struct {
	// kubeClient is the kube API client used to communicate with the API
	// server.
	kubeClient clientset.Interface

	// claimLister is the shared ResourceClaim lister used to fetch ResourceClaim
	// objects from the API server. It is shared with other controllers and
	// therefore the ResourceClaim objects in its store should be treated as immutable.
	claimLister  resourcev1alpha2listers.ResourceClaimLister
	claimsSynced cache.InformerSynced
	claimCache   cache.MutationCache

	// podLister is the shared Pod lister used to fetch Pod
	// objects from the API server. It is shared with other controllers and
	// therefore the Pod objects in its store should be treated as immutable.
	podLister v1listers.PodLister
	podSynced cache.InformerSynced

	// podSchedulingLister is the shared PodSchedulingContext lister used
	// to fetch scheduling objects from the API server. It is shared with
	// other controllers and therefore the objects in its store should be
	// treated as immutable.
	podSchedulingLister resourcev1alpha2listers.PodSchedulingContextLister
	podSchedulingSynced cache.InformerSynced

	// templateLister is the shared ResourceClaimTemplate lister used to
	// fetch template objects from the API server. It is shared with other
	// controllers and therefore the objects in its store should be
	// treated as immutable.
	templateLister  resourcev1alpha2listers.ResourceClaimTemplateLister
	templatesSynced cache.InformerSynced

	// podIndexer has the pod-to-ResourceClaim index installed to
	// limit iteration over pods to those of interest.
	podIndexer cache.Indexer

	// recorder is used to record events in the API server.
	recorder record.EventRecorder

	queue workqueue.RateLimitingInterface

	// The deletedObjects cache keeps track of Pods for which we know that
	// they have existed and have been removed. For those we can be sure
	// that a ResourceClaim for them will never be created again.
	deletedObjects *uidCache
}

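// Work queue items carry one of these prefixes so that a single queue can
// hold both ResourceClaim keys and Pod keys.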
const (
	claimKeyPrefix = "claim:"
	podKeyPrefix   = "pod:"
)

// NewController creates a ResourceClaim controller.
func NewController(
	logger klog.Logger,
	kubeClient clientset.Interface,
	podInformer v1informers.PodInformer,
	podSchedulingInformer resourcev1alpha2informers.PodSchedulingContextInformer,
	claimInformer resourcev1alpha2informers.ResourceClaimInformer,
	templateInformer resourcev1alpha2informers.ResourceClaimTemplateInformer) (*Controller, error) {

	ec := &Controller{
		kubeClient:          kubeClient,
		podLister:           podInformer.Lister(),
		podIndexer:          podInformer.Informer().GetIndexer(),
		podSynced:           podInformer.Informer().HasSynced,
		podSchedulingLister: podSchedulingInformer.Lister(),
		podSchedulingSynced: podSchedulingInformer.Informer().HasSynced,
		claimLister:         claimInformer.Lister(),
		claimsSynced:        claimInformer.Informer().HasSynced,
		templateLister:      templateInformer.Lister(),
		templatesSynced:     templateInformer.Informer().HasSynced,
		queue:               workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "resource_claim"),
		deletedObjects:      newUIDCache(maxUIDCacheEntries),
	}

	metrics.RegisterMetrics()

	if _, err := podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			ec.enqueuePod(logger, obj, false)
		},
		UpdateFunc: func(old, updated interface{}) {
			ec.enqueuePod(logger, updated, false)
		},
		DeleteFunc: func(obj interface{}) {
			ec.enqueuePod(logger, obj, true)
		},
	}); err != nil {
		return nil, err
	}
	if _, err := claimInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			logger.V(6).Info("new claim", "claimDump", obj)
			ec.enqueueResourceClaim(logger, obj, false)
		},
		UpdateFunc: func(old, updated interface{}) {
			logger.V(6).Info("updated claim", "claimDump", updated)
			ec.enqueueResourceClaim(logger, updated, false)
		},
		DeleteFunc: func(obj interface{}) {
			logger.V(6).Info("deleted claim", "claimDump", obj)
			ec.enqueueResourceClaim(logger, obj, true)
		},
	}); err != nil {
		return nil, err
	}
	if err := ec.podIndexer.AddIndexers(cache.Indexers{podResourceClaimIndex: podResourceClaimIndexFunc}); err != nil {
		return nil, fmt.Errorf("could not initialize ResourceClaim controller: %w", err)
	}

	// The mutation cache acts as an additional layer on top of the
	// informer cache: a claim that was just created by this controller
	// is returned by it until the informer catches up. Without it, a
	// retry after a failed pod status update could overlook the freshly
	// created claim and create a duplicate.
	claimInformerCache := claimInformer.Informer().GetIndexer()
	if err := claimInformerCache.AddIndexers(cache.Indexers{claimPodOwnerIndex: claimPodOwnerIndexFunc}); err != nil {
		return nil, fmt.Errorf("could not initialize ResourceClaim controller: %w", err)
	}
	ec.claimCache = cache.NewIntegerResourceVersionMutationCache(claimInformerCache, claimInformerCache,
		// Very long TTL, unlikely to be needed because the informer
		// cache should catch up quickly.
		time.Hour,
		// Allow storing objects which are not (yet) in the underlying
		// cache. That is the point of the mutation cache: it has to
		// return claims that the informer has not seen yet.
		true,
	)

	return ec, nil
}

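// enqueuePod determines which work, if any, an added, updated, or deleted pod
// implies for the controller and adds the corresponding claim and/or pod keys
// to the work queue.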
func (ec *Controller) enqueuePod(logger klog.Logger, obj interface{}, deleted bool) {
	if d, ok := obj.(cache.DeletedFinalStateUnknown); ok {
		obj = d.Obj
	}
	pod, ok := obj.(*v1.Pod)
	if !ok {
		// Not a pod?! Should not happen.
		logger.Error(nil, "enqueuePod called for unexpected object", "type", fmt.Sprintf("%T", obj))
		return
	}

	if len(pod.Spec.ResourceClaims) == 0 {
		// Nothing to do for the pod at all.
		return
	}

	if deleted {
		logger.V(6).Info("pod got deleted", "pod", klog.KObj(pod))
		ec.deletedObjects.Add(pod.UID)
	}

	logger.V(6).Info("pod with resource claims changed", "pod", klog.KObj(pod), "deleted", deleted)

	// Release reservations of a deleted or completed pod?
	if needsClaims, reason := podNeedsClaims(pod, deleted); !needsClaims {
		for _, podClaim := range pod.Spec.ResourceClaims {
			claimName, _, err := resourceclaim.Name(pod, &podClaim)
			switch {
			case err != nil:
				// Either the claim was not created (nothing to do here) or
				// the API changed. The latter also gets reported elsewhere,
				// so here it's just a debug message.
				logger.V(6).Info("Nothing to do for claim during pod change", "err", err, "reason", reason)
			case claimName != nil:
				key := claimKeyPrefix + pod.Namespace + "/" + *claimName
				logger.V(6).Info("Process claim", "pod", klog.KObj(pod), "key", key, "reason", reason)
				ec.queue.Add(key)
			default:
				// Nothing to do, claim wasn't generated.
				logger.V(6).Info("Nothing to do for skipped claim during pod change", "reason", reason)
			}
		}
	}

	needsWork, reason := ec.podNeedsWork(pod)
	if needsWork {
		logger.V(6).Info("enqueuing pod", "pod", klog.KObj(pod), "reason", reason)
		ec.queue.Add(podKeyPrefix + pod.Namespace + "/" + pod.Name)
		return
	}
	logger.V(6).Info("not enqueuing pod", "pod", klog.KObj(pod), "reason", reason)
}

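// podNeedsClaims reports whether the pod might still run and therefore still
// needs its ResourceClaims, together with the reason for that decision.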
func podNeedsClaims(pod *v1.Pod, deleted bool) (bool, string) {
	if deleted {
		return false, "pod got removed"
	}
	if podutil.IsPodTerminal(pod) {
		return false, "pod has terminated"
	}
	if pod.DeletionTimestamp != nil && pod.Spec.NodeName == "" {
		return false, "pod got deleted before scheduling"
	}

	return true, "pod might run"
}

// podNeedsWork checks whether a new or modified pod needs to be processed
// further by a worker. It returns a boolean with the result and an
// explanation for it.
func (ec *Controller) podNeedsWork(pod *v1.Pod) (bool, string) {
	if pod.DeletionTimestamp != nil {
		// Nothing else to do for the pod.
		return false, "pod is deleted"
	}

	for _, podClaim := range pod.Spec.ResourceClaims {
		claimName, checkOwner, err := resourceclaim.Name(pod, &podClaim)
		if err != nil {
			return true, err.Error()
		}
		// If the claimName is nil, then it has been determined before
		// that the claim is not needed.
		if claimName == nil {
			return false, "claim is not needed"
		}
		claim, err := ec.claimLister.ResourceClaims(pod.Namespace).Get(*claimName)
		if apierrors.IsNotFound(err) {
			if podClaim.Source.ResourceClaimTemplateName != nil {
				return true, "must create ResourceClaim from template"
			}
			// User needs to create claim.
			return false, "claim is missing and must be created by user"
		}
		if err != nil {
			// Shouldn't happen.
			return true, fmt.Sprintf("internal error while checking for claim: %v", err)
		}

		if checkOwner &&
			resourceclaim.IsForPod(pod, claim) != nil {
			// Cannot proceed with the pod unless that other claim gets deleted.
			return false, "conflicting claim needs to be removed by user"
		}

		// The checks below only matter for scheduled pods: before
		// scheduling, the scheduler takes care of triggering
		// allocation and reserving the claim.
		if pod.Spec.NodeName == "" {
			continue
		}

		// Create PodSchedulingContext if the pod got scheduled without triggering
		// delayed allocation.
		//
		// These can happen when:
		// - a user created a pod with spec.nodeName set, perhaps for testing
		// - some scheduler was used which is unaware of DRA
		// - DRA was not enabled in kube-scheduler (version skew, configuration)
		if claim.Spec.AllocationMode == resourcev1alpha2.AllocationModeWaitForFirstConsumer &&
			claim.Status.Allocation == nil {
			scheduling, err := ec.podSchedulingLister.PodSchedulingContexts(pod.Namespace).Get(pod.Name)
			if apierrors.IsNotFound(err) {
				return true, "need to create PodSchedulingContext for scheduled pod"
			}
			if err != nil {
				// Shouldn't happen.
				return true, fmt.Sprintf("internal error while checking for PodSchedulingContext: %v", err)
			}
			if scheduling.Spec.SelectedNode != pod.Spec.NodeName {
				// Need to update the PodSchedulingContext.
				return true, "need to update PodSchedulingContext for scheduled pod"
			}
		}
		if claim.Status.Allocation != nil &&
			!resourceclaim.IsReservedForPod(pod, claim) &&
			resourceclaim.CanBeReserved(claim) {
			// Need to reserve it.
			return true, "need to reserve claim for pod"
		}
	}

	return false, "nothing to do"
}

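// enqueueResourceClaim enqueues the claim itself on add/update and, in all
// cases, re-enqueues the pods which are known to reference the claim.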
func (ec *Controller) enqueueResourceClaim(logger klog.Logger, obj interface{}, deleted bool) {
	if d, ok := obj.(cache.DeletedFinalStateUnknown); ok {
		obj = d.Obj
	}
	claim, ok := obj.(*resourcev1alpha2.ResourceClaim)
	if !ok {
		return
	}

	if !deleted {
		// When starting up, we have to check all claims to find those
		// with stale pods in ReservedFor. During an update, a pod
		// might get added that already no longer exists.
		key := claimKeyPrefix + claim.Namespace + "/" + claim.Name
		logger.V(6).Info("enqueuing new or updated claim", "claim", klog.KObj(claim), "key", key)
		ec.queue.Add(key)
	} else {
		logger.V(6).Info("not enqueuing deleted claim", "claim", klog.KObj(claim))
	}

	// Also check whether this causes work for any of the currently
	// known pods which use the ResourceClaim.
	objs, err := ec.podIndexer.ByIndex(podResourceClaimIndex, fmt.Sprintf("%s/%s", claim.Namespace, claim.Name))
	if err != nil {
		logger.Error(err, "listing pods from cache")
		return
	}
	if len(objs) == 0 {
		logger.V(6).Info("claim got deleted while not needed by any pod, nothing to do", "claim", klog.KObj(claim))
		return
	}
	for _, obj := range objs {
		ec.enqueuePod(logger, obj, false)
	}
}

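// Run starts the event broadcaster and the given number of worker goroutines
// and blocks until the context is canceled.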
func (ec *Controller) Run(ctx context.Context, workers int) {
	defer runtime.HandleCrash()
	defer ec.queue.ShutDown()

	logger := klog.FromContext(ctx)
	logger.Info("Starting resource claim controller")
	defer logger.Info("Shutting down resource claim controller")

	eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx))
	eventBroadcaster.StartLogging(klog.Infof)
	eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: ec.kubeClient.CoreV1().Events("")})
	ec.recorder = eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "resource_claim"})
	defer eventBroadcaster.Shutdown()

	if !cache.WaitForNamedCacheSync("resource_claim", ctx.Done(), ec.podSynced, ec.claimsSynced) {
		return
	}

	for i := 0; i < workers; i++ {
		go wait.UntilWithContext(ctx, ec.runWorker, time.Second)
	}

	<-ctx.Done()
}

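// runWorker processes work items until the queue shuts down.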
func (ec *Controller) runWorker(ctx context.Context) {
	for ec.processNextWorkItem(ctx) {
	}
}

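// processNextWorkItem handles a single item from the queue. It returns false
// only when the queue has been shut down.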
func (ec *Controller) processNextWorkItem(ctx context.Context) bool {
	key, shutdown := ec.queue.Get()
	if shutdown {
		return false
	}
	defer ec.queue.Done(key)

	err := ec.syncHandler(ctx, key.(string))
	if err == nil {
		ec.queue.Forget(key)
		return true
	}

	runtime.HandleError(fmt.Errorf("%v failed with: %v", key, err))
	ec.queue.AddRateLimited(key)

	return true
}

// syncHandler is invoked for each work item which might need to be processed.
// If an error is returned from this function, the item will be requeued.
func (ec *Controller) syncHandler(ctx context.Context, key string) error {
	sep := strings.Index(key, ":")
	if sep < 0 {
		return fmt.Errorf("unexpected key: %s", key)
	}
	prefix, object := key[0:sep+1], key[sep+1:]
	namespace, name, err := cache.SplitMetaNamespaceKey(object)
	if err != nil {
		return err
	}

	switch prefix {
	case podKeyPrefix:
		return ec.syncPod(ctx, namespace, name)
	case claimKeyPrefix:
		return ec.syncClaim(ctx, namespace, name)
	default:
		return fmt.Errorf("unexpected key prefix: %s", prefix)
	}
}

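// syncPod ensures that the ResourceClaims needed by a pod exist, that the pod
// status lists the generated claims, and that claims of a scheduled pod get
// allocated and reserved.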
func (ec *Controller) syncPod(ctx context.Context, namespace, name string) error {
	logger := klog.LoggerWithValues(klog.FromContext(ctx), "pod", klog.KRef(namespace, name))
	ctx = klog.NewContext(ctx, logger)
	pod, err := ec.podLister.Pods(namespace).Get(name)
	if err != nil {
		if apierrors.IsNotFound(err) {
			logger.V(5).Info("nothing to do for pod, it is gone")
			return nil
		}
		return err
	}

	// Ignore pods which are already getting deleted.
	if pod.DeletionTimestamp != nil {
		logger.V(5).Info("nothing to do for pod, it is marked for deletion")
		return nil
	}

	var newPodClaims map[string]string
	for _, podClaim := range pod.Spec.ResourceClaims {
		if err := ec.handleClaim(ctx, pod, podClaim, &newPodClaims); err != nil {
			if ec.recorder != nil {
				ec.recorder.Event(pod, v1.EventTypeWarning, "FailedResourceClaimCreation", fmt.Sprintf("PodResourceClaim %s: %v", podClaim.Name, err))
			}
			return fmt.Errorf("pod %s/%s, PodResourceClaim %s: %v", namespace, name, podClaim.Name, err)
		}
	}

	if newPodClaims != nil {
		// Patch the pod status with the new information about
		// generated ResourceClaims.
		statuses := make([]*corev1apply.PodResourceClaimStatusApplyConfiguration, 0, len(newPodClaims))
		for podClaimName, resourceClaimName := range newPodClaims {
			statuses = append(statuses, corev1apply.PodResourceClaimStatus().WithName(podClaimName).WithResourceClaimName(resourceClaimName))
		}
		podApply := corev1apply.Pod(name, namespace).WithStatus(corev1apply.PodStatus().WithResourceClaimStatuses(statuses...))
		if _, err := ec.kubeClient.CoreV1().Pods(namespace).ApplyStatus(ctx, podApply, metav1.ApplyOptions{FieldManager: fieldManager, Force: true}); err != nil {
			return fmt.Errorf("update pod %s/%s ResourceClaimStatuses: %v", namespace, name, err)
		}
	}

	if pod.Spec.NodeName == "" {
		// Scheduler will handle PodSchedulingContext and reservations.
		logger.V(5).Info("nothing to do for pod, scheduler will deal with it")
		return nil
	}

	for _, podClaim := range pod.Spec.ResourceClaims {
		claimName, checkOwner, err := resourceclaim.Name(pod, &podClaim)
		if err != nil {
			return err
		}
		// If nil, then it has been determined that the claim is not
		// needed and it can be skipped.
		if claimName == nil {
			continue
		}
		claim, err := ec.claimLister.ResourceClaims(pod.Namespace).Get(*claimName)
		if apierrors.IsNotFound(err) {
			return nil
		}
		if err != nil {
			return fmt.Errorf("retrieve claim: %v", err)
		}
		if checkOwner {
			if err := resourceclaim.IsForPod(pod, claim); err != nil {
				return err
			}
		}
		if claim.Spec.AllocationMode == resourcev1alpha2.AllocationModeWaitForFirstConsumer &&
			claim.Status.Allocation == nil {
			logger.V(5).Info("create PodSchedulingContext because claim needs to be allocated", "resourceClaim", klog.KObj(claim))
			return ec.ensurePodSchedulingContext(ctx, pod)
		}
		if claim.Status.Allocation != nil &&
			!resourceclaim.IsReservedForPod(pod, claim) &&
			resourceclaim.CanBeReserved(claim) {
			logger.V(5).Info("reserve claim for pod", "resourceClaim", klog.KObj(claim))
			if err := ec.reserveForPod(ctx, pod, claim); err != nil {
				return err
			}
		}
	}

	return nil
}

// handleClaim is invoked for each resource claim of a pod.
func (ec *Controller) handleClaim(ctx context.Context, pod *v1.Pod, podClaim v1.PodResourceClaim, newPodClaims *map[string]string) error {
	logger := klog.LoggerWithValues(klog.FromContext(ctx), "podClaim", podClaim.Name)
	ctx = klog.NewContext(ctx, logger)
	logger.V(5).Info("checking", "podClaim", podClaim.Name)

	// resourceclaim.Name checks whether a ResourceClaim already exists
	// for this pod claim, either referenced directly or recorded in the
	// pod status. ErrClaimNotFound means that no claim has been created
	// yet, in which case one may have to be generated from the template
	// below.
	claimName, mustCheckOwner, err := resourceclaim.Name(pod, &podClaim)
	switch {
	case errors.Is(err, resourceclaim.ErrClaimNotFound):
		// Continue below.
	case err != nil:
		return fmt.Errorf("checking for claim before creating it: %v", err)
	case claimName == nil:
		// Nothing to do, the claim is not needed.
		return nil
	case *claimName != "":
		claimName := *claimName
		// The claim was already created or referenced. Check that it
		// still exists and, if required, is owned by the pod.
		claim, err := ec.claimLister.ResourceClaims(pod.Namespace).Get(claimName)
		if err != nil && !apierrors.IsNotFound(err) {
			return err
		}
		if claim != nil {
			var err error
			if mustCheckOwner {
				err = resourceclaim.IsForPod(pod, claim)
			}
			if err == nil {
				// Already created, nothing to do.
				logger.V(5).Info("claim already created", "podClaim", podClaim.Name, "resourceClaim", claimName)
				return nil
			}
			logger.Error(err, "claim that was created for the pod is no longer owned by the pod, creating a new one", "podClaim", podClaim.Name, "resourceClaim", claimName)
		}
	}

	templateName := podClaim.Source.ResourceClaimTemplateName
	if templateName == nil {
		// Nothing to do, the claim is not generated from a template.
		return nil
	}

	// Before we create a new ResourceClaim, check if there is an orphaned one.
	// This covers the case that the controller has created it, but then fails
	// before it can update the pod status.
	claim, err := ec.findPodResourceClaim(pod, podClaim)
	if err != nil {
		return fmt.Errorf("finding ResourceClaim for claim %s in pod %s/%s failed: %v", podClaim.Name, pod.Namespace, pod.Name, err)
	}

	if claim == nil {
		template, err := ec.templateLister.ResourceClaimTemplates(pod.Namespace).Get(*templateName)
		if err != nil {
			return fmt.Errorf("resource claim template %q: %v", *templateName, err)
		}

		// Create the ResourceClaim with pod as owner, with a generated
		// name that uses <pod>-<claim name> as base.
		isTrue := true
		annotations := template.Spec.ObjectMeta.Annotations
		if annotations == nil {
			annotations = make(map[string]string)
		}
		annotations[podResourceClaimAnnotation] = podClaim.Name
		generateName := pod.Name + "-" + podClaim.Name + "-"
		maxBaseLen := 57
		if len(generateName) > maxBaseLen {
			// We could leave truncation to the apiserver, but as
			// it removes at the end, we would lose everything
			// from the pod claim name when the pod name is long.
			// We instead shorten the pod name and claim name
			// proportionally so that both contribute to the
			// generated name.
			generateName = pod.Name[0:len(pod.Name)*maxBaseLen/len(generateName)] +
				"-" +
				podClaim.Name[0:len(podClaim.Name)*maxBaseLen/len(generateName)]
		}
		claim = &resourcev1alpha2.ResourceClaim{
			ObjectMeta: metav1.ObjectMeta{
				GenerateName: generateName,
				OwnerReferences: []metav1.OwnerReference{
					{
						APIVersion:         "v1",
						Kind:               "Pod",
						Name:               pod.Name,
						UID:                pod.UID,
						Controller:         &isTrue,
						BlockOwnerDeletion: &isTrue,
					},
				},
				Annotations: annotations,
				Labels:      template.Spec.ObjectMeta.Labels,
			},
			Spec: template.Spec.Spec,
		}
		metrics.ResourceClaimCreateAttempts.Inc()
		claimName := claim.Name
		claim, err = ec.kubeClient.ResourceV1alpha2().ResourceClaims(pod.Namespace).Create(ctx, claim, metav1.CreateOptions{})
		if err != nil {
			metrics.ResourceClaimCreateFailures.Inc()
			return fmt.Errorf("create ResourceClaim %s: %v", claimName, err)
		}
		ec.claimCache.Mutation(claim)
	}

	// Remember the new ResourceClaim for a batch PodStatus update in our caller.
	if *newPodClaims == nil {
		*newPodClaims = make(map[string]string)
	}
	(*newPodClaims)[podClaim.Name] = claim.Name

	return nil
}

// findPodResourceClaim looks for an existing ResourceClaim with the right
// annotation (ties it to the pod claim) and the right ownership (ties it to
// the pod).
func (ec *Controller) findPodResourceClaim(pod *v1.Pod, podClaim v1.PodResourceClaim) (*resourcev1alpha2.ResourceClaim, error) {
	// Only claims owned by the pod will be returned here.
	claims, err := ec.claimCache.ByIndex(claimPodOwnerIndex, string(pod.UID))
	if err != nil {
		return nil, err
	}
	deterministicName := pod.Name + "-" + podClaim.Name
	for _, claimObj := range claims {
		claim, ok := claimObj.(*resourcev1alpha2.ResourceClaim)
		if !ok {
			return nil, fmt.Errorf("unexpected object of type %T returned by claim cache", claimObj)
		}
		podClaimName, ok := claim.Annotations[podResourceClaimAnnotation]
		if ok && podClaimName != podClaim.Name {
			continue
		}

		// No annotation? The claim might have been created for the
		// pod by an older Kubernetes release where the ResourceClaim
		// name was deterministic, in which case we have to use it and
		// update the new pod status field.
		if !ok && claim.Name != deterministicName {
			continue
		}

		// Pick the first one that matches. There shouldn't be more
		// than one. If there is, then all others will be ignored until
		// the pod gets deleted. Then they also get cleaned up.
		return claim, nil
	}
	return nil, nil
}

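// ensurePodSchedulingContext creates a PodSchedulingContext for a scheduled
// pod, or updates its selected node, so that allocation gets triggered for
// claims using delayed allocation.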
func (ec *Controller) ensurePodSchedulingContext(ctx context.Context, pod *v1.Pod) error {
	scheduling, err := ec.podSchedulingLister.PodSchedulingContexts(pod.Namespace).Get(pod.Name)
	if err != nil && !apierrors.IsNotFound(err) {
		return fmt.Errorf("retrieve PodSchedulingContext: %v", err)
	}
	if scheduling == nil {
		scheduling = &resourcev1alpha2.PodSchedulingContext{
			ObjectMeta: metav1.ObjectMeta{
				Name:      pod.Name,
				Namespace: pod.Namespace,
				OwnerReferences: []metav1.OwnerReference{
					{
						APIVersion: "v1",
						Kind:       "Pod",
						Name:       pod.Name,
						UID:        pod.UID,
						Controller: pointer.Bool(true),
					},
				},
			},
			Spec: resourcev1alpha2.PodSchedulingContextSpec{
				SelectedNode: pod.Spec.NodeName,
				// There is no need for negotiation about
				// potential and suitable nodes anymore, so
				// PotentialNodes can be left empty.
			},
		}
		if _, err := ec.kubeClient.ResourceV1alpha2().PodSchedulingContexts(pod.Namespace).Create(ctx, scheduling, metav1.CreateOptions{}); err != nil {
			return fmt.Errorf("create PodSchedulingContext: %v", err)
		}
		return nil
	}

	if scheduling.Spec.SelectedNode != pod.Spec.NodeName {
		scheduling := scheduling.DeepCopy()
		scheduling.Spec.SelectedNode = pod.Spec.NodeName
		if _, err := ec.kubeClient.ResourceV1alpha2().PodSchedulingContexts(pod.Namespace).Update(ctx, scheduling, metav1.UpdateOptions{}); err != nil {
			return fmt.Errorf("update spec.selectedNode in PodSchedulingContext: %v", err)
		}
	}

	return nil
}

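// reserveForPod adds the pod to the ReservedFor list of an allocated claim.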
func (ec *Controller) reserveForPod(ctx context.Context, pod *v1.Pod, claim *resourcev1alpha2.ResourceClaim) error {
	claim = claim.DeepCopy()
	claim.Status.ReservedFor = append(claim.Status.ReservedFor,
		resourcev1alpha2.ResourceClaimConsumerReference{
			Resource: "pods",
			Name:     pod.Name,
			UID:      pod.UID,
		})
	if _, err := ec.kubeClient.ResourceV1alpha2().ResourceClaims(claim.Namespace).UpdateStatus(ctx, claim, metav1.UpdateOptions{}); err != nil {
		return fmt.Errorf("reserve claim for pod: %v", err)
	}
	return nil
}

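// syncClaim removes stale pods from the ReservedFor list of a claim, triggers
// deallocation when the last consumer is gone, and deletes generated claims
// whose owning pod is not going to run (anymore).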
func (ec *Controller) syncClaim(ctx context.Context, namespace, name string) error {
	logger := klog.LoggerWithValues(klog.FromContext(ctx), "claim", klog.KRef(namespace, name))
	ctx = klog.NewContext(ctx, logger)
	claim, err := ec.claimLister.ResourceClaims(namespace).Get(name)
	if err != nil {
		if apierrors.IsNotFound(err) {
			logger.V(5).Info("nothing to do for claim, it is gone")
			return nil
		}
		return err
	}

	// Check if the ReservedFor entries are all still valid.
	valid := make([]resourcev1alpha2.ResourceClaimConsumerReference, 0, len(claim.Status.ReservedFor))
	for _, reservedFor := range claim.Status.ReservedFor {
		if reservedFor.APIGroup == "" &&
			reservedFor.Resource == "pods" {
			// A pod falls into one of three categories:
			// - we have it in our cache -> check the UID and
			//   whether it is done
			// - it is known to have been deleted -> remove the entry
			// - we don't have it in our cache -> it might still
			//   exist, double-check with the API server
			keepEntry := true

			// Tracking deleted pods in the LRU cache is an
			// optimization. The informer cache alone cannot
			// distinguish "not synced yet" from "deleted", so
			// without the cache the code below would have to
			// double-check with the API server for every pod
			// that is no longer in the informer cache. With the
			// cache, most of those API calls are avoided.
			if ec.deletedObjects.Has(reservedFor.UID) {
				// We know that the pod was deleted. This is
				// easy to check and thus is done first.
				keepEntry = false
			} else {
				pod, err := ec.podLister.Pods(claim.Namespace).Get(reservedFor.Name)
				switch {
				case err != nil && !apierrors.IsNotFound(err):
					return err
				case err != nil:
					// We might not have it in our informer
					// cache yet, but the pod might still
					// exist. Removing the reservation
					// incorrectly would break a running
					// pod, so double-check with the API
					// server before removing the entry.
					pod, err := ec.kubeClient.CoreV1().Pods(claim.Namespace).Get(ctx, reservedFor.Name, metav1.GetOptions{})
					if err != nil && !apierrors.IsNotFound(err) {
						return err
					}
					if pod == nil || pod.UID != reservedFor.UID {
						logger.V(6).Info("remove reservation because pod is gone or got replaced", "pod", klog.KObj(pod), "claim", klog.KRef(namespace, name))
						keepEntry = false
					}
				case pod.UID != reservedFor.UID:
					logger.V(6).Info("remove reservation because pod got replaced with new instance", "pod", klog.KObj(pod), "claim", klog.KRef(namespace, name))
					keepEntry = false
				case isPodDone(pod):
					logger.V(6).Info("remove reservation because pod will not run anymore", "pod", klog.KObj(pod), "claim", klog.KRef(namespace, name))
					keepEntry = false
				}
			}

			if keepEntry {
				valid = append(valid, reservedFor)
			}
			continue
		}

		// Only pods are supported as consumers at the moment.
		return fmt.Errorf("unsupported ReservedFor entry: %v", reservedFor)
	}

	builtinControllerFinalizer := slices.Index(claim.Finalizers, resourcev1alpha2.Finalizer)
	logger.V(5).Info("claim reserved for counts", "currentCount", len(claim.Status.ReservedFor), "claim", klog.KRef(namespace, name), "updatedCount", len(valid), "builtinController", builtinControllerFinalizer >= 0)
	if len(valid) < len(claim.Status.ReservedFor) {
		// Prepare an update which keeps only the valid entries.
		claim := claim.DeepCopy()
		claim.Status.ReservedFor = valid

		// When a ResourceClaim uses delayed allocation, then it makes sense to
		// deallocate the claim as soon as the last consumer stops using
		// it. This ensures that the claim can be allocated again as needed by
		// some future consumer instead of trying to schedule that consumer
		// onto the node that was chosen for the previous consumer. It also
		// releases the underlying resources for use by other claims.
		//
		// This has to be triggered by the transition from "was being used" to
		// "is not used anymore" because a DRA driver is not required to set
		// `status.reservedFor` together with `status.allocation`, i.e. a claim
		// that is "currently unused" should not get deallocated.
		//
		// This does not matter for claims that were created for a pod. For
		// those, the resource claim controller will trigger deletion when the
		// pod is done. However, it doesn't hurt to also trigger deallocation
		// for such claims and not checking for them keeps this code simpler.
		if len(valid) == 0 {
			if builtinControllerFinalizer >= 0 {
				if claim.Spec.AllocationMode == resourcev1alpha2.AllocationModeWaitForFirstConsumer ||
					claim.DeletionTimestamp != nil {
					// Allocated in-tree (builtin controller
					// finalizer is set), so we can "deallocate"
					// simply by clearing the allocation.
					claim.Status.Allocation = nil
				}
			} else if claim.Spec.AllocationMode == resourcev1alpha2.AllocationModeWaitForFirstConsumer {
				// The DRA driver controller in the control
				// plane has to do the deallocation.
				claim.Status.DeallocationRequested = true
			}
			// In all other cases, the claim remains allocated.
		}

		claim, err := ec.kubeClient.ResourceV1alpha2().ResourceClaims(claim.Namespace).UpdateStatus(ctx, claim, metav1.UpdateOptions{})
		if err != nil {
			return err
		}

		// The finalizer is no longer needed once the claim is
		// deallocated. Look up its index again because UpdateStatus
		// returned a new object.
		builtinControllerFinalizer := slices.Index(claim.Finalizers, resourcev1alpha2.Finalizer)
		if builtinControllerFinalizer >= 0 && claim.Status.Allocation == nil {
			claim.Finalizers = slices.Delete(claim.Finalizers, builtinControllerFinalizer, builtinControllerFinalizer+1)
			if _, err := ec.kubeClient.ResourceV1alpha2().ResourceClaims(claim.Namespace).Update(ctx, claim, metav1.UpdateOptions{}); err != nil {
				return err
			}
		}
	} else if builtinControllerFinalizer >= 0 && claim.DeletionTimestamp != nil && len(valid) == 0 {
		claim := claim.DeepCopy()
		if claim.Status.Allocation != nil {
			// This can happen when a claim with immediate
			// allocation stopped being used, remained allocated,
			// and then got deleted. As above, we deallocate by
			// clearing the allocation.
			claim.Status.Allocation = nil
			var err error
			claim, err = ec.kubeClient.ResourceV1alpha2().ResourceClaims(claim.Namespace).UpdateStatus(ctx, claim, metav1.UpdateOptions{})
			if err != nil {
				return err
			}
		}
		// Whether it was allocated or not, remove the finalizer to unblock removal.
		claim.Finalizers = slices.Delete(claim.Finalizers, builtinControllerFinalizer, builtinControllerFinalizer+1)
		_, err := ec.kubeClient.ResourceV1alpha2().ResourceClaims(claim.Namespace).Update(ctx, claim, metav1.UpdateOptions{})
		if err != nil {
			return err
		}
	}

	if len(valid) == 0 {
		// Claim is not reserved. If it was generated for a pod and
		// that pod is not going to run, the claim can be
		// deleted as well.
		podName, podUID := owningPod(claim)
		if podName != "" {
			pod, err := ec.podLister.Pods(claim.Namespace).Get(podName)
			switch {
			case err == nil:
				// Pod already replaced or not going to run?
				if pod.UID != podUID || isPodDone(pod) {
					// We are certain that the owning pod is not going to need
					// the claim and therefore remove the claim.
					logger.V(5).Info("deleting unused generated claim", "claim", klog.KObj(claim), "pod", klog.KObj(pod))
					err := ec.kubeClient.ResourceV1alpha2().ResourceClaims(claim.Namespace).Delete(ctx, claim.Name, metav1.DeleteOptions{})
					if err != nil {
						return fmt.Errorf("delete claim: %v", err)
					}
				} else {
					logger.V(6).Info("wrong pod content, not deleting claim", "claim", klog.KObj(claim), "podUID", podUID, "podContent", pod)
				}
			case apierrors.IsNotFound(err):
				// We might not know the pod *yet*. Instead of
				// an expensive API call, let the garbage
				// collector handle the case that the pod is
				// truly gone.
				logger.V(5).Info("pod for claim not found", "claim", klog.KObj(claim), "pod", klog.KRef(claim.Namespace, podName))
			default:
				return fmt.Errorf("lookup pod: %v", err)
			}
		} else {
			logger.V(5).Info("claim not generated for a pod", "claim", klog.KObj(claim))
		}
	}

	return nil
}

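// owningPod returns the name and UID of the pod which owns the claim, if any.
// An empty name means the claim is not owned by a pod.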
func owningPod(claim *resourcev1alpha2.ResourceClaim) (string, types.UID) {
	for _, owner := range claim.OwnerReferences {
		if pointer.BoolDeref(owner.Controller, false) &&
			owner.APIVersion == "v1" &&
			owner.Kind == "Pod" {
			return owner.Name, owner.UID
		}
	}
	return "", ""
}

// podResourceClaimIndexFunc is an index function that returns ResourceClaim
// keys (= namespace/name) for the ResourceClaims referenced by a pod.
func podResourceClaimIndexFunc(obj interface{}) ([]string, error) {
	pod, ok := obj.(*v1.Pod)
	if !ok {
		return []string{}, nil
	}
	keys := []string{}
	for _, podClaim := range pod.Spec.ResourceClaims {
		claimName, _, err := resourceclaim.Name(pod, &podClaim)
		if err != nil || claimName == nil {
			// Index functions are not supposed to fail. Skipping
			// the entry is fine here: the pod gets enqueued again
			// once the situation changes.
			continue
		}
		keys = append(keys, fmt.Sprintf("%s/%s", pod.Namespace, *claimName))
	}
	return keys, nil
}

// isPodDone returns true if it is certain that none of the containers are
// running and never will run.
func isPodDone(pod *v1.Pod) bool {
	return podutil.IsPodPhaseTerminal(pod.Status.Phase) ||
		// Deleted and not scheduled:
		pod.DeletionTimestamp != nil && pod.Spec.NodeName == ""
}

// claimPodOwnerIndexFunc is an index function that returns the pod UIDs of
// all pods which own the ResourceClaim.
func claimPodOwnerIndexFunc(obj interface{}) ([]string, error) {
	claim, ok := obj.(*resourcev1alpha2.ResourceClaim)
	if !ok {
		return nil, nil
	}
	var keys []string
	for _, owner := range claim.OwnerReferences {
		if owner.Controller != nil &&
			*owner.Controller &&
			owner.APIVersion == "v1" &&
			owner.Kind == "Pod" {
			keys = append(keys, string(owner.UID))
		}
	}
	return keys, nil
}