package scheduling

import (
	"context"
	"encoding/json"
	"fmt"
	"strings"
	"sync/atomic"
	"time"

	"github.com/google/uuid"
	appsv1 "k8s.io/api/apps/v1"
	v1 "k8s.io/api/core/v1"
	schedulingv1 "k8s.io/api/scheduling/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/apimachinery/pkg/util/strategicpatch"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/apimachinery/pkg/watch"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	"k8s.io/kubernetes/pkg/apis/scheduling"
	"k8s.io/kubernetes/test/e2e/framework"
	e2enode "k8s.io/kubernetes/test/e2e/framework/node"
	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
	e2ereplicaset "k8s.io/kubernetes/test/e2e/framework/replicaset"
	admissionapi "k8s.io/pod-security-admission/api"

	"github.com/onsi/ginkgo/v2"
	"github.com/onsi/gomega"

	// blank import, kept for its side effects only
	_ "github.com/stretchr/testify/assert"
)
type priorityPair struct {
	name  string
	value int32
}

var testExtendedResource = v1.ResourceName("scheduling.k8s.io/foo")

const (
	testFinalizer = "example.com/test-finalizer"
)

var _ = SIGDescribe("SchedulerPreemption", framework.WithSerial(), func() {
	var cs clientset.Interface
	var nodeList *v1.NodeList
	var ns string
	f := framework.NewDefaultFramework("sched-preemption")
	f.NamespacePodSecurityLevel = admissionapi.LevelBaseline

	lowPriority, mediumPriority, highPriority := int32(1), int32(100), int32(1000)
	lowPriorityClassName := f.BaseName + "-low-priority"
	mediumPriorityClassName := f.BaseName + "-medium-priority"
	highPriorityClassName := f.BaseName + "-high-priority"
	priorityPairs := []priorityPair{
		{name: lowPriorityClassName, value: lowPriority},
		{name: mediumPriorityClassName, value: mediumPriority},
		{name: highPriorityClassName, value: highPriority},
	}

	ginkgo.AfterEach(func(ctx context.Context) {
		for _, pair := range priorityPairs {
			_ = cs.SchedulingV1().PriorityClasses().Delete(ctx, pair.name, *metav1.NewDeleteOptions(0))
		}
		for _, node := range nodeList.Items {
			nodeCopy := node.DeepCopy()
			delete(nodeCopy.Status.Capacity, testExtendedResource)
			delete(nodeCopy.Status.Allocatable, testExtendedResource)
			err := patchNode(ctx, cs, &node, nodeCopy)
			framework.ExpectNoError(err)
		}
	})

	ginkgo.BeforeEach(func(ctx context.Context) {
		cs = f.ClientSet
		ns = f.Namespace.Name
		nodeList = &v1.NodeList{}
		var err error
		for _, pair := range priorityPairs {
			_, err := f.ClientSet.SchedulingV1().PriorityClasses().Create(ctx, &schedulingv1.PriorityClass{ObjectMeta: metav1.ObjectMeta{Name: pair.name}, Value: pair.value}, metav1.CreateOptions{})
			if err != nil && !apierrors.IsAlreadyExists(err) {
				framework.Failf("expected 'alreadyExists' as error, got instead: %v", err)
			}
		}

		e2enode.WaitForTotalHealthy(ctx, cs, time.Minute)
		nodeList, err = e2enode.GetReadySchedulableNodes(ctx, cs)
		if err != nil {
			framework.Logf("Unexpected error occurred: %v", err)
		}
		framework.ExpectNoErrorWithOffset(0, err)
		for _, n := range nodeList.Items {
			workerNodes.Insert(n.Name)
		}

		err = framework.CheckTestingNSDeletedExcept(ctx, cs, ns)
		framework.ExpectNoError(err)
	})

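	// Verifies that when a higher priority pod is created and no node has room for it,
	// the scheduler preempts a lower priority pod to make the new pod schedulable,
	// while pods of equal or higher priority are left untouched.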
	framework.ConformanceIt("validates basic preemption works", func(ctx context.Context) {
		var podRes v1.ResourceList

		ginkgo.By("Create pods that use 4/5 of node resources.")
		pods := make([]*v1.Pod, 0, 2*len(nodeList.Items))

		for i, node := range nodeList.Items {
			// Advertise 5 units of the extended resource on this node.
			nodeCopy := node.DeepCopy()
			nodeCopy.Status.Capacity[testExtendedResource] = resource.MustParse("5")
			nodeCopy.Status.Allocatable[testExtendedResource] = resource.MustParse("5")
			err := patchNode(ctx, cs, &node, nodeCopy)
			framework.ExpectNoError(err)

			for j := 0; j < 2; j++ {
				// Request 2 units of the extended resource for each pod.
				podRes = v1.ResourceList{}
				podRes[testExtendedResource] = resource.MustParse("2")

				// Make the first pod low priority and all others medium priority.
				priorityName := mediumPriorityClassName
				if len(pods) == 0 {
					priorityName = lowPriorityClassName
				}
				pausePod := createPausePod(ctx, f, pausePodConfig{
					Name:              fmt.Sprintf("pod%d-%d-%v", i, j, priorityName),
					PriorityClassName: priorityName,
					Resources: &v1.ResourceRequirements{
						Requests: podRes,
						Limits:   podRes,
					},
					Affinity: &v1.Affinity{
						NodeAffinity: &v1.NodeAffinity{
							RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
								NodeSelectorTerms: []v1.NodeSelectorTerm{
									{
										MatchFields: []v1.NodeSelectorRequirement{
											{Key: "metadata.name", Operator: v1.NodeSelectorOpIn, Values: []string{node.Name}},
										},
									},
								},
							},
						},
					},
				})
				pods = append(pods, pausePod)
				framework.Logf("Created pod: %v", pausePod.Name)
			}
		}
		if len(pods) < 2 {
			framework.Failf("We need at least two pods to be created but " +
				"all nodes are already heavily utilized, so preemption tests cannot be run")
		}
		ginkgo.By("Wait for pods to be scheduled.")
		for _, pod := range pods {
			framework.ExpectNoError(e2epod.WaitForPodRunningInNamespace(ctx, cs, pod))
		}

		// Give the preemptor pod the same resource request as the low priority pod.
		podRes = pods[0].Spec.Containers[0].Resources.Requests

		ginkgo.By("Run a high priority pod that has the same requirements as the lower priority pod")
		// This pod should preempt the low priority pod and get scheduled.
		runPausePod(ctx, f, pausePodConfig{
			Name:              "preemptor-pod",
			PriorityClassName: highPriorityClassName,
			Resources: &v1.ResourceRequirements{
				Requests: podRes,
				Limits:   podRes,
			},
		})

		preemptedPod, err := cs.CoreV1().Pods(pods[0].Namespace).Get(ctx, pods[0].Name, metav1.GetOptions{})
		podPreempted := (err != nil && apierrors.IsNotFound(err)) ||
			(err == nil && preemptedPod.DeletionTimestamp != nil)
		if !podPreempted {
			framework.Failf("expected pod to be preempted, instead got pod %+v and error %v", preemptedPod, err)
		}
		// All other pods must remain untouched.
		for i := 1; i < len(pods); i++ {
			livePod, err := cs.CoreV1().Pods(pods[i].Namespace).Get(ctx, pods[i].Name, metav1.GetOptions{})
			framework.ExpectNoError(err)
			gomega.Expect(livePod.DeletionTimestamp).To(gomega.BeNil())
		}
	})

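	// Verifies that a pod using the system-cluster-critical PriorityClass, created in
	// the kube-system namespace, preempts a lower priority pod when the node's extended
	// resource is exhausted, while the remaining pods keep running.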
	framework.ConformanceIt("validates lower priority pod preemption by critical pod", func(ctx context.Context) {
		var podRes v1.ResourceList

		ginkgo.By("Create pods that use 4/5 of node resources.")
		pods := make([]*v1.Pod, 0, len(nodeList.Items))
		for i, node := range nodeList.Items {
			// Advertise 5 units of the extended resource on this node.
			nodeCopy := node.DeepCopy()
			nodeCopy.Status.Capacity[testExtendedResource] = resource.MustParse("5")
			nodeCopy.Status.Allocatable[testExtendedResource] = resource.MustParse("5")
			err := patchNode(ctx, cs, &node, nodeCopy)
			framework.ExpectNoError(err)

			for j := 0; j < 2; j++ {
				// Request 2 units of the extended resource for each pod.
				podRes = v1.ResourceList{}
				podRes[testExtendedResource] = resource.MustParse("2")

				// Make the first pod low priority and all others medium priority.
				priorityName := mediumPriorityClassName
				if len(pods) == 0 {
					priorityName = lowPriorityClassName
				}
				pausePod := createPausePod(ctx, f, pausePodConfig{
					Name:              fmt.Sprintf("pod%d-%d-%v", i, j, priorityName),
					PriorityClassName: priorityName,
					Resources: &v1.ResourceRequirements{
						Requests: podRes,
						Limits:   podRes,
					},
					Affinity: &v1.Affinity{
						NodeAffinity: &v1.NodeAffinity{
							RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
								NodeSelectorTerms: []v1.NodeSelectorTerm{
									{
										MatchFields: []v1.NodeSelectorRequirement{
											{Key: "metadata.name", Operator: v1.NodeSelectorOpIn, Values: []string{node.Name}},
										},
									},
								},
							},
						},
					},
				})
				pods = append(pods, pausePod)
				framework.Logf("Created pod: %v", pausePod.Name)
			}
		}
		if len(pods) < 2 {
			framework.Failf("We need at least two pods to be created but " +
				"all nodes are already heavily utilized, so preemption tests cannot be run")
		}
		ginkgo.By("Wait for pods to be scheduled.")
		for _, pod := range pods {
			framework.ExpectNoError(e2epod.WaitForPodRunningInNamespace(ctx, cs, pod))
		}

		// Give the critical pod the same resource request as the low priority pod.
		podRes = pods[0].Spec.Containers[0].Resources.Requests
		ginkgo.By("Run a critical pod that uses the same resources as a lower priority pod")

		defer func() {
			// Clean up the critical pod even if runPausePod below fails.
			err := f.ClientSet.CoreV1().Pods(metav1.NamespaceSystem).Delete(ctx, "critical-pod", *metav1.NewDeleteOptions(0))
			if err != nil && !apierrors.IsNotFound(err) {
				framework.Failf("Error cleaning up pod `%s/%s`: %v", metav1.NamespaceSystem, "critical-pod", err)
			}
		}()
		runPausePod(ctx, f, pausePodConfig{
			Name:              "critical-pod",
			Namespace:         metav1.NamespaceSystem,
			PriorityClassName: scheduling.SystemClusterCritical,
			Resources: &v1.ResourceRequirements{
				Requests: podRes,
				Limits:   podRes,
			},
		})

		defer func() {
			// Clean up the critical pod.
			err := f.ClientSet.CoreV1().Pods(metav1.NamespaceSystem).Delete(ctx, "critical-pod", *metav1.NewDeleteOptions(0))
			framework.ExpectNoError(err)
		}()

		preemptedPod, err := cs.CoreV1().Pods(pods[0].Namespace).Get(ctx, pods[0].Name, metav1.GetOptions{})
		podPreempted := (err != nil && apierrors.IsNotFound(err)) ||
			(err == nil && preemptedPod.DeletionTimestamp != nil)
		// All other pods must remain untouched.
		for i := 1; i < len(pods); i++ {
			livePod, err := cs.CoreV1().Pods(pods[i].Namespace).Get(ctx, pods[i].Name, metav1.GetOptions{})
			framework.ExpectNoError(err)
			gomega.Expect(livePod.DeletionTimestamp).To(gomega.BeNil())
		}

		if !podPreempted {
			framework.Failf("expected pod to be preempted, instead got pod %+v and error %v", preemptedPod, err)
		}
	})

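	// Creates a low priority victim pod (with a finalizer) that consumes the node's
	// single unit of extended resource, then creates a high priority pod that needs the
	// same resource, and verifies that the victim is terminated with the
	// DisruptionTarget pod condition set.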
	ginkgo.It("validates pod disruption condition is added to the preempted pod", func(ctx context.Context) {
		podRes := v1.ResourceList{testExtendedResource: resource.MustParse("1")}

		ginkgo.By("Select a node to run the lower and higher priority pods")
		gomega.Expect(nodeList.Items).ToNot(gomega.BeEmpty(), "We need at least one node for the test to run")
		node := nodeList.Items[0]
		nodeCopy := node.DeepCopy()
		nodeCopy.Status.Capacity[testExtendedResource] = resource.MustParse("1")
		nodeCopy.Status.Allocatable[testExtendedResource] = resource.MustParse("1")
		err := patchNode(ctx, cs, &node, nodeCopy)
		framework.ExpectNoError(err)

		// Pin both pods to the selected node so they compete for its single extended resource.
		testNodeAffinity := v1.Affinity{
			NodeAffinity: &v1.NodeAffinity{
				RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
					NodeSelectorTerms: []v1.NodeSelectorTerm{
						{
							MatchFields: []v1.NodeSelectorRequirement{
								{Key: "metadata.name", Operator: v1.NodeSelectorOpIn, Values: []string{node.Name}},
							},
						},
					},
				},
			},
		}

		ginkgo.By("Create a low priority pod that consumes 1/1 of node resources")
		victimPod := createPausePod(ctx, f, pausePodConfig{
			Name:              "victim-pod",
			PriorityClassName: lowPriorityClassName,
			Resources: &v1.ResourceRequirements{
				Requests: podRes,
				Limits:   podRes,
			},
			Finalizers: []string{testFinalizer},
			Affinity:   &testNodeAffinity,
		})
		framework.Logf("Created pod: %v", victimPod.Name)

		ginkgo.By("Wait for the victim pod to be scheduled")
		framework.ExpectNoError(e2epod.WaitForPodRunningInNamespace(ctx, cs, victimPod))

		// Remove the finalizer at the end so the preempted victim pod can actually be deleted.
		defer e2epod.NewPodClient(f).RemoveFinalizer(ctx, victimPod.Name, testFinalizer)

		ginkgo.By("Create a high priority pod to trigger preemption of the lower priority pod")
		preemptorPod := createPausePod(ctx, f, pausePodConfig{
			Name:              "preemptor-pod",
			PriorityClassName: highPriorityClassName,
			Resources: &v1.ResourceRequirements{
				Requests: podRes,
				Limits:   podRes,
			},
			Affinity: &testNodeAffinity,
		})
		framework.Logf("Created pod: %v", preemptorPod.Name)

		ginkgo.By("Waiting for the victim pod to be terminating")
		err = e2epod.WaitForPodTerminatingInNamespaceTimeout(ctx, f.ClientSet, victimPod.Name, victimPod.Namespace, framework.PodDeleteTimeout)
		framework.ExpectNoError(err)

		ginkgo.By("Verifying the pod has the pod disruption condition")
		e2epod.VerifyPodHasConditionWithType(ctx, f, victimPod, v1.DisruptionTarget)
	})

	ginkgo.Context("PodTopologySpread Preemption", func() {
		var nodeNames []string
		var nodes []*v1.Node
		topologyKey := "kubernetes.io/e2e-pts-preemption"
		var fakeRes v1.ResourceName = "example.com/fakePTSRes"

		ginkgo.BeforeEach(func(ctx context.Context) {
			if len(nodeList.Items) < 2 {
				ginkgo.Skip("At least 2 nodes are required to run the test")
			}
			ginkgo.By("Trying to get 2 available nodes which can run pod")
			nodeNames = Get2NodesThatCanRunPod(ctx, f)
			ginkgo.By(fmt.Sprintf("Apply dedicated topologyKey %v for this test on the 2 nodes.", topologyKey))
			for _, nodeName := range nodeNames {
				e2enode.AddOrUpdateLabelOnNode(cs, nodeName, topologyKey, nodeName)

				node, err := cs.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
				framework.ExpectNoError(err)

				ginkgo.By(fmt.Sprintf("Apply 10 fake resource to node %v.", node.Name))
				nodeCopy := node.DeepCopy()
				nodeCopy.Status.Capacity[fakeRes] = resource.MustParse("10")
				nodeCopy.Status.Allocatable[fakeRes] = resource.MustParse("10")
				err = patchNode(ctx, cs, node, nodeCopy)
				framework.ExpectNoError(err)
				nodes = append(nodes, node)
			}
		})
		ginkgo.AfterEach(func(ctx context.Context) {
			for _, nodeName := range nodeNames {
				e2enode.RemoveLabelOffNode(cs, nodeName, topologyKey)
			}
			for _, node := range nodes {
				nodeCopy := node.DeepCopy()
				delete(nodeCopy.Status.Capacity, fakeRes)
				delete(nodeCopy.Status.Allocatable, fakeRes)
				err := patchNode(ctx, cs, node, nodeCopy)
				framework.ExpectNoError(err)
			}
		})

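		// On two nodes with 10 units of fake resource each, a high priority pod takes 9
		// units on one node and three low priority pods take 9 units on the other. A
		// medium priority pod with a DoNotSchedule topology spread constraint then needs
		// two low priority pods preempted (not just one) so that the spread skew stays
		// within maxSkew=1.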
		ginkgo.It("validates proper pods are preempted", func(ctx context.Context) {
			podLabel := "e2e-pts-preemption"
			nodeAffinity := &v1.Affinity{
				NodeAffinity: &v1.NodeAffinity{
					RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
						NodeSelectorTerms: []v1.NodeSelectorTerm{
							{
								MatchExpressions: []v1.NodeSelectorRequirement{
									{
										Key:      topologyKey,
										Operator: v1.NodeSelectorOpIn,
										Values:   nodeNames,
									},
								},
							},
						},
					},
				},
			}
			highPodCfg := pausePodConfig{
				Name:              "high",
				Namespace:         ns,
				Labels:            map[string]string{podLabel: ""},
				PriorityClassName: highPriorityClassName,
				Affinity:          nodeAffinity,
				Resources: &v1.ResourceRequirements{
					Requests: v1.ResourceList{fakeRes: resource.MustParse("9")},
					Limits:   v1.ResourceList{fakeRes: resource.MustParse("9")},
				},
			}
			lowPodCfg := pausePodConfig{
				Namespace:         ns,
				Labels:            map[string]string{podLabel: ""},
				PriorityClassName: lowPriorityClassName,
				Affinity:          nodeAffinity,
				Resources: &v1.ResourceRequirements{
					Requests: v1.ResourceList{fakeRes: resource.MustParse("3")},
					Limits:   v1.ResourceList{fakeRes: resource.MustParse("3")},
				},
			}

			ginkgo.By("Create 1 High Pod and 3 Low Pods to occupy 9/10 of fake resources on both nodes.")
			// The high pod lands on one node; the three low pods land on the other.
			runPausePod(ctx, f, highPodCfg)
			for i := 1; i <= 3; i++ {
				lowPodCfg.Name = fmt.Sprintf("low-%v", i)
				runPausePod(ctx, f, lowPodCfg)
			}

			ginkgo.By("Create 1 Medium Pod with TopologySpreadConstraints")
			mediumPodCfg := pausePodConfig{
				Name:              "medium",
				Namespace:         ns,
				Labels:            map[string]string{podLabel: ""},
				PriorityClassName: mediumPriorityClassName,
				Affinity:          nodeAffinity,
				Resources: &v1.ResourceRequirements{
					Requests: v1.ResourceList{fakeRes: resource.MustParse("3")},
					Limits:   v1.ResourceList{fakeRes: resource.MustParse("3")},
				},
				TopologySpreadConstraints: []v1.TopologySpreadConstraint{
					{
						MaxSkew:           1,
						TopologyKey:       topologyKey,
						WhenUnsatisfiable: v1.DoNotSchedule,
						LabelSelector: &metav1.LabelSelector{
							MatchExpressions: []metav1.LabelSelectorRequirement{
								{
									Key:      podLabel,
									Operator: metav1.LabelSelectorOpExists,
								},
							},
						},
					},
				},
			}

			// Preempting a single low pod would satisfy the resource request, but would
			// leave the spread as [high] vs [medium, low, low], violating maxSkew=1.
			// The scheduler therefore has to preempt two low pods.
			runPausePod(ctx, f, mediumPodCfg)

			ginkgo.By("Verify there are 3 Pods left in this namespace")
			wantPods := sets.New("high", "medium", "low")

			// Wait until only the expected 3 pods remain in the namespace.
			pods, err := e2epod.WaitForNumberOfPods(ctx, cs, ns, 3, framework.PollShortTimeout)
			framework.ExpectNoError(err)

			for _, pod := range pods.Items {
				// The low pods are named low-1..low-3; compare only the name prefix.
				podName := strings.Split(pod.Name, "-")[0]
				if wantPods.Has(podName) {
					ginkgo.By(fmt.Sprintf("Pod %q is as expected to be running.", pod.Name))
					wantPods.Delete(podName)
				} else {
					framework.Failf("Pod %q conflicted with expected PodSet %v", podName, wantPods)
				}
			}
		})
	})

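	// PreemptionExecutionPath exercises the full preemption flow on a single node using a
	// fake extended resource and four PriorityClasses of ascending value.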
	ginkgo.Context("PreemptionExecutionPath", func() {
		// Advertise a fake extended resource ("example.com/fakecpu") on a single node so
		// that the preemption path can be exercised deterministically.
		var fakecpu v1.ResourceName = "example.com/fakecpu"
		var cs clientset.Interface
		var node *v1.Node
		var ns, nodeHostNameLabel string
		f := framework.NewDefaultFramework("sched-preemption-path")
		f.NamespacePodSecurityLevel = admissionapi.LevelBaseline

		priorityPairs := make([]priorityPair, 0)

		ginkgo.AfterEach(func(ctx context.Context) {
			// Dump existing PriorityClasses if the spec failed, to help debugging.
			if ginkgo.CurrentSpecReport().Failed() {
				priorityList, err := cs.SchedulingV1().PriorityClasses().List(ctx, metav1.ListOptions{})
				if err != nil {
					framework.Logf("Unable to list PriorityClasses: %v", err)
				} else {
					framework.Logf("List existing PriorityClasses:")
					for _, p := range priorityList.Items {
						framework.Logf("%v/%v created at %v", p.Name, p.Value, p.CreationTimestamp)
					}
				}
			}

			if node != nil {
				nodeCopy := node.DeepCopy()
				delete(nodeCopy.Status.Capacity, fakecpu)
				delete(nodeCopy.Status.Allocatable, fakecpu)
				err := patchNode(ctx, cs, node, nodeCopy)
				framework.ExpectNoError(err)
			}
			for _, pair := range priorityPairs {
				_ = cs.SchedulingV1().PriorityClasses().Delete(ctx, pair.name, *metav1.NewDeleteOptions(0))
			}
		})

		ginkgo.BeforeEach(func(ctx context.Context) {
			cs = f.ClientSet
			ns = f.Namespace.Name

			ginkgo.By("Finding an available node")
			nodeName := GetNodeThatCanRunPod(ctx, f)
			framework.Logf("found a healthy node: %s", nodeName)

			// Get the node API object and its hostname label.
			var err error
			node, err = cs.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
			if err != nil {
				framework.Failf("error getting node %q: %v", nodeName, err)
			}
			var ok bool
			nodeHostNameLabel, ok = node.GetObjectMeta().GetLabels()["kubernetes.io/hostname"]
			if !ok {
				framework.Failf("error getting kubernetes.io/hostname label on node %s", nodeName)
			}

			// Advertise 1000 units of fakecpu on the node.
			nodeCopy := node.DeepCopy()
			nodeCopy.Status.Capacity[fakecpu] = resource.MustParse("1000")
			nodeCopy.Status.Allocatable[fakecpu] = resource.MustParse("1000")
			err = patchNode(ctx, cs, node, nodeCopy)
			framework.ExpectNoError(err)

			// Create four PriorityClasses: p1 < p2 < p3 < p4.
			for i := 1; i <= 4; i++ {
				priorityName := fmt.Sprintf("p%d", i)
				priorityVal := int32(i)
				priorityPairs = append(priorityPairs, priorityPair{name: priorityName, value: priorityVal})
				_, err := cs.SchedulingV1().PriorityClasses().Create(ctx, &schedulingv1.PriorityClass{ObjectMeta: metav1.ObjectMeta{Name: priorityName}, Value: priorityVal}, metav1.CreateOptions{})
				if err != nil {
					framework.Logf("Failed to create priority '%v/%v'. Reason: %v. Msg: %v", priorityName, priorityVal, apierrors.ReasonForError(err), err)
				}
				if err != nil && !apierrors.IsAlreadyExists(err) {
					framework.Failf("expected 'alreadyExists' as error, got instead: %v", err)
				}
			}
		})

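		// Runs three ReplicaSets (p1..p3) that nearly fill the node's fakecpu, then creates
		// a p4 preemptor pod and, by counting pod creations per ReplicaSet, checks that
		// exactly the two lowest priority ReplicaSets are preempted and recreated, and
		// that nothing is over-preempted.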
		framework.ConformanceIt("runs ReplicaSets to verify preemption running path", func(ctx context.Context) {
			podNamesSeen := []int32{0, 0, 0}

			// Watch pod creations in the namespace and count them per ReplicaSet.
			_, podController := cache.NewInformer(
				&cache.ListWatch{
					ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
						obj, err := f.ClientSet.CoreV1().Pods(ns).List(ctx, options)
						return runtime.Object(obj), err
					},
					WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
						return f.ClientSet.CoreV1().Pods(ns).Watch(ctx, options)
					},
				},
				&v1.Pod{},
				0,
				cache.ResourceEventHandlerFuncs{
					AddFunc: func(obj interface{}) {
						if pod, ok := obj.(*v1.Pod); ok {
							if strings.HasPrefix(pod.Name, "rs-pod1") {
								atomic.AddInt32(&podNamesSeen[0], 1)
							} else if strings.HasPrefix(pod.Name, "rs-pod2") {
								atomic.AddInt32(&podNamesSeen[1], 1)
							} else if strings.HasPrefix(pod.Name, "rs-pod3") {
								atomic.AddInt32(&podNamesSeen[2], 1)
							}
						}
					},
				},
			)
			go podController.Run(ctx.Done())

			// Create three ReplicaSets whose pods together request 950/1000 fakecpu.
			rsConfs := []pauseRSConfig{
				{
					Replicas: int32(1),
					PodConfig: pausePodConfig{
						Name:              "pod1",
						Namespace:         ns,
						Labels:            map[string]string{"name": "pod1"},
						PriorityClassName: "p1",
						NodeSelector:      map[string]string{"kubernetes.io/hostname": nodeHostNameLabel},
						Resources: &v1.ResourceRequirements{
							Requests: v1.ResourceList{fakecpu: resource.MustParse("200")},
							Limits:   v1.ResourceList{fakecpu: resource.MustParse("200")},
						},
					},
				},
				{
					Replicas: int32(1),
					PodConfig: pausePodConfig{
						Name:              "pod2",
						Namespace:         ns,
						Labels:            map[string]string{"name": "pod2"},
						PriorityClassName: "p2",
						NodeSelector:      map[string]string{"kubernetes.io/hostname": nodeHostNameLabel},
						Resources: &v1.ResourceRequirements{
							Requests: v1.ResourceList{fakecpu: resource.MustParse("300")},
							Limits:   v1.ResourceList{fakecpu: resource.MustParse("300")},
						},
					},
				},
				{
					Replicas: int32(1),
					PodConfig: pausePodConfig{
						Name:              "pod3",
						Namespace:         ns,
						Labels:            map[string]string{"name": "pod3"},
						PriorityClassName: "p3",
						NodeSelector:      map[string]string{"kubernetes.io/hostname": nodeHostNameLabel},
						Resources: &v1.ResourceRequirements{
							Requests: v1.ResourceList{fakecpu: resource.MustParse("450")},
							Limits:   v1.ResourceList{fakecpu: resource.MustParse("450")},
						},
					},
				},
			}

			for i := range rsConfs {
				runPauseRS(ctx, f, rsConfs[i])
			}

			framework.Logf("pods created so far: %v", podNamesSeen)
			framework.Logf("length of pods created so far: %v", len(podNamesSeen))

			// Create the preemptor pod: it requests 500 fakecpu, so the scheduler is
			// expected to preempt the two lowest priority pods (200 + 300) to make room.
			preemptorPodConf := pausePodConfig{
				Name:              "pod4",
				Namespace:         ns,
				Labels:            map[string]string{"name": "pod4"},
				PriorityClassName: "p4",
				NodeSelector:      map[string]string{"kubernetes.io/hostname": nodeHostNameLabel},
				Resources: &v1.ResourceRequirements{
					Requests: v1.ResourceList{fakecpu: resource.MustParse("500")},
					Limits:   v1.ResourceList{fakecpu: resource.MustParse("500")},
				},
			}
			preemptorPod := createPod(ctx, f, preemptorPodConf)
			waitForPreemptingWithTimeout(ctx, f, preemptorPod, framework.PodGetTimeout)

			framework.Logf("pods created so far: %v", podNamesSeen)

			// ReplicaSet1 and ReplicaSet2 should each see 2 pod creations (the original pod
			// plus its replacement after preemption); ReplicaSet3 should not be preempted
			// and so sees only 1.
			expectedRSPods := []int32{1 * 2, 1 * 2, 1}
			err := wait.PollUntilContextTimeout(ctx, framework.Poll, framework.PollShortTimeout, false, func(ctx context.Context) (bool, error) {
				for i := 0; i < len(podNamesSeen); i++ {
					got := atomic.LoadInt32(&podNamesSeen[i])
					if got < expectedRSPods[i] {
						framework.Logf("waiting for rs%d to observe %d pod creations, got %d", i+1, expectedRSPods[i], got)
						return false, nil
					} else if got > expectedRSPods[i] {
						return false, fmt.Errorf("rs%d had more than %d pods created: %d", i+1, expectedRSPods[i], got)
					}
				}
				return true, nil
			})
			if err != nil {
				framework.Logf("pods created so far: %v", podNamesSeen)
				framework.Failf("failed pod observation expectations: %v", err)
			}

			// Wait a bit longer and re-check the counters to make sure no pods were
			// over-preempted after the expectations were first met.
			time.Sleep(5 * time.Second)
			for i := 0; i < len(podNamesSeen); i++ {
				got := atomic.LoadInt32(&podNamesSeen[i])
				if got < expectedRSPods[i] {
					framework.Failf("pods of ReplicaSet%d have been under-preempted: expect %v pod names, but got %d", i+1, expectedRSPods[i], got)
				} else if got > expectedRSPods[i] {
					framework.Failf("pods of ReplicaSet%d have been over-preempted: expect %v pod names, but got %d", i+1, expectedRSPods[i], got)
				}
			}
		})
	})

	ginkgo.Context("PriorityClass endpoints", func() {
		var cs clientset.Interface
		f := framework.NewDefaultFramework("sched-preemption-path")
		f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged
		testUUID := uuid.New().String()
		var pcs []*schedulingv1.PriorityClass

		ginkgo.BeforeEach(func(ctx context.Context) {
			cs = f.ClientSet
			// Create 2 PriorityClasses labeled with this test's UUID.
			for i := 1; i <= 2; i++ {
				name, val := fmt.Sprintf("p%d", i), int32(i)
				pc, err := cs.SchedulingV1().PriorityClasses().Create(ctx, &schedulingv1.PriorityClass{ObjectMeta: metav1.ObjectMeta{Name: name, Labels: map[string]string{"e2e": testUUID}}, Value: val}, metav1.CreateOptions{})
				if err != nil {
					framework.Logf("Failed to create priority '%v/%v'. Reason: %v. Msg: %v", name, val, apierrors.ReasonForError(err), err)
				}
				if err != nil && !apierrors.IsAlreadyExists(err) {
					framework.Failf("expected 'alreadyExists' as error, got instead: %v", err)
				}
				pcs = append(pcs, pc)
			}
		})

		ginkgo.AfterEach(func(ctx context.Context) {
			// Dump existing PriorityClasses if the spec failed, to help debugging.
			if ginkgo.CurrentSpecReport().Failed() {
				priorityList, err := cs.SchedulingV1().PriorityClasses().List(ctx, metav1.ListOptions{})
				if err != nil {
					framework.Logf("Unable to list PriorityClasses: %v", err)
				} else {
					framework.Logf("List existing PriorityClasses:")
					for _, p := range priorityList.Items {
						framework.Logf("%v/%v created at %v", p.Name, p.Value, p.CreationTimestamp)
					}
				}
			}

			// Delete the PriorityClasses created by this test via collection deletion.
			err := cs.SchedulingV1().PriorityClasses().DeleteCollection(ctx, metav1.DeleteOptions{}, metav1.ListOptions{LabelSelector: fmt.Sprintf("e2e=%v", testUUID)})
			framework.ExpectNoError(err)
		})

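		// Exercises the PriorityClass API: patch/update of the immutable Value field must
		// fail, patch/update of the mutable Description field must succeed, and list/get
		// must reflect the updated objects.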
		framework.ConformanceIt("verify PriorityClass endpoints can be operated with different HTTP methods", func(ctx context.Context) {
			// 1. Patch and Update of the immutable Value field must fail.
			pcCopy := pcs[0].DeepCopy()
			pcCopy.Value = pcCopy.Value * 10
			err := patchPriorityClass(ctx, cs, pcs[0], pcCopy)
			gomega.Expect(err).To(gomega.HaveOccurred(), "expect a patch error on an immutable field")
			framework.Logf("%v", err)

			pcCopy = pcs[1].DeepCopy()
			pcCopy.Value = pcCopy.Value * 10
			_, err = cs.SchedulingV1().PriorityClasses().Update(ctx, pcCopy, metav1.UpdateOptions{})
			gomega.Expect(err).To(gomega.HaveOccurred(), "expect an update error on an immutable field")
			framework.Logf("%v", err)

			// 2. Patch and Update of the mutable Description field must succeed.
			newDesc := "updated description"
			pcCopy = pcs[0].DeepCopy()
			pcCopy.Description = newDesc
			err = patchPriorityClass(ctx, cs, pcs[0], pcCopy)
			framework.ExpectNoError(err)

			pcCopy = pcs[1].DeepCopy()
			pcCopy.Description = newDesc
			_, err = cs.SchedulingV1().PriorityClasses().Update(ctx, pcCopy, metav1.UpdateOptions{})
			framework.ExpectNoError(err)

			// 3. List existing PriorityClasses.
			_, err = cs.SchedulingV1().PriorityClasses().List(ctx, metav1.ListOptions{})
			framework.ExpectNoError(err)

			// 4. Verify that the Value is unchanged and the Description was updated.
			for _, pc := range pcs {
				livePC, err := cs.SchedulingV1().PriorityClasses().Get(ctx, pc.Name, metav1.GetOptions{})
				framework.ExpectNoError(err)
				gomega.Expect(livePC.Value).To(gomega.Equal(pc.Value))
				gomega.Expect(livePC.Description).To(gomega.Equal(newDesc))
			}
		})
	})
})

type pauseRSConfig struct {
	Replicas  int32
	PodConfig pausePodConfig
}

func initPauseRS(f *framework.Framework, conf pauseRSConfig) *appsv1.ReplicaSet {
	pausePod := initPausePod(f, conf.PodConfig)
	pauseRS := &appsv1.ReplicaSet{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "rs-" + pausePod.Name,
			Namespace: pausePod.Namespace,
		},
		Spec: appsv1.ReplicaSetSpec{
			Replicas: &conf.Replicas,
			Selector: &metav1.LabelSelector{
				MatchLabels: pausePod.Labels,
			},
			Template: v1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{Labels: pausePod.ObjectMeta.Labels},
				Spec:       pausePod.Spec,
			},
		},
	}
	return pauseRS
}

func createPauseRS(ctx context.Context, f *framework.Framework, conf pauseRSConfig) *appsv1.ReplicaSet {
	namespace := conf.PodConfig.Namespace
	if len(namespace) == 0 {
		namespace = f.Namespace.Name
	}
	rs, err := f.ClientSet.AppsV1().ReplicaSets(namespace).Create(ctx, initPauseRS(f, conf), metav1.CreateOptions{})
	framework.ExpectNoError(err)
	return rs
}

func runPauseRS(ctx context.Context, f *framework.Framework, conf pauseRSConfig) *appsv1.ReplicaSet {
	rs := createPauseRS(ctx, f, conf)
	framework.ExpectNoError(e2ereplicaset.WaitForReplicaSetTargetAvailableReplicasWithTimeout(ctx, f.ClientSet, rs, conf.Replicas, framework.PodGetTimeout))
	return rs
}

func createPod(ctx context.Context, f *framework.Framework, conf pausePodConfig) *v1.Pod {
	namespace := conf.Namespace
	if len(namespace) == 0 {
		namespace = f.Namespace.Name
	}
	pod, err := f.ClientSet.CoreV1().Pods(namespace).Create(ctx, initPausePod(f, conf), metav1.CreateOptions{})
	framework.ExpectNoError(err)
	return pod
}

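// waitForPreemptingWithTimeout waits until the given pod is bound to a node (i.e. the
// preemption it triggered has completed), or fails the test after the timeout.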
func waitForPreemptingWithTimeout(ctx context.Context, f *framework.Framework, pod *v1.Pod, timeout time.Duration) {
	err := wait.PollUntilContextTimeout(ctx, 2*time.Second, timeout, false, func(ctx context.Context) (bool, error) {
		pod, err := f.ClientSet.CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{})
		if err != nil {
			return false, err
		}
		if len(pod.Spec.NodeName) > 0 {
			return true, nil
		}
		return false, err
	})
	framework.ExpectNoError(err, "pod %v/%v failed to preempt other pods", pod.Namespace, pod.Name)
}

func patchNode(ctx context.Context, client clientset.Interface, old *v1.Node, new *v1.Node) error {
	oldData, err := json.Marshal(old)
	if err != nil {
		return err
	}

	newData, err := json.Marshal(new)
	if err != nil {
		return err
	}
	patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, &v1.Node{})
	if err != nil {
		return fmt.Errorf("failed to create merge patch for node %q: %w", old.Name, err)
	}
	_, err = client.CoreV1().Nodes().Patch(ctx, old.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}, "status")
	return err
}

func patchPriorityClass(ctx context.Context, cs clientset.Interface, old, new *schedulingv1.PriorityClass) error {
	oldData, err := json.Marshal(old)
	if err != nil {
		return err
	}

	newData, err := json.Marshal(new)
	if err != nil {
		return err
	}
	patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, &schedulingv1.PriorityClass{})
	if err != nil {
		return fmt.Errorf("failed to create merge patch for PriorityClass %q: %w", old.Name, err)
	}
	_, err = cs.SchedulingV1().PriorityClasses().Patch(ctx, old.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{})
	return err
}