...

Source file src/k8s.io/kubernetes/test/e2e/storage/testsuites/snapshottable_stress.go

Documentation: k8s.io/kubernetes/test/e2e/storage/testsuites

     1  /*
     2  Copyright 2020 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  // This suite tests volume snapshots under stress conditions.
    18  
    19  package testsuites
    20  
    21  import (
    22  	"context"
    23  	"sync"
    24  
    25  	"github.com/onsi/ginkgo/v2"
    26  
    27  	v1 "k8s.io/api/core/v1"
    28  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    29  	errors "k8s.io/apimachinery/pkg/util/errors"
    30  	clientset "k8s.io/client-go/kubernetes"
    31  	"k8s.io/kubernetes/test/e2e/feature"
    32  	"k8s.io/kubernetes/test/e2e/framework"
    33  	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
    34  	e2epv "k8s.io/kubernetes/test/e2e/framework/pv"
    35  	e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
    36  	e2evolume "k8s.io/kubernetes/test/e2e/framework/volume"
    37  	storageframework "k8s.io/kubernetes/test/e2e/storage/framework"
    38  	storageutils "k8s.io/kubernetes/test/e2e/storage/utils"
    39  	admissionapi "k8s.io/pod-security-admission/api"
    40  )
    41  
// snapshottableStressTestSuite is the test suite that puts volume snapshots
// under stress. It only carries the static description of the suite.
type snapshottableStressTestSuite struct {
	// tsInfo is the suite description handed out by GetTestSuiteInfo.
	tsInfo storageframework.TestSuiteInfo
}
    45  
// snapshottableStressTest holds the per-test state of one snapshot stress run:
// the driver configuration, the resources created so far and the
// synchronization primitives shared by the test goroutines.
type snapshottableStressTest struct {
	// config is the per-test configuration returned by the driver's PrepareTest.
	config *storageframework.PerTestConfig
	// testOptions is a copy of the driver's VolumeSnapshotStressTestOptions
	// (NumPods and NumSnapshots are read from it).
	testOptions storageframework.VolumeSnapshotStressTestOptions
	// driverCleanup is invoked through storageutils.TryFunc at the end of
	// cleanup. NOTE(review): nothing in this file assigns it — confirm whether
	// it is still needed.
	driverCleanup func()

	// pods and volumes are filled in lockstep: pods[i] mounts volumes[i].
	pods    []*v1.Pod
	volumes []*storageframework.VolumeResource
	// snapshots collects every snapshot resource created by the test so that
	// cleanup can delete them.
	snapshots []*storageframework.SnapshotResource
	// Because we are appending snapshot resources in parallel goroutines.
	snapshotsMutex sync.Mutex

	// Stop and wait for any async routines.
	wg sync.WaitGroup
}
    60  
    61  // InitCustomSnapshottableStressTestSuite returns snapshottableStressTestSuite that implements TestSuite interface
    62  // using custom test patterns
    63  func InitCustomSnapshottableStressTestSuite(patterns []storageframework.TestPattern) storageframework.TestSuite {
    64  	return &snapshottableStressTestSuite{
    65  		tsInfo: storageframework.TestSuiteInfo{
    66  			Name:         "snapshottable-stress",
    67  			TestPatterns: patterns,
    68  			SupportedSizeRange: e2evolume.SizeRange{
    69  				Min: "1Mi",
    70  			},
    71  			TestTags: []interface{}{feature.VolumeSnapshotDataSource},
    72  		},
    73  	}
    74  }
    75  
    76  // InitSnapshottableStressTestSuite returns snapshottableStressTestSuite that implements TestSuite interface
    77  // using testsuite default patterns
    78  func InitSnapshottableStressTestSuite() storageframework.TestSuite {
    79  	patterns := []storageframework.TestPattern{
    80  		storageframework.DynamicSnapshotDelete,
    81  		storageframework.DynamicSnapshotRetain,
    82  	}
    83  	return InitCustomSnapshottableStressTestSuite(patterns)
    84  }
    85  
// GetTestSuiteInfo returns the suite description that was set when the suite
// was constructed.
func (t *snapshottableStressTestSuite) GetTestSuiteInfo() storageframework.TestSuiteInfo {
	return t.tsInfo
}
    89  
    90  func (t *snapshottableStressTestSuite) SkipUnsupportedTests(driver storageframework.TestDriver, pattern storageframework.TestPattern) {
    91  	driverInfo := driver.GetDriverInfo()
    92  	var ok bool
    93  	if driverInfo.VolumeSnapshotStressTestOptions == nil {
    94  		e2eskipper.Skipf("Driver %s doesn't specify snapshot stress test options -- skipping", driverInfo.Name)
    95  	}
    96  	if driverInfo.VolumeSnapshotStressTestOptions.NumPods <= 0 {
    97  		framework.Failf("NumPods in snapshot stress test options must be a positive integer, received: %d", driverInfo.VolumeSnapshotStressTestOptions.NumPods)
    98  	}
    99  	if driverInfo.VolumeSnapshotStressTestOptions.NumSnapshots <= 0 {
   100  		framework.Failf("NumSnapshots in snapshot stress test options must be a positive integer, received: %d", driverInfo.VolumeSnapshotStressTestOptions.NumSnapshots)
   101  	}
   102  	_, ok = driver.(storageframework.SnapshottableTestDriver)
   103  	if !driverInfo.Capabilities[storageframework.CapSnapshotDataSource] || !ok {
   104  		e2eskipper.Skipf("Driver %q doesn't implement SnapshottableTestDriver - skipping", driverInfo.Name)
   105  	}
   106  
   107  	_, ok = driver.(storageframework.DynamicPVTestDriver)
   108  	if !ok {
   109  		e2eskipper.Skipf("Driver %s doesn't implement DynamicPVTestDriver -- skipping", driverInfo.Name)
   110  	}
   111  }
   112  
// DefineTests registers the snapshot stress test for the given driver and
// test pattern. A single slow, serial spec is defined: it creates NumPods
// volumes with one pod each and then takes NumSnapshots snapshots of every
// volume concurrently. All created resources are removed by a deferred
// cleanup.
func (t *snapshottableStressTestSuite) DefineTests(driver storageframework.TestDriver, pattern storageframework.TestPattern) {
	// State shared by the closures below; populated by init for each spec.
	var (
		driverInfo          *storageframework.DriverInfo
		snapshottableDriver storageframework.SnapshottableTestDriver
		cs                  clientset.Interface
		stressTest          *snapshottableStressTest
	)

	// Beware that it also registers an AfterEach which renders f unusable. Any code using
	// f must run inside an It or Context callback.
	f := framework.NewDefaultFramework("snapshottable-stress")
	f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged

	// init prepares the driver for the test and resets the shared state.
	init := func(ctx context.Context) {
		driverInfo = driver.GetDriverInfo()
		// The assertion result is ignored here; SkipUnsupportedTests skips
		// drivers that do not implement SnapshottableTestDriver.
		snapshottableDriver, _ = driver.(storageframework.SnapshottableTestDriver)
		cs = f.ClientSet
		config := driver.PrepareTest(ctx, f)

		stressTest = &snapshottableStressTest{
			config:      config,
			volumes:     []*storageframework.VolumeResource{},
			snapshots:   []*storageframework.SnapshotResource{},
			pods:        []*v1.Pod{},
			testOptions: *driverInfo.VolumeSnapshotStressTestOptions,
		}
	}

	// createPodsAndVolumes creates NumPods volumes and matching pod specs
	// sequentially, submits all pods in parallel and finally waits until every
	// pod is running.
	createPodsAndVolumes := func(ctx context.Context) {
		for i := 0; i < stressTest.testOptions.NumPods; i++ {
			framework.Logf("Creating resources for pod %d/%d", i, stressTest.testOptions.NumPods-1)

			volume := storageframework.CreateVolumeResource(ctx, driver, stressTest.config, pattern, t.GetTestSuiteInfo().SupportedSizeRange)
			stressTest.volumes = append(stressTest.volumes, volume)

			podConfig := e2epod.Config{
				NS:           f.Namespace.Name,
				PVCs:         []*v1.PersistentVolumeClaim{volume.Pvc},
				SeLinuxLabel: e2epv.SELinuxLabel,
			}
			pod, err := e2epod.MakeSecPod(&podConfig)
			framework.ExpectNoError(err)
			stressTest.pods = append(stressTest.pods, pod)

		}

		// Local WaitGroup: only tracks the pod creation requests below and is
		// independent of stressTest.wg.
		var wg sync.WaitGroup
		for i, pod := range stressTest.pods {
			wg.Add(1)

			go func(i int, pod *v1.Pod) {
				// GinkgoRecover is deferred first so that it runs last and can
				// handle a failure raised by Failf inside this goroutine.
				defer ginkgo.GinkgoRecover()
				defer wg.Done()

				if _, err := cs.CoreV1().Pods(pod.Namespace).Create(ctx, pod, metav1.CreateOptions{}); err != nil {
					framework.Failf("Failed to create pod-%d [%+v]. Error: %v", i, pod, err)
				}
			}(i, pod)
		}
		wg.Wait()

		for i, pod := range stressTest.pods {
			if err := e2epod.WaitForPodRunningInNamespace(ctx, cs, pod); err != nil {
				framework.Failf("Failed to wait for pod-%d [%+v] turn into running status. Error: %v", i, pod, err)
			}
		}
	}

	// cleanup waits for outstanding test goroutines and then deletes
	// snapshots, pods and volumes, in that order. Each phase deletes its
	// resources in parallel and finishes before the next one starts. Errors
	// are collected and reported together at the end.
	cleanup := func(ctx context.Context) {
		framework.Logf("Stopping and waiting for all test routines to finish")
		stressTest.wg.Wait()

		var (
			errs []error
			mu   sync.Mutex // guards errs
			wg   sync.WaitGroup
		)

		// Phase 1: snapshots. No lock is taken on stressTest.snapshots because
		// all goroutines appending to it have finished (stressTest.wg.Wait above).
		wg.Add(len(stressTest.snapshots))
		for _, snapshot := range stressTest.snapshots {
			go func(snapshot *storageframework.SnapshotResource) {
				defer ginkgo.GinkgoRecover()
				defer wg.Done()

				framework.Logf("Deleting snapshot %s/%s", snapshot.Vs.GetNamespace(), snapshot.Vs.GetName())
				err := snapshot.CleanupResource(ctx, f.Timeouts)
				mu.Lock()
				defer mu.Unlock()
				// err is appended even when nil; NewAggregate below is relied
				// on to ignore nil entries.
				errs = append(errs, err)
			}(snapshot)
		}
		wg.Wait()

		// Phase 2: pods. The same WaitGroup is reused after the previous
		// phase has fully drained.
		wg.Add(len(stressTest.pods))
		for _, pod := range stressTest.pods {
			go func(pod *v1.Pod) {
				defer ginkgo.GinkgoRecover()
				defer wg.Done()

				framework.Logf("Deleting pod %s", pod.Name)
				err := e2epod.DeletePodWithWait(ctx, cs, pod)
				mu.Lock()
				defer mu.Unlock()
				errs = append(errs, err)
			}(pod)
		}
		wg.Wait()

		// Phase 3: volumes, after the pods using them are gone.
		wg.Add(len(stressTest.volumes))
		for _, volume := range stressTest.volumes {
			go func(volume *storageframework.VolumeResource) {
				defer ginkgo.GinkgoRecover()
				defer wg.Done()

				framework.Logf("Deleting volume %s", volume.Pvc.GetName())
				err := volume.CleanupResource(ctx)
				mu.Lock()
				defer mu.Unlock()
				errs = append(errs, err)
			}(volume)
		}
		wg.Wait()

		// NOTE(review): driverCleanup is never assigned in this file, so this
		// is presumably a no-op — confirm how TryFunc treats a nil func.
		errs = append(errs, storageutils.TryFunc(stressTest.driverCleanup))

		framework.ExpectNoError(errors.NewAggregate(errs), "while cleaning up resources")
	}

	f.It("should support snapshotting of many volumes repeatedly", f.WithSlow(), f.WithSerial(), func(ctx context.Context) {
		init(ctx)
		// Registered right after init so that resources are removed even if
		// the setup below fails part-way.
		ginkgo.DeferCleanup(cleanup)
		createPodsAndVolumes(ctx)
		// Create NumSnapshots snapshots of each volume, one goroutine per
		// snapshot. The snapshots are deleted later by cleanup.
		for i := 0; i < stressTest.testOptions.NumPods; i++ {
			for j := 0; j < stressTest.testOptions.NumSnapshots; j++ {
				stressTest.wg.Add(1)

				go func(podIndex, snapshotIndex int) {
					defer ginkgo.GinkgoRecover()
					defer stressTest.wg.Done()

					pod := stressTest.pods[podIndex]
					volume := stressTest.volumes[podIndex]

					select {
					case <-ctx.Done():
						// This looks like a bug in the
						// original test
						// (https://github.com/kubernetes/kubernetes/blob/21049c2a1234ae3eea57357ed4329ed567a2dab3/test/e2e/storage/testsuites/snapshottable_stress.go#L269):
						// This early return will never
						// get reached even if some
						// other goroutine fails
						// because the context doesn't
						// get cancelled.
						return
					default:
						framework.Logf("Pod-%d [%s], Iteration %d/%d", podIndex, pod.Name, snapshotIndex, stressTest.testOptions.NumSnapshots-1)
						parameters := map[string]string{}
						snapshot := storageframework.CreateSnapshotResource(ctx, snapshottableDriver, stressTest.config, pattern, volume.Pvc.GetName(), volume.Pvc.GetNamespace(), f.Timeouts, parameters)
						// The lock is taken only after the snapshot exists, so
						// snapshot creation itself runs concurrently.
						stressTest.snapshotsMutex.Lock()
						defer stressTest.snapshotsMutex.Unlock()
						stressTest.snapshots = append(stressTest.snapshots, snapshot)
					}
				}(i, j)
			}
		}

		stressTest.wg.Wait()
	})
}
   283  

View as plain text