1
16
17 package defaultpreemption
18
19 import (
20 "context"
21 "fmt"
22 "math/rand"
23 "sort"
24
25 v1 "k8s.io/api/core/v1"
26 policy "k8s.io/api/policy/v1"
27 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
28 "k8s.io/apimachinery/pkg/labels"
29 "k8s.io/apimachinery/pkg/runtime"
30 "k8s.io/client-go/informers"
31 corelisters "k8s.io/client-go/listers/core/v1"
32 policylisters "k8s.io/client-go/listers/policy/v1"
33 corev1helpers "k8s.io/component-helpers/scheduling/corev1"
34 "k8s.io/klog/v2"
35 extenderv1 "k8s.io/kube-scheduler/extender/v1"
36 "k8s.io/kubernetes/pkg/scheduler/apis/config"
37 "k8s.io/kubernetes/pkg/scheduler/apis/config/validation"
38 "k8s.io/kubernetes/pkg/scheduler/framework"
39 "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
40 "k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
41 "k8s.io/kubernetes/pkg/scheduler/framework/preemption"
42 "k8s.io/kubernetes/pkg/scheduler/metrics"
43 "k8s.io/kubernetes/pkg/scheduler/util"
44 )
45
46
// Name is the name this plugin reports through DefaultPreemption.Name; the
// value is taken from the shared scheduler plugin names package.
const Name = names.DefaultPreemption
48
49
// DefaultPreemption is a PostFilter plugin that implements the scheduler's
// default preemption logic by driving a preemption.Evaluator.
type DefaultPreemption struct {
	// fh is the framework handle the plugin was built with. It is used to reach
	// the snapshot lister and to run filter / PreFilter-extension plugins.
	fh framework.Handle

	// fts carries feature-gate values; EnablePodDisruptionConditions is the one
	// consulted in this file.
	fts feature.Features

	// args is a copy of the validated plugin arguments. Its MinCandidateNodes*
	// fields size the candidate shortlist.
	args config.DefaultPreemptionArgs

	// podLister and pdbLister come from the handle's shared informer factory and
	// are handed to the preemption evaluator.
	podLister corelisters.PodLister
	pdbLister policylisters.PodDisruptionBudgetLister
}
57
58 var _ framework.PostFilterPlugin = &DefaultPreemption{}
59
60
61 func (pl *DefaultPreemption) Name() string {
62 return Name
63 }
64
65
66 func New(_ context.Context, dpArgs runtime.Object, fh framework.Handle, fts feature.Features) (framework.Plugin, error) {
67 args, ok := dpArgs.(*config.DefaultPreemptionArgs)
68 if !ok {
69 return nil, fmt.Errorf("got args of type %T, want *DefaultPreemptionArgs", dpArgs)
70 }
71 if err := validation.ValidateDefaultPreemptionArgs(nil, args); err != nil {
72 return nil, err
73 }
74 pl := DefaultPreemption{
75 fh: fh,
76 fts: fts,
77 args: *args,
78 podLister: fh.SharedInformerFactory().Core().V1().Pods().Lister(),
79 pdbLister: getPDBLister(fh.SharedInformerFactory()),
80 }
81 return &pl, nil
82 }
83
84
85 func (pl *DefaultPreemption) PostFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, m framework.NodeToStatusMap) (*framework.PostFilterResult, *framework.Status) {
86 defer func() {
87 metrics.PreemptionAttempts.Inc()
88 }()
89
90 pe := preemption.Evaluator{
91 PluginName: names.DefaultPreemption,
92 Handler: pl.fh,
93 PodLister: pl.podLister,
94 PdbLister: pl.pdbLister,
95 State: state,
96 Interface: pl,
97 }
98
99 result, status := pe.Preempt(ctx, pod, m)
100 msg := status.Message()
101 if len(msg) > 0 {
102 return result, framework.NewStatus(status.Code(), "preemption: "+msg)
103 }
104 return result, status
105 }
106
107
108
109
110
111 func (pl *DefaultPreemption) calculateNumCandidates(numNodes int32) int32 {
112 n := (numNodes * pl.args.MinCandidateNodesPercentage) / 100
113 if n < pl.args.MinCandidateNodesAbsolute {
114 n = pl.args.MinCandidateNodesAbsolute
115 }
116 if n > numNodes {
117 n = numNodes
118 }
119 return n
120 }
121
122
123
124 func (pl *DefaultPreemption) GetOffsetAndNumCandidates(numNodes int32) (int32, int32) {
125 return rand.Int31n(numNodes), pl.calculateNumCandidates(numNodes)
126 }
127
128
129
130 func (pl *DefaultPreemption) CandidatesToVictimsMap(candidates []preemption.Candidate) map[string]*extenderv1.Victims {
131 m := make(map[string]*extenderv1.Victims, len(candidates))
132 for _, c := range candidates {
133 m[c.Name()] = c.Victims()
134 }
135 return m
136 }
137
138
139
// SelectVictimsOnNode chooses which pods on nodeInfo must be preempted so that
// "pod" fits on that node. It works in two phases:
//  1. Remove every pod whose priority is lower than "pod"'s and check that
//     "pod" then passes the filter plugins. If there are no such pods, or "pod"
//     still does not fit, it returns without victims.
//  2. Sort the removed pods from most to least important (util.MoreImportantPod)
//     and try to add them back ("reprieve" them) one at a time. Each one is
//     kept only if "pod" still fits. Pods whose eviction would violate a PDB
//     are tried first, then the remaining ones.
//
// Return values:
//   - The pods that could not be reprieved (the victims), in the order they
//     were tried.
//   - How many of those victims came from the PDB-violating group.
//   - A status:
//       - UnschedulableAndUnresolvable when no lower-priority pods exist;
//       - the filter status when "pod" does not fit even with all of them removed;
//       - an error status when removing or re-adding a pod, or running a
//         PreFilter extension, fails;
//       - Success otherwise.
//
// nodeInfo and state are modified in place through RemovePod/AddPodInfo and the
// PreFilter extensions. NOTE(review): callers presumably pass clones of the real
// NodeInfo and CycleState; confirm in the preemption evaluator.
func (pl *DefaultPreemption) SelectVictimsOnNode(
	ctx context.Context,
	state *framework.CycleState,
	pod *v1.Pod,
	nodeInfo *framework.NodeInfo,
	pdbs []*policy.PodDisruptionBudget) ([]*v1.Pod, int, *framework.Status) {
	logger := klog.FromContext(ctx)
	var potentialVictims []*framework.PodInfo
	// removePod takes rpi off nodeInfo and lets PreFilter extensions update their
	// per-cycle data in state to reflect the removal.
	removePod := func(rpi *framework.PodInfo) error {
		if err := nodeInfo.RemovePod(logger, rpi.Pod); err != nil {
			return err
		}
		status := pl.fh.RunPreFilterExtensionRemovePod(ctx, state, pod, rpi, nodeInfo)
		if !status.IsSuccess() {
			return status.AsError()
		}
		return nil
	}
	// addPod is the inverse of removePod: it puts api back on nodeInfo and lets
	// PreFilter extensions update state accordingly.
	addPod := func(api *framework.PodInfo) error {
		nodeInfo.AddPodInfo(api)
		status := pl.fh.RunPreFilterExtensionAddPod(ctx, state, pod, api, nodeInfo)
		if !status.IsSuccess() {
			return status.AsError()
		}
		return nil
	}

	// Phase 1: strip every pod with lower priority than "pod" from the node.
	podPriority := corev1helpers.PodPriority(pod)
	// NOTE(review): removePod mutates nodeInfo.Pods while this loop ranges over
	// it. range uses the slice header captured when the loop starts. Confirm
	// that NodeInfo.RemovePod's removal strategy still visits every
	// lower-priority pod.
	for _, pi := range nodeInfo.Pods {
		if corev1helpers.PodPriority(pi.Pod) < podPriority {
			potentialVictims = append(potentialVictims, pi)
			if err := removePod(pi); err != nil {
				return nil, 0, framework.AsStatus(err)
			}
		}
	}

	// Nothing was removed, so the node state is unchanged and preemption cannot
	// help here.
	if len(potentialVictims) == 0 {
		return nil, 0, framework.NewStatus(framework.UnschedulableAndUnresolvable, "No preemption victims found for incoming pod")
	}

	// If "pod" does not fit even with every lower-priority pod gone, this node
	// is not suitable for preemption; surface the filter status.
	if status := pl.fh.RunFilterPluginsWithNominatedPods(ctx, state, pod, nodeInfo); !status.IsSuccess() {
		return nil, 0, status
	}
	var victims []*v1.Pod
	numViolatingVictim := 0
	// Phase 2: most important pods first, so they get the first chance to be
	// reprieved.
	sort.Slice(potentialVictims, func(i, j int) bool { return util.MoreImportantPod(potentialVictims[i].Pod, potentialVictims[j].Pod) })

	// Split into PDB-violating and non-violating pods. The split keeps the
	// sorted order within each group.
	violatingVictims, nonViolatingVictims := filterPodsWithPDBViolation(potentialVictims, pdbs)
	// reprievePod re-adds pi and re-runs the filters. If "pod" no longer fits,
	// pi is removed again and recorded as a victim. It reports whether pi could
	// stay on the node.
	reprievePod := func(pi *framework.PodInfo) (bool, error) {
		if err := addPod(pi); err != nil {
			return false, err
		}
		status := pl.fh.RunFilterPluginsWithNominatedPods(ctx, state, pod, nodeInfo)
		fits := status.IsSuccess()
		if !fits {
			if err := removePod(pi); err != nil {
				return false, err
			}
			rpi := pi.Pod
			victims = append(victims, rpi)
			logger.V(5).Info("Pod is a potential preemption victim on node", "pod", klog.KObj(rpi), "node", klog.KObj(nodeInfo.Node()))
		}
		return fits, nil
	}
	// Try PDB-violating pods first. Each one that cannot be kept counts toward
	// numViolatingVictim.
	for _, p := range violatingVictims {
		if fits, err := reprievePod(p); err != nil {
			return nil, 0, framework.AsStatus(err)
		} else if !fits {
			numViolatingVictim++
		}
	}

	// Then try the pods whose eviction would not violate any PDB.
	for _, p := range nonViolatingVictims {
		if _, err := reprievePod(p); err != nil {
			return nil, 0, framework.AsStatus(err)
		}
	}
	return victims, numViolatingVictim, framework.NewStatus(framework.Success)
}
230
231
232
233
234
235
236
237
238
239 func (pl *DefaultPreemption) PodEligibleToPreemptOthers(pod *v1.Pod, nominatedNodeStatus *framework.Status) (bool, string) {
240 if pod.Spec.PreemptionPolicy != nil && *pod.Spec.PreemptionPolicy == v1.PreemptNever {
241 return false, "not eligible due to preemptionPolicy=Never."
242 }
243
244 nodeInfos := pl.fh.SnapshotSharedLister().NodeInfos()
245 nomNodeName := pod.Status.NominatedNodeName
246 if len(nomNodeName) > 0 {
247
248
249 if nominatedNodeStatus.Code() == framework.UnschedulableAndUnresolvable {
250 return true, ""
251 }
252
253 if nodeInfo, _ := nodeInfos.Get(nomNodeName); nodeInfo != nil {
254 podPriority := corev1helpers.PodPriority(pod)
255 for _, p := range nodeInfo.Pods {
256 if corev1helpers.PodPriority(p.Pod) < podPriority && podTerminatingByPreemption(p.Pod, pl.fts.EnablePodDisruptionConditions) {
257
258 return false, "not eligible due to a terminating pod on the nominated node."
259 }
260 }
261 }
262 }
263 return true, ""
264 }
265
266
267 func (pl *DefaultPreemption) OrderedScoreFuncs(ctx context.Context, nodesToVictims map[string]*extenderv1.Victims) []func(node string) int64 {
268 return nil
269 }
270
271
272
273 func podTerminatingByPreemption(p *v1.Pod, enablePodDisruptionConditions bool) bool {
274 if p.DeletionTimestamp == nil {
275 return false
276 }
277
278 if !enablePodDisruptionConditions {
279 return true
280 }
281
282 for _, condition := range p.Status.Conditions {
283 if condition.Type == v1.DisruptionTarget {
284 return condition.Status == v1.ConditionTrue && condition.Reason == v1.PodReasonPreemptionByScheduler
285 }
286 }
287 return false
288 }
289
290
291
292
293
294
295 func filterPodsWithPDBViolation(podInfos []*framework.PodInfo, pdbs []*policy.PodDisruptionBudget) (violatingPodInfos, nonViolatingPodInfos []*framework.PodInfo) {
296 pdbsAllowed := make([]int32, len(pdbs))
297 for i, pdb := range pdbs {
298 pdbsAllowed[i] = pdb.Status.DisruptionsAllowed
299 }
300
301 for _, podInfo := range podInfos {
302 pod := podInfo.Pod
303 pdbForPodIsViolated := false
304
305 if len(pod.Labels) != 0 {
306 for i, pdb := range pdbs {
307 if pdb.Namespace != pod.Namespace {
308 continue
309 }
310 selector, err := metav1.LabelSelectorAsSelector(pdb.Spec.Selector)
311 if err != nil {
312
313 continue
314 }
315
316 if selector.Empty() || !selector.Matches(labels.Set(pod.Labels)) {
317 continue
318 }
319
320
321
322 if _, exist := pdb.Status.DisruptedPods[pod.Name]; exist {
323 continue
324 }
325
326
327 pdbsAllowed[i]--
328
329 if pdbsAllowed[i] < 0 {
330 pdbForPodIsViolated = true
331 }
332 }
333 }
334 if pdbForPodIsViolated {
335 violatingPodInfos = append(violatingPodInfos, podInfo)
336 } else {
337 nonViolatingPodInfos = append(nonViolatingPodInfos, podInfo)
338 }
339 }
340 return violatingPodInfos, nonViolatingPodInfos
341 }
342
343 func getPDBLister(informerFactory informers.SharedInformerFactory) policylisters.PodDisruptionBudgetLister {
344 return informerFactory.Policy().V1().PodDisruptionBudgets().Lister()
345 }
346
View as plain text