16
17 package autoscaling
18
19 import (
20 "context"
21 "fmt"
22 "io"
23 "math"
24 "net/http"
25 "os"
26 "os/exec"
27 "regexp"
28 "strconv"
29 "strings"
30 "time"
31
32 v1 "k8s.io/api/core/v1"
33 policyv1 "k8s.io/api/policy/v1"
34 schedulingv1 "k8s.io/api/scheduling/v1"
35 apierrors "k8s.io/apimachinery/pkg/api/errors"
36 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
37 "k8s.io/apimachinery/pkg/fields"
38 "k8s.io/apimachinery/pkg/labels"
39 utilerrors "k8s.io/apimachinery/pkg/util/errors"
40 "k8s.io/apimachinery/pkg/util/intstr"
41 "k8s.io/apimachinery/pkg/util/sets"
42 "k8s.io/apimachinery/pkg/util/uuid"
43 "k8s.io/apimachinery/pkg/util/wait"
44 clientset "k8s.io/client-go/kubernetes"
45 "k8s.io/klog/v2"
46 "k8s.io/kubernetes/test/e2e/feature"
47 "k8s.io/kubernetes/test/e2e/framework"
48 e2ekubectl "k8s.io/kubernetes/test/e2e/framework/kubectl"
49 e2emanifest "k8s.io/kubernetes/test/e2e/framework/manifest"
50 e2enetwork "k8s.io/kubernetes/test/e2e/framework/network"
51 e2enode "k8s.io/kubernetes/test/e2e/framework/node"
52 e2epv "k8s.io/kubernetes/test/e2e/framework/pv"
53 e2erc "k8s.io/kubernetes/test/e2e/framework/rc"
54 e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
55 "k8s.io/kubernetes/test/e2e/scheduling"
56 testutils "k8s.io/kubernetes/test/utils"
57 imageutils "k8s.io/kubernetes/test/utils/image"
58 admissionapi "k8s.io/pod-security-admission/api"
59
60 "github.com/onsi/ginkgo/v2"
61 "github.com/onsi/gomega"
62 )
63
64 const (
65 defaultTimeout = 3 * time.Minute
66 resizeTimeout = 5 * time.Minute
67 manualResizeTimeout = 6 * time.Minute
68 scaleUpTimeout = 5 * time.Minute
69 scaleUpTriggerTimeout = 2 * time.Minute
70 scaleDownTimeout = 20 * time.Minute
71 podTimeout = 2 * time.Minute
72 nodesRecoverTimeout = 5 * time.Minute
73 rcCreationRetryTimeout = 4 * time.Minute
74 rcCreationRetryDelay = 20 * time.Second
75 makeSchedulableTimeout = 10 * time.Minute
76 makeSchedulableDelay = 20 * time.Second
77 freshStatusLimit = 20 * time.Second
78
79 gkeUpdateTimeout = 15 * time.Minute
80 gkeNodepoolNameKey = "cloud.google.com/gke-nodepool"
81
82 disabledTaint = "DisabledForAutoscalingTest"
83 criticalAddonsOnlyTaint = "CriticalAddonsOnly"
84 newNodesForScaledownTests = 2
85 unhealthyClusterThreshold = 4
86
87 caNoScaleUpStatus = "NoActivity"
88 caOngoingScaleUpStatus = "InProgress"
89 timestampFormat = "2006-01-02 15:04:05 -0700 MST"
90
91 expendablePriorityClassName = "expendable-priority"
92 highPriorityClassName = "high-priority"
93
94 gpuLabel = "cloud.google.com/gke-accelerator"
95
96 nonExistingBypassedSchedulerName = "non-existing-bypassed-scheduler"
97 )
98
99 var _ = SIGDescribe("Cluster size autoscaling", framework.WithSlow(), func() {
100 f := framework.NewDefaultFramework("autoscaling")
101 f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged
102 var c clientset.Interface
103 var nodeCount int
104 var memAllocatableMb int
105 var originalSizes map[string]int
106
107 ginkgo.BeforeEach(func(ctx context.Context) {
108 c = f.ClientSet
109 e2eskipper.SkipUnlessProviderIs("gce", "gke")
110
111 originalSizes = make(map[string]int)
112 sum := 0
113 for _, mig := range strings.Split(framework.TestContext.CloudConfig.NodeInstanceGroup, ",") {
114 size, err := framework.GroupSize(mig)
115 framework.ExpectNoError(err)
116 ginkgo.By(fmt.Sprintf("Initial size of %s: %d", mig, size))
117 originalSizes[mig] = size
118 sum += size
119 }
120
121 framework.ExpectNoError(e2enode.WaitForReadyNodes(ctx, c, sum, scaleUpTimeout))
122
123 nodes, err := e2enode.GetReadySchedulableNodes(ctx, f.ClientSet)
124 framework.ExpectNoError(err)
125 nodeCount = len(nodes.Items)
126 ginkgo.By(fmt.Sprintf("Initial number of schedulable nodes: %v", nodeCount))
127 gomega.Expect(nodes.Items).ToNot(gomega.BeEmpty())
128 mem := nodes.Items[0].Status.Allocatable[v1.ResourceMemory]
129 memAllocatableMb = int((&mem).Value() / 1024 / 1024)
130
131 gomega.Expect(nodeCount).To(gomega.Equal(sum))
132
133 if framework.ProviderIs("gke") {
134 val, err := isAutoscalerEnabled(5)
135 framework.ExpectNoError(err)
136 if !val {
137 err = enableAutoscaler("default-pool", 3, 5)
138 framework.ExpectNoError(err)
139 }
140 }
141 })
142
143 ginkgo.AfterEach(func(ctx context.Context) {
144 e2eskipper.SkipUnlessProviderIs("gce", "gke")
145 ginkgo.By("Restoring initial size of the cluster")
146 setMigSizes(originalSizes)
147 expectedNodes := 0
148 for _, size := range originalSizes {
149 expectedNodes += size
150 }
151 framework.ExpectNoError(e2enode.WaitForReadyNodes(ctx, c, expectedNodes, scaleDownTimeout))
152 nodes, err := c.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
153 framework.ExpectNoError(err)
154
155 s := time.Now()
156 makeSchedulableLoop:
157 for start := time.Now(); time.Since(start) < makeSchedulableTimeout; time.Sleep(makeSchedulableDelay) {
158 for _, n := range nodes.Items {
159 err = makeNodeSchedulable(ctx, c, &n, true)
160 switch err.(type) {
161 case CriticalAddonsOnlyError:
162 continue makeSchedulableLoop
163 default:
164 framework.ExpectNoError(err)
165 }
166 }
167 break
168 }
169 klog.Infof("Made nodes schedulable again in %v", time.Since(s).String())
170 })
171
172 f.It("shouldn't increase cluster size if pending pod is too large", feature.ClusterSizeAutoscalingScaleUp, func(ctx context.Context) {
173 ginkgo.By("Creating unschedulable pod")
174 ReserveMemory(ctx, f, "memory-reservation", 1, int(1.1*float64(memAllocatableMb)), false, defaultTimeout)
175 ginkgo.DeferCleanup(e2erc.DeleteRCAndWaitForGC, f.ClientSet, f.Namespace.Name, "memory-reservation")
176
177 ginkgo.By("Waiting for scale up hoping it won't happen")
178
179 eventFound := false
180 EventsLoop:
181 for start := time.Now(); time.Since(start) < scaleUpTimeout; time.Sleep(20 * time.Second) {
182 ginkgo.By("Waiting for NotTriggerScaleUp event")
183 events, err := f.ClientSet.CoreV1().Events(f.Namespace.Name).List(ctx, metav1.ListOptions{})
184 framework.ExpectNoError(err)
185
186 for _, e := range events.Items {
187 if e.InvolvedObject.Kind == "Pod" && e.Reason == "NotTriggerScaleUp" {
188 ginkgo.By("NotTriggerScaleUp event found")
189 eventFound = true
190 break EventsLoop
191 }
192 }
193 }
194 if !eventFound {
195 framework.Failf("Expected event with kind 'Pod' and reason 'NotTriggerScaleUp' not found.")
196 }
197
198 framework.ExpectNoError(WaitForClusterSizeFunc(ctx, f.ClientSet,
199 func(size int) bool { return size <= nodeCount }, time.Second))
200 })
201
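// simpleScaleUpTest reserves roughly the cluster's total allocatable memory across 100 pods,
// which cannot all fit on the existing nodes, and expects the autoscaler to add at least one
// node while tolerating the given number of unready nodes.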
202 simpleScaleUpTest := func(ctx context.Context, unready int) {
203 ReserveMemory(ctx, f, "memory-reservation", 100, nodeCount*memAllocatableMb, false, 1*time.Second)
204 ginkgo.DeferCleanup(e2erc.DeleteRCAndWaitForGC, f.ClientSet, f.Namespace.Name, "memory-reservation")
205
206
207 framework.ExpectNoError(WaitForClusterSizeFuncWithUnready(ctx, f.ClientSet,
208 func(size int) bool { return size >= nodeCount+1 }, scaleUpTimeout, unready))
209 framework.ExpectNoError(waitForAllCaPodsReadyInNamespace(ctx, f, c))
210 }
211
212 f.It("should increase cluster size if pending pods are small", feature.ClusterSizeAutoscalingScaleUp, func(ctx context.Context) {
213 simpleScaleUpTest(ctx, 0)
214 })
215
216 gpuType := os.Getenv("TESTED_GPU_TYPE")
217
218 f.It(fmt.Sprintf("Should scale up GPU pool from 0 [GpuType:%s]", gpuType), feature.ClusterSizeAutoscalingGpu, func(ctx context.Context) {
219 e2eskipper.SkipUnlessProviderIs("gke")
220 if gpuType == "" {
221 framework.Failf("TESTED_GPU_TYPE not defined")
222 return
223 }
224
225 const gpuPoolName = "gpu-pool"
226 addGpuNodePool(gpuPoolName, gpuType, 1, 0)
227 defer deleteNodePool(gpuPoolName)
228
229 installNvidiaDriversDaemonSet(ctx, f)
230
231 ginkgo.By("Enable autoscaler")
232 framework.ExpectNoError(enableAutoscaler(gpuPoolName, 0, 1))
233 defer disableAutoscaler(gpuPoolName, 0, 1)
234 gomega.Expect(getPoolNodes(ctx, f, gpuPoolName)).To(gomega.BeEmpty())
235
236 ginkgo.By("Schedule a pod which requires GPU")
237 framework.ExpectNoError(ScheduleAnySingleGpuPod(ctx, f, "gpu-pod-rc"))
238 ginkgo.DeferCleanup(e2erc.DeleteRCAndWaitForGC, f.ClientSet, f.Namespace.Name, "gpu-pod-rc")
239
240 framework.ExpectNoError(WaitForClusterSizeFunc(ctx, f.ClientSet,
241 func(size int) bool { return size == nodeCount+1 }, scaleUpTimeout))
242 gomega.Expect(getPoolNodes(ctx, f, gpuPoolName)).To(gomega.HaveLen(1))
243 })
244
245 f.It(fmt.Sprintf("Should scale up GPU pool from 1 [GpuType:%s]", gpuType), feature.ClusterSizeAutoscalingGpu, func(ctx context.Context) {
246 e2eskipper.SkipUnlessProviderIs("gke")
247 if gpuType == "" {
248 framework.Failf("TESTED_GPU_TYPE not defined")
249 return
250 }
251
252 const gpuPoolName = "gpu-pool"
253 addGpuNodePool(gpuPoolName, gpuType, 1, 1)
254 defer deleteNodePool(gpuPoolName)
255
256 installNvidiaDriversDaemonSet(ctx, f)
257
258 ginkgo.By("Schedule a single pod which requires GPU")
259 framework.ExpectNoError(ScheduleAnySingleGpuPod(ctx, f, "gpu-pod-rc"))
260 ginkgo.DeferCleanup(e2erc.DeleteRCAndWaitForGC, f.ClientSet, f.Namespace.Name, "gpu-pod-rc")
261
262 ginkgo.By("Enable autoscaler")
263 framework.ExpectNoError(enableAutoscaler(gpuPoolName, 0, 2))
264 defer disableAutoscaler(gpuPoolName, 0, 2)
265 gomega.Expect(getPoolNodes(ctx, f, gpuPoolName)).To(gomega.HaveLen(1))
266
267 ginkgo.By("Scale GPU deployment")
268 framework.ExpectNoError(e2erc.ScaleRC(ctx, f.ClientSet, f.ScalesGetter, f.Namespace.Name, "gpu-pod-rc", 2, true))
269
270 framework.ExpectNoError(WaitForClusterSizeFunc(ctx, f.ClientSet,
271 func(size int) bool { return size == nodeCount+2 }, scaleUpTimeout))
272 gomega.Expect(getPoolNodes(ctx, f, gpuPoolName)).To(gomega.HaveLen(2))
273 })
274
275 f.It(fmt.Sprintf("Should not scale GPU pool up if pod does not require GPUs [GpuType:%s]", gpuType), feature.ClusterSizeAutoscalingGpu, func(ctx context.Context) {
276 e2eskipper.SkipUnlessProviderIs("gke")
277 if gpuType == "" {
278 framework.Failf("TESTED_GPU_TYPE not defined")
279 return
280 }
281
282 const gpuPoolName = "gpu-pool"
283 addGpuNodePool(gpuPoolName, gpuType, 1, 0)
284 defer deleteNodePool(gpuPoolName)
285
286 installNvidiaDriversDaemonSet(ctx, f)
287
288 ginkgo.By("Enable autoscaler")
289 framework.ExpectNoError(enableAutoscaler(gpuPoolName, 0, 1))
290 defer disableAutoscaler(gpuPoolName, 0, 1)
291 gomega.Expect(getPoolNodes(ctx, f, gpuPoolName)).To(gomega.BeEmpty())
292
293 ginkgo.By("Schedule bunch of pods beyond point of filling default pool but do not request any GPUs")
294 ReserveMemory(ctx, f, "memory-reservation", 100, nodeCount*memAllocatableMb, false, 1*time.Second)
295 ginkgo.DeferCleanup(e2erc.DeleteRCAndWaitForGC, f.ClientSet, f.Namespace.Name, "memory-reservation")
296
297 framework.ExpectNoError(WaitForClusterSizeFunc(ctx, f.ClientSet,
298 func(size int) bool { return size >= nodeCount+1 }, scaleUpTimeout))
299
300
301 gomega.Expect(getPoolNodes(ctx, f, gpuPoolName)).To(gomega.BeEmpty())
302 })
303
304 f.It(fmt.Sprintf("Should scale down GPU pool from 1 [GpuType:%s]", gpuType), feature.ClusterSizeAutoscalingGpu, func(ctx context.Context) {
305 e2eskipper.SkipUnlessProviderIs("gke")
306 if gpuType == "" {
307 framework.Failf("TESTED_GPU_TYPE not defined")
308 return
309 }
310
311 const gpuPoolName = "gpu-pool"
312 addGpuNodePool(gpuPoolName, gpuType, 1, 1)
313 defer deleteNodePool(gpuPoolName)
314
315 installNvidiaDriversDaemonSet(ctx, f)
316
317 ginkgo.By("Schedule a single pod which requires GPU")
318 framework.ExpectNoError(ScheduleAnySingleGpuPod(ctx, f, "gpu-pod-rc"))
319 ginkgo.DeferCleanup(e2erc.DeleteRCAndWaitForGC, f.ClientSet, f.Namespace.Name, "gpu-pod-rc")
320
321 ginkgo.By("Enable autoscaler")
322 framework.ExpectNoError(enableAutoscaler(gpuPoolName, 0, 1))
323 defer disableAutoscaler(gpuPoolName, 0, 1)
324 gomega.Expect(getPoolNodes(ctx, f, gpuPoolName)).To(gomega.HaveLen(1))
325
326 ginkgo.By("Remove the only POD requiring GPU")
327 framework.ExpectNoError(e2erc.DeleteRCAndWaitForGC(ctx, f.ClientSet, f.Namespace.Name, "gpu-pod-rc"))
328
329 framework.ExpectNoError(WaitForClusterSizeFunc(ctx, f.ClientSet,
330 func(size int) bool { return size == nodeCount }, scaleDownTimeout))
331 gomega.Expect(getPoolNodes(ctx, f, gpuPoolName)).To(gomega.BeEmpty())
332 })
333
334 f.It("should increase cluster size if pending pods are small and one node is broken", feature.ClusterSizeAutoscalingScaleUp, func(ctx context.Context) {
335 e2enetwork.TestUnderTemporaryNetworkFailure(ctx, c, "default", getAnyNode(ctx, c), func(ctx context.Context) { simpleScaleUpTest(ctx, 1) })
336 })
337
338 f.It("shouldn't trigger additional scale-ups during processing scale-up", feature.ClusterSizeAutoscalingScaleUp, func(ctx context.Context) {
339
340 status, err := waitForScaleUpStatus(ctx, c, func(s *scaleUpStatus) bool {
341 return s.ready == s.target && s.ready <= nodeCount
342 }, scaleUpTriggerTimeout)
343 framework.ExpectNoError(err)
344
345 unmanagedNodes := nodeCount - status.ready
346
347 ginkgo.By("Schedule more pods than can fit and wait for cluster to scale-up")
348 ReserveMemory(ctx, f, "memory-reservation", 100, nodeCount*memAllocatableMb, false, 1*time.Second)
349 ginkgo.DeferCleanup(e2erc.DeleteRCAndWaitForGC, f.ClientSet, f.Namespace.Name, "memory-reservation")
350
351 status, err = waitForScaleUpStatus(ctx, c, func(s *scaleUpStatus) bool {
352 return s.status == caOngoingScaleUpStatus
353 }, scaleUpTriggerTimeout)
354 framework.ExpectNoError(err)
355 target := status.target
356 framework.ExpectNoError(waitForAllCaPodsReadyInNamespace(ctx, f, c))
357
358 ginkgo.By("Expect no more scale-up to be happening after all pods are scheduled")
359
360
361
362 status, err = waitForScaleUpStatus(ctx, c, func(s *scaleUpStatus) bool {
363 return s.status == caNoScaleUpStatus
364 }, 2*freshStatusLimit)
365 framework.ExpectNoError(err)
366
367 if status.target != target {
368 klog.Warningf("Final number of nodes (%v) does not match initial scale-up target (%v).", status.target, target)
369 }
370 gomega.Expect(status.timestamp.Add(freshStatusLimit)).To(gomega.BeTemporally(">=", time.Now()))
371 gomega.Expect(status.status).To(gomega.Equal(caNoScaleUpStatus))
372 gomega.Expect(status.ready).To(gomega.Equal(status.target))
373 nodes, err := e2enode.GetReadySchedulableNodes(ctx, f.ClientSet)
374 framework.ExpectNoError(err)
375 gomega.Expect(nodes.Items).To(gomega.HaveLen(status.target + unmanagedNodes))
376 })
377
378 f.It("should increase cluster size if pending pods are small and there is another node pool that is not autoscaled", feature.ClusterSizeAutoscalingScaleUp, func(ctx context.Context) {
379 e2eskipper.SkipUnlessProviderIs("gke")
380
381 ginkgo.By("Creating new node-pool with e2-standard-4 machines")
382 const extraPoolName = "extra-pool"
383 addNodePool(extraPoolName, "e2-standard-4", 1)
384 defer deleteNodePool(extraPoolName)
385 extraNodes := getPoolInitialSize(extraPoolName)
386 framework.ExpectNoError(e2enode.WaitForReadyNodes(ctx, c, nodeCount+extraNodes, resizeTimeout))
387
388
389 framework.ExpectNoError(e2enode.WaitForAllNodesSchedulable(ctx, c, resizeTimeout))
390 klog.Infof("Not enabling cluster autoscaler for the node pool (on purpose).")
391
392 ginkgo.By("Getting memory available on new nodes, so we can account for it when creating RC")
393 nodes := getPoolNodes(ctx, f, extraPoolName)
394 gomega.Expect(nodes).To(gomega.HaveLen(extraNodes))
395 extraMemMb := 0
396 for _, node := range nodes {
397 mem := node.Status.Allocatable[v1.ResourceMemory]
398 extraMemMb += int((&mem).Value() / 1024 / 1024)
399 }
400
401 ginkgo.By("Reserving 0.1x more memory than the cluster holds to trigger scale up")
402 totalMemoryReservation := int(1.1 * float64(nodeCount*memAllocatableMb+extraMemMb))
403 ginkgo.DeferCleanup(e2erc.DeleteRCAndWaitForGC, f.ClientSet, f.Namespace.Name, "memory-reservation")
404 ReserveMemory(ctx, f, "memory-reservation", 100, totalMemoryReservation, false, defaultTimeout)
405
406
407 framework.ExpectNoError(WaitForClusterSizeFunc(ctx, f.ClientSet,
408 func(size int) bool { return size >= nodeCount+extraNodes+1 }, scaleUpTimeout))
409 framework.ExpectNoError(waitForAllCaPodsReadyInNamespace(ctx, f, c))
410 })
411
412 f.It("should disable node pool autoscaling", feature.ClusterSizeAutoscalingScaleUp, func(ctx context.Context) {
413 e2eskipper.SkipUnlessProviderIs("gke")
414
415 ginkgo.By("Creating new node-pool with e2-standard-4 machines")
416 const extraPoolName = "extra-pool"
417 addNodePool(extraPoolName, "e2-standard-4", 1)
418 defer deleteNodePool(extraPoolName)
419 extraNodes := getPoolInitialSize(extraPoolName)
420 framework.ExpectNoError(e2enode.WaitForReadyNodes(ctx, c, nodeCount+extraNodes, resizeTimeout))
421 framework.ExpectNoError(enableAutoscaler(extraPoolName, 1, 2))
422 framework.ExpectNoError(disableAutoscaler(extraPoolName, 1, 2))
423 })
424
425 f.It("should increase cluster size if pods are pending due to host port conflict", feature.ClusterSizeAutoscalingScaleUp, func(ctx context.Context) {
426 scheduling.CreateHostPortPods(ctx, f, "host-port", nodeCount+2, false)
427 ginkgo.DeferCleanup(e2erc.DeleteRCAndWaitForGC, f.ClientSet, f.Namespace.Name, "host-port")
428
429 framework.ExpectNoError(WaitForClusterSizeFunc(ctx, f.ClientSet,
430 func(size int) bool { return size >= nodeCount+2 }, scaleUpTimeout))
431 framework.ExpectNoError(waitForAllCaPodsReadyInNamespace(ctx, f, c))
432 })
433
434 f.It("should increase cluster size if pods are pending due to pod anti-affinity", feature.ClusterSizeAutoscalingScaleUp, func(ctx context.Context) {
435 pods := nodeCount
436 newPods := 2
437 labels := map[string]string{
438 "anti-affinity": "yes",
439 }
440 ginkgo.By("starting a pod with anti-affinity on each node")
441 framework.ExpectNoError(runAntiAffinityPods(ctx, f, f.Namespace.Name, pods, "some-pod", labels, labels))
442 ginkgo.DeferCleanup(e2erc.DeleteRCAndWaitForGC, f.ClientSet, f.Namespace.Name, "some-pod")
443 framework.ExpectNoError(waitForAllCaPodsReadyInNamespace(ctx, f, c))
444
445 ginkgo.By("scheduling extra pods with anti-affinity to existing ones")
446 framework.ExpectNoError(runAntiAffinityPods(ctx, f, f.Namespace.Name, newPods, "extra-pod", labels, labels))
447 ginkgo.DeferCleanup(e2erc.DeleteRCAndWaitForGC, f.ClientSet, f.Namespace.Name, "extra-pod")
448
449 framework.ExpectNoError(waitForAllCaPodsReadyInNamespace(ctx, f, c))
450 framework.ExpectNoError(e2enode.WaitForReadyNodes(ctx, c, nodeCount+newPods, scaleUpTimeout))
451 })
452
453 f.It("should increase cluster size if pod requesting EmptyDir volume is pending", feature.ClusterSizeAutoscalingScaleUp, func(ctx context.Context) {
454 ginkgo.By("creating pods")
455 pods := nodeCount
456 newPods := 1
457 labels := map[string]string{
458 "anti-affinity": "yes",
459 }
460 framework.ExpectNoError(runAntiAffinityPods(ctx, f, f.Namespace.Name, pods, "some-pod", labels, labels))
461 ginkgo.DeferCleanup(e2erc.DeleteRCAndWaitForGC, f.ClientSet, f.Namespace.Name, "some-pod")
462
463 ginkgo.By("waiting for all pods before triggering scale up")
464 framework.ExpectNoError(waitForAllCaPodsReadyInNamespace(ctx, f, c))
465
466 ginkgo.By("creating a pod requesting EmptyDir")
467 framework.ExpectNoError(runVolumeAntiAffinityPods(ctx, f, f.Namespace.Name, newPods, "extra-pod", labels, labels, emptyDirVolumes))
468 ginkgo.DeferCleanup(e2erc.DeleteRCAndWaitForGC, f.ClientSet, f.Namespace.Name, "extra-pod")
469
470 framework.ExpectNoError(waitForAllCaPodsReadyInNamespace(ctx, f, c))
471 framework.ExpectNoError(e2enode.WaitForReadyNodes(ctx, c, nodeCount+newPods, scaleUpTimeout))
472 })
473
474 f.It("should increase cluster size if pod requesting volume is pending", feature.ClusterSizeAutoscalingScaleUp, func(ctx context.Context) {
475 e2eskipper.SkipUnlessProviderIs("gce", "gke")
476
477 volumeLabels := labels.Set{
478 e2epv.VolumeSelectorKey: f.Namespace.Name,
479 }
480 selector := metav1.SetAsLabelSelector(volumeLabels)
481
482 ginkgo.By("creating volume & pvc")
483 diskName, err := e2epv.CreatePDWithRetry(ctx)
484 framework.ExpectNoError(err)
485 pvConfig := e2epv.PersistentVolumeConfig{
486 NamePrefix: "gce-",
487 Labels: volumeLabels,
488 PVSource: v1.PersistentVolumeSource{
489 GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{
490 PDName: diskName,
491 FSType: "ext3",
492 ReadOnly: false,
493 },
494 },
495 Prebind: nil,
496 }
497 emptyStorageClass := ""
498 pvcConfig := e2epv.PersistentVolumeClaimConfig{
499 Selector: selector,
500 StorageClassName: &emptyStorageClass,
501 }
502
503 pv, pvc, err := e2epv.CreatePVPVC(ctx, c, f.Timeouts, pvConfig, pvcConfig, f.Namespace.Name, false)
504 framework.ExpectNoError(err)
505 framework.ExpectNoError(e2epv.WaitOnPVandPVC(ctx, c, f.Timeouts, f.Namespace.Name, pv, pvc))
506
507 defer func() {
508 errs := e2epv.PVPVCCleanup(ctx, c, f.Namespace.Name, pv, pvc)
509 if len(errs) > 0 {
510 framework.Failf("failed to delete PVC and/or PV. Errors: %v", utilerrors.NewAggregate(errs))
511 }
512 pv, pvc = nil, nil
513 if diskName != "" {
514 framework.ExpectNoError(e2epv.DeletePDWithRetry(ctx, diskName))
515 }
516 }()
517
518 ginkgo.By("creating pods")
519 pods := nodeCount
520 labels := map[string]string{
521 "anti-affinity": "yes",
522 }
523 framework.ExpectNoError(runAntiAffinityPods(ctx, f, f.Namespace.Name, pods, "some-pod", labels, labels))
524 ginkgo.DeferCleanup(func(ctx context.Context) {
525 e2erc.DeleteRCAndWaitForGC(ctx, f.ClientSet, f.Namespace.Name, "some-pod")
526 klog.Infof("RC and pods not using volume deleted")
527 })
528
529 ginkgo.By("waiting for all pods before triggering scale up")
530 framework.ExpectNoError(waitForAllCaPodsReadyInNamespace(ctx, f, c))
531
532 ginkgo.By("creating a pod requesting PVC")
533 pvcPodName := "pvc-pod"
534 newPods := 1
535 volumes := buildVolumes(pv, pvc)
536 framework.ExpectNoError(runVolumeAntiAffinityPods(ctx, f, f.Namespace.Name, newPods, pvcPodName, labels, labels, volumes))
537 ginkgo.DeferCleanup(e2erc.DeleteRCAndWaitForGC, f.ClientSet, f.Namespace.Name, pvcPodName)
538 ginkgo.DeferCleanup(waitForAllCaPodsReadyInNamespace, f, c)
539
540 framework.ExpectNoError(waitForAllCaPodsReadyInNamespace(ctx, f, c))
541 framework.ExpectNoError(e2enode.WaitForReadyNodes(ctx, c, nodeCount+newPods, scaleUpTimeout))
542 })
543
544 f.It("should add node to the particular mig", feature.ClusterSizeAutoscalingScaleUp, func(ctx context.Context) {
545 labelKey := "cluster-autoscaling-test.special-node"
546 labelValue := "true"
547
548 ginkgo.By("Finding the smallest MIG")
549 minMig := ""
550 minSize := nodeCount
551 for mig, size := range originalSizes {
552 if size <= minSize {
553 minMig = mig
554 minSize = size
555 }
556 }
557
558 if minSize == 0 {
559 newSizes := make(map[string]int)
560 for mig, size := range originalSizes {
561 newSizes[mig] = size
562 }
563 newSizes[minMig] = 1
564 setMigSizes(newSizes)
565 }
566
567 removeLabels := func(nodesToClean sets.String) {
568 ginkgo.By("Removing labels from nodes")
569 for node := range nodesToClean {
570 e2enode.RemoveLabelOffNode(c, node, labelKey)
571 }
572 }
573
574 nodes, err := framework.GetGroupNodes(minMig)
575 framework.ExpectNoError(err)
576 nodesSet := sets.NewString(nodes...)
577 defer removeLabels(nodesSet)
578 ginkgo.By(fmt.Sprintf("Annotating nodes of the smallest MIG(%s): %v", minMig, nodes))
579
580 for node := range nodesSet {
581 e2enode.AddOrUpdateLabelOnNode(c, node, labelKey, labelValue)
582 }
583
584 err = scheduling.CreateNodeSelectorPods(ctx, f, "node-selector", minSize+1, map[string]string{labelKey: labelValue}, false)
585 framework.ExpectNoError(err)
586 ginkgo.By("Waiting for new node to appear and annotating it")
587 framework.WaitForGroupSize(minMig, int32(minSize+1))
588
589 framework.ExpectNoError(WaitForClusterSizeFunc(ctx, f.ClientSet,
590 func(size int) bool { return size >= nodeCount+1 }, scaleUpTimeout))
591
592 newNodes, err := framework.GetGroupNodes(minMig)
593 framework.ExpectNoError(err)
594 newNodesSet := sets.NewString(newNodes...)
595 newNodesSet.Delete(nodes...)
596 if len(newNodesSet) > 1 {
597 ginkgo.By(fmt.Sprintf("Spotted following new nodes in %s: %v", minMig, newNodesSet))
598 klog.Infof("Usually only 1 new node is expected, investigating")
599 klog.Infof("Kubectl:%s\n", e2ekubectl.RunKubectlOrDie(f.Namespace.Name, "get", "nodes", "-o", "json"))
600 if output, err := exec.Command("gcloud", "compute", "instances", "list",
601 "--project="+framework.TestContext.CloudConfig.ProjectID,
602 "--zone="+framework.TestContext.CloudConfig.Zone).Output(); err == nil {
603 klog.Infof("Gcloud compute instances list: %s", output)
604 } else {
605 klog.Errorf("Failed to get instances list: %v", err)
606 }
607
608 for newNode := range newNodesSet {
609 if output, err := execCmd("gcloud", "compute", "instances", "describe",
610 newNode,
611 "--project="+framework.TestContext.CloudConfig.ProjectID,
612 "--zone="+framework.TestContext.CloudConfig.Zone).Output(); err == nil {
613 klog.Infof("Gcloud compute instances describe: %s", output)
614 } else {
615 klog.Errorf("Failed to get instances describe: %v", err)
616 }
617 }
618
619
620
621
622 }
623 ginkgo.By(fmt.Sprintf("New nodes: %v\n", newNodesSet))
624 registeredNodes := sets.NewString()
625 for nodeName := range newNodesSet {
626 node, err := f.ClientSet.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
627 if err == nil && node != nil {
628 registeredNodes.Insert(nodeName)
629 } else {
630 klog.Errorf("Failed to get node %v: %v", nodeName, err)
631 }
632 }
633 ginkgo.By(fmt.Sprintf("Setting labels for registered new nodes: %v", registeredNodes.List()))
634 for node := range registeredNodes {
635 e2enode.AddOrUpdateLabelOnNode(c, node, labelKey, labelValue)
636 }
637
638 defer removeLabels(registeredNodes)
639
640 framework.ExpectNoError(waitForAllCaPodsReadyInNamespace(ctx, f, c))
641 framework.ExpectNoError(e2erc.DeleteRCAndWaitForGC(ctx, f.ClientSet, f.Namespace.Name, "node-selector"))
642 })
643
644 f.It("should scale up correct target pool", feature.ClusterSizeAutoscalingScaleUp, func(ctx context.Context) {
645 e2eskipper.SkipUnlessProviderIs("gke")
646
647 ginkgo.By("Creating new node-pool with e2-standard-4 machines")
648 const extraPoolName = "extra-pool"
649 addNodePool(extraPoolName, "e2-standard-4", 1)
650 defer deleteNodePool(extraPoolName)
651 extraNodes := getPoolInitialSize(extraPoolName)
652 framework.ExpectNoError(e2enode.WaitForReadyNodes(ctx, c, nodeCount+extraNodes, resizeTimeout))
653 framework.ExpectNoError(enableAutoscaler(extraPoolName, 1, 2))
654 defer disableAutoscaler(extraPoolName, 1, 2)
655
656 extraPods := extraNodes + 1
657 totalMemoryReservation := int(float64(extraPods) * 1.5 * float64(memAllocatableMb))
658 ginkgo.By(fmt.Sprintf("Creating rc with %v pods too big to fit default-pool but fitting extra-pool", extraPods))
659 ginkgo.DeferCleanup(e2erc.DeleteRCAndWaitForGC, f.ClientSet, f.Namespace.Name, "memory-reservation")
660 ReserveMemory(ctx, f, "memory-reservation", extraPods, totalMemoryReservation, false, defaultTimeout)
661
662
663
664
665
666 framework.ExpectNoError(e2enode.WaitForReadyNodes(ctx, c, nodeCount+extraNodes+1, scaleUpTimeout+5*time.Minute))
667 })
668
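// simpleScaleDownTest adds PDBs for kube-system pods, manually grows every node group by
// 2+unready nodes, and then expects the autoscaler to remove at least one of the now-unneeded
// nodes within scaleDownTimeout.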
669 simpleScaleDownTest := func(ctx context.Context, unready int) {
670 err := addKubeSystemPdbs(ctx, f)
671 framework.ExpectNoError(err)
672
673 ginkgo.By("Manually increase cluster size")
674 increasedSize := 0
675 newSizes := make(map[string]int)
676 for key, val := range originalSizes {
677 newSizes[key] = val + 2 + unready
678 increasedSize += val + 2 + unready
679 }
680 setMigSizes(newSizes)
681 framework.ExpectNoError(WaitForClusterSizeFuncWithUnready(ctx, f.ClientSet,
682 func(size int) bool { return size >= increasedSize }, manualResizeTimeout, unready))
683
684 ginkgo.By("Some node should be removed")
685 framework.ExpectNoError(WaitForClusterSizeFuncWithUnready(ctx, f.ClientSet,
686 func(size int) bool { return size < increasedSize }, scaleDownTimeout, unready))
687 }
688
689 f.It("should correctly scale down after a node is not needed", feature.ClusterSizeAutoscalingScaleDown,
690 func(ctx context.Context) { simpleScaleDownTest(ctx, 0) })
691
692 f.It("should correctly scale down after a node is not needed and one node is broken", feature.ClusterSizeAutoscalingScaleDown, func(ctx context.Context) {
693 e2eskipper.SkipUnlessSSHKeyPresent()
694 e2enetwork.TestUnderTemporaryNetworkFailure(ctx, c, "default", getAnyNode(ctx, c), func(ctx context.Context) { simpleScaleDownTest(ctx, 1) })
695 })
696
697 f.It("should correctly scale down after a node is not needed when there is non autoscaled pool", feature.ClusterSizeAutoscalingScaleDown, func(ctx context.Context) {
698 e2eskipper.SkipUnlessProviderIs("gke")
699
700 increasedSize := manuallyIncreaseClusterSize(ctx, f, originalSizes)
701
702 const extraPoolName = "extra-pool"
703 addNodePool(extraPoolName, "e2-standard-2", 3)
704 defer deleteNodePool(extraPoolName)
705 extraNodes := getPoolInitialSize(extraPoolName)
706
707 framework.ExpectNoError(WaitForClusterSizeFunc(ctx, f.ClientSet,
708 func(size int) bool { return size >= increasedSize+extraNodes }, scaleUpTimeout))
709
710 ginkgo.By("Some node should be removed")
711
712
713
714
715 framework.ExpectNoError(WaitForClusterSizeFunc(ctx, f.ClientSet,
716 func(size int) bool { return size < increasedSize+extraNodes }, scaleDownTimeout+10*time.Minute))
717 })
718
719 f.It("should be able to scale down when rescheduling a pod is required and pdb allows for it", feature.ClusterSizeAutoscalingScaleDown, func(ctx context.Context) {
720 runDrainTest(ctx, f, originalSizes, f.Namespace.Name, 1, 1, func(increasedSize int) {
721 ginkgo.By("Some node should be removed")
722 framework.ExpectNoError(WaitForClusterSizeFunc(ctx, f.ClientSet,
723 func(size int) bool { return size < increasedSize }, scaleDownTimeout))
724 })
725 })
726
727 f.It("shouldn't be able to scale down when rescheduling a pod is required, but pdb doesn't allow drain", feature.ClusterSizeAutoscalingScaleDown, func(ctx context.Context) {
728 runDrainTest(ctx, f, originalSizes, f.Namespace.Name, 1, 0, func(increasedSize int) {
729 ginkgo.By("No nodes should be removed")
730 time.Sleep(scaleDownTimeout)
731 nodes, err := e2enode.GetReadySchedulableNodes(ctx, f.ClientSet)
732 framework.ExpectNoError(err)
733 gomega.Expect(nodes.Items).To(gomega.HaveLen(increasedSize))
734 })
735 })
736
737 f.It("should be able to scale down by draining multiple pods one by one as dictated by pdb", feature.ClusterSizeAutoscalingScaleDown, func(ctx context.Context) {
738 runDrainTest(ctx, f, originalSizes, f.Namespace.Name, 2, 1, func(increasedSize int) {
739 ginkgo.By("Some node should be removed")
740 framework.ExpectNoError(WaitForClusterSizeFunc(ctx, f.ClientSet,
741 func(size int) bool { return size < increasedSize }, scaleDownTimeout))
742 })
743 })
744
745 f.It("should be able to scale down by draining system pods with pdb", feature.ClusterSizeAutoscalingScaleDown, func(ctx context.Context) {
746 runDrainTest(ctx, f, originalSizes, "kube-system", 2, 1, func(increasedSize int) {
747 ginkgo.By("Some node should be removed")
748 framework.ExpectNoError(WaitForClusterSizeFunc(ctx, f.ClientSet,
749 func(size int) bool { return size < increasedSize }, scaleDownTimeout))
750 })
751 })
752
753 f.It("Should be able to scale a node group up from 0", feature.ClusterSizeAutoscalingScaleUp, func(ctx context.Context) {
754
755 if framework.ProviderIs("gke") {
756
757 ginkgo.By("Add a new node pool with 0 nodes and min size 0")
758 const extraPoolName = "extra-pool"
759 addNodePool(extraPoolName, "e2-standard-4", 0)
760 defer deleteNodePool(extraPoolName)
761 framework.ExpectNoError(enableAutoscaler(extraPoolName, 0, 1))
762 defer disableAutoscaler(extraPoolName, 0, 1)
763 } else {
764
765 e2eskipper.SkipUnlessAtLeast(len(originalSizes), 2, "At least 2 node groups are needed for scale-to-0 tests")
766
767 ginkgo.By("Manually scale smallest node group to 0")
768 minMig := ""
769 minSize := nodeCount
770 for mig, size := range originalSizes {
771 if size <= minSize {
772 minMig = mig
773 minSize = size
774 }
775 }
776 framework.ExpectNoError(framework.ResizeGroup(minMig, int32(0)))
777 framework.ExpectNoError(e2enode.WaitForReadyNodes(ctx, c, nodeCount-minSize, resizeTimeout))
778 }
779
780 ginkgo.By("Make remaining nodes unschedulable")
781 nodes, err := f.ClientSet.CoreV1().Nodes().List(ctx, metav1.ListOptions{FieldSelector: fields.Set{
782 "spec.unschedulable": "false",
783 }.AsSelector().String()})
784 framework.ExpectNoError(err)
785
786 for _, node := range nodes.Items {
787 err = makeNodeUnschedulable(ctx, f.ClientSet, &node)
788
789 n := node
790 ginkgo.DeferCleanup(makeNodeSchedulable, f.ClientSet, &n, false)
791
792 framework.ExpectNoError(err)
793 }
794
795 ginkgo.By("Run a scale-up test")
796 ReserveMemory(ctx, f, "memory-reservation", 1, 100, false, 1*time.Second)
797 ginkgo.DeferCleanup(e2erc.DeleteRCAndWaitForGC, f.ClientSet, f.Namespace.Name, "memory-reservation")
798
799
800 framework.ExpectNoError(WaitForClusterSizeFunc(ctx, f.ClientSet,
801 func(size int) bool { return size >= len(nodes.Items)+1 }, scaleUpTimeout))
802 framework.ExpectNoError(waitForAllCaPodsReadyInNamespace(ctx, f, c))
803 })
804
805
806
807
808
809
810
811
812
813
814
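// gkeScaleToZero adds a temporary GKE node pool with a single node and autoscaling bounds
// [0, 1], drains that node, and expects the autoscaler to shrink the pool to zero.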
815 gkeScaleToZero := func(ctx context.Context) {
816
817 ginkgo.By("Add a new node pool with size 1 and min size 0")
818 const extraPoolName = "extra-pool"
819 addNodePool(extraPoolName, "e2-standard-4", 1)
820 defer deleteNodePool(extraPoolName)
821 extraNodes := getPoolInitialSize(extraPoolName)
822 framework.ExpectNoError(e2enode.WaitForReadyNodes(ctx, c, nodeCount+extraNodes, resizeTimeout))
823 framework.ExpectNoError(enableAutoscaler(extraPoolName, 0, 1))
824 defer disableAutoscaler(extraPoolName, 0, 1)
825
826 ngNodes := getPoolNodes(ctx, f, extraPoolName)
827 gomega.Expect(ngNodes).To(gomega.HaveLen(extraNodes))
828 for _, node := range ngNodes {
829 ginkgo.By(fmt.Sprintf("Target node for scale-down: %s", node.Name))
830 }
831
832 for _, node := range ngNodes {
833 drainNode(ctx, f, node)
834 }
835 framework.ExpectNoError(WaitForClusterSizeFunc(ctx, f.ClientSet,
836 func(size int) bool { return size <= nodeCount }, scaleDownTimeout))
837
838
839 newSize := getPoolSize(ctx, f, extraPoolName)
840 gomega.Expect(newSize).To(gomega.BeZero())
841 }
842
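// gceScaleToZero resizes the smallest MIG to a single node, drains it, and expects the
// autoscaler to scale that group down to zero.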
843 gceScaleToZero := func(ctx context.Context) {
844
845 ginkgo.By("Find smallest node group and manually scale it to a single node")
846 minMig := ""
847 minSize := nodeCount
848 for mig, size := range originalSizes {
849 if size <= minSize {
850 minMig = mig
851 minSize = size
852 }
853 }
854 framework.ExpectNoError(framework.ResizeGroup(minMig, int32(1)))
855 framework.ExpectNoError(e2enode.WaitForReadyNodes(ctx, c, nodeCount-minSize+1, resizeTimeout))
856 ngNodes, err := framework.GetGroupNodes(minMig)
857 framework.ExpectNoError(err)
858 if len(ngNodes) != 1 {
859 framework.Failf("Expected one node, got instead: %v", ngNodes)
860 }
861 node, err := f.ClientSet.CoreV1().Nodes().Get(ctx, ngNodes[0], metav1.GetOptions{})
862 ginkgo.By(fmt.Sprintf("Target node for scale-down: %s", node.Name))
863 framework.ExpectNoError(err)
864
865
866 drainNode(ctx, f, node)
867 framework.ExpectNoError(WaitForClusterSizeFunc(ctx, f.ClientSet,
868 func(size int) bool { return size < nodeCount-minSize+1 }, scaleDownTimeout))
869
870
871 newSize, err := framework.GroupSize(minMig)
872 framework.ExpectNoError(err)
873 gomega.Expect(newSize).To(gomega.BeZero())
874 }
875
876 f.It("Should be able to scale a node group down to 0", feature.ClusterSizeAutoscalingScaleDown, func(ctx context.Context) {
877 if framework.ProviderIs("gke") {
878 gkeScaleToZero(ctx)
879 } else if len(originalSizes) >= 2 {
880 gceScaleToZero(ctx)
881 } else {
882 e2eskipper.Skipf("At least 2 node groups are needed for scale-to-0 tests")
883 }
884 })
885
886 f.It("Shouldn't perform scale up operation and should list unhealthy status if most of the cluster is broken", feature.ClusterSizeAutoscalingScaleUp, func(ctx context.Context) {
887 e2eskipper.SkipUnlessSSHKeyPresent()
888
889 clusterSize := nodeCount
890 for clusterSize < unhealthyClusterThreshold+1 {
891 clusterSize = manuallyIncreaseClusterSize(ctx, f, originalSizes)
892 }
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908 time.Sleep(2 * time.Minute)
909
910 ginkgo.By("Block network connectivity to some nodes to simulate unhealthy cluster")
911 nodesToBreakCount := int(math.Ceil(math.Max(float64(unhealthyClusterThreshold), 0.5*float64(clusterSize))))
912 nodes, err := f.ClientSet.CoreV1().Nodes().List(ctx, metav1.ListOptions{FieldSelector: fields.Set{
913 "spec.unschedulable": "false",
914 }.AsSelector().String()})
915 framework.ExpectNoError(err)
916 if nodesToBreakCount > len(nodes.Items) {
917 framework.Failf("Expected at most %d nodes to break, got %d", len(nodes.Items), nodesToBreakCount)
918 }
919 nodesToBreak := nodes.Items[:nodesToBreakCount]
920
921
922
923
924 var testFunction func(ctx context.Context)
925 testFunction = func(ctx context.Context) {
926 if len(nodesToBreak) > 0 {
927 ntb := &nodesToBreak[0]
928 nodesToBreak = nodesToBreak[1:]
929 e2enetwork.TestUnderTemporaryNetworkFailure(ctx, c, "default", ntb, testFunction)
930 } else {
931 ReserveMemory(ctx, f, "memory-reservation", 100, nodeCount*memAllocatableMb, false, defaultTimeout)
932 ginkgo.DeferCleanup(e2erc.DeleteRCAndWaitForGC, f.ClientSet, f.Namespace.Name, "memory-reservation")
933
934 time.Sleep(15 * time.Minute)
935 currentNodes, err := e2enode.GetReadySchedulableNodes(ctx, f.ClientSet)
936 framework.ExpectNoError(err)
937 framework.Logf("Currently available nodes: %v, nodes available at the start of test: %v, disabled nodes: %v", len(currentNodes.Items), len(nodes.Items), nodesToBreakCount)
938 gomega.Expect(currentNodes.Items).To(gomega.HaveLen(len(nodes.Items) - nodesToBreakCount))
939 status, err := getClusterwideStatus(ctx, c)
940 framework.Logf("Clusterwide status: %v", status)
941 framework.ExpectNoError(err)
942 gomega.Expect(status).To(gomega.Equal("Unhealthy"))
943 }
944 }
945 testFunction(ctx)
946
947 framework.ExpectNoError(e2enode.WaitForReadyNodes(ctx, c, len(nodes.Items), nodesRecoverTimeout))
948 })
949
950 f.It("shouldn't scale up when expendable pod is created", feature.ClusterSizeAutoscalingScaleUp, func(ctx context.Context) {
951 createPriorityClasses(ctx, f)
952
953 ginkgo.DeferCleanup(ReserveMemoryWithPriority(ctx, f, "memory-reservation", nodeCount+1, int(float64(nodeCount+1)*float64(0.7)*float64(memAllocatableMb)), false, time.Second, expendablePriorityClassName))
954 ginkgo.By(fmt.Sprintf("Waiting for scale up hoping it won't happen, sleep for %s", scaleUpTimeout.String()))
955 time.Sleep(scaleUpTimeout)
956
957 framework.ExpectNoError(WaitForClusterSizeFunc(ctx, f.ClientSet,
958 func(size int) bool { return size == nodeCount }, time.Second))
959 })
960
961 f.It("should scale up when non expendable pod is created", feature.ClusterSizeAutoscalingScaleUp, func(ctx context.Context) {
962 createPriorityClasses(ctx, f)
963
964 cleanupFunc := ReserveMemoryWithPriority(ctx, f, "memory-reservation", nodeCount+1, int(float64(nodeCount+1)*float64(0.7)*float64(memAllocatableMb)), true, scaleUpTimeout, highPriorityClassName)
965 defer cleanupFunc()
966
967 framework.ExpectNoError(WaitForClusterSizeFunc(ctx, f.ClientSet,
968 func(size int) bool { return size > nodeCount }, time.Second))
969 })
970
971 f.It("shouldn't scale up when expendable pod is preempted", feature.ClusterSizeAutoscalingScaleUp, func(ctx context.Context) {
972 createPriorityClasses(ctx, f)
973
974 cleanupFunc1 := ReserveMemoryWithPriority(ctx, f, "memory-reservation1", nodeCount, int(float64(nodeCount)*float64(0.7)*float64(memAllocatableMb)), true, defaultTimeout, expendablePriorityClassName)
975 defer cleanupFunc1()
976
977 cleanupFunc2 := ReserveMemoryWithPriority(ctx, f, "memory-reservation2", nodeCount, int(float64(nodeCount)*float64(0.7)*float64(memAllocatableMb)), true, defaultTimeout, highPriorityClassName)
978 defer cleanupFunc2()
979 framework.ExpectNoError(WaitForClusterSizeFunc(ctx, f.ClientSet,
980 func(size int) bool { return size == nodeCount }, time.Second))
981 })
982
983 f.It("should scale down when expendable pod is running", feature.ClusterSizeAutoscalingScaleDown, func(ctx context.Context) {
984 createPriorityClasses(ctx, f)
985 increasedSize := manuallyIncreaseClusterSize(ctx, f, originalSizes)
986
987 cleanupFunc := ReserveMemoryWithPriority(ctx, f, "memory-reservation", increasedSize, int(float64(increasedSize)*float64(0.7)*float64(memAllocatableMb)), true, scaleUpTimeout, expendablePriorityClassName)
988 defer cleanupFunc()
989 ginkgo.By("Waiting for scale down")
990 framework.ExpectNoError(WaitForClusterSizeFunc(ctx, f.ClientSet,
991 func(size int) bool { return size == nodeCount }, scaleDownTimeout))
992 })
993
994 f.It("shouldn't scale down when non expendable pod is running", feature.ClusterSizeAutoscalingScaleDown, func(ctx context.Context) {
995 createPriorityClasses(ctx, f)
996 increasedSize := manuallyIncreaseClusterSize(ctx, f, originalSizes)
997
998 cleanupFunc := ReserveMemoryWithPriority(ctx, f, "memory-reservation", increasedSize, int(float64(increasedSize)*float64(0.7)*float64(memAllocatableMb)), true, scaleUpTimeout, highPriorityClassName)
999 defer cleanupFunc()
1000 ginkgo.By(fmt.Sprintf("Waiting for scale down hoping it won't happen, sleep for %s", scaleDownTimeout.String()))
1001 time.Sleep(scaleDownTimeout)
1002 framework.ExpectNoError(WaitForClusterSizeFunc(ctx, f.ClientSet,
1003 func(size int) bool { return size == increasedSize }, time.Second))
1004 })
1005
1006 f.It("should scale up when unprocessed pod is created and is going to be unschedulable", feature.ClusterScaleUpBypassScheduler, func(ctx context.Context) {
1007
1008 replicaCount := 2 * nodeCount
1009 reservedMemory := int(float64(replicaCount) * float64(0.7) * float64(memAllocatableMb))
1010 cleanupFunc := ReserveMemoryWithSchedulerName(ctx, f, "memory-reservation", replicaCount, reservedMemory, false, 1, nonExistingBypassedSchedulerName)
1011 defer func() { framework.ExpectNoError(cleanupFunc()) }()
1012
1013 ginkgo.By("Waiting for cluster scale-up")
1014 sizeFunc := func(size int) bool {
1015
1016 return size > nodeCount
1017 }
1018 framework.ExpectNoError(WaitForClusterSizeFuncWithUnready(ctx, f.ClientSet, sizeFunc, scaleUpTimeout, 0))
1019 })
1020 f.It("shouldn't scale up when unprocessed pod is created and is going to be schedulable", feature.ClusterScaleUpBypassScheduler, func(ctx context.Context) {
1021
1022 replicaCount := 1
1023 reservedMemory := int(float64(0.5) * float64(memAllocatableMb))
1024 cleanupFunc := ReserveMemoryWithSchedulerName(ctx, f, "memory-reservation", replicaCount, reservedMemory, false, 1, nonExistingBypassedSchedulerName)
1025 defer func() { framework.ExpectNoError(cleanupFunc()) }()
1026
1027 ginkgo.By(fmt.Sprintf("Waiting for scale up hoping it won't happen, polling cluster size for %s", scaleUpTimeout.String()))
1028 sizeFunc := func(size int) bool {
1029 return size == nodeCount
1030 }
1031 gomega.Consistently(ctx, func() error {
1032 return WaitForClusterSizeFunc(ctx, f.ClientSet, sizeFunc, time.Second)
1033 }).WithTimeout(scaleUpTimeout).WithPolling(framework.Poll).ShouldNot(gomega.HaveOccurred())
1034 })
1035 f.It("shouldn't scale up when unprocessed pod is created and scheduler is not specified to be bypassed", feature.ClusterScaleUpBypassScheduler, func(ctx context.Context) {
1036
1037 replicaCount := 2 * nodeCount
1038 reservedMemory := int(float64(replicaCount) * float64(0.7) * float64(memAllocatableMb))
1039 schedulerName := "non-existent-scheduler-" + f.UniqueName
1040 cleanupFunc := ReserveMemoryWithSchedulerName(ctx, f, "memory-reservation", replicaCount, reservedMemory, false, 1, schedulerName)
1041 defer func() { framework.ExpectNoError(cleanupFunc()) }()
1042
1043 ginkgo.By(fmt.Sprintf("Waiting for scale up hoping it won't happen, polling cluster size for %s", scaleUpTimeout.String()))
1044 sizeFunc := func(size int) bool {
1045 return size == nodeCount
1046 }
1047 gomega.Consistently(ctx, func() error {
1048 return WaitForClusterSizeFunc(ctx, f.ClientSet, sizeFunc, time.Second)
1049 }).WithTimeout(scaleUpTimeout).WithPolling(framework.Poll).ShouldNot(gomega.HaveOccurred())
1050 })
1051 })
1052
1053 func installNvidiaDriversDaemonSet(ctx context.Context, f *framework.Framework) {
1054 ginkgo.By("Add daemonset which installs nvidia drivers")
1055
1056 dsYamlURL := "https://raw.githubusercontent.com/GoogleCloudPlatform/container-engine-accelerators/master/daemonset.yaml"
1057 framework.Logf("Using %v", dsYamlURL)
1058
1059 ds, err := e2emanifest.DaemonSetFromURL(ctx, dsYamlURL)
1060 framework.ExpectNoError(err)
1061 ds.Namespace = f.Namespace.Name
1062
1063 _, err = f.ClientSet.AppsV1().DaemonSets(f.Namespace.Name).Create(ctx, ds, metav1.CreateOptions{})
1064 framework.ExpectNoError(err, "failed to create nvidia-driver-installer daemonset")
1065 }
1066
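// execCmd logs the full command line and returns an *exec.Cmd ready to be run.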
1067 func execCmd(args ...string) *exec.Cmd {
1068 klog.Infof("Executing: %s", strings.Join(args, " "))
1069 return exec.Command(args[0], args[1:]...)
1070 }
1071
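// runDrainTest grows the cluster, runs podsPerNode replicated pods on every node in the given
// namespace, guards them with a PodDisruptionBudget that allows at most pdbSize pods to be
// disrupted, and then calls verifyFunction with the increased cluster size.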
1072 func runDrainTest(ctx context.Context, f *framework.Framework, migSizes map[string]int, namespace string, podsPerNode, pdbSize int, verifyFunction func(int)) {
1073 increasedSize := manuallyIncreaseClusterSize(ctx, f, migSizes)
1074
1075 nodes, err := f.ClientSet.CoreV1().Nodes().List(ctx, metav1.ListOptions{FieldSelector: fields.Set{
1076 "spec.unschedulable": "false",
1077 }.AsSelector().String()})
1078 framework.ExpectNoError(err)
1079 numPods := len(nodes.Items) * podsPerNode
1080 testID := string(uuid.NewUUID())
1081 labelMap := map[string]string{"test_id": testID}
1082 framework.ExpectNoError(runReplicatedPodOnEachNode(ctx, f, nodes.Items, namespace, podsPerNode, "reschedulable-pods", labelMap, 0))
1083
1084 ginkgo.DeferCleanup(e2erc.DeleteRCAndWaitForGC, f.ClientSet, namespace, "reschedulable-pods")
1085
1086 ginkgo.By("Create a PodDisruptionBudget")
1087 minAvailable := intstr.FromInt32(int32(numPods - pdbSize))
1088 pdb := &policyv1.PodDisruptionBudget{
1089 ObjectMeta: metav1.ObjectMeta{
1090 Name: "test-pdb",
1091 Namespace: namespace,
1092 },
1093 Spec: policyv1.PodDisruptionBudgetSpec{
1094 Selector: &metav1.LabelSelector{MatchLabels: labelMap},
1095 MinAvailable: &minAvailable,
1096 },
1097 }
1098 _, err = f.ClientSet.PolicyV1().PodDisruptionBudgets(namespace).Create(ctx, pdb, metav1.CreateOptions{})
1099
1100 ginkgo.DeferCleanup(framework.IgnoreNotFound(f.ClientSet.PolicyV1().PodDisruptionBudgets(namespace).Delete), pdb.Name, metav1.DeleteOptions{})
1101
1102 framework.ExpectNoError(err)
1103 verifyFunction(increasedSize)
1104 }
1105
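// getGkeAPIEndpoint returns the GKE API endpoint to use, honoring the
// CLOUDSDK_API_ENDPOINT_OVERRIDES_CONTAINER override, falling back to the test sandbox
// endpoint, and stripping any trailing slash.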
1106 func getGkeAPIEndpoint() string {
1107 gkeAPIEndpoint := os.Getenv("CLOUDSDK_API_ENDPOINT_OVERRIDES_CONTAINER")
1108 if gkeAPIEndpoint == "" {
1109 gkeAPIEndpoint = "https://test-container.sandbox.googleapis.com"
1110 }
1111 if strings.HasSuffix(gkeAPIEndpoint, "/") {
1112 gkeAPIEndpoint = gkeAPIEndpoint[:len(gkeAPIEndpoint)-1]
1113 }
1114 return gkeAPIEndpoint
1115 }
1116
1117 func getGKEURL(apiVersion string, suffix string) string {
1118 out, err := execCmd("gcloud", "auth", "print-access-token").Output()
1119 framework.ExpectNoError(err)
1120 token := strings.ReplaceAll(string(out), "\n", "")
1121
1122 return fmt.Sprintf("%s/%s/%s?access_token=%s",
1123 getGkeAPIEndpoint(),
1124 apiVersion,
1125 suffix,
1126 token)
1127 }
1128
1129 func getGKEClusterURL(apiVersion string) string {
1130 if isRegionalCluster() {
1131
1132 return getGKEURL(apiVersion, fmt.Sprintf("projects/%s/locations/%s/clusters/%s",
1133 framework.TestContext.CloudConfig.ProjectID,
1134 framework.TestContext.CloudConfig.Region,
1135 framework.TestContext.CloudConfig.Cluster))
1136 }
1137 return getGKEURL(apiVersion, fmt.Sprintf("projects/%s/zones/%s/clusters/%s",
1138 framework.TestContext.CloudConfig.ProjectID,
1139 framework.TestContext.CloudConfig.Zone,
1140 framework.TestContext.CloudConfig.Cluster))
1141 }
1142
1143 func getCluster(apiVersion string) (string, error) {
1144 resp, err := http.Get(getGKEClusterURL(apiVersion))
1145 if err != nil {
1146 return "", err
1147 }
1148 defer resp.Body.Close()
1149 body, err := io.ReadAll(resp.Body)
1150 if err != nil {
1151 return "", err
1152 }
1153 if resp.StatusCode != http.StatusOK {
1154 return "", fmt.Errorf("error: %s %s", resp.Status, body)
1155 }
1156
1157 return string(body), nil
1158 }
1159
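// isAutoscalerEnabled reports whether cluster autoscaling is enabled by checking the GKE
// cluster description for the expected maxNodeCount value.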
1160 func isAutoscalerEnabled(expectedMaxNodeCountInTargetPool int) (bool, error) {
1161 apiVersion := "v1"
1162 if isRegionalCluster() {
1163 apiVersion = "v1beta1"
1164 }
1165 strBody, err := getCluster(apiVersion)
1166 if err != nil {
1167 return false, err
1168 }
1169 if strings.Contains(strBody, "\"maxNodeCount\": "+strconv.Itoa(expectedMaxNodeCountInTargetPool)) {
1170 return true, nil
1171 }
1172 return false, nil
1173 }
1174
1175 func getClusterLocation() string {
1176 if isRegionalCluster() {
1177 return "--region=" + framework.TestContext.CloudConfig.Region
1178 }
1179 return "--zone=" + framework.TestContext.CloudConfig.Zone
1180 }
1181
1182 func getGcloudCommandFromTrack(commandTrack string, args []string) []string {
1183 command := []string{"gcloud"}
1184 if commandTrack == "beta" || commandTrack == "alpha" {
1185 command = append(command, commandTrack)
1186 }
1187 command = append(command, args...)
1188 command = append(command, getClusterLocation())
1189 command = append(command, "--project="+framework.TestContext.CloudConfig.ProjectID)
1190 return command
1191 }
1192
1193 func getGcloudCommand(args []string) []string {
1194 track := ""
1195 if isRegionalCluster() {
1196 track = "beta"
1197 }
1198 return getGcloudCommandFromTrack(track, args)
1199 }
1200
1201 func isRegionalCluster() bool {
1202
1203 return framework.TestContext.CloudConfig.MultiZone
1204 }
1205
1206 func enableAutoscaler(nodePool string, minCount, maxCount int) error {
1207 klog.Infof("Using gcloud to enable autoscaling for pool %s", nodePool)
1208
1209 args := []string{"container", "clusters", "update", framework.TestContext.CloudConfig.Cluster,
1210 "--enable-autoscaling",
1211 "--min-nodes=" + strconv.Itoa(minCount),
1212 "--max-nodes=" + strconv.Itoa(maxCount),
1213 "--node-pool=" + nodePool}
1214 output, err := execCmd(getGcloudCommand(args)...).CombinedOutput()
1215
1216 if err != nil {
1217 klog.Errorf("Failed config update result: %s", output)
1218 return fmt.Errorf("Failed to enable autoscaling: %w", err)
1219 }
1220 klog.Infof("Config update result: %s", output)
1221
1222 var finalErr error
1223 for startTime := time.Now(); startTime.Add(gkeUpdateTimeout).After(time.Now()); time.Sleep(30 * time.Second) {
1224 val, err := isAutoscalerEnabled(maxCount)
1225 if err == nil && val {
1226 return nil
1227 }
1228 finalErr = err
1229 }
1230 return fmt.Errorf("autoscaler not enabled, last error: %v", finalErr)
1231 }
1232
1233 func disableAutoscaler(nodePool string, minCount, maxCount int) error {
1234 klog.Infof("Using gcloud to disable autoscaling for pool %s", nodePool)
1235 args := []string{"container", "clusters", "update", framework.TestContext.CloudConfig.Cluster,
1236 "--no-enable-autoscaling",
1237 "--node-pool=" + nodePool}
1238 output, err := execCmd(getGcloudCommand(args)...).CombinedOutput()
1239
1240 if err != nil {
1241 klog.Errorf("Failed config update result: %s", output)
1242 return fmt.Errorf("Failed to disable autoscaling: %w", err)
1243 }
1244 klog.Infof("Config update result: %s", output)
1245
1246 var finalErr error
1247 for startTime := time.Now(); startTime.Add(gkeUpdateTimeout).After(time.Now()); time.Sleep(30 * time.Second) {
1248 val, err := isAutoscalerEnabled(maxCount)
1249 if err == nil && !val {
1250 return nil
1251 }
1252 finalErr = err
1253 }
1254 return fmt.Errorf("autoscaler still enabled, last error: %v", finalErr)
1255 }
1256
1257 func addNodePool(name string, machineType string, numNodes int) {
1258 args := []string{"container", "node-pools", "create", name, "--quiet",
1259 "--machine-type=" + machineType,
1260 "--num-nodes=" + strconv.Itoa(numNodes),
1261 "--cluster=" + framework.TestContext.CloudConfig.Cluster}
1262 output, err := execCmd(getGcloudCommand(args)...).CombinedOutput()
1263 klog.Infof("Creating node-pool %s: %s", name, output)
1264 framework.ExpectNoError(err, string(output))
1265 }
1266
1267 func addGpuNodePool(name string, gpuType string, gpuCount int, numNodes int) {
1268 args := []string{"beta", "container", "node-pools", "create", name, "--quiet",
1269 "--accelerator", "type=" + gpuType + ",count=" + strconv.Itoa(gpuCount),
1270 "--num-nodes=" + strconv.Itoa(numNodes),
1271 "--cluster=" + framework.TestContext.CloudConfig.Cluster}
1272 output, err := execCmd(getGcloudCommand(args)...).CombinedOutput()
1273 klog.Infof("Creating node-pool %s: %s", name, output)
1274 framework.ExpectNoError(err, string(output))
1275 }
1276
1277 func deleteNodePool(name string) {
1278 klog.Infof("Deleting node pool %s", name)
1279 args := []string{"container", "node-pools", "delete", name, "--quiet",
1280 "--cluster=" + framework.TestContext.CloudConfig.Cluster}
1281 err := wait.ExponentialBackoff(
1282 wait.Backoff{Duration: 1 * time.Minute, Factor: float64(3), Steps: 3},
1283 func() (bool, error) {
1284 output, err := execCmd(getGcloudCommand(args)...).CombinedOutput()
1285 if err != nil {
1286 klog.Warningf("Error deleting nodegroup - error:%v, output: %s", err, output)
1287 return false, nil
1288 }
1289 klog.Infof("Node-pool deletion output: %s", output)
1290 return true, nil
1291 })
1292 framework.ExpectNoError(err)
1293 }
1294
1295 func getPoolNodes(ctx context.Context, f *framework.Framework, poolName string) []*v1.Node {
1296 nodes := make([]*v1.Node, 0, 1)
1297 nodeList, err := e2enode.GetReadyNodesIncludingTainted(ctx, f.ClientSet)
1298 if err != nil {
1299 framework.Logf("Unexpected error occurred: %v", err)
1300 }
1301 framework.ExpectNoErrorWithOffset(0, err)
1302 for _, node := range nodeList.Items {
1303 if node.Labels[gkeNodepoolNameKey] == poolName {
1304 node := node
1305 nodes = append(nodes, &node)
1306 }
1307 }
1308 return nodes
1309 }
1310
1311
1312
1313
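// getPoolInitialSize returns the node pool's total initial size: initialNodeCount multiplied
// by the number of instance groups backing the pool (one instance group per zone for regional
// pools).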
1314 func getPoolInitialSize(poolName string) int {
1315
1316 args := []string{"container", "node-pools", "describe", poolName, "--quiet",
1317 "--cluster=" + framework.TestContext.CloudConfig.Cluster,
1318 "--format=value(initialNodeCount)"}
1319 output, err := execCmd(getGcloudCommand(args)...).CombinedOutput()
1320 klog.Infof("Node-pool initial size: %s", output)
1321 framework.ExpectNoError(err, string(output))
1322 fields := strings.Fields(string(output))
1323 gomega.Expect(fields).To(gomega.HaveLen(1))
1324 size, err := strconv.ParseInt(fields[0], 10, 64)
1325 framework.ExpectNoError(err)
1326
1327
1328 args = []string{"container", "node-pools", "describe", poolName, "--quiet",
1329 "--cluster=" + framework.TestContext.CloudConfig.Cluster,
1330 "--format=value(instanceGroupUrls)"}
1331 output, err = execCmd(getGcloudCommand(args)...).CombinedOutput()
1332 framework.ExpectNoError(err, string(output))
1333 nodeGroupCount := len(strings.Split(string(output), ";"))
1334 return int(size) * nodeGroupCount
1335 }
1336
1337 func getPoolSize(ctx context.Context, f *framework.Framework, poolName string) int {
1338 size := 0
1339 nodeList, err := e2enode.GetReadySchedulableNodes(ctx, f.ClientSet)
1340 framework.ExpectNoError(err)
1341 for _, node := range nodeList.Items {
1342 if node.Labels[gkeNodepoolNameKey] == poolName {
1343 size++
1344 }
1345 }
1346 return size
1347 }
1348
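// reserveMemory creates a replication controller whose pause-container replicas together
// request the given amount of memory (in MB), optionally pinned by node selector, tolerations,
// priority class, or scheduler name. It returns a function that deletes the replication
// controller and waits for its pods to be garbage collected.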
1349 func reserveMemory(ctx context.Context, f *framework.Framework, id string, replicas, megabytes int, expectRunning bool, timeout time.Duration, selector map[string]string, tolerations []v1.Toleration, priorityClassName, schedulerName string) func() error {
1350 ginkgo.By(fmt.Sprintf("Running RC which reserves %v MB of memory", megabytes))
1351 request := int64(1024 * 1024 * megabytes / replicas)
1352 config := &testutils.RCConfig{
1353 Client: f.ClientSet,
1354 Name: id,
1355 Namespace: f.Namespace.Name,
1356 Timeout: timeout,
1357 Image: imageutils.GetPauseImageName(),
1358 Replicas: replicas,
1359 MemRequest: request,
1360 NodeSelector: selector,
1361 Tolerations: tolerations,
1362 PriorityClassName: priorityClassName,
1363 SchedulerName: schedulerName,
1364 }
1365 for start := time.Now(); time.Since(start) < rcCreationRetryTimeout; time.Sleep(rcCreationRetryDelay) {
1366 err := e2erc.RunRC(ctx, *config)
1367 if err != nil && strings.Contains(err.Error(), "Error creating replication controller") {
1368 klog.Warningf("Failed to create memory reservation: %v", err)
1369 continue
1370 }
1371 if expectRunning {
1372 framework.ExpectNoError(err)
1373 }
1374 return func() error {
1375 return e2erc.DeleteRCAndWaitForGC(ctx, f.ClientSet, f.Namespace.Name, id)
1376 }
1377 }
1378 framework.Failf("Failed to reserve memory within timeout")
1379 return nil
1380 }
1381
1382
1383
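// ReserveMemoryWithPriority creates a replication controller with pods of the given priority
// class, requesting the given amount of memory in total.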
1384 func ReserveMemoryWithPriority(ctx context.Context, f *framework.Framework, id string, replicas, megabytes int, expectRunning bool, timeout time.Duration, priorityClassName string) func() error {
1385 return reserveMemory(ctx, f, id, replicas, megabytes, expectRunning, timeout, nil, nil, priorityClassName, "")
1386 }
1387
1388
1389
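// ReserveMemoryWithSelectorAndTolerations creates a replication controller with pods that, in sum,
// request the given amount of memory and carry the given node selector and tolerations.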
1390 func ReserveMemoryWithSelectorAndTolerations(ctx context.Context, f *framework.Framework, id string, replicas, megabytes int, expectRunning bool, timeout time.Duration, selector map[string]string, tolerations []v1.Toleration) func() error {
1391 return reserveMemory(ctx, f, id, replicas, megabytes, expectRunning, timeout, selector, tolerations, "", "")
1392 }
1393
1394
1395
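// ReserveMemoryWithSchedulerName creates a replication controller with pods that, in sum, request
// the given amount of memory and are assigned to the named scheduler.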
1396 func ReserveMemoryWithSchedulerName(ctx context.Context, f *framework.Framework, id string, replicas, megabytes int, expectRunning bool, timeout time.Duration, schedulerName string) func() error {
1397 return reserveMemory(ctx, f, id, replicas, megabytes, expectRunning, timeout, nil, nil, "", schedulerName)
1398 }
1399
1400
1401
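// ReserveMemory creates a replication controller with pods that, in sum, request the given amount of memory.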
1402 func ReserveMemory(ctx context.Context, f *framework.Framework, id string, replicas, megabytes int, expectRunning bool, timeout time.Duration) func() error {
1403 return reserveMemory(ctx, f, id, replicas, megabytes, expectRunning, timeout, nil, nil, "", "")
1404 }
1405
1406
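// WaitForClusterSizeFunc waits until the number of schedulable, ready nodes satisfies sizeFunc.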
1407 func WaitForClusterSizeFunc(ctx context.Context, c clientset.Interface, sizeFunc func(int) bool, timeout time.Duration) error {
1408 return WaitForClusterSizeFuncWithUnready(ctx, c, sizeFunc, timeout, 0)
1409 }
1410
1411
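// WaitForClusterSizeFuncWithUnready waits until the number of schedulable nodes satisfies sizeFunc
// while exactly expectedUnready of them are not ready.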
1412 func WaitForClusterSizeFuncWithUnready(ctx context.Context, c clientset.Interface, sizeFunc func(int) bool, timeout time.Duration, expectedUnready int) error {
1413 for start := time.Now(); time.Since(start) < timeout && ctx.Err() == nil; time.Sleep(20 * time.Second) {
1414 nodes, err := c.CoreV1().Nodes().List(ctx, metav1.ListOptions{FieldSelector: fields.Set{
1415 "spec.unschedulable": "false",
1416 }.AsSelector().String()})
1417 if err != nil {
1418 klog.Warningf("Failed to list nodes: %v", err)
1419 continue
1420 }
1421 numNodes := len(nodes.Items)
1422
1423
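// Filter the list down to nodes that report the Ready condition.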
1424 e2enode.Filter(nodes, func(node v1.Node) bool {
1425 return e2enode.IsConditionSetAsExpected(&node, v1.NodeReady, true)
1426 })
1427 numReady := len(nodes.Items)
1428
1429 if numNodes == numReady+expectedUnready && sizeFunc(numNodes) {
1430 klog.Infof("Cluster has reached the desired size")
1431 return nil
1432 }
1433 klog.Infof("Waiting for cluster with func, current size %d, not ready nodes %d", numNodes, numNodes-numReady)
1434 }
1435 return fmt.Errorf("timeout waiting %v for appropriate cluster size", timeout)
1436 }
1437
1438 func waitForCaPodsReadyInNamespace(ctx context.Context, f *framework.Framework, c clientset.Interface, tolerateUnreadyCount int) error {
1439 var notready []string
1440 for start := time.Now(); time.Now().Before(start.Add(scaleUpTimeout)) && ctx.Err() == nil; time.Sleep(20 * time.Second) {
1441 pods, err := c.CoreV1().Pods(f.Namespace.Name).List(ctx, metav1.ListOptions{})
1442 if err != nil {
1443 return fmt.Errorf("failed to get pods: %w", err)
1444 }
1445 notready = make([]string, 0)
1446 for _, pod := range pods.Items {
1447 ready := false
1448 for _, c := range pod.Status.Conditions {
1449 if c.Type == v1.PodReady && c.Status == v1.ConditionTrue {
1450 ready = true
1451 }
1452 }
1453
1454
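// Failed pods are only logged; they are not counted as unready below.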
1455 if pod.Status.Phase == v1.PodFailed {
1456 klog.Warningf("Pod has failed: %v", pod)
1457 }
1458 if !ready && pod.Status.Phase != v1.PodFailed {
1459 notready = append(notready, pod.Name)
1460 }
1461 }
1462 if len(notready) <= tolerateUnreadyCount {
1463 klog.Infof("Sufficient number of pods are ready, tolerating %d unready", tolerateUnreadyCount)
1464 return nil
1465 }
1466 klog.Infof("Too many pods are not ready yet: %v", notready)
1467 }
1468 klog.Info("Timeout on waiting for pods being ready")
1469 klog.Info(e2ekubectl.RunKubectlOrDie(f.Namespace.Name, "get", "pods", "-o", "json", "--all-namespaces"))
1470 klog.Info(e2ekubectl.RunKubectlOrDie(f.Namespace.Name, "get", "nodes", "-o", "json"))
1471
1472
1473 return fmt.Errorf("Too many pods are still not running: %v", notready)
1474 }
1475
1476 func waitForAllCaPodsReadyInNamespace(ctx context.Context, f *framework.Framework, c clientset.Interface) error {
1477 return waitForCaPodsReadyInNamespace(ctx, f, c, 0)
1478 }
1479
1480 func getAnyNode(ctx context.Context, c clientset.Interface) *v1.Node {
1481 nodes, err := c.CoreV1().Nodes().List(ctx, metav1.ListOptions{FieldSelector: fields.Set{
1482 "spec.unschedulable": "false",
1483 }.AsSelector().String()})
1484 if err != nil {
1485 klog.Errorf("Failed to get node list: %v", err)
1486 return nil
1487 }
1488 if len(nodes.Items) == 0 {
1489 klog.Errorf("No nodes")
1490 return nil
1491 }
1492 return &nodes.Items[0]
1493 }
1494
1495 func setMigSizes(sizes map[string]int) bool {
1496 madeChanges := false
1497 for mig, desiredSize := range sizes {
1498 currentSize, err := framework.GroupSize(mig)
1499 framework.ExpectNoError(err)
1500 if desiredSize != currentSize {
1501 ginkgo.By(fmt.Sprintf("Setting size of %s to %d", mig, desiredSize))
1502 err = framework.ResizeGroup(mig, int32(desiredSize))
1503 framework.ExpectNoError(err)
1504 madeChanges = true
1505 }
1506 }
1507 return madeChanges
1508 }
1509
1510 func drainNode(ctx context.Context, f *framework.Framework, node *v1.Node) {
1511 ginkgo.By("Make the single node unschedulable")
1512 framework.ExpectNoError(makeNodeUnschedulable(ctx, f.ClientSet, node))
1513
1514 ginkgo.By("Manually drain the single node")
1515 podOpts := metav1.ListOptions{FieldSelector: fields.OneTermEqualSelector("spec.nodeName", node.Name).String()}
1516 pods, err := f.ClientSet.CoreV1().Pods(metav1.NamespaceAll).List(ctx, podOpts)
1517 framework.ExpectNoError(err)
1518 for _, pod := range pods.Items {
1519 err = f.ClientSet.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, *metav1.NewDeleteOptions(0))
1520 framework.ExpectNoError(err)
1521 }
1522 }
1523
1524 func makeNodeUnschedulable(ctx context.Context, c clientset.Interface, node *v1.Node) error {
1525 ginkgo.By(fmt.Sprintf("Taint node %s", node.Name))
1526 for j := 0; j < 3; j++ {
1527 freshNode, err := c.CoreV1().Nodes().Get(ctx, node.Name, metav1.GetOptions{})
1528 if err != nil {
1529 return err
1530 }
1531 for _, taint := range freshNode.Spec.Taints {
1532 if taint.Key == disabledTaint {
1533 return nil
1534 }
1535 }
1536 freshNode.Spec.Taints = append(freshNode.Spec.Taints, v1.Taint{
1537 Key: disabledTaint,
1538 Value: "DisabledForTest",
1539 Effect: v1.TaintEffectNoSchedule,
1540 })
1541 _, err = c.CoreV1().Nodes().Update(ctx, freshNode, metav1.UpdateOptions{})
1542 if err == nil {
1543 return nil
1544 }
1545 if !apierrors.IsConflict(err) {
1546 return err
1547 }
1548 klog.Warningf("Got 409 conflict when trying to taint node, retries left: %v", 3-j)
1549 }
1550 return fmt.Errorf("Failed to taint node in allowed number of retries")
1551 }
1552
1553
1554
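// CriticalAddonsOnlyError is returned by makeNodeSchedulable when the node carries the
// CriticalAddonsOnly taint and failOnCriticalAddonsOnly is set.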
1555 type CriticalAddonsOnlyError struct{}
1556
1557 func (CriticalAddonsOnlyError) Error() string {
1558 return "CriticalAddonsOnly taint found on node"
1559 }
1560
1561 func makeNodeSchedulable(ctx context.Context, c clientset.Interface, node *v1.Node, failOnCriticalAddonsOnly bool) error {
1562 ginkgo.By(fmt.Sprintf("Remove taint from node %s", node.Name))
1563 for j := 0; j < 3; j++ {
1564 freshNode, err := c.CoreV1().Nodes().Get(ctx, node.Name, metav1.GetOptions{})
1565 if err != nil {
1566 return err
1567 }
1568 var newTaints []v1.Taint
1569 for _, taint := range freshNode.Spec.Taints {
1570 if failOnCriticalAddonsOnly && taint.Key == criticalAddonsOnlyTaint {
1571 return CriticalAddonsOnlyError{}
1572 }
1573 if taint.Key != disabledTaint {
1574 newTaints = append(newTaints, taint)
1575 }
1576 }
1577
1578 if len(newTaints) == len(freshNode.Spec.Taints) {
1579 return nil
1580 }
1581 freshNode.Spec.Taints = newTaints
1582 _, err = c.CoreV1().Nodes().Update(ctx, freshNode, metav1.UpdateOptions{})
1583 if err == nil {
1584 return nil
1585 }
1586 if !apierrors.IsConflict(err) {
1587 return err
1588 }
1589 klog.Warningf("Got 409 conflict when trying to remove taint from node, retries left: %v", 3-j)
1590 }
1591 return fmt.Errorf("Failed to remove taint from node in allowed number of retries")
1592 }
1593
1594
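// ScheduleAnySingleGpuPod schedules a pod which requires a single GPU of any type.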
1595 func ScheduleAnySingleGpuPod(ctx context.Context, f *framework.Framework, id string) error {
1596 return ScheduleGpuPod(ctx, f, id, "", 1)
1597 }
1598
1599
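// ScheduleGpuPod schedules a pod which requires the given number of GPUs of the given type.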
1600 func ScheduleGpuPod(ctx context.Context, f *framework.Framework, id string, gpuType string, gpuLimit int64) error {
1601 config := &testutils.RCConfig{
1602 Client: f.ClientSet,
1603 Name: id,
1604 Namespace: f.Namespace.Name,
1605 Timeout: 3 * scaleUpTimeout,
1606 Image: imageutils.GetPauseImageName(),
1607 Replicas: 1,
1608 GpuLimit: gpuLimit,
1609 Labels: map[string]string{"requires-gpu": "yes"},
1610 }
1611
1612 if gpuType != "" {
1613 config.NodeSelector = map[string]string{gpuLabel: gpuType}
1614 }
1615
1616 err := e2erc.RunRC(ctx, *config)
1617 if err != nil {
1618 return err
1619 }
1620 return nil
1621 }
1622
1623
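// runAntiAffinityPods creates a replication controller whose pods carry podLabels and a required
// pod anti-affinity term built from antiAffinityLabels.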
1624 func runAntiAffinityPods(ctx context.Context, f *framework.Framework, namespace string, pods int, id string, podLabels, antiAffinityLabels map[string]string) error {
1625 config := &testutils.RCConfig{
1626 Affinity: buildAntiAffinity(antiAffinityLabels),
1627 Client: f.ClientSet,
1628 Name: id,
1629 Namespace: namespace,
1630 Timeout: scaleUpTimeout,
1631 Image: imageutils.GetPauseImageName(),
1632 Replicas: pods,
1633 Labels: podLabels,
1634 }
1635 err := e2erc.RunRC(ctx, *config)
1636 if err != nil {
1637 return err
1638 }
1639 _, err = f.ClientSet.CoreV1().ReplicationControllers(namespace).Get(ctx, id, metav1.GetOptions{})
1640 if err != nil {
1641 return err
1642 }
1643 return nil
1644 }
1645
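// runVolumeAntiAffinityPods is like runAntiAffinityPods, but its pods additionally mount the given volumes.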
1646 func runVolumeAntiAffinityPods(ctx context.Context, f *framework.Framework, namespace string, pods int, id string, podLabels, antiAffinityLabels map[string]string, volumes []v1.Volume) error {
1647 config := &testutils.RCConfig{
1648 Affinity: buildAntiAffinity(antiAffinityLabels),
1649 Volumes: volumes,
1650 Client: f.ClientSet,
1651 Name: id,
1652 Namespace: namespace,
1653 Timeout: scaleUpTimeout,
1654 Image: imageutils.GetPauseImageName(),
1655 Replicas: pods,
1656 Labels: podLabels,
1657 }
1658 err := e2erc.RunRC(ctx, *config)
1659 if err != nil {
1660 return err
1661 }
1662 _, err = f.ClientSet.CoreV1().ReplicationControllers(namespace).Get(ctx, id, metav1.GetOptions{})
1663 if err != nil {
1664 return err
1665 }
1666 return nil
1667 }
1668
1669 var emptyDirVolumes = []v1.Volume{
1670 {
1671 Name: "empty-volume",
1672 VolumeSource: v1.VolumeSource{
1673 EmptyDir: &v1.EmptyDirVolumeSource{},
1674 },
1675 },
1676 }
1677
1678 func buildVolumes(pv *v1.PersistentVolume, pvc *v1.PersistentVolumeClaim) []v1.Volume {
1679 return []v1.Volume{
1680 {
1681 Name: pv.Name,
1682 VolumeSource: v1.VolumeSource{
1683 PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
1684 ClaimName: pvc.Name,
1685 ReadOnly: false,
1686 },
1687 },
1688 },
1689 }
1690 }
1691
1692 func buildAntiAffinity(labels map[string]string) *v1.Affinity {
1693 return &v1.Affinity{
1694 PodAntiAffinity: &v1.PodAntiAffinity{
1695 RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{
1696 {
1697 LabelSelector: &metav1.LabelSelector{
1698 MatchLabels: labels,
1699 },
1700 TopologyKey: "kubernetes.io/hostname",
1701 },
1702 },
1703 },
1704 }
1705 }
1706
1707
1708
1709
1710
1711
1712
1713
1714
1715
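// runReplicatedPodOnEachNode places exactly podsPerNode pods of one replication controller on each of
// the given nodes: every node is first tainted unschedulable, then nodes are made schedulable one at a
// time while the controller is scaled up by podsPerNode replicas per node.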
1716 func runReplicatedPodOnEachNode(ctx context.Context, f *framework.Framework, nodes []v1.Node, namespace string, podsPerNode int, id string, labels map[string]string, memRequest int64) error {
1717 ginkgo.By("Run a pod on each node")
1718 for _, node := range nodes {
1719 err := makeNodeUnschedulable(ctx, f.ClientSet, &node)
1720
1721 n := node
1722 ginkgo.DeferCleanup(makeNodeSchedulable, f.ClientSet, &n, false)
1723
1724 if err != nil {
1725 return err
1726 }
1727 }
1728 config := &testutils.RCConfig{
1729 Client: f.ClientSet,
1730 Name: id,
1731 Namespace: namespace,
1732 Timeout: defaultTimeout,
1733 Image: imageutils.GetPauseImageName(),
1734 Replicas: 0,
1735 Labels: labels,
1736 MemRequest: memRequest,
1737 }
1738 err := e2erc.RunRC(ctx, *config)
1739 if err != nil {
1740 return err
1741 }
1742 rc, err := f.ClientSet.CoreV1().ReplicationControllers(namespace).Get(ctx, id, metav1.GetOptions{})
1743 if err != nil {
1744 return err
1745 }
1746 for i, node := range nodes {
1747 err = makeNodeSchedulable(ctx, f.ClientSet, &node, false)
1748 if err != nil {
1749 return err
1750 }
1751
1752
1753
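// Scale the controller up by podsPerNode replicas so the new pods land on the node that was just
// made schedulable; retry a few times on update conflicts.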
1754 for j := 0; j < 3; j++ {
1755 *rc.Spec.Replicas = int32((i + 1) * podsPerNode)
1756 rc, err = f.ClientSet.CoreV1().ReplicationControllers(namespace).Update(ctx, rc, metav1.UpdateOptions{})
1757 if err == nil {
1758 break
1759 }
1760 if !apierrors.IsConflict(err) {
1761 return err
1762 }
1763 klog.Warningf("Got 409 conflict when trying to scale RC, retries left: %v", 3-j)
1764 rc, err = f.ClientSet.CoreV1().ReplicationControllers(namespace).Get(ctx, id, metav1.GetOptions{})
1765 if err != nil {
1766 return err
1767 }
1768 }
1769
1770 err = wait.PollUntilContextTimeout(ctx, 5*time.Second, podTimeout, true, func(ctx context.Context) (bool, error) {
1771 rc, err = f.ClientSet.CoreV1().ReplicationControllers(namespace).Get(ctx, id, metav1.GetOptions{})
1772 if err != nil || rc.Status.ReadyReplicas < int32((i+1)*podsPerNode) {
1773 return false, nil
1774 }
1775 return true, nil
1776 })
1777 if err != nil {
1778 return fmt.Errorf("failed to coerce RC into spawning a pod on node %s within timeout", node.Name)
1779 }
1780 err = makeNodeUnschedulable(ctx, f.ClientSet, &node)
1781 if err != nil {
1782 return err
1783 }
1784 }
1785 return nil
1786 }
1787
1788
1789
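// manuallyIncreaseClusterSize grows every node group by newNodesForScaledownTests nodes, waits for the
// cluster to reach the new size (re-applying the target sizes if they drift), and returns that size.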
1790 func manuallyIncreaseClusterSize(ctx context.Context, f *framework.Framework, originalSizes map[string]int) int {
1791 ginkgo.By("Manually increase cluster size")
1792 increasedSize := 0
1793 newSizes := make(map[string]int)
1794 for key, val := range originalSizes {
1795 newSizes[key] = val + newNodesForScaledownTests
1796 increasedSize += val + newNodesForScaledownTests
1797 }
1798 setMigSizes(newSizes)
1799
1800 checkClusterSize := func(size int) bool {
1801 if size >= increasedSize {
1802 return true
1803 }
1804 resized := setMigSizes(newSizes)
1805 if resized {
1806 klog.Warning("Unexpected node group size while waiting for cluster resize. Setting size to target again.")
1807 }
1808 return false
1809 }
1810
1811 framework.ExpectNoError(WaitForClusterSizeFunc(ctx, f.ClientSet, checkClusterSize, manualResizeTimeout))
1812 return increasedSize
1813 }
1814
1815
1816
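// getClusterwideStatus extracts the cluster-wide Health value from the cluster-autoscaler-status
// configmap in kube-system.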
1817 func getClusterwideStatus(ctx context.Context, c clientset.Interface) (string, error) {
1818 configMap, err := c.CoreV1().ConfigMaps("kube-system").Get(ctx, "cluster-autoscaler-status", metav1.GetOptions{})
1819 if err != nil {
1820 return "", err
1821 }
1822 status, ok := configMap.Data["status"]
1823 if !ok {
1824 return "", fmt.Errorf("Status information not found in configmap")
1825 }
1826 matcher, err := regexp.Compile("Cluster-wide:\\s*\n\\s*Health:\\s*([A-Za-z]+)")
1827 if err != nil {
1828 return "", err
1829 }
1830 result := matcher.FindStringSubmatch(status)
1831 if len(result) < 2 {
1832 return "", fmt.Errorf("Failed to parse CA status configmap, raw status: %v", status)
1833 }
1834 return result[1], nil
1835 }
1836
1837 type scaleUpStatus struct {
1838 status string
1839 ready int
1840 target int
1841 timestamp time.Time
1842 }
1843
1844
1845
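// getStatusTimestamp parses the "Cluster-autoscaler status at ..." timestamp out of the raw status text.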
1846 func getStatusTimestamp(status string) (time.Time, error) {
1847 timestampMatcher, err := regexp.Compile("Cluster-autoscaler status at \\s*([0-9\\-]+ [0-9]+:[0-9]+:[0-9]+\\.[0-9]+ \\+[0-9]+ [A-Za-z]+)")
1848 if err != nil {
1849 return time.Time{}, err
1850 }
1851
1852 timestampMatch := timestampMatcher.FindStringSubmatch(status)
1853 if len(timestampMatch) < 2 {
1854 return time.Time{}, fmt.Errorf("Failed to parse CA status timestamp, raw status: %v", status)
1855 }
1856
1857 timestamp, err := time.Parse(timestampFormat, timestampMatch[1])
1858 if err != nil {
1859 return time.Time{}, err
1860 }
1861 return timestamp, nil
1862 }
1863
1864
1865
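// getScaleUpStatus parses every ScaleUp entry in the cluster-autoscaler-status configmap and sums the
// ready and cloudProviderTarget counts; the overall status is InProgress if any node group is scaling up.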
1866 func getScaleUpStatus(ctx context.Context, c clientset.Interface) (*scaleUpStatus, error) {
1867 configMap, err := c.CoreV1().ConfigMaps("kube-system").Get(ctx, "cluster-autoscaler-status", metav1.GetOptions{})
1868 if err != nil {
1869 return nil, err
1870 }
1871 status, ok := configMap.Data["status"]
1872 if !ok {
1873 return nil, fmt.Errorf("Status information not found in configmap")
1874 }
1875
1876 timestamp, err := getStatusTimestamp(status)
1877 if err != nil {
1878 return nil, err
1879 }
1880
1881 matcher, err := regexp.Compile("\\s*ScaleUp:\\s*([A-Za-z]+)\\s*\\(ready=([0-9]+)\\s*cloudProviderTarget=([0-9]+)\\s*\\)")
1882 if err != nil {
1883 return nil, err
1884 }
1885 matches := matcher.FindAllStringSubmatch(status, -1)
1886 if len(matches) < 1 {
1887 return nil, fmt.Errorf("Failed to parse CA status configmap, raw status: %v", status)
1888 }
1889
1890 result := scaleUpStatus{
1891 status: caNoScaleUpStatus,
1892 ready: 0,
1893 target: 0,
1894 timestamp: timestamp,
1895 }
1896 for _, match := range matches {
1897 if match[1] == caOngoingScaleUpStatus {
1898 result.status = caOngoingScaleUpStatus
1899 }
1900 newReady, err := strconv.Atoi(match[2])
1901 if err != nil {
1902 return nil, err
1903 }
1904 result.ready += newReady
1905 newTarget, err := strconv.Atoi(match[3])
1906 if err != nil {
1907 return nil, err
1908 }
1909 result.target += newTarget
1910 }
1911 klog.Infof("Cluster-Autoscaler scale-up status: %v (%v, %v)", result.status, result.ready, result.target)
1912 return &result, nil
1913 }
1914
1915 func waitForScaleUpStatus(ctx context.Context, c clientset.Interface, cond func(s *scaleUpStatus) bool, timeout time.Duration) (*scaleUpStatus, error) {
1916 var finalErr error
1917 var status *scaleUpStatus
1918 err := wait.PollUntilContextTimeout(ctx, 5*time.Second, timeout, true, func(ctx context.Context) (bool, error) {
1919 status, finalErr = getScaleUpStatus(ctx, c)
1920 if finalErr != nil {
1921 return false, nil
1922 }
1923 if status.timestamp.Add(freshStatusLimit).Before(time.Now()) {
1924
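// The status is older than freshStatusLimit; treat it as stale and keep polling.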
1925 finalErr = fmt.Errorf("Status too old")
1926 return false, nil
1927 }
1928 return cond(status), nil
1929 })
1930 if err != nil {
1931 err = fmt.Errorf("Failed to find expected scale up status: %v, last status: %v, final err: %v", err, status, finalErr)
1932 }
1933 return status, err
1934 }
1935
1936
1937
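// addKubeSystemPdbs creates PodDisruptionBudgets for several kube-system components so that their pods
// can be migrated if required, and registers a cleanup hook that deletes the PDBs again.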
1938 func addKubeSystemPdbs(ctx context.Context, f *framework.Framework) error {
1939 ginkgo.By("Create PodDisruptionBudgets for kube-system components, so they can be migrated if required")
1940
1941 var newPdbs []string
1942 cleanup := func(ctx context.Context) {
1943 var finalErr error
1944 for _, newPdbName := range newPdbs {
1945 ginkgo.By(fmt.Sprintf("Delete PodDisruptionBudget %v", newPdbName))
1946 err := f.ClientSet.PolicyV1().PodDisruptionBudgets("kube-system").Delete(ctx, newPdbName, metav1.DeleteOptions{})
1947 if err != nil {
1948
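// Log the failure and remember it, but keep deleting the remaining PDBs.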
1949 klog.Errorf("Failed to delete PodDisruptionBudget %v, err: %v", newPdbName, err)
1950 finalErr = err
1951 }
1952 }
1953 if finalErr != nil {
1954 framework.Failf("Error during PodDisruptionBudget cleanup: %v", finalErr)
1955 }
1956 }
1957 ginkgo.DeferCleanup(cleanup)
1958
1959 type pdbInfo struct {
1960 label string
1961 minAvailable int
1962 }
1963 pdbsToAdd := []pdbInfo{
1964 {label: "kube-dns", minAvailable: 1},
1965 {label: "kube-dns-autoscaler", minAvailable: 0},
1966 {label: "metrics-server", minAvailable: 0},
1967 {label: "kubernetes-dashboard", minAvailable: 0},
1968 {label: "glbc", minAvailable: 0},
1969 }
1970 for _, pdbData := range pdbsToAdd {
1971 ginkgo.By(fmt.Sprintf("Create PodDisruptionBudget for %v", pdbData.label))
1972 labelMap := map[string]string{"k8s-app": pdbData.label}
1973 pdbName := fmt.Sprintf("test-pdb-for-%v", pdbData.label)
1974 minAvailable := intstr.FromInt32(int32(pdbData.minAvailable))
1975 pdb := &policyv1.PodDisruptionBudget{
1976 ObjectMeta: metav1.ObjectMeta{
1977 Name: pdbName,
1978 Namespace: "kube-system",
1979 },
1980 Spec: policyv1.PodDisruptionBudgetSpec{
1981 Selector: &metav1.LabelSelector{MatchLabels: labelMap},
1982 MinAvailable: &minAvailable,
1983 },
1984 }
1985 _, err := f.ClientSet.PolicyV1().PodDisruptionBudgets("kube-system").Create(ctx, pdb, metav1.CreateOptions{})
1986 newPdbs = append(newPdbs, pdbName)
1987
1988 if err != nil {
1989 return err
1990 }
1991 }
1992 return nil
1993 }
1994
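// createPriorityClasses creates the expendable and high priority classes used by the tests (tolerating
// AlreadyExists) and registers a cleanup hook that deletes them.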
1995 func createPriorityClasses(ctx context.Context, f *framework.Framework) {
1996 priorityClasses := map[string]int32{
1997 expendablePriorityClassName: -15,
1998 highPriorityClassName: 1000,
1999 }
2000 for className, priority := range priorityClasses {
2001 _, err := f.ClientSet.SchedulingV1().PriorityClasses().Create(ctx, &schedulingv1.PriorityClass{ObjectMeta: metav1.ObjectMeta{Name: className}, Value: priority}, metav1.CreateOptions{})
2002 if err != nil {
2003 klog.Errorf("Error creating priority class: %v", err)
2004 }
2005 if err != nil && !apierrors.IsAlreadyExists(err) {
2006 framework.Failf("unexpected error while creating priority class: %v", err)
2007 }
2008 }
2009
2010 ginkgo.DeferCleanup(func(ctx context.Context) {
2011 for className := range priorityClasses {
2012 err := f.ClientSet.SchedulingV1().PriorityClasses().Delete(ctx, className, metav1.DeleteOptions{})
2013 if err != nil {
2014 klog.Errorf("Error deleting priority class: %v", err)
2015 }
2016 }
2017 })
2018 }
2019