...

Source file src/k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption.go

Documentation: k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultpreemption

     1  /*
     2  Copyright 2020 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package defaultpreemption
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"math/rand"
    23  	"sort"
    24  
    25  	v1 "k8s.io/api/core/v1"
    26  	policy "k8s.io/api/policy/v1"
    27  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    28  	"k8s.io/apimachinery/pkg/labels"
    29  	"k8s.io/apimachinery/pkg/runtime"
    30  	"k8s.io/client-go/informers"
    31  	corelisters "k8s.io/client-go/listers/core/v1"
    32  	policylisters "k8s.io/client-go/listers/policy/v1"
    33  	corev1helpers "k8s.io/component-helpers/scheduling/corev1"
    34  	"k8s.io/klog/v2"
    35  	extenderv1 "k8s.io/kube-scheduler/extender/v1"
    36  	"k8s.io/kubernetes/pkg/scheduler/apis/config"
    37  	"k8s.io/kubernetes/pkg/scheduler/apis/config/validation"
    38  	"k8s.io/kubernetes/pkg/scheduler/framework"
    39  	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
    40  	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
    41  	"k8s.io/kubernetes/pkg/scheduler/framework/preemption"
    42  	"k8s.io/kubernetes/pkg/scheduler/metrics"
    43  	"k8s.io/kubernetes/pkg/scheduler/util"
    44  )
    45  
    46  // Name of the plugin used in the plugin registry and configurations.
    47  const Name = names.DefaultPreemption
    48  
    49  // DefaultPreemption is a PostFilter plugin that implements the preemption logic.
    50  type DefaultPreemption struct {
    51  	fh        framework.Handle
    52  	fts       feature.Features
    53  	args      config.DefaultPreemptionArgs
    54  	podLister corelisters.PodLister
    55  	pdbLister policylisters.PodDisruptionBudgetLister
    56  }
    57  
    58  var _ framework.PostFilterPlugin = &DefaultPreemption{}
    59  
    60  // Name returns name of the plugin. It is used in logs, etc.
    61  func (pl *DefaultPreemption) Name() string {
    62  	return Name
    63  }
    64  
    65  // New initializes a new plugin and returns it.
    66  func New(_ context.Context, dpArgs runtime.Object, fh framework.Handle, fts feature.Features) (framework.Plugin, error) {
    67  	args, ok := dpArgs.(*config.DefaultPreemptionArgs)
    68  	if !ok {
    69  		return nil, fmt.Errorf("got args of type %T, want *DefaultPreemptionArgs", dpArgs)
    70  	}
    71  	if err := validation.ValidateDefaultPreemptionArgs(nil, args); err != nil {
    72  		return nil, err
    73  	}
    74  	pl := DefaultPreemption{
    75  		fh:        fh,
    76  		fts:       fts,
    77  		args:      *args,
    78  		podLister: fh.SharedInformerFactory().Core().V1().Pods().Lister(),
    79  		pdbLister: getPDBLister(fh.SharedInformerFactory()),
    80  	}
    81  	return &pl, nil
    82  }
    83  
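As an illustration (not part of the source file): the validation that New runs can be exercised on its own. The following hypothetical test sketch uses 10 and 100 as illustrative values, not necessarily the configured defaults.

    package defaultpreemption

    import (
    	"testing"

    	"k8s.io/kubernetes/pkg/scheduler/apis/config"
    	"k8s.io/kubernetes/pkg/scheduler/apis/config/validation"
    )

    // TestValidateDefaultPreemptionArgsSketch runs the same validation New performs,
    // applied to hand-built args with illustrative values.
    func TestValidateDefaultPreemptionArgsSketch(t *testing.T) {
    	args := &config.DefaultPreemptionArgs{
    		MinCandidateNodesPercentage: 10,
    		MinCandidateNodesAbsolute:   100,
    	}
    	if err := validation.ValidateDefaultPreemptionArgs(nil, args); err != nil {
    		t.Errorf("expected args to be valid, got: %v", err)
    	}
    }
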
    84  // PostFilter is invoked at the postFilter extension point.
    85  func (pl *DefaultPreemption) PostFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, m framework.NodeToStatusMap) (*framework.PostFilterResult, *framework.Status) {
    86  	defer func() {
    87  		metrics.PreemptionAttempts.Inc()
    88  	}()
    89  
    90  	pe := preemption.Evaluator{
    91  		PluginName: names.DefaultPreemption,
    92  		Handler:    pl.fh,
    93  		PodLister:  pl.podLister,
    94  		PdbLister:  pl.pdbLister,
    95  		State:      state,
    96  		Interface:  pl,
    97  	}
    98  
    99  	result, status := pe.Preempt(ctx, pod, m)
   100  	msg := status.Message()
   101  	if len(msg) > 0 {
   102  		return result, framework.NewStatus(status.Code(), "preemption: "+msg)
   103  	}
   104  	return result, status
   105  }
   106  
   107  // calculateNumCandidates returns the number of candidates the FindCandidates
   108  // method must produce from dry running based on the constraints given by
   109  // <minCandidateNodesPercentage> and <minCandidateNodesAbsolute>. The number of
   110  // candidates returned will never be greater than <numNodes>.
   111  func (pl *DefaultPreemption) calculateNumCandidates(numNodes int32) int32 {
   112  	n := (numNodes * pl.args.MinCandidateNodesPercentage) / 100
   113  	if n < pl.args.MinCandidateNodesAbsolute {
   114  		n = pl.args.MinCandidateNodesAbsolute
   115  	}
   116  	if n > numNodes {
   117  		n = numNodes
   118  	}
   119  	return n
   120  }
   121  
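To make the clamping above concrete, here is a hypothetical in-package test sketch (not part of the source file). The percentage of 10 and floor of 100 are illustrative values.

    package defaultpreemption

    import (
    	"testing"

    	"k8s.io/kubernetes/pkg/scheduler/apis/config"
    )

    // TestCalculateNumCandidatesSketch shows the three regimes of the formula:
    // the percentage when it is large enough, the absolute floor otherwise,
    // and never more than the total number of nodes.
    func TestCalculateNumCandidatesSketch(t *testing.T) {
    	pl := &DefaultPreemption{args: config.DefaultPreemptionArgs{
    		MinCandidateNodesPercentage: 10,
    		MinCandidateNodesAbsolute:   100,
    	}}
    	for _, tc := range []struct{ nodes, want int32 }{
    		{5000, 500}, // 10% of 5000 nodes
    		{500, 100},  // 10% would be 50, raised to the floor of 100
    		{50, 50},    // the floor exceeds the node count, so capped at 50
    	} {
    		if got := pl.calculateNumCandidates(tc.nodes); got != tc.want {
    			t.Errorf("calculateNumCandidates(%d) = %d, want %d", tc.nodes, got, tc.want)
    		}
    	}
    }
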
   122  // GetOffsetAndNumCandidates chooses a random offset and calculates the number
   123  // of candidates that should be shortlisted for dry running preemption.
   124  func (pl *DefaultPreemption) GetOffsetAndNumCandidates(numNodes int32) (int32, int32) {
   125  	return rand.Int31n(numNodes), pl.calculateNumCandidates(numNodes)
   126  }
   127  
   128  // This function is not applicable for out-of-tree preemption plugins that exercise
   129  // different preemption candidates on the same nominated node.
   130  func (pl *DefaultPreemption) CandidatesToVictimsMap(candidates []preemption.Candidate) map[string]*extenderv1.Victims {
   131  	m := make(map[string]*extenderv1.Victims, len(candidates))
   132  	for _, c := range candidates {
   133  		m[c.Name()] = c.Victims()
   134  	}
   135  	return m
   136  }
   137  
   138  // SelectVictimsOnNode finds a minimal set of pods on the given node that should be preempted in order to make enough room
   139  // for "pod" to be scheduled.
   140  func (pl *DefaultPreemption) SelectVictimsOnNode(
   141  	ctx context.Context,
   142  	state *framework.CycleState,
   143  	pod *v1.Pod,
   144  	nodeInfo *framework.NodeInfo,
   145  	pdbs []*policy.PodDisruptionBudget) ([]*v1.Pod, int, *framework.Status) {
   146  	logger := klog.FromContext(ctx)
   147  	var potentialVictims []*framework.PodInfo
   148  	removePod := func(rpi *framework.PodInfo) error {
   149  		if err := nodeInfo.RemovePod(logger, rpi.Pod); err != nil {
   150  			return err
   151  		}
   152  		status := pl.fh.RunPreFilterExtensionRemovePod(ctx, state, pod, rpi, nodeInfo)
   153  		if !status.IsSuccess() {
   154  			return status.AsError()
   155  		}
   156  		return nil
   157  	}
   158  	addPod := func(api *framework.PodInfo) error {
   159  		nodeInfo.AddPodInfo(api)
   160  		status := pl.fh.RunPreFilterExtensionAddPod(ctx, state, pod, api, nodeInfo)
   161  		if !status.IsSuccess() {
   162  			return status.AsError()
   163  		}
   164  		return nil
   165  	}
   166  	// As the first step, remove all the lower priority pods from the node and
   167  	// check if the given pod can be scheduled.
   168  	podPriority := corev1helpers.PodPriority(pod)
   169  	for _, pi := range nodeInfo.Pods {
   170  		if corev1helpers.PodPriority(pi.Pod) < podPriority {
   171  			potentialVictims = append(potentialVictims, pi)
   172  			if err := removePod(pi); err != nil {
   173  				return nil, 0, framework.AsStatus(err)
   174  			}
   175  		}
   176  	}
   177  
   178  	// If no potential victims are found, we don't need to evaluate the node again since its state didn't change.
   179  	if len(potentialVictims) == 0 {
   180  		return nil, 0, framework.NewStatus(framework.UnschedulableAndUnresolvable, "No preemption victims found for incoming pod")
   181  	}
   182  
   183  	// If the new pod does not fit after removing all the lower priority pods,
   184  	// we are almost done and this node is not suitable for preemption. The only
   185  	// condition that we could check is if the "pod" is failing to schedule due to
   186  	// inter-pod affinity to one or more victims, but we have decided not to
   187  	// support this case for performance reasons. Having affinity to lower
   188  	// priority pods is not a recommended configuration anyway.
   189  	if status := pl.fh.RunFilterPluginsWithNominatedPods(ctx, state, pod, nodeInfo); !status.IsSuccess() {
   190  		return nil, 0, status
   191  	}
   192  	var victims []*v1.Pod
   193  	numViolatingVictim := 0
   194  	sort.Slice(potentialVictims, func(i, j int) bool { return util.MoreImportantPod(potentialVictims[i].Pod, potentialVictims[j].Pod) })
   195  	// Try to reprieve as many pods as possible. We first try to reprieve the PDB
   196  	// violating victims and then other non-violating ones. In both cases, we start
   197  	// from the highest priority victims.
   198  	violatingVictims, nonViolatingVictims := filterPodsWithPDBViolation(potentialVictims, pdbs)
   199  	reprievePod := func(pi *framework.PodInfo) (bool, error) {
   200  		if err := addPod(pi); err != nil {
   201  			return false, err
   202  		}
   203  		status := pl.fh.RunFilterPluginsWithNominatedPods(ctx, state, pod, nodeInfo)
   204  		fits := status.IsSuccess()
   205  		if !fits {
   206  			if err := removePod(pi); err != nil {
   207  				return false, err
   208  			}
   209  			rpi := pi.Pod
   210  			victims = append(victims, rpi)
   211  			logger.V(5).Info("Pod is a potential preemption victim on node", "pod", klog.KObj(rpi), "node", klog.KObj(nodeInfo.Node()))
   212  		}
   213  		return fits, nil
   214  	}
   215  	for _, p := range violatingVictims {
   216  		if fits, err := reprievePod(p); err != nil {
   217  			return nil, 0, framework.AsStatus(err)
   218  		} else if !fits {
   219  			numViolatingVictim++
   220  		}
   221  	}
   222  	// Now we try to reprieve non-violating victims.
   223  	for _, p := range nonViolatingVictims {
   224  		if _, err := reprievePod(p); err != nil {
   225  			return nil, 0, framework.AsStatus(err)
   226  		}
   227  	}
   228  	return victims, numViolatingVictim, framework.NewStatus(framework.Success)
   229  }
   230  
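Not part of the source file: the reprieve loop above depends on the scheduler framework, but its shape can be sketched in isolation. In this standalone toy, a simple capacity check stands in for RunFilterPluginsWithNominatedPods; candidates are visited from most to least important, and whatever cannot be re-added becomes a victim.

    package main

    import "fmt"

    // selectVictims mirrors the reprieve pattern: candidates arrive sorted from most
    // to least important, each is tentatively re-added, and only those whose
    // re-addition still leaves room for the incoming request are kept.
    func selectVictims(candidates []int64, free, request int64) (victims []int64) {
    	used := int64(0)
    	for _, c := range candidates {
    		if used+c+request <= free {
    			used += c // reprieved: re-adding it still leaves room
    		} else {
    			victims = append(victims, c) // must be preempted
    		}
    	}
    	return victims
    }

    func main() {
    	// A node with 10 units free after removing all lower-priority pods
    	// (sizes 4, 3 and 2, already sorted by importance); the incoming pod needs 6.
    	fmt.Println(selectVictims([]int64{4, 3, 2}, 10, 6)) // [3 2]: only the 4-unit pod is reprieved
    }
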
   231  // PodEligibleToPreemptOthers returns one bool and one string. The bool
   232  // indicates whether this pod should be considered for preempting other pods or
   233  // not. The string includes the reason if this pod isn't eligible.
   234  // There are several reasons:
   235  //  1. The pod has a preemptionPolicy of Never.
   236  //  2. The pod has already preempted other pods and the victims are in their graceful termination period.
   237  //     Currently we check the node that is nominated for this pod, and as long as there are
   238  //     terminating pods on this node, we don't attempt to preempt more pods.
   239  func (pl *DefaultPreemption) PodEligibleToPreemptOthers(pod *v1.Pod, nominatedNodeStatus *framework.Status) (bool, string) {
   240  	if pod.Spec.PreemptionPolicy != nil && *pod.Spec.PreemptionPolicy == v1.PreemptNever {
   241  		return false, "not eligible due to preemptionPolicy=Never."
   242  	}
   243  
   244  	nodeInfos := pl.fh.SnapshotSharedLister().NodeInfos()
   245  	nomNodeName := pod.Status.NominatedNodeName
   246  	if len(nomNodeName) > 0 {
   247  		// If the pod's nominated node is considered UnschedulableAndUnresolvable by the filters,
   248  		// then the pod should be considered for preemption again.
   249  		if nominatedNodeStatus.Code() == framework.UnschedulableAndUnresolvable {
   250  			return true, ""
   251  		}
   252  
   253  		if nodeInfo, _ := nodeInfos.Get(nomNodeName); nodeInfo != nil {
   254  			podPriority := corev1helpers.PodPriority(pod)
   255  			for _, p := range nodeInfo.Pods {
   256  				if corev1helpers.PodPriority(p.Pod) < podPriority && podTerminatingByPreemption(p.Pod, pl.fts.EnablePodDisruptionConditions) {
   257  					// There is a terminating pod on the nominated node.
   258  					return false, "not eligible due to a terminating pod on the nominated node."
   259  				}
   260  			}
   261  		}
   262  	}
   263  	return true, ""
   264  }
   265  
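Not part of the source file: a hypothetical test sketch for the first condition; a pod carrying preemptionPolicy: Never is rejected before any node state is consulted.

    package defaultpreemption

    import (
    	"testing"

    	v1 "k8s.io/api/core/v1"
    )

    // TestPreemptNeverIneligibleSketch shows that a pod with preemptionPolicy: Never
    // is declared ineligible before any node information is consulted.
    func TestPreemptNeverIneligibleSketch(t *testing.T) {
    	never := v1.PreemptNever
    	pod := &v1.Pod{Spec: v1.PodSpec{PreemptionPolicy: &never}}
    	if eligible, reason := (&DefaultPreemption{}).PodEligibleToPreemptOthers(pod, nil); eligible {
    		t.Errorf("expected the pod to be ineligible, got eligible=true, reason=%q", reason)
    	}
    }
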
   266  // OrderedScoreFuncs returns a list of ordered score functions to select preferable node where victims will be preempted.
   267  func (pl *DefaultPreemption) OrderedScoreFuncs(ctx context.Context, nodesToVictims map[string]*extenderv1.Victims) []func(node string) int64 {
   268  	return nil
   269  }
   270  
   271  // podTerminatingByPreemption returns whether the pod is terminating, when the feature PodDisruptionConditions is disabled.
   272  // Otherwise, it additionally checks whether the termination was caused by scheduler preemption.
   273  func podTerminatingByPreemption(p *v1.Pod, enablePodDisruptionConditions bool) bool {
   274  	if p.DeletionTimestamp == nil {
   275  		return false
   276  	}
   277  
   278  	if !enablePodDisruptionConditions {
   279  		return true
   280  	}
   281  
   282  	for _, condition := range p.Status.Conditions {
   283  		if condition.Type == v1.DisruptionTarget {
   284  			return condition.Status == v1.ConditionTrue && condition.Reason == v1.PodReasonPreemptionByScheduler
   285  		}
   286  	}
   287  	return false
   288  }
   289  
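Not part of the source file: a hypothetical test sketch exercising both branches with a pod whose fields mirror the checks above.

    package defaultpreemption

    import (
    	"testing"

    	v1 "k8s.io/api/core/v1"
    	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    )

    // TestPodTerminatingByPreemptionSketch: a deleting pod that carries a true
    // DisruptionTarget condition with reason PreemptionByScheduler is treated as
    // terminating by preemption whether or not PodDisruptionConditions is enabled.
    func TestPodTerminatingByPreemptionSketch(t *testing.T) {
    	now := metav1.Now()
    	pod := &v1.Pod{
    		ObjectMeta: metav1.ObjectMeta{DeletionTimestamp: &now},
    		Status: v1.PodStatus{Conditions: []v1.PodCondition{{
    			Type:   v1.DisruptionTarget,
    			Status: v1.ConditionTrue,
    			Reason: v1.PodReasonPreemptionByScheduler,
    		}}},
    	}
    	if !podTerminatingByPreemption(pod, true) {
    		t.Errorf("expected the pod to count as terminating by preemption with the feature enabled")
    	}
    	if !podTerminatingByPreemption(pod, false) {
    		t.Errorf("expected any deleting pod to count as terminating with the feature disabled")
    	}
    }
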
   290  // filterPodsWithPDBViolation groups the given "pods" into two groups of "violatingPods"
   291  // and "nonViolatingPods" based on whether their PDBs will be violated if they are
   292  // preempted.
   293  // This function is stable and does not change the order of received pods. So, if it
   294  // receives a sorted list, grouping will preserve the order of the input list.
   295  func filterPodsWithPDBViolation(podInfos []*framework.PodInfo, pdbs []*policy.PodDisruptionBudget) (violatingPodInfos, nonViolatingPodInfos []*framework.PodInfo) {
   296  	pdbsAllowed := make([]int32, len(pdbs))
   297  	for i, pdb := range pdbs {
   298  		pdbsAllowed[i] = pdb.Status.DisruptionsAllowed
   299  	}
   300  
   301  	for _, podInfo := range podInfos {
   302  		pod := podInfo.Pod
   303  		pdbForPodIsViolated := false
   304  		// A pod with no labels will not match any PDB. So, no need to check.
   305  		if len(pod.Labels) != 0 {
   306  			for i, pdb := range pdbs {
   307  				if pdb.Namespace != pod.Namespace {
   308  					continue
   309  				}
   310  				selector, err := metav1.LabelSelectorAsSelector(pdb.Spec.Selector)
   311  				if err != nil {
   312  					// This object has an invalid selector, so it does not match the pod
   313  					continue
   314  				}
   315  				// A PDB with a nil or empty selector matches nothing.
   316  				if selector.Empty() || !selector.Matches(labels.Set(pod.Labels)) {
   317  					continue
   318  				}
   319  
   320  				// A pod already listed in DisruptedPods has been processed by the API server,
   321  				// so we don't treat it as a violating case.
   322  				if _, exist := pdb.Status.DisruptedPods[pod.Name]; exist {
   323  					continue
   324  				}
   325  				// Only decrement the matched pdb when it's not in its <DisruptedPods>;
   326  				// otherwise we may over-decrement the budget number.
   327  				pdbsAllowed[i]--
   328  				// We have found a matching PDB.
   329  				if pdbsAllowed[i] < 0 {
   330  					pdbForPodIsViolated = true
   331  				}
   332  			}
   333  		}
   334  		if pdbForPodIsViolated {
   335  			violatingPodInfos = append(violatingPodInfos, podInfo)
   336  		} else {
   337  			nonViolatingPodInfos = append(nonViolatingPodInfos, podInfo)
   338  		}
   339  	}
   340  	return violatingPodInfos, nonViolatingPodInfos
   341  }
   342  
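Not part of the source file: a hypothetical test sketch in which one PDB tolerating a single disruption covers two matching pods, so exactly one of them ends up in the violating group.

    package defaultpreemption

    import (
    	"testing"

    	v1 "k8s.io/api/core/v1"
    	policy "k8s.io/api/policy/v1"
    	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    	"k8s.io/kubernetes/pkg/scheduler/framework"
    )

    func TestFilterPodsWithPDBViolationSketch(t *testing.T) {
    	appLabels := map[string]string{"app": "demo"}
    	mkPod := func(name string) *framework.PodInfo {
    		return &framework.PodInfo{Pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{
    			Name: name, Namespace: "default", Labels: appLabels,
    		}}}
    	}
    	// The PDB matches both pods but tolerates only one more disruption.
    	pdb := &policy.PodDisruptionBudget{
    		ObjectMeta: metav1.ObjectMeta{Namespace: "default"},
    		Spec:       policy.PodDisruptionBudgetSpec{Selector: &metav1.LabelSelector{MatchLabels: appLabels}},
    		Status:     policy.PodDisruptionBudgetStatus{DisruptionsAllowed: 1},
    	}
    	violating, nonViolating := filterPodsWithPDBViolation(
    		[]*framework.PodInfo{mkPod("a"), mkPod("b")}, []*policy.PodDisruptionBudget{pdb})
    	if len(violating) != 1 || len(nonViolating) != 1 {
    		t.Errorf("got %d violating and %d non-violating pods, want 1 and 1", len(violating), len(nonViolating))
    	}
    }
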
   343  func getPDBLister(informerFactory informers.SharedInformerFactory) policylisters.PodDisruptionBudgetLister {
   344  	return informerFactory.Policy().V1().PodDisruptionBudgets().Lister()
   345  }
   346  
