package autoscaling

import (
	"context"
	"encoding/json"
	"fmt"
	"math"
	"strings"
	"time"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/strategicpatch"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/klog/v2"
	"k8s.io/kubernetes/test/e2e/feature"
	"k8s.io/kubernetes/test/e2e/framework"
	e2enode "k8s.io/kubernetes/test/e2e/framework/node"
	e2erc "k8s.io/kubernetes/test/e2e/framework/rc"
	e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
	testutils "k8s.io/kubernetes/test/utils"
	imageutils "k8s.io/kubernetes/test/utils/image"
	admissionapi "k8s.io/pod-security-admission/api"

	"github.com/onsi/ginkgo/v2"
	"github.com/onsi/gomega"
)

const (
	memoryReservationTimeout = 5 * time.Minute
	largeResizeTimeout       = 8 * time.Minute
	largeScaleUpTimeout      = 10 * time.Minute
	maxNodes                 = 1000
)

type clusterPredicates struct {
	nodes int
}

type scaleUpTestConfig struct {
	initialNodes   int
	initialPods    int
	extraPods      *testutils.RCConfig
	expectedResult *clusterPredicates
}

var _ = SIGDescribe("Cluster size autoscaler scalability", framework.WithSlow(), func() {
	f := framework.NewDefaultFramework("autoscaling")
	f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged
	var c clientset.Interface
	var nodeCount int
	var coresPerNode int
	var memCapacityMb int
	var originalSizes map[string]int
	var sum int

	ginkgo.BeforeEach(func(ctx context.Context) {
		e2eskipper.SkipUnlessProviderIs("gce", "gke", "kubemark")

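		// Skip the suite unless the Cluster Autoscaler is running, which is
		// detected by looking up its status ConfigMap in kube-system.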
		_, err := f.ClientSet.CoreV1().ConfigMaps("kube-system").Get(ctx, "cluster-autoscaler-status", metav1.GetOptions{})
		if err != nil {
			e2eskipper.Skipf("test expects Cluster Autoscaler to be enabled")
		}

		c = f.ClientSet
		if originalSizes == nil {
			originalSizes = make(map[string]int)
			sum = 0
			for _, mig := range strings.Split(framework.TestContext.CloudConfig.NodeInstanceGroup, ",") {
				size, err := framework.GroupSize(mig)
				framework.ExpectNoError(err)
				ginkgo.By(fmt.Sprintf("Initial size of %s: %d", mig, size))
				originalSizes[mig] = size
				sum += size
			}
		}

		framework.ExpectNoError(e2enode.WaitForReadyNodes(ctx, c, sum, scaleUpTimeout))

		nodes, err := e2enode.GetReadySchedulableNodes(ctx, f.ClientSet)
		framework.ExpectNoError(err)
		nodeCount = len(nodes.Items)
		cpu := nodes.Items[0].Status.Capacity[v1.ResourceCPU]
		mem := nodes.Items[0].Status.Capacity[v1.ResourceMemory]
		coresPerNode = int((&cpu).MilliValue() / 1000)
		memCapacityMb = int((&mem).Value() / 1024 / 1024)

		gomega.Expect(nodeCount).To(gomega.Equal(sum))

		if framework.ProviderIs("gke") {
			val, err := isAutoscalerEnabled(3)
			framework.ExpectNoError(err)
			if !val {
				err = enableAutoscaler("default-pool", 3, 5)
				framework.ExpectNoError(err)
			}
		}
	})

	ginkgo.AfterEach(func(ctx context.Context) {
		ginkgo.By("Restoring initial size of the cluster")
		setMigSizes(originalSizes)
		framework.ExpectNoError(e2enode.WaitForReadyNodes(ctx, c, nodeCount, scaleDownTimeout))
		nodes, err := c.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
		framework.ExpectNoError(err)
		s := time.Now()
	makeSchedulableLoop:
		for start := time.Now(); time.Since(start) < makeSchedulableTimeout; time.Sleep(makeSchedulableDelay) {
			for _, n := range nodes.Items {
				err = makeNodeSchedulable(ctx, c, &n, true)
				switch err.(type) {
				case CriticalAddonsOnlyError:
					continue makeSchedulableLoop
				default:
					framework.ExpectNoError(err)
				}
			}
			break
		}
		klog.Infof("Made nodes schedulable again in %v", time.Since(s).String())
	})

	f.It("should scale up at all", feature.ClusterAutoscalerScalability1, func(ctx context.Context) {
		perNodeReservation := int(float64(memCapacityMb) * 0.95)
		replicasPerNode := 10

		additionalNodes := maxNodes - nodeCount
		replicas := additionalNodes * replicasPerNode
		additionalReservation := additionalNodes * perNodeReservation

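		// Saturate the existing nodes with memory-reserving pods.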
		reservationCleanup := ReserveMemory(ctx, f, "some-pod", nodeCount*2, nodeCount*perNodeReservation, true, memoryReservationTimeout)
		defer reservationCleanup()
		framework.ExpectNoError(waitForAllCaPodsReadyInNamespace(ctx, f, c))

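		// Configure the pending pods and the expected cluster size after scale-up.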
		rcConfig := reserveMemoryRCConfig(f, "extra-pod-1", replicas, additionalReservation, largeScaleUpTimeout)
		expectedResult := createClusterPredicates(nodeCount + additionalNodes)
		config := createScaleUpTestConfig(nodeCount, nodeCount, rcConfig, expectedResult)

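		// Run the scale-up test and register cleanup of the extra pods.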
		testCleanup := simpleScaleUpTest(ctx, f, config)
		defer testCleanup()
	})

	f.It("should scale up twice", feature.ClusterAutoscalerScalability2, func(ctx context.Context) {
		perNodeReservation := int(float64(memCapacityMb) * 0.95)
		replicasPerNode := 10
		additionalNodes1 := int(math.Ceil(0.7 * maxNodes))
		additionalNodes2 := int(math.Ceil(0.25 * maxNodes))
		if additionalNodes1+additionalNodes2 > maxNodes {
			additionalNodes2 = maxNodes - additionalNodes1
		}

		replicas1 := additionalNodes1 * replicasPerNode
		replicas2 := additionalNodes2 * replicasPerNode

		klog.Infof("cores per node: %v", coresPerNode)

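		// Saturate the existing nodes before triggering the first scale-up.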
		initialReplicas := nodeCount
		reservationCleanup := ReserveMemory(ctx, f, "some-pod", initialReplicas, nodeCount*perNodeReservation, true, memoryReservationTimeout)
		defer reservationCleanup()
		framework.ExpectNoError(waitForAllCaPodsReadyInNamespace(ctx, f, c))

		klog.Infof("Reserved successfully")

		rcConfig := reserveMemoryRCConfig(f, "extra-pod-1", replicas1, additionalNodes1*perNodeReservation, largeScaleUpTimeout)
		expectedResult := createClusterPredicates(nodeCount + additionalNodes1)
		config := createScaleUpTestConfig(nodeCount, nodeCount, rcConfig, expectedResult)

		tolerateUnreadyNodes := additionalNodes1 / 20
		tolerateUnreadyPods := (initialReplicas + replicas1) / 20
		testCleanup1 := simpleScaleUpTestWithTolerance(ctx, f, config, tolerateUnreadyNodes, tolerateUnreadyPods)
		defer testCleanup1()

		klog.Infof("Scaled up once")

		rcConfig2 := reserveMemoryRCConfig(f, "extra-pod-2", replicas2, additionalNodes2*perNodeReservation, largeScaleUpTimeout)
		expectedResult2 := createClusterPredicates(nodeCount + additionalNodes1 + additionalNodes2)
		config2 := createScaleUpTestConfig(nodeCount+additionalNodes1, nodeCount+additionalNodes2, rcConfig2, expectedResult2)

		tolerateUnreadyNodes = maxNodes / 20
		tolerateUnreadyPods = (initialReplicas + replicas1 + replicas2) / 20
		testCleanup2 := simpleScaleUpTestWithTolerance(ctx, f, config2, tolerateUnreadyNodes, tolerateUnreadyPods)
		defer testCleanup2()

		klog.Infof("Scaled up twice")
	})

	f.It("should scale down empty nodes", feature.ClusterAutoscalerScalability3, func(ctx context.Context) {
		perNodeReservation := int(float64(memCapacityMb) * 0.7)
		replicas := int(math.Ceil(maxNodes * 0.7))
		totalNodes := maxNodes

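		// Resize the cluster to totalNodes and wait for the new nodes to become ready.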
		newSizes := map[string]int{
			anyKey(originalSizes): totalNodes,
		}
		setMigSizes(newSizes)
		framework.ExpectNoError(e2enode.WaitForReadyNodes(ctx, f.ClientSet, totalNodes, largeResizeTimeout))

		rcConfig := reserveMemoryRCConfig(f, "some-pod", replicas, replicas*perNodeReservation, largeScaleUpTimeout)
		expectedResult := createClusterPredicates(totalNodes)
		config := createScaleUpTestConfig(totalNodes, totalNodes, rcConfig, expectedResult)
		tolerateUnreadyNodes := totalNodes / 10
		tolerateUnreadyPods := replicas / 10
		testCleanup := simpleScaleUpTestWithTolerance(ctx, f, config, tolerateUnreadyNodes, tolerateUnreadyPods)
		defer testCleanup()

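		// Expect the empty nodes to be scaled down until only the utilized nodes (plus a small margin) remain.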
		framework.ExpectNoError(WaitForClusterSizeFunc(ctx, f.ClientSet,
			func(size int) bool {
				return size <= replicas+3
			}, scaleDownTimeout))
	})

	f.It("should scale down underutilized nodes", feature.ClusterAutoscalerScalability4, func(ctx context.Context) {
		perPodReservation := int(float64(memCapacityMb) * 0.01)

		underutilizedPerNodeReplicas := 10

		fullPerNodeReplicas := 70
		totalNodes := maxNodes
		underutilizedRatio := 0.3
		maxDelta := 30

		newSizes := map[string]int{
			anyKey(originalSizes): totalNodes,
		}
		setMigSizes(newSizes)

		framework.ExpectNoError(e2enode.WaitForReadyNodes(ctx, f.ClientSet, totalNodes, largeResizeTimeout))

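		// Annotate all nodes with scale-down-disabled so that nothing is removed while the load is being set up.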
		ScaleDownDisabledKey := "cluster-autoscaler.kubernetes.io/scale-down-disabled"

		nodes, err := f.ClientSet.CoreV1().Nodes().List(ctx, metav1.ListOptions{
			FieldSelector: fields.Set{
				"spec.unschedulable": "false",
			}.AsSelector().String(),
		})

		framework.ExpectNoError(err)
		framework.ExpectNoError(addAnnotation(ctx, f, nodes.Items, ScaleDownDisabledKey, "true"))

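		// Distribute the load so that part of the cluster ends up full and the rest underutilized.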
		underutilizedNodesNum := int(float64(maxNodes) * underutilizedRatio)
		fullNodesNum := totalNodes - underutilizedNodesNum

		podDistribution := []podBatch{
			{numNodes: fullNodesNum, podsPerNode: fullPerNodeReplicas},
			{numNodes: underutilizedNodesNum, podsPerNode: underutilizedPerNodeReplicas}}

		distributeLoad(ctx, f, f.Namespace.Name, "10-70", podDistribution, perPodReservation,
			int(0.95*float64(memCapacityMb)), map[string]string{}, largeScaleUpTimeout)

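		// Re-enable scale-down now that the load is in place.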
		framework.ExpectNoError(addAnnotation(ctx, f, nodes.Items, ScaleDownDisabledKey, "false"))

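		// Expect the cluster to shrink by roughly 10% of the nodes (capped at maxDelta),
		// allowing about a minute of scale-down time per node on top of scaleDownTimeout.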
		nodesToScaleDownCount := int(float64(totalNodes) * 0.1)
		if nodesToScaleDownCount > maxDelta {
			nodesToScaleDownCount = maxDelta
		}
		expectedSize := totalNodes - nodesToScaleDownCount
		timeout := time.Duration(nodesToScaleDownCount)*time.Minute + scaleDownTimeout
		framework.ExpectNoError(WaitForClusterSizeFunc(ctx, f.ClientSet, func(size int) bool {
			return size <= expectedSize
		}, timeout))
	})

	f.It("shouldn't scale down with underutilized nodes due to host port conflicts", feature.ClusterAutoscalerScalability5, func(ctx context.Context) {
		fullReservation := int(float64(memCapacityMb) * 0.9)
		hostPortPodReservation := int(float64(memCapacityMb) * 0.3)
		totalNodes := maxNodes
		reservedPort := 4321

		newSizes := map[string]int{
			anyKey(originalSizes): totalNodes,
		}
		setMigSizes(newSizes)
		framework.ExpectNoError(e2enode.WaitForReadyNodes(ctx, f.ClientSet, totalNodes, largeResizeTimeout))
		divider := int(float64(totalNodes) * 0.7)
		fullNodesCount := divider
		underutilizedNodesCount := totalNodes - fullNodesCount

		ginkgo.By("Reserving full nodes")

		cleanup := ReserveMemory(ctx, f, "filling-pod", fullNodesCount, fullNodesCount*fullReservation, true, largeScaleUpTimeout*2)
		defer cleanup()

		ginkgo.By("Reserving host ports on remaining nodes")

		ginkgo.DeferCleanup(createHostPortPodsWithMemory, f, "underutilizing-host-port-pod", underutilizedNodesCount, reservedPort, underutilizedNodesCount*hostPortPodReservation, largeScaleUpTimeout)

		framework.ExpectNoError(waitForAllCaPodsReadyInNamespace(ctx, f, c))

		ginkgo.By(fmt.Sprintf("Sleeping %v minutes...", scaleDownTimeout.Minutes()))
		time.Sleep(scaleDownTimeout)

		ginkgo.By("Checking if the number of nodes is as expected")
		nodes, err := e2enode.GetReadySchedulableNodes(ctx, f.ClientSet)
		framework.ExpectNoError(err)
		klog.Infof("Nodes: %v, expected: %v", len(nodes.Items), totalNodes)
		gomega.Expect(nodes.Items).To(gomega.HaveLen(totalNodes))
	})

	f.It("CA ignores unschedulable pods while scheduling schedulable pods", feature.ClusterAutoscalerScalability6, func(ctx context.Context) {

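		// Start pods that saturate the existing nodes.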
		perNodeReservation := int(float64(memCapacityMb) * 0.80)
		replicasPerNode := 10
		initialPodReplicas := nodeCount * replicasPerNode
		initialPodsTotalMemory := nodeCount * perNodeReservation
		reservationCleanup := ReserveMemory(ctx, f, "initial-pod", initialPodReplicas, initialPodsTotalMemory, true, memoryReservationTimeout)
		ginkgo.DeferCleanup(reservationCleanup)
		framework.ExpectNoError(waitForAllCaPodsReadyInNamespace(ctx, f, c))

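		// Configure pods that can never be scheduled: each one requests about twice the memory capacity of a single node.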
		unschedulableMemReservation := memCapacityMb * 2
		unschedulablePodReplicas := 1000
		totalMemReservation := unschedulableMemReservation * unschedulablePodReplicas
		timeToWait := 5 * time.Minute
		podsConfig := reserveMemoryRCConfig(f, "unschedulable-pod", unschedulablePodReplicas, totalMemReservation, timeToWait)
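		// RunRC is expected to fail here because these pods can never become ready, so its error is deliberately ignored.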
		_ = e2erc.RunRC(ctx, *podsConfig)
		ginkgo.DeferCleanup(e2erc.DeleteRCAndWaitForGC, f.ClientSet, f.Namespace.Name, podsConfig.Name)

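		// The pending unschedulable pods alone must not cause any new nodes to be added.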
		readyNodeCount, _ := e2enode.TotalReady(ctx, f.ClientSet)
		gomega.Expect(readyNodeCount).To(gomega.Equal(nodeCount))

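		// Start enough schedulable pods to require additionalNodes more nodes; the autoscaler
		// must scale up for them despite the unschedulable pods still pending.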
		additionalNodes := maxNodes - nodeCount
		replicas := additionalNodes * replicasPerNode
		totalMemory := additionalNodes * perNodeReservation
		rcConfig := reserveMemoryRCConfig(f, "extra-pod", replicas, totalMemory, largeScaleUpTimeout)
		expectedResult := createClusterPredicates(nodeCount + additionalNodes)
		config := createScaleUpTestConfig(nodeCount, initialPodReplicas, rcConfig, expectedResult)

		testCleanup := simpleScaleUpTestWithTolerance(ctx, f, config, 0, unschedulablePodReplicas)
		ginkgo.DeferCleanup(testCleanup)
	})

})

func anyKey(input map[string]int) string {
	for k := range input {
		return k
	}
	return ""
}

func simpleScaleUpTestWithTolerance(ctx context.Context, f *framework.Framework, config *scaleUpTestConfig, tolerateMissingNodeCount int, tolerateMissingPodCount int) func() error {
	ginkgo.By(fmt.Sprintf("Running RC %v from config", config.extraPods.Name))
	start := time.Now()
	framework.ExpectNoError(e2erc.RunRC(ctx, *config.extraPods))

	if tolerateMissingNodeCount > 0 {
		minExpectedNodeCount := config.expectedResult.nodes - tolerateMissingNodeCount
		framework.ExpectNoError(WaitForClusterSizeFunc(ctx, f.ClientSet,
			func(size int) bool { return size >= minExpectedNodeCount }, scaleUpTimeout))
	} else {
		framework.ExpectNoError(e2enode.WaitForReadyNodes(ctx, f.ClientSet, config.expectedResult.nodes, scaleUpTimeout))
	}
	klog.Infof("cluster is increased")
	if tolerateMissingPodCount > 0 {
		framework.ExpectNoError(waitForCaPodsReadyInNamespace(ctx, f, f.ClientSet, tolerateMissingPodCount))
	} else {
		framework.ExpectNoError(waitForAllCaPodsReadyInNamespace(ctx, f, f.ClientSet))
	}
	timeTrack(start, fmt.Sprintf("Scale up to %v", config.expectedResult.nodes))
	return func() error {
		return e2erc.DeleteRCAndWaitForGC(ctx, f.ClientSet, f.Namespace.Name, config.extraPods.Name)
	}
}

func simpleScaleUpTest(ctx context.Context, f *framework.Framework, config *scaleUpTestConfig) func() error {
	return simpleScaleUpTestWithTolerance(ctx, f, config, 0, 0)
}

func reserveMemoryRCConfig(f *framework.Framework, id string, replicas, megabytes int, timeout time.Duration) *testutils.RCConfig {
	return &testutils.RCConfig{
		Client:     f.ClientSet,
		Name:       id,
		Namespace:  f.Namespace.Name,
		Timeout:    timeout,
		Image:      imageutils.GetPauseImageName(),
		Replicas:   replicas,
		MemRequest: int64(1024 * 1024 * megabytes / replicas),
	}
}

func createScaleUpTestConfig(nodes, pods int, extraPods *testutils.RCConfig, expectedResult *clusterPredicates) *scaleUpTestConfig {
	return &scaleUpTestConfig{
		initialNodes:   nodes,
		initialPods:    pods,
		extraPods:      extraPods,
		expectedResult: expectedResult,
	}
}

func createClusterPredicates(nodes int) *clusterPredicates {
	return &clusterPredicates{
		nodes: nodes,
	}
}

func addAnnotation(ctx context.Context, f *framework.Framework, nodes []v1.Node, key, value string) error {
	for _, node := range nodes {
		oldData, err := json.Marshal(node)
		if err != nil {
			return err
		}

		if node.Annotations == nil {
			node.Annotations = make(map[string]string)
		}
		node.Annotations[key] = value

		newData, err := json.Marshal(node)
		if err != nil {
			return err
		}

		patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.Node{})
		if err != nil {
			return err
		}

		_, err = f.ClientSet.CoreV1().Nodes().Patch(ctx, node.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{})
		if err != nil {
			return err
		}
	}
	return nil
}

func createHostPortPodsWithMemory(ctx context.Context, f *framework.Framework, id string, replicas, port, megabytes int, timeout time.Duration) func() error {
	ginkgo.By("Running RC which reserves host port and memory")
	request := int64(1024 * 1024 * megabytes / replicas)
	config := &testutils.RCConfig{
		Client:     f.ClientSet,
		Name:       id,
		Namespace:  f.Namespace.Name,
		Timeout:    timeout,
		Image:      imageutils.GetPauseImageName(),
		Replicas:   replicas,
		HostPorts:  map[string]int{"port1": port},
		MemRequest: request,
	}
	err := e2erc.RunRC(ctx, *config)
	framework.ExpectNoError(err)
	return func() error {
		return e2erc.DeleteRCAndWaitForGC(ctx, f.ClientSet, f.Namespace.Name, id)
	}
}

type podBatch struct {
	numNodes    int
	podsPerNode int
}

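// distributeLoad creates pods in the proportions described by podDistribution, assuming
// every pod has the same memory request and every node the same memory capacity. For each
// batch it first starts a host-port RC whose filler pods (one per node, enforced by the
// host port conflict) reserve all memory except what the batch's pods should consume; it
// then runs a single RC with the target pods, which therefore spread out as requested.
// The filler RCs are removed when the function returns, the target RC via DeferCleanup.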
func distributeLoad(ctx context.Context, f *framework.Framework, namespace string, id string, podDistribution []podBatch,
	podMemRequestMegabytes int, nodeMemCapacity int, labels map[string]string, timeout time.Duration) {
	port := 8013

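	// Create the filler RCs: for each batch, one host-port pod per node reserving the
	// memory that the batch's target pods are not supposed to use.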
	totalPods := 0
	for i, podBatch := range podDistribution {
		totalPods += podBatch.numNodes * podBatch.podsPerNode
		remainingMem := nodeMemCapacity - podBatch.podsPerNode*podMemRequestMegabytes
		replicas := podBatch.numNodes
		cleanup := createHostPortPodsWithMemory(ctx, f, fmt.Sprintf("load-distribution%d", i), replicas, port, remainingMem*replicas, timeout)
		defer cleanup()
	}
	framework.ExpectNoError(waitForAllCaPodsReadyInNamespace(ctx, f, f.ClientSet))

	rcConfig := reserveMemoryRCConfig(f, id, totalPods, totalPods*podMemRequestMegabytes, timeout)
	framework.ExpectNoError(e2erc.RunRC(ctx, *rcConfig))
	framework.ExpectNoError(waitForAllCaPodsReadyInNamespace(ctx, f, f.ClientSet))
	ginkgo.DeferCleanup(e2erc.DeleteRCAndWaitForGC, f.ClientSet, f.Namespace.Name, id)
}

func timeTrack(start time.Time, name string) {
	elapsed := time.Since(start)
	klog.Infof("%s took %s", name, elapsed)
}