1
16
17
18
19 package testsuites
20
21 import (
22 "context"
23 "sync"
24
25 "github.com/onsi/ginkgo/v2"
26
27 v1 "k8s.io/api/core/v1"
28 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
29 errors "k8s.io/apimachinery/pkg/util/errors"
30 clientset "k8s.io/client-go/kubernetes"
31 "k8s.io/kubernetes/test/e2e/feature"
32 "k8s.io/kubernetes/test/e2e/framework"
33 e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
34 e2epv "k8s.io/kubernetes/test/e2e/framework/pv"
35 e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
36 e2evolume "k8s.io/kubernetes/test/e2e/framework/volume"
37 storageframework "k8s.io/kubernetes/test/e2e/storage/framework"
38 storageutils "k8s.io/kubernetes/test/e2e/storage/utils"
39 admissionapi "k8s.io/pod-security-admission/api"
40 )
41
42 type snapshottableStressTestSuite struct {
43 tsInfo storageframework.TestSuiteInfo
44 }
45
46 type snapshottableStressTest struct {
47 config *storageframework.PerTestConfig
48 testOptions storageframework.VolumeSnapshotStressTestOptions
49 driverCleanup func()
50
51 pods []*v1.Pod
52 volumes []*storageframework.VolumeResource
53 snapshots []*storageframework.SnapshotResource
54
55 snapshotsMutex sync.Mutex
56
57
58 wg sync.WaitGroup
59 }
60
61
62
63 func InitCustomSnapshottableStressTestSuite(patterns []storageframework.TestPattern) storageframework.TestSuite {
64 return &snapshottableStressTestSuite{
65 tsInfo: storageframework.TestSuiteInfo{
66 Name: "snapshottable-stress",
67 TestPatterns: patterns,
68 SupportedSizeRange: e2evolume.SizeRange{
69 Min: "1Mi",
70 },
71 TestTags: []interface{}{feature.VolumeSnapshotDataSource},
72 },
73 }
74 }
75
76
77
78 func InitSnapshottableStressTestSuite() storageframework.TestSuite {
79 patterns := []storageframework.TestPattern{
80 storageframework.DynamicSnapshotDelete,
81 storageframework.DynamicSnapshotRetain,
82 }
83 return InitCustomSnapshottableStressTestSuite(patterns)
84 }
85
86 func (t *snapshottableStressTestSuite) GetTestSuiteInfo() storageframework.TestSuiteInfo {
87 return t.tsInfo
88 }
89
90 func (t *snapshottableStressTestSuite) SkipUnsupportedTests(driver storageframework.TestDriver, pattern storageframework.TestPattern) {
91 driverInfo := driver.GetDriverInfo()
92 var ok bool
93 if driverInfo.VolumeSnapshotStressTestOptions == nil {
94 e2eskipper.Skipf("Driver %s doesn't specify snapshot stress test options -- skipping", driverInfo.Name)
95 }
96 if driverInfo.VolumeSnapshotStressTestOptions.NumPods <= 0 {
97 framework.Failf("NumPods in snapshot stress test options must be a positive integer, received: %d", driverInfo.VolumeSnapshotStressTestOptions.NumPods)
98 }
99 if driverInfo.VolumeSnapshotStressTestOptions.NumSnapshots <= 0 {
100 framework.Failf("NumSnapshots in snapshot stress test options must be a positive integer, received: %d", driverInfo.VolumeSnapshotStressTestOptions.NumSnapshots)
101 }
102 _, ok = driver.(storageframework.SnapshottableTestDriver)
103 if !driverInfo.Capabilities[storageframework.CapSnapshotDataSource] || !ok {
104 e2eskipper.Skipf("Driver %q doesn't implement SnapshottableTestDriver - skipping", driverInfo.Name)
105 }
106
107 _, ok = driver.(storageframework.DynamicPVTestDriver)
108 if !ok {
109 e2eskipper.Skipf("Driver %s doesn't implement DynamicPVTestDriver -- skipping", driverInfo.Name)
110 }
111 }
112
113 func (t *snapshottableStressTestSuite) DefineTests(driver storageframework.TestDriver, pattern storageframework.TestPattern) {
114 var (
115 driverInfo *storageframework.DriverInfo
116 snapshottableDriver storageframework.SnapshottableTestDriver
117 cs clientset.Interface
118 stressTest *snapshottableStressTest
119 )
120
121
122
123 f := framework.NewDefaultFramework("snapshottable-stress")
124 f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged
125
126 init := func(ctx context.Context) {
127 driverInfo = driver.GetDriverInfo()
128 snapshottableDriver, _ = driver.(storageframework.SnapshottableTestDriver)
129 cs = f.ClientSet
130 config := driver.PrepareTest(ctx, f)
131
132 stressTest = &snapshottableStressTest{
133 config: config,
134 volumes: []*storageframework.VolumeResource{},
135 snapshots: []*storageframework.SnapshotResource{},
136 pods: []*v1.Pod{},
137 testOptions: *driverInfo.VolumeSnapshotStressTestOptions,
138 }
139 }
140
141 createPodsAndVolumes := func(ctx context.Context) {
142 for i := 0; i < stressTest.testOptions.NumPods; i++ {
143 framework.Logf("Creating resources for pod %d/%d", i, stressTest.testOptions.NumPods-1)
144
145 volume := storageframework.CreateVolumeResource(ctx, driver, stressTest.config, pattern, t.GetTestSuiteInfo().SupportedSizeRange)
146 stressTest.volumes = append(stressTest.volumes, volume)
147
148 podConfig := e2epod.Config{
149 NS: f.Namespace.Name,
150 PVCs: []*v1.PersistentVolumeClaim{volume.Pvc},
151 SeLinuxLabel: e2epv.SELinuxLabel,
152 }
153 pod, err := e2epod.MakeSecPod(&podConfig)
154 framework.ExpectNoError(err)
155 stressTest.pods = append(stressTest.pods, pod)
156
157 }
158
159 var wg sync.WaitGroup
160 for i, pod := range stressTest.pods {
161 wg.Add(1)
162
163 go func(i int, pod *v1.Pod) {
164 defer ginkgo.GinkgoRecover()
165 defer wg.Done()
166
167 if _, err := cs.CoreV1().Pods(pod.Namespace).Create(ctx, pod, metav1.CreateOptions{}); err != nil {
168 framework.Failf("Failed to create pod-%d [%+v]. Error: %v", i, pod, err)
169 }
170 }(i, pod)
171 }
172 wg.Wait()
173
174 for i, pod := range stressTest.pods {
175 if err := e2epod.WaitForPodRunningInNamespace(ctx, cs, pod); err != nil {
176 framework.Failf("Failed to wait for pod-%d [%+v] turn into running status. Error: %v", i, pod, err)
177 }
178 }
179 }
180
181 cleanup := func(ctx context.Context) {
182 framework.Logf("Stopping and waiting for all test routines to finish")
183 stressTest.wg.Wait()
184
185 var (
186 errs []error
187 mu sync.Mutex
188 wg sync.WaitGroup
189 )
190
191 wg.Add(len(stressTest.snapshots))
192 for _, snapshot := range stressTest.snapshots {
193 go func(snapshot *storageframework.SnapshotResource) {
194 defer ginkgo.GinkgoRecover()
195 defer wg.Done()
196
197 framework.Logf("Deleting snapshot %s/%s", snapshot.Vs.GetNamespace(), snapshot.Vs.GetName())
198 err := snapshot.CleanupResource(ctx, f.Timeouts)
199 mu.Lock()
200 defer mu.Unlock()
201 errs = append(errs, err)
202 }(snapshot)
203 }
204 wg.Wait()
205
206 wg.Add(len(stressTest.pods))
207 for _, pod := range stressTest.pods {
208 go func(pod *v1.Pod) {
209 defer ginkgo.GinkgoRecover()
210 defer wg.Done()
211
212 framework.Logf("Deleting pod %s", pod.Name)
213 err := e2epod.DeletePodWithWait(ctx, cs, pod)
214 mu.Lock()
215 defer mu.Unlock()
216 errs = append(errs, err)
217 }(pod)
218 }
219 wg.Wait()
220
221 wg.Add(len(stressTest.volumes))
222 for _, volume := range stressTest.volumes {
223 go func(volume *storageframework.VolumeResource) {
224 defer ginkgo.GinkgoRecover()
225 defer wg.Done()
226
227 framework.Logf("Deleting volume %s", volume.Pvc.GetName())
228 err := volume.CleanupResource(ctx)
229 mu.Lock()
230 defer mu.Unlock()
231 errs = append(errs, err)
232 }(volume)
233 }
234 wg.Wait()
235
236 errs = append(errs, storageutils.TryFunc(stressTest.driverCleanup))
237
238 framework.ExpectNoError(errors.NewAggregate(errs), "while cleaning up resources")
239 }
240
241 f.It("should support snapshotting of many volumes repeatedly", f.WithSlow(), f.WithSerial(), func(ctx context.Context) {
242 init(ctx)
243 ginkgo.DeferCleanup(cleanup)
244 createPodsAndVolumes(ctx)
245
246 for i := 0; i < stressTest.testOptions.NumPods; i++ {
247 for j := 0; j < stressTest.testOptions.NumSnapshots; j++ {
248 stressTest.wg.Add(1)
249
250 go func(podIndex, snapshotIndex int) {
251 defer ginkgo.GinkgoRecover()
252 defer stressTest.wg.Done()
253
254 pod := stressTest.pods[podIndex]
255 volume := stressTest.volumes[podIndex]
256
257 select {
258 case <-ctx.Done():
259
260
261
262
263
264
265
266
267 return
268 default:
269 framework.Logf("Pod-%d [%s], Iteration %d/%d", podIndex, pod.Name, snapshotIndex, stressTest.testOptions.NumSnapshots-1)
270 parameters := map[string]string{}
271 snapshot := storageframework.CreateSnapshotResource(ctx, snapshottableDriver, stressTest.config, pattern, volume.Pvc.GetName(), volume.Pvc.GetNamespace(), f.Timeouts, parameters)
272 stressTest.snapshotsMutex.Lock()
273 defer stressTest.snapshotsMutex.Unlock()
274 stressTest.snapshots = append(stressTest.snapshots, snapshot)
275 }
276 }(i, j)
277 }
278 }
279
280 stressTest.wg.Wait()
281 })
282 }
283
View as plain text