/* Copyright 2020 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package resourceclaim import ( "context" "errors" "fmt" "slices" "strings" "time" v1 "k8s.io/api/core/v1" resourcev1alpha2 "k8s.io/api/resource/v1alpha2" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" corev1apply "k8s.io/client-go/applyconfigurations/core/v1" v1informers "k8s.io/client-go/informers/core/v1" resourcev1alpha2informers "k8s.io/client-go/informers/resource/v1alpha2" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" v1core "k8s.io/client-go/kubernetes/typed/core/v1" v1listers "k8s.io/client-go/listers/core/v1" resourcev1alpha2listers "k8s.io/client-go/listers/resource/v1alpha2" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" "k8s.io/dynamic-resource-allocation/resourceclaim" "k8s.io/klog/v2" podutil "k8s.io/kubernetes/pkg/api/v1/pod" "k8s.io/kubernetes/pkg/controller/resourceclaim/metrics" "k8s.io/utils/pointer" ) const ( // podResourceClaimIndex is the lookup name for the index function which indexes by pod ResourceClaim templates. podResourceClaimIndex = "pod-resource-claim-index" // podResourceClaimAnnotation is the special annotation that generated // ResourceClaims get. Its value is the pod.spec.resourceClaims[].name // for which it was generated. This is used only inside the controller // and not documented as part of the Kubernetes API. podResourceClaimAnnotation = "resource.kubernetes.io/pod-claim-name" // claimPodOwnerIndex is used to find ResourceClaims which have // a specific pod as owner. Values for this index are the pod UID. claimPodOwnerIndex = "claim-pod-owner-index" // Field manager used to update the pod status. fieldManager = "ResourceClaimController" maxUIDCacheEntries = 500 ) // Controller creates ResourceClaims for ResourceClaimTemplates in a pod spec. type Controller struct { // kubeClient is the kube API client used to communicate with the API // server. kubeClient clientset.Interface // claimLister is the shared ResourceClaim lister used to fetch and store ResourceClaim // objects from the API server. It is shared with other controllers and // therefore the ResourceClaim objects in its store should be treated as immutable. claimLister resourcev1alpha2listers.ResourceClaimLister claimsSynced cache.InformerSynced claimCache cache.MutationCache // podLister is the shared Pod lister used to fetch Pod // objects from the API server. It is shared with other controllers and // therefore the Pod objects in its store should be treated as immutable. podLister v1listers.PodLister podSynced cache.InformerSynced // podSchedulingList is the shared PodSchedulingContext lister used to // fetch scheduling objects from the API server. It is shared with other // controllers and therefore the objects in its store should be treated // as immutable. podSchedulingLister resourcev1alpha2listers.PodSchedulingContextLister podSchedulingSynced cache.InformerSynced // templateLister is the shared ResourceClaimTemplate lister used to // fetch template objects from the API server. It is shared with other // controllers and therefore the objects in its store should be treated // as immutable. templateLister resourcev1alpha2listers.ResourceClaimTemplateLister templatesSynced cache.InformerSynced // podIndexer has the common PodResourceClaim indexer indexer installed To // limit iteration over pods to those of interest. podIndexer cache.Indexer // recorder is used to record events in the API server recorder record.EventRecorder queue workqueue.RateLimitingInterface // The deletedObjects cache keeps track of Pods for which we know that // they have existed and have been removed. For those we can be sure // that a ReservedFor entry needs to be removed. deletedObjects *uidCache } const ( claimKeyPrefix = "claim:" podKeyPrefix = "pod:" ) // NewController creates a ResourceClaim controller. func NewController( logger klog.Logger, kubeClient clientset.Interface, podInformer v1informers.PodInformer, podSchedulingInformer resourcev1alpha2informers.PodSchedulingContextInformer, claimInformer resourcev1alpha2informers.ResourceClaimInformer, templateInformer resourcev1alpha2informers.ResourceClaimTemplateInformer) (*Controller, error) { ec := &Controller{ kubeClient: kubeClient, podLister: podInformer.Lister(), podIndexer: podInformer.Informer().GetIndexer(), podSynced: podInformer.Informer().HasSynced, podSchedulingLister: podSchedulingInformer.Lister(), podSchedulingSynced: podSchedulingInformer.Informer().HasSynced, claimLister: claimInformer.Lister(), claimsSynced: claimInformer.Informer().HasSynced, templateLister: templateInformer.Lister(), templatesSynced: templateInformer.Informer().HasSynced, queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "resource_claim"), deletedObjects: newUIDCache(maxUIDCacheEntries), } metrics.RegisterMetrics() if _, err := podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { ec.enqueuePod(logger, obj, false) }, UpdateFunc: func(old, updated interface{}) { ec.enqueuePod(logger, updated, false) }, DeleteFunc: func(obj interface{}) { ec.enqueuePod(logger, obj, true) }, }); err != nil { return nil, err } if _, err := claimInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { logger.V(6).Info("new claim", "claimDump", obj) ec.enqueueResourceClaim(logger, obj, false) }, UpdateFunc: func(old, updated interface{}) { logger.V(6).Info("updated claim", "claimDump", updated) ec.enqueueResourceClaim(logger, updated, false) }, DeleteFunc: func(obj interface{}) { logger.V(6).Info("deleted claim", "claimDump", obj) ec.enqueueResourceClaim(logger, obj, true) }, }); err != nil { return nil, err } if err := ec.podIndexer.AddIndexers(cache.Indexers{podResourceClaimIndex: podResourceClaimIndexFunc}); err != nil { return nil, fmt.Errorf("could not initialize ResourceClaim controller: %w", err) } // The mutation cache acts as an additional layer for the informer // cache and after a create made by the controller returns that // object until the informer catches up. That is necessary // when a ResourceClaim got created, updating the pod status fails, // and then a retry occurs before the informer cache is updated. // In that scenario, the controller would create another claim // instead of continuing with the existing one. claimInformerCache := claimInformer.Informer().GetIndexer() if err := claimInformerCache.AddIndexers(cache.Indexers{claimPodOwnerIndex: claimPodOwnerIndexFunc}); err != nil { return nil, fmt.Errorf("could not initialize ResourceClaim controller: %w", err) } ec.claimCache = cache.NewIntegerResourceVersionMutationCache(claimInformerCache, claimInformerCache, // Very long time to live, unlikely to be needed because // the informer cache should get updated soon. time.Hour, // Allow storing objects not in the underlying cache - that's the point... // It's safe because in case of a race (claim is in mutation cache, claim // gets deleted, controller updates status based on mutation cache) the // "bad" pod status will get detected and fixed when the informer catches up. true, ) return ec, nil } func (ec *Controller) enqueuePod(logger klog.Logger, obj interface{}, deleted bool) { if d, ok := obj.(cache.DeletedFinalStateUnknown); ok { obj = d.Obj } pod, ok := obj.(*v1.Pod) if !ok { // Not a pod?! logger.Error(nil, "enqueuePod called for unexpected object", "type", fmt.Sprintf("%T", obj)) return } if len(pod.Spec.ResourceClaims) == 0 { // Nothing to do for it at all. return } if deleted { logger.V(6).Info("pod got deleted", "pod", klog.KObj(pod)) ec.deletedObjects.Add(pod.UID) } logger.V(6).Info("pod with resource claims changed", "pod", klog.KObj(pod), "deleted", deleted) // Release reservations of a deleted or completed pod? if needsClaims, reason := podNeedsClaims(pod, deleted); !needsClaims { for _, podClaim := range pod.Spec.ResourceClaims { claimName, _, err := resourceclaim.Name(pod, &podClaim) switch { case err != nil: // Either the claim was not created (nothing to do here) or // the API changed. The later will also get reported elsewhere, // so here it's just a debug message. logger.V(6).Info("Nothing to do for claim during pod change", "err", err, "reason", reason) case claimName != nil: key := claimKeyPrefix + pod.Namespace + "/" + *claimName logger.V(6).Info("Process claim", "pod", klog.KObj(pod), "key", key, "reason", reason) ec.queue.Add(key) default: // Nothing to do, claim wasn't generated. logger.V(6).Info("Nothing to do for skipped claim during pod change", "reason", reason) } } } needsWork, reason := ec.podNeedsWork(pod) if needsWork { logger.V(6).Info("enqueing pod", "pod", klog.KObj(pod), "reason", reason) ec.queue.Add(podKeyPrefix + pod.Namespace + "/" + pod.Name) return } logger.V(6).Info("not enqueing pod", "pod", klog.KObj(pod), "reason", reason) } func podNeedsClaims(pod *v1.Pod, deleted bool) (bool, string) { if deleted { return false, "pod got removed" } if podutil.IsPodTerminal(pod) { return false, "pod has terminated" } if pod.DeletionTimestamp != nil && pod.Spec.NodeName == "" { return false, "pod got deleted before scheduling" } // Still needs claims. return true, "pod might run" } // podNeedsWork checks whether a new or modified pod needs to be processed // further by a worker. It returns a boolean with the result and an explanation // for it. func (ec *Controller) podNeedsWork(pod *v1.Pod) (bool, string) { if pod.DeletionTimestamp != nil { // Nothing else to do for the pod. return false, "pod is deleted" } for _, podClaim := range pod.Spec.ResourceClaims { claimName, checkOwner, err := resourceclaim.Name(pod, &podClaim) if err != nil { return true, err.Error() } // If the claimName is nil, then it has been determined before // that the claim is not needed. if claimName == nil { return false, "claim is not needed" } claim, err := ec.claimLister.ResourceClaims(pod.Namespace).Get(*claimName) if apierrors.IsNotFound(err) { if podClaim.Source.ResourceClaimTemplateName != nil { return true, "must create ResourceClaim from template" } // User needs to create claim. return false, "claim is missing and must be created by user" } if err != nil { // Shouldn't happen. return true, fmt.Sprintf("internal error while checking for claim: %v", err) } if checkOwner && resourceclaim.IsForPod(pod, claim) != nil { // Cannot proceed with the pod unless that other claim gets deleted. return false, "conflicting claim needs to be removed by user" } // This check skips over the reasons below that only apply // when a pod has been scheduled already. We need to keep checking // for more claims that might need to be created. if pod.Spec.NodeName == "" { continue } // Create PodSchedulingContext if the pod got scheduled without triggering // delayed allocation. // // These can happen when: // - a user created a pod with spec.nodeName set, perhaps for testing // - some scheduler was used which is unaware of DRA // - DRA was not enabled in kube-scheduler (version skew, configuration) if claim.Spec.AllocationMode == resourcev1alpha2.AllocationModeWaitForFirstConsumer && claim.Status.Allocation == nil { scheduling, err := ec.podSchedulingLister.PodSchedulingContexts(pod.Namespace).Get(pod.Name) if apierrors.IsNotFound(err) { return true, "need to create PodSchedulingContext for scheduled pod" } if err != nil { // Shouldn't happen. return true, fmt.Sprintf("internal error while checking for PodSchedulingContext: %v", err) } if scheduling.Spec.SelectedNode != pod.Spec.NodeName { // Need to update PodSchedulingContext. return true, "need to updated PodSchedulingContext for scheduled pod" } } if claim.Status.Allocation != nil && !resourceclaim.IsReservedForPod(pod, claim) && resourceclaim.CanBeReserved(claim) { // Need to reserve it. return true, "need to reserve claim for pod" } } return false, "nothing to do" } func (ec *Controller) enqueueResourceClaim(logger klog.Logger, obj interface{}, deleted bool) { if d, ok := obj.(cache.DeletedFinalStateUnknown); ok { obj = d.Obj } claim, ok := obj.(*resourcev1alpha2.ResourceClaim) if !ok { return } if !deleted { // When starting up, we have to check all claims to find those with // stale pods in ReservedFor. During an update, a pod might get added // that already no longer exists. key := claimKeyPrefix + claim.Namespace + "/" + claim.Name logger.V(6).Info("enqueing new or updated claim", "claim", klog.KObj(claim), "key", key) ec.queue.Add(key) } else { logger.V(6).Info("not enqueing deleted claim", "claim", klog.KObj(claim)) } // Also check whether this causes work for any of the currently // known pods which use the ResourceClaim. objs, err := ec.podIndexer.ByIndex(podResourceClaimIndex, fmt.Sprintf("%s/%s", claim.Namespace, claim.Name)) if err != nil { logger.Error(err, "listing pods from cache") return } if len(objs) == 0 { logger.V(6).Info("claim got deleted while not needed by any pod, nothing to do", "claim", klog.KObj(claim)) return } for _, obj := range objs { ec.enqueuePod(logger, obj, false) } } func (ec *Controller) Run(ctx context.Context, workers int) { defer runtime.HandleCrash() defer ec.queue.ShutDown() logger := klog.FromContext(ctx) logger.Info("Starting ephemeral volume controller") defer logger.Info("Shutting down ephemeral volume controller") eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx)) eventBroadcaster.StartLogging(klog.Infof) eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: ec.kubeClient.CoreV1().Events("")}) ec.recorder = eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "resource_claim"}) defer eventBroadcaster.Shutdown() if !cache.WaitForNamedCacheSync("ephemeral", ctx.Done(), ec.podSynced, ec.claimsSynced) { return } for i := 0; i < workers; i++ { go wait.UntilWithContext(ctx, ec.runWorker, time.Second) } <-ctx.Done() } func (ec *Controller) runWorker(ctx context.Context) { for ec.processNextWorkItem(ctx) { } } func (ec *Controller) processNextWorkItem(ctx context.Context) bool { key, shutdown := ec.queue.Get() if shutdown { return false } defer ec.queue.Done(key) err := ec.syncHandler(ctx, key.(string)) if err == nil { ec.queue.Forget(key) return true } runtime.HandleError(fmt.Errorf("%v failed with: %v", key, err)) ec.queue.AddRateLimited(key) return true } // syncHandler is invoked for each work item which might need to be processed. // If an error is returned from this function, the item will be requeued. func (ec *Controller) syncHandler(ctx context.Context, key string) error { sep := strings.Index(key, ":") if sep < 0 { return fmt.Errorf("unexpected key: %s", key) } prefix, object := key[0:sep+1], key[sep+1:] namespace, name, err := cache.SplitMetaNamespaceKey(object) if err != nil { return err } switch prefix { case podKeyPrefix: return ec.syncPod(ctx, namespace, name) case claimKeyPrefix: return ec.syncClaim(ctx, namespace, name) default: return fmt.Errorf("unexpected key prefix: %s", prefix) } } func (ec *Controller) syncPod(ctx context.Context, namespace, name string) error { logger := klog.LoggerWithValues(klog.FromContext(ctx), "pod", klog.KRef(namespace, name)) ctx = klog.NewContext(ctx, logger) pod, err := ec.podLister.Pods(namespace).Get(name) if err != nil { if apierrors.IsNotFound(err) { logger.V(5).Info("nothing to do for pod, it is gone") return nil } return err } // Ignore pods which are already getting deleted. if pod.DeletionTimestamp != nil { logger.V(5).Info("nothing to do for pod, it is marked for deletion") return nil } var newPodClaims map[string]string for _, podClaim := range pod.Spec.ResourceClaims { if err := ec.handleClaim(ctx, pod, podClaim, &newPodClaims); err != nil { if ec.recorder != nil { ec.recorder.Event(pod, v1.EventTypeWarning, "FailedResourceClaimCreation", fmt.Sprintf("PodResourceClaim %s: %v", podClaim.Name, err)) } return fmt.Errorf("pod %s/%s, PodResourceClaim %s: %v", namespace, name, podClaim.Name, err) } } if newPodClaims != nil { // Patch the pod status with the new information about // generated ResourceClaims. statuses := make([]*corev1apply.PodResourceClaimStatusApplyConfiguration, 0, len(newPodClaims)) for podClaimName, resourceClaimName := range newPodClaims { statuses = append(statuses, corev1apply.PodResourceClaimStatus().WithName(podClaimName).WithResourceClaimName(resourceClaimName)) } podApply := corev1apply.Pod(name, namespace).WithStatus(corev1apply.PodStatus().WithResourceClaimStatuses(statuses...)) if _, err := ec.kubeClient.CoreV1().Pods(namespace).ApplyStatus(ctx, podApply, metav1.ApplyOptions{FieldManager: fieldManager, Force: true}); err != nil { return fmt.Errorf("update pod %s/%s ResourceClaimStatuses: %v", namespace, name, err) } } if pod.Spec.NodeName == "" { // Scheduler will handle PodSchedulingContext and reservations. logger.V(5).Info("nothing to do for pod, scheduler will deal with it") return nil } for _, podClaim := range pod.Spec.ResourceClaims { claimName, checkOwner, err := resourceclaim.Name(pod, &podClaim) if err != nil { return err } // If nil, then it has been determined that the claim is not needed // and can be skipped. if claimName == nil { continue } claim, err := ec.claimLister.ResourceClaims(pod.Namespace).Get(*claimName) if apierrors.IsNotFound(err) { return nil } if err != nil { return fmt.Errorf("retrieve claim: %v", err) } if checkOwner { if err := resourceclaim.IsForPod(pod, claim); err != nil { return err } } if claim.Spec.AllocationMode == resourcev1alpha2.AllocationModeWaitForFirstConsumer && claim.Status.Allocation == nil { logger.V(5).Info("create PodSchedulingContext because claim needs to be allocated", "resourceClaim", klog.KObj(claim)) return ec.ensurePodSchedulingContext(ctx, pod) } if claim.Status.Allocation != nil && !resourceclaim.IsReservedForPod(pod, claim) && resourceclaim.CanBeReserved(claim) { logger.V(5).Info("reserve claim for pod", "resourceClaim", klog.KObj(claim)) if err := ec.reserveForPod(ctx, pod, claim); err != nil { return err } } } return nil } // handleResourceClaim is invoked for each volume of a pod. func (ec *Controller) handleClaim(ctx context.Context, pod *v1.Pod, podClaim v1.PodResourceClaim, newPodClaims *map[string]string) error { logger := klog.LoggerWithValues(klog.FromContext(ctx), "podClaim", podClaim.Name) ctx = klog.NewContext(ctx, logger) logger.V(5).Info("checking", "podClaim", podClaim.Name) // resourceclaim.Name checks for the situation that the client doesn't // know some future addition to the API. Therefore it gets called here // even if there is no template to work on, because if some new field // gets added, the expectation might be that the controller does // something for it. claimName, mustCheckOwner, err := resourceclaim.Name(pod, &podClaim) switch { case errors.Is(err, resourceclaim.ErrClaimNotFound): // Continue below. case err != nil: return fmt.Errorf("checking for claim before creating it: %v", err) case claimName == nil: // Nothing to do, no claim needed. return nil case *claimName != "": claimName := *claimName // The ResourceClaim should exist because it is recorded in the pod.status.resourceClaimStatuses, // but perhaps it was deleted accidentally. In that case we re-create it. claim, err := ec.claimLister.ResourceClaims(pod.Namespace).Get(claimName) if err != nil && !apierrors.IsNotFound(err) { return err } if claim != nil { var err error if mustCheckOwner { err = resourceclaim.IsForPod(pod, claim) } if err == nil { // Already created, nothing more to do. logger.V(5).Info("claim already created", "podClaim", podClaim.Name, "resourceClaim", claimName) return nil } logger.Error(err, "claim that was created for the pod is no longer owned by the pod, creating a new one", "podClaim", podClaim.Name, "resourceClaim", claimName) } } templateName := podClaim.Source.ResourceClaimTemplateName if templateName == nil { // Nothing to do. return nil } // Before we create a new ResourceClaim, check if there is an orphaned one. // This covers the case that the controller has created it, but then fails // before it can update the pod status. claim, err := ec.findPodResourceClaim(pod, podClaim) if err != nil { return fmt.Errorf("finding ResourceClaim for claim %s in pod %s/%s failed: %v", podClaim.Name, pod.Namespace, pod.Name, err) } if claim == nil { template, err := ec.templateLister.ResourceClaimTemplates(pod.Namespace).Get(*templateName) if err != nil { return fmt.Errorf("resource claim template %q: %v", *templateName, err) } // Create the ResourceClaim with pod as owner, with a generated name that uses // - as base. isTrue := true annotations := template.Spec.ObjectMeta.Annotations if annotations == nil { annotations = make(map[string]string) } annotations[podResourceClaimAnnotation] = podClaim.Name generateName := pod.Name + "-" + podClaim.Name + "-" maxBaseLen := 57 // Leave space for hyphen and 5 random characters in a name with 63 characters. if len(generateName) > maxBaseLen { // We could leave truncation to the apiserver, but as // it removes at the end, we would loose everything // from the pod claim name when the pod name is long. // We can do better and truncate both strings, // proportional to their length. generateName = pod.Name[0:len(pod.Name)*maxBaseLen/len(generateName)] + "-" + podClaim.Name[0:len(podClaim.Name)*maxBaseLen/len(generateName)] } claim = &resourcev1alpha2.ResourceClaim{ ObjectMeta: metav1.ObjectMeta{ GenerateName: generateName, OwnerReferences: []metav1.OwnerReference{ { APIVersion: "v1", Kind: "Pod", Name: pod.Name, UID: pod.UID, Controller: &isTrue, BlockOwnerDeletion: &isTrue, }, }, Annotations: annotations, Labels: template.Spec.ObjectMeta.Labels, }, Spec: template.Spec.Spec, } metrics.ResourceClaimCreateAttempts.Inc() claimName := claim.Name claim, err = ec.kubeClient.ResourceV1alpha2().ResourceClaims(pod.Namespace).Create(ctx, claim, metav1.CreateOptions{}) if err != nil { metrics.ResourceClaimCreateFailures.Inc() return fmt.Errorf("create ResourceClaim %s: %v", claimName, err) } ec.claimCache.Mutation(claim) } // Remember the new ResourceClaim for a batch PodStatus update in our caller. if *newPodClaims == nil { *newPodClaims = make(map[string]string) } (*newPodClaims)[podClaim.Name] = claim.Name return nil } // findPodResourceClaim looks for an existing ResourceClaim with the right // annotation (ties it to the pod claim) and the right ownership (ties it to // the pod). func (ec *Controller) findPodResourceClaim(pod *v1.Pod, podClaim v1.PodResourceClaim) (*resourcev1alpha2.ResourceClaim, error) { // Only claims owned by the pod will get returned here. claims, err := ec.claimCache.ByIndex(claimPodOwnerIndex, string(pod.UID)) if err != nil { return nil, err } deterministicName := pod.Name + "-" + podClaim.Name // Kubernetes <= 1.27 behavior. for _, claimObj := range claims { claim, ok := claimObj.(*resourcev1alpha2.ResourceClaim) if !ok { return nil, fmt.Errorf("unexpected object of type %T returned by claim cache", claimObj) } podClaimName, ok := claim.Annotations[podResourceClaimAnnotation] if ok && podClaimName != podClaim.Name { continue } // No annotation? It might a ResourceClaim created for // the pod with a previous Kubernetes release where the // ResourceClaim name was deterministic, in which case // we have to use it and update the new pod status // field accordingly. if !ok && claim.Name != deterministicName { continue } // Pick the first one that matches. There shouldn't be more than one. If there is, // then all others will be ignored until the pod gets deleted. Then they also get // cleaned up. return claim, nil } return nil, nil } func (ec *Controller) ensurePodSchedulingContext(ctx context.Context, pod *v1.Pod) error { scheduling, err := ec.podSchedulingLister.PodSchedulingContexts(pod.Namespace).Get(pod.Name) if err != nil && !apierrors.IsNotFound(err) { return fmt.Errorf("retrieve PodSchedulingContext: %v", err) } if scheduling == nil { scheduling = &resourcev1alpha2.PodSchedulingContext{ ObjectMeta: metav1.ObjectMeta{ Name: pod.Name, Namespace: pod.Namespace, OwnerReferences: []metav1.OwnerReference{ { APIVersion: "v1", Kind: "Pod", Name: pod.Name, UID: pod.UID, Controller: pointer.Bool(true), }, }, }, Spec: resourcev1alpha2.PodSchedulingContextSpec{ SelectedNode: pod.Spec.NodeName, // There is no need for negotiation about // potential and suitable nodes anymore, so // PotentialNodes can be left empty. }, } if _, err := ec.kubeClient.ResourceV1alpha2().PodSchedulingContexts(pod.Namespace).Create(ctx, scheduling, metav1.CreateOptions{}); err != nil { return fmt.Errorf("create PodSchedulingContext: %v", err) } return nil } if scheduling.Spec.SelectedNode != pod.Spec.NodeName { scheduling := scheduling.DeepCopy() scheduling.Spec.SelectedNode = pod.Spec.NodeName if _, err := ec.kubeClient.ResourceV1alpha2().PodSchedulingContexts(pod.Namespace).Update(ctx, scheduling, metav1.UpdateOptions{}); err != nil { return fmt.Errorf("update spec.selectedNode in PodSchedulingContext: %v", err) } } return nil } func (ec *Controller) reserveForPod(ctx context.Context, pod *v1.Pod, claim *resourcev1alpha2.ResourceClaim) error { claim = claim.DeepCopy() claim.Status.ReservedFor = append(claim.Status.ReservedFor, resourcev1alpha2.ResourceClaimConsumerReference{ Resource: "pods", Name: pod.Name, UID: pod.UID, }) if _, err := ec.kubeClient.ResourceV1alpha2().ResourceClaims(claim.Namespace).UpdateStatus(ctx, claim, metav1.UpdateOptions{}); err != nil { return fmt.Errorf("reserve claim for pod: %v", err) } return nil } func (ec *Controller) syncClaim(ctx context.Context, namespace, name string) error { logger := klog.LoggerWithValues(klog.FromContext(ctx), "claim", klog.KRef(namespace, name)) ctx = klog.NewContext(ctx, logger) claim, err := ec.claimLister.ResourceClaims(namespace).Get(name) if err != nil { if apierrors.IsNotFound(err) { logger.V(5).Info("nothing to do for claim, it is gone") return nil } return err } // Check if the ReservedFor entries are all still valid. valid := make([]resourcev1alpha2.ResourceClaimConsumerReference, 0, len(claim.Status.ReservedFor)) for _, reservedFor := range claim.Status.ReservedFor { if reservedFor.APIGroup == "" && reservedFor.Resource == "pods" { // A pod falls into one of three categories: // - we have it in our cache -> don't remove it until we are told that it got removed // - we don't have it in our cache anymore, but we have seen it before -> it was deleted, remove it // - not in our cache, not seen -> double-check with API server before removal keepEntry := true // Tracking deleted pods in the LRU cache is an // optimization. Without this cache, the code would // have to do the API call below for every deleted pod // to ensure that the pod really doesn't exist. With // the cache, most of the time the pod will be recorded // as deleted and the API call can be avoided. if ec.deletedObjects.Has(reservedFor.UID) { // We know that the pod was deleted. This is // easy to check and thus is done first. keepEntry = false } else { pod, err := ec.podLister.Pods(claim.Namespace).Get(reservedFor.Name) switch { case err != nil && !apierrors.IsNotFound(err): return err case err != nil: // We might not have it in our informer cache // yet. Removing the pod while the scheduler is // scheduling it would be bad. We have to be // absolutely sure and thus have to check with // the API server. pod, err := ec.kubeClient.CoreV1().Pods(claim.Namespace).Get(ctx, reservedFor.Name, metav1.GetOptions{}) if err != nil && !apierrors.IsNotFound(err) { return err } if pod == nil || pod.UID != reservedFor.UID { logger.V(6).Info("remove reservation because pod is gone or got replaced", "pod", klog.KObj(pod), "claim", klog.KRef(namespace, name)) keepEntry = false } case pod.UID != reservedFor.UID: logger.V(6).Info("remove reservation because pod got replaced with new instance", "pod", klog.KObj(pod), "claim", klog.KRef(namespace, name)) keepEntry = false case isPodDone(pod): logger.V(6).Info("remove reservation because pod will not run anymore", "pod", klog.KObj(pod), "claim", klog.KRef(namespace, name)) keepEntry = false } } if keepEntry { valid = append(valid, reservedFor) } continue } // TODO: support generic object lookup return fmt.Errorf("unsupported ReservedFor entry: %v", reservedFor) } builtinControllerFinalizer := slices.Index(claim.Finalizers, resourcev1alpha2.Finalizer) logger.V(5).Info("claim reserved for counts", "currentCount", len(claim.Status.ReservedFor), "claim", klog.KRef(namespace, name), "updatedCount", len(valid), "builtinController", builtinControllerFinalizer >= 0) if len(valid) < len(claim.Status.ReservedFor) { // This is not using a patch because we want the update to fail if anything // changed in the meantime. claim := claim.DeepCopy() claim.Status.ReservedFor = valid // When a ResourceClaim uses delayed allocation, then it makes sense to // deallocate the claim as soon as the last consumer stops using // it. This ensures that the claim can be allocated again as needed by // some future consumer instead of trying to schedule that consumer // onto the node that was chosen for the previous consumer. It also // releases the underlying resources for use by other claims. // // This has to be triggered by the transition from "was being used" to // "is not used anymore" because a DRA driver is not required to set // `status.reservedFor` together with `status.allocation`, i.e. a claim // that is "currently unused" should not get deallocated. // // This does not matter for claims that were created for a pod. For // those, the resource claim controller will trigger deletion when the // pod is done. However, it doesn't hurt to also trigger deallocation // for such claims and not checking for them keeps this code simpler. if len(valid) == 0 { if builtinControllerFinalizer >= 0 { if claim.Spec.AllocationMode == resourcev1alpha2.AllocationModeWaitForFirstConsumer || claim.DeletionTimestamp != nil { // Allocated by scheduler with structured parameters. We can "deallocate" // by clearing the allocation. claim.Status.Allocation = nil } } else if claim.Spec.AllocationMode == resourcev1alpha2.AllocationModeWaitForFirstConsumer { // DRA driver controller in the control plane // needs to do the deallocation. claim.Status.DeallocationRequested = true } // In all other cases, we keep the claim allocated, in particular for immediate allocation // with a control plane controller. } claim, err := ec.kubeClient.ResourceV1alpha2().ResourceClaims(claim.Namespace).UpdateStatus(ctx, claim, metav1.UpdateOptions{}) if err != nil { return err } // Now also remove the finalizer if it is not needed anymore. // Note that the index may have changed as a result of the UpdateStatus call. builtinControllerFinalizer := slices.Index(claim.Finalizers, resourcev1alpha2.Finalizer) if builtinControllerFinalizer >= 0 && claim.Status.Allocation == nil { claim.Finalizers = slices.Delete(claim.Finalizers, builtinControllerFinalizer, builtinControllerFinalizer+1) if _, err := ec.kubeClient.ResourceV1alpha2().ResourceClaims(claim.Namespace).Update(ctx, claim, metav1.UpdateOptions{}); err != nil { return err } } } else if builtinControllerFinalizer >= 0 && claim.DeletionTimestamp != nil && len(valid) == 0 { claim := claim.DeepCopy() if claim.Status.Allocation != nil { // This can happen when a claim with immediate allocation // stopped being used, remained allocated, and then got // deleted. As above we then need to clear the allocation. claim.Status.Allocation = nil var err error claim, err = ec.kubeClient.ResourceV1alpha2().ResourceClaims(claim.Namespace).UpdateStatus(ctx, claim, metav1.UpdateOptions{}) if err != nil { return err } } // Whether it was allocated or not, remove the finalizer to unblock removal. claim.Finalizers = slices.Delete(claim.Finalizers, builtinControllerFinalizer, builtinControllerFinalizer+1) _, err := ec.kubeClient.ResourceV1alpha2().ResourceClaims(claim.Namespace).Update(ctx, claim, metav1.UpdateOptions{}) if err != nil { return err } } if len(valid) == 0 { // Claim is not reserved. If it was generated for a pod and // that pod is not going to run, the claim can be // deleted. Normally the garbage collector does that, but the // pod itself might not get deleted for a while. podName, podUID := owningPod(claim) if podName != "" { pod, err := ec.podLister.Pods(claim.Namespace).Get(podName) switch { case err == nil: // Pod already replaced or not going to run? if pod.UID != podUID || isPodDone(pod) { // We are certain that the owning pod is not going to need // the claim and therefore remove the claim. logger.V(5).Info("deleting unused generated claim", "claim", klog.KObj(claim), "pod", klog.KObj(pod)) err := ec.kubeClient.ResourceV1alpha2().ResourceClaims(claim.Namespace).Delete(ctx, claim.Name, metav1.DeleteOptions{}) if err != nil { return fmt.Errorf("delete claim: %v", err) } } else { logger.V(6).Info("wrong pod content, not deleting claim", "claim", klog.KObj(claim), "podUID", podUID, "podContent", pod) } case apierrors.IsNotFound(err): // We might not know the pod *yet*. Instead of doing an expensive API call, // let the garbage collector handle the case that the pod is truly gone. logger.V(5).Info("pod for claim not found", "claim", klog.KObj(claim), "pod", klog.KRef(claim.Namespace, podName)) default: return fmt.Errorf("lookup pod: %v", err) } } else { logger.V(5).Info("claim not generated for a pod", "claim", klog.KObj(claim)) } } return nil } func owningPod(claim *resourcev1alpha2.ResourceClaim) (string, types.UID) { for _, owner := range claim.OwnerReferences { if pointer.BoolDeref(owner.Controller, false) && owner.APIVersion == "v1" && owner.Kind == "Pod" { return owner.Name, owner.UID } } return "", "" } // podResourceClaimIndexFunc is an index function that returns ResourceClaim keys (= // namespace/name) for ResourceClaim or ResourceClaimTemplates in a given pod. func podResourceClaimIndexFunc(obj interface{}) ([]string, error) { pod, ok := obj.(*v1.Pod) if !ok { return []string{}, nil } keys := []string{} for _, podClaim := range pod.Spec.ResourceClaims { claimName, _, err := resourceclaim.Name(pod, &podClaim) if err != nil || claimName == nil { // Index functions are not supposed to fail, the caller will panic. // For both error reasons (claim not created yet, unknown API) // we simply don't index. continue } keys = append(keys, fmt.Sprintf("%s/%s", pod.Namespace, *claimName)) } return keys, nil } // isPodDone returns true if it is certain that none of the containers are running and never will run. func isPodDone(pod *v1.Pod) bool { return podutil.IsPodPhaseTerminal(pod.Status.Phase) || // Deleted and not scheduled: pod.DeletionTimestamp != nil && pod.Spec.NodeName == "" } // claimPodOwnerIndexFunc is an index function that returns the pod UIDs of // all pods which own the resource claim. Should only be one, though. func claimPodOwnerIndexFunc(obj interface{}) ([]string, error) { claim, ok := obj.(*resourcev1alpha2.ResourceClaim) if !ok { return nil, nil } var keys []string for _, owner := range claim.OwnerReferences { if owner.Controller != nil && *owner.Controller && owner.APIVersion == "v1" && owner.Kind == "Pod" { keys = append(keys, string(owner.UID)) } } return keys, nil }