/*
Copyright 2020 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package defaultpreemption

import (
	"context"
	"fmt"
	"math/rand"
	"sort"

	v1 "k8s.io/api/core/v1"
	policy "k8s.io/api/policy/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/client-go/informers"
	corelisters "k8s.io/client-go/listers/core/v1"
	policylisters "k8s.io/client-go/listers/policy/v1"
	corev1helpers "k8s.io/component-helpers/scheduling/corev1"
	"k8s.io/klog/v2"
	extenderv1 "k8s.io/kube-scheduler/extender/v1"
	"k8s.io/kubernetes/pkg/scheduler/apis/config"
	"k8s.io/kubernetes/pkg/scheduler/apis/config/validation"
	"k8s.io/kubernetes/pkg/scheduler/framework"
	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
	"k8s.io/kubernetes/pkg/scheduler/framework/preemption"
	"k8s.io/kubernetes/pkg/scheduler/metrics"
	"k8s.io/kubernetes/pkg/scheduler/util"
)

// Name of the plugin used in the plugin registry and configurations.
const Name = names.DefaultPreemption

// DefaultPreemption is a PostFilter plugin that implements the preemption logic.
type DefaultPreemption struct {
	// fh is the scheduler framework handle; the plugin uses it to run filter
	// plugins and PreFilter extensions and to read the node snapshot.
	fh framework.Handle
	// fts holds the feature flags handed to New; EnablePodDisruptionConditions
	// is consulted when deciding whether a pod is terminating due to preemption.
	fts feature.Features
	// args is a copy of the validated plugin arguments; it supplies
	// MinCandidateNodesPercentage and MinCandidateNodesAbsolute.
	args config.DefaultPreemptionArgs
	// podLister and pdbLister come from the shared informer factory and are
	// passed on to the preemption evaluator.
	podLister corelisters.PodLister
	pdbLister policylisters.PodDisruptionBudgetLister
}

// Compile-time check that DefaultPreemption satisfies framework.PostFilterPlugin.
var _ framework.PostFilterPlugin = &DefaultPreemption{}

// Name returns name of the plugin. It is used in logs, etc.
func (pl *DefaultPreemption) Name() string {
	return Name
}

// New builds the DefaultPreemption plugin from its configuration object.
// It fails when dpArgs is not a *config.DefaultPreemptionArgs or when the
// arguments do not pass validation. The pod and PDB listers are taken from
// the shared informer factory exposed by fh.
func New(_ context.Context, dpArgs runtime.Object, fh framework.Handle, fts feature.Features) (framework.Plugin, error) {
	preemptionArgs, isPreemptionArgs := dpArgs.(*config.DefaultPreemptionArgs)
	if !isPreemptionArgs {
		return nil, fmt.Errorf("got args of type %T, want *DefaultPreemptionArgs", dpArgs)
	}

	validationErr := validation.ValidateDefaultPreemptionArgs(nil, preemptionArgs)
	if validationErr != nil {
		return nil, validationErr
	}

	pods := fh.SharedInformerFactory().Core().V1().Pods().Lister()
	pdbs := getPDBLister(fh.SharedInformerFactory())
	return &DefaultPreemption{
		fh:        fh,
		fts:       fts,
		args:      *preemptionArgs,
		podLister: pods,
		pdbLister: pdbs,
	}, nil
}

// PostFilter is invoked at the postFilter extension point. It delegates the
// preemption attempt to a preemption.Evaluator configured with this plugin,
// and prefixes any non-empty status message with "preemption: ". The
// PreemptionAttempts metric is incremented on every call, whatever the outcome.
func (pl *DefaultPreemption) PostFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, m framework.NodeToStatusMap) (*framework.PostFilterResult, *framework.Status) {
	defer func() {
		metrics.PreemptionAttempts.Inc()
	}()

	evaluator := preemption.Evaluator{
		PluginName: names.DefaultPreemption,
		Handler:    pl.fh,
		PodLister:  pl.podLister,
		PdbLister:  pl.pdbLister,
		State:      state,
		Interface:  pl,
	}

	result, status := evaluator.Preempt(ctx, pod, m)
	if msg := status.Message(); msg != "" {
		return result, framework.NewStatus(status.Code(), "preemption: "+msg)
	}
	return result, status
}

// calculateNumCandidates returns the number of candidates the FindCandidates
// method must produce from dry running, based on the constraints given by
// MinCandidateNodesPercentage and MinCandidateNodesAbsolute. The number of
// candidates returned will never be greater than numNodes.
func (pl *DefaultPreemption) calculateNumCandidates(numNodes int32) int32 {
	candidates := numNodes * pl.args.MinCandidateNodesPercentage / 100
	// Raise the percentage-derived count to the configured absolute minimum.
	if floor := pl.args.MinCandidateNodesAbsolute; candidates < floor {
		candidates = floor
	}
	// The node count is the hard upper bound.
	if candidates > numNodes {
		return numNodes
	}
	return candidates
}

// GetOffsetAndNumCandidates chooses a random offset and calculates the number
// of candidates that should be shortlisted for dry running preemption.
func (pl *DefaultPreemption) GetOffsetAndNumCandidates(numNodes int32) (int32, int32) { return rand.Int31n(numNodes), pl.calculateNumCandidates(numNodes) } // This function is not applicable for out-of-tree preemption plugins that exercise // different preemption candidates on the same nominated node. func (pl *DefaultPreemption) CandidatesToVictimsMap(candidates []preemption.Candidate) map[string]*extenderv1.Victims { m := make(map[string]*extenderv1.Victims, len(candidates)) for _, c := range candidates { m[c.Name()] = c.Victims() } return m } // SelectVictimsOnNode finds minimum set of pods on the given node that should be preempted in order to make enough room // for "pod" to be scheduled. func (pl *DefaultPreemption) SelectVictimsOnNode( ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo, pdbs []*policy.PodDisruptionBudget) ([]*v1.Pod, int, *framework.Status) { logger := klog.FromContext(ctx) var potentialVictims []*framework.PodInfo removePod := func(rpi *framework.PodInfo) error { if err := nodeInfo.RemovePod(logger, rpi.Pod); err != nil { return err } status := pl.fh.RunPreFilterExtensionRemovePod(ctx, state, pod, rpi, nodeInfo) if !status.IsSuccess() { return status.AsError() } return nil } addPod := func(api *framework.PodInfo) error { nodeInfo.AddPodInfo(api) status := pl.fh.RunPreFilterExtensionAddPod(ctx, state, pod, api, nodeInfo) if !status.IsSuccess() { return status.AsError() } return nil } // As the first step, remove all the lower priority pods from the node and // check if the given pod can be scheduled. podPriority := corev1helpers.PodPriority(pod) for _, pi := range nodeInfo.Pods { if corev1helpers.PodPriority(pi.Pod) < podPriority { potentialVictims = append(potentialVictims, pi) if err := removePod(pi); err != nil { return nil, 0, framework.AsStatus(err) } } } // No potential victims are found, and so we don't need to evaluate the node again since its state didn't change. 
if len(potentialVictims) == 0 { return nil, 0, framework.NewStatus(framework.UnschedulableAndUnresolvable, "No preemption victims found for incoming pod") } // If the new pod does not fit after removing all the lower priority pods, // we are almost done and this node is not suitable for preemption. The only // condition that we could check is if the "pod" is failing to schedule due to // inter-pod affinity to one or more victims, but we have decided not to // support this case for performance reasons. Having affinity to lower // priority pods is not a recommended configuration anyway. if status := pl.fh.RunFilterPluginsWithNominatedPods(ctx, state, pod, nodeInfo); !status.IsSuccess() { return nil, 0, status } var victims []*v1.Pod numViolatingVictim := 0 sort.Slice(potentialVictims, func(i, j int) bool { return util.MoreImportantPod(potentialVictims[i].Pod, potentialVictims[j].Pod) }) // Try to reprieve as many pods as possible. We first try to reprieve the PDB // violating victims and then other non-violating ones. In both cases, we start // from the highest priority victims. violatingVictims, nonViolatingVictims := filterPodsWithPDBViolation(potentialVictims, pdbs) reprievePod := func(pi *framework.PodInfo) (bool, error) { if err := addPod(pi); err != nil { return false, err } status := pl.fh.RunFilterPluginsWithNominatedPods(ctx, state, pod, nodeInfo) fits := status.IsSuccess() if !fits { if err := removePod(pi); err != nil { return false, err } rpi := pi.Pod victims = append(victims, rpi) logger.V(5).Info("Pod is a potential preemption victim on node", "pod", klog.KObj(rpi), "node", klog.KObj(nodeInfo.Node())) } return fits, nil } for _, p := range violatingVictims { if fits, err := reprievePod(p); err != nil { return nil, 0, framework.AsStatus(err) } else if !fits { numViolatingVictim++ } } // Now we try to reprieve non-violating victims. 
for _, p := range nonViolatingVictims { if _, err := reprievePod(p); err != nil { return nil, 0, framework.AsStatus(err) } } return victims, numViolatingVictim, framework.NewStatus(framework.Success) } // PodEligibleToPreemptOthers returns one bool and one string. The bool // indicates whether this pod should be considered for preempting other pods or // not. The string includes the reason if this pod isn't eligible. // There're several reasons: // 1. The pod has a preemptionPolicy of Never. // 2. The pod has already preempted other pods and the victims are in their graceful termination period. // Currently we check the node that is nominated for this pod, and as long as there are // terminating pods on this node, we don't attempt to preempt more pods. func (pl *DefaultPreemption) PodEligibleToPreemptOthers(pod *v1.Pod, nominatedNodeStatus *framework.Status) (bool, string) { if pod.Spec.PreemptionPolicy != nil && *pod.Spec.PreemptionPolicy == v1.PreemptNever { return false, "not eligible due to preemptionPolicy=Never." } nodeInfos := pl.fh.SnapshotSharedLister().NodeInfos() nomNodeName := pod.Status.NominatedNodeName if len(nomNodeName) > 0 { // If the pod's nominated node is considered as UnschedulableAndUnresolvable by the filters, // then the pod should be considered for preempting again. if nominatedNodeStatus.Code() == framework.UnschedulableAndUnresolvable { return true, "" } if nodeInfo, _ := nodeInfos.Get(nomNodeName); nodeInfo != nil { podPriority := corev1helpers.PodPriority(pod) for _, p := range nodeInfo.Pods { if corev1helpers.PodPriority(p.Pod) < podPriority && podTerminatingByPreemption(p.Pod, pl.fts.EnablePodDisruptionConditions) { // There is a terminating pod on the nominated node. return false, "not eligible due to a terminating pod on the nominated node." } } } } return true, "" } // OrderedScoreFuncs returns a list of ordered score functions to select preferable node where victims will be preempted. 
func (pl *DefaultPreemption) OrderedScoreFuncs(ctx context.Context, nodesToVictims map[string]*extenderv1.Victims) []func(node string) int64 { return nil } // podTerminatingByPreemption returns the pod's terminating state if feature PodDisruptionConditions is not enabled. // Otherwise, it additionally checks if the termination state is caused by scheduler preemption. func podTerminatingByPreemption(p *v1.Pod, enablePodDisruptionConditions bool) bool { if p.DeletionTimestamp == nil { return false } if !enablePodDisruptionConditions { return true } for _, condition := range p.Status.Conditions { if condition.Type == v1.DisruptionTarget { return condition.Status == v1.ConditionTrue && condition.Reason == v1.PodReasonPreemptionByScheduler } } return false } // filterPodsWithPDBViolation groups the given "pods" into two groups of "violatingPods" // and "nonViolatingPods" based on whether their PDBs will be violated if they are // preempted. // This function is stable and does not change the order of received pods. So, if it // receives a sorted list, grouping will preserve the order of the input list. func filterPodsWithPDBViolation(podInfos []*framework.PodInfo, pdbs []*policy.PodDisruptionBudget) (violatingPodInfos, nonViolatingPodInfos []*framework.PodInfo) { pdbsAllowed := make([]int32, len(pdbs)) for i, pdb := range pdbs { pdbsAllowed[i] = pdb.Status.DisruptionsAllowed } for _, podInfo := range podInfos { pod := podInfo.Pod pdbForPodIsViolated := false // A pod with no labels will not match any PDB. So, no need to check. if len(pod.Labels) != 0 { for i, pdb := range pdbs { if pdb.Namespace != pod.Namespace { continue } selector, err := metav1.LabelSelectorAsSelector(pdb.Spec.Selector) if err != nil { // This object has an invalid selector, it does not match the pod continue } // A PDB with a nil or empty selector matches nothing. 
if selector.Empty() || !selector.Matches(labels.Set(pod.Labels)) { continue } // Existing in DisruptedPods means it has been processed in API server, // we don't treat it as a violating case. if _, exist := pdb.Status.DisruptedPods[pod.Name]; exist { continue } // Only decrement the matched pdb when it's not in its ; // otherwise we may over-decrement the budget number. pdbsAllowed[i]-- // We have found a matching PDB. if pdbsAllowed[i] < 0 { pdbForPodIsViolated = true } } } if pdbForPodIsViolated { violatingPodInfos = append(violatingPodInfos, podInfo) } else { nonViolatingPodInfos = append(nonViolatingPodInfos, podInfo) } } return violatingPodInfos, nonViolatingPodInfos } func getPDBLister(informerFactory informers.SharedInformerFactory) policylisters.PodDisruptionBudgetLister { return informerFactory.Policy().V1().PodDisruptionBudgets().Lister() }