package scheduling

import (
	"context"
	"fmt"
	"math"
	"sync"
	"time"

	"github.com/onsi/ginkgo/v2"
	"github.com/onsi/gomega"

	// ensure libs have a chance to initialize
	_ "github.com/stretchr/testify/assert"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/util/errors"
	"k8s.io/apimachinery/pkg/util/uuid"
	"k8s.io/apimachinery/pkg/util/wait"
	clientset "k8s.io/client-go/kubernetes"
	v1qos "k8s.io/kubernetes/pkg/apis/core/v1/helper/qos"
	schedutil "k8s.io/kubernetes/pkg/scheduler/util"
	"k8s.io/kubernetes/test/e2e/framework"
	e2enode "k8s.io/kubernetes/test/e2e/framework/node"
	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
	e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
	testutils "k8s.io/kubernetes/test/utils"
	admissionapi "k8s.io/pod-security-admission/api"
)

// Resource summarizes the non-zero CPU (in millicores) and memory (in bytes)
// requested by a pod's containers.
type Resource struct {
	MilliCPU int64
	Memory   int64
}

// balancePodLabel is attached to the pause pods that createBalancedPodForNodes
// creates to even out resource usage across nodes.
var balancePodLabel = map[string]string{"podname": "priority-balanced-memory"}

// crioMinMemLimit tracks the minimum memory limit used by the CRI-O runtime;
// pods cannot set a memory limit lower than this, so the balancing logic below
// always leaves at least this much headroom per pod.
var crioMinMemLimit = 12 * 1024 * 1024

var podRequestedResource = &v1.ResourceRequirements{
	Limits: v1.ResourceList{
		v1.ResourceMemory: resource.MustParse("100Mi"),
		v1.ResourceCPU:    resource.MustParse("100m"),
	},
	Requests: v1.ResourceList{
		v1.ResourceMemory: resource.MustParse("100Mi"),
		v1.ResourceCPU:    resource.MustParse("100m"),
	},
}

// nodesAreTooUtilized returns true if any node cannot spare at least
// 2*crioMinMemLimit of its allocatable memory; in that case the balancing pods
// created by the tests would not fit and the suite is skipped.
func nodesAreTooUtilized(ctx context.Context, cs clientset.Interface, nodeList *v1.NodeList) bool {
	nodeNameToPodList := podListForEachNode(ctx, cs)
	for _, node := range nodeList.Items {
		_, memFraction, _, memAllocatable := computeCPUMemFraction(node, podRequestedResource, nodeNameToPodList[node.Name])
		if float64(memAllocatable)-(memFraction*float64(memAllocatable)) < float64(2*crioMinMemLimit) {
			return true
		}
	}
	return false
}

// These tests exercise the scheduler's scoring (priority) plugins. They run
// serially because they rebalance node utilization and mutate node labels and
// taints, which would interfere with tests running in parallel.
var _ = SIGDescribe("SchedulerPriorities", framework.WithSerial(), func() {
	var cs clientset.Interface
	var nodeList *v1.NodeList
	var systemPodsNo int
	var ns string
	f := framework.NewDefaultFramework("sched-priority")
	f.NamespacePodSecurityLevel = admissionapi.LevelBaseline

	ginkgo.BeforeEach(func(ctx context.Context) {
		cs = f.ClientSet
		ns = f.Namespace.Name
		nodeList = &v1.NodeList{}
		var err error

		e2enode.WaitForTotalHealthy(ctx, cs, time.Minute)
		nodeList, err = e2enode.GetReadySchedulableNodes(ctx, cs)
		if err != nil {
			framework.Logf("Unexpected error occurred: %v", err)
		}
		framework.ExpectNoErrorWithOffset(0, err)

		err = framework.CheckTestingNSDeletedExcept(ctx, cs, ns)
		framework.ExpectNoError(err)
		err = e2epod.WaitForPodsRunningReady(ctx, cs, metav1.NamespaceSystem, int32(systemPodsNo), 0, framework.PodReadyBeforeTimeout)
		framework.ExpectNoError(err)

		// Skip the whole suite when the nodes are so loaded that the balancing
		// pods created by the tests could not be scheduled.
		if nodesAreTooUtilized(ctx, cs, nodeList) {
			ginkgo.Skip("nodes are too utilized to schedule test pods")
		}
	})

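	// This test verifies that a preferred (soft) PodAntiAffinity term steers the
	// new pod away from the node that runs the pod carrying the "security: S1" label.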
	ginkgo.It("Pod should be scheduled to node that don't match the PodAntiAffinity terms", func(ctx context.Context) {

		e2eskipper.SkipUnlessNodeCountIsAtLeast(2)

		ginkgo.By("Trying to launch a pod with a label to get a node which can launch it.")
		pod := runPausePod(ctx, f, pausePodConfig{
			Name:   "pod-with-label-security-s1",
			Labels: map[string]string{"security": "S1"},
		})
		nodeName := pod.Spec.NodeName

		k := v1.LabelHostname
		ginkgo.By("Verifying the node has a label " + k)
		node, err := cs.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
		framework.ExpectNoError(err)
		if _, hasLabel := node.Labels[k]; !hasLabel {
			// The node is missing the hostname label, so fall back to a dedicated
			// topology key: one value for the chosen node, another for all other nodes.
			ginkgo.By("Trying to apply a label on the found node.")
			k = "kubernetes.io/e2e-node-topologyKey"
			v := "topologyvalue1"
			e2enode.AddOrUpdateLabelOnNode(cs, nodeName, k, v)
			e2enode.ExpectNodeHasLabel(ctx, cs, nodeName, k, v)
			defer e2enode.RemoveLabelOffNode(cs, nodeName, k)

			ginkgo.By("Trying to apply a label on other nodes.")
			v = "topologyvalue2"
			for _, node := range nodeList.Items {
				if node.Name != nodeName {
					e2enode.AddOrUpdateLabelOnNode(cs, node.Name, k, v)
					e2enode.ExpectNodeHasLabel(ctx, cs, node.Name, k, v)
					defer e2enode.RemoveLabelOffNode(cs, node.Name, k)
				}
			}
		}

		// Even out CPU and memory usage across the nodes so that only the
		// anti-affinity preference influences the scheduling decision.
		err = createBalancedPodForNodes(ctx, f, cs, ns, nodeList.Items, podRequestedResource, 0.6)
		framework.ExpectNoError(err)
		ginkgo.By("Trying to launch the pod with podAntiAffinity.")
		labelPodName := "pod-with-pod-antiaffinity"
		pod = createPausePod(ctx, f, pausePodConfig{
			Resources: podRequestedResource,
			Name:      labelPodName,
			Affinity: &v1.Affinity{
				PodAntiAffinity: &v1.PodAntiAffinity{
					PreferredDuringSchedulingIgnoredDuringExecution: []v1.WeightedPodAffinityTerm{
						{
							PodAffinityTerm: v1.PodAffinityTerm{
								LabelSelector: &metav1.LabelSelector{
									MatchExpressions: []metav1.LabelSelectorRequirement{
										{
											Key:      "security",
											Operator: metav1.LabelSelectorOpIn,
											Values:   []string{"S1", "value2"},
										},
										{
											Key:      "security",
											Operator: metav1.LabelSelectorOpNotIn,
											Values:   []string{"S2"},
										}, {
											Key:      "security",
											Operator: metav1.LabelSelectorOpExists,
										},
									},
								},
								TopologyKey: k,
								Namespaces:  []string{ns},
							},
							Weight: 10,
						},
					},
				},
			},
		})
		ginkgo.By("Wait for the pod to become running.")
		framework.ExpectNoError(e2epod.WaitForPodNameRunningInNamespace(ctx, f.ClientSet, pod.Name, f.Namespace.Name))
		labelPod, err := cs.CoreV1().Pods(ns).Get(ctx, labelPodName, metav1.GetOptions{})
		framework.ExpectNoError(err)
		ginkgo.By("Verify the pod was scheduled to the expected node.")
		gomega.Expect(labelPod.Spec.NodeName).ToNot(gomega.Equal(nodeName))
	})

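	// This test verifies that taint/toleration scoring prefers the node whose
	// taints the pod tolerates over nodes carrying only intolerable taints.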
	ginkgo.It("Pod should be preferably scheduled to nodes pod can tolerate", func(ctx context.Context) {
		// Even out CPU and memory usage across the nodes first.
		err := createBalancedPodForNodes(ctx, f, cs, ns, nodeList.Items, podRequestedResource, 0.5)
		framework.ExpectNoError(err)

		// The first node gets only taints that the test pod tolerates.
		nodeName := nodeList.Items[0].Name

		// Generate ten random taints for the first node together with matching
		// tolerations for the test pod.
		tolerableTaints := make([]v1.Taint, 0)
		var tolerations []v1.Toleration
		for i := 0; i < 10; i++ {
			testTaint := getRandomTaint()
			tolerableTaints = append(tolerableTaints, testTaint)
			tolerations = append(tolerations, v1.Toleration{Key: testTaint.Key, Value: testTaint.Value, Effect: testTaint.Effect})
		}
		// Generate ten taints per remaining node that the pod does not tolerate.
		intolerableTaints := make(map[string][]v1.Taint)
		for i := 1; i < len(nodeList.Items); i++ {
			nodeTaints := make([]v1.Taint, 0)
			for j := 0; j < 10; j++ {
				nodeTaints = append(nodeTaints, getRandomTaint())
			}
			intolerableTaints[nodeList.Items[i].Name] = nodeTaints
		}

		ginkgo.By("Trying to apply 10 (tolerable) taints on the first node.")
		// Register the cleanup before tainting so the taints are removed even if
		// applying one of them fails partway through.
		ginkgo.DeferCleanup(e2enode.RemoveTaintsOffNode, cs, nodeName, tolerableTaints)
		for _, taint := range tolerableTaints {
			addTaintToNode(ctx, cs, nodeName, taint)
		}

		ginkgo.By("Adding 10 intolerable taints to all other nodes")
		for i := 1; i < len(nodeList.Items); i++ {
			node := nodeList.Items[i]
			ginkgo.DeferCleanup(e2enode.RemoveTaintsOffNode, cs, node.Name, intolerableTaints[node.Name])
			for _, taint := range intolerableTaints[node.Name] {
				addTaintToNode(ctx, cs, node.Name, taint)
			}
		}

		tolerationPodName := "with-tolerations"
		ginkgo.By("Create a pod that tolerates all the taints of the first node.")
		pod := createPausePod(ctx, f, pausePodConfig{
			Name:        tolerationPodName,
			Tolerations: tolerations,
		})
		framework.ExpectNoError(e2epod.WaitForPodNameRunningInNamespace(ctx, f.ClientSet, pod.Name, f.Namespace.Name))

		ginkgo.By("The pod should be preferably scheduled to the node it can tolerate.")
		tolePod, err := cs.CoreV1().Pods(ns).Get(ctx, tolerationPodName, metav1.GetOptions{})
		framework.ExpectNoError(err)
		gomega.Expect(tolePod.Spec.NodeName).To(gomega.Equal(nodeName))
	})

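	// These tests exercise the scoring side of PodTopologySpread: with a soft
	// (ScheduleAnyway) constraint, the scheduler should prefer the node that keeps
	// the matching pods most evenly spread across the topology domains.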
	ginkgo.Context("PodTopologySpread Scoring", func() {
		var nodeNames []string
		topologyKey := "kubernetes.io/e2e-pts-score"

		ginkgo.BeforeEach(func(ctx context.Context) {
			if len(nodeList.Items) < 2 {
				ginkgo.Skip("At least 2 nodes are required to run the test")
			}
			ginkgo.By("Trying to get 2 available nodes which can run pod")
			nodeNames = Get2NodesThatCanRunPod(ctx, f)
			ginkgo.By(fmt.Sprintf("Apply dedicated topologyKey %v for this test on the 2 nodes.", topologyKey))
			for _, nodeName := range nodeNames {
				e2enode.AddOrUpdateLabelOnNode(cs, nodeName, topologyKey, nodeName)
			}
		})
		ginkgo.AfterEach(func() {
			for _, nodeName := range nodeNames {
				e2enode.RemoveLabelOffNode(cs, nodeName, topologyKey)
			}
		})

		ginkgo.It("validates pod should be preferably scheduled to node which makes the matching pods more evenly distributed", func(ctx context.Context) {
			var nodes []v1.Node
			for _, nodeName := range nodeNames {
				node, err := cs.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
				framework.ExpectNoError(err)
				nodes = append(nodes, *node)
			}

			// Make the two nodes have similar CPU and memory usage first.
			err := createBalancedPodForNodes(ctx, f, cs, ns, nodes, podRequestedResource, 0.5)
			framework.ExpectNoError(err)

			replicas := 4
			podLabel := "e2e-pts-score"
			ginkgo.By(fmt.Sprintf("Run a ReplicaSet with %v replicas on node %q", replicas, nodeNames[0]))
			rsConfig := pauseRSConfig{
				Replicas: int32(replicas),
				PodConfig: pausePodConfig{
					Name:         podLabel,
					Namespace:    ns,
					Labels:       map[string]string{podLabel: "foo"},
					NodeSelector: map[string]string{topologyKey: nodeNames[0]},
				},
			}
			runPauseRS(ctx, f, rsConfig)

			// Run a pod with a soft (ScheduleAnyway) topology spread constraint.
			podCfg := pausePodConfig{
				Name:      "test-pod",
				Namespace: ns,
				// The label value must not match the preceding ReplicaSet, otherwise
				// the pod would be claimed as one of its replicas.
				Labels: map[string]string{podLabel: "bar"},
				Affinity: &v1.Affinity{
					NodeAffinity: &v1.NodeAffinity{
						RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
							NodeSelectorTerms: []v1.NodeSelectorTerm{
								{
									MatchExpressions: []v1.NodeSelectorRequirement{
										{
											Key:      topologyKey,
											Operator: v1.NodeSelectorOpIn,
											Values:   nodeNames,
										},
									},
								},
							},
						},
					},
				},
				TopologySpreadConstraints: []v1.TopologySpreadConstraint{
					{
						MaxSkew:           1,
						TopologyKey:       topologyKey,
						WhenUnsatisfiable: v1.ScheduleAnyway,
						LabelSelector: &metav1.LabelSelector{
							MatchExpressions: []metav1.LabelSelectorRequirement{
								{
									Key:      podLabel,
									Operator: metav1.LabelSelectorOpExists,
								},
							},
						},
					},
				},
			}
			testPod := runPausePod(ctx, f, podCfg)
			ginkgo.By(fmt.Sprintf("Verifying if the test-pod lands on node %q", nodeNames[1]))
			gomega.Expect(testPod.Spec.NodeName).To(gomega.Equal(nodeNames[1]))
		})
	})
})

// createBalancedPodForNodes creates one pause pod per node so that every node
// ends up with roughly the same fraction of its allocatable CPU and memory
// requested (at least ratio). It registers a cleanup that deletes those pods.
func createBalancedPodForNodes(ctx context.Context, f *framework.Framework, cs clientset.Interface, ns string, nodes []v1.Node, requestedResource *v1.ResourceRequirements, ratio float64) error {
	cleanUp := func(ctx context.Context) {
		// Delete all the balancing pods and wait until they are gone.
		err := cs.CoreV1().Pods(ns).DeleteCollection(ctx, metav1.DeleteOptions{}, metav1.ListOptions{
			LabelSelector: labels.SelectorFromSet(labels.Set(balancePodLabel)).String(),
		})
		if err != nil {
			framework.Logf("Failed to delete memory balanced pods: %v.", err)
		} else {
			err := wait.PollUntilContextTimeout(ctx, 2*time.Second, time.Minute, true, func(ctx context.Context) (bool, error) {
				podList, err := cs.CoreV1().Pods(ns).List(ctx, metav1.ListOptions{
					LabelSelector: labels.SelectorFromSet(labels.Set(balancePodLabel)).String(),
				})
				if err != nil {
					framework.Logf("Failed to list memory balanced pods: %v.", err)
					return false, nil
				}
				if len(podList.Items) > 0 {
					return false, nil
				}
				return true, nil
			})
			if err != nil {
				framework.Logf("Failed to wait until all memory balanced pods are deleted: %v.", err)
			}
		}
	}
	ginkgo.DeferCleanup(cleanUp)

	// Start from the requested ratio; if any node already exceeds it, the highest
	// observed fraction becomes the common target for all nodes.
	var maxCPUFraction, maxMemFraction float64 = ratio, ratio
	var cpuFractionMap = make(map[string]float64)
	var memFractionMap = make(map[string]float64)

	// Compute the current CPU and memory fraction of every node.
	nodeNameToPodList := podListForEachNode(ctx, cs)

	for _, node := range nodes {
		cpuFraction, memFraction, _, _ := computeCPUMemFraction(node, requestedResource, nodeNameToPodList[node.Name])
		cpuFractionMap[node.Name] = cpuFraction
		memFractionMap[node.Name] = memFraction
		if cpuFraction > maxCPUFraction {
			maxCPUFraction = cpuFraction
		}
		if memFraction > maxMemFraction {
			maxMemFraction = memFraction
		}
	}

	errChan := make(chan error, len(nodes))
	var wg sync.WaitGroup

	// Use the highest fraction seen as the common target so that no node would
	// need a balancing pod with negative requests.
	ratio = math.Max(maxCPUFraction, maxMemFraction)
	for _, node := range nodes {
		memAllocatable, found := node.Status.Allocatable[v1.ResourceMemory]
		if !found {
			framework.Failf("Node %v: node.Status.Allocatable %v does not contain entry %v", node.Name, node.Status.Allocatable, v1.ResourceMemory)
		}
		memAllocatableVal := memAllocatable.Value()

		cpuAllocatable, found := node.Status.Allocatable[v1.ResourceCPU]
		if !found {
			framework.Failf("Node %v: node.Status.Allocatable %v does not contain entry %v", node.Name, node.Status.Allocatable, v1.ResourceCPU)
		}
		cpuAllocatableMil := cpuAllocatable.MilliValue()

		needCreateResource := v1.ResourceList{}
		cpuFraction := cpuFractionMap[node.Name]
		memFraction := memFractionMap[node.Name]
		needCreateResource[v1.ResourceCPU] = *resource.NewMilliQuantity(int64((ratio-cpuFraction)*float64(cpuAllocatableMil)), resource.DecimalSI)

		// Add crioMinMemLimit on top so the request never falls below the
		// runtime's minimum memory limit while keeping the target ratio.
		needCreateResource[v1.ResourceMemory] = *resource.NewQuantity(int64((ratio-memFraction)*float64(memAllocatableVal)+float64(crioMinMemLimit)), resource.BinarySI)

		podConfig := &pausePodConfig{
			Name:   "",
			Labels: balancePodLabel,
			Resources: &v1.ResourceRequirements{
				Requests: needCreateResource,
			},
			Affinity: &v1.Affinity{
				NodeAffinity: &v1.NodeAffinity{
					RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
						NodeSelectorTerms: []v1.NodeSelectorTerm{
							{
								MatchFields: []v1.NodeSelectorRequirement{
									{Key: "metadata.name", Operator: v1.NodeSelectorOpIn, Values: []string{node.Name}},
								},
							},
						},
					},
				},
			},
		}
		wg.Add(1)
		// Start the balancing pods for all nodes in parallel.
		go func() {
			defer wg.Done()
			err := testutils.StartPods(cs, 1, ns, string(uuid.NewUUID()),
				*initPausePod(f, *podConfig), true, framework.Logf)
			if err != nil {
				errChan <- err
			}
		}()
	}
	wg.Wait()
	close(errChan)
	var errs []error
	for err := range errChan {
		if err != nil {
			errs = append(errs, err)
		}
	}
	if len(errs) > 0 {
		return errors.NewAggregate(errs)
	}

	nodeNameToPodList = podListForEachNode(ctx, cs)
	for _, node := range nodes {
		ginkgo.By("Computing CPU and memory fraction after creating the balanced pods.")
		computeCPUMemFraction(node, requestedResource, nodeNameToPodList[node.Name])
	}

	return nil
}

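// podListForEachNode lists the pods in all namespaces and groups them by the
// name of the node they are assigned to.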
func podListForEachNode(ctx context.Context, cs clientset.Interface) map[string][]*v1.Pod {
	nodeNameToPodList := make(map[string][]*v1.Pod)
	allPods, err := cs.CoreV1().Pods(metav1.NamespaceAll).List(ctx, metav1.ListOptions{})
	if err != nil {
		framework.Failf("Failed to list pods in all namespaces: %v", err)
	}
	for i, pod := range allPods.Items {
		nodeName := pod.Spec.NodeName
		nodeNameToPodList[nodeName] = append(nodeNameToPodList[nodeName], &allPods.Items[i])
	}
	return nodeNameToPodList
}

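// computeCPUMemFraction returns the fraction of the node's allocatable CPU and
// memory (capped at 1.0) that would be requested by the given pods plus one pod
// with the given resource requirements, together with the node's allocatable
// CPU (in millicores) and memory (in bytes).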
func computeCPUMemFraction(node v1.Node, resource *v1.ResourceRequirements, pods []*v1.Pod) (float64, float64, int64, int64) {
	framework.Logf("ComputeCPUMemFraction for node: %v", node.Name)
	totalRequestedCPUResource := resource.Requests.Cpu().MilliValue()
	totalRequestedMemResource := resource.Requests.Memory().Value()

	for _, pod := range pods {
		framework.Logf("Pod %v on the node, Cpu: %v, Mem: %v", pod.Name, getNonZeroRequests(pod).MilliCPU, getNonZeroRequests(pod).Memory)
		// Ignore best-effort pods; they do not reserve resources on the node.
		if v1qos.GetPodQOS(pod) == v1.PodQOSBestEffort {
			continue
		}
		totalRequestedCPUResource += getNonZeroRequests(pod).MilliCPU
		totalRequestedMemResource += getNonZeroRequests(pod).Memory
	}

	cpuAllocatable, found := node.Status.Allocatable[v1.ResourceCPU]
	if !found {
		framework.Failf("Node %v: node.Status.Allocatable %v does not contain entry %v", node.Name, node.Status.Allocatable, v1.ResourceCPU)
	}
	cpuAllocatableMil := cpuAllocatable.MilliValue()

	floatOne := float64(1)
	cpuFraction := float64(totalRequestedCPUResource) / float64(cpuAllocatableMil)
	if cpuFraction > floatOne {
		cpuFraction = floatOne
	}
	memAllocatable, found := node.Status.Allocatable[v1.ResourceMemory]
	if !found {
		framework.Failf("Node %v: node.Status.Allocatable %v does not contain entry %v", node.Name, node.Status.Allocatable, v1.ResourceMemory)
	}
	memAllocatableVal := memAllocatable.Value()
	memFraction := float64(totalRequestedMemResource) / float64(memAllocatableVal)
	if memFraction > floatOne {
		memFraction = floatOne
	}

	framework.Logf("Node: %v, totalRequestedCPUResource: %v, cpuAllocatableMil: %v, cpuFraction: %v", node.Name, totalRequestedCPUResource, cpuAllocatableMil, cpuFraction)
	framework.Logf("Node: %v, totalRequestedMemResource: %v, memAllocatableVal: %v, memFraction: %v", node.Name, totalRequestedMemResource, memAllocatableVal, memFraction)

	return cpuFraction, memFraction, cpuAllocatableMil, memAllocatableVal
}

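// getNonZeroRequests sums the non-zero CPU and memory requests of all containers
// in the pod, substituting the scheduler's defaults for containers that do not
// specify a request.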
func getNonZeroRequests(pod *v1.Pod) Resource {
	result := Resource{}
	for i := range pod.Spec.Containers {
		container := &pod.Spec.Containers[i]
		cpu, memory := schedutil.GetNonzeroRequests(&container.Resources.Requests)
		result.MilliCPU += cpu
		result.Memory += memory
	}
	return result
}

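// getRandomTaint returns a PreferNoSchedule taint with a unique key and value so
// that taints generated for different nodes never collide.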
func getRandomTaint() v1.Taint {
	return v1.Taint{
		Key:    fmt.Sprintf("kubernetes.io/e2e-scheduling-priorities-%s", string(uuid.NewUUID()[:23])),
		Value:  fmt.Sprintf("testing-taint-value-%s", string(uuid.NewUUID())),
		Effect: v1.TaintEffectPreferNoSchedule,
	}
}

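// addTaintToNode applies the taint to the node and fails the test if the taint
// does not show up on the node afterwards.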
func addTaintToNode(ctx context.Context, cs clientset.Interface, nodeName string, testTaint v1.Taint) {
	e2enode.AddOrUpdateTaintOnNode(ctx, cs, nodeName, testTaint)
	e2enode.ExpectNodeHasTaint(ctx, cs, nodeName, &testTaint)
}