    17  package defaultpreemption
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"math/rand"
    23  	"sort"
    25  	v1 "k8s.io/api/core/v1"
    26  	policy "k8s.io/api/policy/v1"
    27  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    28  	"k8s.io/apimachinery/pkg/labels"
    29  	"k8s.io/apimachinery/pkg/runtime"
    30  	"k8s.io/client-go/informers"
    31  	corelisters "k8s.io/client-go/listers/core/v1"
    32  	policylisters "k8s.io/client-go/listers/policy/v1"
    33  	corev1helpers "k8s.io/component-helpers/scheduling/corev1"
    34  	"k8s.io/klog/v2"
    35  	extenderv1 "k8s.io/kube-scheduler/extender/v1"
    36  	"k8s.io/kubernetes/pkg/scheduler/apis/config"
    37  	"k8s.io/kubernetes/pkg/scheduler/apis/config/validation"
    38  	"k8s.io/kubernetes/pkg/scheduler/framework"
    39  	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
    40  	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
    41  	"k8s.io/kubernetes/pkg/scheduler/framework/preemption"
    42  	"k8s.io/kubernetes/pkg/scheduler/metrics"
    43  	"k8s.io/kubernetes/pkg/scheduler/util"
    44  )
// Name is the name of the plugin used in the plugin registry and configurations.
const Name = names.DefaultPreemption
// DefaultPreemption is a PostFilter plugin that implements the preemption logic.
type DefaultPreemption struct {
	// fh is the framework handle passed to New. It is used to reach the shared
	// informer factory and the snapshot lister, and to run the filter and
	// PreFilter-extension plugins while simulating preemption.
	fh        framework.Handle
	// fts holds the feature flags passed to New; EnablePodDisruptionConditions
	// is read when checking for pods terminating due to preemption.
	fts       feature.Features
	// args is a copy of the validated plugin arguments; it supplies
	// MinCandidateNodesPercentage and MinCandidateNodesAbsolute.
	args      config.DefaultPreemptionArgs
	// podLister is the pod lister handed to the preemption evaluator.
	podLister corelisters.PodLister
	// pdbLister is the PodDisruptionBudget lister handed to the preemption evaluator.
	pdbLister policylisters.PodDisruptionBudgetLister
}
    58  var _ framework.PostFilterPlugin = &DefaultPreemption{}
