16
17 package storage
18
19 import (
20 "context"
21 "errors"
22 "fmt"
23 "math/rand"
24 "path/filepath"
25 "strconv"
26 "strings"
27 "sync"
28 "time"
29
30 "github.com/onsi/ginkgo/v2"
31 "github.com/onsi/gomega"
32
33 appsv1 "k8s.io/api/apps/v1"
34 v1 "k8s.io/api/core/v1"
35 storagev1 "k8s.io/api/storage/v1"
36 apierrors "k8s.io/apimachinery/pkg/api/errors"
37 "k8s.io/apimachinery/pkg/api/resource"
38 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
39 utilerrors "k8s.io/apimachinery/pkg/util/errors"
40 "k8s.io/apimachinery/pkg/util/sets"
41 "k8s.io/apimachinery/pkg/util/wait"
42 "k8s.io/apimachinery/pkg/watch"
43 clientset "k8s.io/client-go/kubernetes"
44 "k8s.io/kubernetes/test/e2e/framework"
45 e2enode "k8s.io/kubernetes/test/e2e/framework/node"
46 e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
47 e2eoutput "k8s.io/kubernetes/test/e2e/framework/pod/output"
48 e2epv "k8s.io/kubernetes/test/e2e/framework/pv"
49 e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
50 e2estatefulset "k8s.io/kubernetes/test/e2e/framework/statefulset"
51 e2evolume "k8s.io/kubernetes/test/e2e/framework/volume"
52 "k8s.io/kubernetes/test/e2e/storage/utils"
53 imageutils "k8s.io/kubernetes/test/utils/image"
54 admissionapi "k8s.io/pod-security-admission/api"
55 )
56
57 type localTestConfig struct {
58 ns string
59 nodes []v1.Node
60 randomNode *v1.Node
61 client clientset.Interface
62 timeouts *framework.TimeoutContext
63 scName string
64 discoveryDir string
65 hostExec utils.HostExec
66 ltrMgr utils.LocalTestResourceManager
67 }
68
69 type localVolumeType string
70
71 const (
72 // DirectoryLocalVolumeType: a plain directory used directly as a local volume
73 DirectoryLocalVolumeType localVolumeType = "dir"
74 // DirectoryLinkLocalVolumeType: like DirectoryLocalVolumeType, but the
75 // volume path is a symbolic link to the backing directory
76 DirectoryLinkLocalVolumeType localVolumeType = "dir-link"
77 // DirectoryBindMountedLocalVolumeType: like DirectoryLocalVolumeType, but
78 // the directory is bind mounted onto itself
79 DirectoryBindMountedLocalVolumeType localVolumeType = "dir-bindmounted"
80 // DirectoryLinkBindMountedLocalVolumeType: like DirectoryLocalVolumeType,
81 // but the volume path is a symbolic link to a self bind mounted directory.
82 // Note that bind mounting at a symbolic link actually mounts at the
83 // directory it links to.
84 DirectoryLinkBindMountedLocalVolumeType localVolumeType = "dir-link-bindmounted"
85 // TmpfsLocalVolumeType: a tmpfs mounted and used as the local volume
86 TmpfsLocalVolumeType localVolumeType = "tmpfs"
87 // GCELocalSSDVolumeType: a GCE local SSD mounted under /mnt/disks/by-uuid/
88 GCELocalSSDVolumeType localVolumeType = "gce-localssd-scsi-fs"
89 // BlockLocalVolumeType: a local file mapped as a raw block device
90 BlockLocalVolumeType localVolumeType = "block"
91 // BlockFsWithFormatLocalVolumeType: a filesystem backed by a block device
92 // that is formatted before mounting
93 BlockFsWithFormatLocalVolumeType localVolumeType = "blockfswithformat"
94 // BlockFsWithoutFormatLocalVolumeType: like BlockFsWithFormatLocalVolumeType,
95 // but the block device is not formatted beforehand
96 BlockFsWithoutFormatLocalVolumeType localVolumeType = "blockfswithoutformat"
97 )
98
99 // setupLocalVolumeMap maps each test volume type to the LocalVolumeType used by the local test resource manager.
100 var setupLocalVolumeMap = map[localVolumeType]utils.LocalVolumeType{
101 GCELocalSSDVolumeType: utils.LocalVolumeGCELocalSSD,
102 TmpfsLocalVolumeType: utils.LocalVolumeTmpfs,
103 DirectoryLocalVolumeType: utils.LocalVolumeDirectory,
104 DirectoryLinkLocalVolumeType: utils.LocalVolumeDirectoryLink,
105 DirectoryBindMountedLocalVolumeType: utils.LocalVolumeDirectoryBindMounted,
106 DirectoryLinkBindMountedLocalVolumeType: utils.LocalVolumeDirectoryLinkBindMounted,
107 BlockLocalVolumeType: utils.LocalVolumeBlock,
108 BlockFsWithFormatLocalVolumeType: utils.LocalVolumeBlockFS,
109 BlockFsWithoutFormatLocalVolumeType: utils.LocalVolumeBlock,
110 }
111
112 type localTestVolume struct {
113 // ltr is the backing local test resource on the node
114 ltr *utils.LocalTestResource
115 // pvc is the PersistentVolumeClaim created for this volume
116 pvc *v1.PersistentVolumeClaim
117 // pv is the PersistentVolume created for this volume
118 pv *v1.PersistentVolume
119 // localVolumeType is the type of this local volume
120 localVolumeType localVolumeType
121 }
122
123 const (
124 // hostBase is the base directory on the node used for discovery and test volumes
125 hostBase = "/tmp"
126 // volumeDir is the mount path of the first PVC inside pods created by
127 // createLocalPod and the makeLocalPod* helpers (the framework mounts
128 // volumes at /mnt/volume1, /mnt/volume2, ...)
129 volumeDir = "/mnt/volume1"
130 // testFile is the name of the file created inside the volume
131 testFile = "test-file"
132 // testFileContent is the content written to testFile
133 testFileContent = "test-file-content"
134 testSCPrefix = "local-volume-test-storageclass"
135
136 // testRequestSize is the storage size requested by the test PVCs
137 testRequestSize = "10Mi"
138
139 // maxNodes caps the number of nodes used by these tests
140 maxNodes = 5
141 )
142
143 var (
144 // Volume binding modes used when creating the test StorageClass
145 waitMode = storagev1.VolumeBindingWaitForFirstConsumer
146 immediateMode = storagev1.VolumeBindingImmediate
147
148 // Common SELinux labels applied to the test pods
149 selinuxLabel = &v1.SELinuxOptions{
150 Level: "s0:c0,c1"}
151 )
152
153 var _ = utils.SIGDescribe("PersistentVolumes-local", func() {
154 f := framework.NewDefaultFramework("persistent-local-volumes-test")
155 f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged
156
157 var (
158 config *localTestConfig
159 scName string
160 )
161
162 ginkgo.BeforeEach(func(ctx context.Context) {
163 nodes, err := e2enode.GetBoundedReadySchedulableNodes(ctx, f.ClientSet, maxNodes)
164 framework.ExpectNoError(err)
165
166 scName = fmt.Sprintf("%v-%v", testSCPrefix, f.Namespace.Name)
167 // Pick a random node to host the test volumes.
168 randomNode := &nodes.Items[rand.Intn(len(nodes.Items))]
169
170 hostExec := utils.NewHostExec(f)
171 ltrMgr := utils.NewLocalResourceManager("local-volume-test", hostExec, hostBase)
172 config = &localTestConfig{
173 ns: f.Namespace.Name,
174 client: f.ClientSet,
175 timeouts: f.Timeouts,
176 nodes: nodes.Items,
177 randomNode: randomNode,
178 scName: scName,
179 discoveryDir: filepath.Join(hostBase, f.Namespace.Name),
180 hostExec: hostExec,
181 ltrMgr: ltrMgr,
182 }
183 })
184
185 for tempTestVolType := range setupLocalVolumeMap {
186 // Copy the range variable so the ginkgo closures below capture the
187 // intended volume type rather than the shared loop variable.
188 testVolType := tempTestVolType
189 args := []interface{}{fmt.Sprintf("[Volume type: %s]", testVolType)}
190 if testVolType == GCELocalSSDVolumeType {
191 args = append(args, framework.WithSerial())
192 }
193 testMode := immediateMode
194
195 args = append(args, func() {
196 var testVol *localTestVolume
197
198 ginkgo.BeforeEach(func(ctx context.Context) {
199 if testVolType == GCELocalSSDVolumeType {
200 SkipUnlessLocalSSDExists(ctx, config, "scsi", "fs", config.randomNode)
201 }
202 setupStorageClass(ctx, config, &testMode)
203 testVols := setupLocalVolumesPVCsPVs(ctx, config, testVolType, config.randomNode, 1, testMode)
204 if len(testVols) > 0 {
205 testVol = testVols[0]
206 } else {
207 framework.Failf("Failed to get a test volume")
208 }
209 })
210
211 ginkgo.AfterEach(func(ctx context.Context) {
212 if testVol != nil {
213 cleanupLocalVolumes(ctx, config, []*localTestVolume{testVol})
214 cleanupStorageClass(ctx, config)
215 } else {
216 framework.Failf("no test volume to cleanup")
217 }
218 })
219
220 ginkgo.Context("One pod requesting one prebound PVC", func() {
221 var (
222 pod1 *v1.Pod
223 pod1Err error
224 )
225
226 ginkgo.BeforeEach(func(ctx context.Context) {
227 ginkgo.By("Creating pod1")
228 pod1, pod1Err = createLocalPod(ctx, config, testVol, nil)
229 framework.ExpectNoError(pod1Err)
230 verifyLocalPod(ctx, config, testVol, pod1, config.randomNode.Name)
231
232 writeCmd := createWriteCmd(volumeDir, testFile, testFileContent, testVol.localVolumeType)
233
234 ginkgo.By("Writing in pod1")
235 podRWCmdExec(f, pod1, writeCmd)
236 })
237
238 ginkgo.AfterEach(func(ctx context.Context) {
239 ginkgo.By("Deleting pod1")
240 e2epod.DeletePodOrFail(ctx, config.client, config.ns, pod1.Name)
241 })
242
243 ginkgo.It("should be able to mount volume and read from pod1", func(ctx context.Context) {
244 ginkgo.By("Reading in pod1")
245 // testFileContent was written by the BeforeEach above.
246 testReadFileContent(f, volumeDir, testFile, testFileContent, pod1, testVolType)
247 })
248
249 ginkgo.It("should be able to mount volume and write from pod1", func(ctx context.Context) {
250 // testFileContent was written by the BeforeEach above.
251 testReadFileContent(f, volumeDir, testFile, testFileContent, pod1, testVolType)
252
253 ginkgo.By("Writing in pod1")
254 writeCmd := createWriteCmd(volumeDir, testFile, testVol.ltr.Path, testVolType)
255 podRWCmdExec(f, pod1, writeCmd)
256 })
257 })
258
259 ginkgo.Context("Two pods mounting a local volume at the same time", func() {
260 ginkgo.It("should be able to write from pod1 and read from pod2", func(ctx context.Context) {
261 twoPodsReadWriteTest(ctx, f, config, testVol)
262 })
263 })
264
265 ginkgo.Context("Two pods mounting a local volume one after the other", func() {
266 ginkgo.It("should be able to write from pod1 and read from pod2", func(ctx context.Context) {
267 twoPodsReadWriteSerialTest(ctx, f, config, testVol)
268 })
269 })
270
271 ginkgo.Context("Set fsGroup for local volume", func() {
272 ginkgo.BeforeEach(func() {
273 if testVolType == BlockLocalVolumeType {
274 e2eskipper.Skipf("We don't set fsGroup on block device, skipped.")
275 }
276 })
277
278 f.It("should set fsGroup for one pod", f.WithSlow(), func(ctx context.Context) {
279 ginkgo.By("Checking fsGroup is set")
280 pod := createPodWithFsGroupTest(ctx, config, testVol, 1234, 1234)
281 ginkgo.By("Deleting pod")
282 e2epod.DeletePodOrFail(ctx, config.client, config.ns, pod.Name)
283 })
284
285 f.It("should set same fsGroup for two pods simultaneously", f.WithSlow(), func(ctx context.Context) {
286 fsGroup := int64(1234)
287 ginkgo.By("Create first pod and check fsGroup is set")
288 pod1 := createPodWithFsGroupTest(ctx, config, testVol, fsGroup, fsGroup)
289 ginkgo.By("Create second pod with same fsGroup and check fsGroup is correct")
290 pod2 := createPodWithFsGroupTest(ctx, config, testVol, fsGroup, fsGroup)
291 ginkgo.By("Deleting first pod")
292 e2epod.DeletePodOrFail(ctx, config.client, config.ns, pod1.Name)
293 ginkgo.By("Deleting second pod")
294 e2epod.DeletePodOrFail(ctx, config.client, config.ns, pod2.Name)
295 })
296
297 f.It("should set different fsGroup for second pod if first pod is deleted", f.WithFlaky(), func(ctx context.Context) {
298 // Use two different fsGroups so the second pod must get the new value.
299 fsGroup1, fsGroup2 := int64(1234), int64(4321)
300 ginkgo.By("Create first pod and check fsGroup is set")
301 pod1 := createPodWithFsGroupTest(ctx, config, testVol, fsGroup1, fsGroup1)
302 ginkgo.By("Deleting first pod")
303 err := e2epod.DeletePodWithWait(ctx, config.client, pod1)
304 framework.ExpectNoError(err, "while deleting first pod")
305 ginkgo.By("Create second pod and check fsGroup is the new one")
306 pod2 := createPodWithFsGroupTest(ctx, config, testVol, fsGroup2, fsGroup2)
307 ginkgo.By("Deleting second pod")
308 e2epod.DeletePodOrFail(ctx, config.client, config.ns, pod2.Name)
309 })
310 })
311 })
312 f.Context(args...)
313 }
314
315 f.Context("Local volume that cannot be mounted", f.WithSlow(), func() {
316 // These specs create local PVs whose paths cannot actually be mounted
317 // and verify that pods consuming them never reach Running.
318 ginkgo.It("should fail due to non-existent path", func(ctx context.Context) {
319 testVol := &localTestVolume{
320 ltr: &utils.LocalTestResource{
321 Node: config.randomNode,
322 Path: "/non-existent/location/nowhere",
323 },
324 localVolumeType: DirectoryLocalVolumeType,
325 }
326 ginkgo.By("Creating local PVC and PV")
327 createLocalPVCsPVs(ctx, config, []*localTestVolume{testVol}, immediateMode)
328 pod, err := createLocalPod(ctx, config, testVol, nil)
329 framework.ExpectError(err)
330 err = e2epod.WaitTimeoutForPodRunningInNamespace(ctx, config.client, pod.Name, pod.Namespace, f.Timeouts.PodStart)
331 framework.ExpectError(err)
332 cleanupLocalPVCsPVs(ctx, config, []*localTestVolume{testVol})
333 })
334
335 ginkgo.It("should fail due to wrong node", func(ctx context.Context) {
336 if len(config.nodes) < 2 {
337 e2eskipper.Skipf("Runs only when number of nodes >= 2")
338 }
339
340 testVols := setupLocalVolumesPVCsPVs(ctx, config, DirectoryLocalVolumeType, config.randomNode, 1, immediateMode)
341 testVol := testVols[0]
342
343 conflictNodeName := config.nodes[0].Name
344 if conflictNodeName == config.randomNode.Name {
345 conflictNodeName = config.nodes[1].Name
346 }
347 pod := makeLocalPodWithNodeName(config, testVol, conflictNodeName)
348 pod, err := config.client.CoreV1().Pods(config.ns).Create(ctx, pod, metav1.CreateOptions{})
349 framework.ExpectNoError(err)
350
351 err = e2epod.WaitTimeoutForPodRunningInNamespace(ctx, config.client, pod.Name, pod.Namespace, f.Timeouts.PodStart)
352 framework.ExpectError(err)
353
354 cleanupLocalVolumes(ctx, config, []*localTestVolume{testVol})
355 })
356 })
357
358 ginkgo.Context("Pod with node different from PV's NodeAffinity", func() {
359 var (
360 testVol *localTestVolume
361 volumeType localVolumeType
362 conflictNodeName string
363 )
364
365 ginkgo.BeforeEach(func(ctx context.Context) {
366 if len(config.nodes) < 2 {
367 e2eskipper.Skipf("Runs only when number of nodes >= 2")
368 }
369
370 volumeType = DirectoryLocalVolumeType
371 setupStorageClass(ctx, config, &immediateMode)
372 testVols := setupLocalVolumesPVCsPVs(ctx, config, volumeType, config.randomNode, 1, immediateMode)
373 conflictNodeName = config.nodes[0].Name
374 if conflictNodeName == config.randomNode.Name {
375 conflictNodeName = config.nodes[1].Name
376 }
377
378 testVol = testVols[0]
379 })
380
381 ginkgo.AfterEach(func(ctx context.Context) {
382 cleanupLocalVolumes(ctx, config, []*localTestVolume{testVol})
383 cleanupStorageClass(ctx, config)
384 })
385
386 ginkgo.It("should fail scheduling due to different NodeAffinity", func(ctx context.Context) {
387 testPodWithNodeConflict(ctx, config, testVol, conflictNodeName, makeLocalPodWithNodeAffinity)
388 })
389
390 ginkgo.It("should fail scheduling due to different NodeSelector", func(ctx context.Context) {
391 testPodWithNodeConflict(ctx, config, testVol, conflictNodeName, makeLocalPodWithNodeSelector)
392 })
393 })
394
395 f.Context("StatefulSet with pod affinity", f.WithSlow(), func() {
396 var testVols map[string][]*localTestVolume
397 const (
398 ssReplicas = 3
399 volsPerNode = 6
400 )
401
402 ginkgo.BeforeEach(func(ctx context.Context) {
403 setupStorageClass(ctx, config, &waitMode)
404
405 testVols = map[string][]*localTestVolume{}
406 for i, node := range config.nodes {
407
408 ginkgo.By(fmt.Sprintf("Setting up local volumes on node %q", node.Name))
409 vols := setupLocalVolumesPVCsPVs(ctx, config, DirectoryLocalVolumeType, &config.nodes[i], volsPerNode, waitMode)
410 testVols[node.Name] = vols
411 }
412 })
413
414 ginkgo.AfterEach(func(ctx context.Context) {
415 for _, vols := range testVols {
416 cleanupLocalVolumes(ctx, config, vols)
417 }
418 cleanupStorageClass(ctx, config)
419 })
420
421 ginkgo.It("should use volumes spread across nodes when pod has anti-affinity", func(ctx context.Context) {
422 if len(config.nodes) < ssReplicas {
423 e2eskipper.Skipf("Runs only when number of nodes >= %v", ssReplicas)
424 }
425 ginkgo.By("Creating a StatefulSet with pod anti-affinity on nodes")
426 ss := createStatefulSet(ctx, config, ssReplicas, volsPerNode, true, false)
427 validateStatefulSet(ctx, config, ss, true)
428 })
429
430 ginkgo.It("should use volumes on one node when pod has affinity", func(ctx context.Context) {
431 ginkgo.By("Creating a StatefulSet with pod affinity on nodes")
432 ss := createStatefulSet(ctx, config, ssReplicas, volsPerNode/ssReplicas, false, false)
433 validateStatefulSet(ctx, config, ss, false)
434 })
435
436 ginkgo.It("should use volumes spread across nodes when pod management is parallel and pod has anti-affinity", func(ctx context.Context) {
437 if len(config.nodes) < ssReplicas {
438 e2eskipper.Skipf("Runs only when number of nodes >= %v", ssReplicas)
439 }
440 ginkgo.By("Creating a StatefulSet with pod anti-affinity on nodes")
441 ss := createStatefulSet(ctx, config, ssReplicas, 1, true, true)
442 validateStatefulSet(ctx, config, ss, true)
443 })
444
445 ginkgo.It("should use volumes on one node when pod management is parallel and pod has affinity", func(ctx context.Context) {
446 ginkgo.By("Creating a StatefulSet with pod affinity on nodes")
447 ss := createStatefulSet(ctx, config, ssReplicas, 1, false, true)
448 validateStatefulSet(ctx, config, ss, false)
449 })
450 })
451
452 f.Context("Stress with local volumes", f.WithSerial(), func() {
453 var (
454 allLocalVolumes = make(map[string][]*localTestVolume)
455 volType = TmpfsLocalVolumeType
456 )
457
458 const (
459 volsPerNode = 10
460 volsPerPod = 3
461 podsFactor = 4
462 )
463
464 ginkgo.BeforeEach(func(ctx context.Context) {
465 setupStorageClass(ctx, config, &waitMode)
466 ginkgo.DeferCleanup(cleanupStorageClass, config)
467
468 for i, node := range config.nodes {
469 ginkgo.By(fmt.Sprintf("Setting up %d local volumes on node %q", volsPerNode, node.Name))
470 allLocalVolumes[node.Name] = setupLocalVolumes(ctx, config, volType, &config.nodes[i], volsPerNode)
471 }
472 ginkgo.By(fmt.Sprintf("Create %d PVs", volsPerNode*len(config.nodes)))
473 var err error
474 for _, localVolumes := range allLocalVolumes {
475 for _, localVolume := range localVolumes {
476 pvConfig := makeLocalPVConfig(config, localVolume)
477 localVolume.pv, err = e2epv.CreatePV(ctx, config.client, f.Timeouts, e2epv.MakePersistentVolume(pvConfig))
478 framework.ExpectNoError(err)
479 }
480 }
481 ginkgo.DeferCleanup(func(ctx context.Context) {
482 ginkgo.By("Clean all PVs")
483 for nodeName, localVolumes := range allLocalVolumes {
484 ginkgo.By(fmt.Sprintf("Cleaning up %d local volumes on node %q", len(localVolumes), nodeName))
485 cleanupLocalVolumes(ctx, config, localVolumes)
486 }
487 })
488 ginkgo.By("Start a goroutine to recycle unbound PVs")
489 backgroundCtx, cancel := context.WithCancel(context.Background())
490 var wg sync.WaitGroup
491 wg.Add(1)
492 ginkgo.DeferCleanup(func() {
493 ginkgo.By("Stop and wait for recycle goroutine to finish")
494 cancel()
495 wg.Wait()
496 })
497 go func() {
498 defer ginkgo.GinkgoRecover()
499 defer wg.Done()
500 w, err := config.client.CoreV1().PersistentVolumes().Watch(backgroundCtx, metav1.ListOptions{})
501 framework.ExpectNoError(err)
502 if w == nil {
503 return
504 }
505 defer w.Stop()
506 for {
507 select {
508 case event := <-w.ResultChan():
509 if event.Type != watch.Modified {
510 continue
511 }
512 pv, ok := event.Object.(*v1.PersistentVolume)
513 if !ok {
514 continue
515 }
516 if pv.Status.Phase == v1.VolumeBound || pv.Status.Phase == v1.VolumeAvailable {
517 continue
518 }
519 pv, err = config.client.CoreV1().PersistentVolumes().Get(backgroundCtx, pv.Name, metav1.GetOptions{})
520 if apierrors.IsNotFound(err) || errors.Is(err, context.Canceled) {
521 continue
522 }
523
524 ginkgo.By(fmt.Sprintf("Delete %q and create a new PV for same local volume storage", pv.Name))
525 for _, localVolumes := range allLocalVolumes {
526 for _, localVolume := range localVolumes {
527 if localVolume.pv.Name != pv.Name {
528 continue
529 }
530 err = config.client.CoreV1().PersistentVolumes().Delete(backgroundCtx, pv.Name, metav1.DeleteOptions{})
531 if apierrors.IsNotFound(err) || errors.Is(err, context.Canceled) {
532 continue
533 }
534 framework.ExpectNoError(err)
535 pvConfig := makeLocalPVConfig(config, localVolume)
536 localVolume.pv, err = e2epv.CreatePV(backgroundCtx, config.client, f.Timeouts, e2epv.MakePersistentVolume(pvConfig))
537 if errors.Is(err, context.Canceled) {
538 continue
539 }
540 framework.ExpectNoError(err)
541 }
542 }
543 case <-backgroundCtx.Done():
544 return
545 }
546 }
547 }()
548 })
549
550 ginkgo.It("should be able to process many pods and reuse local volumes", func(ctx context.Context) {
551 var (
552 podsLock sync.Mutex
553 // Keep one more concurrent pod than the available volumes can satisfy, so there is always a pod pending.
554 numConcurrentPods = volsPerNode/volsPerPod*len(config.nodes) + 1
555 totalPods = numConcurrentPods * podsFactor
556 numCreated = 0
557 numFinished = 0
558 pods = map[string]*v1.Pod{}
559 )
560
561 // Create pods gradually instead of all at once; unschedulable pods are
562 // retried by the scheduler with backoff.
563 ginkgo.By(fmt.Sprintf("Creating %v pods periodically", numConcurrentPods))
564 stop := make(chan struct{})
565 go wait.Until(func() {
566 defer ginkgo.GinkgoRecover()
567 podsLock.Lock()
568 defer podsLock.Unlock()
569
570 if numCreated >= totalPods {
571 // All pods for this test have been created.
572 return
573 }
574
575 if len(pods) > numConcurrentPods/2 {
576 // Too many pods are still outstanding; wait for some to finish before creating more.
577 return
578 }
579
580 for i := 0; i < numConcurrentPods; i++ {
581 pvcs := []*v1.PersistentVolumeClaim{}
582 for j := 0; j < volsPerPod; j++ {
583 pvc := e2epv.MakePersistentVolumeClaim(makeLocalPVCConfig(config, volType), config.ns)
584 pvc, err := e2epv.CreatePVC(ctx, config.client, config.ns, pvc)
585 framework.ExpectNoError(err)
586 pvcs = append(pvcs, pvc)
587 }
588 podConfig := e2epod.Config{
589 NS: config.ns,
590 PVCs: pvcs,
591 Command: "sleep 1",
592 SeLinuxLabel: selinuxLabel,
593 }
594 pod, err := e2epod.MakeSecPod(&podConfig)
595 framework.ExpectNoError(err)
596 pod, err = config.client.CoreV1().Pods(config.ns).Create(ctx, pod, metav1.CreateOptions{})
597 framework.ExpectNoError(err)
598 pods[pod.Name] = pod
599 numCreated++
600 }
601 }, 2*time.Second, stop)
602
603 defer func() {
604 close(stop)
605 podsLock.Lock()
606 defer podsLock.Unlock()
607
608 for _, pod := range pods {
609 if err := deletePodAndPVCs(ctx, config, pod); err != nil {
610 framework.Logf("Deleting pod %v failed: %v", pod.Name, err)
611 }
612 }
613 }()
614
615 ginkgo.By("Waiting for all pods to complete successfully")
616 const completeTimeout = 5 * time.Minute
617 waitErr := wait.PollUntilContextTimeout(ctx, time.Second, completeTimeout, true, func(ctx context.Context) (done bool, err error) {
618 podsList, err := config.client.CoreV1().Pods(config.ns).List(ctx, metav1.ListOptions{})
619 if err != nil {
620 return false, err
621 }
622
623 podsLock.Lock()
624 defer podsLock.Unlock()
625
626 for _, pod := range podsList.Items {
627 if pod.Status.Phase == v1.PodSucceeded {
628 // The pod succeeded: delete it together with its PVCs and count it as finished.
629 if err := deletePodAndPVCs(ctx, config, &pod); err != nil {
630 return false, err
631 }
632 delete(pods, pod.Name)
633 numFinished++
634 framework.Logf("%v/%v pods finished", numFinished, totalPods)
635 }
636 }
637
638 return numFinished == totalPods, nil
639 })
640 framework.ExpectNoError(waitErr, "some pods failed to complete within %v", completeTimeout)
641 })
642 })
643 })
644
645 func deletePodAndPVCs(ctx context.Context, config *localTestConfig, pod *v1.Pod) error {
646 framework.Logf("Deleting pod %v", pod.Name)
647 if err := config.client.CoreV1().Pods(config.ns).Delete(ctx, pod.Name, metav1.DeleteOptions{}); err != nil {
648 return err
649 }
650
651 // Delete the PVCs that the pod was using.
652 for _, vol := range pod.Spec.Volumes {
653 pvcSource := vol.VolumeSource.PersistentVolumeClaim
654 if pvcSource != nil {
655 if err := e2epv.DeletePersistentVolumeClaim(ctx, config.client, pvcSource.ClaimName, config.ns); err != nil {
656 return err
657 }
658 }
659 }
660 return nil
661 }
662
663 type makeLocalPodWith func(config *localTestConfig, volume *localTestVolume, nodeName string) *v1.Pod
664
665 func testPodWithNodeConflict(ctx context.Context, config *localTestConfig, testVol *localTestVolume, nodeName string, makeLocalPodFunc makeLocalPodWith) {
666 ginkgo.By(fmt.Sprintf("local-volume-type: %s", testVol.localVolumeType))
667
668 pod := makeLocalPodFunc(config, testVol, nodeName)
669 pod, err := config.client.CoreV1().Pods(config.ns).Create(ctx, pod, metav1.CreateOptions{})
670 framework.ExpectNoError(err)
671
672 err = e2epod.WaitForPodNameUnschedulableInNamespace(ctx, config.client, pod.Name, pod.Namespace)
673 framework.ExpectNoError(err)
674 }
675
676 // The two-pod tests below exercise a single local volume from two pods.
677
678 // twoPodsReadWriteTest writes from pod1 and verifies pod2, mounting the volume at the same time, can read the data.
679 func twoPodsReadWriteTest(ctx context.Context, f *framework.Framework, config *localTestConfig, testVol *localTestVolume) {
680 ginkgo.By("Creating pod1 to write to the PV")
681 pod1, pod1Err := createLocalPod(ctx, config, testVol, nil)
682 framework.ExpectNoError(pod1Err)
683 verifyLocalPod(ctx, config, testVol, pod1, config.randomNode.Name)
684
685 writeCmd := createWriteCmd(volumeDir, testFile, testFileContent, testVol.localVolumeType)
686
687 ginkgo.By("Writing in pod1")
688 podRWCmdExec(f, pod1, writeCmd)
689
690 // Confirm pod1 can read back what it just wrote.
691 testReadFileContent(f, volumeDir, testFile, testFileContent, pod1, testVol.localVolumeType)
692
693 ginkgo.By("Creating pod2 to read from the PV")
694 pod2, pod2Err := createLocalPod(ctx, config, testVol, nil)
695 framework.ExpectNoError(pod2Err)
696 verifyLocalPod(ctx, config, testVol, pod2, config.randomNode.Name)
697
698 // Confirm pod2 sees the content written by pod1.
699 testReadFileContent(f, volumeDir, testFile, testFileContent, pod2, testVol.localVolumeType)
700
701 writeCmd = createWriteCmd(volumeDir, testFile, testVol.ltr.Path, testVol.localVolumeType)
702
703 ginkgo.By("Writing in pod2")
704 podRWCmdExec(f, pod2, writeCmd)
705
706 ginkgo.By("Reading in pod1")
707 testReadFileContent(f, volumeDir, testFile, testVol.ltr.Path, pod1, testVol.localVolumeType)
708
709 ginkgo.By("Deleting pod1")
710 e2epod.DeletePodOrFail(ctx, config.client, config.ns, pod1.Name)
711 ginkgo.By("Deleting pod2")
712 e2epod.DeletePodOrFail(ctx, config.client, config.ns, pod2.Name)
713 }
714
715 // twoPodsReadWriteSerialTest writes from pod1, deletes it, then creates pod2 and verifies it can read the data.
716 func twoPodsReadWriteSerialTest(ctx context.Context, f *framework.Framework, config *localTestConfig, testVol *localTestVolume) {
717 ginkgo.By("Creating pod1")
718 pod1, pod1Err := createLocalPod(ctx, config, testVol, nil)
719 framework.ExpectNoError(pod1Err)
720 verifyLocalPod(ctx, config, testVol, pod1, config.randomNode.Name)
721
722 writeCmd := createWriteCmd(volumeDir, testFile, testFileContent, testVol.localVolumeType)
723
724 ginkgo.By("Writing in pod1")
725 podRWCmdExec(f, pod1, writeCmd)
726
727
728 testReadFileContent(f, volumeDir, testFile, testFileContent, pod1, testVol.localVolumeType)
729
730 ginkgo.By("Deleting pod1")
731 e2epod.DeletePodOrFail(ctx, config.client, config.ns, pod1.Name)
732
733 ginkgo.By("Creating pod2")
734 pod2, pod2Err := createLocalPod(ctx, config, testVol, nil)
735 framework.ExpectNoError(pod2Err)
736 verifyLocalPod(ctx, config, testVol, pod2, config.randomNode.Name)
737
738 ginkgo.By("Reading in pod2")
739 testReadFileContent(f, volumeDir, testFile, testFileContent, pod2, testVol.localVolumeType)
740
741 ginkgo.By("Deleting pod2")
742 e2epod.DeletePodOrFail(ctx, config.client, config.ns, pod2.Name)
743 }
744
745 // createPodWithFsGroupTest creates a pod with the given fsGroup and checks that the volume directory's group ID matches expectedFsGroup.
746 func createPodWithFsGroupTest(ctx context.Context, config *localTestConfig, testVol *localTestVolume, fsGroup int64, expectedFsGroup int64) *v1.Pod {
747 pod, err := createLocalPod(ctx, config, testVol, &fsGroup)
748 framework.ExpectNoError(err)
749 _, err = e2eoutput.LookForStringInPodExec(config.ns, pod.Name, []string{"stat", "-c", "%g", volumeDir}, strconv.FormatInt(expectedFsGroup, 10), time.Second*3)
750 framework.ExpectNoError(err, "failed to get expected fsGroup %d on directory %s in pod %s", fsGroup, volumeDir, pod.Name)
751 return pod
752 }
753
754 func setupStorageClass(ctx context.Context, config *localTestConfig, mode *storagev1.VolumeBindingMode) {
755 sc := &storagev1.StorageClass{
756 ObjectMeta: metav1.ObjectMeta{
757 Name: config.scName,
758 },
759 Provisioner: "kubernetes.io/no-provisioner",
760 VolumeBindingMode: mode,
761 }
762
763 _, err := config.client.StorageV1().StorageClasses().Create(ctx, sc, metav1.CreateOptions{})
764 framework.ExpectNoError(err)
765 }
766
767 func cleanupStorageClass(ctx context.Context, config *localTestConfig) {
768 framework.ExpectNoError(config.client.StorageV1().StorageClasses().Delete(ctx, config.scName, metav1.DeleteOptions{}))
769 }
770
771 // podNodeName returns the name of the node that the given pod was scheduled to.
772 func podNodeName(ctx context.Context, config *localTestConfig, pod *v1.Pod) (string, error) {
773 runtimePod, runtimePodErr := config.client.CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{})
774 return runtimePod.Spec.NodeName, runtimePodErr
775 }
776
777 // setupLocalVolumes creates `count` local volumes of the given type on the given node.
778 func setupLocalVolumes(ctx context.Context, config *localTestConfig, localVolumeType localVolumeType, node *v1.Node, count int) []*localTestVolume {
779 vols := []*localTestVolume{}
780 for i := 0; i < count; i++ {
781 ltrType, ok := setupLocalVolumeMap[localVolumeType]
782 if !ok {
783 framework.Failf("Invalid localVolumeType: %v", localVolumeType)
784 }
785 ltr := config.ltrMgr.Create(ctx, node, ltrType, nil)
786 vols = append(vols, &localTestVolume{
787 ltr: ltr,
788 localVolumeType: localVolumeType,
789 })
790 }
791 return vols
792 }
793
794 func cleanupLocalPVCsPVs(ctx context.Context, config *localTestConfig, volumes []*localTestVolume) {
795 for _, volume := range volumes {
796 ginkgo.By("Cleaning up PVC and PV")
797 errs := e2epv.PVPVCCleanup(ctx, config.client, config.ns, volume.pv, volume.pvc)
798 if len(errs) > 0 {
799 framework.Failf("Failed to delete PV and/or PVC: %v", utilerrors.NewAggregate(errs))
800 }
801 }
802 }
803
804 // cleanupLocalVolumes deletes the PVCs/PVs and removes the backing local resources from the node.
805 func cleanupLocalVolumes(ctx context.Context, config *localTestConfig, volumes []*localTestVolume) {
806 cleanupLocalPVCsPVs(ctx, config, volumes)
807
808 for _, volume := range volumes {
809 config.ltrMgr.Remove(ctx, volume.ltr)
810 }
811 }
812
813 func verifyLocalVolume(ctx context.Context, config *localTestConfig, volume *localTestVolume) {
814 framework.ExpectNoError(e2epv.WaitOnPVandPVC(ctx, config.client, config.timeouts, config.ns, volume.pv, volume.pvc))
815 }
816
817 func verifyLocalPod(ctx context.Context, config *localTestConfig, volume *localTestVolume, pod *v1.Pod, expectedNodeName string) {
818 podNodeName, err := podNodeName(ctx, config, pod)
819 framework.ExpectNoError(err)
820 framework.Logf("pod %q created on Node %q", pod.Name, podNodeName)
821 gomega.Expect(podNodeName).To(gomega.Equal(expectedNodeName))
822 }
823
824 func makeLocalPVCConfig(config *localTestConfig, volumeType localVolumeType) e2epv.PersistentVolumeClaimConfig {
825 pvcConfig := e2epv.PersistentVolumeClaimConfig{
826 AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce},
827 StorageClassName: &config.scName,
828 }
829 if volumeType == BlockLocalVolumeType {
830 pvcVolumeMode := v1.PersistentVolumeBlock
831 pvcConfig.VolumeMode = &pvcVolumeMode
832 }
833 return pvcConfig
834 }
835
836 func makeLocalPVConfig(config *localTestConfig, volume *localTestVolume) e2epv.PersistentVolumeConfig {
837 // Pin the PV to the node hosting the backing resource via the node's hostname label.
838 nodeKey := "kubernetes.io/hostname"
839 if volume.ltr.Node.Labels == nil {
840 framework.Failf("Node does not have labels")
841 }
842 nodeValue, found := volume.ltr.Node.Labels[nodeKey]
843 if !found {
844 framework.Failf("Node does not have required label %q", nodeKey)
845 }
846
847 pvConfig := e2epv.PersistentVolumeConfig{
848 PVSource: v1.PersistentVolumeSource{
849 Local: &v1.LocalVolumeSource{
850 Path: volume.ltr.Path,
851 },
852 },
853 NamePrefix: "local-pv",
854 StorageClassName: config.scName,
855 NodeAffinity: &v1.VolumeNodeAffinity{
856 Required: &v1.NodeSelector{
857 NodeSelectorTerms: []v1.NodeSelectorTerm{
858 {
859 MatchExpressions: []v1.NodeSelectorRequirement{
860 {
861 Key: nodeKey,
862 Operator: v1.NodeSelectorOpIn,
863 Values: []string{nodeValue},
864 },
865 },
866 },
867 },
868 },
869 },
870 }
871
872 if volume.localVolumeType == BlockLocalVolumeType {
873 pvVolumeMode := v1.PersistentVolumeBlock
874 pvConfig.VolumeMode = &pvVolumeMode
875 }
876 return pvConfig
877 }
878
879 // createLocalPVCsPVs creates a prebound PVC/PV pair for each volume and verifies the expected binding behavior for the given mode.
880 func createLocalPVCsPVs(ctx context.Context, config *localTestConfig, volumes []*localTestVolume, mode storagev1.VolumeBindingMode) {
881 var err error
882
883 for _, volume := range volumes {
884 pvcConfig := makeLocalPVCConfig(config, volume.localVolumeType)
885 pvConfig := makeLocalPVConfig(config, volume)
886
887 volume.pv, volume.pvc, err = e2epv.CreatePVPVC(ctx, config.client, config.timeouts, pvConfig, pvcConfig, config.ns, false)
888 framework.ExpectNoError(err)
889 }
890
891 if mode == storagev1.VolumeBindingImmediate {
892 for _, volume := range volumes {
893 verifyLocalVolume(ctx, config, volume)
894 }
895 } else {
896 // With delayed binding the PVCs must remain pending: poll briefly for a
897 // phase change and treat hitting the timeout as success.
898 const bindTimeout = 10 * time.Second
899 waitErr := wait.PollImmediate(time.Second, bindTimeout, func() (done bool, err error) {
900 for _, volume := range volumes {
901 pvc, err := config.client.CoreV1().PersistentVolumeClaims(volume.pvc.Namespace).Get(ctx, volume.pvc.Name, metav1.GetOptions{})
902 if err != nil {
903 return false, fmt.Errorf("failed to get PVC %s/%s: %w", volume.pvc.Namespace, volume.pvc.Name, err)
904 }
905 if pvc.Status.Phase != v1.ClaimPending {
906 return true, nil
907 }
908 }
909 return false, nil
910 })
911 if wait.Interrupted(waitErr) {
912 framework.Logf("PVCs were not bound within %v (that's good)", bindTimeout)
913 waitErr = nil
914 }
915 framework.ExpectNoError(waitErr, "Error making sure PVCs are not bound")
916 }
917 }
918
919 func makeLocalPodWithNodeAffinity(config *localTestConfig, volume *localTestVolume, nodeName string) (pod *v1.Pod) {
920 affinity := &v1.Affinity{
921 NodeAffinity: &v1.NodeAffinity{
922 RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
923 NodeSelectorTerms: []v1.NodeSelectorTerm{
924 {
925 MatchExpressions: []v1.NodeSelectorRequirement{
926 {
927 Key: "kubernetes.io/hostname",
928 Operator: v1.NodeSelectorOpIn,
929 Values: []string{nodeName},
930 },
931 },
932 },
933 },
934 },
935 },
936 }
937 podConfig := e2epod.Config{
938 NS: config.ns,
939 PVCs: []*v1.PersistentVolumeClaim{volume.pvc},
940 SeLinuxLabel: selinuxLabel,
941 NodeSelection: e2epod.NodeSelection{Affinity: affinity},
942 }
943 pod, err := e2epod.MakeSecPod(&podConfig)
944 if pod == nil || err != nil {
945 return
946 }
947 pod.Spec.Affinity = affinity
948 return
949 }
950
951 func makeLocalPodWithNodeSelector(config *localTestConfig, volume *localTestVolume, nodeName string) (pod *v1.Pod) {
952 ns := map[string]string{
953 "kubernetes.io/hostname": nodeName,
954 }
955 podConfig := e2epod.Config{
956 NS: config.ns,
957 PVCs: []*v1.PersistentVolumeClaim{volume.pvc},
958 SeLinuxLabel: selinuxLabel,
959 NodeSelection: e2epod.NodeSelection{Selector: ns},
960 }
961 pod, err := e2epod.MakeSecPod(&podConfig)
962 if pod == nil || err != nil {
963 return
964 }
965 return
966 }
967
968 func makeLocalPodWithNodeName(config *localTestConfig, volume *localTestVolume, nodeName string) (pod *v1.Pod) {
969 podConfig := e2epod.Config{
970 NS: config.ns,
971 PVCs: []*v1.PersistentVolumeClaim{volume.pvc},
972 SeLinuxLabel: selinuxLabel,
973 }
974 pod, err := e2epod.MakeSecPod(&podConfig)
975 if pod == nil || err != nil {
976 return
977 }
978
979 e2epod.SetNodeAffinity(&pod.Spec, nodeName)
980 return
981 }
982
983 func createLocalPod(ctx context.Context, config *localTestConfig, volume *localTestVolume, fsGroup *int64) (*v1.Pod, error) {
984 ginkgo.By("Creating a pod")
985 podConfig := e2epod.Config{
986 NS: config.ns,
987 PVCs: []*v1.PersistentVolumeClaim{volume.pvc},
988 SeLinuxLabel: selinuxLabel,
989 FsGroup: fsGroup,
990 }
991 return e2epod.CreateSecPod(ctx, config.client, &podConfig, config.timeouts.PodStart)
992 }
993
994 func createWriteCmd(testDir string, testFile string, writeTestFileContent string, volumeType localVolumeType) string {
995 if volumeType == BlockLocalVolumeType {
996 // For block volumes, testDir is the block device itself.
997 testFileDir := filepath.Join("/tmp", testDir)
998 testFilePath := filepath.Join(testFileDir, testFile)
999 // Stage the content in a temporary file first.
1000 writeTestFileCmd := fmt.Sprintf("mkdir -p %s; echo %s > %s", testFileDir, writeTestFileContent, testFilePath)
1001
1002 // sudo is needed when exec'ing via ssh, but may not exist in the pod's container image.
1003 sudoCmd := "SUDO_CMD=$(which sudo); echo ${SUDO_CMD}"
1004 // Copy the staged file onto the block device.
1005 writeBlockCmd := fmt.Sprintf("${SUDO_CMD} dd if=%s of=%s bs=512 count=100", testFilePath, testDir)
1006 // Remove the temporary file afterwards.
1007 deleteTestFileCmd := fmt.Sprintf("rm %s", testFilePath)
1008 return fmt.Sprintf("%s && %s && %s && %s", writeTestFileCmd, sudoCmd, writeBlockCmd, deleteTestFileCmd)
1009 }
1010 testFilePath := filepath.Join(testDir, testFile)
1011 return fmt.Sprintf("mkdir -p %s; echo %s > %s", testDir, writeTestFileContent, testFilePath)
1012 }
1013
1014 func createReadCmd(testFileDir string, testFile string, volumeType localVolumeType) string {
1015 if volumeType == BlockLocalVolumeType {
1016 // For block volumes, dump the beginning of the device as printable characters.
1017 return fmt.Sprintf("hexdump -n 100 -e '100 \"%%_p\"' %s | head -1", testFileDir)
1018 }
1019
1020 testFilePath := filepath.Join(testFileDir, testFile)
1021 return fmt.Sprintf("cat %s", testFilePath)
1022 }
1023
1024 // testReadFileContent reads the test file in the pod and asserts it contains the expected content.
1025 func testReadFileContent(f *framework.Framework, testFileDir string, testFile string, testFileContent string, pod *v1.Pod, volumeType localVolumeType) {
1026 readCmd := createReadCmd(testFileDir, testFile, volumeType)
1027 readOut := podRWCmdExec(f, pod, readCmd)
1028 gomega.Expect(readOut).To(gomega.ContainSubstring(testFileContent))
1029 }
1030
1031 // podRWCmdExec executes the given read or write command in the pod's
1032 // container, fails the test on error, and returns stdout.
1033 func podRWCmdExec(f *framework.Framework, pod *v1.Pod, cmd string) string {
1034 stdout, stderr, err := e2evolume.PodExec(f, pod, cmd)
1035 framework.Logf("podRWCmdExec cmd: %q, out: %q, stderr: %q, err: %v", cmd, stdout, stderr, err)
1036 framework.ExpectNoError(err)
1037 return stdout
1038 }
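// Illustrative sketch (not part of the original suite): how createWriteCmd,
// createReadCmd and podRWCmdExec compose for a directory-backed volume. The
// function name and the sample content are assumptions made for this example.
func exampleWriteThenRead(f *framework.Framework, pod *v1.Pod) string {
// Build and run a command that writes "hello" into test-file under the volume mount point.
writeCmd := createWriteCmd(volumeDir, testFile, "hello", DirectoryLocalVolumeType)
podRWCmdExec(f, pod, writeCmd)
// Read the file back with the matching read command and return its output.
readCmd := createReadCmd(volumeDir, testFile, DirectoryLocalVolumeType)
return podRWCmdExec(f, pod, readCmd)
}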
1039
1040 // setupLocalVolumesPVCsPVs initializes the test volumes on the node and
1041 // creates the corresponding local PVCs and PVs.
1042 func setupLocalVolumesPVCsPVs(
1043 ctx context.Context,
1044 config *localTestConfig,
1045 localVolumeType localVolumeType,
1046 node *v1.Node,
1047 count int,
1048 mode storagev1.VolumeBindingMode) []*localTestVolume {
1049
1050 ginkgo.By("Initializing test volumes")
1051 testVols := setupLocalVolumes(ctx, config, localVolumeType, node, count)
1052
1053 ginkgo.By("Creating local PVCs and PVs")
1054 createLocalPVCsPVs(ctx, config, testVols, mode)
1055
1056 return testVols
1057 }
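// Illustrative sketch (not part of the original suite): the typical lifecycle
// that the specs above follow for a single prebound volume. The function name
// and the use of a nil fsGroup are assumptions made for this example.
func exampleSingleVolumeLifecycle(ctx context.Context, config *localTestConfig) {
// Create the test StorageClass and one immediately-bound local volume.
setupStorageClass(ctx, config, &immediateMode)
defer cleanupStorageClass(ctx, config)
testVols := setupLocalVolumesPVCsPVs(ctx, config, DirectoryLocalVolumeType, config.randomNode, 1, immediateMode)
defer cleanupLocalVolumes(ctx, config, testVols)
// Run a pod against the volume and check that it landed on the expected node.
pod, err := createLocalPod(ctx, config, testVols[0], nil)
framework.ExpectNoError(err)
defer e2epod.DeletePodOrFail(ctx, config.client, config.ns, pod.Name)
verifyLocalPod(ctx, config, testVols[0], pod, config.randomNode.Name)
}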
1058
1059 // newLocalClaimWithName returns a PVC with the given name requesting testRequestSize from the test StorageClass.
1060 func newLocalClaimWithName(config *localTestConfig, name string) *v1.PersistentVolumeClaim {
1061 claim := v1.PersistentVolumeClaim{
1062 ObjectMeta: metav1.ObjectMeta{
1063 Name: name,
1064 Namespace: config.ns,
1065 },
1066 Spec: v1.PersistentVolumeClaimSpec{
1067 StorageClassName: &config.scName,
1068 AccessModes: []v1.PersistentVolumeAccessMode{
1069 v1.ReadWriteOnce,
1070 },
1071 Resources: v1.VolumeResourceRequirements{
1072 Requests: v1.ResourceList{
1073 v1.ResourceName(v1.ResourceStorage): resource.MustParse(testRequestSize),
1074 },
1075 },
1076 },
1077 }
1078
1079 return &claim
1080 }
1081
1082 func createStatefulSet(ctx context.Context, config *localTestConfig, ssReplicas int32, volumeCount int, anti, parallel bool) *appsv1.StatefulSet {
1083 mounts := []v1.VolumeMount{}
1084 claims := []v1.PersistentVolumeClaim{}
1085 for i := 0; i < volumeCount; i++ {
1086 name := fmt.Sprintf("vol%v", i+1)
1087 pvc := newLocalClaimWithName(config, name)
1088 mounts = append(mounts, v1.VolumeMount{Name: name, MountPath: "/" + name})
1089 claims = append(claims, *pvc)
1090 }
1091
1092 podAffinityTerms := []v1.PodAffinityTerm{
1093 {
1094 LabelSelector: &metav1.LabelSelector{
1095 MatchExpressions: []metav1.LabelSelectorRequirement{
1096 {
1097 Key: "app",
1098 Operator: metav1.LabelSelectorOpIn,
1099 Values: []string{"local-volume-test"},
1100 },
1101 },
1102 },
1103 TopologyKey: "kubernetes.io/hostname",
1104 },
1105 }
1106
1107 affinity := v1.Affinity{}
1108 if anti {
1109 affinity.PodAntiAffinity = &v1.PodAntiAffinity{
1110 RequiredDuringSchedulingIgnoredDuringExecution: podAffinityTerms,
1111 }
1112 } else {
1113 affinity.PodAffinity = &v1.PodAffinity{
1114 RequiredDuringSchedulingIgnoredDuringExecution: podAffinityTerms,
1115 }
1116 }
1117
1118 labels := map[string]string{"app": "local-volume-test"}
1119 spec := &appsv1.StatefulSet{
1120 ObjectMeta: metav1.ObjectMeta{
1121 Name: "local-volume-statefulset",
1122 Namespace: config.ns,
1123 },
1124 Spec: appsv1.StatefulSetSpec{
1125 Selector: &metav1.LabelSelector{
1126 MatchLabels: map[string]string{"app": "local-volume-test"},
1127 },
1128 Replicas: &ssReplicas,
1129 Template: v1.PodTemplateSpec{
1130 ObjectMeta: metav1.ObjectMeta{
1131 Labels: labels,
1132 },
1133 Spec: v1.PodSpec{
1134 Containers: []v1.Container{
1135 {
1136 Name: "nginx",
1137 Image: imageutils.GetE2EImage(imageutils.Nginx),
1138 VolumeMounts: mounts,
1139 },
1140 },
1141 Affinity: &affinity,
1142 },
1143 },
1144 VolumeClaimTemplates: claims,
1145 ServiceName: "test-service",
1146 },
1147 }
1148
1149 if parallel {
1150 spec.Spec.PodManagementPolicy = appsv1.ParallelPodManagement
1151 }
1152
1153 ss, err := config.client.AppsV1().StatefulSets(config.ns).Create(ctx, spec, metav1.CreateOptions{})
1154 framework.ExpectNoError(err)
1155
1156 e2estatefulset.WaitForRunningAndReady(ctx, config.client, ssReplicas, ss)
1157 return ss
1158 }
1159
1160 func validateStatefulSet(ctx context.Context, config *localTestConfig, ss *appsv1.StatefulSet, anti bool) {
1161 pods := e2estatefulset.GetPodList(ctx, config.client, ss)
1162
1163 nodes := sets.NewString()
1164 for _, pod := range pods.Items {
1165 nodes.Insert(pod.Spec.NodeName)
1166 }
1167
1168 if anti {
1169 // With anti-affinity, each pod should land on a different node.
1170 gomega.Expect(pods.Items).To(gomega.HaveLen(nodes.Len()))
1171 } else {
1172 // With affinity, all pods should land on the same node.
1173 gomega.Expect(nodes.Len()).To(gomega.Equal(1))
1174 }
1175
1176 // Verify that every PVC used by the pods is bound.
1177 for _, pod := range pods.Items {
1178 for _, volume := range pod.Spec.Volumes {
1179 pvcSource := volume.VolumeSource.PersistentVolumeClaim
1180 if pvcSource != nil {
1181 err := e2epv.WaitForPersistentVolumeClaimPhase(ctx,
1182 v1.ClaimBound, config.client, config.ns, pvcSource.ClaimName, framework.Poll, time.Second)
1183 framework.ExpectNoError(err)
1184 }
1185 }
1186 }
1187 }
1188
1189 // SkipUnlessLocalSSDExists takes an ssdInterface ("scsi"/"nvme") and filesystemType ("fs"/"block")
1190 // and skips the test if no local SSD of that kind is present on the node.
1191 func SkipUnlessLocalSSDExists(ctx context.Context, config *localTestConfig, ssdInterface, filesystemType string, node *v1.Node) {
1192 ssdCmd := fmt.Sprintf("ls -1 /mnt/disks/by-uuid/google-local-ssds-%s-%s/ | wc -l", ssdInterface, filesystemType)
1193 res, err := config.hostExec.Execute(ctx, ssdCmd, node)
1194 utils.LogResult(res)
1195 framework.ExpectNoError(err)
1196 num, err := strconv.Atoi(strings.TrimSpace(res.Stdout))
1197 framework.ExpectNoError(err)
1198 if num < 1 {
1199 e2eskipper.Skipf("Requires at least 1 %s %s localSSD ", ssdInterface, filesystemType)
1200 }
1201 }
1202