package autoscaling

import (
	"context"
	"fmt"
	"strconv"
	"sync"
	"time"

	autoscalingv1 "k8s.io/api/autoscaling/v1"
	autoscalingv2 "k8s.io/api/autoscaling/v2"
	v1 "k8s.io/api/core/v1"
	apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
	crdclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
	"k8s.io/apiextensions-apiserver/test/integration/fixtures"
	"k8s.io/apimachinery/pkg/api/meta"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/util/intstr"
	"k8s.io/client-go/dynamic"
	clientset "k8s.io/client-go/kubernetes"
	scaleclient "k8s.io/client-go/scale"
	"k8s.io/kubernetes/test/e2e/framework"
	e2edebug "k8s.io/kubernetes/test/e2e/framework/debug"
	e2ekubectl "k8s.io/kubernetes/test/e2e/framework/kubectl"
	e2erc "k8s.io/kubernetes/test/e2e/framework/rc"
	e2eresource "k8s.io/kubernetes/test/e2e/framework/resource"
	e2eservice "k8s.io/kubernetes/test/e2e/framework/service"
	testutils "k8s.io/kubernetes/test/utils"
	utilpointer "k8s.io/utils/pointer"

	"github.com/onsi/ginkgo/v2"
	"github.com/onsi/gomega"

	imageutils "k8s.io/kubernetes/test/utils/image"
)

const (
	dynamicConsumptionTimeInSeconds = 30
	dynamicRequestSizeInMillicores  = 100
	dynamicRequestSizeInMegabytes   = 100
	dynamicRequestSizeCustomMetric  = 10
	port                            = 80
	targetPort                      = 8080
	sidecarTargetPort               = 8081
	timeoutRC                       = 120 * time.Second
	startServiceTimeout             = time.Minute
	startServiceInterval            = 5 * time.Second
	invalidKind                     = "ERROR: invalid workload kind for resource consumer"
	customMetricName                = "QPS"
	serviceInitializationTimeout    = 2 * time.Minute
	serviceInitializationInterval   = 15 * time.Second
	megabytes                       = 1024 * 1024
	crdVersion                      = "v1"
	crdKind                         = "TestCRD"
	crdGroup                        = "autoscalinge2e.example.com"
	crdName                         = "testcrd"
	crdNamePlural                   = "testcrds"
)

var (
	resourceConsumerImage = imageutils.GetE2EImage(imageutils.ResourceConsumer)
)

var (
	// KindRC is the GVK for ReplicationController.
	KindRC = schema.GroupVersionKind{Version: "v1", Kind: "ReplicationController"}
	// KindDeployment is the GVK for Deployment.
	KindDeployment = schema.GroupVersionKind{Group: "apps", Version: "v1beta2", Kind: "Deployment"}
	// KindReplicaSet is the GVK for ReplicaSet.
	KindReplicaSet = schema.GroupVersionKind{Group: "apps", Version: "v1beta2", Kind: "ReplicaSet"}
	// KindCRD is the GVK for the custom resource used by these tests.
	KindCRD = schema.GroupVersionKind{Group: crdGroup, Version: crdVersion, Kind: crdKind}
)
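
// ScalingDirection identifies whether an HPA scaling rule applies to scaling
// up or scaling down.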
type ScalingDirection int

const (
	DirectionUnknown ScalingDirection = iota
	ScaleUpDirection
	ScaleDownDirection
)
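
// ResourceConsumer is a helper for HPA e2e tests: it runs a workload of the
// requested kind plus a companion "controller" ReplicationController, and lets
// the test drive CPU, memory, and custom-metric consumption through the
// controller's service.
//
// Typical usage (an illustrative sketch, not taken from the original comments;
// argument values are arbitrary):
//
//	rc := NewDynamicResourceConsumer(ctx, "consumer", ns, KindDeployment,
//		1, 250, 0, 0, 500, 100, clientSet, scalesGetter, Disable, Idle)
//	defer rc.CleanUp(ctx)
//	rc.ConsumeCPU(300)
//	rc.WaitForReplicas(ctx, 3, 15*time.Minute)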
type ResourceConsumer struct {
	name                     string
	controllerName           string
	kind                     schema.GroupVersionKind
	nsName                   string
	clientSet                clientset.Interface
	apiExtensionClient       crdclientset.Interface
	dynamicClient            dynamic.Interface
	resourceClient           dynamic.ResourceInterface
	scaleClient              scaleclient.ScalesGetter
	cpu                      chan int
	mem                      chan int
	customMetric             chan int
	stopCPU                  chan int
	stopMem                  chan int
	stopCustomMetric         chan int
	stopWaitGroup            sync.WaitGroup
	consumptionTimeInSeconds int
	sleepTime                time.Duration
	requestSizeInMillicores  int
	requestSizeInMegabytes   int
	requestSizeCustomMetric  int
	sidecarStatus            SidecarStatusType
	sidecarType              SidecarWorkloadType
}
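
// NewDynamicResourceConsumer creates a ResourceConsumer that issues its
// consumption requests with the default dynamic request sizes and interval.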
func NewDynamicResourceConsumer(ctx context.Context, name, nsName string, kind schema.GroupVersionKind, replicas, initCPUTotal, initMemoryTotal, initCustomMetric int, cpuLimit, memLimit int64, clientset clientset.Interface, scaleClient scaleclient.ScalesGetter, enableSidecar SidecarStatusType, sidecarType SidecarWorkloadType) *ResourceConsumer {
	return newResourceConsumer(ctx, name, nsName, kind, replicas, initCPUTotal, initMemoryTotal, initCustomMetric, dynamicConsumptionTimeInSeconds,
		dynamicRequestSizeInMillicores, dynamicRequestSizeInMegabytes, dynamicRequestSizeCustomMetric, cpuLimit, memLimit, clientset, scaleClient, nil, nil, enableSidecar, sidecarType)
}
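
// getSidecarContainer returns a resource-consumer sidecar container; CPU and
// memory requests/limits are set only when the corresponding limit is positive.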
func getSidecarContainer(name string, cpuLimit, memLimit int64) v1.Container {
	container := v1.Container{
		Name:    name + "-sidecar",
		Image:   resourceConsumerImage,
		Command: []string{"/consumer", "-port=8081"},
		Ports:   []v1.ContainerPort{{ContainerPort: 80}},
	}

	if cpuLimit > 0 || memLimit > 0 {
		container.Resources.Limits = v1.ResourceList{}
		container.Resources.Requests = v1.ResourceList{}
	}

	if cpuLimit > 0 {
		container.Resources.Limits[v1.ResourceCPU] = *resource.NewMilliQuantity(cpuLimit, resource.DecimalSI)
		container.Resources.Requests[v1.ResourceCPU] = *resource.NewMilliQuantity(cpuLimit, resource.DecimalSI)
	}

	if memLimit > 0 {
		container.Resources.Limits[v1.ResourceMemory] = *resource.NewQuantity(memLimit*megabytes, resource.DecimalSI)
		container.Resources.Requests[v1.ResourceMemory] = *resource.NewQuantity(memLimit*megabytes, resource.DecimalSI)
	}

	return container
}
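
// newResourceConsumer creates the consumer workload of the requested kind, its
// service, and the controller RC (plus an optional sidecar), then starts
// goroutines that keep requesting the initial CPU (millicores), memory
// (megabytes), and custom-metric consumption.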
func newResourceConsumer(ctx context.Context, name, nsName string, kind schema.GroupVersionKind, replicas, initCPUTotal, initMemoryTotal, initCustomMetric, consumptionTimeInSeconds, requestSizeInMillicores,
	requestSizeInMegabytes int, requestSizeCustomMetric int, cpuLimit, memLimit int64, clientset clientset.Interface, scaleClient scaleclient.ScalesGetter, podAnnotations, serviceAnnotations map[string]string, sidecarStatus SidecarStatusType, sidecarType SidecarWorkloadType) *ResourceConsumer {
	if podAnnotations == nil {
		podAnnotations = make(map[string]string)
	}
	if serviceAnnotations == nil {
		serviceAnnotations = make(map[string]string)
	}

	var additionalContainers []v1.Container

	if sidecarStatus == Enable {
		sidecarContainer := getSidecarContainer(name, cpuLimit, memLimit)
		additionalContainers = append(additionalContainers, sidecarContainer)
	}

	config, err := framework.LoadConfig()
	framework.ExpectNoError(err)
	apiExtensionClient, err := crdclientset.NewForConfig(config)
	framework.ExpectNoError(err)
	dynamicClient, err := dynamic.NewForConfig(config)
	framework.ExpectNoError(err)
	resourceClient := dynamicClient.Resource(schema.GroupVersionResource{Group: crdGroup, Version: crdVersion, Resource: crdNamePlural}).Namespace(nsName)

	runServiceAndWorkloadForResourceConsumer(ctx, clientset, resourceClient, apiExtensionClient, nsName, name, kind, replicas, cpuLimit, memLimit, podAnnotations, serviceAnnotations, additionalContainers)
	controllerName := name + "-ctrl"

	// When a busy sidecar is requested, run a dedicated service and controller
	// so that consumption requests are sent to the sidecar instead.
	if sidecarStatus == Enable && sidecarType == Busy {
		runServiceAndSidecarForResourceConsumer(ctx, clientset, nsName, name, kind, replicas, serviceAnnotations)
		controllerName = name + "-sidecar-ctrl"
	}

	rc := &ResourceConsumer{
		name:                     name,
		controllerName:           controllerName,
		kind:                     kind,
		nsName:                   nsName,
		clientSet:                clientset,
		apiExtensionClient:       apiExtensionClient,
		scaleClient:              scaleClient,
		resourceClient:           resourceClient,
		dynamicClient:            dynamicClient,
		cpu:                      make(chan int),
		mem:                      make(chan int),
		customMetric:             make(chan int),
		stopCPU:                  make(chan int),
		stopMem:                  make(chan int),
		stopCustomMetric:         make(chan int),
		consumptionTimeInSeconds: consumptionTimeInSeconds,
		sleepTime:                time.Duration(consumptionTimeInSeconds) * time.Second,
		requestSizeInMillicores:  requestSizeInMillicores,
		requestSizeInMegabytes:   requestSizeInMegabytes,
		requestSizeCustomMetric:  requestSizeCustomMetric,
		sidecarType:              sidecarType,
		sidecarStatus:            sidecarStatus,
	}

	go rc.makeConsumeCPURequests(ctx)
	rc.ConsumeCPU(initCPUTotal)
	go rc.makeConsumeMemRequests(ctx)
	rc.ConsumeMem(initMemoryTotal)
	go rc.makeConsumeCustomMetric(ctx)
	rc.ConsumeCustomMetric(initCustomMetric)
	return rc
}
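
// ConsumeCPU consumes the given total number of millicores, spread across the
// consumer's replicas.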
func (rc *ResourceConsumer) ConsumeCPU(millicores int) {
	framework.Logf("RC %s: consume %v millicores in total", rc.name, millicores)
	rc.cpu <- millicores
}
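
// ConsumeMem consumes the given total number of megabytes, spread across the
// consumer's replicas.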
func (rc *ResourceConsumer) ConsumeMem(megabytes int) {
	framework.Logf("RC %s: consume %v MB in total", rc.name, megabytes)
	rc.mem <- megabytes
}
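
// ConsumeCustomMetric consumes the given total amount of the custom metric.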
func (rc *ResourceConsumer) ConsumeCustomMetric(amount int) {
	framework.Logf("RC %s: consume custom metric %v in total", rc.name, amount)
	rc.customMetric <- amount
}

// makeConsumeCPURequests keeps asking the controller for the most recently
// requested CPU consumption until the consumer is stopped or the context ends.
func (rc *ResourceConsumer) makeConsumeCPURequests(ctx context.Context) {
	defer ginkgo.GinkgoRecover()
	rc.stopWaitGroup.Add(1)
	defer rc.stopWaitGroup.Done()
	tick := time.After(time.Duration(0))
	millicores := 0
	for {
		select {
		case millicores = <-rc.cpu:
			if millicores != 0 {
				framework.Logf("RC %s: setting consumption to %v millicores in total", rc.name, millicores)
			} else {
				framework.Logf("RC %s: disabling CPU consumption", rc.name)
			}
		case <-tick:
			if millicores != 0 {
				framework.Logf("RC %s: sending request to consume %d millicores", rc.name, millicores)
				rc.sendConsumeCPURequest(ctx, millicores)
			}
			tick = time.After(rc.sleepTime)
		case <-ctx.Done():
			framework.Logf("RC %s: stopping CPU consumer: %v", rc.name, ctx.Err())
			return
		case <-rc.stopCPU:
			framework.Logf("RC %s: stopping CPU consumer", rc.name)
			return
		}
	}
}

func (rc *ResourceConsumer) makeConsumeMemRequests(ctx context.Context) {
	defer ginkgo.GinkgoRecover()
	rc.stopWaitGroup.Add(1)
	defer rc.stopWaitGroup.Done()
	tick := time.After(time.Duration(0))
	megabytes := 0
	for {
		select {
		case megabytes = <-rc.mem:
			if megabytes != 0 {
				framework.Logf("RC %s: setting consumption to %v MB in total", rc.name, megabytes)
			} else {
				framework.Logf("RC %s: disabling mem consumption", rc.name)
			}
		case <-tick:
			if megabytes != 0 {
				framework.Logf("RC %s: sending request to consume %d MB", rc.name, megabytes)
				rc.sendConsumeMemRequest(ctx, megabytes)
			}
			tick = time.After(rc.sleepTime)
		case <-ctx.Done():
			framework.Logf("RC %s: stopping mem consumer: %v", rc.name, ctx.Err())
			return
		case <-rc.stopMem:
			framework.Logf("RC %s: stopping mem consumer", rc.name)
			return
		}
	}
}

func (rc *ResourceConsumer) makeConsumeCustomMetric(ctx context.Context) {
	defer ginkgo.GinkgoRecover()
	rc.stopWaitGroup.Add(1)
	defer rc.stopWaitGroup.Done()
	tick := time.After(time.Duration(0))
	delta := 0
	for {
		select {
		case delta = <-rc.customMetric:
			if delta != 0 {
				framework.Logf("RC %s: setting bump of metric %s to %d in total", rc.name, customMetricName, delta)
			} else {
				framework.Logf("RC %s: disabling consumption of custom metric %s", rc.name, customMetricName)
			}
		case <-tick:
			if delta != 0 {
				framework.Logf("RC %s: sending request to consume %d of custom metric %s", rc.name, delta, customMetricName)
				rc.sendConsumeCustomMetric(ctx, delta)
			}
			tick = time.After(rc.sleepTime)
		case <-ctx.Done():
			framework.Logf("RC %s: stopping metric consumer: %v", rc.name, ctx.Err())
			return
		case <-rc.stopCustomMetric:
			framework.Logf("RC %s: stopping metric consumer", rc.name)
			return
		}
	}
}

func (rc *ResourceConsumer) sendConsumeCPURequest(ctx context.Context, millicores int) {
	err := framework.Gomega().Eventually(ctx, func(ctx context.Context) error {
		proxyRequest, err := e2eservice.GetServicesProxyRequest(rc.clientSet, rc.clientSet.CoreV1().RESTClient().Post())
		if err != nil {
			return err
		}
		req := proxyRequest.Namespace(rc.nsName).
			Name(rc.controllerName).
			Suffix("ConsumeCPU").
			Param("millicores", strconv.Itoa(millicores)).
			Param("durationSec", strconv.Itoa(rc.consumptionTimeInSeconds)).
			Param("requestSizeMillicores", strconv.Itoa(rc.requestSizeInMillicores))
		framework.Logf("ConsumeCPU URL: %v", *req.URL())
		_, err = req.DoRaw(ctx)
		if err != nil {
			framework.Logf("ConsumeCPU failure: %v", err)
			return err
		}
		return nil
	}).WithTimeout(serviceInitializationTimeout).WithPolling(serviceInitializationInterval).Should(gomega.Succeed())

	// The test has finished and the context was cancelled; do not fail on the
	// resulting error.
	if ctx.Err() != nil {
		return
	}

	framework.ExpectNoError(err)
}

func (rc *ResourceConsumer) sendConsumeMemRequest(ctx context.Context, megabytes int) {
	err := framework.Gomega().Eventually(ctx, func(ctx context.Context) error {
		proxyRequest, err := e2eservice.GetServicesProxyRequest(rc.clientSet, rc.clientSet.CoreV1().RESTClient().Post())
		if err != nil {
			return err
		}
		req := proxyRequest.Namespace(rc.nsName).
			Name(rc.controllerName).
			Suffix("ConsumeMem").
			Param("megabytes", strconv.Itoa(megabytes)).
			Param("durationSec", strconv.Itoa(rc.consumptionTimeInSeconds)).
			Param("requestSizeMegabytes", strconv.Itoa(rc.requestSizeInMegabytes))
		framework.Logf("ConsumeMem URL: %v", *req.URL())
		_, err = req.DoRaw(ctx)
		if err != nil {
			framework.Logf("ConsumeMem failure: %v", err)
			return err
		}
		return nil
	}).WithTimeout(serviceInitializationTimeout).WithPolling(serviceInitializationInterval).Should(gomega.Succeed())

	// The test has finished and the context was cancelled; do not fail on the
	// resulting error.
	if ctx.Err() != nil {
		return
	}

	framework.ExpectNoError(err)
}

func (rc *ResourceConsumer) sendConsumeCustomMetric(ctx context.Context, delta int) {
	err := framework.Gomega().Eventually(ctx, func(ctx context.Context) error {
		proxyRequest, err := e2eservice.GetServicesProxyRequest(rc.clientSet, rc.clientSet.CoreV1().RESTClient().Post())
		if err != nil {
			return err
		}
		req := proxyRequest.Namespace(rc.nsName).
			Name(rc.controllerName).
			Suffix("BumpMetric").
			Param("metric", customMetricName).
			Param("delta", strconv.Itoa(delta)).
			Param("durationSec", strconv.Itoa(rc.consumptionTimeInSeconds)).
			Param("requestSizeMetrics", strconv.Itoa(rc.requestSizeCustomMetric))
		framework.Logf("ConsumeCustomMetric URL: %v", *req.URL())
		_, err = req.DoRaw(ctx)
		if err != nil {
			framework.Logf("ConsumeCustomMetric failure: %v", err)
			return err
		}
		return nil
	}).WithTimeout(serviceInitializationTimeout).WithPolling(serviceInitializationInterval).Should(gomega.Succeed())

	// The test has finished and the context was cancelled; do not fail on the
	// resulting error.
	if ctx.Err() != nil {
		return
	}

	framework.ExpectNoError(err)
}
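
// GetReplicas returns the number of ready replicas of the consumer's workload.
// For the CRD kind it also mirrors the backing deployment's ready replicas into
// the CRD's status and returns the replica count from the scale subresource.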
func (rc *ResourceConsumer) GetReplicas(ctx context.Context) (int, error) {
	switch rc.kind {
	case KindRC:
		replicationController, err := rc.clientSet.CoreV1().ReplicationControllers(rc.nsName).Get(ctx, rc.name, metav1.GetOptions{})
		if err != nil {
			return 0, err
		}
		return int(replicationController.Status.ReadyReplicas), nil
	case KindDeployment:
		deployment, err := rc.clientSet.AppsV1().Deployments(rc.nsName).Get(ctx, rc.name, metav1.GetOptions{})
		if err != nil {
			return 0, err
		}
		return int(deployment.Status.ReadyReplicas), nil
	case KindReplicaSet:
		rs, err := rc.clientSet.AppsV1().ReplicaSets(rc.nsName).Get(ctx, rc.name, metav1.GetOptions{})
		if err != nil {
			return 0, err
		}
		return int(rs.Status.ReadyReplicas), nil
	case KindCRD:
		deployment, err := rc.clientSet.AppsV1().Deployments(rc.nsName).Get(ctx, rc.name, metav1.GetOptions{})
		if err != nil {
			return 0, err
		}
		deploymentReplicas := int64(deployment.Status.ReadyReplicas)

		scale, err := rc.scaleClient.Scales(rc.nsName).Get(ctx, schema.GroupResource{Group: crdGroup, Resource: crdNamePlural}, rc.name, metav1.GetOptions{})
		if err != nil {
			return 0, err
		}
		crdInstance, err := rc.resourceClient.Get(ctx, rc.name, metav1.GetOptions{})
		if err != nil {
			return 0, err
		}
		// Mirror the backing deployment's ready replicas into the CRD's
		// status.replicas so the scale subresource reflects actual readiness.
		err = unstructured.SetNestedField(crdInstance.Object, deploymentReplicas, "status", "replicas")
		if err != nil {
			return 0, err
		}
		_, err = rc.resourceClient.Update(ctx, crdInstance, metav1.UpdateOptions{})
		if err != nil {
			return 0, err
		}
		return int(scale.Spec.Replicas), nil
	default:
		return 0, fmt.Errorf(invalidKind)
	}
}
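
// GetHpa returns the named HorizontalPodAutoscaler from the consumer's namespace.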
func (rc *ResourceConsumer) GetHpa(ctx context.Context, name string) (*autoscalingv1.HorizontalPodAutoscaler, error) {
	return rc.clientSet.AutoscalingV1().HorizontalPodAutoscalers(rc.nsName).Get(ctx, name, metav1.GetOptions{})
}
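
// WaitForReplicas polls until the workload reports the desired number of ready
// replicas, failing the test if that does not happen within the given duration.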
func (rc *ResourceConsumer) WaitForReplicas(ctx context.Context, desiredReplicas int, duration time.Duration) {
	interval := 20 * time.Second
	err := framework.Gomega().Eventually(ctx, framework.HandleRetry(rc.GetReplicas)).
		WithTimeout(duration).
		WithPolling(interval).
		Should(gomega.Equal(desiredReplicas))

	framework.ExpectNoErrorWithOffset(1, err, "timeout waiting %v for %d replicas", duration, desiredReplicas)
}
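
// EnsureDesiredReplicasInRange verifies that the number of ready replicas stays
// within [minDesiredReplicas, maxDesiredReplicas] for the whole duration.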
func (rc *ResourceConsumer) EnsureDesiredReplicasInRange(ctx context.Context, minDesiredReplicas, maxDesiredReplicas int, duration time.Duration, hpaName string) {
	interval := 10 * time.Second
	desiredReplicasErr := framework.Gomega().Consistently(ctx, framework.HandleRetry(rc.GetReplicas)).
		WithTimeout(duration).
		WithPolling(interval).
		Should(gomega.And(gomega.BeNumerically(">=", minDesiredReplicas), gomega.BeNumerically("<=", maxDesiredReplicas)))

	// Log the HPA status either way to aid debugging before reporting the result.
	as, err := rc.GetHpa(ctx, hpaName)
	if err != nil {
		framework.Logf("Error getting HPA: %s", err)
	} else {
		framework.Logf("HPA status: %+v", as.Status)
	}
	framework.ExpectNoError(desiredReplicasErr)
}
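
// Pause stops the consumption goroutines until Resume is called.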
func (rc *ResourceConsumer) Pause() {
	ginkgo.By(fmt.Sprintf("HPA pausing RC %s", rc.name))
	rc.stopCPU <- 0
	rc.stopMem <- 0
	rc.stopCustomMetric <- 0
	rc.stopWaitGroup.Wait()
}
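
// Resume restarts the consumption goroutines stopped by Pause.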
func (rc *ResourceConsumer) Resume(ctx context.Context) {
	ginkgo.By(fmt.Sprintf("HPA resuming RC %s", rc.name))
	go rc.makeConsumeCPURequests(ctx)
	go rc.makeConsumeMemRequests(ctx)
	go rc.makeConsumeCustomMetric(ctx)
}
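
// CleanUp stops the consumers and deletes the workload, the controller RC, and
// their services.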
func (rc *ResourceConsumer) CleanUp(ctx context.Context) {
	ginkgo.By(fmt.Sprintf("Removing consuming RC %s", rc.name))
	close(rc.stopCPU)
	close(rc.stopMem)
	close(rc.stopCustomMetric)
	rc.stopWaitGroup.Wait()

	// Give the consumption goroutines time to finish any in-flight requests.
	time.Sleep(10 * time.Second)
	kind := rc.kind.GroupKind()
	if kind.Kind == crdKind {
		gvr := schema.GroupVersionResource{Group: crdGroup, Version: crdVersion, Resource: crdNamePlural}
		framework.ExpectNoError(e2eresource.DeleteCustomResourceAndWaitForGC(ctx, rc.clientSet, rc.dynamicClient, rc.scaleClient, gvr, rc.nsName, rc.name))
	} else {
		framework.ExpectNoError(e2eresource.DeleteResourceAndWaitForGC(ctx, rc.clientSet, kind, rc.nsName, rc.name))
	}

	framework.ExpectNoError(rc.clientSet.CoreV1().Services(rc.nsName).Delete(ctx, rc.name, metav1.DeleteOptions{}))
	framework.ExpectNoError(e2eresource.DeleteResourceAndWaitForGC(ctx, rc.clientSet, schema.GroupKind{Kind: "ReplicationController"}, rc.nsName, rc.controllerName))
	framework.ExpectNoError(rc.clientSet.CoreV1().Services(rc.nsName).Delete(ctx, rc.name+"-ctrl", metav1.DeleteOptions{}))

	if rc.sidecarStatus == Enable && rc.sidecarType == Busy {
		framework.ExpectNoError(rc.clientSet.CoreV1().Services(rc.nsName).Delete(ctx, rc.name+"-sidecar", metav1.DeleteOptions{}))
		framework.ExpectNoError(rc.clientSet.CoreV1().Services(rc.nsName).Delete(ctx, rc.name+"-sidecar-ctrl", metav1.DeleteOptions{}))
	}
}

func createService(ctx context.Context, c clientset.Interface, name, ns string, annotations, selectors map[string]string, port int32, targetPort int) (*v1.Service, error) {
	return c.CoreV1().Services(ns).Create(ctx, &v1.Service{
		ObjectMeta: metav1.ObjectMeta{
			Name:        name,
			Annotations: annotations,
		},
		Spec: v1.ServiceSpec{
			Ports: []v1.ServicePort{{
				Port:       port,
				TargetPort: intstr.FromInt32(int32(targetPort)),
			}},
			Selector: selectors,
		},
	}, metav1.CreateOptions{})
}
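
// runServiceAndSidecarForResourceConsumer creates the sidecar's service and a
// dedicated controller RC that sends consumption requests to the sidecar.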
func runServiceAndSidecarForResourceConsumer(ctx context.Context, c clientset.Interface, ns, name string, kind schema.GroupVersionKind, replicas int, serviceAnnotations map[string]string) {
	ginkgo.By(fmt.Sprintf("Running consuming RC sidecar %s via %s with %v replicas", name, kind, replicas))

	sidecarName := name + "-sidecar"
	serviceSelectors := map[string]string{
		"name": name,
	}
	_, err := createService(ctx, c, sidecarName, ns, serviceAnnotations, serviceSelectors, port, sidecarTargetPort)
	framework.ExpectNoError(err)

	ginkgo.By("Running controller for sidecar")
	controllerName := sidecarName + "-ctrl"
	_, err = createService(ctx, c, controllerName, ns, map[string]string{}, map[string]string{"name": controllerName}, port, targetPort)
	framework.ExpectNoError(err)

	dnsClusterFirst := v1.DNSClusterFirst
	controllerRcConfig := testutils.RCConfig{
		Client:    c,
		Image:     imageutils.GetE2EImage(imageutils.Agnhost),
		Name:      controllerName,
		Namespace: ns,
		Timeout:   timeoutRC,
		Replicas:  1,
		Command:   []string{"/agnhost", "resource-consumer-controller", "--consumer-service-name=" + sidecarName, "--consumer-service-namespace=" + ns, "--consumer-port=80"},
		DNSPolicy: &dnsClusterFirst,
	}

	framework.ExpectNoError(e2erc.RunRC(ctx, controllerRcConfig))

	framework.ExpectNoError(framework.WaitForServiceEndpointsNum(
		ctx, c, ns, controllerName, 1, startServiceInterval, startServiceTimeout))
}
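
// runServiceAndWorkloadForResourceConsumer creates the consumer's service, the
// workload of the requested kind, and the controller RC that drives it.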
func runServiceAndWorkloadForResourceConsumer(ctx context.Context, c clientset.Interface, resourceClient dynamic.ResourceInterface, apiExtensionClient crdclientset.Interface, ns, name string, kind schema.GroupVersionKind, replicas int, cpuLimitMillis, memLimitMb int64, podAnnotations, serviceAnnotations map[string]string, additionalContainers []v1.Container) {
	ginkgo.By(fmt.Sprintf("Running consuming RC %s via %s with %v replicas", name, kind, replicas))
	_, err := createService(ctx, c, name, ns, serviceAnnotations, map[string]string{"name": name}, port, targetPort)
	framework.ExpectNoError(err)

	rcConfig := testutils.RCConfig{
		Client:               c,
		Image:                resourceConsumerImage,
		Name:                 name,
		Namespace:            ns,
		Timeout:              timeoutRC,
		Replicas:             replicas,
		CpuRequest:           cpuLimitMillis,
		CpuLimit:             cpuLimitMillis,
		MemRequest:           memLimitMb * 1024 * 1024,
		MemLimit:             memLimitMb * 1024 * 1024,
		Annotations:          podAnnotations,
		AdditionalContainers: additionalContainers,
	}

	dpConfig := testutils.DeploymentConfig{
		RCConfig: rcConfig,
	}
	dpConfig.NodeDumpFunc = e2edebug.DumpNodeDebugInfo
	dpConfig.ContainerDumpFunc = e2ekubectl.LogFailedContainers

	switch kind {
	case KindRC:
		framework.ExpectNoError(e2erc.RunRC(ctx, rcConfig))
	case KindDeployment:
		ginkgo.By(fmt.Sprintf("Creating deployment %s in namespace %s", dpConfig.Name, dpConfig.Namespace))
		framework.ExpectNoError(testutils.RunDeployment(ctx, dpConfig))
	case KindReplicaSet:
		rsConfig := testutils.ReplicaSetConfig{
			RCConfig: rcConfig,
		}
		ginkgo.By(fmt.Sprintf("Creating replicaset %s in namespace %s", rsConfig.Name, rsConfig.Namespace))
		framework.ExpectNoError(runReplicaSet(ctx, rsConfig))
	case KindCRD:
		crd := CreateCustomResourceDefinition(ctx, apiExtensionClient)
		crdInstance, err := CreateCustomSubresourceInstance(ctx, ns, name, resourceClient, crd)
		framework.ExpectNoError(err)

		ginkgo.By(fmt.Sprintf("Creating deployment %s backing CRD in namespace %s", dpConfig.Name, dpConfig.Namespace))
		framework.ExpectNoError(testutils.RunDeployment(ctx, dpConfig))

		// Make the CRD instance own the backing deployment so garbage collection
		// removes the deployment together with the custom resource.
		deployment, err := c.AppsV1().Deployments(dpConfig.Namespace).Get(ctx, dpConfig.Name, metav1.GetOptions{})
		framework.ExpectNoError(err)
		deployment.SetOwnerReferences([]metav1.OwnerReference{{
			APIVersion: kind.GroupVersion().String(),
			Kind:       crdKind,
			Name:       name,
			UID:        crdInstance.GetUID(),
		}})
		_, err = c.AppsV1().Deployments(dpConfig.Namespace).Update(ctx, deployment, metav1.UpdateOptions{})
		framework.ExpectNoError(err)
	default:
		framework.Failf(invalidKind)
	}

	ginkgo.By("Running controller")
	controllerName := name + "-ctrl"
	_, err = createService(ctx, c, controllerName, ns, map[string]string{}, map[string]string{"name": controllerName}, port, targetPort)
	framework.ExpectNoError(err)

	dnsClusterFirst := v1.DNSClusterFirst
	controllerRcConfig := testutils.RCConfig{
		Client:    c,
		Image:     imageutils.GetE2EImage(imageutils.Agnhost),
		Name:      controllerName,
		Namespace: ns,
		Timeout:   timeoutRC,
		Replicas:  1,
		Command:   []string{"/agnhost", "resource-consumer-controller", "--consumer-service-name=" + name, "--consumer-service-namespace=" + ns, "--consumer-port=80"},
		DNSPolicy: &dnsClusterFirst,
	}

	framework.ExpectNoError(e2erc.RunRC(ctx, controllerRcConfig))

	framework.ExpectNoError(framework.WaitForServiceEndpointsNum(
		ctx, c, ns, controllerName, 1, startServiceInterval, startServiceTimeout))
}

// CreateHorizontalPodAutoscaler creates an autoscaling/v2 HPA targeting targetRef
// with the given metrics and replica bounds.
func CreateHorizontalPodAutoscaler(ctx context.Context, rc *ResourceConsumer, targetRef autoscalingv2.CrossVersionObjectReference, namespace string, metrics []autoscalingv2.MetricSpec, resourceType v1.ResourceName, metricTargetType autoscalingv2.MetricTargetType, metricTargetValue, minReplicas, maxReplicas int32) *autoscalingv2.HorizontalPodAutoscaler {
	hpa := &autoscalingv2.HorizontalPodAutoscaler{
		ObjectMeta: metav1.ObjectMeta{
			Name:      targetRef.Name,
			Namespace: namespace,
		},
		Spec: autoscalingv2.HorizontalPodAutoscalerSpec{
			ScaleTargetRef: targetRef,
			MinReplicas:    &minReplicas,
			MaxReplicas:    maxReplicas,
			Metrics:        metrics,
		},
	}
	hpa, errHPA := rc.clientSet.AutoscalingV2().HorizontalPodAutoscalers(namespace).Create(ctx, hpa, metav1.CreateOptions{})
	framework.ExpectNoError(errHPA)
	return hpa
}

// CreateResourceHorizontalPodAutoscaler creates an HPA for the consumer's
// workload driven by a single resource metric of the given type and target.
func CreateResourceHorizontalPodAutoscaler(ctx context.Context, rc *ResourceConsumer, resourceType v1.ResourceName, metricTargetType autoscalingv2.MetricTargetType, metricTargetValue, minReplicas, maxReplicas int32) *autoscalingv2.HorizontalPodAutoscaler {
	targetRef := autoscalingv2.CrossVersionObjectReference{
		APIVersion: rc.kind.GroupVersion().String(),
		Kind:       rc.kind.Kind,
		Name:       rc.name,
	}
	metrics := []autoscalingv2.MetricSpec{
		{
			Type: autoscalingv2.ResourceMetricSourceType,
			Resource: &autoscalingv2.ResourceMetricSource{
				Name:   resourceType,
				Target: CreateMetricTargetWithType(resourceType, metricTargetType, metricTargetValue),
			},
		},
	}
	return CreateHorizontalPodAutoscaler(ctx, rc, targetRef, rc.nsName, metrics, resourceType, metricTargetType, metricTargetValue, minReplicas, maxReplicas)
}

// CreateCPUResourceHorizontalPodAutoscaler creates an HPA driven by CPU utilization.
func CreateCPUResourceHorizontalPodAutoscaler(ctx context.Context, rc *ResourceConsumer, cpu, minReplicas, maxReplicas int32) *autoscalingv2.HorizontalPodAutoscaler {
	return CreateResourceHorizontalPodAutoscaler(ctx, rc, v1.ResourceCPU, autoscalingv2.UtilizationMetricType, cpu, minReplicas, maxReplicas)
}
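
// DeleteHorizontalPodAutoscaler removes the named autoscaler.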
func DeleteHorizontalPodAutoscaler(ctx context.Context, rc *ResourceConsumer, autoscalerName string) {
	framework.ExpectNoError(rc.clientSet.AutoscalingV1().HorizontalPodAutoscalers(rc.nsName).Delete(ctx, autoscalerName, metav1.DeleteOptions{}))
}
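
// runReplicaSet creates a ReplicaSet from the given config and waits for its pods to run.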
func runReplicaSet(ctx context.Context, config testutils.ReplicaSetConfig) error {
	ginkgo.By(fmt.Sprintf("creating replicaset %s in namespace %s", config.Name, config.Namespace))
	config.NodeDumpFunc = e2edebug.DumpNodeDebugInfo
	config.ContainerDumpFunc = e2ekubectl.LogFailedContainers
	return testutils.RunReplicaSet(ctx, config)
}

// CreateContainerResourceHorizontalPodAutoscaler creates an HPA driven by a
// container resource metric of the consumer's main container.
func CreateContainerResourceHorizontalPodAutoscaler(ctx context.Context, rc *ResourceConsumer, resourceType v1.ResourceName, metricTargetType autoscalingv2.MetricTargetType, metricTargetValue, minReplicas, maxReplicas int32) *autoscalingv2.HorizontalPodAutoscaler {
	targetRef := autoscalingv2.CrossVersionObjectReference{
		APIVersion: rc.kind.GroupVersion().String(),
		Kind:       rc.kind.Kind,
		Name:       rc.name,
	}
	metrics := []autoscalingv2.MetricSpec{
		{
			Type: autoscalingv2.ContainerResourceMetricSourceType,
			ContainerResource: &autoscalingv2.ContainerResourceMetricSource{
				Name:      resourceType,
				Container: rc.name,
				Target:    CreateMetricTargetWithType(resourceType, metricTargetType, metricTargetValue),
			},
		},
	}
	return CreateHorizontalPodAutoscaler(ctx, rc, targetRef, rc.nsName, metrics, resourceType, metricTargetType, metricTargetValue, minReplicas, maxReplicas)
}
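
// DeleteContainerResourceHPA removes the named container-resource autoscaler.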
func DeleteContainerResourceHPA(ctx context.Context, rc *ResourceConsumer, autoscalerName string) {
	framework.ExpectNoError(rc.clientSet.AutoscalingV2().HorizontalPodAutoscalers(rc.nsName).Delete(ctx, autoscalerName, metav1.DeleteOptions{}))
}

// CreateMetricTargetWithType builds a MetricTarget: an average utilization for
// Utilization targets, or an average value (millicores for CPU, bytes derived
// from megabytes otherwise) for AverageValue targets.
func CreateMetricTargetWithType(resourceType v1.ResourceName, targetType autoscalingv2.MetricTargetType, targetValue int32) autoscalingv2.MetricTarget {
	var metricTarget autoscalingv2.MetricTarget
	if targetType == autoscalingv2.UtilizationMetricType {
		metricTarget = autoscalingv2.MetricTarget{
			Type:               targetType,
			AverageUtilization: &targetValue,
		}
	} else if targetType == autoscalingv2.AverageValueMetricType {
		var averageValue *resource.Quantity
		if resourceType == v1.ResourceCPU {
			averageValue = resource.NewMilliQuantity(int64(targetValue), resource.DecimalSI)
		} else {
			averageValue = resource.NewQuantity(int64(targetValue*megabytes), resource.DecimalSI)
		}
		metricTarget = autoscalingv2.MetricTarget{
			Type:         targetType,
			AverageValue: averageValue,
		}
	}
	return metricTarget
}

// CreateCPUHorizontalPodAutoscalerWithBehavior creates a CPU-utilization HPA
// with the given scaling behavior.
func CreateCPUHorizontalPodAutoscalerWithBehavior(ctx context.Context, rc *ResourceConsumer, cpu int32, minReplicas int32, maxRepl int32, behavior *autoscalingv2.HorizontalPodAutoscalerBehavior) *autoscalingv2.HorizontalPodAutoscaler {
	hpa := &autoscalingv2.HorizontalPodAutoscaler{
		ObjectMeta: metav1.ObjectMeta{
			Name:      rc.name,
			Namespace: rc.nsName,
		},
		Spec: autoscalingv2.HorizontalPodAutoscalerSpec{
			ScaleTargetRef: autoscalingv2.CrossVersionObjectReference{
				APIVersion: rc.kind.GroupVersion().String(),
				Kind:       rc.kind.Kind,
				Name:       rc.name,
			},
			MinReplicas: &minReplicas,
			MaxReplicas: maxRepl,
			Metrics: []autoscalingv2.MetricSpec{
				{
					Type: autoscalingv2.ResourceMetricSourceType,
					Resource: &autoscalingv2.ResourceMetricSource{
						Name: v1.ResourceCPU,
						Target: autoscalingv2.MetricTarget{
							Type:               autoscalingv2.UtilizationMetricType,
							AverageUtilization: &cpu,
						},
					},
				},
			},
			Behavior: behavior,
		},
	}
	hpa, errHPA := rc.clientSet.AutoscalingV2().HorizontalPodAutoscalers(rc.nsName).Create(ctx, hpa, metav1.CreateOptions{})
	framework.ExpectNoError(errHPA)
	return hpa
}

// HPABehaviorWithScaleUpAndDownRules returns a behavior with the given scale-up
// and scale-down rules.
func HPABehaviorWithScaleUpAndDownRules(scaleUpRule, scaleDownRule *autoscalingv2.HPAScalingRules) *autoscalingv2.HorizontalPodAutoscalerBehavior {
	return &autoscalingv2.HorizontalPodAutoscalerBehavior{
		ScaleUp:   scaleUpRule,
		ScaleDown: scaleDownRule,
	}
}

// HPABehaviorWithScalingRuleInDirection returns a behavior with the rule applied
// only in the given scaling direction.
func HPABehaviorWithScalingRuleInDirection(scalingDirection ScalingDirection, rule *autoscalingv2.HPAScalingRules) *autoscalingv2.HorizontalPodAutoscalerBehavior {
	var scaleUpRule, scaleDownRule *autoscalingv2.HPAScalingRules
	if scalingDirection == ScaleUpDirection {
		scaleUpRule = rule
	}
	if scalingDirection == ScaleDownDirection {
		scaleDownRule = rule
	}
	return HPABehaviorWithScaleUpAndDownRules(scaleUpRule, scaleDownRule)
}

// HPAScalingRuleWithStabilizationWindow returns a rule with only a stabilization window set.
func HPAScalingRuleWithStabilizationWindow(stabilizationDuration int32) *autoscalingv2.HPAScalingRules {
	return &autoscalingv2.HPAScalingRules{
		StabilizationWindowSeconds: &stabilizationDuration,
	}
}

// HPAScalingRuleWithPolicyDisabled returns a rule that disables scaling in its direction.
func HPAScalingRuleWithPolicyDisabled() *autoscalingv2.HPAScalingRules {
	disabledPolicy := autoscalingv2.DisabledPolicySelect
	return &autoscalingv2.HPAScalingRules{
		SelectPolicy: &disabledPolicy,
	}
}

// HPAScalingRuleWithScalingPolicy returns a rule with a single scaling policy,
// the Max select policy, and the stabilization window disabled.
func HPAScalingRuleWithScalingPolicy(policyType autoscalingv2.HPAScalingPolicyType, value, periodSeconds int32) *autoscalingv2.HPAScalingRules {
	stabilizationWindowDisabledDuration := int32(0)
	selectPolicy := autoscalingv2.MaxChangePolicySelect
	return &autoscalingv2.HPAScalingRules{
		Policies: []autoscalingv2.HPAScalingPolicy{
			{
				Type:          policyType,
				Value:         value,
				PeriodSeconds: periodSeconds,
			},
		},
		SelectPolicy:               &selectPolicy,
		StabilizationWindowSeconds: &stabilizationWindowDisabledDuration,
	}
}

// HPABehaviorWithStabilizationWindows returns a behavior with the given scale-up
// and scale-down stabilization windows.
func HPABehaviorWithStabilizationWindows(upscaleStabilization, downscaleStabilization time.Duration) *autoscalingv2.HorizontalPodAutoscalerBehavior {
	scaleUpRule := HPAScalingRuleWithStabilizationWindow(int32(upscaleStabilization.Seconds()))
	scaleDownRule := HPAScalingRuleWithStabilizationWindow(int32(downscaleStabilization.Seconds()))
	return HPABehaviorWithScaleUpAndDownRules(scaleUpRule, scaleDownRule)
}

// HPABehaviorWithScaleDisabled returns a behavior that disables scaling in the given direction.
func HPABehaviorWithScaleDisabled(scalingDirection ScalingDirection) *autoscalingv2.HorizontalPodAutoscalerBehavior {
	scalingRule := HPAScalingRuleWithPolicyDisabled()
	return HPABehaviorWithScalingRuleInDirection(scalingDirection, scalingRule)
}

// HPABehaviorWithScaleLimitedByNumberOfPods returns a behavior that limits
// scaling in the given direction to numberOfPods per period.
func HPABehaviorWithScaleLimitedByNumberOfPods(scalingDirection ScalingDirection, numberOfPods, periodSeconds int32) *autoscalingv2.HorizontalPodAutoscalerBehavior {
	scalingRule := HPAScalingRuleWithScalingPolicy(autoscalingv2.PodsScalingPolicy, numberOfPods, periodSeconds)
	return HPABehaviorWithScalingRuleInDirection(scalingDirection, scalingRule)
}

// HPABehaviorWithScaleLimitedByPercentage returns a behavior that limits scaling
// in the given direction to a percentage of the current replicas per period.
func HPABehaviorWithScaleLimitedByPercentage(scalingDirection ScalingDirection, percentage, periodSeconds int32) *autoscalingv2.HorizontalPodAutoscalerBehavior {
	scalingRule := HPAScalingRuleWithScalingPolicy(autoscalingv2.PercentScalingPolicy, percentage, periodSeconds)
	return HPABehaviorWithScalingRuleInDirection(scalingDirection, scalingRule)
}

// DeleteHPAWithBehavior removes the named autoscaler.
func DeleteHPAWithBehavior(ctx context.Context, rc *ResourceConsumer, autoscalerName string) {
	framework.ExpectNoError(rc.clientSet.AutoscalingV2().HorizontalPodAutoscalers(rc.nsName).Delete(ctx, autoscalerName, metav1.DeleteOptions{}))
}
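
// SidecarStatusType indicates whether the sidecar container is enabled.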
type SidecarStatusType bool

const (
	Enable  SidecarStatusType = true
	Disable SidecarStatusType = false
)
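
// SidecarWorkloadType indicates whether the sidecar actively consumes resources.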
type SidecarWorkloadType string

const (
	Busy SidecarWorkloadType = "Busy"
	Idle SidecarWorkloadType = "Idle"
)

// CreateCustomResourceDefinition registers the scalable test CRD (if it is not
// already present) and waits until it shows up in discovery.
func CreateCustomResourceDefinition(ctx context.Context, c crdclientset.Interface) *apiextensionsv1.CustomResourceDefinition {
	crdSchema := &apiextensionsv1.CustomResourceDefinition{
		ObjectMeta: metav1.ObjectMeta{Name: crdNamePlural + "." + crdGroup},
		Spec: apiextensionsv1.CustomResourceDefinitionSpec{
			Group: crdGroup,
			Scope: apiextensionsv1.ResourceScope("Namespaced"),
			Names: apiextensionsv1.CustomResourceDefinitionNames{
				Plural:   crdNamePlural,
				Singular: crdName,
				Kind:     crdKind,
				ListKind: "TestCRDList",
			},
			Versions: []apiextensionsv1.CustomResourceDefinitionVersion{{
				Name:    crdVersion,
				Served:  true,
				Storage: true,
				Schema:  fixtures.AllowAllSchema(),
				Subresources: &apiextensionsv1.CustomResourceSubresources{
					Scale: &apiextensionsv1.CustomResourceSubresourceScale{
						SpecReplicasPath:   ".spec.replicas",
						StatusReplicasPath: ".status.replicas",
						LabelSelectorPath:  utilpointer.String(".status.selector"),
					},
				},
			}},
		},
		Status: apiextensionsv1.CustomResourceDefinitionStatus{},
	}

	// Create the CRD only if it is not registered yet.
	crd, err := c.ApiextensionsV1().CustomResourceDefinitions().Get(ctx, crdSchema.Name, metav1.GetOptions{})
	if err != nil {
		crd, err = c.ApiextensionsV1().CustomResourceDefinitions().Create(ctx, crdSchema, metav1.CreateOptions{})
		framework.ExpectNoError(err)

		// Wait until the new resource appears in discovery.
		err = framework.Gomega().Eventually(ctx, framework.RetryNotFound(framework.HandleRetry(func(ctx context.Context) (*metav1.APIResourceList, error) {
			return c.Discovery().ServerResourcesForGroupVersion(crd.Spec.Group + "/" + "v1")
		}))).Should(framework.MakeMatcher(func(actual *metav1.APIResourceList) (func() string, error) {
			for _, g := range actual.APIResources {
				if g.Name == crd.Spec.Names.Plural {
					return nil, nil
				}
			}
			return func() string {
				return fmt.Sprintf("CRD %s not found in discovery", crd.Spec.Names.Plural)
			}, nil
		}))
		framework.ExpectNoError(err)
		ginkgo.By(fmt.Sprintf("Successfully created Custom Resource Definition: %v", crd))
	}
	return crd
}

// CreateCustomSubresourceInstance creates an instance of the test CRD with its
// spec and status replicas preset to 1.
func CreateCustomSubresourceInstance(ctx context.Context, namespace, name string, client dynamic.ResourceInterface, definition *apiextensionsv1.CustomResourceDefinition) (*unstructured.Unstructured, error) {
	instance := &unstructured.Unstructured{
		Object: map[string]interface{}{
			"apiVersion": crdGroup + "/" + crdVersion,
			"kind":       crdKind,
			"metadata": map[string]interface{}{
				"namespace": namespace,
				"name":      name,
			},
			"spec": map[string]interface{}{
				"num":      int64(1),
				"replicas": int64(1),
			},
			"status": map[string]interface{}{
				"replicas": int64(1),
				"selector": "name=" + name,
			},
		},
	}
	instance, err := client.Create(ctx, instance, metav1.CreateOptions{})
	if err != nil {
		framework.Logf("%#v", instance)
		return nil, err
	}
	createdObjectMeta, err := meta.Accessor(instance)
	if err != nil {
		return nil, fmt.Errorf("error while creating object meta: %w", err)
	}
	if len(createdObjectMeta.GetUID()) == 0 {
		return nil, fmt.Errorf("missing UUID: %v", instance)
	}
	ginkgo.By(fmt.Sprintf("Successfully created instance of CRD of kind %v: %v", definition.Kind, instance))
	return instance, nil
}