...

Source file src/k8s.io/kubernetes/pkg/controller/resourceclaim/controller.go

Documentation: k8s.io/kubernetes/pkg/controller/resourceclaim

     1  /*
     2  Copyright 2020 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package resourceclaim
    18  
    19  import (
    20  	"context"
    21  	"errors"
    22  	"fmt"
    23  	"slices"
    24  	"strings"
    25  	"time"
    26  
    27  	v1 "k8s.io/api/core/v1"
    28  	resourcev1alpha2 "k8s.io/api/resource/v1alpha2"
    29  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    30  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    31  	"k8s.io/apimachinery/pkg/types"
    32  	"k8s.io/apimachinery/pkg/util/runtime"
    33  	"k8s.io/apimachinery/pkg/util/wait"
    34  	corev1apply "k8s.io/client-go/applyconfigurations/core/v1"
    35  	v1informers "k8s.io/client-go/informers/core/v1"
    36  	resourcev1alpha2informers "k8s.io/client-go/informers/resource/v1alpha2"
    37  	clientset "k8s.io/client-go/kubernetes"
    38  	"k8s.io/client-go/kubernetes/scheme"
    39  	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
    40  	v1listers "k8s.io/client-go/listers/core/v1"
    41  	resourcev1alpha2listers "k8s.io/client-go/listers/resource/v1alpha2"
    42  	"k8s.io/client-go/tools/cache"
    43  	"k8s.io/client-go/tools/record"
    44  	"k8s.io/client-go/util/workqueue"
    45  	"k8s.io/dynamic-resource-allocation/resourceclaim"
    46  	"k8s.io/klog/v2"
    47  	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
    48  	"k8s.io/kubernetes/pkg/controller/resourceclaim/metrics"
    49  	"k8s.io/utils/pointer"
    50  )
    51  
const (
	// podResourceClaimIndex is the name of the pod informer index that gets
	// populated by podResourceClaimIndexFunc. The controller queries it with
	// "<namespace>/<ResourceClaim name>" keys to find the pods which use a
	// certain ResourceClaim.
	podResourceClaimIndex = "pod-resource-claim-index"

	// podResourceClaimAnnotation is the special annotation that generated
	// ResourceClaims get. Its value is the pod.spec.resourceClaims[].name
	// for which it was generated. This is used only inside the controller
	// and not documented as part of the Kubernetes API.
	podResourceClaimAnnotation = "resource.kubernetes.io/pod-claim-name"

	// claimPodOwnerIndex is used to find ResourceClaims which have
	// a specific pod as owner. Values for this index are the pod UID.
	claimPodOwnerIndex = "claim-pod-owner-index"

	// Field manager used to update the pod status.
	fieldManager = "ResourceClaimController"

	// maxUIDCacheEntries is the size passed to newUIDCache when the
	// deletedObjects cache of the controller gets created (presumably the
	// maximum number of remembered UIDs - confirm against newUIDCache).
	maxUIDCacheEntries = 500
)
    71  
// Controller creates ResourceClaims for ResourceClaimTemplates in a pod spec.
// For pods which already have a node assigned it also creates or updates the
// PodSchedulingContext and reserves allocated claims for the pod.
type Controller struct {
	// kubeClient is the kube API client used to communicate with the API
	// server.
	kubeClient clientset.Interface

	// claimLister is the shared ResourceClaim lister used to fetch and store ResourceClaim
	// objects from the API server. It is shared with other controllers and
	// therefore the ResourceClaim objects in its store should be treated as immutable.
	claimLister  resourcev1alpha2listers.ResourceClaimLister
	claimsSynced cache.InformerSynced
	// claimCache is a mutation cache layered on top of the claim informer's
	// indexer. Claims created by the controller are stored in it so that
	// they can be found before the informer has caught up.
	claimCache cache.MutationCache

	// podLister is the shared Pod lister used to fetch Pod
	// objects from the API server. It is shared with other controllers and
	// therefore the Pod objects in its store should be treated as immutable.
	podLister v1listers.PodLister
	podSynced cache.InformerSynced

	// podSchedulingLister is the shared PodSchedulingContext lister used to
	// fetch scheduling objects from the API server. It is shared with other
	// controllers and therefore the objects in its store should be treated
	// as immutable.
	podSchedulingLister resourcev1alpha2listers.PodSchedulingContextLister
	podSchedulingSynced cache.InformerSynced

	// templateLister is the shared ResourceClaimTemplate lister used to
	// fetch template objects from the API server. It is shared with other
	// controllers and therefore the objects in its store should be treated
	// as immutable.
	templateLister  resourcev1alpha2listers.ResourceClaimTemplateLister
	templatesSynced cache.InformerSynced

	// podIndexer has the common PodResourceClaim indexer installed to
	// limit iteration over pods to those of interest.
	podIndexer cache.Indexer

	// recorder is used to record events in the API server. It is only
	// set once Run has been called.
	recorder record.EventRecorder

	// queue holds the pending work items. Each item is a string key which
	// starts with claimKeyPrefix or podKeyPrefix, followed by
	// "<namespace>/<name>" of the object.
	queue workqueue.RateLimitingInterface

	// The deletedObjects cache keeps track of Pods for which we know that
	// they have existed and have been removed. For those we can be sure
	// that a ReservedFor entry needs to be removed.
	deletedObjects *uidCache
}
   119  
// Prefixes of the work queue keys. The prefix determines in syncHandler
// whether the remainder of the key ("<namespace>/<name>") refers to a
// ResourceClaim or to a Pod.
const (
	claimKeyPrefix = "claim:"
	podKeyPrefix   = "pod:"
)
   124  
// NewController creates a ResourceClaim controller.
//
// It stores the listers and HasSynced functions of all four informers,
// registers event handlers on the pod and claim informers, and installs the
// podResourceClaimIndex index on the pod informer and the claimPodOwnerIndex
// index on the claim informer. No event handlers are registered for the
// PodSchedulingContext and ResourceClaimTemplate informers.
//
// The given logger is captured by the event handlers. An error is returned
// when registering an event handler or adding an index fails.
func NewController(
	logger klog.Logger,
	kubeClient clientset.Interface,
	podInformer v1informers.PodInformer,
	podSchedulingInformer resourcev1alpha2informers.PodSchedulingContextInformer,
	claimInformer resourcev1alpha2informers.ResourceClaimInformer,
	templateInformer resourcev1alpha2informers.ResourceClaimTemplateInformer) (*Controller, error) {

	// The event recorder is not set here, Run takes care of that.
	ec := &Controller{
		kubeClient:          kubeClient,
		podLister:           podInformer.Lister(),
		podIndexer:          podInformer.Informer().GetIndexer(),
		podSynced:           podInformer.Informer().HasSynced,
		podSchedulingLister: podSchedulingInformer.Lister(),
		podSchedulingSynced: podSchedulingInformer.Informer().HasSynced,
		claimLister:         claimInformer.Lister(),
		claimsSynced:        claimInformer.Informer().HasSynced,
		templateLister:      templateInformer.Lister(),
		templatesSynced:     templateInformer.Informer().HasSynced,
		queue:               workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "resource_claim"),
		deletedObjects:      newUIDCache(maxUIDCacheEntries),
	}

	metrics.RegisterMetrics()

	// Every pod event goes through enqueuePod. Only the delete handler
	// marks the pod as deleted.
	if _, err := podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			ec.enqueuePod(logger, obj, false)
		},
		UpdateFunc: func(old, updated interface{}) {
			ec.enqueuePod(logger, updated, false)
		},
		DeleteFunc: func(obj interface{}) {
			ec.enqueuePod(logger, obj, true)
		},
	}); err != nil {
		return nil, err
	}
	// Every claim event gets logged at a high verbosity level and then
	// goes through enqueueResourceClaim.
	if _, err := claimInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			logger.V(6).Info("new claim", "claimDump", obj)
			ec.enqueueResourceClaim(logger, obj, false)
		},
		UpdateFunc: func(old, updated interface{}) {
			logger.V(6).Info("updated claim", "claimDump", updated)
			ec.enqueueResourceClaim(logger, updated, false)
		},
		DeleteFunc: func(obj interface{}) {
			logger.V(6).Info("deleted claim", "claimDump", obj)
			ec.enqueueResourceClaim(logger, obj, true)
		},
	}); err != nil {
		return nil, err
	}
	// This index is what enqueueResourceClaim uses to find the pods which
	// reference a claim.
	if err := ec.podIndexer.AddIndexers(cache.Indexers{podResourceClaimIndex: podResourceClaimIndexFunc}); err != nil {
		return nil, fmt.Errorf("could not initialize ResourceClaim controller: %w", err)
	}

	// The mutation cache acts as an additional layer for the informer
	// cache and after a create made by the controller returns that
	// object until the informer catches up. That is necessary
	// when a ResourceClaim got created, updating the pod status fails,
	// and then a retry occurs before the informer cache is updated.
	// In that scenario, the controller would create another claim
	// instead of continuing with the existing one.
	claimInformerCache := claimInformer.Informer().GetIndexer()
	if err := claimInformerCache.AddIndexers(cache.Indexers{claimPodOwnerIndex: claimPodOwnerIndexFunc}); err != nil {
		return nil, fmt.Errorf("could not initialize ResourceClaim controller: %w", err)
	}
	ec.claimCache = cache.NewIntegerResourceVersionMutationCache(claimInformerCache, claimInformerCache,
		// Very long time to live, unlikely to be needed because
		// the informer cache should get updated soon.
		time.Hour,
		// Allow storing objects not in the underlying cache - that's the point...
		// It's safe because in case of a race (claim is in mutation cache, claim
		// gets deleted, controller updates status based on mutation cache) the
		// "bad" pod status will get detected and fixed when the informer catches up.
		true,
	)

	return ec, nil
}
   208  
   209  func (ec *Controller) enqueuePod(logger klog.Logger, obj interface{}, deleted bool) {
   210  	if d, ok := obj.(cache.DeletedFinalStateUnknown); ok {
   211  		obj = d.Obj
   212  	}
   213  	pod, ok := obj.(*v1.Pod)
   214  	if !ok {
   215  		// Not a pod?!
   216  		logger.Error(nil, "enqueuePod called for unexpected object", "type", fmt.Sprintf("%T", obj))
   217  		return
   218  	}
   219  
   220  	if len(pod.Spec.ResourceClaims) == 0 {
   221  		// Nothing to do for it at all.
   222  		return
   223  	}
   224  
   225  	if deleted {
   226  		logger.V(6).Info("pod got deleted", "pod", klog.KObj(pod))
   227  		ec.deletedObjects.Add(pod.UID)
   228  	}
   229  
   230  	logger.V(6).Info("pod with resource claims changed", "pod", klog.KObj(pod), "deleted", deleted)
   231  
   232  	// Release reservations of a deleted or completed pod?
   233  	if needsClaims, reason := podNeedsClaims(pod, deleted); !needsClaims {
   234  		for _, podClaim := range pod.Spec.ResourceClaims {
   235  			claimName, _, err := resourceclaim.Name(pod, &podClaim)
   236  			switch {
   237  			case err != nil:
   238  				// Either the claim was not created (nothing to do here) or
   239  				// the API changed. The later will also get reported elsewhere,
   240  				// so here it's just a debug message.
   241  				logger.V(6).Info("Nothing to do for claim during pod change", "err", err, "reason", reason)
   242  			case claimName != nil:
   243  				key := claimKeyPrefix + pod.Namespace + "/" + *claimName
   244  				logger.V(6).Info("Process claim", "pod", klog.KObj(pod), "key", key, "reason", reason)
   245  				ec.queue.Add(key)
   246  			default:
   247  				// Nothing to do, claim wasn't generated.
   248  				logger.V(6).Info("Nothing to do for skipped claim during pod change", "reason", reason)
   249  			}
   250  		}
   251  	}
   252  
   253  	needsWork, reason := ec.podNeedsWork(pod)
   254  	if needsWork {
   255  		logger.V(6).Info("enqueing pod", "pod", klog.KObj(pod), "reason", reason)
   256  		ec.queue.Add(podKeyPrefix + pod.Namespace + "/" + pod.Name)
   257  		return
   258  	}
   259  	logger.V(6).Info("not enqueing pod", "pod", klog.KObj(pod), "reason", reason)
   260  }
   261  
   262  func podNeedsClaims(pod *v1.Pod, deleted bool) (bool, string) {
   263  	if deleted {
   264  		return false, "pod got removed"
   265  	}
   266  	if podutil.IsPodTerminal(pod) {
   267  		return false, "pod has terminated"
   268  	}
   269  	if pod.DeletionTimestamp != nil && pod.Spec.NodeName == "" {
   270  		return false, "pod got deleted before scheduling"
   271  	}
   272  	// Still needs claims.
   273  	return true, "pod might run"
   274  }
   275  
   276  // podNeedsWork checks whether a new or modified pod needs to be processed
   277  // further by a worker. It returns a boolean with the result and an explanation
   278  // for it.
   279  func (ec *Controller) podNeedsWork(pod *v1.Pod) (bool, string) {
   280  	if pod.DeletionTimestamp != nil {
   281  		// Nothing else to do for the pod.
   282  		return false, "pod is deleted"
   283  	}
   284  
   285  	for _, podClaim := range pod.Spec.ResourceClaims {
   286  		claimName, checkOwner, err := resourceclaim.Name(pod, &podClaim)
   287  		if err != nil {
   288  			return true, err.Error()
   289  		}
   290  		// If the claimName is nil, then it has been determined before
   291  		// that the claim is not needed.
   292  		if claimName == nil {
   293  			return false, "claim is not needed"
   294  		}
   295  		claim, err := ec.claimLister.ResourceClaims(pod.Namespace).Get(*claimName)
   296  		if apierrors.IsNotFound(err) {
   297  			if podClaim.Source.ResourceClaimTemplateName != nil {
   298  				return true, "must create ResourceClaim from template"
   299  			}
   300  			// User needs to create claim.
   301  			return false, "claim is missing and must be created by user"
   302  		}
   303  		if err != nil {
   304  			// Shouldn't happen.
   305  			return true, fmt.Sprintf("internal error while checking for claim: %v", err)
   306  		}
   307  
   308  		if checkOwner &&
   309  			resourceclaim.IsForPod(pod, claim) != nil {
   310  			// Cannot proceed with the pod unless that other claim gets deleted.
   311  			return false, "conflicting claim needs to be removed by user"
   312  		}
   313  
   314  		// This check skips over the reasons below that only apply
   315  		// when a pod has been scheduled already. We need to keep checking
   316  		// for more claims that might need to be created.
   317  		if pod.Spec.NodeName == "" {
   318  			continue
   319  		}
   320  
   321  		// Create PodSchedulingContext if the pod got scheduled without triggering
   322  		// delayed allocation.
   323  		//
   324  		// These can happen when:
   325  		// - a user created a pod with spec.nodeName set, perhaps for testing
   326  		// - some scheduler was used which is unaware of DRA
   327  		// - DRA was not enabled in kube-scheduler (version skew, configuration)
   328  		if claim.Spec.AllocationMode == resourcev1alpha2.AllocationModeWaitForFirstConsumer &&
   329  			claim.Status.Allocation == nil {
   330  			scheduling, err := ec.podSchedulingLister.PodSchedulingContexts(pod.Namespace).Get(pod.Name)
   331  			if apierrors.IsNotFound(err) {
   332  				return true, "need to create PodSchedulingContext for scheduled pod"
   333  			}
   334  			if err != nil {
   335  				// Shouldn't happen.
   336  				return true, fmt.Sprintf("internal error while checking for PodSchedulingContext: %v", err)
   337  			}
   338  			if scheduling.Spec.SelectedNode != pod.Spec.NodeName {
   339  				// Need to update PodSchedulingContext.
   340  				return true, "need to updated PodSchedulingContext for scheduled pod"
   341  			}
   342  		}
   343  		if claim.Status.Allocation != nil &&
   344  			!resourceclaim.IsReservedForPod(pod, claim) &&
   345  			resourceclaim.CanBeReserved(claim) {
   346  			// Need to reserve it.
   347  			return true, "need to reserve claim for pod"
   348  		}
   349  	}
   350  
   351  	return false, "nothing to do"
   352  }
   353  
   354  func (ec *Controller) enqueueResourceClaim(logger klog.Logger, obj interface{}, deleted bool) {
   355  	if d, ok := obj.(cache.DeletedFinalStateUnknown); ok {
   356  		obj = d.Obj
   357  	}
   358  	claim, ok := obj.(*resourcev1alpha2.ResourceClaim)
   359  	if !ok {
   360  		return
   361  	}
   362  
   363  	if !deleted {
   364  		// When starting up, we have to check all claims to find those with
   365  		// stale pods in ReservedFor. During an update, a pod might get added
   366  		// that already no longer exists.
   367  		key := claimKeyPrefix + claim.Namespace + "/" + claim.Name
   368  		logger.V(6).Info("enqueing new or updated claim", "claim", klog.KObj(claim), "key", key)
   369  		ec.queue.Add(key)
   370  	} else {
   371  		logger.V(6).Info("not enqueing deleted claim", "claim", klog.KObj(claim))
   372  	}
   373  
   374  	// Also check whether this causes work for any of the currently
   375  	// known pods which use the ResourceClaim.
   376  	objs, err := ec.podIndexer.ByIndex(podResourceClaimIndex, fmt.Sprintf("%s/%s", claim.Namespace, claim.Name))
   377  	if err != nil {
   378  		logger.Error(err, "listing pods from cache")
   379  		return
   380  	}
   381  	if len(objs) == 0 {
   382  		logger.V(6).Info("claim got deleted while not needed by any pod, nothing to do", "claim", klog.KObj(claim))
   383  		return
   384  	}
   385  	for _, obj := range objs {
   386  		ec.enqueuePod(logger, obj, false)
   387  	}
   388  }
   389  
   390  func (ec *Controller) Run(ctx context.Context, workers int) {
   391  	defer runtime.HandleCrash()
   392  	defer ec.queue.ShutDown()
   393  
   394  	logger := klog.FromContext(ctx)
   395  	logger.Info("Starting ephemeral volume controller")
   396  	defer logger.Info("Shutting down ephemeral volume controller")
   397  
   398  	eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx))
   399  	eventBroadcaster.StartLogging(klog.Infof)
   400  	eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: ec.kubeClient.CoreV1().Events("")})
   401  	ec.recorder = eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "resource_claim"})
   402  	defer eventBroadcaster.Shutdown()
   403  
   404  	if !cache.WaitForNamedCacheSync("ephemeral", ctx.Done(), ec.podSynced, ec.claimsSynced) {
   405  		return
   406  	}
   407  
   408  	for i := 0; i < workers; i++ {
   409  		go wait.UntilWithContext(ctx, ec.runWorker, time.Second)
   410  	}
   411  
   412  	<-ctx.Done()
   413  }
   414  
   415  func (ec *Controller) runWorker(ctx context.Context) {
   416  	for ec.processNextWorkItem(ctx) {
   417  	}
   418  }
   419  
   420  func (ec *Controller) processNextWorkItem(ctx context.Context) bool {
   421  	key, shutdown := ec.queue.Get()
   422  	if shutdown {
   423  		return false
   424  	}
   425  	defer ec.queue.Done(key)
   426  
   427  	err := ec.syncHandler(ctx, key.(string))
   428  	if err == nil {
   429  		ec.queue.Forget(key)
   430  		return true
   431  	}
   432  
   433  	runtime.HandleError(fmt.Errorf("%v failed with: %v", key, err))
   434  	ec.queue.AddRateLimited(key)
   435  
   436  	return true
   437  }
   438  
   439  // syncHandler is invoked for each work item which might need to be processed.
   440  // If an error is returned from this function, the item will be requeued.
   441  func (ec *Controller) syncHandler(ctx context.Context, key string) error {
   442  	sep := strings.Index(key, ":")
   443  	if sep < 0 {
   444  		return fmt.Errorf("unexpected key: %s", key)
   445  	}
   446  	prefix, object := key[0:sep+1], key[sep+1:]
   447  	namespace, name, err := cache.SplitMetaNamespaceKey(object)
   448  	if err != nil {
   449  		return err
   450  	}
   451  
   452  	switch prefix {
   453  	case podKeyPrefix:
   454  		return ec.syncPod(ctx, namespace, name)
   455  	case claimKeyPrefix:
   456  		return ec.syncClaim(ctx, namespace, name)
   457  	default:
   458  		return fmt.Errorf("unexpected key prefix: %s", prefix)
   459  	}
   460  
   461  }
   462  
// syncPod processes the pod with the given namespace and name in three steps:
//   - handleClaim gets called for each entry in pod.spec.resourceClaims,
//     which may create ResourceClaims,
//   - claims which were newly associated with the pod get recorded in the
//     pod status,
//   - if the pod already has a node assigned, a PodSchedulingContext is
//     ensured for claims which wait for allocation and allocated claims get
//     reserved for the pod.
//
// A pod which no longer exists or which is marked for deletion is ignored.
// Any error that gets returned causes the work item to be retried.
func (ec *Controller) syncPod(ctx context.Context, namespace, name string) error {
	logger := klog.LoggerWithValues(klog.FromContext(ctx), "pod", klog.KRef(namespace, name))
	ctx = klog.NewContext(ctx, logger)
	pod, err := ec.podLister.Pods(namespace).Get(name)
	if err != nil {
		if apierrors.IsNotFound(err) {
			logger.V(5).Info("nothing to do for pod, it is gone")
			return nil
		}
		return err
	}

	// Ignore pods which are already getting deleted.
	if pod.DeletionTimestamp != nil {
		logger.V(5).Info("nothing to do for pod, it is marked for deletion")
		return nil
	}

	// newPodClaims maps the pod claim name to the ResourceClaim name. It
	// gets allocated by handleClaim and therefore remains nil when there
	// is nothing new to record in the pod status.
	var newPodClaims map[string]string
	for _, podClaim := range pod.Spec.ResourceClaims {
		if err := ec.handleClaim(ctx, pod, podClaim, &newPodClaims); err != nil {
			// The recorder only gets set in Run.
			if ec.recorder != nil {
				ec.recorder.Event(pod, v1.EventTypeWarning, "FailedResourceClaimCreation", fmt.Sprintf("PodResourceClaim %s: %v", podClaim.Name, err))
			}
			return fmt.Errorf("pod %s/%s, PodResourceClaim %s: %v", namespace, name, podClaim.Name, err)
		}
	}

	if newPodClaims != nil {
		// Patch the pod status with the new information about
		// generated ResourceClaims.
		//
		// NOTE(review): only the entries collected in this pass are part
		// of the apply configuration. Confirm that entries which were
		// applied earlier with the same field manager are not dropped.
		statuses := make([]*corev1apply.PodResourceClaimStatusApplyConfiguration, 0, len(newPodClaims))
		for podClaimName, resourceClaimName := range newPodClaims {
			statuses = append(statuses, corev1apply.PodResourceClaimStatus().WithName(podClaimName).WithResourceClaimName(resourceClaimName))
		}
		podApply := corev1apply.Pod(name, namespace).WithStatus(corev1apply.PodStatus().WithResourceClaimStatuses(statuses...))
		if _, err := ec.kubeClient.CoreV1().Pods(namespace).ApplyStatus(ctx, podApply, metav1.ApplyOptions{FieldManager: fieldManager, Force: true}); err != nil {
			return fmt.Errorf("update pod %s/%s ResourceClaimStatuses: %v", namespace, name, err)
		}
	}

	if pod.Spec.NodeName == "" {
		// Scheduler will handle PodSchedulingContext and reservations.
		logger.V(5).Info("nothing to do for pod, scheduler will deal with it")
		return nil
	}

	// The pod has a node assigned. Do what is still missing for its
	// claims (allocation through a PodSchedulingContext, reservation).
	for _, podClaim := range pod.Spec.ResourceClaims {
		claimName, checkOwner, err := resourceclaim.Name(pod, &podClaim)
		if err != nil {
			return err
		}
		// If nil, then it has been determined that the claim is not needed
		// and can be skipped.
		if claimName == nil {
			continue
		}
		claim, err := ec.claimLister.ResourceClaims(pod.Namespace).Get(*claimName)
		if apierrors.IsNotFound(err) {
			// This returns without looking at the remaining claims of
			// the pod.
			return nil
		}
		if err != nil {
			return fmt.Errorf("retrieve claim: %v", err)
		}
		if checkOwner {
			if err := resourceclaim.IsForPod(pod, claim); err != nil {
				return err
			}
		}
		if claim.Spec.AllocationMode == resourcev1alpha2.AllocationModeWaitForFirstConsumer &&
			claim.Status.Allocation == nil {
			// The PodSchedulingContext is per pod, not per claim.
			// This also returns without looking at the remaining
			// claims of the pod.
			logger.V(5).Info("create PodSchedulingContext because claim needs to be allocated", "resourceClaim", klog.KObj(claim))
			return ec.ensurePodSchedulingContext(ctx, pod)
		}
		if claim.Status.Allocation != nil &&
			!resourceclaim.IsReservedForPod(pod, claim) &&
			resourceclaim.CanBeReserved(claim) {
			logger.V(5).Info("reserve claim for pod", "resourceClaim", klog.KObj(claim))
			if err := ec.reserveForPod(ctx, pod, claim); err != nil {
				return err
			}
		}
	}

	return nil
}
   549  
   550  // handleResourceClaim is invoked for each volume of a pod.
   551  func (ec *Controller) handleClaim(ctx context.Context, pod *v1.Pod, podClaim v1.PodResourceClaim, newPodClaims *map[string]string) error {
   552  	logger := klog.LoggerWithValues(klog.FromContext(ctx), "podClaim", podClaim.Name)
   553  	ctx = klog.NewContext(ctx, logger)
   554  	logger.V(5).Info("checking", "podClaim", podClaim.Name)
   555  
   556  	// resourceclaim.Name checks for the situation that the client doesn't
   557  	// know some future addition to the API. Therefore it gets called here
   558  	// even if there is no template to work on, because if some new field
   559  	// gets added, the expectation might be that the controller does
   560  	// something for it.
   561  	claimName, mustCheckOwner, err := resourceclaim.Name(pod, &podClaim)
   562  	switch {
   563  	case errors.Is(err, resourceclaim.ErrClaimNotFound):
   564  		// Continue below.
   565  	case err != nil:
   566  		return fmt.Errorf("checking for claim before creating it: %v", err)
   567  	case claimName == nil:
   568  		// Nothing to do, no claim needed.
   569  		return nil
   570  	case *claimName != "":
   571  		claimName := *claimName
   572  		// The ResourceClaim should exist because it is recorded in the pod.status.resourceClaimStatuses,
   573  		// but perhaps it was deleted accidentally. In that case we re-create it.
   574  		claim, err := ec.claimLister.ResourceClaims(pod.Namespace).Get(claimName)
   575  		if err != nil && !apierrors.IsNotFound(err) {
   576  			return err
   577  		}
   578  		if claim != nil {
   579  			var err error
   580  			if mustCheckOwner {
   581  				err = resourceclaim.IsForPod(pod, claim)
   582  			}
   583  			if err == nil {
   584  				// Already created, nothing more to do.
   585  				logger.V(5).Info("claim already created", "podClaim", podClaim.Name, "resourceClaim", claimName)
   586  				return nil
   587  			}
   588  			logger.Error(err, "claim that was created for the pod is no longer owned by the pod, creating a new one", "podClaim", podClaim.Name, "resourceClaim", claimName)
   589  		}
   590  	}
   591  
   592  	templateName := podClaim.Source.ResourceClaimTemplateName
   593  	if templateName == nil {
   594  		// Nothing to do.
   595  		return nil
   596  	}
   597  
   598  	// Before we create a new ResourceClaim, check if there is an orphaned one.
   599  	// This covers the case that the controller has created it, but then fails
   600  	// before it can update the pod status.
   601  	claim, err := ec.findPodResourceClaim(pod, podClaim)
   602  	if err != nil {
   603  		return fmt.Errorf("finding ResourceClaim for claim %s in pod %s/%s failed: %v", podClaim.Name, pod.Namespace, pod.Name, err)
   604  	}
   605  
   606  	if claim == nil {
   607  		template, err := ec.templateLister.ResourceClaimTemplates(pod.Namespace).Get(*templateName)
   608  		if err != nil {
   609  			return fmt.Errorf("resource claim template %q: %v", *templateName, err)
   610  		}
   611  
   612  		// Create the ResourceClaim with pod as owner, with a generated name that uses
   613  		// <pod>-<claim name> as base.
   614  		isTrue := true
   615  		annotations := template.Spec.ObjectMeta.Annotations
   616  		if annotations == nil {
   617  			annotations = make(map[string]string)
   618  		}
   619  		annotations[podResourceClaimAnnotation] = podClaim.Name
   620  		generateName := pod.Name + "-" + podClaim.Name + "-"
   621  		maxBaseLen := 57 // Leave space for hyphen and 5 random characters in a name with 63 characters.
   622  		if len(generateName) > maxBaseLen {
   623  			// We could leave truncation to the apiserver, but as
   624  			// it removes at the end, we would loose everything
   625  			// from the pod claim name when the pod name is long.
   626  			// We can do better and truncate both strings,
   627  			// proportional to their length.
   628  			generateName = pod.Name[0:len(pod.Name)*maxBaseLen/len(generateName)] +
   629  				"-" +
   630  				podClaim.Name[0:len(podClaim.Name)*maxBaseLen/len(generateName)]
   631  		}
   632  		claim = &resourcev1alpha2.ResourceClaim{
   633  			ObjectMeta: metav1.ObjectMeta{
   634  				GenerateName: generateName,
   635  				OwnerReferences: []metav1.OwnerReference{
   636  					{
   637  						APIVersion:         "v1",
   638  						Kind:               "Pod",
   639  						Name:               pod.Name,
   640  						UID:                pod.UID,
   641  						Controller:         &isTrue,
   642  						BlockOwnerDeletion: &isTrue,
   643  					},
   644  				},
   645  				Annotations: annotations,
   646  				Labels:      template.Spec.ObjectMeta.Labels,
   647  			},
   648  			Spec: template.Spec.Spec,
   649  		}
   650  		metrics.ResourceClaimCreateAttempts.Inc()
   651  		claimName := claim.Name
   652  		claim, err = ec.kubeClient.ResourceV1alpha2().ResourceClaims(pod.Namespace).Create(ctx, claim, metav1.CreateOptions{})
   653  		if err != nil {
   654  			metrics.ResourceClaimCreateFailures.Inc()
   655  			return fmt.Errorf("create ResourceClaim %s: %v", claimName, err)
   656  		}
   657  		ec.claimCache.Mutation(claim)
   658  	}
   659  
   660  	// Remember the new ResourceClaim for a batch PodStatus update in our caller.
   661  	if *newPodClaims == nil {
   662  		*newPodClaims = make(map[string]string)
   663  	}
   664  	(*newPodClaims)[podClaim.Name] = claim.Name
   665  
   666  	return nil
   667  }
   668  
   669  // findPodResourceClaim looks for an existing ResourceClaim with the right
   670  // annotation (ties it to the pod claim) and the right ownership (ties it to
   671  // the pod).
   672  func (ec *Controller) findPodResourceClaim(pod *v1.Pod, podClaim v1.PodResourceClaim) (*resourcev1alpha2.ResourceClaim, error) {
   673  	// Only claims owned by the pod will get returned here.
   674  	claims, err := ec.claimCache.ByIndex(claimPodOwnerIndex, string(pod.UID))
   675  	if err != nil {
   676  		return nil, err
   677  	}
   678  	deterministicName := pod.Name + "-" + podClaim.Name // Kubernetes <= 1.27 behavior.
   679  	for _, claimObj := range claims {
   680  		claim, ok := claimObj.(*resourcev1alpha2.ResourceClaim)
   681  		if !ok {
   682  			return nil, fmt.Errorf("unexpected object of type %T returned by claim cache", claimObj)
   683  		}
   684  		podClaimName, ok := claim.Annotations[podResourceClaimAnnotation]
   685  		if ok && podClaimName != podClaim.Name {
   686  			continue
   687  		}
   688  
   689  		// No annotation? It might a ResourceClaim created for
   690  		// the pod with a previous Kubernetes release where the
   691  		// ResourceClaim name was deterministic, in which case
   692  		// we have to use it and update the new pod status
   693  		// field accordingly.
   694  		if !ok && claim.Name != deterministicName {
   695  			continue
   696  		}
   697  
   698  		// Pick the first one that matches. There shouldn't be more than one. If there is,
   699  		// then all others will be ignored until the pod gets deleted. Then they also get
   700  		// cleaned up.
   701  		return claim, nil
   702  	}
   703  	return nil, nil
   704  }
   705  
   706  func (ec *Controller) ensurePodSchedulingContext(ctx context.Context, pod *v1.Pod) error {
   707  	scheduling, err := ec.podSchedulingLister.PodSchedulingContexts(pod.Namespace).Get(pod.Name)
   708  	if err != nil && !apierrors.IsNotFound(err) {
   709  		return fmt.Errorf("retrieve PodSchedulingContext: %v", err)
   710  	}
   711  	if scheduling == nil {
   712  		scheduling = &resourcev1alpha2.PodSchedulingContext{
   713  			ObjectMeta: metav1.ObjectMeta{
   714  				Name:      pod.Name,
   715  				Namespace: pod.Namespace,
   716  				OwnerReferences: []metav1.OwnerReference{
   717  					{
   718  						APIVersion: "v1",
   719  						Kind:       "Pod",
   720  						Name:       pod.Name,
   721  						UID:        pod.UID,
   722  						Controller: pointer.Bool(true),
   723  					},
   724  				},
   725  			},
   726  			Spec: resourcev1alpha2.PodSchedulingContextSpec{
   727  				SelectedNode: pod.Spec.NodeName,
   728  				// There is no need for negotiation about
   729  				// potential and suitable nodes anymore, so
   730  				// PotentialNodes can be left empty.
   731  			},
   732  		}
   733  		if _, err := ec.kubeClient.ResourceV1alpha2().PodSchedulingContexts(pod.Namespace).Create(ctx, scheduling, metav1.CreateOptions{}); err != nil {
   734  			return fmt.Errorf("create PodSchedulingContext: %v", err)
   735  		}
   736  		return nil
   737  	}
   738  
   739  	if scheduling.Spec.SelectedNode != pod.Spec.NodeName {
   740  		scheduling := scheduling.DeepCopy()
   741  		scheduling.Spec.SelectedNode = pod.Spec.NodeName
   742  		if _, err := ec.kubeClient.ResourceV1alpha2().PodSchedulingContexts(pod.Namespace).Update(ctx, scheduling, metav1.UpdateOptions{}); err != nil {
   743  			return fmt.Errorf("update spec.selectedNode in PodSchedulingContext: %v", err)
   744  		}
   745  	}
   746  
   747  	return nil
   748  }
   749  
   750  func (ec *Controller) reserveForPod(ctx context.Context, pod *v1.Pod, claim *resourcev1alpha2.ResourceClaim) error {
   751  	claim = claim.DeepCopy()
   752  	claim.Status.ReservedFor = append(claim.Status.ReservedFor,
   753  		resourcev1alpha2.ResourceClaimConsumerReference{
   754  			Resource: "pods",
   755  			Name:     pod.Name,
   756  			UID:      pod.UID,
   757  		})
   758  	if _, err := ec.kubeClient.ResourceV1alpha2().ResourceClaims(claim.Namespace).UpdateStatus(ctx, claim, metav1.UpdateOptions{}); err != nil {
   759  		return fmt.Errorf("reserve claim for pod: %v", err)
   760  	}
   761  	return nil
   762  }
   763  
   764  func (ec *Controller) syncClaim(ctx context.Context, namespace, name string) error {
   765  	logger := klog.LoggerWithValues(klog.FromContext(ctx), "claim", klog.KRef(namespace, name))
   766  	ctx = klog.NewContext(ctx, logger)
   767  	claim, err := ec.claimLister.ResourceClaims(namespace).Get(name)
   768  	if err != nil {
   769  		if apierrors.IsNotFound(err) {
   770  			logger.V(5).Info("nothing to do for claim, it is gone")
   771  			return nil
   772  		}
   773  		return err
   774  	}
   775  
   776  	// Check if the ReservedFor entries are all still valid.
   777  	valid := make([]resourcev1alpha2.ResourceClaimConsumerReference, 0, len(claim.Status.ReservedFor))
   778  	for _, reservedFor := range claim.Status.ReservedFor {
   779  		if reservedFor.APIGroup == "" &&
   780  			reservedFor.Resource == "pods" {
   781  			// A pod falls into one of three categories:
   782  			// - we have it in our cache -> don't remove it until we are told that it got removed
   783  			// - we don't have it in our cache anymore, but we have seen it before -> it was deleted, remove it
   784  			// - not in our cache, not seen -> double-check with API server before removal
   785  
   786  			keepEntry := true
   787  
   788  			// Tracking deleted pods in the LRU cache is an
   789  			// optimization. Without this cache, the code would
   790  			// have to do the API call below for every deleted pod
   791  			// to ensure that the pod really doesn't exist. With
   792  			// the cache, most of the time the pod will be recorded
   793  			// as deleted and the API call can be avoided.
   794  			if ec.deletedObjects.Has(reservedFor.UID) {
   795  				// We know that the pod was deleted. This is
   796  				// easy to check and thus is done first.
   797  				keepEntry = false
   798  			} else {
   799  				pod, err := ec.podLister.Pods(claim.Namespace).Get(reservedFor.Name)
   800  				switch {
   801  				case err != nil && !apierrors.IsNotFound(err):
   802  					return err
   803  				case err != nil:
   804  					// We might not have it in our informer cache
   805  					// yet. Removing the pod while the scheduler is
   806  					// scheduling it would be bad. We have to be
   807  					// absolutely sure and thus have to check with
   808  					// the API server.
   809  					pod, err := ec.kubeClient.CoreV1().Pods(claim.Namespace).Get(ctx, reservedFor.Name, metav1.GetOptions{})
   810  					if err != nil && !apierrors.IsNotFound(err) {
   811  						return err
   812  					}
   813  					if pod == nil || pod.UID != reservedFor.UID {
   814  						logger.V(6).Info("remove reservation because pod is gone or got replaced", "pod", klog.KObj(pod), "claim", klog.KRef(namespace, name))
   815  						keepEntry = false
   816  					}
   817  				case pod.UID != reservedFor.UID:
   818  					logger.V(6).Info("remove reservation because pod got replaced with new instance", "pod", klog.KObj(pod), "claim", klog.KRef(namespace, name))
   819  					keepEntry = false
   820  				case isPodDone(pod):
   821  					logger.V(6).Info("remove reservation because pod will not run anymore", "pod", klog.KObj(pod), "claim", klog.KRef(namespace, name))
   822  					keepEntry = false
   823  				}
   824  			}
   825  
   826  			if keepEntry {
   827  				valid = append(valid, reservedFor)
   828  			}
   829  			continue
   830  		}
   831  
   832  		// TODO: support generic object lookup
   833  		return fmt.Errorf("unsupported ReservedFor entry: %v", reservedFor)
   834  	}
   835  
   836  	builtinControllerFinalizer := slices.Index(claim.Finalizers, resourcev1alpha2.Finalizer)
   837  	logger.V(5).Info("claim reserved for counts", "currentCount", len(claim.Status.ReservedFor), "claim", klog.KRef(namespace, name), "updatedCount", len(valid), "builtinController", builtinControllerFinalizer >= 0)
   838  	if len(valid) < len(claim.Status.ReservedFor) {
   839  		// This is not using a patch because we want the update to fail if anything
   840  		// changed in the meantime.
   841  		claim := claim.DeepCopy()
   842  		claim.Status.ReservedFor = valid
   843  
   844  		// When a ResourceClaim uses delayed allocation, then it makes sense to
   845  		// deallocate the claim as soon as the last consumer stops using
   846  		// it. This ensures that the claim can be allocated again as needed by
   847  		// some future consumer instead of trying to schedule that consumer
   848  		// onto the node that was chosen for the previous consumer. It also
   849  		// releases the underlying resources for use by other claims.
   850  		//
   851  		// This has to be triggered by the transition from "was being used" to
   852  		// "is not used anymore" because a DRA driver is not required to set
   853  		// `status.reservedFor` together with `status.allocation`, i.e. a claim
   854  		// that is "currently unused" should not get deallocated.
   855  		//
   856  		// This does not matter for claims that were created for a pod. For
   857  		// those, the resource claim controller will trigger deletion when the
   858  		// pod is done. However, it doesn't hurt to also trigger deallocation
   859  		// for such claims and not checking for them keeps this code simpler.
   860  		if len(valid) == 0 {
   861  			if builtinControllerFinalizer >= 0 {
   862  				if claim.Spec.AllocationMode == resourcev1alpha2.AllocationModeWaitForFirstConsumer ||
   863  					claim.DeletionTimestamp != nil {
   864  					// Allocated by scheduler with structured parameters. We can "deallocate"
   865  					// by clearing the allocation.
   866  					claim.Status.Allocation = nil
   867  				}
   868  			} else if claim.Spec.AllocationMode == resourcev1alpha2.AllocationModeWaitForFirstConsumer {
   869  				// DRA driver controller in the control plane
   870  				// needs to do the deallocation.
   871  				claim.Status.DeallocationRequested = true
   872  			}
   873  			// In all other cases, we keep the claim allocated, in particular for immediate allocation
   874  			// with a control plane controller.
   875  		}
   876  
   877  		claim, err := ec.kubeClient.ResourceV1alpha2().ResourceClaims(claim.Namespace).UpdateStatus(ctx, claim, metav1.UpdateOptions{})
   878  		if err != nil {
   879  			return err
   880  		}
   881  
   882  		// Now also remove the finalizer if it is not needed anymore.
   883  		// Note that the index may have changed as a result of the UpdateStatus call.
   884  		builtinControllerFinalizer := slices.Index(claim.Finalizers, resourcev1alpha2.Finalizer)
   885  		if builtinControllerFinalizer >= 0 && claim.Status.Allocation == nil {
   886  			claim.Finalizers = slices.Delete(claim.Finalizers, builtinControllerFinalizer, builtinControllerFinalizer+1)
   887  			if _, err := ec.kubeClient.ResourceV1alpha2().ResourceClaims(claim.Namespace).Update(ctx, claim, metav1.UpdateOptions{}); err != nil {
   888  				return err
   889  			}
   890  		}
   891  	} else if builtinControllerFinalizer >= 0 && claim.DeletionTimestamp != nil && len(valid) == 0 {
   892  		claim := claim.DeepCopy()
   893  		if claim.Status.Allocation != nil {
   894  			// This can happen when a claim with immediate allocation
   895  			// stopped being used, remained allocated, and then got
   896  			// deleted. As above we then need to clear the allocation.
   897  			claim.Status.Allocation = nil
   898  			var err error
   899  			claim, err = ec.kubeClient.ResourceV1alpha2().ResourceClaims(claim.Namespace).UpdateStatus(ctx, claim, metav1.UpdateOptions{})
   900  			if err != nil {
   901  				return err
   902  			}
   903  		}
   904  		// Whether it was allocated or not, remove the finalizer to unblock removal.
   905  		claim.Finalizers = slices.Delete(claim.Finalizers, builtinControllerFinalizer, builtinControllerFinalizer+1)
   906  		_, err := ec.kubeClient.ResourceV1alpha2().ResourceClaims(claim.Namespace).Update(ctx, claim, metav1.UpdateOptions{})
   907  		if err != nil {
   908  			return err
   909  		}
   910  	}
   911  
   912  	if len(valid) == 0 {
   913  		// Claim is not reserved. If it was generated for a pod and
   914  		// that pod is not going to run, the claim can be
   915  		// deleted. Normally the garbage collector does that, but the
   916  		// pod itself might not get deleted for a while.
   917  		podName, podUID := owningPod(claim)
   918  		if podName != "" {
   919  			pod, err := ec.podLister.Pods(claim.Namespace).Get(podName)
   920  			switch {
   921  			case err == nil:
   922  				// Pod already replaced or not going to run?
   923  				if pod.UID != podUID || isPodDone(pod) {
   924  					// We are certain that the owning pod is not going to need
   925  					// the claim and therefore remove the claim.
   926  					logger.V(5).Info("deleting unused generated claim", "claim", klog.KObj(claim), "pod", klog.KObj(pod))
   927  					err := ec.kubeClient.ResourceV1alpha2().ResourceClaims(claim.Namespace).Delete(ctx, claim.Name, metav1.DeleteOptions{})
   928  					if err != nil {
   929  						return fmt.Errorf("delete claim: %v", err)
   930  					}
   931  				} else {
   932  					logger.V(6).Info("wrong pod content, not deleting claim", "claim", klog.KObj(claim), "podUID", podUID, "podContent", pod)
   933  				}
   934  			case apierrors.IsNotFound(err):
   935  				// We might not know the pod *yet*. Instead of doing an expensive API call,
   936  				// let the garbage collector handle the case that the pod is truly gone.
   937  				logger.V(5).Info("pod for claim not found", "claim", klog.KObj(claim), "pod", klog.KRef(claim.Namespace, podName))
   938  			default:
   939  				return fmt.Errorf("lookup pod: %v", err)
   940  			}
   941  		} else {
   942  			logger.V(5).Info("claim not generated for a pod", "claim", klog.KObj(claim))
   943  		}
   944  	}
   945  
   946  	return nil
   947  }
   948  
   949  func owningPod(claim *resourcev1alpha2.ResourceClaim) (string, types.UID) {
   950  	for _, owner := range claim.OwnerReferences {
   951  		if pointer.BoolDeref(owner.Controller, false) &&
   952  			owner.APIVersion == "v1" &&
   953  			owner.Kind == "Pod" {
   954  			return owner.Name, owner.UID
   955  		}
   956  	}
   957  	return "", ""
   958  }
   959  
   960  // podResourceClaimIndexFunc is an index function that returns ResourceClaim keys (=
   961  // namespace/name) for ResourceClaim or ResourceClaimTemplates in a given pod.
   962  func podResourceClaimIndexFunc(obj interface{}) ([]string, error) {
   963  	pod, ok := obj.(*v1.Pod)
   964  	if !ok {
   965  		return []string{}, nil
   966  	}
   967  	keys := []string{}
   968  	for _, podClaim := range pod.Spec.ResourceClaims {
   969  		claimName, _, err := resourceclaim.Name(pod, &podClaim)
   970  		if err != nil || claimName == nil {
   971  			// Index functions are not supposed to fail, the caller will panic.
   972  			// For both error reasons (claim not created yet, unknown API)
   973  			// we simply don't index.
   974  			continue
   975  		}
   976  		keys = append(keys, fmt.Sprintf("%s/%s", pod.Namespace, *claimName))
   977  	}
   978  	return keys, nil
   979  }
   980  
   981  // isPodDone returns true if it is certain that none of the containers are running and never will run.
   982  func isPodDone(pod *v1.Pod) bool {
   983  	return podutil.IsPodPhaseTerminal(pod.Status.Phase) ||
   984  		// Deleted and not scheduled:
   985  		pod.DeletionTimestamp != nil && pod.Spec.NodeName == ""
   986  }
   987  
   988  // claimPodOwnerIndexFunc is an index function that returns the pod UIDs of
   989  // all pods which own the resource claim. Should only be one, though.
   990  func claimPodOwnerIndexFunc(obj interface{}) ([]string, error) {
   991  	claim, ok := obj.(*resourcev1alpha2.ResourceClaim)
   992  	if !ok {
   993  		return nil, nil
   994  	}
   995  	var keys []string
   996  	for _, owner := range claim.OwnerReferences {
   997  		if owner.Controller != nil &&
   998  			*owner.Controller &&
   999  			owner.APIVersion == "v1" &&
  1000  			owner.Kind == "Pod" {
  1001  			keys = append(keys, string(owner.UID))
  1002  		}
  1003  	}
  1004  	return keys, nil
  1005  }
  1006  

View as plain text