...

Source file src/k8s.io/kubernetes/test/e2e/network/scale/ingress.go

Documentation: k8s.io/kubernetes/test/e2e/network/scale

     1  //go:build !providerless
     2  // +build !providerless
     3  
     4  /*
     5  Copyright 2018 The Kubernetes Authors.
     6  
     7  Licensed under the Apache License, Version 2.0 (the "License");
     8  you may not use this file except in compliance with the License.
     9  You may obtain a copy of the License at
    10  
    11      http://www.apache.org/licenses/LICENSE-2.0
    12  
    13  Unless required by applicable law or agreed to in writing, software
    14  distributed under the License is distributed on an "AS IS" BASIS,
    15  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    16  See the License for the specific language governing permissions and
    17  limitations under the License.
    18  */
    19  
    20  package scale
    21  
import (
	"context"
	"fmt"
	"os"
	"strings"
	"sync"
	"time"

	appsv1 "k8s.io/api/apps/v1"
	v1 "k8s.io/api/core/v1"
	networkingv1 "k8s.io/api/networking/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/intstr"
	clientset "k8s.io/client-go/kubernetes"

	"k8s.io/kubernetes/test/e2e/framework"
	e2edeployment "k8s.io/kubernetes/test/e2e/framework/deployment"
	e2eingress "k8s.io/kubernetes/test/e2e/framework/ingress"
	"k8s.io/kubernetes/test/e2e/framework/providers/gce"
	imageutils "k8s.io/kubernetes/test/utils/image"
)
    42  
    43  const (
    44  	numIngressesSmall      = 5
    45  	numIngressesMedium     = 20
    46  	numIngressesLarge      = 50
    47  	numIngressesExtraLarge = 99
    48  
    49  	scaleTestIngressNamePrefix = "ing-scale"
    50  	scaleTestBackendName       = "echoheaders-scale"
    51  	scaleTestSecretName        = "tls-secret-scale"
    52  	scaleTestHostname          = "scale.ingress.com"
    53  	scaleTestNumBackends       = 10
    54  	scaleTestPollInterval      = 15 * time.Second
    55  
    56  	// We don't expect waitForIngress to take longer
    57  	// than waitForIngressMaxTimeout.
    58  	waitForIngressMaxTimeout = 80 * time.Minute
    59  	ingressesCleanupTimeout  = 80 * time.Minute
    60  )
    61  
    62  var (
    63  	scaleTestLabels = map[string]string{
    64  		"app": scaleTestBackendName,
    65  	}
    66  )
    67  
    68  // IngressScaleFramework defines the framework for ingress scale testing.
    69  type IngressScaleFramework struct {
    70  	Clientset     clientset.Interface
    71  	Jig           *e2eingress.TestJig
    72  	GCEController *gce.IngressController
    73  	CloudConfig   framework.CloudConfig
    74  	Logger        e2eingress.TestLogger
    75  
    76  	Namespace        string
    77  	EnableTLS        bool
    78  	NumIngressesTest []int
    79  	OutputFile       string
    80  
    81  	ScaleTestDeploy *appsv1.Deployment
    82  	ScaleTestSvcs   []*v1.Service
    83  	ScaleTestIngs   []*networkingv1.Ingress
    84  
    85  	// BatchCreateLatencies stores all ingress creation latencies, in different
    86  	// batches.
    87  	BatchCreateLatencies [][]time.Duration
    88  	// BatchDurations stores the total duration for each ingress batch creation.
    89  	BatchDurations []time.Duration
    90  	// StepCreateLatencies stores the single ingress creation latency, which happens
    91  	// after each ingress batch creation is complete.
    92  	StepCreateLatencies []time.Duration
    93  	// StepCreateLatencies stores the single ingress update latency, which happens
    94  	// after each ingress batch creation is complete.
    95  	StepUpdateLatencies []time.Duration
    96  }
    97  
    98  // NewIngressScaleFramework returns a new framework for ingress scale testing.
    99  func NewIngressScaleFramework(cs clientset.Interface, ns string, cloudConfig framework.CloudConfig) *IngressScaleFramework {
   100  	return &IngressScaleFramework{
   101  		Namespace:   ns,
   102  		Clientset:   cs,
   103  		CloudConfig: cloudConfig,
   104  		Logger:      &e2eingress.E2ELogger{},
   105  		EnableTLS:   true,
   106  		NumIngressesTest: []int{
   107  			numIngressesSmall,
   108  			numIngressesMedium,
   109  			numIngressesLarge,
   110  			numIngressesExtraLarge,
   111  		},
   112  	}
   113  }
   114  
   115  // PrepareScaleTest prepares framework for ingress scale testing.
   116  func (f *IngressScaleFramework) PrepareScaleTest(ctx context.Context) error {
   117  	f.Logger.Infof("Initializing ingress test suite and gce controller...")
   118  	f.Jig = e2eingress.NewIngressTestJig(f.Clientset)
   119  	f.Jig.Logger = f.Logger
   120  	f.Jig.PollInterval = scaleTestPollInterval
   121  	f.GCEController = &gce.IngressController{
   122  		Client: f.Clientset,
   123  		Cloud:  f.CloudConfig,
   124  	}
   125  	if err := f.GCEController.Init(ctx); err != nil {
   126  		return fmt.Errorf("failed to initialize GCE controller: %w", err)
   127  	}
   128  
   129  	f.ScaleTestSvcs = []*v1.Service{}
   130  	f.ScaleTestIngs = []*networkingv1.Ingress{}
   131  
   132  	return nil
   133  }
   134  
   135  // CleanupScaleTest cleans up framework for ingress scale testing.
   136  func (f *IngressScaleFramework) CleanupScaleTest(ctx context.Context) []error {
   137  	var errs []error
   138  
   139  	f.Logger.Infof("Cleaning up ingresses...")
   140  	for _, ing := range f.ScaleTestIngs {
   141  		if ing != nil {
   142  			if err := f.Clientset.NetworkingV1().Ingresses(ing.Namespace).Delete(ctx, ing.Name, metav1.DeleteOptions{}); err != nil {
   143  				errs = append(errs, fmt.Errorf("error while deleting ingress %s/%s: %w", ing.Namespace, ing.Name, err))
   144  			}
   145  		}
   146  	}
   147  	f.Logger.Infof("Cleaning up services...")
   148  	for _, svc := range f.ScaleTestSvcs {
   149  		if svc != nil {
   150  			if err := f.Clientset.CoreV1().Services(svc.Namespace).Delete(ctx, svc.Name, metav1.DeleteOptions{}); err != nil {
   151  				errs = append(errs, fmt.Errorf("error while deleting service %s/%s: %w", svc.Namespace, svc.Name, err))
   152  			}
   153  		}
   154  	}
   155  	if f.ScaleTestDeploy != nil {
   156  		f.Logger.Infof("Cleaning up deployment %s...", f.ScaleTestDeploy.Name)
   157  		if err := f.Clientset.AppsV1().Deployments(f.ScaleTestDeploy.Namespace).Delete(ctx, f.ScaleTestDeploy.Name, metav1.DeleteOptions{}); err != nil {
   158  			errs = append(errs, fmt.Errorf("error while deleting deployment %s/%s: %w", f.ScaleTestDeploy.Namespace, f.ScaleTestDeploy.Name, err))
   159  		}
   160  	}
   161  
   162  	f.Logger.Infof("Cleaning up cloud resources...")
   163  	if err := f.GCEController.CleanupIngressControllerWithTimeout(ctx, ingressesCleanupTimeout); err != nil {
   164  		errs = append(errs, err)
   165  	}
   166  
   167  	return errs
   168  }
   169  
   170  // RunScaleTest runs ingress scale testing.
   171  func (f *IngressScaleFramework) RunScaleTest(ctx context.Context) []error {
   172  	var errs []error
   173  
   174  	testDeploy := generateScaleTestBackendDeploymentSpec(scaleTestNumBackends)
   175  	f.Logger.Infof("Creating deployment %s...", testDeploy.Name)
   176  	testDeploy, err := f.Jig.Client.AppsV1().Deployments(f.Namespace).Create(ctx, testDeploy, metav1.CreateOptions{})
   177  	if err != nil {
   178  		errs = append(errs, fmt.Errorf("failed to create deployment %s: %w", testDeploy.Name, err))
   179  		return errs
   180  	}
   181  	f.ScaleTestDeploy = testDeploy
   182  
   183  	if f.EnableTLS {
   184  		f.Logger.Infof("Ensuring TLS secret %s...", scaleTestSecretName)
   185  		if err := f.Jig.PrepareTLSSecret(ctx, f.Namespace, scaleTestSecretName, scaleTestHostname); err != nil {
   186  			errs = append(errs, fmt.Errorf("failed to prepare TLS secret %s: %w", scaleTestSecretName, err))
   187  			return errs
   188  		}
   189  	}
   190  
   191  	// numIngsCreated keeps track of how many ingresses have been created.
   192  	numIngsCreated := 0
   193  
   194  	prepareIngsFunc := func(ctx context.Context, numIngsNeeded int) {
   195  		var ingWg sync.WaitGroup
   196  		numIngsToCreate := numIngsNeeded - numIngsCreated
   197  		ingWg.Add(numIngsToCreate)
   198  		svcQueue := make(chan *v1.Service, numIngsToCreate)
   199  		ingQueue := make(chan *networkingv1.Ingress, numIngsToCreate)
   200  		errQueue := make(chan error, numIngsToCreate)
   201  		latencyQueue := make(chan time.Duration, numIngsToCreate)
   202  		start := time.Now()
   203  		for ; numIngsCreated < numIngsNeeded; numIngsCreated++ {
   204  			suffix := fmt.Sprintf("%d", numIngsCreated)
   205  			go func() {
   206  				defer ingWg.Done()
   207  
   208  				start := time.Now()
   209  				svcCreated, ingCreated, err := f.createScaleTestServiceIngress(ctx, suffix, f.EnableTLS)
   210  				svcQueue <- svcCreated
   211  				ingQueue <- ingCreated
   212  				if err != nil {
   213  					errQueue <- err
   214  					return
   215  				}
   216  				f.Logger.Infof("Waiting for ingress %s to come up...", ingCreated.Name)
   217  				if err := f.Jig.WaitForGivenIngressWithTimeout(ctx, ingCreated, false, waitForIngressMaxTimeout); err != nil {
   218  					errQueue <- err
   219  					return
   220  				}
   221  				elapsed := time.Since(start)
   222  				f.Logger.Infof("Spent %s for ingress %s to come up", elapsed, ingCreated.Name)
   223  				latencyQueue <- elapsed
   224  			}()
   225  		}
   226  
   227  		// Wait until all ingress creations are complete.
   228  		f.Logger.Infof("Waiting for %d ingresses to come up...", numIngsToCreate)
   229  		ingWg.Wait()
   230  		close(svcQueue)
   231  		close(ingQueue)
   232  		close(errQueue)
   233  		close(latencyQueue)
   234  		elapsed := time.Since(start)
   235  		for svc := range svcQueue {
   236  			f.ScaleTestSvcs = append(f.ScaleTestSvcs, svc)
   237  		}
   238  		for ing := range ingQueue {
   239  			f.ScaleTestIngs = append(f.ScaleTestIngs, ing)
   240  		}
   241  		var createLatencies []time.Duration
   242  		for latency := range latencyQueue {
   243  			createLatencies = append(createLatencies, latency)
   244  		}
   245  		f.BatchCreateLatencies = append(f.BatchCreateLatencies, createLatencies)
   246  		if len(errQueue) != 0 {
   247  			f.Logger.Errorf("Failed while creating services and ingresses, spent %v", elapsed)
   248  			for err := range errQueue {
   249  				errs = append(errs, err)
   250  			}
   251  			return
   252  		}
   253  		f.Logger.Infof("Spent %s for %d ingresses to come up", elapsed, numIngsToCreate)
   254  		f.BatchDurations = append(f.BatchDurations, elapsed)
   255  	}
   256  
   257  	measureCreateUpdateFunc := func(ctx context.Context) {
   258  		f.Logger.Infof("Create one more ingress and wait for it to come up")
   259  		start := time.Now()
   260  		svcCreated, ingCreated, err := f.createScaleTestServiceIngress(ctx, fmt.Sprintf("%d", numIngsCreated), f.EnableTLS)
   261  		numIngsCreated = numIngsCreated + 1
   262  		f.ScaleTestSvcs = append(f.ScaleTestSvcs, svcCreated)
   263  		f.ScaleTestIngs = append(f.ScaleTestIngs, ingCreated)
   264  		if err != nil {
   265  			errs = append(errs, err)
   266  			return
   267  		}
   268  
   269  		f.Logger.Infof("Waiting for ingress %s to come up...", ingCreated.Name)
   270  		if err := f.Jig.WaitForGivenIngressWithTimeout(ctx, ingCreated, false, waitForIngressMaxTimeout); err != nil {
   271  			errs = append(errs, err)
   272  			return
   273  		}
   274  		elapsed := time.Since(start)
   275  		f.Logger.Infof("Spent %s for ingress %s to come up", elapsed, ingCreated.Name)
   276  		f.StepCreateLatencies = append(f.StepCreateLatencies, elapsed)
   277  
   278  		f.Logger.Infof("Updating ingress and wait for change to take effect")
   279  		ingToUpdate, err := f.Clientset.NetworkingV1().Ingresses(f.Namespace).Get(ctx, ingCreated.Name, metav1.GetOptions{})
   280  		if err != nil {
   281  			errs = append(errs, err)
   282  			return
   283  		}
   284  		addTestPathToIngress(ingToUpdate)
   285  		start = time.Now()
   286  		ingToUpdate, err = f.Clientset.NetworkingV1().Ingresses(f.Namespace).Update(ctx, ingToUpdate, metav1.UpdateOptions{})
   287  		if err != nil {
   288  			errs = append(errs, err)
   289  			return
   290  		}
   291  
   292  		if err := f.Jig.WaitForGivenIngressWithTimeout(ctx, ingToUpdate, false, waitForIngressMaxTimeout); err != nil {
   293  			errs = append(errs, err)
   294  			return
   295  		}
   296  		elapsed = time.Since(start)
   297  		f.Logger.Infof("Spent %s for updating ingress %s", elapsed, ingToUpdate.Name)
   298  		f.StepUpdateLatencies = append(f.StepUpdateLatencies, elapsed)
   299  	}
   300  
   301  	defer f.dumpLatencies()
   302  
   303  	for _, num := range f.NumIngressesTest {
   304  		f.Logger.Infof("Create more ingresses until we reach %d ingresses", num)
   305  		prepareIngsFunc(ctx, num)
   306  		f.Logger.Infof("Measure create and update latency with %d ingresses", num)
   307  		measureCreateUpdateFunc(ctx)
   308  
   309  		if len(errs) != 0 {
   310  			return errs
   311  		}
   312  	}
   313  
   314  	return errs
   315  }
   316  
   317  func (f *IngressScaleFramework) dumpLatencies() {
   318  	f.Logger.Infof("Dumping scale test latencies...")
   319  	formattedData := f.GetFormattedLatencies()
   320  	if f.OutputFile != "" {
   321  		f.Logger.Infof("Dumping scale test latencies to file %s...", f.OutputFile)
   322  		os.WriteFile(f.OutputFile, []byte(formattedData), 0644)
   323  		return
   324  	}
   325  	f.Logger.Infof("\n%v", formattedData)
   326  }
   327  
   328  // GetFormattedLatencies returns the formatted latencies output.
   329  // TODO: Need a better way/format for data output.
   330  func (f *IngressScaleFramework) GetFormattedLatencies() string {
   331  	if len(f.NumIngressesTest) == 0 ||
   332  		len(f.NumIngressesTest) != len(f.BatchCreateLatencies) ||
   333  		len(f.NumIngressesTest) != len(f.BatchDurations) ||
   334  		len(f.NumIngressesTest) != len(f.StepCreateLatencies) ||
   335  		len(f.NumIngressesTest) != len(f.StepUpdateLatencies) {
   336  		return "Failed to construct latencies output."
   337  	}
   338  
   339  	res := "--- Procedure logs ---\n"
   340  	for i, latencies := range f.BatchCreateLatencies {
   341  		res += fmt.Sprintf("Create %d ingresses parallelly, each of them takes below amount of time before starts serving traffic:\n", len(latencies))
   342  		for _, latency := range latencies {
   343  			res = res + fmt.Sprintf("- %v\n", latency)
   344  		}
   345  		res += fmt.Sprintf("Total duration for completing %d ingress creations: %v\n", len(latencies), f.BatchDurations[i])
   346  		res += fmt.Sprintf("Duration to create one more ingress with %d ingresses existing: %v\n", f.NumIngressesTest[i], f.StepCreateLatencies[i])
   347  		res += fmt.Sprintf("Duration to update one ingress with %d ingresses existing: %v\n", f.NumIngressesTest[i]+1, f.StepUpdateLatencies[i])
   348  	}
   349  	res = res + "--- Summary ---\n"
   350  	var batchTotalStr, batchAvgStr, singleCreateStr, singleUpdateStr string
   351  	for i, latencies := range f.BatchCreateLatencies {
   352  		batchTotalStr += fmt.Sprintf("Batch creation total latency for %d ingresses with %d ingresses existing: %v\n", len(latencies), f.NumIngressesTest[i]-len(latencies), f.BatchDurations[i])
   353  		var avgLatency time.Duration
   354  		for _, latency := range latencies {
   355  			avgLatency = avgLatency + latency
   356  		}
   357  		avgLatency /= time.Duration(len(latencies))
   358  		batchAvgStr += fmt.Sprintf("Batch creation average latency for %d ingresses with %d ingresses existing: %v\n", len(latencies), f.NumIngressesTest[i]-len(latencies), avgLatency)
   359  		singleCreateStr += fmt.Sprintf("Single ingress creation latency with %d ingresses existing: %v\n", f.NumIngressesTest[i], f.StepCreateLatencies[i])
   360  		singleUpdateStr += fmt.Sprintf("Single ingress update latency with %d ingresses existing: %v\n", f.NumIngressesTest[i]+1, f.StepUpdateLatencies[i])
   361  	}
   362  	res += batchTotalStr + batchAvgStr + singleCreateStr + singleUpdateStr
   363  	return res
   364  }
   365  
   366  func addTestPathToIngress(ing *networkingv1.Ingress) {
   367  	prefixPathType := networkingv1.PathTypeImplementationSpecific
   368  	ing.Spec.Rules[0].IngressRuleValue.HTTP.Paths = append(
   369  		ing.Spec.Rules[0].IngressRuleValue.HTTP.Paths,
   370  		networkingv1.HTTPIngressPath{
   371  			Path:     "/test",
   372  			PathType: &prefixPathType,
   373  			Backend:  ing.Spec.Rules[0].IngressRuleValue.HTTP.Paths[0].Backend,
   374  		})
   375  }
   376  
   377  func (f *IngressScaleFramework) createScaleTestServiceIngress(ctx context.Context, suffix string, enableTLS bool) (*v1.Service, *networkingv1.Ingress, error) {
   378  	svcCreated, err := f.Clientset.CoreV1().Services(f.Namespace).Create(ctx, generateScaleTestServiceSpec(suffix), metav1.CreateOptions{})
   379  	if err != nil {
   380  		return nil, nil, err
   381  	}
   382  	ingCreated, err := f.Clientset.NetworkingV1().Ingresses(f.Namespace).Create(ctx, generateScaleTestIngressSpec(suffix, enableTLS), metav1.CreateOptions{})
   383  	if err != nil {
   384  		return nil, nil, err
   385  	}
   386  	return svcCreated, ingCreated, nil
   387  }
   388  
   389  func generateScaleTestIngressSpec(suffix string, enableTLS bool) *networkingv1.Ingress {
   390  	prefixPathType := networkingv1.PathTypeImplementationSpecific
   391  	ing := &networkingv1.Ingress{
   392  		ObjectMeta: metav1.ObjectMeta{
   393  			Name: fmt.Sprintf("%s-%s", scaleTestIngressNamePrefix, suffix),
   394  		},
   395  		Spec: networkingv1.IngressSpec{
   396  			TLS: []networkingv1.IngressTLS{
   397  				{SecretName: scaleTestSecretName},
   398  			},
   399  			Rules: []networkingv1.IngressRule{
   400  				{
   401  					Host: scaleTestHostname,
   402  					IngressRuleValue: networkingv1.IngressRuleValue{
   403  						HTTP: &networkingv1.HTTPIngressRuleValue{
   404  							Paths: []networkingv1.HTTPIngressPath{
   405  								{
   406  									Path:     "/scale",
   407  									PathType: &prefixPathType,
   408  									Backend: networkingv1.IngressBackend{
   409  										Service: &networkingv1.IngressServiceBackend{
   410  											Name: fmt.Sprintf("%s-%s", scaleTestBackendName, suffix),
   411  											Port: networkingv1.ServiceBackendPort{
   412  												Number: 80,
   413  											},
   414  										},
   415  									},
   416  								},
   417  							},
   418  						},
   419  					},
   420  				},
   421  			},
   422  		},
   423  	}
   424  	if enableTLS {
   425  		ing.Spec.TLS = []networkingv1.IngressTLS{
   426  			{SecretName: scaleTestSecretName},
   427  		}
   428  	}
   429  	return ing
   430  }
   431  
   432  func generateScaleTestServiceSpec(suffix string) *v1.Service {
   433  	return &v1.Service{
   434  		ObjectMeta: metav1.ObjectMeta{
   435  			Name:   fmt.Sprintf("%s-%s", scaleTestBackendName, suffix),
   436  			Labels: scaleTestLabels,
   437  		},
   438  		Spec: v1.ServiceSpec{
   439  			Ports: []v1.ServicePort{{
   440  				Name:       "http",
   441  				Protocol:   v1.ProtocolTCP,
   442  				Port:       80,
   443  				TargetPort: intstr.FromInt32(8080),
   444  			}},
   445  			Selector: scaleTestLabels,
   446  			Type:     v1.ServiceTypeNodePort,
   447  		},
   448  	}
   449  }
   450  
   451  func generateScaleTestBackendDeploymentSpec(numReplicas int32) *appsv1.Deployment {
   452  	d := e2edeployment.NewDeployment(
   453  		scaleTestBackendName, numReplicas, scaleTestLabels, scaleTestBackendName,
   454  		imageutils.GetE2EImage(imageutils.Agnhost), appsv1.RollingUpdateDeploymentStrategyType)
   455  	d.Spec.Template.Spec.Containers[0].Command = []string{
   456  		"/agnhost",
   457  		"netexec",
   458  		"--http-port=8080",
   459  	}
   460  	d.Spec.Template.Spec.Containers[0].Ports = []v1.ContainerPort{{ContainerPort: 8080}}
   461  	d.Spec.Template.Spec.Containers[0].ReadinessProbe = &v1.Probe{
   462  		ProbeHandler: v1.ProbeHandler{
   463  			HTTPGet: &v1.HTTPGetAction{
   464  				Port: intstr.FromInt32(8080),
   465  				Path: "/healthz",
   466  			},
   467  		},
   468  		FailureThreshold: 10,
   469  		PeriodSeconds:    1,
   470  		SuccessThreshold: 1,
   471  		TimeoutSeconds:   1,
   472  	}
   473  	return d
   474  }
   475  

View as plain text