...

Source file src/k8s.io/kubernetes/pkg/scheduler/schedule_one.go

Documentation: k8s.io/kubernetes/pkg/scheduler

     1  /*
     2  Copyright 2014 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package scheduler
    18  
    19  import (
    20  	"container/heap"
    21  	"context"
    22  	"errors"
    23  	"fmt"
    24  	"math/rand"
    25  	"strconv"
    26  	"sync"
    27  	"sync/atomic"
    28  	"time"
    29  
    30  	v1 "k8s.io/api/core/v1"
    31  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    32  	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    33  	"k8s.io/apimachinery/pkg/util/sets"
    34  	clientset "k8s.io/client-go/kubernetes"
    35  	"k8s.io/klog/v2"
    36  	extenderv1 "k8s.io/kube-scheduler/extender/v1"
    37  	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
    38  	"k8s.io/kubernetes/pkg/apis/core/validation"
    39  	"k8s.io/kubernetes/pkg/scheduler/framework"
    40  	"k8s.io/kubernetes/pkg/scheduler/framework/parallelize"
    41  	internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
    42  	"k8s.io/kubernetes/pkg/scheduler/metrics"
    43  	"k8s.io/kubernetes/pkg/scheduler/util"
    44  	utiltrace "k8s.io/utils/trace"
    45  )
    46  
    47  const (
    48  	// Percentage of plugin metrics to be sampled.
    49  	pluginMetricsSamplePercent = 10
    50  	// minFeasibleNodesToFind is the minimum number of nodes that would be scored
    51  	// in each scheduling cycle. This is a semi-arbitrary value to ensure that a
    52  	// certain minimum of nodes are checked for feasibility. This in turn helps
    53  	// ensure a minimum level of spreading.
    54  	minFeasibleNodesToFind = 100
    55  	// minFeasibleNodesPercentageToFind is the minimum percentage of nodes that
    56  	// would be scored in each scheduling cycle. This is a semi-arbitrary value
    57  	// to ensure that a certain minimum of nodes are checked for feasibility.
    58  	// This in turn helps ensure a minimum level of spreading.
    59  	minFeasibleNodesPercentageToFind = 5
    60  	// numberOfHighestScoredNodesToReport is the number of node scores
    61  	// to be included in ScheduleResult.
    62  	numberOfHighestScoredNodesToReport = 3
    63  )
    64  
    65  // ScheduleOne does the entire scheduling workflow for a single pod. It is serialized on the scheduling algorithm's host fitting.
    66  func (sched *Scheduler) ScheduleOne(ctx context.Context) {
    67  	logger := klog.FromContext(ctx)
    68  	podInfo, err := sched.NextPod(logger)
    69  	if err != nil {
    70  		logger.Error(err, "Error while retrieving next pod from scheduling queue")
    71  		return
    72  	}
    73  	// pod could be nil when schedulerQueue is closed
    74  	if podInfo == nil || podInfo.Pod == nil {
    75  		return
    76  	}
    77  
    78  	pod := podInfo.Pod
    79  	// TODO(knelasevero): Remove duplicated keys from log entry calls
    80  	// When contextualized logging hits GA
    81  	// https://github.com/kubernetes/kubernetes/issues/111672
    82  	logger = klog.LoggerWithValues(logger, "pod", klog.KObj(pod))
    83  	ctx = klog.NewContext(ctx, logger)
    84  	logger.V(4).Info("About to try and schedule pod", "pod", klog.KObj(pod))
    85  
    86  	fwk, err := sched.frameworkForPod(pod)
    87  	if err != nil {
    88  		// This shouldn't happen, because we only accept for scheduling the pods
    89  		// which specify a scheduler name that matches one of the profiles.
    90  		logger.Error(err, "Error occurred")
    91  		return
    92  	}
    93  	if sched.skipPodSchedule(ctx, fwk, pod) {
    94  		return
    95  	}
    96  
    97  	logger.V(3).Info("Attempting to schedule pod", "pod", klog.KObj(pod))
    98  
    99  	// Synchronously attempt to find a fit for the pod.
   100  	start := time.Now()
   101  	state := framework.NewCycleState()
   102  	state.SetRecordPluginMetrics(rand.Intn(100) < pluginMetricsSamplePercent)
   103  
   104  	// Initialize an empty podsToActivate struct, which will be filled up by plugins or stay empty.
   105  	podsToActivate := framework.NewPodsToActivate()
   106  	state.Write(framework.PodsToActivateKey, podsToActivate)
   107  
   108  	schedulingCycleCtx, cancel := context.WithCancel(ctx)
   109  	defer cancel()
   110  
   111  	scheduleResult, assumedPodInfo, status := sched.schedulingCycle(schedulingCycleCtx, state, fwk, podInfo, start, podsToActivate)
   112  	if !status.IsSuccess() {
   113  		sched.FailureHandler(schedulingCycleCtx, fwk, assumedPodInfo, status, scheduleResult.nominatingInfo, start)
   114  		return
   115  	}
   116  
   117  	// bind the pod to its host asynchronously (we can do this b/c of the assumption step above).
   118  	go func() {
   119  		bindingCycleCtx, cancel := context.WithCancel(ctx)
   120  		defer cancel()
   121  
   122  		metrics.Goroutines.WithLabelValues(metrics.Binding).Inc()
   123  		defer metrics.Goroutines.WithLabelValues(metrics.Binding).Dec()
   124  
   125  		status := sched.bindingCycle(bindingCycleCtx, state, fwk, scheduleResult, assumedPodInfo, start, podsToActivate)
   126  		if !status.IsSuccess() {
   127  			sched.handleBindingCycleError(bindingCycleCtx, state, fwk, assumedPodInfo, start, scheduleResult, status)
   128  			return
   129  		}
   130  		// Usually, DonePod is called inside the scheduling queue,
   131  		// but in this case, we need to call it here because this Pod won't go back to the scheduling queue.
   132  		sched.SchedulingQueue.Done(assumedPodInfo.Pod.UID)
   133  	}()
   134  }
   135  
   136  var clearNominatedNode = &framework.NominatingInfo{NominatingMode: framework.ModeOverride, NominatedNodeName: ""}
   137  
   138  // schedulingCycle tries to schedule a single Pod.
   139  func (sched *Scheduler) schedulingCycle(
   140  	ctx context.Context,
   141  	state *framework.CycleState,
   142  	fwk framework.Framework,
   143  	podInfo *framework.QueuedPodInfo,
   144  	start time.Time,
   145  	podsToActivate *framework.PodsToActivate,
   146  ) (ScheduleResult, *framework.QueuedPodInfo, *framework.Status) {
   147  	logger := klog.FromContext(ctx)
   148  	pod := podInfo.Pod
   149  	scheduleResult, err := sched.SchedulePod(ctx, fwk, state, pod)
   150  	if err != nil {
   151  		defer func() {
   152  			metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInSeconds(start))
   153  		}()
   154  		if err == ErrNoNodesAvailable {
   155  			status := framework.NewStatus(framework.UnschedulableAndUnresolvable).WithError(err)
   156  			return ScheduleResult{nominatingInfo: clearNominatedNode}, podInfo, status
   157  		}
   158  
   159  		fitError, ok := err.(*framework.FitError)
   160  		if !ok {
   161  			logger.Error(err, "Error selecting node for pod", "pod", klog.KObj(pod))
   162  			return ScheduleResult{nominatingInfo: clearNominatedNode}, podInfo, framework.AsStatus(err)
   163  		}
   164  
   165  		// SchedulePod() may have failed because the pod would not fit on any host, so we try to
   166  		// preempt, with the expectation that the next time the pod is tried for scheduling it
   167  		// will fit due to the preemption. It is also possible that a different pod will schedule
   168  		// into the resources that were preempted, but this is harmless.
   169  
   170  		if !fwk.HasPostFilterPlugins() {
   171  			logger.V(3).Info("No PostFilter plugins are registered, so no preemption will be performed")
   172  			return ScheduleResult{}, podInfo, framework.NewStatus(framework.Unschedulable).WithError(err)
   173  		}
   174  
   175  		// Run PostFilter plugins to attempt to make the pod schedulable in a future scheduling cycle.
   176  		result, status := fwk.RunPostFilterPlugins(ctx, state, pod, fitError.Diagnosis.NodeToStatusMap)
   177  		msg := status.Message()
   178  		fitError.Diagnosis.PostFilterMsg = msg
   179  		if status.Code() == framework.Error {
   180  			logger.Error(nil, "Status after running PostFilter plugins for pod", "pod", klog.KObj(pod), "status", msg)
   181  		} else {
   182  			logger.V(5).Info("Status after running PostFilter plugins for pod", "pod", klog.KObj(pod), "status", msg)
   183  		}
   184  
   185  		var nominatingInfo *framework.NominatingInfo
   186  		if result != nil {
   187  			nominatingInfo = result.NominatingInfo
   188  		}
   189  		return ScheduleResult{nominatingInfo: nominatingInfo}, podInfo, framework.NewStatus(framework.Unschedulable).WithError(err)
   190  	}
   191  
   192  	metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInSeconds(start))
   193  	// Tell the cache to assume that a pod now is running on a given node, even though it hasn't been bound yet.
   194  	// This allows us to keep scheduling without waiting on binding to occur.
   195  	assumedPodInfo := podInfo.DeepCopy()
   196  	assumedPod := assumedPodInfo.Pod
   197  	// assume modifies `assumedPod` by setting NodeName=scheduleResult.SuggestedHost
   198  	err = sched.assume(logger, assumedPod, scheduleResult.SuggestedHost)
   199  	if err != nil {
   200  		// This is most probably result of a BUG in retrying logic.
   201  		// We report an error here so that pod scheduling can be retried.
   202  		// This relies on the fact that Error will check if the pod has been bound
   203  		// to a node and if so will not add it back to the unscheduled pods queue
   204  		// (otherwise this would cause an infinite loop).
   205  		return ScheduleResult{nominatingInfo: clearNominatedNode}, assumedPodInfo, framework.AsStatus(err)
   206  	}
   207  
   208  	// Run the Reserve method of reserve plugins.
   209  	if sts := fwk.RunReservePluginsReserve(ctx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() {
   210  		// trigger un-reserve to clean up state associated with the reserved Pod
   211  		fwk.RunReservePluginsUnreserve(ctx, state, assumedPod, scheduleResult.SuggestedHost)
   212  		if forgetErr := sched.Cache.ForgetPod(logger, assumedPod); forgetErr != nil {
   213  			logger.Error(forgetErr, "Scheduler cache ForgetPod failed")
   214  		}
   215  
   216  		if sts.IsRejected() {
   217  			fitErr := &framework.FitError{
   218  				NumAllNodes: 1,
   219  				Pod:         pod,
   220  				Diagnosis: framework.Diagnosis{
   221  					NodeToStatusMap: framework.NodeToStatusMap{scheduleResult.SuggestedHost: sts},
   222  				},
   223  			}
   224  			fitErr.Diagnosis.AddPluginStatus(sts)
   225  			return ScheduleResult{nominatingInfo: clearNominatedNode}, assumedPodInfo, framework.NewStatus(sts.Code()).WithError(fitErr)
   226  		}
   227  		return ScheduleResult{nominatingInfo: clearNominatedNode}, assumedPodInfo, sts
   228  	}
   229  
   230  	// Run "permit" plugins.
   231  	runPermitStatus := fwk.RunPermitPlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost)
   232  	if !runPermitStatus.IsWait() && !runPermitStatus.IsSuccess() {
   233  		// trigger un-reserve to clean up state associated with the reserved Pod
   234  		fwk.RunReservePluginsUnreserve(ctx, state, assumedPod, scheduleResult.SuggestedHost)
   235  		if forgetErr := sched.Cache.ForgetPod(logger, assumedPod); forgetErr != nil {
   236  			logger.Error(forgetErr, "Scheduler cache ForgetPod failed")
   237  		}
   238  
   239  		if runPermitStatus.IsRejected() {
   240  			fitErr := &framework.FitError{
   241  				NumAllNodes: 1,
   242  				Pod:         pod,
   243  				Diagnosis: framework.Diagnosis{
   244  					NodeToStatusMap: framework.NodeToStatusMap{scheduleResult.SuggestedHost: runPermitStatus},
   245  				},
   246  			}
   247  			fitErr.Diagnosis.AddPluginStatus(runPermitStatus)
   248  			return ScheduleResult{nominatingInfo: clearNominatedNode}, assumedPodInfo, framework.NewStatus(runPermitStatus.Code()).WithError(fitErr)
   249  		}
   250  
   251  		return ScheduleResult{nominatingInfo: clearNominatedNode}, assumedPodInfo, runPermitStatus
   252  	}
   253  
   254  	// At the end of a successful scheduling cycle, pop and move up Pods if needed.
   255  	if len(podsToActivate.Map) != 0 {
   256  		sched.SchedulingQueue.Activate(logger, podsToActivate.Map)
   257  		// Clear the entries after activation.
   258  		podsToActivate.Map = make(map[string]*v1.Pod)
   259  	}
   260  
   261  	return scheduleResult, assumedPodInfo, nil
   262  }
   263  
   264  // bindingCycle tries to bind an assumed Pod.
   265  func (sched *Scheduler) bindingCycle(
   266  	ctx context.Context,
   267  	state *framework.CycleState,
   268  	fwk framework.Framework,
   269  	scheduleResult ScheduleResult,
   270  	assumedPodInfo *framework.QueuedPodInfo,
   271  	start time.Time,
   272  	podsToActivate *framework.PodsToActivate) *framework.Status {
   273  	logger := klog.FromContext(ctx)
   274  
   275  	assumedPod := assumedPodInfo.Pod
   276  
   277  	// Run "permit" plugins.
   278  	if status := fwk.WaitOnPermit(ctx, assumedPod); !status.IsSuccess() {
   279  		if status.IsRejected() {
   280  			fitErr := &framework.FitError{
   281  				NumAllNodes: 1,
   282  				Pod:         assumedPodInfo.Pod,
   283  				Diagnosis: framework.Diagnosis{
   284  					NodeToStatusMap:      framework.NodeToStatusMap{scheduleResult.SuggestedHost: status},
   285  					UnschedulablePlugins: sets.New(status.Plugin()),
   286  				},
   287  			}
   288  			return framework.NewStatus(status.Code()).WithError(fitErr)
   289  		}
   290  		return status
   291  	}
   292  
   293  	// Run "prebind" plugins.
   294  	if status := fwk.RunPreBindPlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost); !status.IsSuccess() {
   295  		return status
   296  	}
   297  
   298  	// Run "bind" plugins.
   299  	if status := sched.bind(ctx, fwk, assumedPod, scheduleResult.SuggestedHost, state); !status.IsSuccess() {
   300  		return status
   301  	}
   302  
   303  	// Calculating nodeResourceString can be heavy. Avoid it if klog verbosity is below 2.
   304  	logger.V(2).Info("Successfully bound pod to node", "pod", klog.KObj(assumedPod), "node", scheduleResult.SuggestedHost, "evaluatedNodes", scheduleResult.EvaluatedNodes, "feasibleNodes", scheduleResult.FeasibleNodes)
   305  	metrics.PodScheduled(fwk.ProfileName(), metrics.SinceInSeconds(start))
   306  	metrics.PodSchedulingAttempts.Observe(float64(assumedPodInfo.Attempts))
   307  	if assumedPodInfo.InitialAttemptTimestamp != nil {
   308  		metrics.PodSchedulingDuration.WithLabelValues(getAttemptsLabel(assumedPodInfo)).Observe(metrics.SinceInSeconds(*assumedPodInfo.InitialAttemptTimestamp))
   309  		metrics.PodSchedulingSLIDuration.WithLabelValues(getAttemptsLabel(assumedPodInfo)).Observe(metrics.SinceInSeconds(*assumedPodInfo.InitialAttemptTimestamp))
   310  	}
   311  	// Run "postbind" plugins.
   312  	fwk.RunPostBindPlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost)
   313  
   314  	// At the end of a successful binding cycle, move up Pods if needed.
   315  	if len(podsToActivate.Map) != 0 {
   316  		sched.SchedulingQueue.Activate(logger, podsToActivate.Map)
   317  		// Unlike the logic in schedulingCycle(), we don't bother deleting the entries
   318  		// as `podsToActivate.Map` is no longer consumed.
   319  	}
   320  
   321  	return nil
   322  }
   323  
   324  func (sched *Scheduler) handleBindingCycleError(
   325  	ctx context.Context,
   326  	state *framework.CycleState,
   327  	fwk framework.Framework,
   328  	podInfo *framework.QueuedPodInfo,
   329  	start time.Time,
   330  	scheduleResult ScheduleResult,
   331  	status *framework.Status) {
   332  	logger := klog.FromContext(ctx)
   333  
   334  	assumedPod := podInfo.Pod
   335  	// trigger un-reserve plugins to clean up state associated with the reserved Pod
   336  	fwk.RunReservePluginsUnreserve(ctx, state, assumedPod, scheduleResult.SuggestedHost)
   337  	if forgetErr := sched.Cache.ForgetPod(logger, assumedPod); forgetErr != nil {
   338  		logger.Error(forgetErr, "scheduler cache ForgetPod failed")
   339  	} else {
   340  		// "Forget"ing an assumed Pod in binding cycle should be treated as a PodDelete event,
   341  		// as the assumed Pod had occupied a certain amount of resources in scheduler cache.
   342  		//
   343  		// Avoid moving the assumed Pod itself as it's always Unschedulable.
   344  		// It's intentional to "defer" this operation; otherwise MoveAllToActiveOrBackoffQueue() would
   345  		// add this event to in-flight events and thus move the assumed pod to backoffQ anyways if the plugins don't have appropriate QueueingHint.
   346  		if status.IsRejected() {
   347  			defer sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, internalqueue.AssignedPodDelete, assumedPod, nil, func(pod *v1.Pod) bool {
   348  				return assumedPod.UID != pod.UID
   349  			})
   350  		} else {
   351  			sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, internalqueue.AssignedPodDelete, assumedPod, nil, nil)
   352  		}
   353  	}
   354  
   355  	sched.FailureHandler(ctx, fwk, podInfo, status, clearNominatedNode, start)
   356  }
   357  
   358  func (sched *Scheduler) frameworkForPod(pod *v1.Pod) (framework.Framework, error) {
   359  	fwk, ok := sched.Profiles[pod.Spec.SchedulerName]
   360  	if !ok {
   361  		return nil, fmt.Errorf("profile not found for scheduler name %q", pod.Spec.SchedulerName)
   362  	}
   363  	return fwk, nil
   364  }
   365  
   366  // skipPodSchedule returns true if we could skip scheduling the pod for specified cases.
   367  func (sched *Scheduler) skipPodSchedule(ctx context.Context, fwk framework.Framework, pod *v1.Pod) bool {
   368  	// Case 1: pod is being deleted.
   369  	if pod.DeletionTimestamp != nil {
   370  		fwk.EventRecorder().Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", "skip schedule deleting pod: %v/%v", pod.Namespace, pod.Name)
   371  		klog.FromContext(ctx).V(3).Info("Skip schedule deleting pod", "pod", klog.KObj(pod))
   372  		return true
   373  	}
   374  
   375  	// Case 2: pod that has been assumed could be skipped.
   376  	// An assumed pod can be added again to the scheduling queue if it got an update event
   377  	// during its previous scheduling cycle but before getting assumed.
   378  	isAssumed, err := sched.Cache.IsAssumedPod(pod)
   379  	if err != nil {
   380  		// TODO(91633): pass ctx into a revised HandleError
   381  		utilruntime.HandleError(fmt.Errorf("failed to check whether pod %s/%s is assumed: %v", pod.Namespace, pod.Name, err))
   382  		return false
   383  	}
   384  	return isAssumed
   385  }
   386  
   387  // schedulePod tries to schedule the given pod to one of the nodes in the node list.
   388  // If it succeeds, it will return the name of the node.
   389  // If it fails, it will return a FitError with reasons.
   390  func (sched *Scheduler) schedulePod(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) {
   391  	trace := utiltrace.New("Scheduling", utiltrace.Field{Key: "namespace", Value: pod.Namespace}, utiltrace.Field{Key: "name", Value: pod.Name})
   392  	defer trace.LogIfLong(100 * time.Millisecond)
   393  	if err := sched.Cache.UpdateSnapshot(klog.FromContext(ctx), sched.nodeInfoSnapshot); err != nil {
   394  		return result, err
   395  	}
   396  	trace.Step("Snapshotting scheduler cache and node infos done")
   397  
   398  	if sched.nodeInfoSnapshot.NumNodes() == 0 {
   399  		return result, ErrNoNodesAvailable
   400  	}
   401  
   402  	feasibleNodes, diagnosis, err := sched.findNodesThatFitPod(ctx, fwk, state, pod)
   403  	if err != nil {
   404  		return result, err
   405  	}
   406  	trace.Step("Computing predicates done")
   407  
   408  	if len(feasibleNodes) == 0 {
   409  		return result, &framework.FitError{
   410  			Pod:         pod,
   411  			NumAllNodes: sched.nodeInfoSnapshot.NumNodes(),
   412  			Diagnosis:   diagnosis,
   413  		}
   414  	}
   415  
   416  	// When only one node after predicate, just use it.
   417  	if len(feasibleNodes) == 1 {
   418  		return ScheduleResult{
   419  			SuggestedHost:  feasibleNodes[0].Node().Name,
   420  			EvaluatedNodes: 1 + len(diagnosis.NodeToStatusMap),
   421  			FeasibleNodes:  1,
   422  		}, nil
   423  	}
   424  
   425  	priorityList, err := prioritizeNodes(ctx, sched.Extenders, fwk, state, pod, feasibleNodes)
   426  	if err != nil {
   427  		return result, err
   428  	}
   429  
   430  	host, _, err := selectHost(priorityList, numberOfHighestScoredNodesToReport)
   431  	trace.Step("Prioritizing done")
   432  
   433  	return ScheduleResult{
   434  		SuggestedHost:  host,
   435  		EvaluatedNodes: len(feasibleNodes) + len(diagnosis.NodeToStatusMap),
   436  		FeasibleNodes:  len(feasibleNodes),
   437  	}, err
   438  }
   439  
   440  // Filters the nodes to find the ones that fit the pod based on the framework
   441  // filter plugins and filter extenders.
   442  func (sched *Scheduler) findNodesThatFitPod(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) ([]*framework.NodeInfo, framework.Diagnosis, error) {
   443  	logger := klog.FromContext(ctx)
   444  
   445  	allNodes, err := sched.nodeInfoSnapshot.NodeInfos().List()
   446  	if err != nil {
   447  		return nil, framework.Diagnosis{
   448  			NodeToStatusMap: make(framework.NodeToStatusMap),
   449  		}, err
   450  	}
   451  
   452  	diagnosis := framework.Diagnosis{
   453  		NodeToStatusMap: make(framework.NodeToStatusMap, len(allNodes)),
   454  	}
   455  	// Run "prefilter" plugins.
   456  	preRes, s := fwk.RunPreFilterPlugins(ctx, state, pod)
   457  	if !s.IsSuccess() {
   458  		if !s.IsRejected() {
   459  			return nil, diagnosis, s.AsError()
   460  		}
   461  		// All nodes in NodeToStatusMap will have the same status so that they can be handled in the preemption.
   462  		// Some non trivial refactoring is needed to avoid this copy.
   463  		for _, n := range allNodes {
   464  			diagnosis.NodeToStatusMap[n.Node().Name] = s
   465  		}
   466  
   467  		// Record the messages from PreFilter in Diagnosis.PreFilterMsg.
   468  		msg := s.Message()
   469  		diagnosis.PreFilterMsg = msg
   470  		logger.V(5).Info("Status after running PreFilter plugins for pod", "pod", klog.KObj(pod), "status", msg)
   471  		diagnosis.AddPluginStatus(s)
   472  		return nil, diagnosis, nil
   473  	}
   474  
   475  	// "NominatedNodeName" can potentially be set in a previous scheduling cycle as a result of preemption.
   476  	// This node is likely the only candidate that will fit the pod, and hence we try it first before iterating over all nodes.
   477  	if len(pod.Status.NominatedNodeName) > 0 {
   478  		feasibleNodes, err := sched.evaluateNominatedNode(ctx, pod, fwk, state, diagnosis)
   479  		if err != nil {
   480  			logger.Error(err, "Evaluation failed on nominated node", "pod", klog.KObj(pod), "node", pod.Status.NominatedNodeName)
   481  		}
   482  		// Nominated node passes all the filters, scheduler is good to assign this node to the pod.
   483  		if len(feasibleNodes) != 0 {
   484  			return feasibleNodes, diagnosis, nil
   485  		}
   486  	}
   487  
   488  	nodes := allNodes
   489  	if !preRes.AllNodes() {
   490  		nodes = make([]*framework.NodeInfo, 0, len(preRes.NodeNames))
   491  		for _, n := range allNodes {
   492  			if !preRes.NodeNames.Has(n.Node().Name) {
   493  				// We consider Nodes that are filtered out by PreFilterResult as rejected via UnschedulableAndUnresolvable.
   494  				// We have to record them in NodeToStatusMap so that they won't be considered as candidates in the preemption.
   495  				diagnosis.NodeToStatusMap[n.Node().Name] = framework.NewStatus(framework.UnschedulableAndUnresolvable, "node is filtered out by the prefilter result")
   496  				continue
   497  			}
   498  			nodes = append(nodes, n)
   499  		}
   500  	}
   501  	feasibleNodes, err := sched.findNodesThatPassFilters(ctx, fwk, state, pod, &diagnosis, nodes)
   502  	// always try to update the sched.nextStartNodeIndex regardless of whether an error has occurred
   503  	// this is helpful to make sure that all the nodes have a chance to be searched
   504  	processedNodes := len(feasibleNodes) + len(diagnosis.NodeToStatusMap)
   505  	sched.nextStartNodeIndex = (sched.nextStartNodeIndex + processedNodes) % len(nodes)
   506  	if err != nil {
   507  		return nil, diagnosis, err
   508  	}
   509  
   510  	feasibleNodesAfterExtender, err := findNodesThatPassExtenders(ctx, sched.Extenders, pod, feasibleNodes, diagnosis.NodeToStatusMap)
   511  	if err != nil {
   512  		return nil, diagnosis, err
   513  	}
   514  	if len(feasibleNodesAfterExtender) != len(feasibleNodes) {
   515  		// Extenders filtered out some nodes.
   516  		//
   517  		// Extender doesn't support any kind of requeueing feature like EnqueueExtensions in the scheduling framework.
   518  		// When Extenders reject some Nodes and the pod ends up being unschedulable,
   519  		// we put framework.ExtenderName to pInfo.UnschedulablePlugins.
   520  		// This Pod will be requeued from unschedulable pod pool to activeQ/backoffQ
   521  		// by any kind of cluster events.
   522  		// https://github.com/kubernetes/kubernetes/issues/122019
   523  		if diagnosis.UnschedulablePlugins == nil {
   524  			diagnosis.UnschedulablePlugins = sets.New[string]()
   525  		}
   526  		diagnosis.UnschedulablePlugins.Insert(framework.ExtenderName)
   527  	}
   528  
   529  	return feasibleNodesAfterExtender, diagnosis, nil
   530  }
   531  
   532  func (sched *Scheduler) evaluateNominatedNode(ctx context.Context, pod *v1.Pod, fwk framework.Framework, state *framework.CycleState, diagnosis framework.Diagnosis) ([]*framework.NodeInfo, error) {
   533  	nnn := pod.Status.NominatedNodeName
   534  	nodeInfo, err := sched.nodeInfoSnapshot.Get(nnn)
   535  	if err != nil {
   536  		return nil, err
   537  	}
   538  	node := []*framework.NodeInfo{nodeInfo}
   539  	feasibleNodes, err := sched.findNodesThatPassFilters(ctx, fwk, state, pod, &diagnosis, node)
   540  	if err != nil {
   541  		return nil, err
   542  	}
   543  
   544  	feasibleNodes, err = findNodesThatPassExtenders(ctx, sched.Extenders, pod, feasibleNodes, diagnosis.NodeToStatusMap)
   545  	if err != nil {
   546  		return nil, err
   547  	}
   548  
   549  	return feasibleNodes, nil
   550  }
   551  
   552  // hasScoring checks if scoring nodes is configured.
   553  func (sched *Scheduler) hasScoring(fwk framework.Framework) bool {
   554  	if fwk.HasScorePlugins() {
   555  		return true
   556  	}
   557  	for _, extender := range sched.Extenders {
   558  		if extender.IsPrioritizer() {
   559  			return true
   560  		}
   561  	}
   562  	return false
   563  }
   564  
   565  // hasExtenderFilters checks if any extenders filter nodes.
   566  func (sched *Scheduler) hasExtenderFilters() bool {
   567  	for _, extender := range sched.Extenders {
   568  		if extender.IsFilter() {
   569  			return true
   570  		}
   571  	}
   572  	return false
   573  }
   574  
   575  // findNodesThatPassFilters finds the nodes that fit the filter plugins.
   576  func (sched *Scheduler) findNodesThatPassFilters(
   577  	ctx context.Context,
   578  	fwk framework.Framework,
   579  	state *framework.CycleState,
   580  	pod *v1.Pod,
   581  	diagnosis *framework.Diagnosis,
   582  	nodes []*framework.NodeInfo) ([]*framework.NodeInfo, error) {
   583  	numAllNodes := len(nodes)
   584  	numNodesToFind := sched.numFeasibleNodesToFind(fwk.PercentageOfNodesToScore(), int32(numAllNodes))
   585  	if !sched.hasExtenderFilters() && !sched.hasScoring(fwk) {
   586  		numNodesToFind = 1
   587  	}
   588  
   589  	// Create feasible list with enough space to avoid growing it
   590  	// and allow assigning.
   591  	feasibleNodes := make([]*framework.NodeInfo, numNodesToFind)
   592  
   593  	if !fwk.HasFilterPlugins() {
   594  		for i := range feasibleNodes {
   595  			feasibleNodes[i] = nodes[(sched.nextStartNodeIndex+i)%numAllNodes]
   596  		}
   597  		return feasibleNodes, nil
   598  	}
   599  
   600  	errCh := parallelize.NewErrorChannel()
   601  	var feasibleNodesLen int32
   602  	ctx, cancel := context.WithCancel(ctx)
   603  	defer cancel()
   604  
   605  	type nodeStatus struct {
   606  		node   string
   607  		status *framework.Status
   608  	}
   609  	result := make([]*nodeStatus, numAllNodes)
   610  	checkNode := func(i int) {
   611  		// We check the nodes starting from where we left off in the previous scheduling cycle,
   612  		// this is to make sure all nodes have the same chance of being examined across pods.
   613  		nodeInfo := nodes[(sched.nextStartNodeIndex+i)%numAllNodes]
   614  		status := fwk.RunFilterPluginsWithNominatedPods(ctx, state, pod, nodeInfo)
   615  		if status.Code() == framework.Error {
   616  			errCh.SendErrorWithCancel(status.AsError(), cancel)
   617  			return
   618  		}
   619  		if status.IsSuccess() {
   620  			length := atomic.AddInt32(&feasibleNodesLen, 1)
   621  			if length > numNodesToFind {
   622  				cancel()
   623  				atomic.AddInt32(&feasibleNodesLen, -1)
   624  			} else {
   625  				feasibleNodes[length-1] = nodeInfo
   626  			}
   627  		} else {
   628  			result[i] = &nodeStatus{node: nodeInfo.Node().Name, status: status}
   629  		}
   630  	}
   631  
   632  	beginCheckNode := time.Now()
   633  	statusCode := framework.Success
   634  	defer func() {
   635  		// We record Filter extension point latency here instead of in framework.go because framework.RunFilterPlugins
   636  		// function is called for each node, whereas we want to have an overall latency for all nodes per scheduling cycle.
   637  		// Note that this latency also includes latency for `addNominatedPods`, which calls framework.RunPreFilterAddPod.
   638  		metrics.FrameworkExtensionPointDuration.WithLabelValues(metrics.Filter, statusCode.String(), fwk.ProfileName()).Observe(metrics.SinceInSeconds(beginCheckNode))
   639  	}()
   640  
   641  	// Stops searching for more nodes once the configured number of feasible nodes
   642  	// are found.
   643  	fwk.Parallelizer().Until(ctx, numAllNodes, checkNode, metrics.Filter)
   644  	feasibleNodes = feasibleNodes[:feasibleNodesLen]
   645  	for _, item := range result {
   646  		if item == nil {
   647  			continue
   648  		}
   649  		diagnosis.NodeToStatusMap[item.node] = item.status
   650  		diagnosis.AddPluginStatus(item.status)
   651  	}
   652  	if err := errCh.ReceiveError(); err != nil {
   653  		statusCode = framework.Error
   654  		return feasibleNodes, err
   655  	}
   656  	return feasibleNodes, nil
   657  }
   658  
   659  // numFeasibleNodesToFind returns the number of feasible nodes that once found, the scheduler stops
   660  // its search for more feasible nodes.
   661  func (sched *Scheduler) numFeasibleNodesToFind(percentageOfNodesToScore *int32, numAllNodes int32) (numNodes int32) {
   662  	if numAllNodes < minFeasibleNodesToFind {
   663  		return numAllNodes
   664  	}
   665  
   666  	// Use profile percentageOfNodesToScore if it's set. Otherwise, use global percentageOfNodesToScore.
   667  	var percentage int32
   668  	if percentageOfNodesToScore != nil {
   669  		percentage = *percentageOfNodesToScore
   670  	} else {
   671  		percentage = sched.percentageOfNodesToScore
   672  	}
   673  
   674  	if percentage == 0 {
   675  		percentage = int32(50) - numAllNodes/125
   676  		if percentage < minFeasibleNodesPercentageToFind {
   677  			percentage = minFeasibleNodesPercentageToFind
   678  		}
   679  	}
   680  
   681  	numNodes = numAllNodes * percentage / 100
   682  	if numNodes < minFeasibleNodesToFind {
   683  		return minFeasibleNodesToFind
   684  	}
   685  
   686  	return numNodes
   687  }
   688  
   689  func findNodesThatPassExtenders(ctx context.Context, extenders []framework.Extender, pod *v1.Pod, feasibleNodes []*framework.NodeInfo, statuses framework.NodeToStatusMap) ([]*framework.NodeInfo, error) {
   690  	logger := klog.FromContext(ctx)
   691  	// Extenders are called sequentially.
   692  	// Nodes in original feasibleNodes can be excluded in one extender, and pass on to the next
   693  	// extender in a decreasing manner.
   694  	for _, extender := range extenders {
   695  		if len(feasibleNodes) == 0 {
   696  			break
   697  		}
   698  		if !extender.IsInterested(pod) {
   699  			continue
   700  		}
   701  
   702  		// Status of failed nodes in failedAndUnresolvableMap will be added or overwritten in <statuses>,
   703  		// so that the scheduler framework can respect the UnschedulableAndUnresolvable status for
   704  		// particular nodes, and this may eventually improve preemption efficiency.
   705  		// Note: users are recommended to configure the extenders that may return UnschedulableAndUnresolvable
   706  		// status ahead of others.
   707  		feasibleList, failedMap, failedAndUnresolvableMap, err := extender.Filter(pod, feasibleNodes)
   708  		if err != nil {
   709  			if extender.IsIgnorable() {
   710  				logger.Info("Skipping extender as it returned error and has ignorable flag set", "extender", extender, "err", err)
   711  				continue
   712  			}
   713  			return nil, err
   714  		}
   715  
   716  		for failedNodeName, failedMsg := range failedAndUnresolvableMap {
   717  			var aggregatedReasons []string
   718  			if _, found := statuses[failedNodeName]; found {
   719  				aggregatedReasons = statuses[failedNodeName].Reasons()
   720  			}
   721  			aggregatedReasons = append(aggregatedReasons, failedMsg)
   722  			statuses[failedNodeName] = framework.NewStatus(framework.UnschedulableAndUnresolvable, aggregatedReasons...)
   723  		}
   724  
   725  		for failedNodeName, failedMsg := range failedMap {
   726  			if _, found := failedAndUnresolvableMap[failedNodeName]; found {
   727  				// failedAndUnresolvableMap takes precedence over failedMap
   728  				// note that this only happens if the extender returns the node in both maps
   729  				continue
   730  			}
   731  			if _, found := statuses[failedNodeName]; !found {
   732  				statuses[failedNodeName] = framework.NewStatus(framework.Unschedulable, failedMsg)
   733  			} else {
   734  				statuses[failedNodeName].AppendReason(failedMsg)
   735  			}
   736  		}
   737  
   738  		feasibleNodes = feasibleList
   739  	}
   740  	return feasibleNodes, nil
   741  }
   742  
   743  // prioritizeNodes prioritizes the nodes by running the score plugins,
   744  // which return a score for each node from the call to RunScorePlugins().
   745  // The scores from each plugin are added together to make the score for that node, then
   746  // any extenders are run as well.
   747  // All scores are finally combined (added) to get the total weighted scores of all nodes
   748  func prioritizeNodes(
   749  	ctx context.Context,
   750  	extenders []framework.Extender,
   751  	fwk framework.Framework,
   752  	state *framework.CycleState,
   753  	pod *v1.Pod,
   754  	nodes []*framework.NodeInfo,
   755  ) ([]framework.NodePluginScores, error) {
   756  	logger := klog.FromContext(ctx)
   757  	// If no priority configs are provided, then all nodes will have a score of one.
   758  	// This is required to generate the priority list in the required format
   759  	if len(extenders) == 0 && !fwk.HasScorePlugins() {
   760  		result := make([]framework.NodePluginScores, 0, len(nodes))
   761  		for i := range nodes {
   762  			result = append(result, framework.NodePluginScores{
   763  				Name:       nodes[i].Node().Name,
   764  				TotalScore: 1,
   765  			})
   766  		}
   767  		return result, nil
   768  	}
   769  
   770  	// Run PreScore plugins.
   771  	preScoreStatus := fwk.RunPreScorePlugins(ctx, state, pod, nodes)
   772  	if !preScoreStatus.IsSuccess() {
   773  		return nil, preScoreStatus.AsError()
   774  	}
   775  
   776  	// Run the Score plugins.
   777  	nodesScores, scoreStatus := fwk.RunScorePlugins(ctx, state, pod, nodes)
   778  	if !scoreStatus.IsSuccess() {
   779  		return nil, scoreStatus.AsError()
   780  	}
   781  
   782  	// Additional details logged at level 10 if enabled.
   783  	loggerVTen := logger.V(10)
   784  	if loggerVTen.Enabled() {
   785  		for _, nodeScore := range nodesScores {
   786  			for _, pluginScore := range nodeScore.Scores {
   787  				loggerVTen.Info("Plugin scored node for pod", "pod", klog.KObj(pod), "plugin", pluginScore.Name, "node", nodeScore.Name, "score", pluginScore.Score)
   788  			}
   789  		}
   790  	}
   791  
   792  	if len(extenders) != 0 && nodes != nil {
   793  		// allNodeExtendersScores has all extenders scores for all nodes.
   794  		// It is keyed with node name.
   795  		allNodeExtendersScores := make(map[string]*framework.NodePluginScores, len(nodes))
   796  		var mu sync.Mutex
   797  		var wg sync.WaitGroup
   798  		for i := range extenders {
   799  			if !extenders[i].IsInterested(pod) {
   800  				continue
   801  			}
   802  			wg.Add(1)
   803  			go func(extIndex int) {
   804  				metrics.Goroutines.WithLabelValues(metrics.PrioritizingExtender).Inc()
   805  				defer func() {
   806  					metrics.Goroutines.WithLabelValues(metrics.PrioritizingExtender).Dec()
   807  					wg.Done()
   808  				}()
   809  				prioritizedList, weight, err := extenders[extIndex].Prioritize(pod, nodes)
   810  				if err != nil {
   811  					// Prioritization errors from extender can be ignored, let k8s/other extenders determine the priorities
   812  					logger.V(5).Info("Failed to run extender's priority function. No score given by this extender.", "error", err, "pod", klog.KObj(pod), "extender", extenders[extIndex].Name())
   813  					return
   814  				}
   815  				mu.Lock()
   816  				defer mu.Unlock()
   817  				for i := range *prioritizedList {
   818  					nodename := (*prioritizedList)[i].Host
   819  					score := (*prioritizedList)[i].Score
   820  					if loggerVTen.Enabled() {
   821  						loggerVTen.Info("Extender scored node for pod", "pod", klog.KObj(pod), "extender", extenders[extIndex].Name(), "node", nodename, "score", score)
   822  					}
   823  
   824  					// MaxExtenderPriority may diverge from the max priority used in the scheduler and defined by MaxNodeScore,
   825  					// therefore we need to scale the score returned by extenders to the score range used by the scheduler.
   826  					finalscore := score * weight * (framework.MaxNodeScore / extenderv1.MaxExtenderPriority)
   827  
   828  					if allNodeExtendersScores[nodename] == nil {
   829  						allNodeExtendersScores[nodename] = &framework.NodePluginScores{
   830  							Name:   nodename,
   831  							Scores: make([]framework.PluginScore, 0, len(extenders)),
   832  						}
   833  					}
   834  					allNodeExtendersScores[nodename].Scores = append(allNodeExtendersScores[nodename].Scores, framework.PluginScore{
   835  						Name:  extenders[extIndex].Name(),
   836  						Score: finalscore,
   837  					})
   838  					allNodeExtendersScores[nodename].TotalScore += finalscore
   839  				}
   840  			}(i)
   841  		}
   842  		// wait for all go routines to finish
   843  		wg.Wait()
   844  		for i := range nodesScores {
   845  			if score, ok := allNodeExtendersScores[nodes[i].Node().Name]; ok {
   846  				nodesScores[i].Scores = append(nodesScores[i].Scores, score.Scores...)
   847  				nodesScores[i].TotalScore += score.TotalScore
   848  			}
   849  		}
   850  	}
   851  
   852  	if loggerVTen.Enabled() {
   853  		for i := range nodesScores {
   854  			loggerVTen.Info("Calculated node's final score for pod", "pod", klog.KObj(pod), "node", nodesScores[i].Name, "score", nodesScores[i].TotalScore)
   855  		}
   856  	}
   857  	return nodesScores, nil
   858  }
   859  
   860  var errEmptyPriorityList = errors.New("empty priorityList")
   861  
   862  // selectHost takes a prioritized list of nodes and then picks one
   863  // in a reservoir sampling manner from the nodes that had the highest score.
   864  // It also returns the top {count} Nodes,
   865  // and the top of the list will be always the selected host.
   866  func selectHost(nodeScoreList []framework.NodePluginScores, count int) (string, []framework.NodePluginScores, error) {
   867  	if len(nodeScoreList) == 0 {
   868  		return "", nil, errEmptyPriorityList
   869  	}
   870  
   871  	var h nodeScoreHeap = nodeScoreList
   872  	heap.Init(&h)
   873  	cntOfMaxScore := 1
   874  	selectedIndex := 0
   875  	// The top of the heap is the NodeScoreResult with the highest score.
   876  	sortedNodeScoreList := make([]framework.NodePluginScores, 0, count)
   877  	sortedNodeScoreList = append(sortedNodeScoreList, heap.Pop(&h).(framework.NodePluginScores))
   878  
   879  	// This for-loop will continue until all Nodes with the highest scores get checked for a reservoir sampling,
   880  	// and sortedNodeScoreList gets (count - 1) elements.
   881  	for ns := heap.Pop(&h).(framework.NodePluginScores); ; ns = heap.Pop(&h).(framework.NodePluginScores) {
   882  		if ns.TotalScore != sortedNodeScoreList[0].TotalScore && len(sortedNodeScoreList) == count {
   883  			break
   884  		}
   885  
   886  		if ns.TotalScore == sortedNodeScoreList[0].TotalScore {
   887  			cntOfMaxScore++
   888  			if rand.Intn(cntOfMaxScore) == 0 {
   889  				// Replace the candidate with probability of 1/cntOfMaxScore
   890  				selectedIndex = cntOfMaxScore - 1
   891  			}
   892  		}
   893  
   894  		sortedNodeScoreList = append(sortedNodeScoreList, ns)
   895  
   896  		if h.Len() == 0 {
   897  			break
   898  		}
   899  	}
   900  
   901  	if selectedIndex != 0 {
   902  		// replace the first one with selected one
   903  		previous := sortedNodeScoreList[0]
   904  		sortedNodeScoreList[0] = sortedNodeScoreList[selectedIndex]
   905  		sortedNodeScoreList[selectedIndex] = previous
   906  	}
   907  
   908  	if len(sortedNodeScoreList) > count {
   909  		sortedNodeScoreList = sortedNodeScoreList[:count]
   910  	}
   911  
   912  	return sortedNodeScoreList[0].Name, sortedNodeScoreList, nil
   913  }
   914  
   915  // nodeScoreHeap is a heap of framework.NodePluginScores.
   916  type nodeScoreHeap []framework.NodePluginScores
   917  
   918  // nodeScoreHeap implements heap.Interface.
   919  var _ heap.Interface = &nodeScoreHeap{}
   920  
   921  func (h nodeScoreHeap) Len() int           { return len(h) }
   922  func (h nodeScoreHeap) Less(i, j int) bool { return h[i].TotalScore > h[j].TotalScore }
   923  func (h nodeScoreHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
   924  
   925  func (h *nodeScoreHeap) Push(x interface{}) {
   926  	*h = append(*h, x.(framework.NodePluginScores))
   927  }
   928  
   929  func (h *nodeScoreHeap) Pop() interface{} {
   930  	old := *h
   931  	n := len(old)
   932  	x := old[n-1]
   933  	*h = old[0 : n-1]
   934  	return x
   935  }
   936  
   937  // assume signals to the cache that a pod is already in the cache, so that binding can be asynchronous.
   938  // assume modifies `assumed`.
   939  func (sched *Scheduler) assume(logger klog.Logger, assumed *v1.Pod, host string) error {
   940  	// Optimistically assume that the binding will succeed and send it to apiserver
   941  	// in the background.
   942  	// If the binding fails, scheduler will release resources allocated to assumed pod
   943  	// immediately.
   944  	assumed.Spec.NodeName = host
   945  
   946  	if err := sched.Cache.AssumePod(logger, assumed); err != nil {
   947  		logger.Error(err, "Scheduler cache AssumePod failed")
   948  		return err
   949  	}
   950  	// if "assumed" is a nominated pod, we should remove it from internal cache
   951  	if sched.SchedulingQueue != nil {
   952  		sched.SchedulingQueue.DeleteNominatedPodIfExists(assumed)
   953  	}
   954  
   955  	return nil
   956  }
   957  
   958  // bind binds a pod to a given node defined in a binding object.
   959  // The precedence for binding is: (1) extenders and (2) framework plugins.
   960  // We expect this to run asynchronously, so we handle binding metrics internally.
   961  func (sched *Scheduler) bind(ctx context.Context, fwk framework.Framework, assumed *v1.Pod, targetNode string, state *framework.CycleState) (status *framework.Status) {
   962  	logger := klog.FromContext(ctx)
   963  	defer func() {
   964  		sched.finishBinding(logger, fwk, assumed, targetNode, status)
   965  	}()
   966  
   967  	bound, err := sched.extendersBinding(logger, assumed, targetNode)
   968  	if bound {
   969  		return framework.AsStatus(err)
   970  	}
   971  	return fwk.RunBindPlugins(ctx, state, assumed, targetNode)
   972  }
   973  
   974  // TODO(#87159): Move this to a Plugin.
   975  func (sched *Scheduler) extendersBinding(logger klog.Logger, pod *v1.Pod, node string) (bool, error) {
   976  	for _, extender := range sched.Extenders {
   977  		if !extender.IsBinder() || !extender.IsInterested(pod) {
   978  			continue
   979  		}
   980  		err := extender.Bind(&v1.Binding{
   981  			ObjectMeta: metav1.ObjectMeta{Namespace: pod.Namespace, Name: pod.Name, UID: pod.UID},
   982  			Target:     v1.ObjectReference{Kind: "Node", Name: node},
   983  		})
   984  		if err != nil && extender.IsIgnorable() {
   985  			logger.Info("Skipping extender in bind as it returned error and has ignorable flag set", "extender", extender, "err", err)
   986  			continue
   987  		}
   988  		return true, err
   989  	}
   990  	return false, nil
   991  }
   992  
   993  func (sched *Scheduler) finishBinding(logger klog.Logger, fwk framework.Framework, assumed *v1.Pod, targetNode string, status *framework.Status) {
   994  	if finErr := sched.Cache.FinishBinding(logger, assumed); finErr != nil {
   995  		logger.Error(finErr, "Scheduler cache FinishBinding failed")
   996  	}
   997  	if !status.IsSuccess() {
   998  		logger.V(1).Info("Failed to bind pod", "pod", klog.KObj(assumed))
   999  		return
  1000  	}
  1001  
  1002  	fwk.EventRecorder().Eventf(assumed, nil, v1.EventTypeNormal, "Scheduled", "Binding", "Successfully assigned %v/%v to %v", assumed.Namespace, assumed.Name, targetNode)
  1003  }
  1004  
  1005  func getAttemptsLabel(p *framework.QueuedPodInfo) string {
  1006  	// We breakdown the pod scheduling duration by attempts capped to a limit
  1007  	// to avoid ending up with a high cardinality metric.
  1008  	if p.Attempts >= 15 {
  1009  		return "15+"
  1010  	}
  1011  	return strconv.Itoa(p.Attempts)
  1012  }
  1013  
  1014  // handleSchedulingFailure records an event for the pod that indicates the
  1015  // pod has failed to schedule. Also, update the pod condition and nominated node name if set.
  1016  func (sched *Scheduler) handleSchedulingFailure(ctx context.Context, fwk framework.Framework, podInfo *framework.QueuedPodInfo, status *framework.Status, nominatingInfo *framework.NominatingInfo, start time.Time) {
  1017  	calledDone := false
  1018  	defer func() {
  1019  		if !calledDone {
  1020  			// Basically, AddUnschedulableIfNotPresent calls DonePod internally.
  1021  			// But, AddUnschedulableIfNotPresent isn't called in some corner cases.
  1022  			// Here, we call DonePod explicitly to avoid leaking the pod.
  1023  			sched.SchedulingQueue.Done(podInfo.Pod.UID)
  1024  		}
  1025  	}()
  1026  
  1027  	logger := klog.FromContext(ctx)
  1028  	reason := v1.PodReasonSchedulerError
  1029  	if status.IsRejected() {
  1030  		reason = v1.PodReasonUnschedulable
  1031  	}
  1032  
  1033  	switch reason {
  1034  	case v1.PodReasonUnschedulable:
  1035  		metrics.PodUnschedulable(fwk.ProfileName(), metrics.SinceInSeconds(start))
  1036  	case v1.PodReasonSchedulerError:
  1037  		metrics.PodScheduleError(fwk.ProfileName(), metrics.SinceInSeconds(start))
  1038  	}
  1039  
  1040  	pod := podInfo.Pod
  1041  	err := status.AsError()
  1042  	errMsg := status.Message()
  1043  
  1044  	if err == ErrNoNodesAvailable {
  1045  		logger.V(2).Info("Unable to schedule pod; no nodes are registered to the cluster; waiting", "pod", klog.KObj(pod))
  1046  	} else if fitError, ok := err.(*framework.FitError); ok { // Inject UnschedulablePlugins to PodInfo, which will be used later for moving Pods between queues efficiently.
  1047  		podInfo.UnschedulablePlugins = fitError.Diagnosis.UnschedulablePlugins
  1048  		podInfo.PendingPlugins = fitError.Diagnosis.PendingPlugins
  1049  		logger.V(2).Info("Unable to schedule pod; no fit; waiting", "pod", klog.KObj(pod), "err", errMsg)
  1050  	} else {
  1051  		logger.Error(err, "Error scheduling pod; retrying", "pod", klog.KObj(pod))
  1052  	}
  1053  
  1054  	// Check if the Pod exists in informer cache.
  1055  	podLister := fwk.SharedInformerFactory().Core().V1().Pods().Lister()
  1056  	cachedPod, e := podLister.Pods(pod.Namespace).Get(pod.Name)
  1057  	if e != nil {
  1058  		logger.Info("Pod doesn't exist in informer cache", "pod", klog.KObj(pod), "err", e)
  1059  		// We need to call DonePod here because we don't call AddUnschedulableIfNotPresent in this case.
  1060  	} else {
  1061  		// In the case of extender, the pod may have been bound successfully, but timed out returning its response to the scheduler.
  1062  		// It could result in the live version to carry .spec.nodeName, and that's inconsistent with the internal-queued version.
  1063  		if len(cachedPod.Spec.NodeName) != 0 {
  1064  			logger.Info("Pod has been assigned to node. Abort adding it back to queue.", "pod", klog.KObj(pod), "node", cachedPod.Spec.NodeName)
  1065  			// We need to call DonePod here because we don't call AddUnschedulableIfNotPresent in this case.
  1066  		} else {
  1067  			// As <cachedPod> is from SharedInformer, we need to do a DeepCopy() here.
  1068  			// ignore this err since apiserver doesn't properly validate affinity terms
  1069  			// and we can't fix the validation for backwards compatibility.
  1070  			podInfo.PodInfo, _ = framework.NewPodInfo(cachedPod.DeepCopy())
  1071  			if err := sched.SchedulingQueue.AddUnschedulableIfNotPresent(logger, podInfo, sched.SchedulingQueue.SchedulingCycle()); err != nil {
  1072  				logger.Error(err, "Error occurred")
  1073  			}
  1074  			calledDone = true
  1075  		}
  1076  	}
  1077  
  1078  	// Update the scheduling queue with the nominated pod information. Without
  1079  	// this, there would be a race condition between the next scheduling cycle
  1080  	// and the time the scheduler receives a Pod Update for the nominated pod.
  1081  	// Here we check for nil only for tests.
  1082  	if sched.SchedulingQueue != nil {
  1083  		logger := klog.FromContext(ctx)
  1084  		sched.SchedulingQueue.AddNominatedPod(logger, podInfo.PodInfo, nominatingInfo)
  1085  	}
  1086  
  1087  	if err == nil {
  1088  		// Only tests can reach here.
  1089  		return
  1090  	}
  1091  
  1092  	msg := truncateMessage(errMsg)
  1093  	fwk.EventRecorder().Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", msg)
  1094  	if err := updatePod(ctx, sched.client, pod, &v1.PodCondition{
  1095  		Type:    v1.PodScheduled,
  1096  		Status:  v1.ConditionFalse,
  1097  		Reason:  reason,
  1098  		Message: errMsg,
  1099  	}, nominatingInfo); err != nil {
  1100  		klog.FromContext(ctx).Error(err, "Error updating pod", "pod", klog.KObj(pod))
  1101  	}
  1102  }
  1103  
  1104  // truncateMessage truncates a message if it hits the NoteLengthLimit.
  1105  func truncateMessage(message string) string {
  1106  	max := validation.NoteLengthLimit
  1107  	if len(message) <= max {
  1108  		return message
  1109  	}
  1110  	suffix := " ..."
  1111  	return message[:max-len(suffix)] + suffix
  1112  }
  1113  
  1114  func updatePod(ctx context.Context, client clientset.Interface, pod *v1.Pod, condition *v1.PodCondition, nominatingInfo *framework.NominatingInfo) error {
  1115  	logger := klog.FromContext(ctx)
  1116  	logger.V(3).Info("Updating pod condition", "pod", klog.KObj(pod), "conditionType", condition.Type, "conditionStatus", condition.Status, "conditionReason", condition.Reason)
  1117  	podStatusCopy := pod.Status.DeepCopy()
  1118  	// NominatedNodeName is updated only if we are trying to set it, and the value is
  1119  	// different from the existing one.
  1120  	nnnNeedsUpdate := nominatingInfo.Mode() == framework.ModeOverride && pod.Status.NominatedNodeName != nominatingInfo.NominatedNodeName
  1121  	if !podutil.UpdatePodCondition(podStatusCopy, condition) && !nnnNeedsUpdate {
  1122  		return nil
  1123  	}
  1124  	if nnnNeedsUpdate {
  1125  		podStatusCopy.NominatedNodeName = nominatingInfo.NominatedNodeName
  1126  	}
  1127  	return util.PatchPodStatus(ctx, client, pod, podStatusCopy)
  1128  }
  1129  

View as plain text