// Name returns the name of the plugin, i.e. the package-level Name constant.
// It is used in logs, etc.
func (pl *DefaultPreemption) Name() string {
	return Name
}
    65  // New initializes a new plugin and returns it.
    66  func New(_ context.Context, dpArgs runtime.Object, fh framework.Handle, fts feature.Features) (framework.Plugin, error) {
    67  	args, ok := dpArgs.(*config.DefaultPreemptionArgs)
    68  	if !ok {
    69  		return nil, fmt.Errorf("got args of type %T, want *DefaultPreemptionArgs", dpArgs)
    70  	}
    71  	if err := validation.ValidateDefaultPreemptionArgs(nil, args); err != nil {
    72  		return nil, err
    73  	}
    74  	pl := DefaultPreemption{
    75  		fh:        fh,
    76  		fts:       fts,
    77  		args:      *args,
    78  		podLister: fh.SharedInformerFactory().Core().V1().Pods().Lister(),
    79  		pdbLister: getPDBLister(fh.SharedInformerFactory()),
    80  	}
    81  	return &pl, nil
    82  }
    84  // PostFilter invoked at the postFilter extension point.
    85  func (pl *DefaultPreemption) PostFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, m framework.NodeToStatusMap) (*framework.PostFilterResult, *framework.Status) {
    86  	defer func() {
    87  		metrics.PreemptionAttempts.Inc()
    88  	}()
    90  	pe := preemption.Evaluator{
    91  		PluginName: names.DefaultPreemption,
    92  		Handler:    pl.fh,
    93  		PodLister:  pl.podLister,
    94  		PdbLister:  pl.pdbLister,
    95  		State:      state,
    96  		Interface:  pl,
    97  	}
    99  	result, status := pe.Preempt(ctx, pod, m)
   100  	msg := status.Message()
   101  	if len(msg) > 0 {
   102  		return result, framework.NewStatus(status.Code(), "preemption: "+msg)
   103  	}
   104  	return result, status
   105  }
   107  // calculateNumCandidates returns the number of candidates the FindCandidates
   108  // method must produce from dry running based on the constraints given by
   109  // <minCandidateNodesPercentage> and <minCandidateNodesAbsolute>. The number of
   110  // candidates returned will never be greater than <numNodes>.
   111  func (pl *DefaultPreemption) calculateNumCandidates(numNodes int32) int32 {
   112  	n := (numNodes * pl.args.MinCandidateNodesPercentage) / 100
   113  	if n < pl.args.MinCandidateNodesAbsolute {
   114  		n = pl.args.MinCandidateNodesAbsolute
   115  	}
   116  	if n > numNodes {
   117  		n = numNodes
   118  	}
   119  	return n
   120  }
   122  // GetOffsetAndNumCandidates chooses a random offset and calculates the number
   123  // of candidates that should be shortlisted for dry running preemption.
   124  func (pl *DefaultPreemption) GetOffsetAndNumCandidates(numNodes int32) (int32, int32) {
   125  	return rand.Int31n(numNodes), pl.calculateNumCandidates(numNodes)
   126  }
   128  // This function is not applicable for out-of-tree preemption plugins that exercise
   129  // different preemption candidates on the same nominated node.
   130  func (pl *DefaultPreemption) CandidatesToVictimsMap(candidates []preemption.Candidate) map[string]*extenderv1.Victims {
   131  	m := make(map[string]*extenderv1.Victims, len(candidates))
   132  	for _, c := range candidates {
   133  		m[c.Name()] = c.Victims()
   134  	}
   135  	return m
   136  }
   138  // SelectVictimsOnNode finds minimum set of pods on the given node that should be preempted in order to make enough room
   139  // for "pod" to be scheduled.
   140  func (pl *DefaultPreemption) SelectVictimsOnNode(
   141  	ctx context.Context,
   142  	state *framework.CycleState,
   143  	pod *v1.Pod,
   144  	nodeInfo *framework.NodeInfo,
   145  	pdbs []*policy.PodDisruptionBudget) ([]*v1.Pod, int, *framework.Status) {
   146  	logger := klog.FromContext(ctx)
   147  	var potentialVictims []*framework.PodInfo
   148  	removePod := func(rpi *framework.PodInfo) error {
   149  		if err := nodeInfo.RemovePod(logger, rpi.Pod); err != nil {
   150  			return err
   151  		}
   152  		status := pl.fh.RunPreFilterExtensionRemovePod(ctx, state, pod, rpi, nodeInfo)
   153  		if !status.IsSuccess() {
   154  			return status.AsError()
   155  		}
   156  		return nil
   157  	}
   158  	addPod := func(api *framework.PodInfo) error {
   159  		nodeInfo.AddPodInfo(api)
   160  		status := pl.fh.RunPreFilterExtensionAddPod(ctx, state, pod, api, nodeInfo)
   161  		if !status.IsSuccess() {
   162  			return status.AsError()
   163  		}
   164  		return nil
   165  	}
   166  	// As the first step, remove all the lower priority pods from the node and
   167  	// check if the given pod can be scheduled.
   168  	podPriority := corev1helpers.PodPriority(pod)
   169  	for _, pi := range nodeInfo.Pods {
   170  		if corev1helpers.PodPriority(pi.Pod) < podPriority {
   171  			potentialVictims = append(potentialVictims, pi)
   172  			if err := removePod(pi); err != nil {
   173  				return nil, 0, framework.AsStatus(err)
   174  			}
   175  		}
   176  	}
   178  	// No potential victims are found, and so we don't need to evaluate the node again since its state didn't change.
   179  	if len(potentialVictims) == 0 {
   180  		return nil, 0, framework.NewStatus(framework.UnschedulableAndUnresolvable, "No preemption victims found for incoming pod")
   181  	}
   183  	// If the new pod does not fit after removing all the lower priority pods,
   184  	// we are almost done and this node is not suitable for preemption. The only
   185  	// condition that we could check is if the "pod" is failing to schedule due to
   186  	// inter-pod affinity to one or more victims, but we have decided not to
   187  	// support this case for performance reasons. Having affinity to lower
   188  	// priority pods is not a recommended configuration anyway.
   189  	if status := pl.fh.RunFilterPluginsWithNominatedPods(ctx, state, pod, nodeInfo); !status.IsSuccess() {
   190  		return nil, 0, status
   191  	}
   192  	var victims []*v1.Pod
   193  	numViolatingVictim := 0
   194  	sort.Slice(potentialVictims, func(i, j int) bool { return util.MoreImportantPod(potentialVictims[i].Pod, potentialVictims[j].Pod) })
   195  	// Try to reprieve as many pods as possible. We first try to reprieve the PDB
   196  	// violating victims and then other non-violating ones. In both cases, we start
   197  	// from the highest priority victims.
   198  	violatingVictims, nonViolatingVictims := filterPodsWithPDBViolation(potentialVictims, pdbs)
   199  	reprievePod := func(pi *framework.PodInfo) (bool, error) {
   200  		if err := addPod(pi); err != nil {
   201  			return false, err
   202  		}
   203  		status := pl.fh.RunFilterPluginsWithNominatedPods(ctx, state, pod, nodeInfo)
   204  		fits := status.IsSuccess()
   205  		if !fits {
   206  			if err := removePod(pi); err != nil {
   207  				return false, err
   208  			}
   209  			rpi := pi.Pod
   210  			victims = append(victims, rpi)
   211  			logger.V(5).Info("Pod is a potential preemption victim on node", "pod", klog.KObj(rpi), "node", klog.KObj(nodeInfo.Node()))
   212  		}
   213  		return fits, nil
   214  	}
   215  	for _, p := range violatingVictims {
   216  		if fits, err := reprievePod(p); err != nil {
   217  			return nil, 0, framework.AsStatus(err)
   218  		} else if !fits {
   219  			numViolatingVictim++
   220  		}
   221  	}
   222  	// Now we try to reprieve non-violating victims.
   223  	for _, p := range nonViolatingVictims {
   224  		if _, err := reprievePod(p); err != nil {
   225  			return nil, 0, framework.AsStatus(err)
   226  		}
   227  	}
   228  	return victims, numViolatingVictim, framework.NewStatus(framework.Success)
   229  }
   231  // PodEligibleToPreemptOthers returns one bool and one string. The bool
   232  // indicates whether this pod should be considered for preempting other pods or
   233  // not. The string includes the reason if this pod isn't eligible.
   234  // There're several reasons:
   235  //  1. The pod has a preemptionPolicy of Never.
   236  //  2. The pod has already preempted other pods and the victims are in their graceful termination period.
   237  //     Currently we check the node that is nominated for this pod, and as long as there are
   238  //     terminating pods on this node, we don't attempt to preempt more pods.
   239  func (pl *DefaultPreemption) PodEligibleToPreemptOthers(pod *v1.Pod, nominatedNodeStatus *framework.Status) (bool, string) {
   240  	if pod.Spec.PreemptionPolicy != nil && *pod.Spec.PreemptionPolicy == v1.PreemptNever {
   241  		return false, "not eligible due to preemptionPolicy=Never."
   242  	}
   244  	nodeInfos := pl.fh.SnapshotSharedLister().NodeInfos()
   245  	nomNodeName := pod.Status.NominatedNodeName
   246  	if len(nomNodeName) > 0 {
   247  		// If the pod's nominated node is considered as UnschedulableAndUnresolvable by the filters,
   248  		// then the pod should be considered for preempting again.
   249  		if nominatedNodeStatus.Code() == framework.UnschedulableAndUnresolvable {
   250  			return true, ""
   251  		}
   253  		if nodeInfo, _ := nodeInfos.Get(nomNodeName); nodeInfo != nil {
   254  			podPriority := corev1helpers.PodPriority(pod)
   255  			for _, p := range nodeInfo.Pods {
   256  				if corev1helpers.PodPriority(p.Pod) < podPriority && podTerminatingByPreemption(p.Pod, pl.fts.EnablePodDisruptionConditions) {
   257  					// There is a terminating pod on the nominated node.
   258  					return false, "not eligible due to a terminating pod on the nominated node."
   259  				}
   260  			}
   261  		}
   262  	}
   263  	return true, ""
   264  }
