
Source file src/k8s.io/kubernetes/test/e2e/windows/density.go

Documentation: k8s.io/kubernetes/test/e2e/windows

     1  /*
     2  Copyright 2018 The Kubernetes Authors.
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     8      http://www.apache.org/licenses/LICENSE-2.0
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    17  package windows
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"sort"
    23  	"sync"
    24  	"time"
    26  	v1 "k8s.io/api/core/v1"
    27  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    28  	"k8s.io/apimachinery/pkg/labels"
    29  	"k8s.io/apimachinery/pkg/runtime"
    30  	"k8s.io/apimachinery/pkg/util/uuid"
    31  	"k8s.io/apimachinery/pkg/watch"
    32  	"k8s.io/client-go/tools/cache"
    33  	"k8s.io/kubernetes/test/e2e/feature"
    34  	"k8s.io/kubernetes/test/e2e/framework"
    35  	e2emetrics "k8s.io/kubernetes/test/e2e/framework/metrics"
    36  	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
    37  	imageutils "k8s.io/kubernetes/test/utils/image"
    38  	admissionapi "k8s.io/pod-security-admission/api"
    40  	"github.com/onsi/ginkgo/v2"
    41  	"github.com/onsi/gomega"
    42  )
    44  var _ = sigDescribe(feature.Windows, "Density", framework.WithSerial(), framework.WithSlow(), skipUnlessWindows(func() {
    45  	f := framework.NewDefaultFramework("density-test-windows")
    46  	f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged
    48  	ginkgo.Context("create a batch of pods", func() {
    49  		// TODO(coufon): the values are generous, set more precise limits with benchmark data
    50  		// and add more tests
    51  		dTests := []densityTest{
    52  			{
    53  				podsNr:   10,
    54  				interval: 0 * time.Millisecond,
    55  				// percentile limit of single pod startup latency
    56  				podStartupLimits: e2emetrics.LatencyMetric{
    57  					Perc50: 30 * time.Second,
    58  					Perc90: 54 * time.Second,
    59  					Perc99: 59 * time.Second,
    60  				},
    61  				// upbound of startup latency of a batch of pods
    62  				podBatchStartupLimit: 10 * time.Minute,
    63  			},
    64  		}
    66  		for _, testArg := range dTests {
    67  			itArg := testArg
    68  			desc := fmt.Sprintf("latency/resource should be within limit when create %d pods with %v interval", itArg.podsNr, itArg.interval)
    69  			ginkgo.It(desc, func(ctx context.Context) {
    70  				itArg.createMethod = "batch"
    71  				runDensityBatchTest(ctx, f, itArg)
    72  			})
    73  		}
    74  	})
    76  }))
    78  type densityTest struct {
    79  	// number of pods
    80  	podsNr int
    81  	// interval between creating pod (rate control)
    82  	interval time.Duration
    83  	// create pods in 'batch' or 'sequence'
    84  	createMethod string
    85  	// API QPS limit
    86  	APIQPSLimit int
    87  	// performance limits
    88  	podStartupLimits     e2emetrics.LatencyMetric
    89  	podBatchStartupLimit time.Duration
    90  }
    92  // runDensityBatchTest runs the density batch pod creation test
    93  func runDensityBatchTest(ctx context.Context, f *framework.Framework, testArg densityTest) (time.Duration, []e2emetrics.PodLatencyData) {
    94  	const (
    95  		podType = "density_test_pod"
    96  	)
    97  	var (
    98  		mutex      = &sync.Mutex{}
    99  		watchTimes = make(map[string]metav1.Time)
   100  		stopCh     = make(chan struct{})
   101  	)
   103  	// create test pod data structure
   104  	pods := newDensityTestPods(testArg.podsNr, false, imageutils.GetPauseImageName(), podType)
   106  	// the controller watches the change of pod status
   107  	controller := newInformerWatchPod(ctx, f, mutex, watchTimes, podType)
   108  	go controller.Run(stopCh)
   109  	defer close(stopCh)
   111  	ginkgo.By("Creating a batch of pods")
   112  	// It returns a map['pod name']'creation time' containing the creation timestamps
   113  	createTimes := createBatchPodWithRateControl(ctx, f, pods, testArg.interval)
   115  	ginkgo.By("Waiting for all Pods to be observed by the watch...")
   117  	gomega.Eventually(ctx, func() bool {
   118  		return len(watchTimes) == testArg.podsNr
   119  	}, 10*time.Minute, 10*time.Second).Should(gomega.BeTrue())
   121  	if len(watchTimes) < testArg.podsNr {
   122  		framework.Failf("Timeout reached waiting for all Pods to be observed by the watch.")
   123  	}
   125  	// Analyze results
   126  	var (
   127  		firstCreate metav1.Time
   128  		lastRunning metav1.Time
   129  		init        = true
   130  		e2eLags     = make([]e2emetrics.PodLatencyData, 0)
   131  	)
   133  	for name, create := range createTimes {
   134  		watch, ok := watchTimes[name]
   135  		if !ok {
   136  			framework.Failf("pod %s failed to be observed by the watch", name)
   137  		}
   139  		e2eLags = append(e2eLags,
   140  			e2emetrics.PodLatencyData{Name: name, Latency: watch.Time.Sub(create.Time)})
   142  		if !init {
   143  			if firstCreate.Time.After(create.Time) {
   144  				firstCreate = create
   145  			}
   146  			if lastRunning.Time.Before(watch.Time) {
   147  				lastRunning = watch
   148  			}
   149  		} else {
   150  			init = false
   151  			firstCreate, lastRunning = create, watch
   152  		}
   153  	}
   155  	sort.Sort(e2emetrics.LatencySlice(e2eLags))
   156  	batchLag := lastRunning.Time.Sub(firstCreate.Time)
   158  	deletePodsSync(ctx, f, pods)
   160  	return batchLag, e2eLags
   161  }
   163  // createBatchPodWithRateControl creates a batch of pods concurrently, uses one goroutine for each creation.
   164  // between creations there is an interval for throughput control
   165  func createBatchPodWithRateControl(ctx context.Context, f *framework.Framework, pods []*v1.Pod, interval time.Duration) map[string]metav1.Time {
   166  	createTimes := make(map[string]metav1.Time)
   167  	for _, pod := range pods {
   168  		createTimes[pod.ObjectMeta.Name] = metav1.Now()
   169  		go e2epod.NewPodClient(f).Create(ctx, pod)
   170  		time.Sleep(interval)
   171  	}
   172  	return createTimes
   173  }
   175  // newInformerWatchPod creates an informer to check whether all pods are running.
   176  func newInformerWatchPod(ctx context.Context, f *framework.Framework, mutex *sync.Mutex, watchTimes map[string]metav1.Time, podType string) cache.Controller {
   177  	ns := f.Namespace.Name
   178  	checkPodRunning := func(p *v1.Pod) {
   179  		mutex.Lock()
   180  		defer mutex.Unlock()
   181  		defer ginkgo.GinkgoRecover()
   183  		if p.Status.Phase == v1.PodRunning {
   184  			if _, found := watchTimes[p.Name]; !found {
   185  				watchTimes[p.Name] = metav1.Now()
   186  			}
   187  		}
   188  	}
   190  	_, controller := cache.NewInformer(
   191  		&cache.ListWatch{
   192  			ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
   193  				options.LabelSelector = labels.SelectorFromSet(labels.Set{"type": podType}).String()
   194  				obj, err := f.ClientSet.CoreV1().Pods(ns).List(ctx, options)
   195  				return runtime.Object(obj), err
   196  			},
   197  			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
   198  				options.LabelSelector = labels.SelectorFromSet(labels.Set{"type": podType}).String()
   199  				return f.ClientSet.CoreV1().Pods(ns).Watch(ctx, options)
   200  			},
   201  		},
   202  		&v1.Pod{},
   203  		0,
   204  		cache.ResourceEventHandlerFuncs{
   205  			AddFunc: func(obj interface{}) {
   206  				p, ok := obj.(*v1.Pod)
   207  				if !ok {
   208  					framework.Failf("expected Pod, got %T", obj)
   209  				}
   210  				go checkPodRunning(p)
   211  			},
   212  			UpdateFunc: func(oldObj, newObj interface{}) {
   213  				p, ok := newObj.(*v1.Pod)
   214  				if !ok {
   215  					framework.Failf("expected Pod, got %T", newObj)
   216  				}
   217  				go checkPodRunning(p)
   218  			},
   219  		},
   220  	)
   221  	return controller
   222  }
   224  // newDensityTestPods creates a list of pods (specification) for test.
   225  func newDensityTestPods(numPods int, volume bool, imageName, podType string) []*v1.Pod {
   226  	var pods []*v1.Pod
   228  	for i := 0; i < numPods; i++ {
   230  		podName := "test-" + string(uuid.NewUUID())
   231  		pod := v1.Pod{
   232  			ObjectMeta: metav1.ObjectMeta{
   233  				Name: podName,
   234  				Labels: map[string]string{
   235  					"type": podType,
   236  					"name": podName,
   237  				},
   238  			},
   239  			Spec: v1.PodSpec{
   240  				// Restart policy is always (default).
   241  				Containers: []v1.Container{
   242  					{
   243  						Image: imageName,
   244  						Name:  podName,
   245  					},
   246  				},
   247  				NodeSelector: map[string]string{
   248  					"kubernetes.io/os": "windows",
   249  				},
   250  			},
   251  		}
   253  		if volume {
   254  			pod.Spec.Containers[0].VolumeMounts = []v1.VolumeMount{
   255  				{MountPath: "/test-volume-mnt", Name: podName + "-volume"},
   256  			}
   257  			pod.Spec.Volumes = []v1.Volume{
   258  				{Name: podName + "-volume", VolumeSource: v1.VolumeSource{EmptyDir: &v1.EmptyDirVolumeSource{}}},
   259  			}
   260  		}
   262  		pods = append(pods, &pod)
   263  	}
   265  	return pods
   266  }
   268  // deletePodsSync deletes a list of pods and block until pods disappear.
   269  func deletePodsSync(ctx context.Context, f *framework.Framework, pods []*v1.Pod) {
   270  	var wg sync.WaitGroup
   271  	for _, pod := range pods {
   272  		wg.Add(1)
   273  		go func(pod *v1.Pod) {
   274  			defer ginkgo.GinkgoRecover()
   275  			defer wg.Done()
   277  			err := e2epod.NewPodClient(f).Delete(ctx, pod.ObjectMeta.Name, *metav1.NewDeleteOptions(30))
   278  			framework.ExpectNoError(err)
   280  			err = e2epod.WaitForPodNotFoundInNamespace(ctx, f.ClientSet, pod.ObjectMeta.Name, f.Namespace.Name, 10*time.Minute)
   281  			framework.ExpectNoError(err)
   282  		}(pod)
   283  	}
   284  	wg.Wait()
   285  }

View as plain text