package preemption

import (
	"context"
	"fmt"
	"sort"
	"testing"

	"github.com/google/go-cmp/cmp"
	v1 "k8s.io/api/core/v1"
	policy "k8s.io/api/policy/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/client-go/informers"
	clientsetfake "k8s.io/client-go/kubernetes/fake"
	"k8s.io/klog/v2/ktesting"
	extenderv1 "k8s.io/kube-scheduler/extender/v1"
	"k8s.io/kubernetes/pkg/scheduler/framework"
	"k8s.io/kubernetes/pkg/scheduler/framework/parallelize"
	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultbinder"
	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/interpodaffinity"
	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeaffinity"
	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodename"
	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodeunschedulable"
	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/podtopologyspread"
	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort"
	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/tainttoleration"
	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumebinding"
	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumerestrictions"
	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/volumezone"
	frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
	internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
	internalqueue "k8s.io/kubernetes/pkg/scheduler/internal/queue"
	st "k8s.io/kubernetes/pkg/scheduler/testing"
	tf "k8s.io/kubernetes/pkg/scheduler/testing/framework"
)

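// Shared test fixtures: pod priorities and a large resource map used as node
// capacity (and pod request) in the tests below.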
var (
	midPriority, highPriority = int32(100), int32(1000)

	veryLargeRes = map[v1.ResourceName]string{
		v1.ResourceCPU:    "500m",
		v1.ResourceMemory: "500",
	}
)

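// FakePostFilterPlugin is a minimal preemption Interface implementation for tests.
// It selects the first pod on a node as the only victim and reports a configurable
// number of PDB-violating victims.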
type FakePostFilterPlugin struct {
	numViolatingVictim int
}

func (pl *FakePostFilterPlugin) SelectVictimsOnNode(
	ctx context.Context, state *framework.CycleState, pod *v1.Pod,
	nodeInfo *framework.NodeInfo, pdbs []*policy.PodDisruptionBudget) (victims []*v1.Pod, numViolatingVictim int, status *framework.Status) {
	return append(victims, nodeInfo.Pods[0].Pod), pl.numViolatingVictim, nil
}

func (pl *FakePostFilterPlugin) GetOffsetAndNumCandidates(nodes int32) (int32, int32) {
	return 0, nodes
}

func (pl *FakePostFilterPlugin) CandidatesToVictimsMap(candidates []Candidate) map[string]*extenderv1.Victims {
	return nil
}

func (pl *FakePostFilterPlugin) PodEligibleToPreemptOthers(pod *v1.Pod, nominatedNodeStatus *framework.Status) (bool, string) {
	return true, ""
}

func (pl *FakePostFilterPlugin) OrderedScoreFuncs(ctx context.Context, nodesToVictims map[string]*extenderv1.Victims) []func(node string) int64 {
	return nil
}

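// FakePreemptionScorePostFilterPlugin is like FakePostFilterPlugin, but it also
// builds the candidates-to-victims map and supplies an ordered score function that
// ranks a candidate higher the fewer containers its victims have in total.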
type FakePreemptionScorePostFilterPlugin struct{}

func (pl *FakePreemptionScorePostFilterPlugin) SelectVictimsOnNode(
	ctx context.Context, state *framework.CycleState, pod *v1.Pod,
	nodeInfo *framework.NodeInfo, pdbs []*policy.PodDisruptionBudget) (victims []*v1.Pod, numViolatingVictim int, status *framework.Status) {
	return append(victims, nodeInfo.Pods[0].Pod), 1, nil
}

func (pl *FakePreemptionScorePostFilterPlugin) GetOffsetAndNumCandidates(nodes int32) (int32, int32) {
	return 0, nodes
}

func (pl *FakePreemptionScorePostFilterPlugin) CandidatesToVictimsMap(candidates []Candidate) map[string]*extenderv1.Victims {
	m := make(map[string]*extenderv1.Victims, len(candidates))
	for _, c := range candidates {
		m[c.Name()] = c.Victims()
	}
	return m
}

func (pl *FakePreemptionScorePostFilterPlugin) PodEligibleToPreemptOthers(pod *v1.Pod, nominatedNodeStatus *framework.Status) (bool, string) {
	return true, ""
}

func (pl *FakePreemptionScorePostFilterPlugin) OrderedScoreFuncs(ctx context.Context, nodesToVictims map[string]*extenderv1.Victims) []func(node string) int64 {
	return []func(string) int64{
		func(node string) int64 {
			var sumContainers int64
			for _, pod := range nodesToVictims[node].Pods {
				sumContainers += int64(len(pod.Spec.Containers) + len(pod.Spec.InitContainers))
			}

			return -sumContainers
		},
	}
}

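// TestNodesWherePreemptionMightHelp verifies that nodes with an Unschedulable
// status (or no recorded status at all) are kept as candidates for preemption,
// while nodes marked UnschedulableAndUnresolvable are filtered out.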
func TestNodesWherePreemptionMightHelp(t *testing.T) {
	nodeNames := []string{"node1", "node2", "node3", "node4"}
	tests := []struct {
		name          string
		nodesStatuses framework.NodeToStatusMap
		expected      sets.Set[string]
	}{
		{
			name: "No node should be attempted",
			nodesStatuses: framework.NodeToStatusMap{
				"node1": framework.NewStatus(framework.UnschedulableAndUnresolvable, nodeaffinity.ErrReasonPod),
				"node2": framework.NewStatus(framework.UnschedulableAndUnresolvable, nodename.ErrReason),
				"node3": framework.NewStatus(framework.UnschedulableAndUnresolvable, tainttoleration.ErrReasonNotMatch),
				"node4": framework.NewStatus(framework.UnschedulableAndUnresolvable, interpodaffinity.ErrReasonAffinityRulesNotMatch),
			},
			expected: sets.New[string](),
		},
		{
			name: "ErrReasonAntiAffinityRulesNotMatch should be tried as it indicates that the pod is unschedulable due to inter-pod anti-affinity",
			nodesStatuses: framework.NodeToStatusMap{
				"node1": framework.NewStatus(framework.Unschedulable, interpodaffinity.ErrReasonAntiAffinityRulesNotMatch),
				"node2": framework.NewStatus(framework.UnschedulableAndUnresolvable, nodename.ErrReason),
				"node3": framework.NewStatus(framework.UnschedulableAndUnresolvable, nodeunschedulable.ErrReasonUnschedulable),
			},
			expected: sets.New("node1", "node4"),
		},
		{
			name: "ErrReasonAffinityRulesNotMatch should not be tried as it indicates that the pod is unschedulable due to inter-pod affinity, but ErrReasonAntiAffinityRulesNotMatch should be tried as it indicates that the pod is unschedulable due to inter-pod anti-affinity",
			nodesStatuses: framework.NodeToStatusMap{
				"node1": framework.NewStatus(framework.UnschedulableAndUnresolvable, interpodaffinity.ErrReasonAffinityRulesNotMatch),
				"node2": framework.NewStatus(framework.Unschedulable, interpodaffinity.ErrReasonAntiAffinityRulesNotMatch),
			},
			expected: sets.New("node2", "node3", "node4"),
		},
		{
			name: "Mix of failed predicates works fine",
			nodesStatuses: framework.NodeToStatusMap{
				"node1": framework.NewStatus(framework.UnschedulableAndUnresolvable, volumerestrictions.ErrReasonDiskConflict),
				"node2": framework.NewStatus(framework.Unschedulable, fmt.Sprintf("Insufficient %v", v1.ResourceMemory)),
			},
			expected: sets.New("node2", "node3", "node4"),
		},
		{
			name: "Node condition errors should be considered unresolvable",
			nodesStatuses: framework.NodeToStatusMap{
				"node1": framework.NewStatus(framework.UnschedulableAndUnresolvable, nodeunschedulable.ErrReasonUnknownCondition),
			},
			expected: sets.New("node2", "node3", "node4"),
		},
		{
			name: "ErrVolume... errors should not be tried as it indicates that the pod is unschedulable due to no matching volumes for pod on node",
			nodesStatuses: framework.NodeToStatusMap{
				"node1": framework.NewStatus(framework.UnschedulableAndUnresolvable, volumezone.ErrReasonConflict),
				"node2": framework.NewStatus(framework.UnschedulableAndUnresolvable, string(volumebinding.ErrReasonNodeConflict)),
				"node3": framework.NewStatus(framework.UnschedulableAndUnresolvable, string(volumebinding.ErrReasonBindConflict)),
			},
			expected: sets.New("node4"),
		},
		{
			name: "ErrReasonConstraintsNotMatch should be tried as it indicates that the pod is unschedulable due to topology spread constraints",
			nodesStatuses: framework.NodeToStatusMap{
				"node1": framework.NewStatus(framework.Unschedulable, podtopologyspread.ErrReasonConstraintsNotMatch),
				"node2": framework.NewStatus(framework.UnschedulableAndUnresolvable, nodename.ErrReason),
				"node3": framework.NewStatus(framework.Unschedulable, podtopologyspread.ErrReasonConstraintsNotMatch),
			},
			expected: sets.New("node1", "node3", "node4"),
		},
		{
			name: "UnschedulableAndUnresolvable status should be skipped but Unschedulable should be tried",
			nodesStatuses: framework.NodeToStatusMap{
				"node2": framework.NewStatus(framework.UnschedulableAndUnresolvable, ""),
				"node3": framework.NewStatus(framework.Unschedulable, ""),
				"node4": framework.NewStatus(framework.UnschedulableAndUnresolvable, ""),
			},
			expected: sets.New("node1", "node3"),
		},
		{
			name: "ErrReasonNodeLabelNotMatch should not be tried as it indicates that the pod is unschedulable due to node doesn't have the required label",
			nodesStatuses: framework.NodeToStatusMap{
				"node2": framework.NewStatus(framework.UnschedulableAndUnresolvable, podtopologyspread.ErrReasonNodeLabelNotMatch),
				"node3": framework.NewStatus(framework.Unschedulable, ""),
				"node4": framework.NewStatus(framework.UnschedulableAndUnresolvable, ""),
			},
			expected: sets.New("node1", "node3"),
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			var nodeInfos []*framework.NodeInfo
			for _, name := range nodeNames {
				ni := framework.NewNodeInfo()
				ni.SetNode(st.MakeNode().Name(name).Obj())
				nodeInfos = append(nodeInfos, ni)
			}
			nodes, _ := nodesWherePreemptionMightHelp(nodeInfos, tt.nodesStatuses)
			if len(tt.expected) != len(nodes) {
				t.Errorf("unexpected number of nodes: expected %d, got %d. Nodes: %v", len(tt.expected), len(nodes), nodes)
			}
			for _, node := range nodes {
				name := node.Node().Name
				if _, found := tt.expected[name]; !found {
					t.Errorf("node %v is not expected.", name)
				}
			}
		})
	}
}

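// TestDryRunPreemption runs Evaluator.DryRunPreemption with FakePostFilterPlugin
// and checks the returned candidates and their victims, both with and without
// PodDisruptionBudget violations.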
func TestDryRunPreemption(t *testing.T) {
	tests := []struct {
		name               string
		nodes              []*v1.Node
		testPods           []*v1.Pod
		initPods           []*v1.Pod
		numViolatingVictim int
		expected           [][]Candidate
	}{
		{
			name: "no pdb violation",
			nodes: []*v1.Node{
				st.MakeNode().Name("node1").Capacity(veryLargeRes).Obj(),
				st.MakeNode().Name("node2").Capacity(veryLargeRes).Obj(),
			},
			testPods: []*v1.Pod{
				st.MakePod().Name("p").UID("p").Priority(highPriority).Obj(),
			},
			initPods: []*v1.Pod{
				st.MakePod().Name("p1").UID("p1").Node("node1").Priority(midPriority).Obj(),
				st.MakePod().Name("p2").UID("p2").Node("node2").Priority(midPriority).Obj(),
			},
			expected: [][]Candidate{
				{
					&candidate{
						victims: &extenderv1.Victims{
							Pods: []*v1.Pod{st.MakePod().Name("p1").UID("p1").Node("node1").Priority(midPriority).Obj()},
						},
						name: "node1",
					},
					&candidate{
						victims: &extenderv1.Victims{
							Pods: []*v1.Pod{st.MakePod().Name("p2").UID("p2").Node("node2").Priority(midPriority).Obj()},
						},
						name: "node2",
					},
				},
			},
		},
		{
			name: "pdb violation on each node",
			nodes: []*v1.Node{
				st.MakeNode().Name("node1").Capacity(veryLargeRes).Obj(),
				st.MakeNode().Name("node2").Capacity(veryLargeRes).Obj(),
			},
			testPods: []*v1.Pod{
				st.MakePod().Name("p").UID("p").Priority(highPriority).Obj(),
			},
			initPods: []*v1.Pod{
				st.MakePod().Name("p1").UID("p1").Node("node1").Priority(midPriority).Obj(),
				st.MakePod().Name("p2").UID("p2").Node("node2").Priority(midPriority).Obj(),
			},
			numViolatingVictim: 1,
			expected: [][]Candidate{
				{
					&candidate{
						victims: &extenderv1.Victims{
							Pods:             []*v1.Pod{st.MakePod().Name("p1").UID("p1").Node("node1").Priority(midPriority).Obj()},
							NumPDBViolations: 1,
						},
						name: "node1",
					},
					&candidate{
						victims: &extenderv1.Victims{
							Pods:             []*v1.Pod{st.MakePod().Name("p2").UID("p2").Node("node2").Priority(midPriority).Obj()},
							NumPDBViolations: 1,
						},
						name: "node2",
					},
				},
			},
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			logger, _ := ktesting.NewTestContext(t)
			registeredPlugins := append([]tf.RegisterPluginFunc{
				tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New)},
				tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
			)
			var objs []runtime.Object
			for _, p := range append(tt.testPods, tt.initPods...) {
				objs = append(objs, p)
			}
			for _, n := range tt.nodes {
				objs = append(objs, n)
			}
			informerFactory := informers.NewSharedInformerFactory(clientsetfake.NewSimpleClientset(objs...), 0)
			parallelism := parallelize.DefaultParallelism
			_, ctx := ktesting.NewTestContext(t)
			ctx, cancel := context.WithCancel(ctx)
			defer cancel()
			fwk, err := tf.NewFramework(
				ctx,
				registeredPlugins, "",
				frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())),
				frameworkruntime.WithInformerFactory(informerFactory),
				frameworkruntime.WithParallelism(parallelism),
				frameworkruntime.WithSnapshotSharedLister(internalcache.NewSnapshot(tt.testPods, tt.nodes)),
				frameworkruntime.WithLogger(logger),
			)
			if err != nil {
				t.Fatal(err)
			}

			informerFactory.Start(ctx.Done())
			informerFactory.WaitForCacheSync(ctx.Done())
			snapshot := internalcache.NewSnapshot(tt.initPods, tt.nodes)
			nodeInfos, err := snapshot.NodeInfos().List()
			if err != nil {
				t.Fatal(err)
			}
			sort.Slice(nodeInfos, func(i, j int) bool {
				return nodeInfos[i].Node().Name < nodeInfos[j].Node().Name
			})

			fakePostPlugin := &FakePostFilterPlugin{numViolatingVictim: tt.numViolatingVictim}

			for cycle, pod := range tt.testPods {
				state := framework.NewCycleState()
				pe := Evaluator{
					PluginName: "FakePostFilter",
					Handler:    fwk,
					Interface:  fakePostPlugin,
					State:      state,
				}
				got, _, _ := pe.DryRunPreemption(context.Background(), pod, nodeInfos, nil, 0, int32(len(nodeInfos)))

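				// Sort the victims within each candidate, and the candidates themselves
				// by node name, so that cmp.Diff below is order-independent.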
				for i := range got {
					victims := got[i].Victims().Pods
					sort.Slice(victims, func(i, j int) bool {
						return victims[i].Name < victims[j].Name
					})
				}
				sort.Slice(got, func(i, j int) bool {
					return got[i].Name() < got[j].Name()
				})
				if diff := cmp.Diff(tt.expected[cycle], got, cmp.AllowUnexported(candidate{})); diff != "" {
					t.Errorf("cycle %d: unexpected candidates (-want, +got): %s", cycle, diff)
				}
			}
		})
	}
}

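// TestSelectCandidate checks that Evaluator.SelectCandidate honors the plugin's
// ordered score functions and picks the node whose victims have the fewest
// containers in total.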
func TestSelectCandidate(t *testing.T) {
	tests := []struct {
		name      string
		nodeNames []string
		pod       *v1.Pod
		testPods  []*v1.Pod
		expected  string
	}{
		{
			name:      "pod has different number of containers on each node",
			nodeNames: []string{"node1", "node2", "node3"},
			pod:       st.MakePod().Name("p").UID("p").Priority(highPriority).Req(veryLargeRes).Obj(),
			testPods: []*v1.Pod{
				st.MakePod().Name("p1.1").UID("p1.1").Node("node1").Priority(midPriority).Containers([]v1.Container{
					st.MakeContainer().Name("container1").Obj(),
					st.MakeContainer().Name("container2").Obj(),
				}).Obj(),
				st.MakePod().Name("p2.1").UID("p2.1").Node("node2").Priority(midPriority).Containers([]v1.Container{
					st.MakeContainer().Name("container1").Obj(),
				}).Obj(),
				st.MakePod().Name("p3.1").UID("p3.1").Node("node3").Priority(midPriority).Containers([]v1.Container{
					st.MakeContainer().Name("container1").Obj(),
					st.MakeContainer().Name("container2").Obj(),
					st.MakeContainer().Name("container3").Obj(),
				}).Obj(),
			},
			expected: "node2",
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			logger, _ := ktesting.NewTestContext(t)
			nodes := make([]*v1.Node, len(tt.nodeNames))
			for i, nodeName := range tt.nodeNames {
				nodes[i] = st.MakeNode().Name(nodeName).Capacity(veryLargeRes).Obj()
			}
			registeredPlugins := append([]tf.RegisterPluginFunc{
				tf.RegisterQueueSortPlugin(queuesort.Name, queuesort.New)},
				tf.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
			)
			var objs []runtime.Object
			objs = append(objs, tt.pod)
			for _, pod := range tt.testPods {
				objs = append(objs, pod)
			}
			informerFactory := informers.NewSharedInformerFactory(clientsetfake.NewSimpleClientset(objs...), 0)
			snapshot := internalcache.NewSnapshot(tt.testPods, nodes)
			_, ctx := ktesting.NewTestContext(t)
			ctx, cancel := context.WithCancel(ctx)
			defer cancel()
			fwk, err := tf.NewFramework(
				ctx,
				registeredPlugins,
				"",
				frameworkruntime.WithPodNominator(internalqueue.NewPodNominator(informerFactory.Core().V1().Pods().Lister())),
				frameworkruntime.WithSnapshotSharedLister(snapshot),
				frameworkruntime.WithLogger(logger),
			)
			if err != nil {
				t.Fatal(err)
			}

			state := framework.NewCycleState()

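			// Run the PreFilter plugins for the preemptor pod; the test reports an
			// error if any of them does not succeed.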
			if _, status := fwk.RunPreFilterPlugins(ctx, state, tt.pod); !status.IsSuccess() {
				t.Errorf("Unexpected PreFilter Status: %v", status)
			}
			nodeInfos, err := snapshot.NodeInfos().List()
			if err != nil {
				t.Fatal(err)
			}

			fakePreemptionScorePostFilterPlugin := &FakePreemptionScorePostFilterPlugin{}

			for _, pod := range tt.testPods {
				state := framework.NewCycleState()
				pe := Evaluator{
					PluginName: "FakePreemptionScorePostFilter",
					Handler:    fwk,
					Interface:  fakePreemptionScorePostFilterPlugin,
					State:      state,
				}
				candidates, _, _ := pe.DryRunPreemption(context.Background(), pod, nodeInfos, nil, 0, int32(len(nodeInfos)))
				s := pe.SelectCandidate(ctx, candidates)
				if s == nil || len(s.Name()) == 0 {
					t.Errorf("expected a candidate on node %q, but no candidate was selected", tt.expected)
					return
				}
				if diff := cmp.Diff(tt.expected, s.Name()); diff != "" {
					t.Errorf("expected a candidate on node %q, but got %q", tt.expected, s.Name())
				}
			}
		})
	}
}