
Source file src/k8s.io/kubernetes/test/e2e_node/density_test.go

Documentation: k8s.io/kubernetes/test/e2e_node

     1  //go:build linux
     2  // +build linux
     4  /*
     5  Copyright 2015 The Kubernetes Authors.
     7  Licensed under the Apache License, Version 2.0 (the "License");
     8  you may not use this file except in compliance with the License.
     9  You may obtain a copy of the License at
    11      http://www.apache.org/licenses/LICENSE-2.0
    13  Unless required by applicable law or agreed to in writing, software
    14  distributed under the License is distributed on an "AS IS" BASIS,
    15  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    16  See the License for the specific language governing permissions and
    17  limitations under the License.
    18  */
    20  package e2enode
    22  import (
    23  	"context"
    24  	"fmt"
    25  	"math"
    26  	"sort"
    27  	"strconv"
    28  	"sync"
    29  	"time"
    31  	"k8s.io/apimachinery/pkg/util/wait"
    33  	"github.com/onsi/ginkgo/v2"
    34  	"github.com/onsi/gomega"
    35  	v1 "k8s.io/api/core/v1"
    36  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    37  	"k8s.io/apimachinery/pkg/labels"
    38  	"k8s.io/apimachinery/pkg/runtime"
    39  	"k8s.io/apimachinery/pkg/watch"
    40  	"k8s.io/client-go/tools/cache"
    41  	kubeletstatsv1alpha1 "k8s.io/kubelet/pkg/apis/stats/v1alpha1"
    42  	kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
    43  	kubemetrics "k8s.io/kubernetes/pkg/kubelet/metrics"
    44  	"k8s.io/kubernetes/test/e2e/framework"
    45  	e2ekubelet "k8s.io/kubernetes/test/e2e/framework/kubelet"
    46  	e2emetrics "k8s.io/kubernetes/test/e2e/framework/metrics"
    47  	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
    48  	imageutils "k8s.io/kubernetes/test/utils/image"
    49  	admissionapi "k8s.io/pod-security-admission/api"
    50  )
    52  const (
    53  	kubeletAddr = "localhost:10255"
    54  )
    56  var _ = SIGDescribe("Density", framework.WithSerial(), framework.WithSlow(), func() {
    57  	const (
    58  		// The data collection time of resource collector and the standalone cadvisor
    59  		// is not synchronized, so resource collector may miss data or
    60  		// collect duplicated data
    61  		containerStatsPollingPeriod = 500 * time.Millisecond
    62  	)
    64  	var (
    65  		rc *ResourceCollector
    66  	)
    68  	f := framework.NewDefaultFramework("density-test")
    69  	f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged
    71  	ginkgo.BeforeEach(func(ctx context.Context) {
    72  		// Start a standalone cadvisor pod using 'createSync', the pod is running when it returns
    73  		e2epod.NewPodClient(f).CreateSync(ctx, getCadvisorPod())
    74  		// Resource collector monitors fine-grain CPU/memory usage by a standalone Cadvisor with
    75  		// 1s housingkeeping interval
    76  		rc = NewResourceCollector(containerStatsPollingPeriod)
    77  	})
    79  	f.Context("create a batch of pods", framework.WithFlaky(), func() {
    80  		// TODO(coufon): the values are generous, set more precise limits with benchmark data
    81  		// and add more tests
    82  		dTests := []densityTest{
    83  			{
    84  				podsNr:   10,
    85  				interval: 0 * time.Millisecond,
    86  				cpuLimits: e2ekubelet.ContainersCPUSummary{
    87  					kubeletstatsv1alpha1.SystemContainerKubelet: {0.50: 0.30, 0.95: 0.50},
    88  					kubeletstatsv1alpha1.SystemContainerRuntime: {0.50: 0.40, 0.95: 0.60},
    89  				},
    90  				memLimits: e2ekubelet.ResourceUsagePerContainer{
    91  					kubeletstatsv1alpha1.SystemContainerKubelet: &e2ekubelet.ContainerResourceUsage{MemoryRSSInBytes: 100 * 1024 * 1024},
    92  					kubeletstatsv1alpha1.SystemContainerRuntime: &e2ekubelet.ContainerResourceUsage{MemoryRSSInBytes: 500 * 1024 * 1024},
    93  				},
    94  				// percentile limit of single pod startup latency
    95  				podStartupLimits: e2emetrics.LatencyMetric{
    96  					Perc50: 16 * time.Second,
    97  					Perc90: 18 * time.Second,
    98  					Perc99: 20 * time.Second,
    99  				},
   100  				// upbound of startup latency of a batch of pods
   101  				podBatchStartupLimit: 25 * time.Second,
   102  			},
   103  		}
   105  		for _, testArg := range dTests {
   106  			itArg := testArg
   107  			desc := fmt.Sprintf("latency/resource should be within limit when create %d pods with %v interval", itArg.podsNr, itArg.interval)
   108  			ginkgo.It(desc, func(ctx context.Context) {
   109  				itArg.createMethod = "batch"
   110  				testInfo := getTestNodeInfo(f, itArg.getTestName(), desc)
   112  				batchLag, e2eLags := runDensityBatchTest(ctx, f, rc, itArg, testInfo, false)
   114  				ginkgo.By("Verifying latency")
   115  				logAndVerifyLatency(ctx, batchLag, e2eLags, itArg.podStartupLimits, itArg.podBatchStartupLimit, testInfo, true)
   117  				ginkgo.By("Verifying resource")
   118  				logAndVerifyResource(ctx, f, rc, itArg.cpuLimits, itArg.memLimits, testInfo, true)
   119  			})
   120  		}
   121  	})
   123  	ginkgo.Context("create a batch of pods", func() {
   124  		dTests := []densityTest{
   125  			{
   126  				podsNr:   10,
   127  				interval: 0 * time.Millisecond,
   128  			},
   129  			{
   130  				podsNr:   35,
   131  				interval: 0 * time.Millisecond,
   132  			},
   133  			{
   134  				podsNr:   90,
   135  				interval: 0 * time.Millisecond,
   136  			},
   137  			{
   138  				podsNr:   10,
   139  				interval: 100 * time.Millisecond,
   140  			},
   141  			{
   142  				podsNr:   35,
   143  				interval: 100 * time.Millisecond,
   144  			},
   145  			{
   146  				podsNr:   90,
   147  				interval: 100 * time.Millisecond,
   148  			},
   149  			{
   150  				podsNr:   10,
   151  				interval: 300 * time.Millisecond,
   152  			},
   153  			{
   154  				podsNr:   35,
   155  				interval: 300 * time.Millisecond,
   156  			},
   157  			{
   158  				podsNr:   90,
   159  				interval: 300 * time.Millisecond,
   160  			},
   161  		}
   163  		for _, testArg := range dTests {
   164  			itArg := testArg
   165  			desc := fmt.Sprintf("latency/resource should be within limit when create %d pods with %v interval [Benchmark][NodeSpecialFeature:Benchmark]", itArg.podsNr, itArg.interval)
   166  			ginkgo.It(desc, func(ctx context.Context) {
   167  				itArg.createMethod = "batch"
   168  				testInfo := getTestNodeInfo(f, itArg.getTestName(), desc)
   170  				batchLag, e2eLags := runDensityBatchTest(ctx, f, rc, itArg, testInfo, true)
   172  				ginkgo.By("Verifying latency")
   173  				logAndVerifyLatency(ctx, batchLag, e2eLags, itArg.podStartupLimits, itArg.podBatchStartupLimit, testInfo, false)
   175  				ginkgo.By("Verifying resource")
   176  				logAndVerifyResource(ctx, f, rc, itArg.cpuLimits, itArg.memLimits, testInfo, false)
   177  			})
   178  		}
   179  	})
   181  	ginkgo.Context("create a batch of pods with higher API QPS", func() {
   182  		dTests := []densityTest{
   183  			{
   184  				podsNr:      90,
   185  				interval:    0 * time.Millisecond,
   186  				APIQPSLimit: 60,
   187  			},
   188  			{
   189  				podsNr:      90,
   190  				interval:    100 * time.Millisecond,
   191  				APIQPSLimit: 60,
   192  			},
   193  			{
   194  				podsNr:      90,
   195  				interval:    300 * time.Millisecond,
   196  				APIQPSLimit: 60,
   197  			},
   198  		}
   200  		for _, testArg := range dTests {
   201  			itArg := testArg
   202  			ginkgo.Context("", func() {
   203  				desc := fmt.Sprintf("latency/resource should be within limit when create %d pods with %v interval (QPS %d) [Benchmark][NodeSpecialFeature:Benchmark]", itArg.podsNr, itArg.interval, itArg.APIQPSLimit)
   204  				// The latency caused by API QPS limit takes a large portion (up to ~33%) of e2e latency.
   205  				// It makes the pod startup latency of Kubelet (creation throughput as well) under-estimated.
   206  				// Here we set API QPS limit from default 5 to 60 in order to test real Kubelet performance.
   207  				// Note that it will cause higher resource usage.
   208  				tempSetCurrentKubeletConfig(f, func(ctx context.Context, cfg *kubeletconfig.KubeletConfiguration) {
   209  					framework.Logf("Old QPS limit is: %d", cfg.KubeAPIQPS)
   210  					// Set new API QPS limit
   211  					cfg.KubeAPIQPS = int32(itArg.APIQPSLimit)
   212  				})
   213  				ginkgo.It(desc, func(ctx context.Context) {
   214  					itArg.createMethod = "batch"
   215  					testInfo := getTestNodeInfo(f, itArg.getTestName(), desc)
   216  					batchLag, e2eLags := runDensityBatchTest(ctx, f, rc, itArg, testInfo, true)
   218  					ginkgo.By("Verifying latency")
   219  					logAndVerifyLatency(ctx, batchLag, e2eLags, itArg.podStartupLimits, itArg.podBatchStartupLimit, testInfo, false)
   221  					ginkgo.By("Verifying resource")
   222  					logAndVerifyResource(ctx, f, rc, itArg.cpuLimits, itArg.memLimits, testInfo, false)
   223  				})
   224  			})
   225  		}
   226  	})
   228  	f.Context("create a sequence of pods", framework.WithFlaky(), func() {
   229  		dTests := []densityTest{
   230  			{
   231  				podsNr:   10,
   232  				bgPodsNr: 50,
   233  				cpuLimits: e2ekubelet.ContainersCPUSummary{
   234  					kubeletstatsv1alpha1.SystemContainerKubelet: {0.50: 0.30, 0.95: 0.50},
   235  					kubeletstatsv1alpha1.SystemContainerRuntime: {0.50: 0.40, 0.95: 0.60},
   236  				},
   237  				memLimits: e2ekubelet.ResourceUsagePerContainer{
   238  					kubeletstatsv1alpha1.SystemContainerKubelet: &e2ekubelet.ContainerResourceUsage{MemoryRSSInBytes: 100 * 1024 * 1024},
   239  					kubeletstatsv1alpha1.SystemContainerRuntime: &e2ekubelet.ContainerResourceUsage{MemoryRSSInBytes: 500 * 1024 * 1024},
   240  				},
   241  				podStartupLimits: e2emetrics.LatencyMetric{
   242  					Perc50: 5000 * time.Millisecond,
   243  					Perc90: 9000 * time.Millisecond,
   244  					Perc99: 10000 * time.Millisecond,
   245  				},
   246  			},
   247  		}
   249  		for _, testArg := range dTests {
   250  			itArg := testArg
   251  			desc := fmt.Sprintf("latency/resource should be within limit when create %d pods with %d background pods", itArg.podsNr, itArg.bgPodsNr)
   252  			ginkgo.It(desc, func(ctx context.Context) {
   253  				itArg.createMethod = "sequence"
   254  				testInfo := getTestNodeInfo(f, itArg.getTestName(), desc)
   255  				batchlag, e2eLags := runDensitySeqTest(ctx, f, rc, itArg, testInfo)
   257  				ginkgo.By("Verifying latency")
   258  				logAndVerifyLatency(ctx, batchlag, e2eLags, itArg.podStartupLimits, itArg.podBatchStartupLimit, testInfo, true)
   260  				ginkgo.By("Verifying resource")
   261  				logAndVerifyResource(ctx, f, rc, itArg.cpuLimits, itArg.memLimits, testInfo, true)
   262  			})
   263  		}
   264  	})
   266  	ginkgo.Context("create a sequence of pods", func() {
   267  		dTests := []densityTest{
   268  			{
   269  				podsNr:   10,
   270  				bgPodsNr: 50,
   271  			},
   272  			{
   273  				podsNr:   30,
   274  				bgPodsNr: 50,
   275  			},
   276  			{
   277  				podsNr:   50,
   278  				bgPodsNr: 50,
   279  			},
   280  		}
   282  		for _, testArg := range dTests {
   283  			itArg := testArg
   284  			desc := fmt.Sprintf("latency/resource should be within limit when create %d pods with %d background pods [Benchmark][NodeSpeicalFeature:Benchmark]", itArg.podsNr, itArg.bgPodsNr)
   285  			ginkgo.It(desc, func(ctx context.Context) {
   286  				itArg.createMethod = "sequence"
   287  				testInfo := getTestNodeInfo(f, itArg.getTestName(), desc)
   288  				batchlag, e2eLags := runDensitySeqTest(ctx, f, rc, itArg, testInfo)
   290  				ginkgo.By("Verifying latency")
   291  				logAndVerifyLatency(ctx, batchlag, e2eLags, itArg.podStartupLimits, itArg.podBatchStartupLimit, testInfo, false)
   293  				ginkgo.By("Verifying resource")
   294  				logAndVerifyResource(ctx, f, rc, itArg.cpuLimits, itArg.memLimits, testInfo, false)
   295  			})
   296  		}
   297  	})
   298  })
   300  type densityTest struct {
   301  	// number of pods
   302  	podsNr int
   303  	// number of background pods
   304  	bgPodsNr int
   305  	// interval between creating pod (rate control)
   306  	interval time.Duration
   307  	// create pods in 'batch' or 'sequence'
   308  	createMethod string
   309  	// API QPS limit
   310  	APIQPSLimit int
   311  	// performance limits
   312  	cpuLimits            e2ekubelet.ContainersCPUSummary
   313  	memLimits            e2ekubelet.ResourceUsagePerContainer
   314  	podStartupLimits     e2emetrics.LatencyMetric
   315  	podBatchStartupLimit time.Duration
   316  }
   318  func (dt *densityTest) getTestName() string {
   319  	// The current default API QPS limit is 5
   320  	// TODO(coufon): is there any way to not hard code this?
   321  	APIQPSLimit := 5
   322  	if dt.APIQPSLimit > 0 {
   323  		APIQPSLimit = dt.APIQPSLimit
   324  	}
   325  	return fmt.Sprintf("density_create_%s_%d_%d_%d_%d", dt.createMethod, dt.podsNr, dt.bgPodsNr,
   326  		dt.interval.Nanoseconds()/1000000, APIQPSLimit)
   327  }
   329  // runDensityBatchTest runs the density batch pod creation test
   330  func runDensityBatchTest(ctx context.Context, f *framework.Framework, rc *ResourceCollector, testArg densityTest, testInfo map[string]string,
   331  	isLogTimeSeries bool) (time.Duration, []e2emetrics.PodLatencyData) {
   332  	const (
   333  		podType               = "density_test_pod"
   334  		sleepBeforeCreatePods = 30 * time.Second
   335  	)
   336  	var (
   337  		mutex      = &sync.Mutex{}
   338  		watchTimes = make(map[string]metav1.Time, 0)
   339  		stopCh     = make(chan struct{})
   340  	)
   342  	// create test pod data structure
   343  	pods := newTestPods(testArg.podsNr, true, imageutils.GetPauseImageName(), podType)
   345  	// the controller watches the change of pod status
   346  	controller := newInformerWatchPod(ctx, f, mutex, watchTimes, podType)
   347  	go controller.Run(stopCh)
   348  	defer close(stopCh)
   350  	// TODO(coufon): in the test we found kubelet starts while it is busy on something, as a result 'syncLoop'
   351  	// does not response to pod creation immediately. Creating the first pod has a delay around 5s.
   352  	// The node status has already been 'ready' so `wait and check node being ready does not help here.
   353  	// Now wait here for a grace period to let 'syncLoop' be ready
   354  	time.Sleep(sleepBeforeCreatePods)
   356  	rc.Start()
   358  	ginkgo.By("Creating a batch of pods")
   359  	// It returns a map['pod name']'creation time' containing the creation timestamps
   360  	createTimes := createBatchPodWithRateControl(ctx, f, pods, testArg.interval)
   362  	ginkgo.By("Waiting for all Pods to be observed by the watch...")
   364  	gomega.Eventually(ctx, func() bool {
   365  		return len(watchTimes) == testArg.podsNr
   366  	}, 10*time.Minute, 10*time.Second).Should(gomega.BeTrue())
   368  	if len(watchTimes) < testArg.podsNr {
   369  		framework.Failf("Timeout reached waiting for all Pods to be observed by the watch.")
   370  	}
   372  	// Analyze results
   373  	var (
   374  		firstCreate metav1.Time
   375  		lastRunning metav1.Time
   376  		init        = true
   377  		e2eLags     = make([]e2emetrics.PodLatencyData, 0)
   378  	)
   380  	for name, create := range createTimes {
   381  		watch := watchTimes[name]
   382  		gomega.Expect(watchTimes).To(gomega.HaveKey(name))
   384  		e2eLags = append(e2eLags,
   385  			e2emetrics.PodLatencyData{Name: name, Latency: watch.Time.Sub(create.Time)})
   387  		if !init {
   388  			if firstCreate.Time.After(create.Time) {
   389  				firstCreate = create
   390  			}
   391  			if lastRunning.Time.Before(watch.Time) {
   392  				lastRunning = watch
   393  			}
   394  		} else {
   395  			init = false
   396  			firstCreate, lastRunning = create, watch
   397  		}
   398  	}
   400  	sort.Sort(e2emetrics.LatencySlice(e2eLags))
   401  	batchLag := lastRunning.Time.Sub(firstCreate.Time)
   403  	rc.Stop()
   404  	deletePodsSync(ctx, f, pods)
   406  	// Log time series data.
   407  	if isLogTimeSeries {
   408  		logDensityTimeSeries(rc, createTimes, watchTimes, testInfo)
   409  	}
   410  	// Log throughput data.
   411  	logPodCreateThroughput(batchLag, e2eLags, testArg.podsNr, testInfo)
   413  	deletePodsSync(ctx, f, []*v1.Pod{getCadvisorPod()})
   415  	return batchLag, e2eLags
   416  }
   418  // runDensitySeqTest runs the density sequential pod creation test
   419  func runDensitySeqTest(ctx context.Context, f *framework.Framework, rc *ResourceCollector, testArg densityTest, testInfo map[string]string) (time.Duration, []e2emetrics.PodLatencyData) {
   420  	const (
   421  		podType               = "density_test_pod"
   422  		sleepBeforeCreatePods = 30 * time.Second
   423  	)
   424  	bgPods := newTestPods(testArg.bgPodsNr, true, imageutils.GetPauseImageName(), "background_pod")
   425  	testPods := newTestPods(testArg.podsNr, true, imageutils.GetPauseImageName(), podType)
   427  	ginkgo.By("Creating a batch of background pods")
   429  	// CreatBatch is synchronized, all pods are running when it returns
   430  	e2epod.NewPodClient(f).CreateBatch(ctx, bgPods)
   432  	time.Sleep(sleepBeforeCreatePods)
   434  	rc.Start()
   436  	// Create pods sequentially (back-to-back). e2eLags have been sorted.
   437  	batchlag, e2eLags := createBatchPodSequential(ctx, f, testPods, podType)
   439  	rc.Stop()
   440  	deletePodsSync(ctx, f, append(bgPods, testPods...))
   442  	// Log throughput data.
   443  	logPodCreateThroughput(batchlag, e2eLags, testArg.podsNr, testInfo)
   445  	deletePodsSync(ctx, f, []*v1.Pod{getCadvisorPod()})
   447  	return batchlag, e2eLags
   448  }
   450  // createBatchPodWithRateControl creates a batch of pods concurrently, uses one goroutine for each creation.
   451  // between creations there is an interval for throughput control
   452  func createBatchPodWithRateControl(ctx context.Context, f *framework.Framework, pods []*v1.Pod, interval time.Duration) map[string]metav1.Time {
   453  	createTimes := make(map[string]metav1.Time)
   454  	for i := range pods {
   455  		pod := pods[i]
   456  		createTimes[pod.ObjectMeta.Name] = metav1.Now()
   457  		go e2epod.NewPodClient(f).Create(ctx, pod)
   458  		time.Sleep(interval)
   459  	}
   460  	return createTimes
   461  }
   463  // getPodStartLatency gets prometheus metric 'pod start latency' from kubelet
   464  func getPodStartLatency(ctx context.Context, node string) (e2emetrics.KubeletLatencyMetrics, error) {
   465  	latencyMetrics := e2emetrics.KubeletLatencyMetrics{}
   466  	ms, err := e2emetrics.GrabKubeletMetricsWithoutProxy(ctx, node, "/metrics")
   467  	framework.ExpectNoError(err, "Failed to get kubelet metrics without proxy in node %s", node)
   469  	for _, samples := range ms {
   470  		for _, sample := range samples {
   471  			if sample.Metric["__name__"] == kubemetrics.KubeletSubsystem+"_"+kubemetrics.PodStartDurationKey {
   472  				quantile, _ := strconv.ParseFloat(string(sample.Metric["quantile"]), 64)
   473  				latencyMetrics = append(latencyMetrics,
   474  					e2emetrics.KubeletLatencyMetric{
   475  						Quantile: quantile,
   476  						Method:   kubemetrics.PodStartDurationKey,
   477  						Latency:  time.Duration(int(sample.Value)) * time.Microsecond})
   478  			}
   479  		}
   480  	}
   481  	return latencyMetrics, nil
   482  }
   484  // newInformerWatchPod creates an informer to check whether all pods are running.
   485  func newInformerWatchPod(ctx context.Context, f *framework.Framework, mutex *sync.Mutex, watchTimes map[string]metav1.Time, podType string) cache.Controller {
   486  	ns := f.Namespace.Name
   487  	checkPodRunning := func(p *v1.Pod) {
   488  		mutex.Lock()
   489  		defer mutex.Unlock()
   490  		defer ginkgo.GinkgoRecover()
   492  		if p.Status.Phase == v1.PodRunning {
   493  			if _, found := watchTimes[p.Name]; !found {
   494  				watchTimes[p.Name] = metav1.Now()
   495  			}
   496  		}
   497  	}
   499  	_, controller := cache.NewInformer(
   500  		&cache.ListWatch{
   501  			ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
   502  				options.LabelSelector = labels.SelectorFromSet(labels.Set{"type": podType}).String()
   503  				obj, err := f.ClientSet.CoreV1().Pods(ns).List(ctx, options)
   504  				return runtime.Object(obj), err
   505  			},
   506  			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
   507  				options.LabelSelector = labels.SelectorFromSet(labels.Set{"type": podType}).String()
   508  				return f.ClientSet.CoreV1().Pods(ns).Watch(ctx, options)
   509  			},
   510  		},
   511  		&v1.Pod{},
   512  		0,
   513  		cache.ResourceEventHandlerFuncs{
   514  			AddFunc: func(obj interface{}) {
   515  				p, ok := obj.(*v1.Pod)
   516  				if !ok {
   517  					framework.Failf("Failed to cast object %T to Pod", obj)
   518  				}
   519  				go checkPodRunning(p)
   520  			},
   521  			UpdateFunc: func(oldObj, newObj interface{}) {
   522  				p, ok := newObj.(*v1.Pod)
   523  				if !ok {
   524  					framework.Failf("Failed to cast object %T to Pod", newObj)
   525  				}
   526  				go checkPodRunning(p)
   527  			},
   528  		},
   529  	)
   530  	return controller
   531  }
   533  // createBatchPodSequential creates pods back-to-back in sequence.
   534  func createBatchPodSequential(ctx context.Context, f *framework.Framework, pods []*v1.Pod, podType string) (time.Duration, []e2emetrics.PodLatencyData) {
   535  	var (
   536  		mutex       = &sync.Mutex{}
   537  		watchTimes  = make(map[string]metav1.Time, 0)
   538  		stopCh      = make(chan struct{})
   539  		firstCreate metav1.Time
   540  		lastRunning metav1.Time
   541  		init        = true
   542  	)
   543  	// the controller watches the change of pod status
   544  	controller := newInformerWatchPod(ctx, f, mutex, watchTimes, podType)
   545  	go controller.Run(stopCh)
   546  	defer close(stopCh)
   548  	batchStartTime := metav1.Now()
   549  	e2eLags := make([]e2emetrics.PodLatencyData, 0)
   550  	createTimes := make(map[string]metav1.Time)
   551  	for _, pod := range pods {
   552  		create := metav1.Now()
   553  		createTimes[pod.Name] = create
   554  		p := e2epod.NewPodClient(f).Create(ctx, pod)
   555  		framework.ExpectNoError(wait.PollUntilContextTimeout(ctx, 2*time.Second, framework.PodStartTimeout, true, podWatchedRunning(watchTimes, p.Name)))
   556  		e2eLags = append(e2eLags,
   557  			e2emetrics.PodLatencyData{Name: pod.Name, Latency: watchTimes[pod.Name].Time.Sub(create.Time)})
   558  	}
   560  	for name, create := range createTimes {
   561  		watch := watchTimes[name]
   562  		gomega.Expect(watchTimes).To(gomega.HaveKey(name))
   563  		if !init {
   564  			if firstCreate.Time.After(create.Time) {
   565  				firstCreate = create
   566  			}
   567  			if lastRunning.Time.Before(watch.Time) {
   568  				lastRunning = watch
   569  			}
   570  		} else {
   571  			init = false
   572  			firstCreate, lastRunning = create, watch
   573  		}
   574  	}
   575  	batchLag := lastRunning.Time.Sub(batchStartTime.Time)
   576  	sort.Sort(e2emetrics.LatencySlice(e2eLags))
   577  	return batchLag, e2eLags
   578  }
   580  // podWatchedRunning verifies whether the pod becomes Running, as the watchTime was set by informer
   581  func podWatchedRunning(watchTimes map[string]metav1.Time, podName string) wait.ConditionWithContextFunc {
   582  	return func(ctx context.Context) (done bool, err error) {
   583  		if _, found := watchTimes[podName]; found {
   584  			return true, nil
   585  		}
   586  		return false, nil
   587  	}
   588  }
   590  // verifyLatencyWithinThreshold verifies whether 50, 90 and 99th percentiles of a latency metric are
   591  // within the expected threshold.
   592  func verifyLatencyWithinThreshold(threshold, actual e2emetrics.LatencyMetric, metricName string) error {
   593  	if actual.Perc50 > threshold.Perc50 {
   594  		return fmt.Errorf("too high %v latency 50th percentile: %v", metricName, actual.Perc50)
   595  	}
   596  	if actual.Perc90 > threshold.Perc90 {
   597  		return fmt.Errorf("too high %v latency 90th percentile: %v", metricName, actual.Perc90)
   598  	}
   599  	if actual.Perc99 > threshold.Perc99 {
   600  		return fmt.Errorf("too high %v latency 99th percentile: %v", metricName, actual.Perc99)
   601  	}
   602  	return nil
   603  }
   605  // extractLatencyMetrics returns latency metrics for each percentile(50th, 90th and 99th).
   606  func extractLatencyMetrics(latencies []e2emetrics.PodLatencyData) e2emetrics.LatencyMetric {
   607  	length := len(latencies)
   608  	perc50 := latencies[int(math.Ceil(float64(length*50)/100))-1].Latency
   609  	perc90 := latencies[int(math.Ceil(float64(length*90)/100))-1].Latency
   610  	perc99 := latencies[int(math.Ceil(float64(length*99)/100))-1].Latency
   611  	perc100 := latencies[length-1].Latency
   612  	return e2emetrics.LatencyMetric{Perc50: perc50, Perc90: perc90, Perc99: perc99, Perc100: perc100}
   613  }
   615  // printLatencies outputs latencies to log with readable format.
   616  func printLatencies(latencies []e2emetrics.PodLatencyData, header string) {
   617  	metrics := extractLatencyMetrics(latencies)
   618  	framework.Logf("10%% %s: %v", header, latencies[(len(latencies)*9)/10:])
   619  	framework.Logf("perc50: %v, perc90: %v, perc99: %v", metrics.Perc50, metrics.Perc90, metrics.Perc99)
   620  }
   622  // logAndVerifyLatency verifies that whether pod creation latency satisfies the limit.
   623  func logAndVerifyLatency(ctx context.Context, batchLag time.Duration, e2eLags []e2emetrics.PodLatencyData, podStartupLimits e2emetrics.LatencyMetric,
   624  	podBatchStartupLimit time.Duration, testInfo map[string]string, isVerify bool) {
   625  	printLatencies(e2eLags, "worst client e2e total latencies")
   627  	// TODO(coufon): do not trust 'kubelet' metrics since they are not reset!
   628  	latencyMetrics, _ := getPodStartLatency(ctx, kubeletAddr)
   629  	framework.Logf("Kubelet Prometheus metrics (not reset):\n%s", framework.PrettyPrintJSON(latencyMetrics))
   631  	podStartupLatency := extractLatencyMetrics(e2eLags)
   633  	// log latency perf data
   634  	logPerfData(getLatencyPerfData(podStartupLatency, testInfo), "latency")
   636  	if isVerify {
   637  		// check whether e2e pod startup time is acceptable.
   638  		framework.ExpectNoError(verifyLatencyWithinThreshold(podStartupLimits, podStartupLatency, "pod startup"))
   640  		// check bactch pod creation latency
   641  		if podBatchStartupLimit > 0 {
   642  			if batchLag > podBatchStartupLimit {
   643  				framework.Failf("Batch creation startup time %v exceed limit %v", batchLag, podBatchStartupLimit)
   644  			}
   645  		}
   646  	}
   647  }
   649  // logThroughput calculates and logs pod creation throughput.
   650  func logPodCreateThroughput(batchLag time.Duration, e2eLags []e2emetrics.PodLatencyData, podsNr int, testInfo map[string]string) {
   651  	logPerfData(getThroughputPerfData(batchLag, e2eLags, podsNr, testInfo), "throughput")
   652  }

View as plain text