...

Source file src/k8s.io/kubernetes/test/e2e/autoscaling/cluster_autoscaler_scalability.go

Documentation: k8s.io/kubernetes/test/e2e/autoscaling

     1  /*
     2  Copyright 2016 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package autoscaling
    18  
    19  import (
    20  	"context"
    21  	"encoding/json"
    22  	"fmt"
    23  	"math"
    24  	"strings"
    25  	"time"
    26  
    27  	v1 "k8s.io/api/core/v1"
    28  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    29  	"k8s.io/apimachinery/pkg/fields"
    30  	"k8s.io/apimachinery/pkg/types"
    31  	"k8s.io/apimachinery/pkg/util/strategicpatch"
    32  	clientset "k8s.io/client-go/kubernetes"
    33  	"k8s.io/klog/v2"
    34  	"k8s.io/kubernetes/test/e2e/feature"
    35  	"k8s.io/kubernetes/test/e2e/framework"
    36  	e2enode "k8s.io/kubernetes/test/e2e/framework/node"
    37  	e2erc "k8s.io/kubernetes/test/e2e/framework/rc"
    38  	e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
    39  	testutils "k8s.io/kubernetes/test/utils"
    40  	imageutils "k8s.io/kubernetes/test/utils/image"
    41  	admissionapi "k8s.io/pod-security-admission/api"
    42  
    43  	"github.com/onsi/ginkgo/v2"
    44  	"github.com/onsi/gomega"
    45  )
    46  
    47  const (
    48  	memoryReservationTimeout = 5 * time.Minute
    49  	largeResizeTimeout       = 8 * time.Minute
    50  	largeScaleUpTimeout      = 10 * time.Minute
    51  	maxNodes                 = 1000
    52  )
    53  
    54  type clusterPredicates struct {
    55  	nodes int
    56  }
    57  
    58  type scaleUpTestConfig struct {
    59  	initialNodes   int
    60  	initialPods    int
    61  	extraPods      *testutils.RCConfig
    62  	expectedResult *clusterPredicates
    63  }
    64  
    65  var _ = SIGDescribe("Cluster size autoscaler scalability", framework.WithSlow(), func() {
    66  	f := framework.NewDefaultFramework("autoscaling")
    67  	f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged
    68  	var c clientset.Interface
    69  	var nodeCount int
    70  	var coresPerNode int
    71  	var memCapacityMb int
    72  	var originalSizes map[string]int
    73  	var sum int
    74  
    75  	ginkgo.BeforeEach(func(ctx context.Context) {
    76  		e2eskipper.SkipUnlessProviderIs("gce", "gke", "kubemark")
    77  
    78  		// Check if Cloud Autoscaler is enabled by trying to get its ConfigMap.
    79  		_, err := f.ClientSet.CoreV1().ConfigMaps("kube-system").Get(ctx, "cluster-autoscaler-status", metav1.GetOptions{})
    80  		if err != nil {
    81  			e2eskipper.Skipf("test expects Cluster Autoscaler to be enabled")
    82  		}
    83  
    84  		c = f.ClientSet
    85  		if originalSizes == nil {
    86  			originalSizes = make(map[string]int)
    87  			sum = 0
    88  			for _, mig := range strings.Split(framework.TestContext.CloudConfig.NodeInstanceGroup, ",") {
    89  				size, err := framework.GroupSize(mig)
    90  				framework.ExpectNoError(err)
    91  				ginkgo.By(fmt.Sprintf("Initial size of %s: %d", mig, size))
    92  				originalSizes[mig] = size
    93  				sum += size
    94  			}
    95  		}
    96  
    97  		framework.ExpectNoError(e2enode.WaitForReadyNodes(ctx, c, sum, scaleUpTimeout))
    98  
    99  		nodes, err := e2enode.GetReadySchedulableNodes(ctx, f.ClientSet)
   100  		framework.ExpectNoError(err)
   101  		nodeCount = len(nodes.Items)
   102  		cpu := nodes.Items[0].Status.Capacity[v1.ResourceCPU]
   103  		mem := nodes.Items[0].Status.Capacity[v1.ResourceMemory]
   104  		coresPerNode = int((&cpu).MilliValue() / 1000)
   105  		memCapacityMb = int((&mem).Value() / 1024 / 1024)
   106  
   107  		gomega.Expect(nodeCount).To(gomega.Equal(sum))
   108  
   109  		if framework.ProviderIs("gke") {
   110  			val, err := isAutoscalerEnabled(3)
   111  			framework.ExpectNoError(err)
   112  			if !val {
   113  				err = enableAutoscaler("default-pool", 3, 5)
   114  				framework.ExpectNoError(err)
   115  			}
   116  		}
   117  	})
   118  
   119  	ginkgo.AfterEach(func(ctx context.Context) {
   120  		ginkgo.By(fmt.Sprintf("Restoring initial size of the cluster"))
   121  		setMigSizes(originalSizes)
   122  		framework.ExpectNoError(e2enode.WaitForReadyNodes(ctx, c, nodeCount, scaleDownTimeout))
   123  		nodes, err := c.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
   124  		framework.ExpectNoError(err)
   125  		s := time.Now()
   126  	makeSchedulableLoop:
   127  		for start := time.Now(); time.Since(start) < makeSchedulableTimeout; time.Sleep(makeSchedulableDelay) {
   128  			for _, n := range nodes.Items {
   129  				err = makeNodeSchedulable(ctx, c, &n, true)
   130  				switch err.(type) {
   131  				case CriticalAddonsOnlyError:
   132  					continue makeSchedulableLoop
   133  				default:
   134  					framework.ExpectNoError(err)
   135  				}
   136  			}
   137  			break
   138  		}
   139  		klog.Infof("Made nodes schedulable again in %v", time.Since(s).String())
   140  	})
   141  
   142  	f.It("should scale up at all", feature.ClusterAutoscalerScalability1, func(ctx context.Context) {
   143  		perNodeReservation := int(float64(memCapacityMb) * 0.95)
   144  		replicasPerNode := 10
   145  
   146  		additionalNodes := maxNodes - nodeCount
   147  		replicas := additionalNodes * replicasPerNode
   148  		additionalReservation := additionalNodes * perNodeReservation
   149  
   150  		// saturate cluster
   151  		reservationCleanup := ReserveMemory(ctx, f, "some-pod", nodeCount*2, nodeCount*perNodeReservation, true, memoryReservationTimeout)
   152  		defer reservationCleanup()
   153  		framework.ExpectNoError(waitForAllCaPodsReadyInNamespace(ctx, f, c))
   154  
   155  		// configure pending pods & expected scale up
   156  		rcConfig := reserveMemoryRCConfig(f, "extra-pod-1", replicas, additionalReservation, largeScaleUpTimeout)
   157  		expectedResult := createClusterPredicates(nodeCount + additionalNodes)
   158  		config := createScaleUpTestConfig(nodeCount, nodeCount, rcConfig, expectedResult)
   159  
   160  		// run test
   161  		testCleanup := simpleScaleUpTest(ctx, f, config)
   162  		defer testCleanup()
   163  	})
   164  
   165  	f.It("should scale up twice", feature.ClusterAutoscalerScalability2, func(ctx context.Context) {
   166  		perNodeReservation := int(float64(memCapacityMb) * 0.95)
   167  		replicasPerNode := 10
   168  		additionalNodes1 := int(math.Ceil(0.7 * maxNodes))
   169  		additionalNodes2 := int(math.Ceil(0.25 * maxNodes))
   170  		if additionalNodes1+additionalNodes2 > maxNodes {
   171  			additionalNodes2 = maxNodes - additionalNodes1
   172  		}
   173  
   174  		replicas1 := additionalNodes1 * replicasPerNode
   175  		replicas2 := additionalNodes2 * replicasPerNode
   176  
   177  		klog.Infof("cores per node: %v", coresPerNode)
   178  
   179  		// saturate cluster
   180  		initialReplicas := nodeCount
   181  		reservationCleanup := ReserveMemory(ctx, f, "some-pod", initialReplicas, nodeCount*perNodeReservation, true, memoryReservationTimeout)
   182  		defer reservationCleanup()
   183  		framework.ExpectNoError(waitForAllCaPodsReadyInNamespace(ctx, f, c))
   184  
   185  		klog.Infof("Reserved successfully")
   186  
   187  		// configure pending pods & expected scale up #1
   188  		rcConfig := reserveMemoryRCConfig(f, "extra-pod-1", replicas1, additionalNodes1*perNodeReservation, largeScaleUpTimeout)
   189  		expectedResult := createClusterPredicates(nodeCount + additionalNodes1)
   190  		config := createScaleUpTestConfig(nodeCount, nodeCount, rcConfig, expectedResult)
   191  
   192  		// run test #1
   193  		tolerateUnreadyNodes := additionalNodes1 / 20
   194  		tolerateUnreadyPods := (initialReplicas + replicas1) / 20
   195  		testCleanup1 := simpleScaleUpTestWithTolerance(ctx, f, config, tolerateUnreadyNodes, tolerateUnreadyPods)
   196  		defer testCleanup1()
   197  
   198  		klog.Infof("Scaled up once")
   199  
   200  		// configure pending pods & expected scale up #2
   201  		rcConfig2 := reserveMemoryRCConfig(f, "extra-pod-2", replicas2, additionalNodes2*perNodeReservation, largeScaleUpTimeout)
   202  		expectedResult2 := createClusterPredicates(nodeCount + additionalNodes1 + additionalNodes2)
   203  		config2 := createScaleUpTestConfig(nodeCount+additionalNodes1, nodeCount+additionalNodes2, rcConfig2, expectedResult2)
   204  
   205  		// run test #2
   206  		tolerateUnreadyNodes = maxNodes / 20
   207  		tolerateUnreadyPods = (initialReplicas + replicas1 + replicas2) / 20
   208  		testCleanup2 := simpleScaleUpTestWithTolerance(ctx, f, config2, tolerateUnreadyNodes, tolerateUnreadyPods)
   209  		defer testCleanup2()
   210  
   211  		klog.Infof("Scaled up twice")
   212  	})
   213  
   214  	f.It("should scale down empty nodes", feature.ClusterAutoscalerScalability3, func(ctx context.Context) {
   215  		perNodeReservation := int(float64(memCapacityMb) * 0.7)
   216  		replicas := int(math.Ceil(maxNodes * 0.7))
   217  		totalNodes := maxNodes
   218  
   219  		// resize cluster to totalNodes
   220  		newSizes := map[string]int{
   221  			anyKey(originalSizes): totalNodes,
   222  		}
   223  		setMigSizes(newSizes)
   224  		framework.ExpectNoError(e2enode.WaitForReadyNodes(ctx, f.ClientSet, totalNodes, largeResizeTimeout))
   225  
   226  		// run replicas
   227  		rcConfig := reserveMemoryRCConfig(f, "some-pod", replicas, replicas*perNodeReservation, largeScaleUpTimeout)
   228  		expectedResult := createClusterPredicates(totalNodes)
   229  		config := createScaleUpTestConfig(totalNodes, totalNodes, rcConfig, expectedResult)
   230  		tolerateUnreadyNodes := totalNodes / 10
   231  		tolerateUnreadyPods := replicas / 10
   232  		testCleanup := simpleScaleUpTestWithTolerance(ctx, f, config, tolerateUnreadyNodes, tolerateUnreadyPods)
   233  		defer testCleanup()
   234  
   235  		// check if empty nodes are scaled down
   236  		framework.ExpectNoError(WaitForClusterSizeFunc(ctx, f.ClientSet,
   237  			func(size int) bool {
   238  				return size <= replicas+3 // leaving space for non-evictable kube-system pods
   239  			}, scaleDownTimeout))
   240  	})
   241  
   242  	f.It("should scale down underutilized nodes", feature.ClusterAutoscalerScalability4, func(ctx context.Context) {
   243  		perPodReservation := int(float64(memCapacityMb) * 0.01)
   244  		// underutilizedNodes are 10% full
   245  		underutilizedPerNodeReplicas := 10
   246  		// fullNodes are 70% full
   247  		fullPerNodeReplicas := 70
   248  		totalNodes := maxNodes
   249  		underutilizedRatio := 0.3
   250  		maxDelta := 30
   251  
   252  		// resize cluster to totalNodes
   253  		newSizes := map[string]int{
   254  			anyKey(originalSizes): totalNodes,
   255  		}
   256  		setMigSizes(newSizes)
   257  
   258  		framework.ExpectNoError(e2enode.WaitForReadyNodes(ctx, f.ClientSet, totalNodes, largeResizeTimeout))
   259  
   260  		// annotate all nodes with no-scale-down
   261  		ScaleDownDisabledKey := "cluster-autoscaler.kubernetes.io/scale-down-disabled"
   262  
   263  		nodes, err := f.ClientSet.CoreV1().Nodes().List(ctx, metav1.ListOptions{
   264  			FieldSelector: fields.Set{
   265  				"spec.unschedulable": "false",
   266  			}.AsSelector().String(),
   267  		})
   268  
   269  		framework.ExpectNoError(err)
   270  		framework.ExpectNoError(addAnnotation(ctx, f, nodes.Items, ScaleDownDisabledKey, "true"))
   271  
   272  		// distribute pods using replication controllers taking up space that should
   273  		// be empty after pods are distributed
   274  		underutilizedNodesNum := int(float64(maxNodes) * underutilizedRatio)
   275  		fullNodesNum := totalNodes - underutilizedNodesNum
   276  
   277  		podDistribution := []podBatch{
   278  			{numNodes: fullNodesNum, podsPerNode: fullPerNodeReplicas},
   279  			{numNodes: underutilizedNodesNum, podsPerNode: underutilizedPerNodeReplicas}}
   280  
   281  		distributeLoad(ctx, f, f.Namespace.Name, "10-70", podDistribution, perPodReservation,
   282  			int(0.95*float64(memCapacityMb)), map[string]string{}, largeScaleUpTimeout)
   283  
   284  		// enable scale down again
   285  		framework.ExpectNoError(addAnnotation(ctx, f, nodes.Items, ScaleDownDisabledKey, "false"))
   286  
   287  		// wait for scale down to start. Node deletion takes a long time, so we just
   288  		// wait for maximum of 30 nodes deleted
   289  		nodesToScaleDownCount := int(float64(totalNodes) * 0.1)
   290  		if nodesToScaleDownCount > maxDelta {
   291  			nodesToScaleDownCount = maxDelta
   292  		}
   293  		expectedSize := totalNodes - nodesToScaleDownCount
   294  		timeout := time.Duration(nodesToScaleDownCount)*time.Minute + scaleDownTimeout
   295  		framework.ExpectNoError(WaitForClusterSizeFunc(ctx, f.ClientSet, func(size int) bool {
   296  			return size <= expectedSize
   297  		}, timeout))
   298  	})
   299  
   300  	f.It("shouldn't scale down with underutilized nodes due to host port conflicts", feature.ClusterAutoscalerScalability5, func(ctx context.Context) {
   301  		fullReservation := int(float64(memCapacityMb) * 0.9)
   302  		hostPortPodReservation := int(float64(memCapacityMb) * 0.3)
   303  		totalNodes := maxNodes
   304  		reservedPort := 4321
   305  
   306  		// resize cluster to totalNodes
   307  		newSizes := map[string]int{
   308  			anyKey(originalSizes): totalNodes,
   309  		}
   310  		setMigSizes(newSizes)
   311  		framework.ExpectNoError(e2enode.WaitForReadyNodes(ctx, f.ClientSet, totalNodes, largeResizeTimeout))
   312  		divider := int(float64(totalNodes) * 0.7)
   313  		fullNodesCount := divider
   314  		underutilizedNodesCount := totalNodes - fullNodesCount
   315  
   316  		ginkgo.By("Reserving full nodes")
   317  		// run RC1 w/o host port
   318  		cleanup := ReserveMemory(ctx, f, "filling-pod", fullNodesCount, fullNodesCount*fullReservation, true, largeScaleUpTimeout*2)
   319  		defer cleanup()
   320  
   321  		ginkgo.By("Reserving host ports on remaining nodes")
   322  		// run RC2 w/ host port
   323  		ginkgo.DeferCleanup(createHostPortPodsWithMemory, f, "underutilizing-host-port-pod", underutilizedNodesCount, reservedPort, underutilizedNodesCount*hostPortPodReservation, largeScaleUpTimeout)
   324  
   325  		framework.ExpectNoError(waitForAllCaPodsReadyInNamespace(ctx, f, c))
   326  		// wait and check scale down doesn't occur
   327  		ginkgo.By(fmt.Sprintf("Sleeping %v minutes...", scaleDownTimeout.Minutes()))
   328  		time.Sleep(scaleDownTimeout)
   329  
   330  		ginkgo.By("Checking if the number of nodes is as expected")
   331  		nodes, err := e2enode.GetReadySchedulableNodes(ctx, f.ClientSet)
   332  		framework.ExpectNoError(err)
   333  		klog.Infof("Nodes: %v, expected: %v", len(nodes.Items), totalNodes)
   334  		gomega.Expect(nodes.Items).To(gomega.HaveLen(totalNodes))
   335  	})
   336  
   337  	f.It("CA ignores unschedulable pods while scheduling schedulable pods", feature.ClusterAutoscalerScalability6, func(ctx context.Context) {
   338  		// Start a number of pods saturating existing nodes.
   339  		perNodeReservation := int(float64(memCapacityMb) * 0.80)
   340  		replicasPerNode := 10
   341  		initialPodReplicas := nodeCount * replicasPerNode
   342  		initialPodsTotalMemory := nodeCount * perNodeReservation
   343  		reservationCleanup := ReserveMemory(ctx, f, "initial-pod", initialPodReplicas, initialPodsTotalMemory, true /* wait for pods to run */, memoryReservationTimeout)
   344  		ginkgo.DeferCleanup(reservationCleanup)
   345  		framework.ExpectNoError(waitForAllCaPodsReadyInNamespace(ctx, f, c))
   346  
   347  		// Configure a number of unschedulable pods.
   348  		unschedulableMemReservation := memCapacityMb * 2
   349  		unschedulablePodReplicas := 1000
   350  		totalMemReservation := unschedulableMemReservation * unschedulablePodReplicas
   351  		timeToWait := 5 * time.Minute
   352  		podsConfig := reserveMemoryRCConfig(f, "unschedulable-pod", unschedulablePodReplicas, totalMemReservation, timeToWait)
   353  		_ = e2erc.RunRC(ctx, *podsConfig) // Ignore error (it will occur because pods are unschedulable)
   354  		ginkgo.DeferCleanup(e2erc.DeleteRCAndWaitForGC, f.ClientSet, f.Namespace.Name, podsConfig.Name)
   355  
   356  		// Ensure that no new nodes have been added so far.
   357  		readyNodeCount, _ := e2enode.TotalReady(ctx, f.ClientSet)
   358  		gomega.Expect(readyNodeCount).To(gomega.Equal(nodeCount))
   359  
   360  		// Start a number of schedulable pods to ensure CA reacts.
   361  		additionalNodes := maxNodes - nodeCount
   362  		replicas := additionalNodes * replicasPerNode
   363  		totalMemory := additionalNodes * perNodeReservation
   364  		rcConfig := reserveMemoryRCConfig(f, "extra-pod", replicas, totalMemory, largeScaleUpTimeout)
   365  		expectedResult := createClusterPredicates(nodeCount + additionalNodes)
   366  		config := createScaleUpTestConfig(nodeCount, initialPodReplicas, rcConfig, expectedResult)
   367  
   368  		// Test that scale up happens, allowing 1000 unschedulable pods not to be scheduled.
   369  		testCleanup := simpleScaleUpTestWithTolerance(ctx, f, config, 0, unschedulablePodReplicas)
   370  		ginkgo.DeferCleanup(testCleanup)
   371  	})
   372  
   373  })
   374  
   375  func anyKey(input map[string]int) string {
   376  	for k := range input {
   377  		return k
   378  	}
   379  	return ""
   380  }
   381  
   382  func simpleScaleUpTestWithTolerance(ctx context.Context, f *framework.Framework, config *scaleUpTestConfig, tolerateMissingNodeCount int, tolerateMissingPodCount int) func() error {
   383  	// resize cluster to start size
   384  	// run rc based on config
   385  	ginkgo.By(fmt.Sprintf("Running RC %v from config", config.extraPods.Name))
   386  	start := time.Now()
   387  	framework.ExpectNoError(e2erc.RunRC(ctx, *config.extraPods))
   388  	// check results
   389  	if tolerateMissingNodeCount > 0 {
   390  		// Tolerate some number of nodes not to be created.
   391  		minExpectedNodeCount := config.expectedResult.nodes - tolerateMissingNodeCount
   392  		framework.ExpectNoError(WaitForClusterSizeFunc(ctx, f.ClientSet,
   393  			func(size int) bool { return size >= minExpectedNodeCount }, scaleUpTimeout))
   394  	} else {
   395  		framework.ExpectNoError(e2enode.WaitForReadyNodes(ctx, f.ClientSet, config.expectedResult.nodes, scaleUpTimeout))
   396  	}
   397  	klog.Infof("cluster is increased")
   398  	if tolerateMissingPodCount > 0 {
   399  		framework.ExpectNoError(waitForCaPodsReadyInNamespace(ctx, f, f.ClientSet, tolerateMissingPodCount))
   400  	} else {
   401  		framework.ExpectNoError(waitForAllCaPodsReadyInNamespace(ctx, f, f.ClientSet))
   402  	}
   403  	timeTrack(start, fmt.Sprintf("Scale up to %v", config.expectedResult.nodes))
   404  	return func() error {
   405  		return e2erc.DeleteRCAndWaitForGC(ctx, f.ClientSet, f.Namespace.Name, config.extraPods.Name)
   406  	}
   407  }
   408  
   409  func simpleScaleUpTest(ctx context.Context, f *framework.Framework, config *scaleUpTestConfig) func() error {
   410  	return simpleScaleUpTestWithTolerance(ctx, f, config, 0, 0)
   411  }
   412  
   413  func reserveMemoryRCConfig(f *framework.Framework, id string, replicas, megabytes int, timeout time.Duration) *testutils.RCConfig {
   414  	return &testutils.RCConfig{
   415  		Client:     f.ClientSet,
   416  		Name:       id,
   417  		Namespace:  f.Namespace.Name,
   418  		Timeout:    timeout,
   419  		Image:      imageutils.GetPauseImageName(),
   420  		Replicas:   replicas,
   421  		MemRequest: int64(1024 * 1024 * megabytes / replicas),
   422  	}
   423  }
   424  
   425  func createScaleUpTestConfig(nodes, pods int, extraPods *testutils.RCConfig, expectedResult *clusterPredicates) *scaleUpTestConfig {
   426  	return &scaleUpTestConfig{
   427  		initialNodes:   nodes,
   428  		initialPods:    pods,
   429  		extraPods:      extraPods,
   430  		expectedResult: expectedResult,
   431  	}
   432  }
   433  
   434  func createClusterPredicates(nodes int) *clusterPredicates {
   435  	return &clusterPredicates{
   436  		nodes: nodes,
   437  	}
   438  }
   439  
   440  func addAnnotation(ctx context.Context, f *framework.Framework, nodes []v1.Node, key, value string) error {
   441  	for _, node := range nodes {
   442  		oldData, err := json.Marshal(node)
   443  		if err != nil {
   444  			return err
   445  		}
   446  
   447  		if node.Annotations == nil {
   448  			node.Annotations = make(map[string]string)
   449  		}
   450  		node.Annotations[key] = value
   451  
   452  		newData, err := json.Marshal(node)
   453  		if err != nil {
   454  			return err
   455  		}
   456  
   457  		patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.Node{})
   458  		if err != nil {
   459  			return err
   460  		}
   461  
   462  		_, err = f.ClientSet.CoreV1().Nodes().Patch(ctx, string(node.Name), types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{})
   463  		if err != nil {
   464  			return err
   465  		}
   466  	}
   467  	return nil
   468  }
   469  
   470  func createHostPortPodsWithMemory(ctx context.Context, f *framework.Framework, id string, replicas, port, megabytes int, timeout time.Duration) func() error {
   471  	ginkgo.By(fmt.Sprintf("Running RC which reserves host port and memory"))
   472  	request := int64(1024 * 1024 * megabytes / replicas)
   473  	config := &testutils.RCConfig{
   474  		Client:     f.ClientSet,
   475  		Name:       id,
   476  		Namespace:  f.Namespace.Name,
   477  		Timeout:    timeout,
   478  		Image:      imageutils.GetPauseImageName(),
   479  		Replicas:   replicas,
   480  		HostPorts:  map[string]int{"port1": port},
   481  		MemRequest: request,
   482  	}
   483  	err := e2erc.RunRC(ctx, *config)
   484  	framework.ExpectNoError(err)
   485  	return func() error {
   486  		return e2erc.DeleteRCAndWaitForGC(ctx, f.ClientSet, f.Namespace.Name, id)
   487  	}
   488  }
   489  
   490  type podBatch struct {
   491  	numNodes    int
   492  	podsPerNode int
   493  }
   494  
   495  // distributeLoad distributes the pods in the way described by podDostribution,
   496  // assuming all pods will have the same memory reservation and all nodes the same
   497  // memory capacity. This allows us generate the load on the cluster in the exact
   498  // way that we want.
   499  //
   500  // To achieve this we do the following:
   501  // 1. Create replication controllers that eat up all the space that should be
   502  // empty after setup, making sure they end up on different nodes by specifying
   503  // conflicting host port
   504  // 2. Create target RC that will generate the load on the cluster
   505  // 3. Remove the rcs created in 1.
   506  func distributeLoad(ctx context.Context, f *framework.Framework, namespace string, id string, podDistribution []podBatch,
   507  	podMemRequestMegabytes int, nodeMemCapacity int, labels map[string]string, timeout time.Duration) {
   508  	port := 8013
   509  	// Create load-distribution RCs with one pod per node, reserving all remaining
   510  	// memory to force the distribution of pods for the target RCs.
   511  	// The load-distribution RCs will be deleted on function return.
   512  	totalPods := 0
   513  	for i, podBatch := range podDistribution {
   514  		totalPods += podBatch.numNodes * podBatch.podsPerNode
   515  		remainingMem := nodeMemCapacity - podBatch.podsPerNode*podMemRequestMegabytes
   516  		replicas := podBatch.numNodes
   517  		cleanup := createHostPortPodsWithMemory(ctx, f, fmt.Sprintf("load-distribution%d", i), replicas, port, remainingMem*replicas, timeout)
   518  		defer cleanup()
   519  	}
   520  	framework.ExpectNoError(waitForAllCaPodsReadyInNamespace(ctx, f, f.ClientSet))
   521  	// Create the target RC
   522  	rcConfig := reserveMemoryRCConfig(f, id, totalPods, totalPods*podMemRequestMegabytes, timeout)
   523  	framework.ExpectNoError(e2erc.RunRC(ctx, *rcConfig))
   524  	framework.ExpectNoError(waitForAllCaPodsReadyInNamespace(ctx, f, f.ClientSet))
   525  	ginkgo.DeferCleanup(e2erc.DeleteRCAndWaitForGC, f.ClientSet, f.Namespace.Name, id)
   526  }
   527  
   528  func timeTrack(start time.Time, name string) {
   529  	elapsed := time.Since(start)
   530  	klog.Infof("%s took %s", name, elapsed)
   531  }
   532  

View as plain text