...

Source file src/k8s.io/kubernetes/test/e2e/framework/autoscaling/autoscaling_utils.go

Documentation: k8s.io/kubernetes/test/e2e/framework/autoscaling

     1  /*
     2  Copyright 2015 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package autoscaling
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"strconv"
    23  	"sync"
    24  	"time"
    25  
    26  	autoscalingv1 "k8s.io/api/autoscaling/v1"
    27  	autoscalingv2 "k8s.io/api/autoscaling/v2"
    28  	v1 "k8s.io/api/core/v1"
    29  	apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
    30  	crdclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
    31  	"k8s.io/apiextensions-apiserver/test/integration/fixtures"
    32  	"k8s.io/apimachinery/pkg/api/meta"
    33  	"k8s.io/apimachinery/pkg/api/resource"
    34  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    35  	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
    36  	"k8s.io/apimachinery/pkg/runtime/schema"
    37  	"k8s.io/apimachinery/pkg/util/intstr"
    38  	"k8s.io/client-go/dynamic"
    39  	clientset "k8s.io/client-go/kubernetes"
    40  	scaleclient "k8s.io/client-go/scale"
    41  	"k8s.io/kubernetes/test/e2e/framework"
    42  	e2edebug "k8s.io/kubernetes/test/e2e/framework/debug"
    43  	e2ekubectl "k8s.io/kubernetes/test/e2e/framework/kubectl"
    44  	e2erc "k8s.io/kubernetes/test/e2e/framework/rc"
    45  	e2eresource "k8s.io/kubernetes/test/e2e/framework/resource"
    46  	e2eservice "k8s.io/kubernetes/test/e2e/framework/service"
    47  	testutils "k8s.io/kubernetes/test/utils"
    48  	utilpointer "k8s.io/utils/pointer"
    49  
    50  	"github.com/onsi/ginkgo/v2"
    51  	"github.com/onsi/gomega"
    52  
    53  	imageutils "k8s.io/kubernetes/test/utils/image"
    54  )
    55  
    56  const (
    57  	dynamicConsumptionTimeInSeconds = 30
    58  	dynamicRequestSizeInMillicores  = 100
    59  	dynamicRequestSizeInMegabytes   = 100
    60  	dynamicRequestSizeCustomMetric  = 10
    61  	port                            = 80
    62  	targetPort                      = 8080
    63  	sidecarTargetPort               = 8081
    64  	timeoutRC                       = 120 * time.Second
    65  	startServiceTimeout             = time.Minute
    66  	startServiceInterval            = 5 * time.Second
    67  	invalidKind                     = "ERROR: invalid workload kind for resource consumer"
    68  	customMetricName                = "QPS"
    69  	serviceInitializationTimeout    = 2 * time.Minute
    70  	serviceInitializationInterval   = 15 * time.Second
    71  	megabytes                       = 1024 * 1024
    72  	crdVersion                      = "v1"
    73  	crdKind                         = "TestCRD"
    74  	crdGroup                        = "autoscalinge2e.example.com"
    75  	crdName                         = "testcrd"
    76  	crdNamePlural                   = "testcrds"
    77  )
    78  
    79  var (
    80  	resourceConsumerImage = imageutils.GetE2EImage(imageutils.ResourceConsumer)
    81  )
    82  
    83  var (
    84  	// KindRC is the GVK for ReplicationController
    85  	KindRC = schema.GroupVersionKind{Version: "v1", Kind: "ReplicationController"}
    86  	// KindDeployment is the GVK for Deployment
    87  	KindDeployment = schema.GroupVersionKind{Group: "apps", Version: "v1beta2", Kind: "Deployment"}
    88  	// KindReplicaSet is the GVK for ReplicaSet
    89  	KindReplicaSet = schema.GroupVersionKind{Group: "apps", Version: "v1beta2", Kind: "ReplicaSet"}
    90  	// KindCRD is the GVK for CRD for test purposes
    91  	KindCRD = schema.GroupVersionKind{Group: crdGroup, Version: crdVersion, Kind: crdKind}
    92  )
    93  
    94  // ScalingDirection identifies the scale direction for HPA Behavior.
    95  type ScalingDirection int
    96  
    97  const (
    98  	DirectionUnknown ScalingDirection = iota
    99  	ScaleUpDirection
   100  	ScaleDownDirection
   101  )
   102  
   103  /*
   104  ResourceConsumer is a tool for testing. It helps to create a specified usage of CPU or memory.
   105  Typical use case:
   106  rc.ConsumeCPU(600)
   107  // ... check your assumption here
   108  rc.ConsumeCPU(300)
   109  // ... check your assumption here
   110  */
   111  type ResourceConsumer struct {
   112  	name                     string
   113  	controllerName           string
   114  	kind                     schema.GroupVersionKind
   115  	nsName                   string
   116  	clientSet                clientset.Interface
   117  	apiExtensionClient       crdclientset.Interface
   118  	dynamicClient            dynamic.Interface
   119  	resourceClient           dynamic.ResourceInterface
   120  	scaleClient              scaleclient.ScalesGetter
   121  	cpu                      chan int
   122  	mem                      chan int
   123  	customMetric             chan int
   124  	stopCPU                  chan int
   125  	stopMem                  chan int
   126  	stopCustomMetric         chan int
   127  	stopWaitGroup            sync.WaitGroup
   128  	consumptionTimeInSeconds int
   129  	sleepTime                time.Duration
   130  	requestSizeInMillicores  int
   131  	requestSizeInMegabytes   int
   132  	requestSizeCustomMetric  int
   133  	sidecarStatus            SidecarStatusType
   134  	sidecarType              SidecarWorkloadType
   135  }
   136  
   137  // NewDynamicResourceConsumer is a wrapper to create a new dynamic ResourceConsumer
   138  func NewDynamicResourceConsumer(ctx context.Context, name, nsName string, kind schema.GroupVersionKind, replicas, initCPUTotal, initMemoryTotal, initCustomMetric int, cpuLimit, memLimit int64, clientset clientset.Interface, scaleClient scaleclient.ScalesGetter, enableSidecar SidecarStatusType, sidecarType SidecarWorkloadType) *ResourceConsumer {
   139  	return newResourceConsumer(ctx, name, nsName, kind, replicas, initCPUTotal, initMemoryTotal, initCustomMetric, dynamicConsumptionTimeInSeconds,
   140  		dynamicRequestSizeInMillicores, dynamicRequestSizeInMegabytes, dynamicRequestSizeCustomMetric, cpuLimit, memLimit, clientset, scaleClient, nil, nil, enableSidecar, sidecarType)
   141  }
   142  
   143  // getSidecarContainer returns sidecar container
   144  func getSidecarContainer(name string, cpuLimit, memLimit int64) v1.Container {
   145  	container := v1.Container{
   146  		Name:    name + "-sidecar",
   147  		Image:   resourceConsumerImage,
   148  		Command: []string{"/consumer", "-port=8081"},
   149  		Ports:   []v1.ContainerPort{{ContainerPort: 80}},
   150  	}
   151  
   152  	if cpuLimit > 0 || memLimit > 0 {
   153  		container.Resources.Limits = v1.ResourceList{}
   154  		container.Resources.Requests = v1.ResourceList{}
   155  	}
   156  
   157  	if cpuLimit > 0 {
   158  		container.Resources.Limits[v1.ResourceCPU] = *resource.NewMilliQuantity(cpuLimit, resource.DecimalSI)
   159  		container.Resources.Requests[v1.ResourceCPU] = *resource.NewMilliQuantity(cpuLimit, resource.DecimalSI)
   160  	}
   161  
   162  	if memLimit > 0 {
   163  		container.Resources.Limits[v1.ResourceMemory] = *resource.NewQuantity(memLimit*megabytes, resource.DecimalSI)
   164  		container.Resources.Requests[v1.ResourceMemory] = *resource.NewQuantity(memLimit*megabytes, resource.DecimalSI)
   165  	}
   166  
   167  	return container
   168  }
   169  
   170  /*
   171  NewResourceConsumer creates new ResourceConsumer
   172  initCPUTotal argument is in millicores
   173  initMemoryTotal argument is in megabytes
   174  memLimit argument is in megabytes, memLimit is a maximum amount of memory that can be consumed by a single pod
   175  cpuLimit argument is in millicores, cpuLimit is a maximum amount of cpu that can be consumed by a single pod
   176  */
   177  func newResourceConsumer(ctx context.Context, name, nsName string, kind schema.GroupVersionKind, replicas, initCPUTotal, initMemoryTotal, initCustomMetric, consumptionTimeInSeconds, requestSizeInMillicores,
   178  	requestSizeInMegabytes int, requestSizeCustomMetric int, cpuLimit, memLimit int64, clientset clientset.Interface, scaleClient scaleclient.ScalesGetter, podAnnotations, serviceAnnotations map[string]string, sidecarStatus SidecarStatusType, sidecarType SidecarWorkloadType) *ResourceConsumer {
   179  	if podAnnotations == nil {
   180  		podAnnotations = make(map[string]string)
   181  	}
   182  	if serviceAnnotations == nil {
   183  		serviceAnnotations = make(map[string]string)
   184  	}
   185  
   186  	var additionalContainers []v1.Container
   187  
   188  	if sidecarStatus == Enable {
   189  		sidecarContainer := getSidecarContainer(name, cpuLimit, memLimit)
   190  		additionalContainers = append(additionalContainers, sidecarContainer)
   191  	}
   192  
   193  	config, err := framework.LoadConfig()
   194  	framework.ExpectNoError(err)
   195  	apiExtensionClient, err := crdclientset.NewForConfig(config)
   196  	framework.ExpectNoError(err)
   197  	dynamicClient, err := dynamic.NewForConfig(config)
   198  	framework.ExpectNoError(err)
   199  	resourceClient := dynamicClient.Resource(schema.GroupVersionResource{Group: crdGroup, Version: crdVersion, Resource: crdNamePlural}).Namespace(nsName)
   200  
   201  	runServiceAndWorkloadForResourceConsumer(ctx, clientset, resourceClient, apiExtensionClient, nsName, name, kind, replicas, cpuLimit, memLimit, podAnnotations, serviceAnnotations, additionalContainers)
   202  	controllerName := name + "-ctrl"
   203  	// If sidecar is enabled and busy, run service and consumer for sidecar
   204  	if sidecarStatus == Enable && sidecarType == Busy {
   205  		runServiceAndSidecarForResourceConsumer(ctx, clientset, nsName, name, kind, replicas, serviceAnnotations)
   206  		controllerName = name + "-sidecar-ctrl"
   207  	}
   208  
   209  	rc := &ResourceConsumer{
   210  		name:                     name,
   211  		controllerName:           controllerName,
   212  		kind:                     kind,
   213  		nsName:                   nsName,
   214  		clientSet:                clientset,
   215  		apiExtensionClient:       apiExtensionClient,
   216  		scaleClient:              scaleClient,
   217  		resourceClient:           resourceClient,
   218  		dynamicClient:            dynamicClient,
   219  		cpu:                      make(chan int),
   220  		mem:                      make(chan int),
   221  		customMetric:             make(chan int),
   222  		stopCPU:                  make(chan int),
   223  		stopMem:                  make(chan int),
   224  		stopCustomMetric:         make(chan int),
   225  		consumptionTimeInSeconds: consumptionTimeInSeconds,
   226  		sleepTime:                time.Duration(consumptionTimeInSeconds) * time.Second,
   227  		requestSizeInMillicores:  requestSizeInMillicores,
   228  		requestSizeInMegabytes:   requestSizeInMegabytes,
   229  		requestSizeCustomMetric:  requestSizeCustomMetric,
   230  		sidecarType:              sidecarType,
   231  		sidecarStatus:            sidecarStatus,
   232  	}
   233  
   234  	go rc.makeConsumeCPURequests(ctx)
   235  	rc.ConsumeCPU(initCPUTotal)
   236  	go rc.makeConsumeMemRequests(ctx)
   237  	rc.ConsumeMem(initMemoryTotal)
   238  	go rc.makeConsumeCustomMetric(ctx)
   239  	rc.ConsumeCustomMetric(initCustomMetric)
   240  	return rc
   241  }
   242  
   243  // ConsumeCPU consumes given number of CPU
   244  func (rc *ResourceConsumer) ConsumeCPU(millicores int) {
   245  	framework.Logf("RC %s: consume %v millicores in total", rc.name, millicores)
   246  	rc.cpu <- millicores
   247  }
   248  
   249  // ConsumeMem consumes given number of Mem
   250  func (rc *ResourceConsumer) ConsumeMem(megabytes int) {
   251  	framework.Logf("RC %s: consume %v MB in total", rc.name, megabytes)
   252  	rc.mem <- megabytes
   253  }
   254  
   255  // ConsumeCustomMetric consumes given number of custom metric
   256  func (rc *ResourceConsumer) ConsumeCustomMetric(amount int) {
   257  	framework.Logf("RC %s: consume custom metric %v in total", rc.name, amount)
   258  	rc.customMetric <- amount
   259  }
   260  
   261  func (rc *ResourceConsumer) makeConsumeCPURequests(ctx context.Context) {
   262  	defer ginkgo.GinkgoRecover()
   263  	rc.stopWaitGroup.Add(1)
   264  	defer rc.stopWaitGroup.Done()
   265  	tick := time.After(time.Duration(0))
   266  	millicores := 0
   267  	for {
   268  		select {
   269  		case millicores = <-rc.cpu:
   270  			if millicores != 0 {
   271  				framework.Logf("RC %s: setting consumption to %v millicores in total", rc.name, millicores)
   272  			} else {
   273  				framework.Logf("RC %s: disabling CPU consumption", rc.name)
   274  			}
   275  		case <-tick:
   276  			if millicores != 0 {
   277  				framework.Logf("RC %s: sending request to consume %d millicores", rc.name, millicores)
   278  				rc.sendConsumeCPURequest(ctx, millicores)
   279  			}
   280  			tick = time.After(rc.sleepTime)
   281  		case <-ctx.Done():
   282  			framework.Logf("RC %s: stopping CPU consumer: %v", rc.name, ctx.Err())
   283  			return
   284  		case <-rc.stopCPU:
   285  			framework.Logf("RC %s: stopping CPU consumer", rc.name)
   286  			return
   287  		}
   288  	}
   289  }
   290  
   291  func (rc *ResourceConsumer) makeConsumeMemRequests(ctx context.Context) {
   292  	defer ginkgo.GinkgoRecover()
   293  	rc.stopWaitGroup.Add(1)
   294  	defer rc.stopWaitGroup.Done()
   295  	tick := time.After(time.Duration(0))
   296  	megabytes := 0
   297  	for {
   298  		select {
   299  		case megabytes = <-rc.mem:
   300  			if megabytes != 0 {
   301  				framework.Logf("RC %s: setting consumption to %v MB in total", rc.name, megabytes)
   302  			} else {
   303  				framework.Logf("RC %s: disabling mem consumption", rc.name)
   304  			}
   305  		case <-tick:
   306  			if megabytes != 0 {
   307  				framework.Logf("RC %s: sending request to consume %d MB", rc.name, megabytes)
   308  				rc.sendConsumeMemRequest(ctx, megabytes)
   309  			}
   310  			tick = time.After(rc.sleepTime)
   311  		case <-ctx.Done():
   312  			framework.Logf("RC %s: stopping mem consumer: %v", rc.name, ctx.Err())
   313  			return
   314  		case <-rc.stopMem:
   315  			framework.Logf("RC %s: stopping mem consumer", rc.name)
   316  			return
   317  		}
   318  	}
   319  }
   320  
   321  func (rc *ResourceConsumer) makeConsumeCustomMetric(ctx context.Context) {
   322  	defer ginkgo.GinkgoRecover()
   323  	rc.stopWaitGroup.Add(1)
   324  	defer rc.stopWaitGroup.Done()
   325  	tick := time.After(time.Duration(0))
   326  	delta := 0
   327  	for {
   328  		select {
   329  		case delta = <-rc.customMetric:
   330  			if delta != 0 {
   331  				framework.Logf("RC %s: setting bump of metric %s to %d in total", rc.name, customMetricName, delta)
   332  			} else {
   333  				framework.Logf("RC %s: disabling consumption of custom metric %s", rc.name, customMetricName)
   334  			}
   335  		case <-tick:
   336  			if delta != 0 {
   337  				framework.Logf("RC %s: sending request to consume %d of custom metric %s", rc.name, delta, customMetricName)
   338  				rc.sendConsumeCustomMetric(ctx, delta)
   339  			}
   340  			tick = time.After(rc.sleepTime)
   341  		case <-ctx.Done():
   342  			framework.Logf("RC %s: stopping metric consumer: %v", rc.name, ctx.Err())
   343  			return
   344  		case <-rc.stopCustomMetric:
   345  			framework.Logf("RC %s: stopping metric consumer", rc.name)
   346  			return
   347  		}
   348  	}
   349  }
   350  
   351  func (rc *ResourceConsumer) sendConsumeCPURequest(ctx context.Context, millicores int) {
   352  	err := framework.Gomega().Eventually(ctx, func(ctx context.Context) error {
   353  		proxyRequest, err := e2eservice.GetServicesProxyRequest(rc.clientSet, rc.clientSet.CoreV1().RESTClient().Post())
   354  		if err != nil {
   355  			return err
   356  		}
   357  		req := proxyRequest.Namespace(rc.nsName).
   358  			Name(rc.controllerName).
   359  			Suffix("ConsumeCPU").
   360  			Param("millicores", strconv.Itoa(millicores)).
   361  			Param("durationSec", strconv.Itoa(rc.consumptionTimeInSeconds)).
   362  			Param("requestSizeMillicores", strconv.Itoa(rc.requestSizeInMillicores))
   363  		framework.Logf("ConsumeCPU URL: %v", *req.URL())
   364  		_, err = req.DoRaw(ctx)
   365  		if err != nil {
   366  			framework.Logf("ConsumeCPU failure: %v", err)
   367  			return err
   368  		}
   369  		return nil
   370  	}).WithTimeout(serviceInitializationTimeout).WithPolling(serviceInitializationInterval).Should(gomega.Succeed())
   371  
   372  	// Test has already finished (ctx got canceled), so don't fail on err from PollUntilContextTimeout
   373  	// which is a side-effect to context cancelling from the cleanup task.
   374  	if ctx.Err() != nil {
   375  		return
   376  	}
   377  
   378  	framework.ExpectNoError(err)
   379  }
   380  
   381  // sendConsumeMemRequest sends POST request for memory consumption
   382  func (rc *ResourceConsumer) sendConsumeMemRequest(ctx context.Context, megabytes int) {
   383  	err := framework.Gomega().Eventually(ctx, func(ctx context.Context) error {
   384  		proxyRequest, err := e2eservice.GetServicesProxyRequest(rc.clientSet, rc.clientSet.CoreV1().RESTClient().Post())
   385  		if err != nil {
   386  			return err
   387  		}
   388  		req := proxyRequest.Namespace(rc.nsName).
   389  			Name(rc.controllerName).
   390  			Suffix("ConsumeMem").
   391  			Param("megabytes", strconv.Itoa(megabytes)).
   392  			Param("durationSec", strconv.Itoa(rc.consumptionTimeInSeconds)).
   393  			Param("requestSizeMegabytes", strconv.Itoa(rc.requestSizeInMegabytes))
   394  		framework.Logf("ConsumeMem URL: %v", *req.URL())
   395  		_, err = req.DoRaw(ctx)
   396  		if err != nil {
   397  			framework.Logf("ConsumeMem failure: %v", err)
   398  			return err
   399  		}
   400  		return nil
   401  	}).WithTimeout(serviceInitializationTimeout).WithPolling(serviceInitializationInterval).Should(gomega.Succeed())
   402  
   403  	// Test has already finished (ctx got canceled), so don't fail on err from PollUntilContextTimeout
   404  	// which is a side-effect to context cancelling from the cleanup task.
   405  	if ctx.Err() != nil {
   406  		return
   407  	}
   408  
   409  	framework.ExpectNoError(err)
   410  }
   411  
   412  // sendConsumeCustomMetric sends POST request for custom metric consumption
   413  func (rc *ResourceConsumer) sendConsumeCustomMetric(ctx context.Context, delta int) {
   414  	err := framework.Gomega().Eventually(ctx, func(ctx context.Context) error {
   415  		proxyRequest, err := e2eservice.GetServicesProxyRequest(rc.clientSet, rc.clientSet.CoreV1().RESTClient().Post())
   416  		if err != nil {
   417  			return err
   418  		}
   419  		req := proxyRequest.Namespace(rc.nsName).
   420  			Name(rc.controllerName).
   421  			Suffix("BumpMetric").
   422  			Param("metric", customMetricName).
   423  			Param("delta", strconv.Itoa(delta)).
   424  			Param("durationSec", strconv.Itoa(rc.consumptionTimeInSeconds)).
   425  			Param("requestSizeMetrics", strconv.Itoa(rc.requestSizeCustomMetric))
   426  		framework.Logf("ConsumeCustomMetric URL: %v", *req.URL())
   427  		_, err = req.DoRaw(ctx)
   428  		if err != nil {
   429  			framework.Logf("ConsumeCustomMetric failure: %v", err)
   430  			return err
   431  		}
   432  		return nil
   433  	}).WithTimeout(serviceInitializationTimeout).WithPolling(serviceInitializationInterval).Should(gomega.Succeed())
   434  
   435  	// Test has already finished (ctx got canceled), so don't fail on err from PollUntilContextTimeout
   436  	// which is a side-effect to context cancelling from the cleanup task.
   437  	if ctx.Err() != nil {
   438  		return
   439  	}
   440  
   441  	framework.ExpectNoError(err)
   442  }
   443  
   444  // GetReplicas get the replicas
   445  func (rc *ResourceConsumer) GetReplicas(ctx context.Context) (int, error) {
   446  	switch rc.kind {
   447  	case KindRC:
   448  		replicationController, err := rc.clientSet.CoreV1().ReplicationControllers(rc.nsName).Get(ctx, rc.name, metav1.GetOptions{})
   449  		if err != nil {
   450  			return 0, err
   451  		}
   452  		return int(replicationController.Status.ReadyReplicas), nil
   453  	case KindDeployment:
   454  		deployment, err := rc.clientSet.AppsV1().Deployments(rc.nsName).Get(ctx, rc.name, metav1.GetOptions{})
   455  		if err != nil {
   456  			return 0, err
   457  		}
   458  		return int(deployment.Status.ReadyReplicas), nil
   459  	case KindReplicaSet:
   460  		rs, err := rc.clientSet.AppsV1().ReplicaSets(rc.nsName).Get(ctx, rc.name, metav1.GetOptions{})
   461  		if err != nil {
   462  			return 0, err
   463  		}
   464  		return int(rs.Status.ReadyReplicas), nil
   465  	case KindCRD:
   466  		deployment, err := rc.clientSet.AppsV1().Deployments(rc.nsName).Get(ctx, rc.name, metav1.GetOptions{})
   467  		if err != nil {
   468  			return 0, err
   469  		}
   470  		deploymentReplicas := int64(deployment.Status.ReadyReplicas)
   471  
   472  		scale, err := rc.scaleClient.Scales(rc.nsName).Get(ctx, schema.GroupResource{Group: crdGroup, Resource: crdNamePlural}, rc.name, metav1.GetOptions{})
   473  		if err != nil {
   474  			return 0, err
   475  		}
   476  		crdInstance, err := rc.resourceClient.Get(ctx, rc.name, metav1.GetOptions{})
   477  		if err != nil {
   478  			return 0, err
   479  		}
   480  		// Update custom resource's status.replicas with child Deployment's current number of ready replicas.
   481  		err = unstructured.SetNestedField(crdInstance.Object, deploymentReplicas, "status", "replicas")
   482  		if err != nil {
   483  			return 0, err
   484  		}
   485  		_, err = rc.resourceClient.Update(ctx, crdInstance, metav1.UpdateOptions{})
   486  		if err != nil {
   487  			return 0, err
   488  		}
   489  		return int(scale.Spec.Replicas), nil
   490  	default:
   491  		return 0, fmt.Errorf(invalidKind)
   492  	}
   493  }
   494  
   495  // GetHpa get the corresponding horizontalPodAutoscaler object
   496  func (rc *ResourceConsumer) GetHpa(ctx context.Context, name string) (*autoscalingv1.HorizontalPodAutoscaler, error) {
   497  	return rc.clientSet.AutoscalingV1().HorizontalPodAutoscalers(rc.nsName).Get(ctx, name, metav1.GetOptions{})
   498  }
   499  
   500  // WaitForReplicas wait for the desired replicas
   501  func (rc *ResourceConsumer) WaitForReplicas(ctx context.Context, desiredReplicas int, duration time.Duration) {
   502  	interval := 20 * time.Second
   503  	err := framework.Gomega().Eventually(ctx, framework.HandleRetry(rc.GetReplicas)).
   504  		WithTimeout(duration).
   505  		WithPolling(interval).
   506  		Should(gomega.Equal(desiredReplicas))
   507  
   508  	framework.ExpectNoErrorWithOffset(1, err, "timeout waiting %v for %d replicas", duration, desiredReplicas)
   509  }
   510  
   511  // EnsureDesiredReplicasInRange ensure the replicas is in a desired range
   512  func (rc *ResourceConsumer) EnsureDesiredReplicasInRange(ctx context.Context, minDesiredReplicas, maxDesiredReplicas int, duration time.Duration, hpaName string) {
   513  	interval := 10 * time.Second
   514  	desiredReplicasErr := framework.Gomega().Consistently(ctx, framework.HandleRetry(rc.GetReplicas)).
   515  		WithTimeout(duration).
   516  		WithPolling(interval).
   517  		Should(gomega.And(gomega.BeNumerically(">=", minDesiredReplicas), gomega.BeNumerically("<=", maxDesiredReplicas)))
   518  
   519  	// dump HPA for debugging
   520  	as, err := rc.GetHpa(ctx, hpaName)
   521  	if err != nil {
   522  		framework.Logf("Error getting HPA: %s", err)
   523  	} else {
   524  		framework.Logf("HPA status: %+v", as.Status)
   525  	}
   526  	framework.ExpectNoError(desiredReplicasErr)
   527  }
   528  
   529  // Pause stops background goroutines responsible for consuming resources.
   530  func (rc *ResourceConsumer) Pause() {
   531  	ginkgo.By(fmt.Sprintf("HPA pausing RC %s", rc.name))
   532  	rc.stopCPU <- 0
   533  	rc.stopMem <- 0
   534  	rc.stopCustomMetric <- 0
   535  	rc.stopWaitGroup.Wait()
   536  }
   537  
   538  // Resume starts background goroutines responsible for consuming resources.
   539  func (rc *ResourceConsumer) Resume(ctx context.Context) {
   540  	ginkgo.By(fmt.Sprintf("HPA resuming RC %s", rc.name))
   541  	go rc.makeConsumeCPURequests(ctx)
   542  	go rc.makeConsumeMemRequests(ctx)
   543  	go rc.makeConsumeCustomMetric(ctx)
   544  }
   545  
   546  // CleanUp clean up the background goroutines responsible for consuming resources.
   547  func (rc *ResourceConsumer) CleanUp(ctx context.Context) {
   548  	ginkgo.By(fmt.Sprintf("Removing consuming RC %s", rc.name))
   549  	close(rc.stopCPU)
   550  	close(rc.stopMem)
   551  	close(rc.stopCustomMetric)
   552  	rc.stopWaitGroup.Wait()
   553  	// Wait some time to ensure all child goroutines are finished.
   554  	time.Sleep(10 * time.Second)
   555  	kind := rc.kind.GroupKind()
   556  	if kind.Kind == crdKind {
   557  		gvr := schema.GroupVersionResource{Group: crdGroup, Version: crdVersion, Resource: crdNamePlural}
   558  		framework.ExpectNoError(e2eresource.DeleteCustomResourceAndWaitForGC(ctx, rc.clientSet, rc.dynamicClient, rc.scaleClient, gvr, rc.nsName, rc.name))
   559  
   560  	} else {
   561  		framework.ExpectNoError(e2eresource.DeleteResourceAndWaitForGC(ctx, rc.clientSet, kind, rc.nsName, rc.name))
   562  	}
   563  
   564  	framework.ExpectNoError(rc.clientSet.CoreV1().Services(rc.nsName).Delete(ctx, rc.name, metav1.DeleteOptions{}))
   565  	framework.ExpectNoError(e2eresource.DeleteResourceAndWaitForGC(ctx, rc.clientSet, schema.GroupKind{Kind: "ReplicationController"}, rc.nsName, rc.controllerName))
   566  	framework.ExpectNoError(rc.clientSet.CoreV1().Services(rc.nsName).Delete(ctx, rc.name+"-ctrl", metav1.DeleteOptions{}))
   567  	// Cleanup sidecar related resources
   568  	if rc.sidecarStatus == Enable && rc.sidecarType == Busy {
   569  		framework.ExpectNoError(rc.clientSet.CoreV1().Services(rc.nsName).Delete(ctx, rc.name+"-sidecar", metav1.DeleteOptions{}))
   570  		framework.ExpectNoError(rc.clientSet.CoreV1().Services(rc.nsName).Delete(ctx, rc.name+"-sidecar-ctrl", metav1.DeleteOptions{}))
   571  	}
   572  }
   573  
   574  func createService(ctx context.Context, c clientset.Interface, name, ns string, annotations, selectors map[string]string, port int32, targetPort int) (*v1.Service, error) {
   575  	return c.CoreV1().Services(ns).Create(ctx, &v1.Service{
   576  		ObjectMeta: metav1.ObjectMeta{
   577  			Name:        name,
   578  			Annotations: annotations,
   579  		},
   580  		Spec: v1.ServiceSpec{
   581  			Ports: []v1.ServicePort{{
   582  				Port:       port,
   583  				TargetPort: intstr.FromInt32(int32(targetPort)),
   584  			}},
   585  			Selector: selectors,
   586  		},
   587  	}, metav1.CreateOptions{})
   588  }
   589  
   590  // runServiceAndSidecarForResourceConsumer creates service and runs resource consumer for sidecar container
   591  func runServiceAndSidecarForResourceConsumer(ctx context.Context, c clientset.Interface, ns, name string, kind schema.GroupVersionKind, replicas int, serviceAnnotations map[string]string) {
   592  	ginkgo.By(fmt.Sprintf("Running consuming RC sidecar %s via %s with %v replicas", name, kind, replicas))
   593  
   594  	sidecarName := name + "-sidecar"
   595  	serviceSelectors := map[string]string{
   596  		"name": name,
   597  	}
   598  	_, err := createService(ctx, c, sidecarName, ns, serviceAnnotations, serviceSelectors, port, sidecarTargetPort)
   599  	framework.ExpectNoError(err)
   600  
   601  	ginkgo.By("Running controller for sidecar")
   602  	controllerName := sidecarName + "-ctrl"
   603  	_, err = createService(ctx, c, controllerName, ns, map[string]string{}, map[string]string{"name": controllerName}, port, targetPort)
   604  	framework.ExpectNoError(err)
   605  
   606  	dnsClusterFirst := v1.DNSClusterFirst
   607  	controllerRcConfig := testutils.RCConfig{
   608  		Client:    c,
   609  		Image:     imageutils.GetE2EImage(imageutils.Agnhost),
   610  		Name:      controllerName,
   611  		Namespace: ns,
   612  		Timeout:   timeoutRC,
   613  		Replicas:  1,
   614  		Command:   []string{"/agnhost", "resource-consumer-controller", "--consumer-service-name=" + sidecarName, "--consumer-service-namespace=" + ns, "--consumer-port=80"},
   615  		DNSPolicy: &dnsClusterFirst,
   616  	}
   617  
   618  	framework.ExpectNoError(e2erc.RunRC(ctx, controllerRcConfig))
   619  	// Wait for endpoints to propagate for the controller service.
   620  	framework.ExpectNoError(framework.WaitForServiceEndpointsNum(
   621  		ctx, c, ns, controllerName, 1, startServiceInterval, startServiceTimeout))
   622  }
   623  
   624  func runServiceAndWorkloadForResourceConsumer(ctx context.Context, c clientset.Interface, resourceClient dynamic.ResourceInterface, apiExtensionClient crdclientset.Interface, ns, name string, kind schema.GroupVersionKind, replicas int, cpuLimitMillis, memLimitMb int64, podAnnotations, serviceAnnotations map[string]string, additionalContainers []v1.Container) {
   625  	ginkgo.By(fmt.Sprintf("Running consuming RC %s via %s with %v replicas", name, kind, replicas))
   626  	_, err := createService(ctx, c, name, ns, serviceAnnotations, map[string]string{"name": name}, port, targetPort)
   627  	framework.ExpectNoError(err)
   628  
   629  	rcConfig := testutils.RCConfig{
   630  		Client:               c,
   631  		Image:                resourceConsumerImage,
   632  		Name:                 name,
   633  		Namespace:            ns,
   634  		Timeout:              timeoutRC,
   635  		Replicas:             replicas,
   636  		CpuRequest:           cpuLimitMillis,
   637  		CpuLimit:             cpuLimitMillis,
   638  		MemRequest:           memLimitMb * 1024 * 1024, // MemLimit is in bytes
   639  		MemLimit:             memLimitMb * 1024 * 1024,
   640  		Annotations:          podAnnotations,
   641  		AdditionalContainers: additionalContainers,
   642  	}
   643  
   644  	dpConfig := testutils.DeploymentConfig{
   645  		RCConfig: rcConfig,
   646  	}
   647  	dpConfig.NodeDumpFunc = e2edebug.DumpNodeDebugInfo
   648  	dpConfig.ContainerDumpFunc = e2ekubectl.LogFailedContainers
   649  
   650  	switch kind {
   651  	case KindRC:
   652  		framework.ExpectNoError(e2erc.RunRC(ctx, rcConfig))
   653  	case KindDeployment:
   654  		ginkgo.By(fmt.Sprintf("Creating deployment %s in namespace %s", dpConfig.Name, dpConfig.Namespace))
   655  		framework.ExpectNoError(testutils.RunDeployment(ctx, dpConfig))
   656  	case KindReplicaSet:
   657  		rsConfig := testutils.ReplicaSetConfig{
   658  			RCConfig: rcConfig,
   659  		}
   660  		ginkgo.By(fmt.Sprintf("Creating replicaset %s in namespace %s", rsConfig.Name, rsConfig.Namespace))
   661  		framework.ExpectNoError(runReplicaSet(ctx, rsConfig))
   662  	case KindCRD:
   663  		crd := CreateCustomResourceDefinition(ctx, apiExtensionClient)
   664  		crdInstance, err := CreateCustomSubresourceInstance(ctx, ns, name, resourceClient, crd)
   665  		framework.ExpectNoError(err)
   666  
   667  		ginkgo.By(fmt.Sprintf("Creating deployment %s backing CRD in namespace %s", dpConfig.Name, dpConfig.Namespace))
   668  		framework.ExpectNoError(testutils.RunDeployment(ctx, dpConfig))
   669  
   670  		deployment, err := c.AppsV1().Deployments(dpConfig.Namespace).Get(ctx, dpConfig.Name, metav1.GetOptions{})
   671  		framework.ExpectNoError(err)
   672  		deployment.SetOwnerReferences([]metav1.OwnerReference{{
   673  			APIVersion: kind.GroupVersion().String(),
   674  			Kind:       crdKind,
   675  			Name:       name,
   676  			UID:        crdInstance.GetUID(),
   677  		}})
   678  		_, err = c.AppsV1().Deployments(dpConfig.Namespace).Update(ctx, deployment, metav1.UpdateOptions{})
   679  		framework.ExpectNoError(err)
   680  	default:
   681  		framework.Failf(invalidKind)
   682  	}
   683  
   684  	ginkgo.By(fmt.Sprintf("Running controller"))
   685  	controllerName := name + "-ctrl"
   686  	_, err = createService(ctx, c, controllerName, ns, map[string]string{}, map[string]string{"name": controllerName}, port, targetPort)
   687  	framework.ExpectNoError(err)
   688  
   689  	dnsClusterFirst := v1.DNSClusterFirst
   690  	controllerRcConfig := testutils.RCConfig{
   691  		Client:    c,
   692  		Image:     imageutils.GetE2EImage(imageutils.Agnhost),
   693  		Name:      controllerName,
   694  		Namespace: ns,
   695  		Timeout:   timeoutRC,
   696  		Replicas:  1,
   697  		Command:   []string{"/agnhost", "resource-consumer-controller", "--consumer-service-name=" + name, "--consumer-service-namespace=" + ns, "--consumer-port=80"},
   698  		DNSPolicy: &dnsClusterFirst,
   699  	}
   700  
   701  	framework.ExpectNoError(e2erc.RunRC(ctx, controllerRcConfig))
   702  	// Wait for endpoints to propagate for the controller service.
   703  	framework.ExpectNoError(framework.WaitForServiceEndpointsNum(
   704  		ctx, c, ns, controllerName, 1, startServiceInterval, startServiceTimeout))
   705  }
   706  
   707  func CreateHorizontalPodAutoscaler(ctx context.Context, rc *ResourceConsumer, targetRef autoscalingv2.CrossVersionObjectReference, namespace string, metrics []autoscalingv2.MetricSpec, resourceType v1.ResourceName, metricTargetType autoscalingv2.MetricTargetType, metricTargetValue, minReplicas, maxReplicas int32) *autoscalingv2.HorizontalPodAutoscaler {
   708  	hpa := &autoscalingv2.HorizontalPodAutoscaler{
   709  		ObjectMeta: metav1.ObjectMeta{
   710  			Name:      targetRef.Name,
   711  			Namespace: namespace,
   712  		},
   713  		Spec: autoscalingv2.HorizontalPodAutoscalerSpec{
   714  			ScaleTargetRef: targetRef,
   715  			MinReplicas:    &minReplicas,
   716  			MaxReplicas:    maxReplicas,
   717  			Metrics:        metrics,
   718  		},
   719  	}
   720  	hpa, errHPA := rc.clientSet.AutoscalingV2().HorizontalPodAutoscalers(namespace).Create(ctx, hpa, metav1.CreateOptions{})
   721  	framework.ExpectNoError(errHPA)
   722  	return hpa
   723  }
   724  
   725  func CreateResourceHorizontalPodAutoscaler(ctx context.Context, rc *ResourceConsumer, resourceType v1.ResourceName, metricTargetType autoscalingv2.MetricTargetType, metricTargetValue, minReplicas, maxReplicas int32) *autoscalingv2.HorizontalPodAutoscaler {
   726  	targetRef := autoscalingv2.CrossVersionObjectReference{
   727  		APIVersion: rc.kind.GroupVersion().String(),
   728  		Kind:       rc.kind.Kind,
   729  		Name:       rc.name,
   730  	}
   731  	metrics := []autoscalingv2.MetricSpec{
   732  		{
   733  			Type: autoscalingv2.ResourceMetricSourceType,
   734  			Resource: &autoscalingv2.ResourceMetricSource{
   735  				Name:   resourceType,
   736  				Target: CreateMetricTargetWithType(resourceType, metricTargetType, metricTargetValue),
   737  			},
   738  		},
   739  	}
   740  	return CreateHorizontalPodAutoscaler(ctx, rc, targetRef, rc.nsName, metrics, resourceType, metricTargetType, metricTargetValue, minReplicas, maxReplicas)
   741  }
   742  
   743  func CreateCPUResourceHorizontalPodAutoscaler(ctx context.Context, rc *ResourceConsumer, cpu, minReplicas, maxReplicas int32) *autoscalingv2.HorizontalPodAutoscaler {
   744  	return CreateResourceHorizontalPodAutoscaler(ctx, rc, v1.ResourceCPU, autoscalingv2.UtilizationMetricType, cpu, minReplicas, maxReplicas)
   745  }
   746  
   747  // DeleteHorizontalPodAutoscaler delete the horizontalPodAutoscaler for consuming resources.
   748  func DeleteHorizontalPodAutoscaler(ctx context.Context, rc *ResourceConsumer, autoscalerName string) {
   749  	framework.ExpectNoError(rc.clientSet.AutoscalingV1().HorizontalPodAutoscalers(rc.nsName).Delete(ctx, autoscalerName, metav1.DeleteOptions{}))
   750  }
   751  
   752  // runReplicaSet launches (and verifies correctness) of a replicaset.
   753  func runReplicaSet(ctx context.Context, config testutils.ReplicaSetConfig) error {
   754  	ginkgo.By(fmt.Sprintf("creating replicaset %s in namespace %s", config.Name, config.Namespace))
   755  	config.NodeDumpFunc = e2edebug.DumpNodeDebugInfo
   756  	config.ContainerDumpFunc = e2ekubectl.LogFailedContainers
   757  	return testutils.RunReplicaSet(ctx, config)
   758  }
   759  
   760  func CreateContainerResourceHorizontalPodAutoscaler(ctx context.Context, rc *ResourceConsumer, resourceType v1.ResourceName, metricTargetType autoscalingv2.MetricTargetType, metricTargetValue, minReplicas, maxReplicas int32) *autoscalingv2.HorizontalPodAutoscaler {
   761  	targetRef := autoscalingv2.CrossVersionObjectReference{
   762  		APIVersion: rc.kind.GroupVersion().String(),
   763  		Kind:       rc.kind.Kind,
   764  		Name:       rc.name,
   765  	}
   766  	metrics := []autoscalingv2.MetricSpec{
   767  		{
   768  			Type: autoscalingv2.ContainerResourceMetricSourceType,
   769  			ContainerResource: &autoscalingv2.ContainerResourceMetricSource{
   770  				Name:      resourceType,
   771  				Container: rc.name,
   772  				Target:    CreateMetricTargetWithType(resourceType, metricTargetType, metricTargetValue),
   773  			},
   774  		},
   775  	}
   776  	return CreateHorizontalPodAutoscaler(ctx, rc, targetRef, rc.nsName, metrics, resourceType, metricTargetType, metricTargetValue, minReplicas, maxReplicas)
   777  }
   778  
   779  // DeleteContainerResourceHPA delete the horizontalPodAutoscaler for consuming resources.
   780  func DeleteContainerResourceHPA(ctx context.Context, rc *ResourceConsumer, autoscalerName string) {
   781  	framework.ExpectNoError(rc.clientSet.AutoscalingV2().HorizontalPodAutoscalers(rc.nsName).Delete(ctx, autoscalerName, metav1.DeleteOptions{}))
   782  }
   783  
   784  func CreateMetricTargetWithType(resourceType v1.ResourceName, targetType autoscalingv2.MetricTargetType, targetValue int32) autoscalingv2.MetricTarget {
   785  	var metricTarget autoscalingv2.MetricTarget
   786  	if targetType == autoscalingv2.UtilizationMetricType {
   787  		metricTarget = autoscalingv2.MetricTarget{
   788  			Type:               targetType,
   789  			AverageUtilization: &targetValue,
   790  		}
   791  	} else if targetType == autoscalingv2.AverageValueMetricType {
   792  		var averageValue *resource.Quantity
   793  		if resourceType == v1.ResourceCPU {
   794  			averageValue = resource.NewMilliQuantity(int64(targetValue), resource.DecimalSI)
   795  		} else {
   796  			averageValue = resource.NewQuantity(int64(targetValue*megabytes), resource.DecimalSI)
   797  		}
   798  		metricTarget = autoscalingv2.MetricTarget{
   799  			Type:         targetType,
   800  			AverageValue: averageValue,
   801  		}
   802  	}
   803  	return metricTarget
   804  }
   805  
   806  func CreateCPUHorizontalPodAutoscalerWithBehavior(ctx context.Context, rc *ResourceConsumer, cpu int32, minReplicas int32, maxRepl int32, behavior *autoscalingv2.HorizontalPodAutoscalerBehavior) *autoscalingv2.HorizontalPodAutoscaler {
   807  	hpa := &autoscalingv2.HorizontalPodAutoscaler{
   808  		ObjectMeta: metav1.ObjectMeta{
   809  			Name:      rc.name,
   810  			Namespace: rc.nsName,
   811  		},
   812  		Spec: autoscalingv2.HorizontalPodAutoscalerSpec{
   813  			ScaleTargetRef: autoscalingv2.CrossVersionObjectReference{
   814  				APIVersion: rc.kind.GroupVersion().String(),
   815  				Kind:       rc.kind.Kind,
   816  				Name:       rc.name,
   817  			},
   818  			MinReplicas: &minReplicas,
   819  			MaxReplicas: maxRepl,
   820  			Metrics: []autoscalingv2.MetricSpec{
   821  				{
   822  					Type: autoscalingv2.ResourceMetricSourceType,
   823  					Resource: &autoscalingv2.ResourceMetricSource{
   824  						Name: v1.ResourceCPU,
   825  						Target: autoscalingv2.MetricTarget{
   826  							Type:               autoscalingv2.UtilizationMetricType,
   827  							AverageUtilization: &cpu,
   828  						},
   829  					},
   830  				},
   831  			},
   832  			Behavior: behavior,
   833  		},
   834  	}
   835  	hpa, errHPA := rc.clientSet.AutoscalingV2().HorizontalPodAutoscalers(rc.nsName).Create(ctx, hpa, metav1.CreateOptions{})
   836  	framework.ExpectNoError(errHPA)
   837  	return hpa
   838  }
   839  
   840  func HPABehaviorWithScaleUpAndDownRules(scaleUpRule, scaleDownRule *autoscalingv2.HPAScalingRules) *autoscalingv2.HorizontalPodAutoscalerBehavior {
   841  	return &autoscalingv2.HorizontalPodAutoscalerBehavior{
   842  		ScaleUp:   scaleUpRule,
   843  		ScaleDown: scaleDownRule,
   844  	}
   845  }
   846  
   847  func HPABehaviorWithScalingRuleInDirection(scalingDirection ScalingDirection, rule *autoscalingv2.HPAScalingRules) *autoscalingv2.HorizontalPodAutoscalerBehavior {
   848  	var scaleUpRule, scaleDownRule *autoscalingv2.HPAScalingRules
   849  	if scalingDirection == ScaleUpDirection {
   850  		scaleUpRule = rule
   851  	}
   852  	if scalingDirection == ScaleDownDirection {
   853  		scaleDownRule = rule
   854  	}
   855  	return HPABehaviorWithScaleUpAndDownRules(scaleUpRule, scaleDownRule)
   856  }
   857  
   858  func HPAScalingRuleWithStabilizationWindow(stabilizationDuration int32) *autoscalingv2.HPAScalingRules {
   859  	return &autoscalingv2.HPAScalingRules{
   860  		StabilizationWindowSeconds: &stabilizationDuration,
   861  	}
   862  }
   863  
   864  func HPAScalingRuleWithPolicyDisabled() *autoscalingv2.HPAScalingRules {
   865  	disabledPolicy := autoscalingv2.DisabledPolicySelect
   866  	return &autoscalingv2.HPAScalingRules{
   867  		SelectPolicy: &disabledPolicy,
   868  	}
   869  }
   870  
   871  func HPAScalingRuleWithScalingPolicy(policyType autoscalingv2.HPAScalingPolicyType, value, periodSeconds int32) *autoscalingv2.HPAScalingRules {
   872  	stabilizationWindowDisabledDuration := int32(0)
   873  	selectPolicy := autoscalingv2.MaxChangePolicySelect
   874  	return &autoscalingv2.HPAScalingRules{
   875  		Policies: []autoscalingv2.HPAScalingPolicy{
   876  			{
   877  				Type:          policyType,
   878  				Value:         value,
   879  				PeriodSeconds: periodSeconds,
   880  			},
   881  		},
   882  		SelectPolicy:               &selectPolicy,
   883  		StabilizationWindowSeconds: &stabilizationWindowDisabledDuration,
   884  	}
   885  }
   886  
   887  func HPABehaviorWithStabilizationWindows(upscaleStabilization, downscaleStabilization time.Duration) *autoscalingv2.HorizontalPodAutoscalerBehavior {
   888  	scaleUpRule := HPAScalingRuleWithStabilizationWindow(int32(upscaleStabilization.Seconds()))
   889  	scaleDownRule := HPAScalingRuleWithStabilizationWindow(int32(downscaleStabilization.Seconds()))
   890  	return HPABehaviorWithScaleUpAndDownRules(scaleUpRule, scaleDownRule)
   891  }
   892  
   893  func HPABehaviorWithScaleDisabled(scalingDirection ScalingDirection) *autoscalingv2.HorizontalPodAutoscalerBehavior {
   894  	scalingRule := HPAScalingRuleWithPolicyDisabled()
   895  	return HPABehaviorWithScalingRuleInDirection(scalingDirection, scalingRule)
   896  }
   897  
   898  func HPABehaviorWithScaleLimitedByNumberOfPods(scalingDirection ScalingDirection, numberOfPods, periodSeconds int32) *autoscalingv2.HorizontalPodAutoscalerBehavior {
   899  	scalingRule := HPAScalingRuleWithScalingPolicy(autoscalingv2.PodsScalingPolicy, numberOfPods, periodSeconds)
   900  	return HPABehaviorWithScalingRuleInDirection(scalingDirection, scalingRule)
   901  }
   902  
   903  func HPABehaviorWithScaleLimitedByPercentage(scalingDirection ScalingDirection, percentage, periodSeconds int32) *autoscalingv2.HorizontalPodAutoscalerBehavior {
   904  	scalingRule := HPAScalingRuleWithScalingPolicy(autoscalingv2.PercentScalingPolicy, percentage, periodSeconds)
   905  	return HPABehaviorWithScalingRuleInDirection(scalingDirection, scalingRule)
   906  }
   907  
   908  func DeleteHPAWithBehavior(ctx context.Context, rc *ResourceConsumer, autoscalerName string) {
   909  	framework.ExpectNoError(rc.clientSet.AutoscalingV2().HorizontalPodAutoscalers(rc.nsName).Delete(ctx, autoscalerName, metav1.DeleteOptions{}))
   910  }
   911  
   912  // SidecarStatusType type for sidecar status
   913  type SidecarStatusType bool
   914  
   915  const (
   916  	Enable  SidecarStatusType = true
   917  	Disable SidecarStatusType = false
   918  )
   919  
   920  // SidecarWorkloadType type of the sidecar
   921  type SidecarWorkloadType string
   922  
   923  const (
   924  	Busy SidecarWorkloadType = "Busy"
   925  	Idle SidecarWorkloadType = "Idle"
   926  )
   927  
   928  func CreateCustomResourceDefinition(ctx context.Context, c crdclientset.Interface) *apiextensionsv1.CustomResourceDefinition {
   929  	crdSchema := &apiextensionsv1.CustomResourceDefinition{
   930  		ObjectMeta: metav1.ObjectMeta{Name: crdNamePlural + "." + crdGroup},
   931  		Spec: apiextensionsv1.CustomResourceDefinitionSpec{
   932  			Group: crdGroup,
   933  			Scope: apiextensionsv1.ResourceScope("Namespaced"),
   934  			Names: apiextensionsv1.CustomResourceDefinitionNames{
   935  				Plural:   crdNamePlural,
   936  				Singular: crdName,
   937  				Kind:     crdKind,
   938  				ListKind: "TestCRDList",
   939  			},
   940  			Versions: []apiextensionsv1.CustomResourceDefinitionVersion{{
   941  				Name:    crdVersion,
   942  				Served:  true,
   943  				Storage: true,
   944  				Schema:  fixtures.AllowAllSchema(),
   945  				Subresources: &apiextensionsv1.CustomResourceSubresources{
   946  					Scale: &apiextensionsv1.CustomResourceSubresourceScale{
   947  						SpecReplicasPath:   ".spec.replicas",
   948  						StatusReplicasPath: ".status.replicas",
   949  						LabelSelectorPath:  utilpointer.String(".status.selector"),
   950  					},
   951  				},
   952  			}},
   953  		},
   954  		Status: apiextensionsv1.CustomResourceDefinitionStatus{},
   955  	}
   956  	// Create Custom Resource Definition if it's not present.
   957  	crd, err := c.ApiextensionsV1().CustomResourceDefinitions().Get(ctx, crdSchema.Name, metav1.GetOptions{})
   958  	if err != nil {
   959  		crd, err = c.ApiextensionsV1().CustomResourceDefinitions().Create(ctx, crdSchema, metav1.CreateOptions{})
   960  		framework.ExpectNoError(err)
   961  		// Wait until just created CRD appears in discovery.
   962  		err = framework.Gomega().Eventually(ctx, framework.RetryNotFound(framework.HandleRetry(func(ctx context.Context) (*metav1.APIResourceList, error) {
   963  			return c.Discovery().ServerResourcesForGroupVersion(crd.Spec.Group + "/" + "v1")
   964  		}))).Should(framework.MakeMatcher(func(actual *metav1.APIResourceList) (func() string, error) {
   965  			for _, g := range actual.APIResources {
   966  				if g.Name == crd.Spec.Names.Plural {
   967  					return nil, nil
   968  				}
   969  			}
   970  			return func() string {
   971  				return fmt.Sprintf("CRD %s not found in discovery", crd.Spec.Names.Plural)
   972  			}, nil
   973  		}))
   974  		framework.ExpectNoError(err)
   975  		ginkgo.By(fmt.Sprintf("Successfully created Custom Resource Definition: %v", crd))
   976  	}
   977  	return crd
   978  }
   979  
   980  func CreateCustomSubresourceInstance(ctx context.Context, namespace, name string, client dynamic.ResourceInterface, definition *apiextensionsv1.CustomResourceDefinition) (*unstructured.Unstructured, error) {
   981  	instance := &unstructured.Unstructured{
   982  		Object: map[string]interface{}{
   983  			"apiVersion": crdGroup + "/" + crdVersion,
   984  			"kind":       crdKind,
   985  			"metadata": map[string]interface{}{
   986  				"namespace": namespace,
   987  				"name":      name,
   988  			},
   989  			"spec": map[string]interface{}{
   990  				"num":      int64(1),
   991  				"replicas": int64(1),
   992  			},
   993  			"status": map[string]interface{}{
   994  				"replicas": int64(1),
   995  				"selector": "name=" + name,
   996  			},
   997  		},
   998  	}
   999  	instance, err := client.Create(ctx, instance, metav1.CreateOptions{})
  1000  	if err != nil {
  1001  		framework.Logf("%#v", instance)
  1002  		return nil, err
  1003  	}
  1004  	createdObjectMeta, err := meta.Accessor(instance)
  1005  	if err != nil {
  1006  		return nil, fmt.Errorf("Error while creating object meta: %w", err)
  1007  	}
  1008  	if len(createdObjectMeta.GetUID()) == 0 {
  1009  		return nil, fmt.Errorf("Missing UUID: %v", instance)
  1010  	}
  1011  	ginkgo.By(fmt.Sprintf("Successfully created instance of CRD of kind %v: %v", definition.Kind, instance))
  1012  	return instance, nil
  1013  }
  1014  

View as plain text