
Source file src/k8s.io/kubernetes/test/e2e/storage/testsuites/volume_stress.go

Documentation: k8s.io/kubernetes/test/e2e/storage/testsuites

/*
Copyright 2020 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// This suite tests volumes under stress conditions

package testsuites

import (
	"context"
	"sync"

	"github.com/onsi/ginkgo/v2"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	errors "k8s.io/apimachinery/pkg/util/errors"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/kubernetes/test/e2e/framework"
	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
	e2epv "k8s.io/kubernetes/test/e2e/framework/pv"
	e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
	storageframework "k8s.io/kubernetes/test/e2e/storage/framework"
	admissionapi "k8s.io/pod-security-admission/api"
)

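// volumeStressTestSuite implements the storageframework.TestSuite interface
// for the volume stress tests defined below.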
type volumeStressTestSuite struct {
	tsInfo storageframework.TestSuiteInfo
}

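// volumeStressTest holds the per-test state: the driver configuration, the
// migration operation check, and the volumes and pods created for one run of
// the stress test, together with the options that control it.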
type volumeStressTest struct {
	config *storageframework.PerTestConfig

	migrationCheck *migrationOpCheck

	volumes []*storageframework.VolumeResource
	pods    []*v1.Pod
	// used to wait for any async test routines to finish
	wg sync.WaitGroup

	testOptions storageframework.StressTestOptions
}

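// Compile-time check that volumeStressTestSuite satisfies the TestSuite interface.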
var _ storageframework.TestSuite = &volumeStressTestSuite{}

// InitCustomVolumeStressTestSuite returns a volumeStressTestSuite that implements
// the TestSuite interface, using the given custom test patterns.
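//
// As an illustration (hypothetical caller, not part of this file), an external
// driver's e2e tests could register the suite with a reduced pattern set:
//
//	InitCustomVolumeStressTestSuite([]storageframework.TestPattern{
//		storageframework.DefaultFsDynamicPV,
//	})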
func InitCustomVolumeStressTestSuite(patterns []storageframework.TestPattern) storageframework.TestSuite {
	return &volumeStressTestSuite{
		tsInfo: storageframework.TestSuiteInfo{
			Name:         "volume-stress",
			TestPatterns: patterns,
		},
	}
}

// InitVolumeStressTestSuite returns a volumeStressTestSuite that implements
// the TestSuite interface, using the test suite's default patterns.
func InitVolumeStressTestSuite() storageframework.TestSuite {
	patterns := []storageframework.TestPattern{
		storageframework.DefaultFsDynamicPV,
		storageframework.BlockVolModeDynamicPV,
	}
	return InitCustomVolumeStressTestSuite(patterns)
}

func (t *volumeStressTestSuite) GetTestSuiteInfo() storageframework.TestSuiteInfo {
	return t.tsInfo
}

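// SkipUnsupportedTests skips the suite when the driver does not declare
// StressTestOptions, does not support dynamic provisioning, or lacks the
// block capability required by a block-mode pattern; invalid option values
// fail the test instead of skipping it.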
func (t *volumeStressTestSuite) SkipUnsupportedTests(driver storageframework.TestDriver, pattern storageframework.TestPattern) {
	dInfo := driver.GetDriverInfo()
	if dInfo.StressTestOptions == nil {
		e2eskipper.Skipf("Driver %s doesn't specify stress test options -- skipping", dInfo.Name)
	}
	if dInfo.StressTestOptions.NumPods <= 0 {
		framework.Failf("NumPods in stress test options must be a positive integer, received: %d", dInfo.StressTestOptions.NumPods)
	}
	if dInfo.StressTestOptions.NumRestarts <= 0 {
		framework.Failf("NumRestarts in stress test options must be a positive integer, received: %d", dInfo.StressTestOptions.NumRestarts)
	}

	if _, ok := driver.(storageframework.DynamicPVTestDriver); !ok {
		e2eskipper.Skipf("Driver %s doesn't implement DynamicPVTestDriver -- skipping", dInfo.Name)
	}
	if !driver.GetDriverInfo().Capabilities[storageframework.CapBlock] && pattern.VolMode == v1.PersistentVolumeBlock {
		e2eskipper.Skipf("Driver %q does not support block volume mode - skipping", dInfo.Name)
	}
}

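// DefineTests registers the stress test for the given driver and pattern: it
// provisions NumPods volumes and pods and, in parallel, restarts each pod
// NumRestarts times.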
func (t *volumeStressTestSuite) DefineTests(driver storageframework.TestDriver, pattern storageframework.TestPattern) {
	var (
		dInfo = driver.GetDriverInfo()
		cs    clientset.Interface
		l     *volumeStressTest
	)

	// Beware that it also registers an AfterEach which renders f unusable. Any code using
	// f must run inside an It or Context callback.
	f := framework.NewFrameworkWithCustomTimeouts("stress", storageframework.GetDriverTimeouts(driver))
	f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged

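	// init prepares the per-test state: it wires up the clientset, runs the
	// driver's PrepareTest, starts the migration op check, and records the
	// driver's stress test options.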
	init := func(ctx context.Context) {
		cs = f.ClientSet
		l = &volumeStressTest{}

		// Now do the more expensive test initialization.
		l.config = driver.PrepareTest(ctx, f)
		l.migrationCheck = newMigrationOpCheck(ctx, f.ClientSet, f.ClientConfig(), dInfo.InTreePluginName)
		l.volumes = []*storageframework.VolumeResource{}
		l.pods = []*v1.Pod{}
		l.testOptions = *dInfo.StressTestOptions
	}

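	// createPodsAndVolumes provisions one volume per pod and builds the pod
	// objects; the pods themselves are created (and repeatedly re-created) in
	// the restart loop of the test body below.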
	createPodsAndVolumes := func(ctx context.Context) {
		for i := 0; i < l.testOptions.NumPods; i++ {
			framework.Logf("Creating resources for pod %v/%v", i, l.testOptions.NumPods-1)
			r := storageframework.CreateVolumeResource(ctx, driver, l.config, pattern, t.GetTestSuiteInfo().SupportedSizeRange)
			l.volumes = append(l.volumes, r)
			podConfig := e2epod.Config{
				NS:           f.Namespace.Name,
				PVCs:         []*v1.PersistentVolumeClaim{r.Pvc},
				SeLinuxLabel: e2epv.SELinuxLabel,
			}
			pod, err := e2epod.MakeSecPod(&podConfig)
			framework.ExpectNoError(err)

			l.pods = append(l.pods, pod)
		}
	}

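	// cleanup waits for the test goroutines to finish, then deletes the pods
	// and volumes concurrently, aggregating any errors, and finally validates
	// the migration operation metrics.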
	cleanup := func(ctx context.Context) {
		framework.Logf("Stopping and waiting for all test routines to finish")
		l.wg.Wait()

		var (
			errs []error
			mu   sync.Mutex
			wg   sync.WaitGroup
		)

		wg.Add(len(l.pods))
		for _, pod := range l.pods {
			go func(pod *v1.Pod) {
				defer ginkgo.GinkgoRecover()
				defer wg.Done()

				framework.Logf("Deleting pod %v", pod.Name)
				err := e2epod.DeletePodWithWait(ctx, cs, pod)
				mu.Lock()
				defer mu.Unlock()
				errs = append(errs, err)
			}(pod)
		}
		wg.Wait()

		wg.Add(len(l.volumes))
		for _, volume := range l.volumes {
			go func(volume *storageframework.VolumeResource) {
				defer ginkgo.GinkgoRecover()
				defer wg.Done()

				framework.Logf("Deleting volume %s", volume.Pvc.GetName())
				err := volume.CleanupResource(ctx)
				mu.Lock()
				defer mu.Unlock()
				errs = append(errs, err)
			}(volume)
		}
		wg.Wait()

		framework.ExpectNoError(errors.NewAggregate(errs), "while cleaning up resources")
		l.migrationCheck.validateMigrationVolumeOpCounts(ctx)
	}

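	// The actual stress test: provision NumPods pods, each with its own
	// volume, then restart every pod NumRestarts times in parallel.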
	f.It("multiple pods should access different volumes repeatedly", f.WithSlow(), f.WithSerial(), func(ctx context.Context) {
		init(ctx)
		ginkgo.DeferCleanup(cleanup)
		createPodsAndVolumes(ctx)
		// Restart each pod repeatedly.
		for i := 0; i < l.testOptions.NumPods; i++ {
			podIndex := i
			l.wg.Add(1)
			go func() {
				defer ginkgo.GinkgoRecover()
				defer l.wg.Done()
				for j := 0; j < l.testOptions.NumRestarts; j++ {
					select {
					case <-ctx.Done():
						// This looks like a bug carried over from the original test
						// (https://github.com/kubernetes/kubernetes/blob/21049c2a1234ae3eea57357ed4329ed567a2dab3/test/e2e/storage/testsuites/volume_stress.go#L212):
						// this early return will never be reached, even if some other
						// goroutine fails, because the context doesn't get cancelled.
						return
					default:
						pod := l.pods[podIndex]
						framework.Logf("Pod-%v [%v], Iteration %v/%v", podIndex, pod.Name, j, l.testOptions.NumRestarts-1)
						_, err := cs.CoreV1().Pods(pod.Namespace).Create(ctx, pod, metav1.CreateOptions{})
						if err != nil {
							framework.Failf("Failed to create pod-%v [%+v]. Error: %v", podIndex, pod, err)
						}

						err = e2epod.WaitTimeoutForPodRunningInNamespace(ctx, cs, pod.Name, pod.Namespace, f.Timeouts.PodStart)
						if err != nil {
							framework.Failf("Failed to wait for pod-%v [%+v] to reach Running status. Error: %v", podIndex, pod, err)
						}

						// TODO: write data per pod and validate it every time

						err = e2epod.DeletePodWithWait(ctx, f.ClientSet, pod)
						if err != nil {
							framework.Failf("Failed to delete pod-%v [%+v]. Error: %v", podIndex, pod, err)
						}
					}
				}
			}()
		}

		l.wg.Wait()
	})
}
