...

Source file src/k8s.io/kubernetes/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go

Documentation: k8s.io/kubernetes/pkg/scheduler/framework/plugins/dynamicresources

     1  /*
     2  Copyright 2022 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package dynamicresources
    18  
    19  import (
    20  	"context"
    21  	"encoding/json"
    22  	"errors"
    23  	"fmt"
    24  	"slices"
    25  	"sort"
    26  	"sync"
    27  
    28  	"github.com/google/go-cmp/cmp"
    29  
    30  	v1 "k8s.io/api/core/v1"
    31  	resourcev1alpha2 "k8s.io/api/resource/v1alpha2"
    32  	apiequality "k8s.io/apimachinery/pkg/api/equality"
    33  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    34  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    35  	"k8s.io/apimachinery/pkg/labels"
    36  	"k8s.io/apimachinery/pkg/runtime"
    37  	"k8s.io/apimachinery/pkg/runtime/schema"
    38  	"k8s.io/apimachinery/pkg/types"
    39  	"k8s.io/apimachinery/pkg/util/sets"
    40  	resourcev1alpha2apply "k8s.io/client-go/applyconfigurations/resource/v1alpha2"
    41  	"k8s.io/client-go/kubernetes"
    42  	resourcev1alpha2listers "k8s.io/client-go/listers/resource/v1alpha2"
    43  	"k8s.io/component-helpers/scheduling/corev1/nodeaffinity"
    44  	"k8s.io/dynamic-resource-allocation/resourceclaim"
    45  	"k8s.io/klog/v2"
    46  	"k8s.io/kubernetes/pkg/scheduler/framework"
    47  	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
    48  	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
    49  	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding"
    50  	schedutil "k8s.io/kubernetes/pkg/scheduler/util"
    51  	"k8s.io/utils/ptr"
    52  )
    53  
    54  const (
    55  	// Name is the name of the plugin used in Registry and configurations.
    56  	Name = names.DynamicResources
    57  
        	// stateKey is a framework.StateKey with the same value as Name.
        	// NOTE(review): presumably the key under which stateData is stored in
        	// the framework.CycleState - confirm against PreFilter.
    58  	stateKey framework.StateKey = Name
    59  )
    60  
    61  // stateData is the per-pod scheduling state of this plugin. It is initialized
    62  // in the PreFilter phase. Because the pointer is saved in framework.CycleState,
    63  // later phases don't need to call the Write method to update the value.
    64  type stateData struct {
    65  	// preScored is true if PreScore was invoked.
    66  	preScored bool
    67  
    68  	// A copy of all claims for the Pod (i.e. 1:1 match with
    69  	// pod.Spec.ResourceClaims), initially with the status from the start
    70  	// of the scheduling cycle. Each claim instance is read-only because it
    71  	// might come from the informer cache. The instances get replaced when
    72  	// the plugin itself successfully does an Update.
    73  	//
    74  	// Empty if the Pod has no claims.
    75  	claims []*resourcev1alpha2.ResourceClaim
    76  
    77  	// podSchedulingState keeps track of the PodSchedulingContext
    78  	// (if one exists) and the changes made to it.
    79  	podSchedulingState podSchedulingState
    80  
    81  	// resources contains the information about available and allocated resources when using
    82  	// structured parameters and the pod needs this information.
    83  	resources resources
    84  
    85  	// mutex must be locked while accessing any of the fields below.
    86  	mutex sync.Mutex
    87  
    88  	// The indices of all claims that:
    89  	// - are allocated
    90  	// - use delayed allocation or the builtin controller
    91  	// - were not available on at least one node
    92  	//
    93  	// Set in parallel during Filter, so write access there must be
    94  	// protected by the mutex. Used by PostFilter.
    95  	unavailableClaims sets.Set[int]
    96  
        	// informationsForClaim holds additional per-claim information.
        	// NOTE(review): presumably one entry per element of claims, using
        	// the same index - confirm against PreFilter.
    97  	informationsForClaim []informationForClaim
    98  }
    99  
   100  func (d *stateData) Clone() framework.StateData {
        	// Clone implements framework.StateData by returning the receiver
        	// itself, not a copy: stateData contains a sync.Mutex and is meant
        	// to be shared through the pointer stored in the CycleState.
   101  	return d
   102  }
   103  
   104  type informationForClaim struct {
   105  	// The availableOnNode node filter of the claim converted from the
   106  	// v1 API to nodeaffinity.NodeSelector by PreFilter for repeated
   107  	// evaluation in Filter. Nil for claims which don't have it.
   108  	availableOnNode *nodeaffinity.NodeSelector
   109  
   110  	// The status of the claim obtained from the
   111  	// schedulingCtx by PreFilter for repeated
   112  	// evaluation in Filter. Nil for claims which don't have it.
   113  	status *resourcev1alpha2.ResourceClaimSchedulingStatus
   114  
   115  	// structuredParameters is true if the claim is handled via the builtin
   116  	// controller.
        	// NOTE(review): controller is presumably only set when
        	// structuredParameters is true - confirm against PreFilter.
   117  	structuredParameters bool
   118  	controller           *claimController
   119  
   120  	// Set by Reserve, published by PreBind.
   121  	allocation           *resourcev1alpha2.AllocationResult
   122  	allocationDriverName string
   123  }
   124  
   125  type podSchedulingState struct {
   126  	// A pointer to the PodSchedulingContext object for the pod, if one exists
   127  	// in the API server.
   128  	//
   129  	// Conceptually, this object belongs in the scheduler framework
   130  	// where it might get shared by different plugins. But in practice,
   131  	// it is currently only used by dynamic provisioning and thus
   132  	// managed entirely here.
   133  	schedulingCtx *resourcev1alpha2.PodSchedulingContext
   134  
   135  	// selectedNode is set if (and only if) a node has been selected.
   136  	// It is reset to nil once publish has sent it successfully.
   137  	selectedNode *string
   138  
   139  	// potentialNodes is set if (and only if) the potential nodes field
   140  	// needs to be updated or set. Reset to nil by a successful publish.
   141  	potentialNodes *[]string
   142  }
   142  
   143  func (p *podSchedulingState) isDirty() bool {
   144  	return p.selectedNode != nil ||
   145  		p.potentialNodes != nil
   146  }
   147  
   148  // init checks whether there is already a PodSchedulingContext object.
   149  // Must not be called concurrently,
   150  func (p *podSchedulingState) init(ctx context.Context, pod *v1.Pod, podSchedulingContextLister resourcev1alpha2listers.PodSchedulingContextLister) error {
   151  	schedulingCtx, err := podSchedulingContextLister.PodSchedulingContexts(pod.Namespace).Get(pod.Name)
   152  	switch {
   153  	case apierrors.IsNotFound(err):
   154  		return nil
   155  	case err != nil:
   156  		return err
   157  	default:
   158  		// We have an object, but it might be obsolete.
   159  		if !metav1.IsControlledBy(schedulingCtx, pod) {
   160  			return fmt.Errorf("PodSchedulingContext object with UID %s is not owned by Pod %s/%s", schedulingCtx.UID, pod.Namespace, pod.Name)
   161  		}
   162  	}
   163  	p.schedulingCtx = schedulingCtx
   164  	return nil
   165  }
   166  
   167  // publish creates or updates the PodSchedulingContext object, if necessary.
   168  // Must not be called concurrently.
   169  func (p *podSchedulingState) publish(ctx context.Context, pod *v1.Pod, clientset kubernetes.Interface) error {
   170  	if !p.isDirty() {
   171  		return nil
   172  	}
   173  
   174  	var err error
   175  	logger := klog.FromContext(ctx)
   176  	if p.schedulingCtx != nil {
   177  		// Update it.
   178  		schedulingCtx := p.schedulingCtx.DeepCopy()
   179  		if p.selectedNode != nil {
   180  			schedulingCtx.Spec.SelectedNode = *p.selectedNode
   181  		}
   182  		if p.potentialNodes != nil {
   183  			schedulingCtx.Spec.PotentialNodes = *p.potentialNodes
   184  		}
   185  		if loggerV := logger.V(6); loggerV.Enabled() {
   186  			// At a high enough log level, dump the entire object.
   187  			loggerV.Info("Updating PodSchedulingContext", "podSchedulingCtx", klog.KObj(schedulingCtx), "podSchedulingCtxObject", klog.Format(schedulingCtx))
   188  		} else {
   189  			logger.V(5).Info("Updating PodSchedulingContext", "podSchedulingCtx", klog.KObj(schedulingCtx))
   190  		}
   191  		_, err = clientset.ResourceV1alpha2().PodSchedulingContexts(schedulingCtx.Namespace).Update(ctx, schedulingCtx, metav1.UpdateOptions{})
   192  		if apierrors.IsConflict(err) {
   193  			// We don't use SSA by default for performance reasons
   194  			// (https://github.com/kubernetes/kubernetes/issues/113700#issuecomment-1698563918)
   195  			// because most of the time an Update doesn't encounter
   196  			// a conflict and is faster.
   197  			//
   198  			// We could return an error here and rely on
   199  			// backoff+retry, but scheduling attempts are expensive
   200  			// and the backoff delay would cause a (small)
   201  			// slowdown. Therefore we fall back to SSA here if needed.
   202  			//
   203  			// Using SSA instead of Get+Update has the advantage that
   204  			// there is no delay for the Get. SSA is safe because only
   205  			// the scheduler updates these fields.
   206  			spec := resourcev1alpha2apply.PodSchedulingContextSpec()
   207  			spec.SelectedNode = p.selectedNode
   208  			if p.potentialNodes != nil {
   209  				spec.PotentialNodes = *p.potentialNodes
   210  			} else {
   211  				// Unchanged. Has to be set because the object that we send
   212  				// must represent the "fully specified intent". Not sending
   213  				// the list would clear it.
   214  				spec.PotentialNodes = p.schedulingCtx.Spec.PotentialNodes
   215  			}
   216  			schedulingCtxApply := resourcev1alpha2apply.PodSchedulingContext(pod.Name, pod.Namespace).WithSpec(spec)
   217  
   218  			if loggerV := logger.V(6); loggerV.Enabled() {
   219  				// At a high enough log level, dump the entire object.
   220  				loggerV.Info("Patching PodSchedulingContext", "podSchedulingCtx", klog.KObj(pod), "podSchedulingCtxApply", klog.Format(schedulingCtxApply))
   221  			} else {
   222  				logger.V(5).Info("Patching PodSchedulingContext", "podSchedulingCtx", klog.KObj(pod))
   223  			}
   224  			_, err = clientset.ResourceV1alpha2().PodSchedulingContexts(pod.Namespace).Apply(ctx, schedulingCtxApply, metav1.ApplyOptions{FieldManager: "kube-scheduler", Force: true})
   225  		}
   226  
   227  	} else {
   228  		// Create it.
   229  		schedulingCtx := &resourcev1alpha2.PodSchedulingContext{
   230  			ObjectMeta: metav1.ObjectMeta{
   231  				Name:            pod.Name,
   232  				Namespace:       pod.Namespace,
   233  				OwnerReferences: []metav1.OwnerReference{*metav1.NewControllerRef(pod, schema.GroupVersionKind{Version: "v1", Kind: "Pod"})},
   234  			},
   235  		}
   236  		if p.selectedNode != nil {
   237  			schedulingCtx.Spec.SelectedNode = *p.selectedNode
   238  		}
   239  		if p.potentialNodes != nil {
   240  			schedulingCtx.Spec.PotentialNodes = *p.potentialNodes
   241  		}
   242  		if loggerV := logger.V(6); loggerV.Enabled() {
   243  			// At a high enough log level, dump the entire object.
   244  			loggerV.Info("Creating PodSchedulingContext", "podSchedulingCtx", klog.KObj(schedulingCtx), "podSchedulingCtxObject", klog.Format(schedulingCtx))
   245  		} else {
   246  			logger.V(5).Info("Creating PodSchedulingContext", "podSchedulingCtx", klog.KObj(schedulingCtx))
   247  		}
   248  		_, err = clientset.ResourceV1alpha2().PodSchedulingContexts(schedulingCtx.Namespace).Create(ctx, schedulingCtx, metav1.CreateOptions{})
   249  	}
   250  	if err != nil {
   251  		return err
   252  	}
   253  	p.potentialNodes = nil
   254  	p.selectedNode = nil
   255  	return nil
   256  }
   257  
   258  func statusForClaim(schedulingCtx *resourcev1alpha2.PodSchedulingContext, podClaimName string) *resourcev1alpha2.ResourceClaimSchedulingStatus {
   259  	if schedulingCtx == nil {
   260  		return nil
   261  	}
   262  	for _, status := range schedulingCtx.Status.ResourceClaims {
   263  		if status.Name == podClaimName {
   264  			return &status
   265  		}
   266  	}
   267  	return nil
   268  }
   269  
   270  // dynamicResources is a plugin that ensures that ResourceClaims are allocated.
   271  type dynamicResources struct {
        	// enabled is true only when New was called with the dynamic resource
        	// allocation feature enabled. Otherwise all other fields are unset.
   272  	enabled                    bool
   273  	fh                         framework.Handle
   274  	clientset                  kubernetes.Interface
   275  	claimLister                resourcev1alpha2listers.ResourceClaimLister
   276  	classLister                resourcev1alpha2listers.ResourceClassLister
   277  	podSchedulingContextLister resourcev1alpha2listers.PodSchedulingContextLister
   278  	claimParametersLister      resourcev1alpha2listers.ResourceClaimParametersLister
   279  	classParametersLister      resourcev1alpha2listers.ResourceClassParametersLister
   280  	resourceSliceLister        resourcev1alpha2listers.ResourceSliceLister
   281  	claimNameLookup            *resourceclaim.Lookup
   282  
   283  	// claimAssumeCache enables temporarily storing a newer claim object
   284  	// while the scheduler has allocated it and the corresponding object
   285  	// update from the apiserver has not been processed by the claim
   286  	// informer callbacks. Claims get added here in PreBind and removed by
   287  	// the informer callback (based on the "newer than" comparison in the
   288  	// assume cache).
   289  	//
   290  	// It uses cache.MetaNamespaceKeyFunc to generate object names, which
   291  	// therefore are "<namespace>/<name>".
   292  	//
   293  	// This is necessary to ensure that reconstructing the resource usage
   294  	// at the start of a pod scheduling cycle doesn't reuse the resources
   295  	// assigned to such a claim. Alternatively, claim allocation state
   296  	// could also get tracked across pod scheduling cycles, but that
   297  	// - adds complexity (need to carefully sync state with informer events
   298  	//   for claims and ResourceSlices)
   299  	// - would make integration with cluster autoscaler harder because it would need
   300  	//   to trigger informer callbacks.
   301  	//
   302  	// When implementing cluster autoscaler support, this assume cache or
   303  	// something like it (see https://github.com/kubernetes/kubernetes/pull/112202)
   304  	// might have to be managed by the cluster autoscaler.
   305  	claimAssumeCache volumebinding.AssumeCache
   306  
   307  	// inFlightAllocations is a map from claim UUIDs to claim objects for those claims
   308  	// for which allocation was triggered during a scheduling cycle and the
   309  	// corresponding claim status update call in PreBind has not been done
   310  	// yet. If another pod needs the claim, the pod is treated as "not
   311  	// schedulable yet". The cluster event for the claim status update will
   312  	// make it schedulable.
   313  	//
   314  	// This mechanism avoids the following problem:
   315  	// - Pod A triggers allocation for claim X.
   316  	// - Pod B shares access to that claim and gets scheduled because
   317  	//   the claim is assumed to be allocated.
   318  	// - PreBind for pod B is called first, tries to update reservedFor and
   319  	//   fails because the claim is not really allocated yet.
   320  	//
   321  	// We could avoid the ordering problem by allowing either pod A or pod B
   322  	// to set the allocation. But that is more complicated and leads to another
   323  	// problem:
   324  	// - Pod A and B get scheduled as above.
   325  	// - PreBind for pod A gets called first, then fails with a temporary API error.
   326  	//   It removes the updated claim from the assume cache because of that.
   327  	// - PreBind for pod B gets called next and succeeds with adding the
   328  	//   allocation and its own reservedFor entry.
   329  	// - The assume cache is now not reflecting that the claim is allocated,
   330  	//   which could lead to reusing the same resource for some other claim.
   331  	//
   332  	// A sync.Map is used because in practice sharing of a claim between
   333  	// pods is expected to be rare compared to per-pod claims, so we end up
   334  	// hitting the "multiple goroutines read, write, and overwrite entries
   335  	// for disjoint sets of keys" case that sync.Map is optimized for.
   336  	inFlightAllocations sync.Map
   337  }
   338  
   339  // New initializes a new plugin and returns it.
   340  func New(ctx context.Context, plArgs runtime.Object, fh framework.Handle, fts feature.Features) (framework.Plugin, error) {
   341  	if !fts.EnableDynamicResourceAllocation {
   342  		// Disabled, won't do anything.
   343  		return &dynamicResources{}, nil
   344  	}
   345  
   346  	logger := klog.FromContext(ctx)
   347  	pl := &dynamicResources{
   348  		enabled:                    true,
   349  		fh:                         fh,
   350  		clientset:                  fh.ClientSet(),
   351  		claimLister:                fh.SharedInformerFactory().Resource().V1alpha2().ResourceClaims().Lister(),
   352  		classLister:                fh.SharedInformerFactory().Resource().V1alpha2().ResourceClasses().Lister(),
   353  		podSchedulingContextLister: fh.SharedInformerFactory().Resource().V1alpha2().PodSchedulingContexts().Lister(),
   354  		claimParametersLister:      fh.SharedInformerFactory().Resource().V1alpha2().ResourceClaimParameters().Lister(),
   355  		classParametersLister:      fh.SharedInformerFactory().Resource().V1alpha2().ResourceClassParameters().Lister(),
   356  		resourceSliceLister:        fh.SharedInformerFactory().Resource().V1alpha2().ResourceSlices().Lister(),
   357  		claimNameLookup:            resourceclaim.NewNameLookup(fh.ClientSet()),
   358  		claimAssumeCache:           volumebinding.NewAssumeCache(logger, fh.SharedInformerFactory().Resource().V1alpha2().ResourceClaims().Informer(), "claim", "", nil),
   359  	}
   360  
   361  	return pl, nil
   362  }
   363  
   364  var _ framework.PreEnqueuePlugin = &dynamicResources{}
   365  var _ framework.PreFilterPlugin = &dynamicResources{}
   366  var _ framework.FilterPlugin = &dynamicResources{}
   367  var _ framework.PostFilterPlugin = &dynamicResources{}
   368  var _ framework.PreScorePlugin = &dynamicResources{}
   369  var _ framework.ReservePlugin = &dynamicResources{}
   370  var _ framework.EnqueueExtensions = &dynamicResources{}
   371  var _ framework.PreBindPlugin = &dynamicResources{}
   372  var _ framework.PostBindPlugin = &dynamicResources{}
   373  
   374  // Name returns the name of the plugin, i.e. the package-level Name
        // constant. It is used in logs, etc.
   375  func (pl *dynamicResources) Name() string {
   376  	return Name
   377  }
   378  
   379  // EventsToRegister returns the possible events that may make a Pod
   380  // failed by this plugin schedulable.
   381  func (pl *dynamicResources) EventsToRegister() []framework.ClusterEventWithHint {
   382  	if !pl.enabled {
   383  		return nil
   384  	}
   385  
   386  	events := []framework.ClusterEventWithHint{
   387  		// Changes for claim or class parameters creation may make pods
   388  		// schedulable which depend on claims using those parameters.
   389  		{Event: framework.ClusterEvent{Resource: framework.ResourceClaimParameters, ActionType: framework.Add | framework.Update}, QueueingHintFn: pl.isSchedulableAfterClaimParametersChange},
   390  		{Event: framework.ClusterEvent{Resource: framework.ResourceClassParameters, ActionType: framework.Add | framework.Update}, QueueingHintFn: pl.isSchedulableAfterClassParametersChange},
   391  
   392  		// Allocation is tracked in ResourceClaims, so any changes may make the pods schedulable.
   393  		{Event: framework.ClusterEvent{Resource: framework.ResourceClaim, ActionType: framework.Add | framework.Update}, QueueingHintFn: pl.isSchedulableAfterClaimChange},
   394  		// When a driver has provided additional information, a pod waiting for that information
   395  		// may be schedulable.
   396  		{Event: framework.ClusterEvent{Resource: framework.PodSchedulingContext, ActionType: framework.Add | framework.Update}, QueueingHintFn: pl.isSchedulableAfterPodSchedulingContextChange},
   397  		// A resource might depend on node labels for topology filtering.
   398  		// A new or updated node may make pods schedulable.
   399  		//
   400  		// A note about UpdateNodeTaint event:
   401  		// NodeAdd QueueingHint isn't always called because of the internal feature called preCheck.
   402  		// As a common problematic scenario,
   403  		// when a node is added but not ready, NodeAdd event is filtered out by preCheck and doesn't arrive.
   404  		// In such cases, this plugin may miss some events that actually make pods schedulable.
   405  		// As a workaround, we add UpdateNodeTaint event to catch the case.
   406  		// We can remove UpdateNodeTaint when we remove the preCheck feature.
   407  		// See: https://github.com/kubernetes/kubernetes/issues/110175
   408  		{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add | framework.UpdateNodeLabel | framework.UpdateNodeTaint}},
   409  		// A pod might be waiting for a class to get created or modified.
   410  		{Event: framework.ClusterEvent{Resource: framework.ResourceClass, ActionType: framework.Add | framework.Update}},
   411  	}
   412  	return events
   413  }
   414  
   415  // PreEnqueue checks if there are known reasons why a pod currently cannot be
   416  // scheduled. When this fails, one of the registered events can trigger another
   417  // attempt.
   418  func (pl *dynamicResources) PreEnqueue(ctx context.Context, pod *v1.Pod) (status *framework.Status) {
   419  	if err := pl.foreachPodResourceClaim(pod, nil); err != nil {
   420  		return statusUnschedulable(klog.FromContext(ctx), err.Error())
   421  	}
   422  	return nil
   423  }
   424  
   425  // isSchedulableAfterClaimParametersChange is invoked for add and update claim parameters events reported by
   426  // an informer. It checks whether that change made a previously unschedulable
   427  // pod schedulable. It errs on the side of letting a pod scheduling attempt
   428  // happen. The delete claim event will not invoke it, so newObj will never be nil.
   429  func (pl *dynamicResources) isSchedulableAfterClaimParametersChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
   430  	originalParameters, modifiedParameters, err := schedutil.As[*resourcev1alpha2.ResourceClaimParameters](oldObj, newObj)
   431  	if err != nil {
   432  		// Shouldn't happen.
   433  		return framework.Queue, fmt.Errorf("unexpected object in isSchedulableAfterClaimParametersChange: %w", err)
   434  	}
   435  
   436  	usesParameters := false
   437  	if err := pl.foreachPodResourceClaim(pod, func(_ string, claim *resourcev1alpha2.ResourceClaim) {
   438  		ref := claim.Spec.ParametersRef
   439  		if ref == nil {
   440  			return
   441  		}
   442  
   443  		// Using in-tree parameters directly?
   444  		if ref.APIGroup == resourcev1alpha2.SchemeGroupVersion.Group &&
   445  			ref.Kind == "ResourceClaimParameters" {
   446  			if modifiedParameters.Name == ref.Name {
   447  				usesParameters = true
   448  			}
   449  			return
   450  		}
   451  
   452  		// Need to look for translated parameters.
   453  		generatedFrom := modifiedParameters.GeneratedFrom
   454  		if generatedFrom == nil {
   455  			return
   456  		}
   457  		if generatedFrom.APIGroup == ref.APIGroup &&
   458  			generatedFrom.Kind == ref.Kind &&
   459  			generatedFrom.Name == ref.Name {
   460  			usesParameters = true
   461  		}
   462  	}); err != nil {
   463  		// This is not an unexpected error: we know that
   464  		// foreachPodResourceClaim only returns errors for "not
   465  		// schedulable".
   466  		logger.V(4).Info("pod is not schedulable", "pod", klog.KObj(pod), "claim", klog.KObj(modifiedParameters), "reason", err.Error())
   467  		return framework.QueueSkip, nil
   468  	}
   469  
   470  	if !usesParameters {
   471  		// This were not the parameters the pod was waiting for.
   472  		logger.V(6).Info("unrelated claim parameters got modified", "pod", klog.KObj(pod), "claimParameters", klog.KObj(modifiedParameters))
   473  		return framework.QueueSkip, nil
   474  	}
   475  
   476  	if originalParameters == nil {
   477  		logger.V(4).Info("claim parameters for pod got created", "pod", klog.KObj(pod), "claimParameters", klog.KObj(modifiedParameters))
   478  		return framework.Queue, nil
   479  	}
   480  
   481  	// Modifications may or may not be relevant. If the entire
   482  	// requests are as before, then something else must have changed
   483  	// and we don't care.
   484  	if apiequality.Semantic.DeepEqual(&originalParameters.DriverRequests, &modifiedParameters.DriverRequests) {
   485  		logger.V(6).Info("claim parameters for pod got modified where the pod doesn't care", "pod", klog.KObj(pod), "claimParameters", klog.KObj(modifiedParameters))
   486  		return framework.QueueSkip, nil
   487  	}
   488  
   489  	logger.V(4).Info("requests in claim parameters for pod got updated", "pod", klog.KObj(pod), "claimParameters", klog.KObj(modifiedParameters))
   490  	return framework.Queue, nil
   491  }
   492  
   493  // isSchedulableAfterClassParametersChange is invoked for add and update class parameters events reported by
   494  // an informer. It checks whether that change made a previously unschedulable
   495  // pod schedulable. It errs on the side of letting a pod scheduling attempt
   496  // happen. The delete class event will not invoke it, so newObj will never be nil.
   497  func (pl *dynamicResources) isSchedulableAfterClassParametersChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
   498  	originalParameters, modifiedParameters, err := schedutil.As[*resourcev1alpha2.ResourceClassParameters](oldObj, newObj)
   499  	if err != nil {
   500  		// Shouldn't happen.
   501  		return framework.Queue, fmt.Errorf("unexpected object in isSchedulableAfterClassParametersChange: %w", err)
   502  	}
   503  
   504  	usesParameters := false
   505  	if err := pl.foreachPodResourceClaim(pod, func(_ string, claim *resourcev1alpha2.ResourceClaim) {
   506  		class, err := pl.classLister.Get(claim.Spec.ResourceClassName)
   507  		if err != nil {
   508  			if !apierrors.IsNotFound(err) {
   509  				logger.Error(err, "look up resource class")
   510  			}
   511  			return
   512  		}
   513  		ref := class.ParametersRef
   514  		if ref == nil {
   515  			return
   516  		}
   517  
   518  		// Using in-tree parameters directly?
   519  		if ref.APIGroup == resourcev1alpha2.SchemeGroupVersion.Group &&
   520  			ref.Kind == "ResourceClassParameters" {
   521  			if modifiedParameters.Name == ref.Name {
   522  				usesParameters = true
   523  			}
   524  			return
   525  		}
   526  
   527  		// Need to look for translated parameters.
   528  		generatedFrom := modifiedParameters.GeneratedFrom
   529  		if generatedFrom == nil {
   530  			return
   531  		}
   532  		if generatedFrom.APIGroup == ref.APIGroup &&
   533  			generatedFrom.Kind == ref.Kind &&
   534  			generatedFrom.Name == ref.Name {
   535  			usesParameters = true
   536  		}
   537  	}); err != nil {
   538  		// This is not an unexpected error: we know that
   539  		// foreachPodResourceClaim only returns errors for "not
   540  		// schedulable".
   541  		logger.V(4).Info("pod is not schedulable", "pod", klog.KObj(pod), "classParameters", klog.KObj(modifiedParameters), "reason", err.Error())
   542  		return framework.QueueSkip, nil
   543  	}
   544  
   545  	if !usesParameters {
   546  		// This were not the parameters the pod was waiting for.
   547  		logger.V(6).Info("unrelated class parameters got modified", "pod", klog.KObj(pod), "classParameters", klog.KObj(modifiedParameters))
   548  		return framework.QueueSkip, nil
   549  	}
   550  
   551  	if originalParameters == nil {
   552  		logger.V(4).Info("class parameters for pod got created", "pod", klog.KObj(pod), "class", klog.KObj(modifiedParameters))
   553  		return framework.Queue, nil
   554  	}
   555  
   556  	// Modifications may or may not be relevant. If the entire
   557  	// requests are as before, then something else must have changed
   558  	// and we don't care.
   559  	if apiequality.Semantic.DeepEqual(&originalParameters.Filters, &modifiedParameters.Filters) {
   560  		logger.V(6).Info("class parameters for pod got modified where the pod doesn't care", "pod", klog.KObj(pod), "classParameters", klog.KObj(modifiedParameters))
   561  		return framework.QueueSkip, nil
   562  	}
   563  
   564  	logger.V(4).Info("filters in class parameters for pod got updated", "pod", klog.KObj(pod), "classParameters", klog.KObj(modifiedParameters))
   565  	return framework.Queue, nil
   566  }
   567  
   568  // isSchedulableAfterClaimChange is invoked for add and update claim events reported by
   569  // an informer. It checks whether that change made a previously unschedulable
   570  // pod schedulable. It errs on the side of letting a pod scheduling attempt
   571  // happen. The delete claim event will not invoke it, so newObj will never be nil.
   572  func (pl *dynamicResources) isSchedulableAfterClaimChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
   573  	originalClaim, modifiedClaim, err := schedutil.As[*resourcev1alpha2.ResourceClaim](oldObj, newObj)
   574  	if err != nil {
   575  		// Shouldn't happen.
   576  		return framework.Queue, fmt.Errorf("unexpected object in isSchedulableAfterClaimChange: %w", err)
   577  	}
   578  
   579  	usesClaim := false
   580  	if err := pl.foreachPodResourceClaim(pod, func(_ string, claim *resourcev1alpha2.ResourceClaim) {
   581  		if claim.UID == modifiedClaim.UID {
   582  			usesClaim = true
   583  		}
   584  	}); err != nil {
   585  		// This is not an unexpected error: we know that
   586  		// foreachPodResourceClaim only returns errors for "not
   587  		// schedulable".
   588  		logger.V(4).Info("pod is not schedulable", "pod", klog.KObj(pod), "claim", klog.KObj(modifiedClaim), "reason", err.Error())
   589  		return framework.QueueSkip, nil
   590  	}
   591  
   592  	if originalClaim != nil &&
   593  		resourceclaim.IsAllocatedWithStructuredParameters(originalClaim) &&
   594  		modifiedClaim.Status.Allocation == nil {
   595  		// A claim with structured parameters was deallocated. This might have made
   596  		// resources available for other pods.
   597  		//
   598  		// TODO (https://github.com/kubernetes/kubernetes/issues/123697):
   599  		// check that the pending claims depend on structured parameters (depends on refactoring foreachPodResourceClaim, see other TODO).
   600  		//
   601  		// There is a small race here:
   602  		// - The dynamicresources plugin allocates claim A and updates the assume cache.
   603  		// - A second pod gets marked as unschedulable based on that assume cache.
   604  		// - Before the informer cache here catches up, the pod runs, terminates and
   605  		//   the claim gets deallocated without ever sending the claim status with
   606  		//   allocation to the scheduler.
   607  		// - The comparison below is for a *very* old claim with no allocation and the
   608  		//   new claim where the allocation is already removed again, so no
   609  		//   RemovedClaimAllocation event gets emitted.
   610  		//
   611  		// This is extremely unlikely and thus a fix is not needed for alpha in Kubernetes 1.30.
   612  		// TODO (https://github.com/kubernetes/kubernetes/issues/123698): The solution is to somehow integrate the assume cache
   613  		// into the event mechanism. This can be tackled together with adding autoscaler
   614  		// support, which also needs to do something with the assume cache.
   615  		logger.V(6).Info("claim with structured parameters got deallocated", "pod", klog.KObj(pod), "claim", klog.KObj(modifiedClaim))
   616  		return framework.Queue, nil
   617  	}
   618  
   619  	if !usesClaim {
   620  		// This was not the claim the pod was waiting for.
   621  		logger.V(6).Info("unrelated claim got modified", "pod", klog.KObj(pod), "claim", klog.KObj(modifiedClaim))
   622  		return framework.QueueSkip, nil
   623  	}
   624  
   625  	if originalClaim == nil {
   626  		logger.V(4).Info("claim for pod got created", "pod", klog.KObj(pod), "claim", klog.KObj(modifiedClaim))
   627  		return framework.Queue, nil
   628  	}
   629  
   630  	// Modifications may or may not be relevant. If the entire
   631  	// status is as before, then something else must have changed
   632  	// and we don't care. What happens in practice is that the
   633  	// resource driver adds the finalizer.
   634  	if apiequality.Semantic.DeepEqual(&originalClaim.Status, &modifiedClaim.Status) {
   635  		if loggerV := logger.V(7); loggerV.Enabled() {
   636  			// Log more information.
   637  			loggerV.Info("claim for pod got modified where the pod doesn't care", "pod", klog.KObj(pod), "claim", klog.KObj(modifiedClaim), "diff", cmp.Diff(originalClaim, modifiedClaim))
   638  		} else {
   639  			logger.V(6).Info("claim for pod got modified where the pod doesn't care", "pod", klog.KObj(pod), "claim", klog.KObj(modifiedClaim))
   640  		}
   641  		return framework.QueueSkip, nil
   642  	}
   643  
   644  	logger.V(4).Info("status of claim for pod got updated", "pod", klog.KObj(pod), "claim", klog.KObj(modifiedClaim))
   645  	return framework.Queue, nil
   646  }
   647  
   648  // isSchedulableAfterPodSchedulingContextChange is invoked for all
   649  // PodSchedulingContext events reported by an informer. It checks whether that
   650  // change made a previously unschedulable pod schedulable (updated) or a new
   651  // attempt is needed to re-create the object (deleted). It errs on the side of
   652  // letting a pod scheduling attempt happen.
   653  func (pl *dynamicResources) isSchedulableAfterPodSchedulingContextChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
   654  	// Deleted? That can happen because we ourselves delete the PodSchedulingContext while
   655  	// working on the pod. This can be ignored.
   656  	if oldObj != nil && newObj == nil {
   657  		logger.V(4).Info("PodSchedulingContext got deleted")
   658  		return framework.QueueSkip, nil
   659  	}
   660  
   661  	oldPodScheduling, newPodScheduling, err := schedutil.As[*resourcev1alpha2.PodSchedulingContext](oldObj, newObj)
   662  	if err != nil {
   663  		// Shouldn't happen.
   664  		return framework.Queue, fmt.Errorf("unexpected object in isSchedulableAfterPodSchedulingContextChange: %w", err)
   665  	}
   666  	podScheduling := newPodScheduling // Never nil because deletes are handled above.
   667  
   668  	if podScheduling.Name != pod.Name || podScheduling.Namespace != pod.Namespace {
   669  		logger.V(7).Info("PodSchedulingContext for unrelated pod got modified", "pod", klog.KObj(pod), "podScheduling", klog.KObj(podScheduling))
   670  		return framework.QueueSkip, nil
   671  	}
   672  
   673  	// If the drivers have provided information about all
   674  	// unallocated claims with delayed allocation, then the next
   675  	// scheduling attempt is able to pick a node, so we let it run
   676  	// immediately if this occurred for the first time, otherwise
   677  	// we allow backoff.
   678  	pendingDelayedClaims := 0
   679  	if err := pl.foreachPodResourceClaim(pod, func(podResourceName string, claim *resourcev1alpha2.ResourceClaim) {
   680  		if claim.Spec.AllocationMode == resourcev1alpha2.AllocationModeWaitForFirstConsumer &&
   681  			claim.Status.Allocation == nil &&
   682  			!podSchedulingHasClaimInfo(podScheduling, podResourceName) {
   683  			pendingDelayedClaims++
   684  		}
   685  	}); err != nil {
   686  		// This is not an unexpected error: we know that
   687  		// foreachPodResourceClaim only returns errors for "not
   688  		// schedulable".
   689  		logger.V(4).Info("pod is not schedulable, keep waiting", "pod", klog.KObj(pod), "reason", err.Error())
   690  		return framework.QueueSkip, nil
   691  	}
   692  
   693  	// Some driver responses missing?
   694  	if pendingDelayedClaims > 0 {
   695  		// We could start a pod scheduling attempt to refresh the
   696  		// potential nodes list.  But pod scheduling attempts are
   697  		// expensive and doing them too often causes the pod to enter
   698  		// backoff. Let's wait instead for all drivers to reply.
   699  		if loggerV := logger.V(6); loggerV.Enabled() {
   700  			loggerV.Info("PodSchedulingContext with missing resource claim information, keep waiting", "pod", klog.KObj(pod), "podSchedulingDiff", cmp.Diff(oldPodScheduling, podScheduling))
   701  		} else {
   702  			logger.V(5).Info("PodSchedulingContext with missing resource claim information, keep waiting", "pod", klog.KObj(pod))
   703  		}
   704  		return framework.QueueSkip, nil
   705  	}
   706  
   707  	if oldPodScheduling == nil /* create */ ||
   708  		len(oldPodScheduling.Status.ResourceClaims) < len(podScheduling.Status.ResourceClaims) /* new information and not incomplete (checked above) */ {
   709  		// This definitely is new information for the scheduler. Try again immediately.
   710  		logger.V(4).Info("PodSchedulingContext for pod has all required information, schedule immediately", "pod", klog.KObj(pod))
   711  		return framework.Queue, nil
   712  	}
   713  
   714  	// The other situation where the scheduler needs to do
   715  	// something immediately is when the selected node doesn't
   716  	// work: waiting in the backoff queue only helps eventually
   717  	// resources on the selected node become available again. It's
   718  	// much more likely, in particular when trying to fill up the
   719  	// cluster, that the choice simply didn't work out. The risk
   720  	// here is that in a situation where the cluster really is
   721  	// full, backoff won't be used because the scheduler keeps
   722  	// trying different nodes. This should not happen when it has
   723  	// full knowledge about resource availability (=
   724  	// PodSchedulingContext.*.UnsuitableNodes is complete) but may happen
   725  	// when it doesn't (= PodSchedulingContext.*.UnsuitableNodes had to be
   726  	// truncated).
   727  	//
   728  	// Truncation only happens for very large clusters and then may slow
   729  	// down scheduling, but should not break it completely. This is
   730  	// acceptable while DRA is alpha and will be investigated further
   731  	// before moving DRA to beta.
   732  	if podScheduling.Spec.SelectedNode != "" {
   733  		for _, claimStatus := range podScheduling.Status.ResourceClaims {
   734  			if sliceContains(claimStatus.UnsuitableNodes, podScheduling.Spec.SelectedNode) {
   735  				logger.V(5).Info("PodSchedulingContext has unsuitable selected node, schedule immediately", "pod", klog.KObj(pod), "selectedNode", podScheduling.Spec.SelectedNode, "podResourceName", claimStatus.Name)
   736  				return framework.Queue, nil
   737  			}
   738  		}
   739  	}
   740  
   741  	// Update with only the spec modified?
   742  	if oldPodScheduling != nil &&
   743  		!apiequality.Semantic.DeepEqual(&oldPodScheduling.Spec, &podScheduling.Spec) &&
   744  		apiequality.Semantic.DeepEqual(&oldPodScheduling.Status, &podScheduling.Status) {
   745  		logger.V(5).Info("PodSchedulingContext has only the scheduler spec changes, ignore the update", "pod", klog.KObj(pod))
   746  		return framework.QueueSkip, nil
   747  	}
   748  
   749  	// Once we get here, all changes which are known to require special responses
   750  	// have been checked for. Whatever the change was, we don't know exactly how
   751  	// to handle it and thus return Queue. This will cause the
   752  	// scheduler to treat the event as if no event hint callback had been provided.
   753  	// Developers who want to investigate this can enable a diff at log level 6.
   754  	if loggerV := logger.V(6); loggerV.Enabled() {
   755  		loggerV.Info("PodSchedulingContext for pod with unknown changes, maybe schedule", "pod", klog.KObj(pod), "podSchedulingDiff", cmp.Diff(oldPodScheduling, podScheduling))
   756  	} else {
   757  		logger.V(5).Info("PodSchedulingContext for pod with unknown changes, maybe schedule", "pod", klog.KObj(pod))
   758  	}
   759  	return framework.Queue, nil
   760  
   761  }
   762  
   763  func podSchedulingHasClaimInfo(podScheduling *resourcev1alpha2.PodSchedulingContext, podResourceName string) bool {
   764  	for _, claimStatus := range podScheduling.Status.ResourceClaims {
   765  		if claimStatus.Name == podResourceName {
   766  			return true
   767  		}
   768  	}
   769  	return false
   770  }
   771  
   772  func sliceContains(hay []string, needle string) bool {
   773  	for _, item := range hay {
   774  		if item == needle {
   775  			return true
   776  		}
   777  	}
   778  	return false
   779  }
   780  
   781  // podResourceClaims returns the ResourceClaims for all pod.Spec.PodResourceClaims.
   782  func (pl *dynamicResources) podResourceClaims(pod *v1.Pod) ([]*resourcev1alpha2.ResourceClaim, error) {
   783  	claims := make([]*resourcev1alpha2.ResourceClaim, 0, len(pod.Spec.ResourceClaims))
   784  	if err := pl.foreachPodResourceClaim(pod, func(_ string, claim *resourcev1alpha2.ResourceClaim) {
   785  		// We store the pointer as returned by the lister. The
   786  		// assumption is that if a claim gets modified while our code
   787  		// runs, the cache will store a new pointer, not mutate the
   788  		// existing object that we point to here.
   789  		claims = append(claims, claim)
   790  	}); err != nil {
   791  		return nil, err
   792  	}
   793  	return claims, nil
   794  }
   795  
   796  // foreachPodResourceClaim checks that each ResourceClaim for the pod exists.
   797  // It calls an optional handler for those claims that it finds.
   798  func (pl *dynamicResources) foreachPodResourceClaim(pod *v1.Pod, cb func(podResourceName string, claim *resourcev1alpha2.ResourceClaim)) error {
   799  	for _, resource := range pod.Spec.ResourceClaims {
   800  		claimName, mustCheckOwner, err := pl.claimNameLookup.Name(pod, &resource)
   801  		if err != nil {
   802  			return err
   803  		}
   804  		// The claim name might be nil if no underlying resource claim
   805  		// was generated for the referenced claim. There are valid use
   806  		// cases when this might happen, so we simply skip it.
   807  		if claimName == nil {
   808  			continue
   809  		}
   810  		claim, err := pl.claimLister.ResourceClaims(pod.Namespace).Get(*claimName)
   811  		if err != nil {
   812  			return err
   813  		}
   814  
   815  		if claim.DeletionTimestamp != nil {
   816  			return fmt.Errorf("resourceclaim %q is being deleted", claim.Name)
   817  		}
   818  
   819  		if mustCheckOwner {
   820  			if err := resourceclaim.IsForPod(pod, claim); err != nil {
   821  				return err
   822  			}
   823  		}
   824  		if cb != nil {
   825  			cb(resource.Name, claim)
   826  		}
   827  	}
   828  	return nil
   829  }
   830  
   831  // PreFilter invoked at the prefilter extension point to check if pod has all
   832  // immediate claims bound. UnschedulableAndUnresolvable is returned if
   833  // the pod cannot be scheduled at the moment on any node.
   834  func (pl *dynamicResources) PreFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
   835  	if !pl.enabled {
   836  		return nil, framework.NewStatus(framework.Skip)
   837  	}
   838  	logger := klog.FromContext(ctx)
   839  
   840  	// If the pod does not reference any claim, we don't need to do
   841  	// anything for it. We just initialize an empty state to record that
   842  	// observation for the other functions. This gets updated below
   843  	// if we get that far.
   844  	s := &stateData{}
   845  	state.Write(stateKey, s)
   846  
   847  	claims, err := pl.podResourceClaims(pod)
   848  	if err != nil {
   849  		return nil, statusUnschedulable(logger, err.Error())
   850  	}
   851  	logger.V(5).Info("pod resource claims", "pod", klog.KObj(pod), "resourceclaims", klog.KObjSlice(claims))
   852  
   853  	// If the pod does not reference any claim,
   854  	// DynamicResources Filter has nothing to do with the Pod.
   855  	if len(claims) == 0 {
   856  		return nil, framework.NewStatus(framework.Skip)
   857  	}
   858  
   859  	// Fetch PodSchedulingContext, it's going to be needed when checking claims.
   860  	if err := s.podSchedulingState.init(ctx, pod, pl.podSchedulingContextLister); err != nil {
   861  		return nil, statusError(logger, err)
   862  	}
   863  
   864  	s.informationsForClaim = make([]informationForClaim, len(claims))
   865  	needResourceInformation := false
   866  	for index, claim := range claims {
   867  		if claim.Status.DeallocationRequested {
   868  			// This will get resolved by the resource driver.
   869  			return nil, statusUnschedulable(logger, "resourceclaim must be reallocated", "pod", klog.KObj(pod), "resourceclaim", klog.KObj(claim))
   870  		}
   871  		if claim.Status.Allocation != nil &&
   872  			!resourceclaim.CanBeReserved(claim) &&
   873  			!resourceclaim.IsReservedForPod(pod, claim) {
   874  			// Resource is in use. The pod has to wait.
   875  			return nil, statusUnschedulable(logger, "resourceclaim in use", "pod", klog.KObj(pod), "resourceclaim", klog.KObj(claim))
   876  		}
   877  
   878  		if claim.Status.Allocation != nil {
   879  			if claim.Status.Allocation.AvailableOnNodes != nil {
   880  				nodeSelector, err := nodeaffinity.NewNodeSelector(claim.Status.Allocation.AvailableOnNodes)
   881  				if err != nil {
   882  					return nil, statusError(logger, err)
   883  				}
   884  				s.informationsForClaim[index].availableOnNode = nodeSelector
   885  			}
   886  
   887  			// The claim was allocated by the scheduler if it has the finalizer that is
   888  			// reserved for Kubernetes.
   889  			s.informationsForClaim[index].structuredParameters = slices.Contains(claim.Finalizers, resourcev1alpha2.Finalizer)
   890  		} else {
   891  			// The ResourceClass might have a node filter. This is
   892  			// useful for trimming the initial set of potential
   893  			// nodes before we ask the driver(s) for information
   894  			// about the specific pod.
   895  			class, err := pl.classLister.Get(claim.Spec.ResourceClassName)
   896  			if err != nil {
   897  				// If the class cannot be retrieved, allocation cannot proceed.
   898  				if apierrors.IsNotFound(err) {
   899  					// Here we mark the pod as "unschedulable", so it'll sleep in
   900  					// the unscheduleable queue until a ResourceClass event occurs.
   901  					return nil, statusUnschedulable(logger, fmt.Sprintf("resource class %s does not exist", claim.Spec.ResourceClassName))
   902  				}
   903  				// Other error, retry with backoff.
   904  				return nil, statusError(logger, fmt.Errorf("look up resource class: %v", err))
   905  			}
   906  			if class.SuitableNodes != nil {
   907  				selector, err := nodeaffinity.NewNodeSelector(class.SuitableNodes)
   908  				if err != nil {
   909  					return nil, statusError(logger, err)
   910  				}
   911  				s.informationsForClaim[index].availableOnNode = selector
   912  			}
   913  			s.informationsForClaim[index].status = statusForClaim(s.podSchedulingState.schedulingCtx, pod.Spec.ResourceClaims[index].Name)
   914  
   915  			if class.StructuredParameters != nil && *class.StructuredParameters {
   916  				s.informationsForClaim[index].structuredParameters = true
   917  
   918  				// Allocation in flight? Better wait for that
   919  				// to finish, see inFlightAllocations
   920  				// documentation for details.
   921  				if _, found := pl.inFlightAllocations.Load(claim.UID); found {
   922  					return nil, statusUnschedulable(logger, fmt.Sprintf("resource claim %s is in the process of being allocated", klog.KObj(claim)))
   923  				}
   924  
   925  				// We need the claim and class parameters. If
   926  				// they don't exist yet, the pod has to wait.
   927  				//
   928  				// TODO (https://github.com/kubernetes/kubernetes/issues/123697):
   929  				// check this already in foreachPodResourceClaim, together with setting up informationsForClaim.
   930  				// Then PreEnqueue will also check for existence of parameters.
   931  				classParameters, claimParameters, status := pl.lookupParameters(logger, class, claim)
   932  				if status != nil {
   933  					return nil, status
   934  				}
   935  				controller, err := newClaimController(logger, class, classParameters, claimParameters)
   936  				if err != nil {
   937  					return nil, statusError(logger, err)
   938  				}
   939  				s.informationsForClaim[index].controller = controller
   940  				needResourceInformation = true
   941  			} else if claim.Spec.AllocationMode == resourcev1alpha2.AllocationModeImmediate {
   942  				// This will get resolved by the resource driver.
   943  				return nil, statusUnschedulable(logger, "unallocated immediate resourceclaim", "pod", klog.KObj(pod), "resourceclaim", klog.KObj(claim))
   944  			}
   945  		}
   946  	}
   947  
   948  	if needResourceInformation {
   949  		// Doing this over and over again for each pod could be avoided
   950  		// by parsing once when creating the plugin and then updating
   951  		// that state in informer callbacks. But that would cause
   952  		// problems for using the plugin in the Cluster Autoscaler. If
   953  		// this step here turns out to be expensive, we may have to
   954  		// maintain and update state more persistently.
   955  		//
   956  		// Claims are treated as "allocated" if they are in the assume cache
   957  		// or currently their allocation is in-flight.
   958  		resources, err := newResourceModel(logger, pl.resourceSliceLister, pl.claimAssumeCache, &pl.inFlightAllocations)
   959  		logger.V(5).Info("Resource usage", "resources", klog.Format(resources))
   960  		if err != nil {
   961  			return nil, statusError(logger, err)
   962  		}
   963  		s.resources = resources
   964  	}
   965  
   966  	s.claims = claims
   967  	return nil, nil
   968  }
   969  
   970  func (pl *dynamicResources) lookupParameters(logger klog.Logger, class *resourcev1alpha2.ResourceClass, claim *resourcev1alpha2.ResourceClaim) (classParameters *resourcev1alpha2.ResourceClassParameters, claimParameters *resourcev1alpha2.ResourceClaimParameters, status *framework.Status) {
   971  	classParameters, status = pl.lookupClassParameters(logger, class)
   972  	if status != nil {
   973  		return
   974  	}
   975  	claimParameters, status = pl.lookupClaimParameters(logger, class, claim)
   976  	return
   977  }
   978  
   979  func (pl *dynamicResources) lookupClassParameters(logger klog.Logger, class *resourcev1alpha2.ResourceClass) (*resourcev1alpha2.ResourceClassParameters, *framework.Status) {
   980  	defaultClassParameters := resourcev1alpha2.ResourceClassParameters{}
   981  
   982  	if class.ParametersRef == nil {
   983  		return &defaultClassParameters, nil
   984  	}
   985  
   986  	if class.ParametersRef.APIGroup == resourcev1alpha2.SchemeGroupVersion.Group &&
   987  		class.ParametersRef.Kind == "ResourceClassParameters" {
   988  		// Use the parameters which were referenced directly.
   989  		parameters, err := pl.classParametersLister.ResourceClassParameters(class.ParametersRef.Namespace).Get(class.ParametersRef.Name)
   990  		if err != nil {
   991  			if apierrors.IsNotFound(err) {
   992  				return nil, statusUnschedulable(logger, fmt.Sprintf("class parameters %s not found", klog.KRef(class.ParametersRef.Namespace, class.ParametersRef.Name)))
   993  			}
   994  			return nil, statusError(logger, fmt.Errorf("get class parameters %s: %v", klog.KRef(class.Namespace, class.ParametersRef.Name), err))
   995  		}
   996  		return parameters, nil
   997  	}
   998  
   999  	// TODO (https://github.com/kubernetes/kubernetes/issues/123731): use an indexer
  1000  	allParameters, err := pl.classParametersLister.ResourceClassParameters(class.Namespace).List(labels.Everything())
  1001  	if err != nil {
  1002  		return nil, statusError(logger, fmt.Errorf("listing class parameters failed: %v", err))
  1003  	}
  1004  	for _, parameters := range allParameters {
  1005  		if parameters.GeneratedFrom == nil {
  1006  			continue
  1007  		}
  1008  		if parameters.GeneratedFrom.APIGroup == class.ParametersRef.APIGroup &&
  1009  			parameters.GeneratedFrom.Kind == class.ParametersRef.Kind &&
  1010  			parameters.GeneratedFrom.Name == class.ParametersRef.Name &&
  1011  			parameters.GeneratedFrom.Namespace == class.ParametersRef.Namespace {
  1012  			return parameters, nil
  1013  		}
  1014  	}
  1015  	return nil, statusUnschedulable(logger, fmt.Sprintf("generated class parameters for %s.%s %s not found", class.ParametersRef.Kind, class.ParametersRef.APIGroup, klog.KRef(class.Namespace, class.ParametersRef.Name)))
  1016  }
  1017  
  1018  func (pl *dynamicResources) lookupClaimParameters(logger klog.Logger, class *resourcev1alpha2.ResourceClass, claim *resourcev1alpha2.ResourceClaim) (*resourcev1alpha2.ResourceClaimParameters, *framework.Status) {
  1019  	defaultClaimParameters := resourcev1alpha2.ResourceClaimParameters{
  1020  		Shareable: true,
  1021  		DriverRequests: []resourcev1alpha2.DriverRequests{
  1022  			{
  1023  				DriverName: class.DriverName,
  1024  				Requests: []resourcev1alpha2.ResourceRequest{
  1025  					{
  1026  						ResourceRequestModel: resourcev1alpha2.ResourceRequestModel{
  1027  							// TODO: This only works because NamedResources is
  1028  							// the only model currently implemented. We need to
  1029  							// match the default to how the resources of this
  1030  							// class are being advertized in a ResourceSlice.
  1031  							NamedResources: &resourcev1alpha2.NamedResourcesRequest{
  1032  								Selector: "true",
  1033  							},
  1034  						},
  1035  					},
  1036  				},
  1037  			},
  1038  		},
  1039  	}
  1040  
  1041  	if claim.Spec.ParametersRef == nil {
  1042  		return &defaultClaimParameters, nil
  1043  	}
  1044  	if claim.Spec.ParametersRef.APIGroup == resourcev1alpha2.SchemeGroupVersion.Group &&
  1045  		claim.Spec.ParametersRef.Kind == "ResourceClaimParameters" {
  1046  		// Use the parameters which were referenced directly.
  1047  		parameters, err := pl.claimParametersLister.ResourceClaimParameters(claim.Namespace).Get(claim.Spec.ParametersRef.Name)
  1048  		if err != nil {
  1049  			if apierrors.IsNotFound(err) {
  1050  				return nil, statusUnschedulable(logger, fmt.Sprintf("claim parameters %s not found", klog.KRef(claim.Namespace, claim.Spec.ParametersRef.Name)))
  1051  			}
  1052  			return nil, statusError(logger, fmt.Errorf("get claim parameters %s: %v", klog.KRef(claim.Namespace, claim.Spec.ParametersRef.Name), err))
  1053  		}
  1054  		return parameters, nil
  1055  	}
  1056  
  1057  	// TODO (https://github.com/kubernetes/kubernetes/issues/123731): use an indexer
  1058  	allParameters, err := pl.claimParametersLister.ResourceClaimParameters(claim.Namespace).List(labels.Everything())
  1059  	if err != nil {
  1060  		return nil, statusError(logger, fmt.Errorf("listing claim parameters failed: %v", err))
  1061  	}
  1062  	for _, parameters := range allParameters {
  1063  		if parameters.GeneratedFrom == nil {
  1064  			continue
  1065  		}
  1066  		if parameters.GeneratedFrom.APIGroup == claim.Spec.ParametersRef.APIGroup &&
  1067  			parameters.GeneratedFrom.Kind == claim.Spec.ParametersRef.Kind &&
  1068  			parameters.GeneratedFrom.Name == claim.Spec.ParametersRef.Name {
  1069  			return parameters, nil
  1070  		}
  1071  	}
  1072  	return nil, statusUnschedulable(logger, fmt.Sprintf("generated claim parameters for %s.%s %s not found", claim.Spec.ParametersRef.Kind, claim.Spec.ParametersRef.APIGroup, klog.KRef(claim.Namespace, claim.Spec.ParametersRef.Name)))
  1073  }
  1074  
  1075  // PreFilterExtensions returns prefilter extensions, pod add and remove.
  1076  func (pl *dynamicResources) PreFilterExtensions() framework.PreFilterExtensions {
  1077  	return nil
  1078  }
  1079  
  1080  func getStateData(cs *framework.CycleState) (*stateData, error) {
  1081  	state, err := cs.Read(stateKey)
  1082  	if err != nil {
  1083  		return nil, err
  1084  	}
  1085  	s, ok := state.(*stateData)
  1086  	if !ok {
  1087  		return nil, errors.New("unable to convert state into stateData")
  1088  	}
  1089  	return s, nil
  1090  }
  1091  
  1092  // Filter invoked at the filter extension point.
  1093  // It evaluates if a pod can fit due to the resources it requests,
  1094  // for both allocated and unallocated claims.
  1095  //
  1096  // For claims that are bound, then it checks that the node affinity is
  1097  // satisfied by the given node.
  1098  //
  1099  // For claims that are unbound, it checks whether the claim might get allocated
  1100  // for the node.
  1101  func (pl *dynamicResources) Filter(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
  1102  	if !pl.enabled {
  1103  		return nil
  1104  	}
  1105  	state, err := getStateData(cs)
  1106  	if err != nil {
  1107  		return statusError(klog.FromContext(ctx), err)
  1108  	}
  1109  	if len(state.claims) == 0 {
  1110  		return nil
  1111  	}
  1112  
  1113  	logger := klog.FromContext(ctx)
  1114  	node := nodeInfo.Node()
  1115  
  1116  	var unavailableClaims []int
  1117  	for index, claim := range state.claims {
  1118  		logger.V(10).Info("filtering based on resource claims of the pod", "pod", klog.KObj(pod), "node", klog.KObj(node), "resourceclaim", klog.KObj(claim))
  1119  		switch {
  1120  		case claim.Status.Allocation != nil:
  1121  			if nodeSelector := state.informationsForClaim[index].availableOnNode; nodeSelector != nil {
  1122  				if !nodeSelector.Match(node) {
  1123  					logger.V(5).Info("AvailableOnNodes does not match", "pod", klog.KObj(pod), "node", klog.KObj(node), "resourceclaim", klog.KObj(claim))
  1124  					unavailableClaims = append(unavailableClaims, index)
  1125  				}
  1126  			}
  1127  		case claim.Status.DeallocationRequested:
  1128  			// We shouldn't get here. PreFilter already checked this.
  1129  			return statusUnschedulable(logger, "resourceclaim must be reallocated", "pod", klog.KObj(pod), "node", klog.KObj(node), "resourceclaim", klog.KObj(claim))
  1130  		case claim.Spec.AllocationMode == resourcev1alpha2.AllocationModeWaitForFirstConsumer ||
  1131  			state.informationsForClaim[index].structuredParameters:
  1132  			if selector := state.informationsForClaim[index].availableOnNode; selector != nil {
  1133  				if matches := selector.Match(node); !matches {
  1134  					return statusUnschedulable(logger, "excluded by resource class node filter", "pod", klog.KObj(pod), "node", klog.KObj(node), "resourceclassName", claim.Spec.ResourceClassName)
  1135  				}
  1136  			}
  1137  			// Can the builtin controller tell us whether the node is suitable?
  1138  			if state.informationsForClaim[index].structuredParameters {
  1139  				suitable, err := state.informationsForClaim[index].controller.nodeIsSuitable(ctx, node.Name, state.resources)
  1140  				if err != nil {
  1141  					// An error indicates that something wasn't configured correctly, for example
  1142  					// writing a CEL expression which doesn't handle a map lookup error. Normally
  1143  					// this should never fail. We could return an error here, but then the pod
  1144  					// would get retried. Instead we ignore the node.
  1145  					return statusUnschedulable(logger, fmt.Sprintf("checking structured parameters failed: %v", err), "pod", klog.KObj(pod), "node", klog.KObj(node), "resourceclaim", klog.KObj(claim))
  1146  				}
  1147  				if !suitable {
  1148  					return statusUnschedulable(logger, "resourceclaim cannot be allocated for the node (unsuitable)", "pod", klog.KObj(pod), "node", klog.KObj(node), "resourceclaim", klog.KObj(claim))
  1149  				}
  1150  			} else {
  1151  				if status := state.informationsForClaim[index].status; status != nil {
  1152  					for _, unsuitableNode := range status.UnsuitableNodes {
  1153  						if node.Name == unsuitableNode {
  1154  							return statusUnschedulable(logger, "resourceclaim cannot be allocated for the node (unsuitable)", "pod", klog.KObj(pod), "node", klog.KObj(node), "resourceclaim", klog.KObj(claim), "unsuitablenodes", status.UnsuitableNodes)
  1155  						}
  1156  					}
  1157  				}
  1158  			}
  1159  		default:
  1160  			// This claim should have been handled above.
  1161  			// Immediate allocation with control plane controller
  1162  			// was already checked for in PreFilter.
  1163  			return statusError(logger, fmt.Errorf("internal error, unexpected allocation mode %v", claim.Spec.AllocationMode))
  1164  		}
  1165  	}
  1166  
  1167  	if len(unavailableClaims) > 0 {
  1168  		state.mutex.Lock()
  1169  		defer state.mutex.Unlock()
  1170  		if state.unavailableClaims == nil {
  1171  			state.unavailableClaims = sets.New[int]()
  1172  		}
  1173  
  1174  		for _, index := range unavailableClaims {
  1175  			claim := state.claims[index]
  1176  			// Deallocation makes more sense for claims with
  1177  			// delayed allocation. Claims with immediate allocation
  1178  			// would just get allocated again for a random node,
  1179  			// which is unlikely to help the pod.
  1180  			//
  1181  			// Claims with builtin controller are handled like
  1182  			// claims with delayed allocation.
  1183  			if claim.Spec.AllocationMode == resourcev1alpha2.AllocationModeWaitForFirstConsumer ||
  1184  				state.informationsForClaim[index].controller != nil {
  1185  				state.unavailableClaims.Insert(index)
  1186  			}
  1187  		}
  1188  		return statusUnschedulable(logger, "resourceclaim not available on the node", "pod", klog.KObj(pod))
  1189  	}
  1190  
  1191  	return nil
  1192  }
  1193  
  1194  // PostFilter checks whether there are allocated claims that could get
  1195  // deallocated to help get the Pod schedulable. If yes, it picks one and
  1196  // requests its deallocation.  This only gets called when filtering found no
  1197  // suitable node.
  1198  func (pl *dynamicResources) PostFilter(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, filteredNodeStatusMap framework.NodeToStatusMap) (*framework.PostFilterResult, *framework.Status) {
  1199  	if !pl.enabled {
  1200  		return nil, framework.NewStatus(framework.Unschedulable, "plugin disabled")
  1201  	}
  1202  	logger := klog.FromContext(ctx)
  1203  	state, err := getStateData(cs)
  1204  	if err != nil {
  1205  		return nil, statusError(logger, err)
  1206  	}
  1207  	if len(state.claims) == 0 {
  1208  		return nil, framework.NewStatus(framework.Unschedulable, "no new claims to deallocate")
  1209  	}
  1210  
  1211  	// Iterating over a map is random. This is intentional here, we want to
  1212  	// pick one claim randomly because there is no better heuristic.
  1213  	for index := range state.unavailableClaims {
  1214  		claim := state.claims[index]
  1215  		if len(claim.Status.ReservedFor) == 0 ||
  1216  			len(claim.Status.ReservedFor) == 1 && claim.Status.ReservedFor[0].UID == pod.UID {
  1217  			// Is the claim is handled by the builtin controller?
  1218  			// Then we can simply clear the allocation. Once the
  1219  			// claim informer catches up, the controllers will
  1220  			// be notified about this change.
  1221  			clearAllocation := state.informationsForClaim[index].controller != nil
  1222  
  1223  			// Before we tell a driver to deallocate a claim, we
  1224  			// have to stop telling it to allocate. Otherwise,
  1225  			// depending on timing, it will deallocate the claim,
  1226  			// see a PodSchedulingContext with selected node, and
  1227  			// allocate again for that same node.
  1228  			if !clearAllocation &&
  1229  				state.podSchedulingState.schedulingCtx != nil &&
  1230  				state.podSchedulingState.schedulingCtx.Spec.SelectedNode != "" {
  1231  				state.podSchedulingState.selectedNode = ptr.To("")
  1232  				if err := state.podSchedulingState.publish(ctx, pod, pl.clientset); err != nil {
  1233  					return nil, statusError(logger, err)
  1234  				}
  1235  			}
  1236  
  1237  			claim := claim.DeepCopy()
  1238  			claim.Status.ReservedFor = nil
  1239  			if clearAllocation {
  1240  				claim.Status.Allocation = nil
  1241  			} else {
  1242  				claim.Status.DeallocationRequested = true
  1243  			}
  1244  			logger.V(5).Info("Requesting deallocation of ResourceClaim", "pod", klog.KObj(pod), "resourceclaim", klog.KObj(claim))
  1245  			if _, err := pl.clientset.ResourceV1alpha2().ResourceClaims(claim.Namespace).UpdateStatus(ctx, claim, metav1.UpdateOptions{}); err != nil {
  1246  				return nil, statusError(logger, err)
  1247  			}
  1248  			return nil, framework.NewStatus(framework.Unschedulable, "deallocation of ResourceClaim completed")
  1249  		}
  1250  	}
  1251  	return nil, framework.NewStatus(framework.Unschedulable, "still not schedulable")
  1252  }
  1253  
  1254  // PreScore is passed a list of all nodes that would fit the pod. Not all
  1255  // claims are necessarily allocated yet, so here we can set the SuitableNodes
  1256  // field for those which are pending.
  1257  func (pl *dynamicResources) PreScore(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodes []*framework.NodeInfo) *framework.Status {
  1258  	if !pl.enabled {
  1259  		return nil
  1260  	}
  1261  	state, err := getStateData(cs)
  1262  	if err != nil {
  1263  		return statusError(klog.FromContext(ctx), err)
  1264  	}
  1265  	defer func() {
  1266  		state.preScored = true
  1267  	}()
  1268  	if len(state.claims) == 0 {
  1269  		return nil
  1270  	}
  1271  
  1272  	logger := klog.FromContext(ctx)
  1273  	pending := false
  1274  	for index, claim := range state.claims {
  1275  		if claim.Status.Allocation == nil &&
  1276  			state.informationsForClaim[index].controller == nil {
  1277  			pending = true
  1278  			break
  1279  		}
  1280  	}
  1281  	if !pending {
  1282  		logger.V(5).Info("no pending claims with control plane controller", "pod", klog.KObj(pod))
  1283  		return nil
  1284  	}
  1285  
  1286  	if haveAllPotentialNodes(state.podSchedulingState.schedulingCtx, nodes) {
  1287  		logger.V(5).Info("all potential nodes already set", "pod", klog.KObj(pod), "potentialnodes", klog.KObjSlice(nodes))
  1288  		return nil
  1289  	}
  1290  
  1291  	// Remember the potential nodes. The object will get created or
  1292  	// updated in Reserve. This is both an optimization and
  1293  	// covers the case that PreScore doesn't get called when there
  1294  	// is only a single node.
  1295  	logger.V(5).Info("remembering potential nodes", "pod", klog.KObj(pod), "potentialnodes", klog.KObjSlice(nodes))
  1296  	numNodes := len(nodes)
  1297  	if numNodes > resourcev1alpha2.PodSchedulingNodeListMaxSize {
  1298  		numNodes = resourcev1alpha2.PodSchedulingNodeListMaxSize
  1299  	}
  1300  	potentialNodes := make([]string, 0, numNodes)
  1301  	if numNodes == len(nodes) {
  1302  		// Copy all node names.
  1303  		for _, node := range nodes {
  1304  			potentialNodes = append(potentialNodes, node.Node().Name)
  1305  		}
  1306  	} else {
  1307  		// Select a random subset of the nodes to comply with
  1308  		// the PotentialNodes length limit. Randomization is
  1309  		// done for us by Go which iterates over map entries
  1310  		// randomly.
  1311  		nodeNames := map[string]struct{}{}
  1312  		for _, node := range nodes {
  1313  			nodeNames[node.Node().Name] = struct{}{}
  1314  		}
  1315  		for nodeName := range nodeNames {
  1316  			if len(potentialNodes) >= resourcev1alpha2.PodSchedulingNodeListMaxSize {
  1317  				break
  1318  			}
  1319  			potentialNodes = append(potentialNodes, nodeName)
  1320  		}
  1321  	}
  1322  	sort.Strings(potentialNodes)
  1323  	state.podSchedulingState.potentialNodes = &potentialNodes
  1324  	return nil
  1325  }
  1326  
  1327  func haveAllPotentialNodes(schedulingCtx *resourcev1alpha2.PodSchedulingContext, nodes []*framework.NodeInfo) bool {
  1328  	if schedulingCtx == nil {
  1329  		return false
  1330  	}
  1331  	for _, node := range nodes {
  1332  		if !haveNode(schedulingCtx.Spec.PotentialNodes, node.Node().Name) {
  1333  			return false
  1334  		}
  1335  	}
  1336  	return true
  1337  }
  1338  
  1339  func haveNode(nodeNames []string, nodeName string) bool {
  1340  	for _, n := range nodeNames {
  1341  		if n == nodeName {
  1342  			return true
  1343  		}
  1344  	}
  1345  	return false
  1346  }
  1347  
  1348  // Reserve reserves claims for the pod.
  1349  func (pl *dynamicResources) Reserve(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeName string) (status *framework.Status) {
  1350  	if !pl.enabled {
  1351  		return nil
  1352  	}
  1353  	state, err := getStateData(cs)
  1354  	if err != nil {
  1355  		return statusError(klog.FromContext(ctx), err)
  1356  	}
  1357  	if len(state.claims) == 0 {
  1358  		return nil
  1359  	}
  1360  
  1361  	numDelayedAllocationPending := 0
  1362  	numClaimsWithStatusInfo := 0
  1363  	claimsWithBuiltinController := make([]int, 0, len(state.claims))
  1364  	logger := klog.FromContext(ctx)
  1365  	for index, claim := range state.claims {
  1366  		if claim.Status.Allocation != nil {
  1367  			// Allocated, but perhaps not reserved yet. We checked in PreFilter that
  1368  			// the pod could reserve the claim. Instead of reserving here by
  1369  			// updating the ResourceClaim status, we assume that reserving
  1370  			// will work and only do it for real during binding. If it fails at
  1371  			// that time, some other pod was faster and we have to try again.
  1372  			continue
  1373  		}
  1374  
  1375  		// Do we have the builtin controller?
  1376  		if state.informationsForClaim[index].controller != nil {
  1377  			claimsWithBuiltinController = append(claimsWithBuiltinController, index)
  1378  			continue
  1379  		}
  1380  
  1381  		// Must be delayed allocation with control plane controller.
  1382  		numDelayedAllocationPending++
  1383  
  1384  		// Did the driver provide information that steered node
  1385  		// selection towards a node that it can support?
  1386  		if statusForClaim(state.podSchedulingState.schedulingCtx, pod.Spec.ResourceClaims[index].Name) != nil {
  1387  			numClaimsWithStatusInfo++
  1388  		}
  1389  	}
  1390  
  1391  	if numDelayedAllocationPending == 0 && len(claimsWithBuiltinController) == 0 {
  1392  		// Nothing left to do.
  1393  		return nil
  1394  	}
  1395  
  1396  	if !state.preScored && numDelayedAllocationPending > 0 {
  1397  		// There was only one candidate that passed the Filters and
  1398  		// therefore PreScore was not called.
  1399  		//
  1400  		// We need to ask whether that node is suitable, otherwise the
  1401  		// scheduler will pick it forever even when it cannot satisfy
  1402  		// the claim.
  1403  		if state.podSchedulingState.schedulingCtx == nil ||
  1404  			!containsNode(state.podSchedulingState.schedulingCtx.Spec.PotentialNodes, nodeName) {
  1405  			potentialNodes := []string{nodeName}
  1406  			state.podSchedulingState.potentialNodes = &potentialNodes
  1407  			logger.V(5).Info("asking for information about single potential node", "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName})
  1408  		}
  1409  	}
  1410  
  1411  	// Prepare allocation of claims handled by the schedulder.
  1412  	for _, index := range claimsWithBuiltinController {
  1413  		claim := state.claims[index]
  1414  		driverName, allocation, err := state.informationsForClaim[index].controller.allocate(ctx, nodeName, state.resources)
  1415  		if err != nil {
  1416  			// We checked before that the node is suitable. This shouldn't have failed,
  1417  			// so treat this as an error.
  1418  			return statusError(logger, fmt.Errorf("claim allocation failed unexpectedly: %v", err))
  1419  		}
  1420  		state.informationsForClaim[index].allocation = allocation
  1421  		state.informationsForClaim[index].allocationDriverName = driverName
  1422  		claim = claim.DeepCopy()
  1423  		claim.Status.DriverName = driverName
  1424  		claim.Status.Allocation = allocation
  1425  		pl.inFlightAllocations.Store(claim.UID, claim)
  1426  		logger.V(5).Info("Reserved resource in allocation result", "claim", klog.KObj(claim), "driver", driverName, "allocation", klog.Format(allocation))
  1427  	}
  1428  
  1429  	// When there is only one pending resource, we can go ahead with
  1430  	// requesting allocation even when we don't have the information from
  1431  	// the driver yet. Otherwise we wait for information before blindly
  1432  	// making a decision that might have to be reversed later.
  1433  	//
  1434  	// If all pending claims are handled with the builtin controller,
  1435  	// there is no need for a PodSchedulingContext change.
  1436  	if numDelayedAllocationPending == 1 && len(claimsWithBuiltinController) == 0 ||
  1437  		numClaimsWithStatusInfo+len(claimsWithBuiltinController) == numDelayedAllocationPending && len(claimsWithBuiltinController) < numDelayedAllocationPending {
  1438  		// TODO: can we increase the chance that the scheduler picks
  1439  		// the same node as before when allocation is on-going,
  1440  		// assuming that that node still fits the pod?  Picking a
  1441  		// different node may lead to some claims being allocated for
  1442  		// one node and others for another, which then would have to be
  1443  		// resolved with deallocation.
  1444  		if state.podSchedulingState.schedulingCtx == nil ||
  1445  			state.podSchedulingState.schedulingCtx.Spec.SelectedNode != nodeName {
  1446  			state.podSchedulingState.selectedNode = &nodeName
  1447  			logger.V(5).Info("start allocation", "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName})
  1448  			// The actual publish happens in PreBind or Unreserve.
  1449  			return nil
  1450  		}
  1451  	}
  1452  
  1453  	// May have been modified earlier in PreScore or above.
  1454  	if state.podSchedulingState.isDirty() {
  1455  		// The actual publish happens in PreBind or Unreserve.
  1456  		return nil
  1457  	}
  1458  
  1459  	// If all pending claims are handled with the builtin controller, then
  1460  	// we can allow the pod to proceed. Allocating and reserving the claims
  1461  	// will be done in PreBind.
  1462  	if numDelayedAllocationPending == 0 {
  1463  		return nil
  1464  	}
  1465  
  1466  	// More than one pending claim and not enough information about all of them.
  1467  	//
  1468  	// TODO: can or should we ensure that schedulingCtx gets aborted while
  1469  	// waiting for resources *before* triggering delayed volume
  1470  	// provisioning?  On the one hand, volume provisioning is currently
  1471  	// irreversible, so it better should come last. On the other hand,
  1472  	// triggering both in parallel might be faster.
  1473  	return statusPending(logger, "waiting for resource driver to provide information", "pod", klog.KObj(pod))
  1474  }
  1475  
  1476  func containsNode(hay []string, needle string) bool {
  1477  	for _, node := range hay {
  1478  		if node == needle {
  1479  			return true
  1480  		}
  1481  	}
  1482  	return false
  1483  }
  1484  
  1485  // Unreserve clears the ReservedFor field for all claims.
  1486  // It's idempotent, and does nothing if no state found for the given pod.
  1487  func (pl *dynamicResources) Unreserve(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeName string) {
  1488  	if !pl.enabled {
  1489  		return
  1490  	}
  1491  	state, err := getStateData(cs)
  1492  	if err != nil {
  1493  		return
  1494  	}
  1495  	if len(state.claims) == 0 {
  1496  		return
  1497  	}
  1498  
  1499  	logger := klog.FromContext(ctx)
  1500  
  1501  	// Was publishing delayed? If yes, do it now.
  1502  	//
  1503  	// The most common scenario is that a different set of potential nodes
  1504  	// was identified. This revised set needs to be published to enable DRA
  1505  	// drivers to provide better guidance for future scheduling attempts.
  1506  	if state.podSchedulingState.isDirty() {
  1507  		if err := state.podSchedulingState.publish(ctx, pod, pl.clientset); err != nil {
  1508  			logger.Error(err, "publish PodSchedulingContext")
  1509  		}
  1510  	}
  1511  
  1512  	for index, claim := range state.claims {
  1513  		// If allocation was in-flight, then it's not anymore and we need to revert the
  1514  		// claim object in the assume cache to what it was before.
  1515  		if state.informationsForClaim[index].controller != nil {
  1516  			if _, found := pl.inFlightAllocations.LoadAndDelete(state.claims[index].UID); found {
  1517  				pl.claimAssumeCache.Restore(claim.Namespace + "/" + claim.Name)
  1518  			}
  1519  		}
  1520  
  1521  		if claim.Status.Allocation != nil &&
  1522  			resourceclaim.IsReservedForPod(pod, claim) {
  1523  			// Remove pod from ReservedFor. A strategic-merge-patch is used
  1524  			// because that allows removing an individual entry without having
  1525  			// the latest slice.
  1526  			patch := fmt.Sprintf(`{"metadata": {"uid": %q}, "status": { "reservedFor": [ {"$patch": "delete", "uid": %q} ] }}`,
  1527  				claim.UID,
  1528  				pod.UID,
  1529  			)
  1530  			logger.V(5).Info("unreserve", "resourceclaim", klog.KObj(claim), "pod", klog.KObj(pod))
  1531  			claim, err := pl.clientset.ResourceV1alpha2().ResourceClaims(claim.Namespace).Patch(ctx, claim.Name, types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{}, "status")
  1532  			if err != nil {
  1533  				// We will get here again when pod scheduling is retried.
  1534  				logger.Error(err, "unreserve", "resourceclaim", klog.KObj(claim))
  1535  			}
  1536  		}
  1537  	}
  1538  }
  1539  
  1540  // PreBind gets called in a separate goroutine after it has been determined
  1541  // that the pod should get bound to this node. Because Reserve did not actually
  1542  // reserve claims, we need to do it now. For claims with the builtin controller,
  1543  // we also handle the allocation.
  1544  //
  1545  // If anything fails, we return an error and
  1546  // the pod will have to go into the backoff queue. The scheduler will call
  1547  // Unreserve as part of the error handling.
  1548  func (pl *dynamicResources) PreBind(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeName string) *framework.Status {
  1549  	if !pl.enabled {
  1550  		return nil
  1551  	}
  1552  	state, err := getStateData(cs)
  1553  	if err != nil {
  1554  		return statusError(klog.FromContext(ctx), err)
  1555  	}
  1556  	if len(state.claims) == 0 {
  1557  		return nil
  1558  	}
  1559  
  1560  	logger := klog.FromContext(ctx)
  1561  
  1562  	// Was publishing delayed? If yes, do it now and then cause binding to stop.
  1563  	// This will not happen if all claims get handled by builtin controllers.
  1564  	if state.podSchedulingState.isDirty() {
  1565  		if err := state.podSchedulingState.publish(ctx, pod, pl.clientset); err != nil {
  1566  			return statusError(logger, err)
  1567  		}
  1568  		return statusPending(logger, "waiting for resource driver", "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName})
  1569  	}
  1570  
  1571  	for index, claim := range state.claims {
  1572  		if !resourceclaim.IsReservedForPod(pod, claim) {
  1573  			claim, err := pl.bindClaim(ctx, state, index, pod, nodeName)
  1574  			if err != nil {
  1575  				return statusError(logger, err)
  1576  			}
  1577  			state.claims[index] = claim
  1578  		}
  1579  	}
  1580  	// If we get here, we know that reserving the claim for
  1581  	// the pod worked and we can proceed with binding it.
  1582  	return nil
  1583  }
  1584  
  1585  // bindClaim gets called by PreBind for claim which is not reserved for the pod yet.
  1586  // It might not even be allocated. bindClaim then ensures that the allocation
  1587  // and reservation are recorded. This finishes the work started in Reserve.
  1588  func (pl *dynamicResources) bindClaim(ctx context.Context, state *stateData, index int, pod *v1.Pod, nodeName string) (patchedClaim *resourcev1alpha2.ResourceClaim, finalErr error) {
  1589  	logger := klog.FromContext(ctx)
  1590  	claim := state.claims[index]
  1591  	allocationPatch := ""
  1592  
  1593  	allocation := state.informationsForClaim[index].allocation
  1594  	logger.V(5).Info("preparing claim status patch", "claim", klog.KObj(state.claims[index]), "allocation", klog.Format(allocation))
  1595  
  1596  	// Do we need to store an allocation result from Reserve?
  1597  	if allocation != nil {
  1598  		buffer, err := json.Marshal(allocation)
  1599  		if err != nil {
  1600  			return nil, fmt.Errorf("marshaling AllocationResult failed: %v", err)
  1601  		}
  1602  		allocationPatch = fmt.Sprintf(`"driverName": %q, "allocation": %s, `, state.informationsForClaim[index].allocationDriverName, string(buffer))
  1603  
  1604  		// The finalizer needs to be added in a normal update. Using a simple update is fine
  1605  		// because we don't expect concurrent modifications while the claim is not allocated
  1606  		// yet. If there are any, we want to fail.
  1607  		//
  1608  		// If we were interrupted in the past, it might already be set and we simply continue.
  1609  		if !slices.Contains(claim.Finalizers, resourcev1alpha2.Finalizer) {
  1610  			claim := state.claims[index].DeepCopy()
  1611  			claim.Finalizers = append(claim.Finalizers, resourcev1alpha2.Finalizer)
  1612  			if _, err := pl.clientset.ResourceV1alpha2().ResourceClaims(claim.Namespace).Update(ctx, claim, metav1.UpdateOptions{}); err != nil {
  1613  				return nil, fmt.Errorf("add finalizer: %v", err)
  1614  			}
  1615  		}
  1616  	}
  1617  
  1618  	// The claim might be stale, for example because the claim can get shared and some
  1619  	// other goroutine has updated it in the meantime. We therefore cannot use
  1620  	// SSA here to add the pod because then we would have to send the entire slice
  1621  	// or use different field manager strings for each entry.
  1622  	//
  1623  	// With a strategic-merge-patch, we can simply send one new entry. The apiserver
  1624  	// validation will catch if two goroutines try to do that at the same time and
  1625  	// the claim cannot be shared.
  1626  	//
  1627  	// Note that this also works when the allocation result gets added twice because
  1628  	// two pods both started using a shared claim: the first pod to get here adds the
  1629  	// allocation result. The second pod then only adds itself to reservedFor.
  1630  	patch := fmt.Sprintf(`{"metadata": {"uid": %q}, "status": {%s "reservedFor": [ {"resource": "pods", "name": %q, "uid": %q} ] }}`,
  1631  		claim.UID,
  1632  		allocationPatch,
  1633  		pod.Name,
  1634  		pod.UID,
  1635  	)
  1636  	if loggerV := logger.V(6); loggerV.Enabled() {
  1637  		logger.V(5).Info("reserve", "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName}, "resourceclaim", klog.KObj(claim), "patch", patch)
  1638  	} else {
  1639  		logger.V(5).Info("reserve", "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName}, "resourceclaim", klog.KObj(claim))
  1640  	}
  1641  	claim, err := pl.clientset.ResourceV1alpha2().ResourceClaims(claim.Namespace).Patch(ctx, claim.Name, types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{}, "status")
  1642  	logger.V(5).Info("reserved", "pod", klog.KObj(pod), "node", klog.ObjectRef{Name: nodeName}, "resourceclaim", klog.Format(claim), "err", err)
  1643  	if allocationPatch != "" {
  1644  		// The scheduler was handling allocation. Now that has
  1645  		// completed, either successfully or with a failure.
  1646  		if err == nil {
  1647  			// This can fail, but only for reasons that are okay (concurrent delete or update).
  1648  			// Shouldn't happen in this case.
  1649  			if err := pl.claimAssumeCache.Assume(claim); err != nil {
  1650  				logger.V(5).Info("Claim not stored in assume cache", "err", err)
  1651  			}
  1652  		}
  1653  		pl.inFlightAllocations.Delete(claim.UID)
  1654  	}
  1655  	return claim, err
  1656  }
  1657  
  1658  // PostBind is called after a pod is successfully bound to a node. Now we are
  1659  // sure that a PodSchedulingContext object, if it exists, is definitely not going to
  1660  // be needed anymore and can delete it. This is a one-shot thing, there won't
  1661  // be any retries.  This is okay because it should usually work and in those
  1662  // cases where it doesn't, the garbage collector will eventually clean up.
  1663  func (pl *dynamicResources) PostBind(ctx context.Context, cs *framework.CycleState, pod *v1.Pod, nodeName string) {
  1664  	if !pl.enabled {
  1665  		return
  1666  	}
  1667  	state, err := getStateData(cs)
  1668  	if err != nil {
  1669  		return
  1670  	}
  1671  	if len(state.claims) == 0 {
  1672  		return
  1673  	}
  1674  
  1675  	// We cannot know for sure whether the PodSchedulingContext object exists. We
  1676  	// might have created it in the previous pod schedulingCtx cycle and not
  1677  	// have it in our informer cache yet. Let's try to delete, just to be
  1678  	// on the safe side.
  1679  	logger := klog.FromContext(ctx)
  1680  	err = pl.clientset.ResourceV1alpha2().PodSchedulingContexts(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{})
  1681  	switch {
  1682  	case apierrors.IsNotFound(err):
  1683  		logger.V(5).Info("no PodSchedulingContext object to delete")
  1684  	case err != nil:
  1685  		logger.Error(err, "delete PodSchedulingContext")
  1686  	default:
  1687  		logger.V(5).Info("PodSchedulingContext object deleted")
  1688  	}
  1689  }
  1690  
  1691  // statusUnschedulable ensures that there is a log message associated with the
  1692  // line where the status originated.
  1693  func statusUnschedulable(logger klog.Logger, reason string, kv ...interface{}) *framework.Status {
  1694  	if loggerV := logger.V(5); loggerV.Enabled() {
  1695  		helper, loggerV := loggerV.WithCallStackHelper()
  1696  		helper()
  1697  		kv = append(kv, "reason", reason)
  1698  		// nolint: logcheck // warns because it cannot check key/values
  1699  		loggerV.Info("pod unschedulable", kv...)
  1700  	}
  1701  	return framework.NewStatus(framework.UnschedulableAndUnresolvable, reason)
  1702  }
  1703  
  1704  // statusPending ensures that there is a log message associated with the
  1705  // line where the status originated.
  1706  func statusPending(logger klog.Logger, reason string, kv ...interface{}) *framework.Status {
  1707  	if loggerV := logger.V(5); loggerV.Enabled() {
  1708  		helper, loggerV := loggerV.WithCallStackHelper()
  1709  		helper()
  1710  		kv = append(kv, "reason", reason)
  1711  		// nolint: logcheck // warns because it cannot check key/values
  1712  		loggerV.Info("pod waiting for external component", kv...)
  1713  	}
  1714  
  1715  	// When we return Pending, we want to block the Pod at the same time.
  1716  	return framework.NewStatus(framework.Pending, reason)
  1717  }
  1718  
  1719  // statusError ensures that there is a log message associated with the
  1720  // line where the error originated.
  1721  func statusError(logger klog.Logger, err error, kv ...interface{}) *framework.Status {
  1722  	if loggerV := logger.V(5); loggerV.Enabled() {
  1723  		helper, loggerV := loggerV.WithCallStackHelper()
  1724  		helper()
  1725  		// nolint: logcheck // warns because it cannot check key/values
  1726  		loggerV.Error(err, "dynamic resource plugin failed", kv...)
  1727  	}
  1728  	return framework.AsStatus(err)
  1729  }
  1730  

View as plain text