// OrderedScoreFuncs returns a list of ordered score functions to select preferable node where victims will be preempted.
// This plugin always returns nil; neither ctx nor nodesToVictims is used.
// NOTE(review): presumably a nil result makes the caller fall back to its own
// default ordering — confirm against the preemption evaluator.
func (pl *DefaultPreemption) OrderedScoreFuncs(ctx context.Context, nodesToVictims map[string]*extenderv1.Victims) []func(node string) int64 {
	return nil
}
   271  // podTerminatingByPreemption returns the pod's terminating state if feature PodDisruptionConditions is not enabled.
   272  // Otherwise, it additionally checks if the termination state is caused by scheduler preemption.
   273  func podTerminatingByPreemption(p *v1.Pod, enablePodDisruptionConditions bool) bool {
   274  	if p.DeletionTimestamp == nil {
   275  		return false
   276  	}
   278  	if !enablePodDisruptionConditions {
   279  		return true
   280  	}
   282  	for _, condition := range p.Status.Conditions {
   283  		if condition.Type == v1.DisruptionTarget {
   284  			return condition.Status == v1.ConditionTrue && condition.Reason == v1.PodReasonPreemptionByScheduler
   285  		}
   286  	}
   287  	return false
   288  }
   290  // filterPodsWithPDBViolation groups the given "pods" into two groups of "violatingPods"
   291  // and "nonViolatingPods" based on whether their PDBs will be violated if they are
   292  // preempted.
   293  // This function is stable and does not change the order of received pods. So, if it
   294  // receives a sorted list, grouping will preserve the order of the input list.
   295  func filterPodsWithPDBViolation(podInfos []*framework.PodInfo, pdbs []*policy.PodDisruptionBudget) (violatingPodInfos, nonViolatingPodInfos []*framework.PodInfo) {
   296  	pdbsAllowed := make([]int32, len(pdbs))
   297  	for i, pdb := range pdbs {
   298  		pdbsAllowed[i] = pdb.Status.DisruptionsAllowed
   299  	}
   301  	for _, podInfo := range podInfos {
   302  		pod := podInfo.Pod
   303  		pdbForPodIsViolated := false
   304  		// A pod with no labels will not match any PDB. So, no need to check.
   305  		if len(pod.Labels) != 0 {
   306  			for i, pdb := range pdbs {
   307  				if pdb.Namespace != pod.Namespace {
   308  					continue
   309  				}
   310  				selector, err := metav1.LabelSelectorAsSelector(pdb.Spec.Selector)
   311  				if err != nil {
   312  					// This object has an invalid selector, it does not match the pod
   313  					continue
   314  				}
   315  				// A PDB with a nil or empty selector matches nothing.
   316  				if selector.Empty() || !selector.Matches(labels.Set(pod.Labels)) {
   317  					continue
   318  				}
   320  				// Existing in DisruptedPods means it has been processed in API server,
   321  				// we don't treat it as a violating case.
   322  				if _, exist := pdb.Status.DisruptedPods[pod.Name]; exist {
   323  					continue
   324  				}
   325  				// Only decrement the matched pdb when it's not in its <DisruptedPods>;
   326  				// otherwise we may over-decrement the budget number.
   327  				pdbsAllowed[i]--
   328  				// We have found a matching PDB.
   329  				if pdbsAllowed[i] < 0 {
   330  					pdbForPodIsViolated = true
   331  				}
   332  			}
   333  		}
   334  		if pdbForPodIsViolated {
   335  			violatingPodInfos = append(violatingPodInfos, podInfo)
   336  		} else {
   337  			nonViolatingPodInfos = append(nonViolatingPodInfos, podInfo)
   338  		}
   339  	}
   340  	return violatingPodInfos, nonViolatingPodInfos
   341  }
   343  func getPDBLister(informerFactory informers.SharedInformerFactory) policylisters.PodDisruptionBudgetLister {
   344  	return informerFactory.Policy().V1().PodDisruptionBudgets().Lister()
   345  }

