package testsuites

import (
	"context"
	"fmt"
	"reflect"
	"time"

	"github.com/onsi/ginkgo/v2"
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/errors"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/kubernetes/test/e2e/feature"
	"k8s.io/kubernetes/test/e2e/framework"
	e2enode "k8s.io/kubernetes/test/e2e/framework/node"
	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
	e2eoutput "k8s.io/kubernetes/test/e2e/framework/pod/output"
	e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
	e2evolume "k8s.io/kubernetes/test/e2e/framework/volume"
	storageframework "k8s.io/kubernetes/test/e2e/storage/framework"
	storageutils "k8s.io/kubernetes/test/e2e/storage/utils"
	imageutils "k8s.io/kubernetes/test/utils/image"
	admissionapi "k8s.io/pod-security-admission/api"
)

type multiVolumeTestSuite struct {
	tsInfo storageframework.TestSuiteInfo
}

var _ storageframework.TestSuite = &multiVolumeTestSuite{}

// InitCustomMultiVolumeTestSuite returns a multiVolumeTestSuite that implements the TestSuite
// interface, using custom test patterns.
func InitCustomMultiVolumeTestSuite(patterns []storageframework.TestPattern) storageframework.TestSuite {
	return &multiVolumeTestSuite{
		tsInfo: storageframework.TestSuiteInfo{
			Name:         "multiVolume",
			TestTags:     []interface{}{framework.WithSlow()},
			TestPatterns: patterns,
			SupportedSizeRange: e2evolume.SizeRange{
				Min: "1Mi",
			},
		},
	}
}

// InitMultiVolumeTestSuite returns a multiVolumeTestSuite that implements the TestSuite
// interface, using the test suite default patterns.
func InitMultiVolumeTestSuite() storageframework.TestSuite {
	patterns := []storageframework.TestPattern{
		storageframework.FsVolModePreprovisionedPV,
		storageframework.FsVolModeDynamicPV,
		storageframework.BlockVolModePreprovisionedPV,
		storageframework.BlockVolModeDynamicPV,
		storageframework.Ext4DynamicPV,
		storageframework.XfsDynamicPV,
		storageframework.NtfsDynamicPV,
	}
	return InitCustomMultiVolumeTestSuite(patterns)
}
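
// A minimal sketch of how a driver's e2e test file would typically consume this suite.
// This wiring is not part of this file and is only illustrative; csiTestDrivers is a
// hypothetical slice of driver init functions assumed to be defined elsewhere:
//
//	var csiTestSuites = []func() storageframework.TestSuite{
//		testsuites.InitMultiVolumeTestSuite,
//	}
//
//	var _ = utils.SIGDescribe("CSI Volumes", func() {
//		for _, initDriver := range csiTestDrivers {
//			curDriver := initDriver()
//			ginkgo.Context(storageframework.GetDriverNameWithFeatureTags(curDriver), func() {
//				storageframework.DefineTestSuites(curDriver, csiTestSuites)
//			})
//		}
//	})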

func (t *multiVolumeTestSuite) GetTestSuiteInfo() storageframework.TestSuiteInfo {
	return t.tsInfo
}

func (t *multiVolumeTestSuite) SkipUnsupportedTests(driver storageframework.TestDriver, pattern storageframework.TestPattern) {
	dInfo := driver.GetDriverInfo()
	skipVolTypePatterns(pattern, driver, storageframework.NewVolTypeMap(storageframework.PreprovisionedPV))
	if pattern.VolMode == v1.PersistentVolumeBlock && !dInfo.Capabilities[storageframework.CapBlock] {
		e2eskipper.Skipf("Driver %s doesn't support %v -- skipping", dInfo.Name, pattern.VolMode)
	}
}

func (t *multiVolumeTestSuite) DefineTests(driver storageframework.TestDriver, pattern storageframework.TestPattern) {
	type local struct {
		config *storageframework.PerTestConfig

		cs        clientset.Interface
		ns        *v1.Namespace
		driver    storageframework.TestDriver
		resources []*storageframework.VolumeResource

		migrationCheck *migrationOpCheck
	}
	var (
		dInfo = driver.GetDriverInfo()
		l     local
	)

	// Beware that it also registers an AfterEach which renders f unusable. Any code using
	// f must run inside an It or Context callback.
	f := framework.NewFrameworkWithCustomTimeouts("multivolume", storageframework.GetDriverTimeouts(driver))
	f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged

	init := func(ctx context.Context) {
		l = local{}
		l.ns = f.Namespace
		l.cs = f.ClientSet
		l.driver = driver

		// Now do the more expensive test initialization.
		l.config = driver.PrepareTest(ctx, f)
		l.migrationCheck = newMigrationOpCheck(ctx, f.ClientSet, f.ClientConfig(), dInfo.InTreePluginName)
	}

	cleanup := func(ctx context.Context) {
		var errs []error
		for _, resource := range l.resources {
			errs = append(errs, resource.CleanupResource(ctx))
		}

		framework.ExpectNoError(errors.NewAggregate(errs), "while cleanup resource")
		l.migrationCheck.validateMigrationVolumeOpCounts(ctx)
	}

	// This tests the following configuration: a single pod on one node uses two volumes with
	// the same volume mode; the pod is then recreated on the same node and must still be able
	// to read the data written before the recreation.
	ginkgo.It("should access to two volumes with the same volume mode and retain data across pod recreation on the same node", func(ctx context.Context) {
		// Multiple volumes are generally not available for pre-provisioned volumes, because
		// containerized storage servers, such as iSCSI and rbd, return a static volume inside
		// the container instead of creating a new volume per request.
		if pattern.VolType == storageframework.PreprovisionedPV {
			e2eskipper.Skipf("This test doesn't work with pre-provisioned volume -- skipping")
		}

		init(ctx)
		ginkgo.DeferCleanup(cleanup)

		var pvcs []*v1.PersistentVolumeClaim
		numVols := 2

		for i := 0; i < numVols; i++ {
			testVolumeSizeRange := t.GetTestSuiteInfo().SupportedSizeRange
			resource := storageframework.CreateVolumeResource(ctx, driver, l.config, pattern, testVolumeSizeRange)
			l.resources = append(l.resources, resource)
			pvcs = append(pvcs, resource.Pvc)
		}
		TestAccessMultipleVolumesAcrossPodRecreation(ctx, l.config.Framework, l.cs, l.ns.Name,
			l.config.ClientNodeSelection, pvcs, true /* requiresSameNode */)
	})

	// This tests the following configuration: a single pod on one node uses two volumes with
	// the same volume mode; the pod is then recreated on a different node and must still be
	// able to read the data written before the recreation.
	ginkgo.It("should access to two volumes with the same volume mode and retain data across pod recreation on different node", func(ctx context.Context) {
		// Multiple volumes are generally not available for pre-provisioned volumes, because
		// containerized storage servers, such as iSCSI and rbd, return a static volume inside
		// the container instead of creating a new volume per request.
		if pattern.VolType == storageframework.PreprovisionedPV {
			e2eskipper.Skipf("This test doesn't work with pre-provisioned volume -- skipping")
		}

		init(ctx)
		ginkgo.DeferCleanup(cleanup)

		// Check the different-node test requirements.
		if l.driver.GetDriverInfo().Capabilities[storageframework.CapSingleNodeVolume] {
			e2eskipper.Skipf("Driver %s only supports %v -- skipping", l.driver.GetDriverInfo().Name, storageframework.CapSingleNodeVolume)
		}
		if l.config.ClientNodeSelection.Name != "" {
			e2eskipper.Skipf("Driver %q requires to deploy on a specific node - skipping", l.driver.GetDriverInfo().Name)
		}
		if err := ensureTopologyRequirements(ctx, &l.config.ClientNodeSelection, l.cs, dInfo, 2); err != nil {
			framework.Failf("Error setting topology requirements: %v", err)
		}

		var pvcs []*v1.PersistentVolumeClaim
		numVols := 2

		for i := 0; i < numVols; i++ {
			testVolumeSizeRange := t.GetTestSuiteInfo().SupportedSizeRange
			resource := storageframework.CreateVolumeResource(ctx, driver, l.config, pattern, testVolumeSizeRange)
			l.resources = append(l.resources, resource)
			pvcs = append(pvcs, resource.Pvc)
		}

		TestAccessMultipleVolumesAcrossPodRecreation(ctx, l.config.Framework, l.cs, l.ns.Name,
			l.config.ClientNodeSelection, pvcs, false /* requiresSameNode */)
	})

	// This tests the following configuration: a single pod on one node uses two volumes with
	// different volume modes (block and filesystem); the pod is then recreated on the same
	// node and must still be able to read the data written before the recreation.
	ginkgo.It("should access to two volumes with different volume mode and retain data across pod recreation on the same node", func(ctx context.Context) {
		if pattern.VolMode == v1.PersistentVolumeFilesystem {
			e2eskipper.Skipf("Filesystem volume case should be covered by block volume case -- skipping")
		}

		// Multiple volumes are generally not available for pre-provisioned volumes, because
		// containerized storage servers, such as iSCSI and rbd, return a static volume inside
		// the container instead of creating a new volume per request.
		if pattern.VolType == storageframework.PreprovisionedPV {
			e2eskipper.Skipf("This test doesn't work with pre-provisioned volume -- skipping")
		}

		init(ctx)
		ginkgo.DeferCleanup(cleanup)

		var pvcs []*v1.PersistentVolumeClaim
		numVols := 2

		for i := 0; i < numVols; i++ {
			curPattern := pattern
			if i != 0 {
				// Keep the 1st volume in block mode and use filesystem mode for the 2nd and later volumes.
				curPattern.VolMode = v1.PersistentVolumeFilesystem
			}
			testVolumeSizeRange := t.GetTestSuiteInfo().SupportedSizeRange
			resource := storageframework.CreateVolumeResource(ctx, driver, l.config, curPattern, testVolumeSizeRange)
			l.resources = append(l.resources, resource)
			pvcs = append(pvcs, resource.Pvc)
		}

		TestAccessMultipleVolumesAcrossPodRecreation(ctx, l.config.Framework, l.cs, l.ns.Name,
			l.config.ClientNodeSelection, pvcs, true /* requiresSameNode */)
	})

	// This tests the following configuration: a single pod on one node uses two volumes with
	// different volume modes (block and filesystem); the pod is then recreated on a different
	// node and must still be able to read the data written before the recreation.
	ginkgo.It("should access to two volumes with different volume mode and retain data across pod recreation on different node", func(ctx context.Context) {
		if pattern.VolMode == v1.PersistentVolumeFilesystem {
			e2eskipper.Skipf("Filesystem volume case should be covered by block volume case -- skipping")
		}

		// Multiple volumes are generally not available for pre-provisioned volumes, because
		// containerized storage servers, such as iSCSI and rbd, return a static volume inside
		// the container instead of creating a new volume per request.
		if pattern.VolType == storageframework.PreprovisionedPV {
			e2eskipper.Skipf("This test doesn't work with pre-provisioned volume -- skipping")
		}

		init(ctx)
		ginkgo.DeferCleanup(cleanup)

		// Check the different-node test requirements.
		if l.driver.GetDriverInfo().Capabilities[storageframework.CapSingleNodeVolume] {
			e2eskipper.Skipf("Driver %s only supports %v -- skipping", l.driver.GetDriverInfo().Name, storageframework.CapSingleNodeVolume)
		}
		if l.config.ClientNodeSelection.Name != "" {
			e2eskipper.Skipf("Driver %q requires to deploy on a specific node - skipping", l.driver.GetDriverInfo().Name)
		}
		if err := ensureTopologyRequirements(ctx, &l.config.ClientNodeSelection, l.cs, dInfo, 2); err != nil {
			framework.Failf("Error setting topology requirements: %v", err)
		}

		var pvcs []*v1.PersistentVolumeClaim
		numVols := 2

		for i := 0; i < numVols; i++ {
			curPattern := pattern
			if i != 0 {
				// Keep the 1st volume in block mode and use filesystem mode for the 2nd and later volumes.
				curPattern.VolMode = v1.PersistentVolumeFilesystem
			}
			testVolumeSizeRange := t.GetTestSuiteInfo().SupportedSizeRange
			resource := storageframework.CreateVolumeResource(ctx, driver, l.config, curPattern, testVolumeSizeRange)
			l.resources = append(l.resources, resource)
			pvcs = append(pvcs, resource.Pvc)
		}

		TestAccessMultipleVolumesAcrossPodRecreation(ctx, l.config.Framework, l.cs, l.ns.Name,
			l.config.ClientNodeSelection, pvcs, false /* requiresSameNode */)
	})

	// This tests the following configuration: two pods on the same node concurrently use the
	// same single volume.
	ginkgo.It("should concurrently access the single volume from pods on the same node", func(ctx context.Context) {
		init(ctx)
		ginkgo.DeferCleanup(cleanup)

		numPods := 2

		if !l.driver.GetDriverInfo().Capabilities[storageframework.CapMultiPODs] {
			e2eskipper.Skipf("Driver %q does not support multiple concurrent pods - skipping", dInfo.Name)
		}

		// Create the volume.
		testVolumeSizeRange := t.GetTestSuiteInfo().SupportedSizeRange
		resource := storageframework.CreateVolumeResource(ctx, l.driver, l.config, pattern, testVolumeSizeRange)
		l.resources = append(l.resources, resource)

		// Test concurrent access to the single volume from pods on the same node.
		TestConcurrentAccessToSingleVolume(ctx, l.config.Framework, l.cs, l.ns.Name,
			l.config.ClientNodeSelection, resource.Pvc, numPods, true /* requiresSameNode */, false /* readOnly */)
	})

	// This tests the following configuration: two pods on the same node concurrently use two
	// related volumes: the original volume and a volume restored from its snapshot.
	f.It("should concurrently access the volume and restored snapshot from pods on the same node [LinuxOnly]", feature.VolumeSnapshotDataSource, feature.VolumeSourceXFS, func(ctx context.Context) {
		init(ctx)
		ginkgo.DeferCleanup(cleanup)

		if !l.driver.GetDriverInfo().Capabilities[storageframework.CapSnapshotDataSource] {
			e2eskipper.Skipf("Driver %q does not support volume snapshots - skipping", dInfo.Name)
		}
		if pattern.SnapshotType == "" {
			e2eskipper.Skipf("Driver %q does not support snapshots - skipping", dInfo.Name)
		}

		// Create the volume.
		testVolumeSizeRange := t.GetTestSuiteInfo().SupportedSizeRange
		resource := storageframework.CreateVolumeResource(ctx, l.driver, l.config, pattern, testVolumeSizeRange)
		l.resources = append(l.resources, resource)
		pvcs := []*v1.PersistentVolumeClaim{resource.Pvc}

		// Write known content to the volume and take a snapshot of it.
		expectedContent := fmt.Sprintf("volume content %d", time.Now().UTC().UnixNano())
		sDriver, ok := driver.(storageframework.SnapshottableTestDriver)
		if !ok {
			framework.Failf("Driver %q has CapSnapshotDataSource but does not implement SnapshottableTestDriver", dInfo.Name)
		}
		testConfig := storageframework.ConvertTestConfig(l.config)
		dc := l.config.Framework.DynamicClient
		dataSourceRef := prepareSnapshotDataSourceForProvisioning(ctx, f, testConfig, l.config, pattern, l.cs, dc, resource.Pvc, resource.Sc, sDriver, pattern.VolMode, expectedContent)

		// Create a 2nd PVC restored from the snapshot.
		pvc2 := &v1.PersistentVolumeClaim{
			ObjectMeta: metav1.ObjectMeta{
				Name:      resource.Pvc.Name + "-restored",
				Namespace: resource.Pvc.Namespace,
			},
		}
		resource.Pvc.Spec.DeepCopyInto(&pvc2.Spec)
		pvc2.Spec.VolumeName = ""
		pvc2.Spec.DataSourceRef = dataSourceRef

		pvc2, err := l.cs.CoreV1().PersistentVolumeClaims(pvc2.Namespace).Create(ctx, pvc2, metav1.CreateOptions{})
		framework.ExpectNoError(err)
		pvcs = append(pvcs, pvc2)
		ginkgo.DeferCleanup(framework.IgnoreNotFound(l.cs.CoreV1().PersistentVolumeClaims(pvc2.Namespace).Delete), pvc2.Name, metav1.DeleteOptions{})

		// Test access to both volumes on the same node.
		TestConcurrentAccessToRelatedVolumes(ctx, l.config.Framework, l.cs, l.ns.Name, l.config.ClientNodeSelection, pvcs, expectedContent)
	})

	// This tests the following configuration: two pods on the same node concurrently use two
	// related volumes: the original volume and its clone.
	f.It("should concurrently access the volume and its clone from pods on the same node [LinuxOnly]", feature.VolumeSourceXFS, func(ctx context.Context) {
		init(ctx)
		ginkgo.DeferCleanup(cleanup)

		if !l.driver.GetDriverInfo().Capabilities[storageframework.CapPVCDataSource] {
			e2eskipper.Skipf("Driver %q does not support volume clone - skipping", dInfo.Name)
		}

		// Create the volume and write known content to it.
		expectedContent := fmt.Sprintf("volume content %d", time.Now().UTC().UnixNano())
		testVolumeSizeRange := t.GetTestSuiteInfo().SupportedSizeRange
		resource := storageframework.CreateVolumeResource(ctx, l.driver, l.config, pattern, testVolumeSizeRange)
		l.resources = append(l.resources, resource)
		pvcs := []*v1.PersistentVolumeClaim{resource.Pvc}
		testConfig := storageframework.ConvertTestConfig(l.config)
		dataSourceRef := preparePVCDataSourceForProvisioning(ctx, f, testConfig, l.cs, resource.Pvc, resource.Sc, pattern.VolMode, expectedContent)

		// Create a 2nd PVC cloned from the first one.
		pvc2 := &v1.PersistentVolumeClaim{
			ObjectMeta: metav1.ObjectMeta{
				Name:      resource.Pvc.Name + "-cloned",
				Namespace: resource.Pvc.Namespace,
			},
		}
		resource.Pvc.Spec.DeepCopyInto(&pvc2.Spec)
		pvc2.Spec.VolumeName = ""
		pvc2.Spec.DataSourceRef = dataSourceRef

		pvc2, err := l.cs.CoreV1().PersistentVolumeClaims(pvc2.Namespace).Create(ctx, pvc2, metav1.CreateOptions{})
		framework.ExpectNoError(err)
		pvcs = append(pvcs, pvc2)
		ginkgo.DeferCleanup(framework.IgnoreNotFound(l.cs.CoreV1().PersistentVolumeClaims(pvc2.Namespace).Delete), pvc2.Name, metav1.DeleteOptions{})

		// Test access to both volumes on the same node.
		TestConcurrentAccessToRelatedVolumes(ctx, l.config.Framework, l.cs, l.ns.Name, l.config.ClientNodeSelection, pvcs, expectedContent)
	})

	// This tests the following configuration: two pods on the same node concurrently use the
	// same single volume mounted read-only.
	ginkgo.It("should concurrently access the single read-only volume from pods on the same node", func(ctx context.Context) {
		init(ctx)
		ginkgo.DeferCleanup(cleanup)

		numPods := 2

		if !l.driver.GetDriverInfo().Capabilities[storageframework.CapMultiPODs] {
			e2eskipper.Skipf("Driver %q does not support multiple concurrent pods - skipping", dInfo.Name)
		}

		if l.driver.GetDriverInfo().Name == "vsphere" && reflect.DeepEqual(pattern, storageframework.BlockVolModeDynamicPV) {
			e2eskipper.Skipf("Driver %q does not support read only raw block volumes - skipping", dInfo.Name)
		}

		// Create the volume.
		testVolumeSizeRange := t.GetTestSuiteInfo().SupportedSizeRange
		resource := storageframework.CreateVolumeResource(ctx, l.driver, l.config, pattern, testVolumeSizeRange)
		l.resources = append(l.resources, resource)

		// Initialize the volume with a filesystem, so it can be mounted read-only later.
		initializeVolume(ctx, l.cs, f.Timeouts, l.ns.Name, resource.Pvc, l.config.ClientNodeSelection)

		// Test concurrent read-only access to the single volume from pods on the same node.
		TestConcurrentAccessToSingleVolume(ctx, l.config.Framework, l.cs, l.ns.Name,
			l.config.ClientNodeSelection, resource.Pvc, numPods, true /* requiresSameNode */, true /* readOnly */)
	})

	// This tests the following configuration: two pods on different nodes concurrently use the
	// same single volume (requires RWX access).
	ginkgo.It("should concurrently access the single volume from pods on different node", func(ctx context.Context) {
		init(ctx)
		ginkgo.DeferCleanup(cleanup)

		numPods := 2

		if !l.driver.GetDriverInfo().Capabilities[storageframework.CapRWX] {
			e2eskipper.Skipf("Driver %s doesn't support %v -- skipping", l.driver.GetDriverInfo().Name, storageframework.CapRWX)
		}

		// Check the different-node test requirements.
		if l.config.ClientNodeSelection.Name != "" {
			e2eskipper.Skipf("Driver %q requires to deploy on a specific node - skipping", l.driver.GetDriverInfo().Name)
		}

		if err := ensureTopologyRequirements(ctx, &l.config.ClientNodeSelection, l.cs, dInfo, 2); err != nil {
			framework.Failf("Error setting topology requirements: %v", err)
		}

		// Create the volume.
		testVolumeSizeRange := t.GetTestSuiteInfo().SupportedSizeRange
		resource := storageframework.CreateVolumeResource(ctx, l.driver, l.config, pattern, testVolumeSizeRange)
		l.resources = append(l.resources, resource)

		// Test concurrent access to the single volume from pods on different nodes.
		TestConcurrentAccessToSingleVolume(ctx, l.config.Framework, l.cs, l.ns.Name,
			l.config.ClientNodeSelection, resource.Pvc, numPods, false /* requiresSameNode */, false /* readOnly */)
	})
}

// testAccessMultipleVolumes tests access to multiple volumes from a single pod on the specified node.
// If readSeedBase > 0, a read check is done before the write/read check, assuming the volumes already
// contain data written with that seed.
func testAccessMultipleVolumes(ctx context.Context, f *framework.Framework, cs clientset.Interface, ns string,
	node e2epod.NodeSelection, pvcs []*v1.PersistentVolumeClaim, readSeedBase int64, writeSeedBase int64) string {
	ginkgo.By(fmt.Sprintf("Creating pod on %+v with multiple volumes", node))
	podConfig := e2epod.Config{
		NS:            ns,
		PVCs:          pvcs,
		SeLinuxLabel:  e2epod.GetLinuxLabel(),
		NodeSelection: node,
		ImageID:       e2epod.GetDefaultTestImageID(),
	}
	pod, err := e2epod.CreateSecPodWithNodeSelection(ctx, cs, &podConfig, f.Timeouts.PodStart)
	defer func() {
		framework.ExpectNoError(e2epod.DeletePodWithWait(ctx, cs, pod))
	}()
	framework.ExpectNoError(err)

	byteLen := 64
	for i, pvc := range pvcs {
		// CreateSecPodWithNodeSelection makes each volume accessible via /mnt/volume({i} + 1).
		index := i + 1
		path := fmt.Sprintf("/mnt/volume%d", index)
		ginkgo.By(fmt.Sprintf("Checking if the volume%d exists as expected volume mode (%s)", index, *pvc.Spec.VolumeMode))
		e2evolume.CheckVolumeModeOfPath(f, pod, *pvc.Spec.VolumeMode, path)

		if readSeedBase > 0 {
			ginkgo.By(fmt.Sprintf("Checking if read from the volume%d works properly", index))
			storageutils.CheckReadFromPath(f, pod, *pvc.Spec.VolumeMode, false, path, byteLen, readSeedBase+int64(i))
		}

		ginkgo.By(fmt.Sprintf("Checking if write to the volume%d works properly", index))
		storageutils.CheckWriteToPath(f, pod, *pvc.Spec.VolumeMode, false, path, byteLen, writeSeedBase+int64(i))

		ginkgo.By(fmt.Sprintf("Checking if read from the volume%d works properly", index))
		storageutils.CheckReadFromPath(f, pod, *pvc.Spec.VolumeMode, false, path, byteLen, writeSeedBase+int64(i))
	}

	pod, err = cs.CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{})
	framework.ExpectNoError(err, "get pod")
	return pod.Spec.NodeName
}

// TestAccessMultipleVolumesAcrossPodRecreation tests access to multiple volumes from a single pod,
// then recreates the pod on the same or a different node depending on requiresSameNode,
// and rechecks access to the volumes from the recreated pod.
func TestAccessMultipleVolumesAcrossPodRecreation(ctx context.Context, f *framework.Framework, cs clientset.Interface, ns string,
	node e2epod.NodeSelection, pvcs []*v1.PersistentVolumeClaim, requiresSameNode bool) {

	// No data has been written to the volumes yet, so pass a negative read seed to skip the initial read check.
	readSeedBase := int64(-1)
	writeSeedBase := time.Now().UTC().UnixNano()
	// Test access to multiple volumes on the specified node.
	nodeName := testAccessMultipleVolumes(ctx, f, cs, ns, node, pvcs, readSeedBase, writeSeedBase)

	// Set affinity depending on requiresSameNode.
	if requiresSameNode {
		e2epod.SetAffinity(&node, nodeName)
	} else {
		e2epod.SetAntiAffinity(&node, nodeName)
	}

	// Test access to multiple volumes again on the node selected above.
	// The read seed is set to the previous write seed, because the data should have been retained.
	readSeedBase = writeSeedBase
	// Update the write seed.
	writeSeedBase = time.Now().UTC().UnixNano()
	_ = testAccessMultipleVolumes(ctx, f, cs, ns, node, pvcs, readSeedBase, writeSeedBase)
}

// TestConcurrentAccessToSingleVolume tests access to a single volume from multiple pods, then deletes
// the last pod and rechecks access to the volume to verify that deleting one pod does not affect the
// others. Pods are deployed on the same node or on different nodes depending on requiresSameNode.
// Read/write checks are done across pods: from pod{n}, both what pod{n-1} and pod{n} wrote are read back.
func TestConcurrentAccessToSingleVolume(ctx context.Context, f *framework.Framework, cs clientset.Interface, ns string,
	node e2epod.NodeSelection, pvc *v1.PersistentVolumeClaim, numPods int, requiresSameNode bool,
	readOnly bool) {

	var pods []*v1.Pod

	// Create each pod with the PVC.
	for i := 0; i < numPods; i++ {
		index := i + 1
		ginkgo.By(fmt.Sprintf("Creating pod%d with a volume on %+v", index, node))
		podConfig := e2epod.Config{
			NS:            ns,
			PVCs:          []*v1.PersistentVolumeClaim{pvc},
			SeLinuxLabel:  e2epod.GetLinuxLabel(),
			NodeSelection: node,
			PVCsReadOnly:  readOnly,
			ImageID:       e2epod.GetTestImageID(imageutils.JessieDnsutils),
		}
		pod, err := e2epod.CreateSecPodWithNodeSelection(ctx, cs, &podConfig, f.Timeouts.PodStart)
		framework.ExpectNoError(err)

		// The pod must get deleted before this function returns, because the caller may try to
		// delete the volumes as part of the test; keeping the pod running would block that.
		defer func() {
			framework.ExpectNoError(e2epod.DeletePodWithWait(ctx, cs, pod))
		}()
		pod, err = cs.CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{})
		framework.ExpectNoError(err, fmt.Sprintf("get pod%d", index))
		pods = append(pods, pod)
		actualNodeName := pod.Spec.NodeName

		// Set affinity depending on requiresSameNode.
		if requiresSameNode {
			e2epod.SetAffinity(&node, actualNodeName)
		} else {
			e2epod.SetAntiAffinity(&node, actualNodeName)
		}
	}

	path := "/mnt/volume1"

	var seed int64
	byteLen := 64
	directIO := false
	// Direct I/O is needed for block-mode volumes.
	if *pvc.Spec.VolumeMode == v1.PersistentVolumeBlock {
		if len(pods) < 1 {
			framework.Failf("Number of pods shouldn't be less than 1, but got %d", len(pods))
		}
		// byteLen must match the sector size to enable direct I/O.
		byteLen = storageutils.GetSectorSize(f, pods[0], path)
		directIO = true
	}

	// Check if the volume can be accessed from each pod.
	for i, pod := range pods {
		index := i + 1
		ginkgo.By(fmt.Sprintf("Checking if the volume in pod%d exists as expected volume mode (%s)", index, *pvc.Spec.VolumeMode))
		e2evolume.CheckVolumeModeOfPath(f, pod, *pvc.Spec.VolumeMode, path)

		if readOnly {
			ginkgo.By("Skipping volume content checks, volume is read-only")
			continue
		}

		if i != 0 {
			ginkgo.By(fmt.Sprintf("From pod%d, checking if reading the data that pod%d write works properly", index, index-1))
			// For the 1st pod, no one has written data yet, so this read check is skipped.
			storageutils.CheckReadFromPath(f, pod, *pvc.Spec.VolumeMode, directIO, path, byteLen, seed)
		}

		// Update the seed and check if write/read works properly.
		seed = time.Now().UTC().UnixNano()

		ginkgo.By(fmt.Sprintf("Checking if write to the volume in pod%d works properly", index))
		storageutils.CheckWriteToPath(f, pod, *pvc.Spec.VolumeMode, directIO, path, byteLen, seed)

		ginkgo.By(fmt.Sprintf("Checking if read from the volume in pod%d works properly", index))
		storageutils.CheckReadFromPath(f, pod, *pvc.Spec.VolumeMode, directIO, path, byteLen, seed)
	}

	if len(pods) < 2 {
		framework.Failf("Number of pods shouldn't be less than 2, but got %d", len(pods))
	}
	// Delete the last pod and remove it from the slice of pods.
	lastPod := pods[len(pods)-1]
	framework.ExpectNoError(e2epod.DeletePodWithWait(ctx, cs, lastPod))
	pods = pods[:len(pods)-1]

	// Recheck if the volume can be accessed from each remaining pod after the last pod deletion.
	for i, pod := range pods {
		index := i + 1

		ginkgo.By(fmt.Sprintf("Rechecking if the volume in pod%d exists as expected volume mode (%s)", index, *pvc.Spec.VolumeMode))
		e2evolume.CheckVolumeModeOfPath(f, pod, *pvc.Spec.VolumeMode, "/mnt/volume1")

		if readOnly {
			ginkgo.By("Skipping volume content checks, volume is read-only")
			continue
		}

		if i == 0 {
			// This time there is data that the last pod wrote, so the 1st pod also performs the read check.
			ginkgo.By(fmt.Sprintf("From pod%d, rechecking if reading the data that last pod write works properly", index))
		} else {
			ginkgo.By(fmt.Sprintf("From pod%d, rechecking if reading the data that pod%d write works properly", index, index-1))
		}
		storageutils.CheckReadFromPath(f, pod, *pvc.Spec.VolumeMode, directIO, path, byteLen, seed)

		// Update the seed and check if write/read works properly.
		seed = time.Now().UTC().UnixNano()

		ginkgo.By(fmt.Sprintf("Rechecking if write to the volume in pod%d works properly", index))
		storageutils.CheckWriteToPath(f, pod, *pvc.Spec.VolumeMode, directIO, path, byteLen, seed)

		ginkgo.By(fmt.Sprintf("Rechecking if read from the volume in pod%d works properly", index))
		storageutils.CheckReadFromPath(f, pod, *pvc.Spec.VolumeMode, directIO, path, byteLen, seed)
	}
}

// TestConcurrentAccessToRelatedVolumes tests access to multiple volumes from multiple pods.
// Each provided PVC is used by a single pod. The test ensures that volumes created from
// another volume (= clone) or from a volume snapshot can be used together with the original volume.
func TestConcurrentAccessToRelatedVolumes(ctx context.Context, f *framework.Framework, cs clientset.Interface, ns string,
	node e2epod.NodeSelection, pvcs []*v1.PersistentVolumeClaim, expectedContent string) {

	var pods []*v1.Pod

	// Create a pod for each PVC.
	for i := range pvcs {
		index := i + 1
		ginkgo.By(fmt.Sprintf("Creating pod%d with a volume on %+v", index, node))
		podConfig := e2epod.Config{
			NS:            ns,
			PVCs:          []*v1.PersistentVolumeClaim{pvcs[i]},
			SeLinuxLabel:  e2epod.GetLinuxLabel(),
			NodeSelection: node,
			PVCsReadOnly:  false,
			ImageID:       e2epod.GetTestImageID(imageutils.JessieDnsutils),
		}
		pod, err := e2epod.CreateSecPodWithNodeSelection(ctx, cs, &podConfig, f.Timeouts.PodStart)
		defer func() {
			framework.ExpectNoError(e2epod.DeletePodWithWait(ctx, cs, pod))
		}()
		framework.ExpectNoError(err)
		pods = append(pods, pod)
		actualNodeName := pod.Spec.NodeName

		// Always run the subsequent pods on the same node.
		e2epod.SetAffinity(&node, actualNodeName)
	}

	for i, pvc := range pvcs {
		var commands []string

		if *pvc.Spec.VolumeMode == v1.PersistentVolumeBlock {
			fileName := "/mnt/volume1"
			commands = e2evolume.GenerateReadBlockCmd(fileName, len(expectedContent))
			// Check that the pod sees the expected content on the block device.
			index := i + 1
			ginkgo.By(fmt.Sprintf("Checking if the volume in pod%d has expected initial content", index))
			_, err := e2eoutput.LookForStringInPodExec(pods[i].Namespace, pods[i].Name, commands, expectedContent, time.Minute)
			framework.ExpectNoError(err, "failed: finding the contents of the block volume %s.", fileName)
		} else {
			fileName := "/mnt/volume1/index.html"
			commands = e2evolume.GenerateReadFileCmd(fileName)
			// Check that the pod sees the expected content in the mounted file.
			index := i + 1
			ginkgo.By(fmt.Sprintf("Checking if the volume in pod%d has expected initial content", index))
			_, err := e2eoutput.LookForStringInPodExec(pods[i].Namespace, pods[i].Name, commands, expectedContent, time.Minute)
			framework.ExpectNoError(err, "failed: finding the contents of the mounted file %s.", fileName)
		}
	}
}

// getCurrentTopologiesNumber goes through all nodes and returns the unique driver topologies
// found and the number of nodes in each of them.
func getCurrentTopologiesNumber(cs clientset.Interface, nodes *v1.NodeList, keys []string) ([]topology, []int, error) {
	topos := []topology{}
	topoCount := []int{}

	// Find the number of nodes in each topology segment.
	for _, n := range nodes.Items {
		topo := map[string]string{}
		for _, k := range keys {
			v, ok := n.Labels[k]
			if ok {
				topo[k] = v
			}
		}

		found := false
		for i, existingTopo := range topos {
			if topologyEqual(existingTopo, topo) {
				found = true
				topoCount[i]++
				break
			}
		}
		if !found && len(topo) > 0 {
			framework.Logf("found topology %v", topo)
			topos = append(topos, topo)
			topoCount = append(topoCount, 1)
		}
	}
	return topos, topoCount, nil
}

// ensureTopologyRequirements checks that the cluster has enough nodes for the test and sets
// nodeSelection affinity so that the test runs in a topology segment with at least minCount nodes.
func ensureTopologyRequirements(ctx context.Context, nodeSelection *e2epod.NodeSelection, cs clientset.Interface, driverInfo *storageframework.DriverInfo, minCount int) error {
	nodes, err := e2enode.GetReadySchedulableNodes(ctx, cs)
	framework.ExpectNoError(err)
	if len(nodes.Items) < minCount {
		e2eskipper.Skipf("Number of available nodes is less than %d - skipping", minCount)
	}

	topologyKeys := driverInfo.TopologyKeys
	if len(topologyKeys) == 0 {
		// The driver does not have topology restrictions; any node will do.
		return nil
	}

	topologyList, topologyCount, err := getCurrentTopologiesNumber(cs, nodes, topologyKeys)
	if err != nil {
		return err
	}
	suitableTopologies := []topology{}
	for i, topo := range topologyList {
		if topologyCount[i] >= minCount {
			suitableTopologies = append(suitableTopologies, topo)
		}
	}
	if len(suitableTopologies) == 0 {
		e2eskipper.Skipf("No topology with at least %d nodes found - skipping", minCount)
	}
	// Schedule the test in the first suitable topology.
	e2epod.SetNodeAffinityTopologyRequirement(nodeSelection, suitableTopologies[0])
	return nil
}

// initializeVolume creates a filesystem on the given volume, so it can be used as read-only later.
func initializeVolume(ctx context.Context, cs clientset.Interface, t *framework.TimeoutContext, ns string, pvc *v1.PersistentVolumeClaim, node e2epod.NodeSelection) {
	if pvc.Spec.VolumeMode != nil && *pvc.Spec.VolumeMode == v1.PersistentVolumeBlock {
		// Block volumes do not need to be initialized.
		return
	}

	ginkgo.By(fmt.Sprintf("Initializing a filesystem on PVC %s", pvc.Name))
	// Simply create a pod that mounts the volume read-write; Kubernetes creates a filesystem
	// on it if one does not exist yet.
	podConfig := e2epod.Config{
		NS:            ns,
		PVCs:          []*v1.PersistentVolumeClaim{pvc},
		SeLinuxLabel:  e2epod.GetLinuxLabel(),
		NodeSelection: node,
		ImageID:       e2epod.GetDefaultTestImageID(),
	}
	pod, err := e2epod.CreateSecPod(ctx, cs, &podConfig, t.PodStart)
	defer func() {
		framework.ExpectNoError(e2epod.DeletePodWithWait(ctx, cs, pod))
	}()
	framework.ExpectNoError(err)
}