...

Source file src/k8s.io/kubernetes/test/e2e/storage/testsuites/volumeperf.go

Documentation: k8s.io/kubernetes/test/e2e/storage/testsuites

     1  /*
     2  Copyright 2021 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package testsuites
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"sync"
    23  	"time"
    24  
    25  	"github.com/onsi/ginkgo/v2"
    26  
    27  	v1 "k8s.io/api/core/v1"
    28  	storagev1 "k8s.io/api/storage/v1"
    29  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    30  	"k8s.io/apimachinery/pkg/runtime"
    31  	"k8s.io/apimachinery/pkg/util/dump"
    32  	"k8s.io/apimachinery/pkg/util/errors"
    33  	"k8s.io/apimachinery/pkg/watch"
    34  	clientset "k8s.io/client-go/kubernetes"
    35  	"k8s.io/client-go/tools/cache"
    36  	"k8s.io/kubernetes/test/e2e/framework"
    37  	e2epv "k8s.io/kubernetes/test/e2e/framework/pv"
    38  	e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
    39  	storageframework "k8s.io/kubernetes/test/e2e/storage/framework"
    40  	admissionapi "k8s.io/pod-security-admission/api"
    41  )
    42  
// volumePerformanceTestSuite is the storageframework.TestSuite that measures
// volume lifecycle performance (currently only dynamic provisioning) for
// drivers whose DriverInfo declares PerformanceTestOptions.
// tsInfo holds the suite name and the test patterns it supports.
type volumePerformanceTestSuite struct {
	tsInfo storageframework.TestSuiteInfo
}
    46  
    47  var _ storageframework.TestSuite = &volumePerformanceTestSuite{}
    48  
    49  const testTimeout = 15 * time.Minute
    50  
// interval records the timing of a single object: when it was created
// (create), when it was observed entering the desired state
// (enterDesiredState), and the time elapsed between those two instants
// (elapsed). For PVCs, create comes from the object's CreationTimestamp and
// enterDesiredState from time.Now() in the watcher when the Bound transition
// is seen.
// NOTE(review): the two timestamps presumably come from different clocks
// (API server vs. test client), so elapsed may include clock skew.
type interval struct {
	create            time.Time
	enterDesiredState time.Time
	elapsed           time.Duration
}
    59  
// performanceStats consolidates the measurements for one operation type.
//
//   - mutex guards perObjectInterval. The test goroutine writes creation times
//     under it, and the PVC watcher updates intervals under it.
//   - perObjectInterval maps an object (PVC) name to its timing interval.
//   - operationMetrics holds the aggregated latency/throughput computed from
//     the intervals.
type performanceStats struct {
	mutex             *sync.Mutex
	perObjectInterval map[string]*interval
	operationMetrics  *storageframework.Metrics
}
    66  
    67  // waitForProvisionCh receives a signal from controller
    68  // when all PVCs are Bound
    69  // The signal received on this channel is the list of
    70  // PVC objects that are created in the test
    71  // The test blocks until waitForProvisionCh receives a signal
    72  // or the test times out
    73  var waitForProvisionCh chan []*v1.PersistentVolumeClaim
    74  
    75  // InitVolumePerformanceTestSuite returns volumePerformanceTestSuite that implements TestSuite interface
    76  func InitVolumePerformanceTestSuite() storageframework.TestSuite {
    77  	return &volumePerformanceTestSuite{
    78  		tsInfo: storageframework.TestSuiteInfo{
    79  			Name: "volume-lifecycle-performance",
    80  			TestPatterns: []storageframework.TestPattern{
    81  				storageframework.FsVolModeDynamicPV,
    82  			},
    83  		},
    84  	}
    85  }
    86  
    87  func (t *volumePerformanceTestSuite) GetTestSuiteInfo() storageframework.TestSuiteInfo {
    88  	return t.tsInfo
    89  }
    90  
    91  func (t *volumePerformanceTestSuite) SkipUnsupportedTests(driver storageframework.TestDriver, pattern storageframework.TestPattern) {
    92  }
    93  
    94  func (t *volumePerformanceTestSuite) DefineTests(driver storageframework.TestDriver, pattern storageframework.TestPattern) {
    95  	type local struct {
    96  		config  *storageframework.PerTestConfig
    97  		cs      clientset.Interface
    98  		ns      *v1.Namespace
    99  		scName  string
   100  		pvcs    []*v1.PersistentVolumeClaim
   101  		options *storageframework.PerformanceTestOptions
   102  		stopCh  chan struct{}
   103  	}
   104  	var (
   105  		dInfo *storageframework.DriverInfo
   106  		l     *local
   107  	)
   108  	ginkgo.BeforeEach(func() {
   109  		// Check preconditions
   110  		dDriver := driver.(storageframework.DynamicPVTestDriver)
   111  		if dDriver == nil {
   112  			e2eskipper.Skipf("Test driver does not support dynamically created volumes")
   113  
   114  		}
   115  		dInfo = dDriver.GetDriverInfo()
   116  		if dInfo == nil {
   117  			e2eskipper.Skipf("Failed to get Driver info -- skipping")
   118  		}
   119  		if dInfo.PerformanceTestOptions == nil || dInfo.PerformanceTestOptions.ProvisioningOptions == nil {
   120  			e2eskipper.Skipf("Driver %s doesn't specify performance test options -- skipping", dInfo.Name)
   121  		}
   122  	})
   123  
   124  	frameworkOptions := framework.Options{
   125  		ClientQPS:   200,
   126  		ClientBurst: 400,
   127  	}
   128  	f := framework.NewFramework("volume-lifecycle-performance", frameworkOptions, nil)
   129  	f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged
   130  
   131  	ginkgo.AfterEach(func(ctx context.Context) {
   132  		if l != nil {
   133  			if l.stopCh != nil {
   134  				ginkgo.By("Closing informer channel")
   135  				close(l.stopCh)
   136  			}
   137  
   138  			ginkgo.By("Deleting all PVCs")
   139  			for _, pvc := range l.pvcs {
   140  				err := e2epv.DeletePersistentVolumeClaim(ctx, l.cs, pvc.Name, pvc.Namespace)
   141  				framework.ExpectNoError(err)
   142  				err = e2epv.WaitForPersistentVolumeDeleted(ctx, l.cs, pvc.Spec.VolumeName, 1*time.Second, 5*time.Minute)
   143  				framework.ExpectNoError(err)
   144  			}
   145  			ginkgo.By(fmt.Sprintf("Deleting Storage Class %s", l.scName))
   146  			err := l.cs.StorageV1().StorageClasses().Delete(ctx, l.scName, metav1.DeleteOptions{})
   147  			framework.ExpectNoError(err)
   148  		} else {
   149  			ginkgo.By("Local l setup is nil")
   150  		}
   151  	})
   152  
   153  	f.It("should provision volumes at scale within performance constraints", f.WithSlow(), f.WithSerial(), func(ctx context.Context) {
   154  		l = &local{
   155  			cs:      f.ClientSet,
   156  			ns:      f.Namespace,
   157  			options: dInfo.PerformanceTestOptions,
   158  		}
   159  		l.config = driver.PrepareTest(ctx, f)
   160  
   161  		// Stats for volume provisioning operation
   162  		// TODO: Add stats for attach, resize and snapshot
   163  		provisioningStats := &performanceStats{
   164  			mutex:             &sync.Mutex{},
   165  			perObjectInterval: make(map[string]*interval),
   166  			operationMetrics:  &storageframework.Metrics{},
   167  		}
   168  		sc := driver.(storageframework.DynamicPVTestDriver).GetDynamicProvisionStorageClass(ctx, l.config, pattern.FsType)
   169  		ginkgo.By(fmt.Sprintf("Creating Storage Class %v", sc))
   170  		// TODO: Add support for WaitForFirstConsumer volume binding mode
   171  		if sc.VolumeBindingMode != nil && *sc.VolumeBindingMode == storagev1.VolumeBindingWaitForFirstConsumer {
   172  			e2eskipper.Skipf("WaitForFirstConsumer binding mode currently not supported for performance tests")
   173  		}
   174  		ginkgo.By(fmt.Sprintf("Creating Storage Class %s", sc.Name))
   175  		sc, err := l.cs.StorageV1().StorageClasses().Create(ctx, sc, metav1.CreateOptions{})
   176  		framework.ExpectNoError(err)
   177  		l.scName = sc.Name
   178  
   179  		// Create a controller to watch on PVCs
   180  		// When all PVCs provisioned by this test are in the Bound state, the controller
   181  		// sends a signal to the channel
   182  		controller := newPVCWatch(ctx, f, l.options.ProvisioningOptions.Count, provisioningStats)
   183  		l.stopCh = make(chan struct{})
   184  		go controller.Run(l.stopCh)
   185  		waitForProvisionCh = make(chan []*v1.PersistentVolumeClaim)
   186  
   187  		ginkgo.By(fmt.Sprintf("Creating %d PVCs of size %s", l.options.ProvisioningOptions.Count, l.options.ProvisioningOptions.VolumeSize))
   188  		for i := 0; i < l.options.ProvisioningOptions.Count; i++ {
   189  			pvc := e2epv.MakePersistentVolumeClaim(e2epv.PersistentVolumeClaimConfig{
   190  				ClaimSize:        l.options.ProvisioningOptions.VolumeSize,
   191  				StorageClassName: &sc.Name,
   192  			}, l.ns.Name)
   193  			pvc, err = l.cs.CoreV1().PersistentVolumeClaims(l.ns.Name).Create(ctx, pvc, metav1.CreateOptions{})
   194  			framework.ExpectNoError(err)
   195  			// Store create time for each PVC
   196  			provisioningStats.mutex.Lock()
   197  			provisioningStats.perObjectInterval[pvc.Name] = &interval{
   198  				create: pvc.CreationTimestamp.Time,
   199  			}
   200  			provisioningStats.mutex.Unlock()
   201  		}
   202  
   203  		ginkgo.By("Waiting for all PVCs to be Bound...")
   204  
   205  		select {
   206  		case l.pvcs = <-waitForProvisionCh:
   207  			framework.Logf("All PVCs in Bound state")
   208  		case <-time.After(testTimeout):
   209  			ginkgo.Fail(fmt.Sprintf("expected all PVCs to be in Bound state within %v minutes", testTimeout))
   210  		}
   211  
   212  		ginkgo.By("Calculating performance metrics for provisioning operations")
   213  		createPerformanceStats(provisioningStats, l.options.ProvisioningOptions.Count, l.pvcs)
   214  
   215  		ginkgo.By(fmt.Sprintf("Validating performance metrics for provisioning operations against baseline %v", dump.Pretty(l.options.ProvisioningOptions.ExpectedMetrics)))
   216  		errList := validatePerformanceStats(provisioningStats.operationMetrics, l.options.ProvisioningOptions.ExpectedMetrics)
   217  		framework.ExpectNoError(errors.NewAggregate(errList), "while validating performance metrics")
   218  	})
   219  
   220  }
   221  
   222  // createPerformanceStats calculates individual metrics for an operation
   223  // given the intervals collected during that operation
   224  func createPerformanceStats(stats *performanceStats, provisionCount int, pvcs []*v1.PersistentVolumeClaim) {
   225  	var min, max, sum time.Duration
   226  	for _, pvc := range pvcs {
   227  		pvcMetric, ok := stats.perObjectInterval[pvc.Name]
   228  		if !ok {
   229  			framework.Failf("PVC %s not found in perObjectInterval", pvc.Name)
   230  		}
   231  
   232  		elapsedTime := pvcMetric.elapsed
   233  		sum += elapsedTime
   234  		if elapsedTime < min || min == 0 {
   235  			min = elapsedTime
   236  		}
   237  		if elapsedTime > max {
   238  			max = elapsedTime
   239  		}
   240  	}
   241  	stats.operationMetrics = &storageframework.Metrics{
   242  		AvgLatency: time.Duration(int64(sum) / int64(provisionCount)),
   243  		Throughput: float64(provisionCount) / max.Seconds(),
   244  	}
   245  }
   246  
   247  // validatePerformanceStats validates if test performance metrics meet the baseline target
   248  func validatePerformanceStats(operationMetrics *storageframework.Metrics, baselineMetrics *storageframework.Metrics) []error {
   249  	var errList []error
   250  	framework.Logf("Metrics to evaluate: %+v", dump.Pretty(operationMetrics))
   251  
   252  	if operationMetrics.AvgLatency > baselineMetrics.AvgLatency {
   253  		err := fmt.Errorf("expected latency to be less than %v but calculated latency %v", baselineMetrics.AvgLatency, operationMetrics.AvgLatency)
   254  		errList = append(errList, err)
   255  	}
   256  	if operationMetrics.Throughput < baselineMetrics.Throughput {
   257  		err := fmt.Errorf("expected throughput to be greater than %f but calculated throughput %f", baselineMetrics.Throughput, operationMetrics.Throughput)
   258  		errList = append(errList, err)
   259  	}
   260  	return errList
   261  }
   262  
   263  // newPVCWatch creates an informer to check whether all PVCs are Bound
   264  // When all PVCs are bound, the controller sends a signal to
   265  // waitForProvisionCh to unblock the test
   266  func newPVCWatch(ctx context.Context, f *framework.Framework, provisionCount int, pvcMetrics *performanceStats) cache.Controller {
   267  	defer ginkgo.GinkgoRecover()
   268  	count := 0
   269  	countLock := &sync.Mutex{}
   270  	ns := f.Namespace.Name
   271  	var pvcs []*v1.PersistentVolumeClaim
   272  	checkPVCBound := func(oldPVC *v1.PersistentVolumeClaim, newPVC *v1.PersistentVolumeClaim) {
   273  		now := time.Now()
   274  		pvcMetrics.mutex.Lock()
   275  		defer pvcMetrics.mutex.Unlock()
   276  		countLock.Lock()
   277  		defer countLock.Unlock()
   278  
   279  		// Check if PVC entered the bound state
   280  		if oldPVC.Status.Phase != v1.ClaimBound && newPVC.Status.Phase == v1.ClaimBound {
   281  			newPVCInterval, ok := pvcMetrics.perObjectInterval[newPVC.Name]
   282  			if !ok {
   283  				framework.Failf("PVC %s should exist in interval map already", newPVC.Name)
   284  			}
   285  			count++
   286  			newPVCInterval.enterDesiredState = now
   287  			newPVCInterval.elapsed = now.Sub(newPVCInterval.create)
   288  			pvcs = append(pvcs, newPVC)
   289  		}
   290  		if count == provisionCount {
   291  			// Number of Bound PVCs equals the number of PVCs
   292  			// provisioned by this test
   293  			// Send those PVCs to the channel to unblock test
   294  			waitForProvisionCh <- pvcs
   295  		}
   296  	}
   297  	_, controller := cache.NewInformer(
   298  		&cache.ListWatch{
   299  			ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
   300  				obj, err := f.ClientSet.CoreV1().PersistentVolumeClaims(ns).List(ctx, metav1.ListOptions{})
   301  				return runtime.Object(obj), err
   302  			},
   303  			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
   304  				return f.ClientSet.CoreV1().PersistentVolumeClaims(ns).Watch(ctx, metav1.ListOptions{})
   305  			},
   306  		},
   307  		&v1.PersistentVolumeClaim{},
   308  		0,
   309  		cache.ResourceEventHandlerFuncs{
   310  			UpdateFunc: func(oldObj, newObj interface{}) {
   311  				oldPVC, ok := oldObj.(*v1.PersistentVolumeClaim)
   312  				if !ok {
   313  					framework.Failf("Expected a PVC, got instead an old object of type %T", oldObj)
   314  				}
   315  				newPVC, ok := newObj.(*v1.PersistentVolumeClaim)
   316  				if !ok {
   317  					framework.Failf("Expected a PVC, got instead a new object of type %T", newObj)
   318  				}
   319  
   320  				checkPVCBound(oldPVC, newPVC)
   321  			},
   322  		},
   323  	)
   324  	return controller
   325  }
   326  

View as plain text