package csi_mock

import (
	"context"
	"errors"
	"fmt"
	"reflect"
	"strconv"
	"strings"
	"sync/atomic"
	"time"

	csipbv1 "github.com/container-storage-interface/spec/lib/go/csi"
	"github.com/onsi/ginkgo/v2"
	"google.golang.org/grpc/codes"
	v1 "k8s.io/api/core/v1"
	storagev1 "k8s.io/api/storage/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/fields"
	utilerrors "k8s.io/apimachinery/pkg/util/errors"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/apimachinery/pkg/util/wait"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/kubernetes/test/e2e/framework"
	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
	e2epv "k8s.io/kubernetes/test/e2e/framework/pv"
	"k8s.io/kubernetes/test/e2e/storage/drivers"
	storageframework "k8s.io/kubernetes/test/e2e/storage/framework"
	"k8s.io/kubernetes/test/e2e/storage/testsuites"
	"k8s.io/kubernetes/test/e2e/storage/utils"
	"k8s.io/kubernetes/test/utils/format"
	imageutils "k8s.io/kubernetes/test/utils/image"
)

const (
	csiNodeLimitUpdateTimeout  = 5 * time.Minute
	csiPodUnschedulableTimeout = 5 * time.Minute
	csiResizeWaitPeriod        = 5 * time.Minute
	csiVolumeAttachmentTimeout = 7 * time.Minute
	// How long to wait for the Resizing condition to appear on a PVC.
	csiResizingConditionWait = 2 * time.Minute

	// How long to wait for a pod with a CSI volume to reach Running.
	csiPodRunningTimeout = 5 * time.Minute

	// How long to wait for the driver to unstage a volume after its pod is deleted.
	csiUnstageWaitTimeout = 1 * time.Minute
)

// csiCall describes one expected call to the mock CSI driver: the gRPC method
// name, the error code the driver is expected to return, and optionally the
// secrets the request is expected to carry.
type csiCall struct {
	expectedMethod string
	expectedError  codes.Code
	expectedSecret map[string]string
	// deletePod marks the point in the expected sequence at which the test
	// deletes the pod, triggering the teardown calls that follow.
	deletePod bool
}

type testParameters struct {
	disableAttach       bool
	attachLimit         int
	registerDriver      bool
	lateBinding         bool
	enableTopology      bool
	podInfo             *bool
	storageCapacity     *bool
	scName              string
	enableResizing      bool
	enableNodeExpansion bool
	// disableResizingOnDriver disables resizing on the mock driver itself,
	// overriding enableResizing (the storage class keeps allowVolumeExpansion).
	disableResizingOnDriver       bool
	enableSnapshot                bool
	enableVolumeMountGroup        bool
	hooks                         *drivers.Hooks
	tokenRequests                 []storagev1.TokenRequest
	requiresRepublish             *bool
	fsGroupPolicy                 *storagev1.FSGroupPolicy
	enableSELinuxMount            *bool
	enableRecoverExpansionFailure bool
	enableCSINodeExpandSecret     bool
}

type mockDriverSetup struct {
	cs          clientset.Interface
	config      *storageframework.PerTestConfig
	pods        []*v1.Pod
	pvcs        []*v1.PersistentVolumeClaim
	sc          map[string]*storagev1.StorageClass
	vsc         map[string]*unstructured.Unstructured
	driver      drivers.MockCSITestDriver
	provisioner string
	tp          testParameters
	f           *framework.Framework
}

type volumeType string

var (
	csiEphemeral     = volumeType("CSI")
	genericEphemeral = volumeType("Ephemeral")
	pvcReference     = volumeType("PVC")
)

const (
	poll                           = 2 * time.Second
	pvcAsSourceProtectionFinalizer = "snapshot.storage.kubernetes.io/pvc-as-source-protection"
	volumeSnapshotContentFinalizer = "snapshot.storage.kubernetes.io/volumesnapshotcontent-bound-protection"
	volumeSnapshotBoundFinalizer   = "snapshot.storage.kubernetes.io/volumesnapshot-bound-protection"
	errReasonNotEnoughSpace        = "node(s) did not have enough free storage"

	csiNodeExpandSecretKey          = "csi.storage.k8s.io/node-expand-secret-name"
	csiNodeExpandSecretNamespaceKey = "csi.storage.k8s.io/node-expand-secret-namespace"
)

var (
	errPodCompleted   = fmt.Errorf("pod ran to completion")
	errNotEnoughSpace = errors.New(errReasonNotEnoughSpace)
)

func newMockDriverSetup(f *framework.Framework) *mockDriverSetup {
	return &mockDriverSetup{
		cs:  f.ClientSet,
		sc:  make(map[string]*storagev1.StorageClass),
		vsc: make(map[string]*unstructured.Unstructured),
		f:   f,
	}
}

func (m *mockDriverSetup) init(ctx context.Context, tp testParameters) {
	m.cs = m.f.ClientSet
	m.tp = tp

	var err error
	driverOpts := drivers.CSIMockDriverOpts{
		RegisterDriver:                tp.registerDriver,
		PodInfo:                       tp.podInfo,
		StorageCapacity:               tp.storageCapacity,
		EnableTopology:                tp.enableTopology,
		AttachLimit:                   tp.attachLimit,
		DisableAttach:                 tp.disableAttach,
		EnableResizing:                tp.enableResizing,
		EnableNodeExpansion:           tp.enableNodeExpansion,
		EnableSnapshot:                tp.enableSnapshot,
		EnableVolumeMountGroup:        tp.enableVolumeMountGroup,
		TokenRequests:                 tp.tokenRequests,
		RequiresRepublish:             tp.requiresRepublish,
		FSGroupPolicy:                 tp.fsGroupPolicy,
		EnableSELinuxMount:            tp.enableSELinuxMount,
		EnableRecoverExpansionFailure: tp.enableRecoverExpansionFailure,
	}

	// Hooks are only supported by the embedded mock driver, so run it embedded
	// whenever hooks are requested.
	if tp.hooks != nil {
		driverOpts.Embedded = true
		driverOpts.Hooks = *tp.hooks
	}

	// This only disables resizing on the driver itself; the storage class keeps
	// allowVolumeExpansion as configured by enableResizing.
	if tp.disableResizingOnDriver {
		driverOpts.EnableResizing = false
	}

	m.driver = drivers.InitMockCSIDriver(driverOpts)
	config := m.driver.PrepareTest(ctx, m.f)
	m.config = config
	m.provisioner = config.GetUniqueDriverName()

	if tp.registerDriver {
		err = waitForCSIDriver(m.cs, m.config.GetUniqueDriverName())
		framework.ExpectNoError(err, "Failed to get CSIDriver %v", m.config.GetUniqueDriverName())
		ginkgo.DeferCleanup(destroyCSIDriver, m.cs, m.config.GetUniqueDriverName())
	}

	// Wait for the driver to be registered on the node before returning, so
	// that tests can start issuing calls against it right away.
	err = drivers.WaitForCSIDriverRegistrationOnNode(ctx, m.config.ClientNodeSelection.Name, m.config.GetUniqueDriverName(), m.cs)
	framework.ExpectNoError(err, "Failed to register CSIDriver %v", m.config.GetUniqueDriverName())
}

func (m *mockDriverSetup) cleanup(ctx context.Context) {
	cs := m.f.ClientSet
	var errs []error

	for _, pod := range m.pods {
		ginkgo.By(fmt.Sprintf("Deleting pod %s", pod.Name))
		errs = append(errs, e2epod.DeletePodWithWait(ctx, cs, pod))
	}

	for _, claim := range m.pvcs {
		ginkgo.By(fmt.Sprintf("Deleting claim %s", claim.Name))
		claim, err := cs.CoreV1().PersistentVolumeClaims(claim.Namespace).Get(context.TODO(), claim.Name, metav1.GetOptions{})
		if err == nil {
			if err := cs.CoreV1().PersistentVolumeClaims(claim.Namespace).Delete(context.TODO(), claim.Name, metav1.DeleteOptions{}); err != nil {
				errs = append(errs, err)
			}
			if claim.Spec.VolumeName != "" {
				errs = append(errs, e2epv.WaitForPersistentVolumeDeleted(ctx, cs, claim.Spec.VolumeName, framework.Poll, 2*time.Minute))
			}
		}
	}

	for _, sc := range m.sc {
		ginkgo.By(fmt.Sprintf("Deleting storageclass %s", sc.Name))
		cs.StorageV1().StorageClasses().Delete(context.TODO(), sc.Name, metav1.DeleteOptions{})
	}

	for _, vsc := range m.vsc {
		ginkgo.By(fmt.Sprintf("Deleting volumesnapshotclass %s", vsc.GetName()))
		m.config.Framework.DynamicClient.Resource(utils.SnapshotClassGVR).Delete(context.TODO(), vsc.GetName(), metav1.DeleteOptions{})
	}

	err := utilerrors.NewAggregate(errs)
	framework.ExpectNoError(err, "while cleaning up after test")
}

func (m *mockDriverSetup) update(o utils.PatchCSIOptions) {
	item, err := m.cs.StorageV1().CSIDrivers().Get(context.TODO(), m.config.GetUniqueDriverName(), metav1.GetOptions{})
	framework.ExpectNoError(err, "Failed to get CSIDriver %v", m.config.GetUniqueDriverName())

	err = utils.PatchCSIDeployment(nil, o, item)
	framework.ExpectNoError(err, "Failed to apply %v to CSIDriver object %v", o, m.config.GetUniqueDriverName())

	_, err = m.cs.StorageV1().CSIDrivers().Update(context.TODO(), item, metav1.UpdateOptions{})
	framework.ExpectNoError(err, "Failed to update CSIDriver %v", m.config.GetUniqueDriverName())
}

func (m *mockDriverSetup) createPod(ctx context.Context, withVolume volumeType) (class *storagev1.StorageClass, claim *v1.PersistentVolumeClaim, pod *v1.Pod) {
	ginkgo.By("Creating pod")
	f := m.f

	sc := m.driver.GetDynamicProvisionStorageClass(ctx, m.config, "")
	if m.tp.enableCSINodeExpandSecret {
		if sc.Parameters == nil {
			parameters := map[string]string{
				csiNodeExpandSecretKey:          "test-secret",
				csiNodeExpandSecretNamespaceKey: f.Namespace.Name,
			}
			sc.Parameters = parameters
		} else {
			sc.Parameters[csiNodeExpandSecretKey] = "test-secret"
			sc.Parameters[csiNodeExpandSecretNamespaceKey] = f.Namespace.Name
		}
	}
	scTest := testsuites.StorageClassTest{
		Name:                 m.driver.GetDriverInfo().Name,
		Timeouts:             f.Timeouts,
		Provisioner:          sc.Provisioner,
		Parameters:           sc.Parameters,
		ClaimSize:            "1Gi",
		ExpectedSize:         "1Gi",
		DelayBinding:         m.tp.lateBinding,
		AllowVolumeExpansion: m.tp.enableResizing,
	}

	// Schedule all pods onto the node selected for this test.
	nodeSelection := m.config.ClientNodeSelection
	switch withVolume {
	case csiEphemeral:
		pod = startPausePodInline(f.ClientSet, scTest, nodeSelection, f.Namespace.Name)
	case genericEphemeral:
		class, pod = startPausePodGenericEphemeral(f.ClientSet, scTest, nodeSelection, m.tp.scName, f.Namespace.Name)
		if class != nil {
			m.sc[class.Name] = class
		}
		claim = &v1.PersistentVolumeClaim{
			ObjectMeta: metav1.ObjectMeta{
				Name:      pod.Name + "-" + pod.Spec.Volumes[0].Name,
				Namespace: f.Namespace.Name,
			},
		}
	case pvcReference:
		class, claim, pod = startPausePod(ctx, f.ClientSet, scTest, nodeSelection, m.tp.scName, f.Namespace.Name)
		if class != nil {
			m.sc[class.Name] = class
		}
		if claim != nil {
			m.pvcs = append(m.pvcs, claim)
		}
	}
	if pod != nil {
		m.pods = append(m.pods, pod)
	}
	return
}

func (m *mockDriverSetup) createPodWithPVC(pvc *v1.PersistentVolumeClaim) (*v1.Pod, error) {
	f := m.f

	nodeSelection := m.config.ClientNodeSelection
	pod, err := startPausePodWithClaim(m.cs, pvc, nodeSelection, f.Namespace.Name)
	if pod != nil {
		m.pods = append(m.pods, pod)
	}
	return pod, err
}

func (m *mockDriverSetup) createPodWithFSGroup(ctx context.Context, fsGroup *int64) (*storagev1.StorageClass, *v1.PersistentVolumeClaim, *v1.Pod) {
	f := m.f

	ginkgo.By("Creating pod with fsGroup")
	nodeSelection := m.config.ClientNodeSelection
	sc := m.driver.GetDynamicProvisionStorageClass(ctx, m.config, "")
	scTest := testsuites.StorageClassTest{
		Name:                 m.driver.GetDriverInfo().Name,
		Provisioner:          sc.Provisioner,
		Parameters:           sc.Parameters,
		ClaimSize:            "1Gi",
		ExpectedSize:         "1Gi",
		DelayBinding:         m.tp.lateBinding,
		AllowVolumeExpansion: m.tp.enableResizing,
	}
	class, claim, pod := startBusyBoxPod(ctx, f.ClientSet, scTest, nodeSelection, m.tp.scName, f.Namespace.Name, fsGroup)

	if class != nil {
		m.sc[class.Name] = class
	}
	if claim != nil {
		m.pvcs = append(m.pvcs, claim)
	}

	if pod != nil {
		m.pods = append(m.pods, pod)
	}

	return class, claim, pod
}

func (m *mockDriverSetup) createPodWithSELinux(ctx context.Context, accessModes []v1.PersistentVolumeAccessMode, mountOptions []string, seLinuxOpts *v1.SELinuxOptions) (*storagev1.StorageClass, *v1.PersistentVolumeClaim, *v1.Pod) {
	ginkgo.By("Creating pod with SELinux context")
	f := m.f
	nodeSelection := m.config.ClientNodeSelection
	sc := m.driver.GetDynamicProvisionStorageClass(ctx, m.config, "")
	scTest := testsuites.StorageClassTest{
		Name:                 m.driver.GetDriverInfo().Name,
		Provisioner:          sc.Provisioner,
		Parameters:           sc.Parameters,
		ClaimSize:            "1Gi",
		ExpectedSize:         "1Gi",
		DelayBinding:         m.tp.lateBinding,
		AllowVolumeExpansion: m.tp.enableResizing,
		MountOptions:         mountOptions,
	}
	class, claim := createClaim(ctx, f.ClientSet, scTest, nodeSelection, m.tp.scName, f.Namespace.Name, accessModes)
	pod, err := startPausePodWithSELinuxOptions(f.ClientSet, claim, nodeSelection, f.Namespace.Name, seLinuxOpts)
	framework.ExpectNoError(err, "Failed to create pause pod with SELinux context %s: %v", seLinuxOpts, err)

	if class != nil {
		m.sc[class.Name] = class
	}
	if claim != nil {
		m.pvcs = append(m.pvcs, claim)
	}

	if pod != nil {
		m.pods = append(m.pods, pod)
	}

	return class, claim, pod
}

func waitForCSIDriver(cs clientset.Interface, driverName string) error {
	timeout := 4 * time.Minute

	framework.Logf("waiting up to %v for CSIDriver %q", timeout, driverName)
	for start := time.Now(); time.Since(start) < timeout; time.Sleep(framework.Poll) {
		_, err := cs.StorageV1().CSIDrivers().Get(context.TODO(), driverName, metav1.GetOptions{})
		if !apierrors.IsNotFound(err) {
			return err
		}
	}
	return fmt.Errorf("gave up after waiting %v for CSIDriver %q", timeout, driverName)
}

func destroyCSIDriver(cs clientset.Interface, driverName string) {
	driverGet, err := cs.StorageV1().CSIDrivers().Get(context.TODO(), driverName, metav1.GetOptions{})
	if err == nil {
		framework.Logf("deleting %s.%s: %s", driverGet.TypeMeta.APIVersion, driverGet.TypeMeta.Kind, driverGet.ObjectMeta.Name)
		cs.StorageV1().CSIDrivers().Delete(context.TODO(), driverName, metav1.DeleteOptions{})
	}
}

func newStorageClass(t testsuites.StorageClassTest, ns string, prefix string) *storagev1.StorageClass {
	pluginName := t.Provisioner
	if pluginName == "" {
		pluginName = getDefaultPluginName()
	}
	if prefix == "" {
		prefix = "sc"
	}
	bindingMode := storagev1.VolumeBindingImmediate
	if t.DelayBinding {
		bindingMode = storagev1.VolumeBindingWaitForFirstConsumer
	}
	if t.Parameters == nil {
		t.Parameters = make(map[string]string)
	}

	if framework.NodeOSDistroIs("windows") {
		// If the test did not specify an fstype, fall back to the framework's platform default.
		if _, exists := t.Parameters["fstype"]; !exists {
			t.Parameters["fstype"] = e2epv.GetDefaultFSType()
			framework.Logf("setting a default fsType=%s in the storage class", t.Parameters["fstype"])
		}
	}

	sc := getStorageClass(pluginName, t.Parameters, &bindingMode, t.MountOptions, ns, prefix)
	if t.AllowVolumeExpansion {
		sc.AllowVolumeExpansion = &t.AllowVolumeExpansion
	}
	return sc
}

func getStorageClass(
	provisioner string,
	parameters map[string]string,
	bindingMode *storagev1.VolumeBindingMode,
	mountOptions []string,
	ns string,
	prefix string,
) *storagev1.StorageClass {
	if bindingMode == nil {
		defaultBindingMode := storagev1.VolumeBindingImmediate
		bindingMode = &defaultBindingMode
	}
	return &storagev1.StorageClass{
		TypeMeta: metav1.TypeMeta{
			Kind: "StorageClass",
		},
		ObjectMeta: metav1.ObjectMeta{
			// Generate a unique name based on the namespace and the test-specific prefix.
			GenerateName: ns + "-" + prefix,
		},
		Provisioner:       provisioner,
		Parameters:        parameters,
		VolumeBindingMode: bindingMode,
		MountOptions:      mountOptions,
	}
}

func getDefaultPluginName() string {
	switch {
	case framework.ProviderIs("gke"), framework.ProviderIs("gce"):
		return "kubernetes.io/gce-pd"
	case framework.ProviderIs("aws"):
		return "kubernetes.io/aws-ebs"
	case framework.ProviderIs("vsphere"):
		return "kubernetes.io/vsphere-volume"
	case framework.ProviderIs("azure"):
		return "kubernetes.io/azure-disk"
	}
	return ""
}

func createSC(cs clientset.Interface, t testsuites.StorageClassTest, scName, ns string) *storagev1.StorageClass {
	class := newStorageClass(t, ns, "")
	if scName != "" {
		class.Name = scName
	}
	var err error
	_, err = cs.StorageV1().StorageClasses().Get(context.TODO(), class.Name, metav1.GetOptions{})
	if err != nil {
		class, err = cs.StorageV1().StorageClasses().Create(context.TODO(), class, metav1.CreateOptions{})
		framework.ExpectNoError(err, "Failed to create class: %v", err)
	}

	return class
}

func createClaim(ctx context.Context, cs clientset.Interface, t testsuites.StorageClassTest, node e2epod.NodeSelection, scName, ns string, accessModes []v1.PersistentVolumeAccessMode) (*storagev1.StorageClass, *v1.PersistentVolumeClaim) {
	class := createSC(cs, t, scName, ns)
	claim := e2epv.MakePersistentVolumeClaim(e2epv.PersistentVolumeClaimConfig{
		ClaimSize:        t.ClaimSize,
		StorageClassName: &(class.Name),
		VolumeMode:       &t.VolumeMode,
		AccessModes:      accessModes,
	}, ns)
	claim, err := cs.CoreV1().PersistentVolumeClaims(ns).Create(context.TODO(), claim, metav1.CreateOptions{})
	framework.ExpectNoError(err, "Failed to create claim: %v", err)

	if !t.DelayBinding {
		pvcClaims := []*v1.PersistentVolumeClaim{claim}
		_, err = e2epv.WaitForPVClaimBoundPhase(ctx, cs, pvcClaims, framework.ClaimProvisionTimeout)
		framework.ExpectNoError(err, "Failed waiting for PVC to be bound: %v", err)
	}
	return class, claim
}

func startPausePod(ctx context.Context, cs clientset.Interface, t testsuites.StorageClassTest, node e2epod.NodeSelection, scName, ns string) (*storagev1.StorageClass, *v1.PersistentVolumeClaim, *v1.Pod) {
	class, claim := createClaim(ctx, cs, t, node, scName, ns, nil)

	pod, err := startPausePodWithClaim(cs, claim, node, ns)
	framework.ExpectNoError(err, "Failed to create pause pod: %v", err)
	return class, claim, pod
}

func startBusyBoxPod(ctx context.Context, cs clientset.Interface, t testsuites.StorageClassTest, node e2epod.NodeSelection, scName, ns string, fsGroup *int64) (*storagev1.StorageClass, *v1.PersistentVolumeClaim, *v1.Pod) {
	class, claim := createClaim(ctx, cs, t, node, scName, ns, nil)
	pod, err := startBusyBoxPodWithClaim(cs, claim, node, ns, fsGroup)
	framework.ExpectNoError(err, "Failed to create busybox pod: %v", err)
	return class, claim, pod
}

func startPausePodInline(cs clientset.Interface, t testsuites.StorageClassTest, node e2epod.NodeSelection, ns string) *v1.Pod {
	pod, err := startPausePodWithInlineVolume(cs,
		&v1.CSIVolumeSource{
			Driver: t.Provisioner,
		},
		node, ns)
	framework.ExpectNoError(err, "Failed to create pod: %v", err)
	return pod
}

func startPausePodGenericEphemeral(cs clientset.Interface, t testsuites.StorageClassTest, node e2epod.NodeSelection, scName, ns string) (*storagev1.StorageClass, *v1.Pod) {
	class := createSC(cs, t, scName, ns)
	claim := e2epv.MakePersistentVolumeClaim(e2epv.PersistentVolumeClaimConfig{
		ClaimSize:        t.ClaimSize,
		StorageClassName: &(class.Name),
		VolumeMode:       &t.VolumeMode,
	}, ns)
	pod, err := startPausePodWithVolumeSource(cs, v1.VolumeSource{
		Ephemeral: &v1.EphemeralVolumeSource{
			VolumeClaimTemplate: &v1.PersistentVolumeClaimTemplate{Spec: claim.Spec}},
	}, node, ns)
	framework.ExpectNoError(err, "Failed to create pod: %v", err)
	return class, pod
}

func startPausePodWithClaim(cs clientset.Interface, pvc *v1.PersistentVolumeClaim, node e2epod.NodeSelection, ns string) (*v1.Pod, error) {
	return startPausePodWithVolumeSource(cs,
		v1.VolumeSource{
			PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
				ClaimName: pvc.Name,
				ReadOnly:  false,
			},
		},
		node, ns)
}

func startBusyBoxPodWithClaim(cs clientset.Interface, pvc *v1.PersistentVolumeClaim, node e2epod.NodeSelection, ns string, fsGroup *int64) (*v1.Pod, error) {
	return startBusyBoxPodWithVolumeSource(cs,
		v1.VolumeSource{
			PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
				ClaimName: pvc.Name,
				ReadOnly:  false,
			},
		},
		node, ns, fsGroup)
}

func startPausePodWithInlineVolume(cs clientset.Interface, inlineVolume *v1.CSIVolumeSource, node e2epod.NodeSelection, ns string) (*v1.Pod, error) {
	return startPausePodWithVolumeSource(cs,
		v1.VolumeSource{
			CSI: inlineVolume,
		},
		node, ns)
}

func startPausePodWithVolumeSource(cs clientset.Interface, volumeSource v1.VolumeSource, node e2epod.NodeSelection, ns string) (*v1.Pod, error) {
	pod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			GenerateName: "pvc-volume-tester-",
		},
		Spec: v1.PodSpec{
			Containers: []v1.Container{
				{
					Name:  "volume-tester",
					Image: imageutils.GetE2EImage(imageutils.Pause),
					VolumeMounts: []v1.VolumeMount{
						{
							Name:      "my-volume",
							MountPath: "/mnt/test",
						},
					},
				},
			},
			RestartPolicy: v1.RestartPolicyNever,
			Volumes: []v1.Volume{
				{
					Name:         "my-volume",
					VolumeSource: volumeSource,
				},
			},
		},
	}
	e2epod.SetNodeSelection(&pod.Spec, node)
	return cs.CoreV1().Pods(ns).Create(context.TODO(), pod, metav1.CreateOptions{})
}

func startBusyBoxPodWithVolumeSource(cs clientset.Interface, volumeSource v1.VolumeSource, node e2epod.NodeSelection, ns string, fsGroup *int64) (*v1.Pod, error) {
	pod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			GenerateName: "pvc-volume-tester-",
		},
		Spec: v1.PodSpec{
			Containers: []v1.Container{
				{
					Name:  "volume-tester",
					Image: framework.BusyBoxImage,
					VolumeMounts: []v1.VolumeMount{
						{
							Name:      "my-volume",
							MountPath: "/mnt/test",
						},
					},
					Command: e2epod.GenerateScriptCmd("while true ; do sleep 2; done"),
				},
			},
			SecurityContext: &v1.PodSecurityContext{
				FSGroup: fsGroup,
			},
			RestartPolicy: v1.RestartPolicyNever,
			Volumes: []v1.Volume{
				{
					Name:         "my-volume",
					VolumeSource: volumeSource,
				},
			},
		},
	}
	e2epod.SetNodeSelection(&pod.Spec, node)
	return cs.CoreV1().Pods(ns).Create(context.TODO(), pod, metav1.CreateOptions{})
}

func startPausePodWithSELinuxOptions(cs clientset.Interface, pvc *v1.PersistentVolumeClaim, node e2epod.NodeSelection, ns string, seLinuxOpts *v1.SELinuxOptions) (*v1.Pod, error) {
	pod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			GenerateName: "pvc-volume-tester-",
		},
		Spec: v1.PodSpec{
			SecurityContext: &v1.PodSecurityContext{
				SELinuxOptions: seLinuxOpts,
			},
			Containers: []v1.Container{
				{
					Name:  "volume-tester",
					Image: imageutils.GetE2EImage(imageutils.Pause),
					VolumeMounts: []v1.VolumeMount{
						{
							Name:      "my-volume",
							MountPath: "/mnt/test",
						},
					},
				},
			},
			RestartPolicy: v1.RestartPolicyNever,
			Volumes: []v1.Volume{
				{
					Name: "my-volume",
					VolumeSource: v1.VolumeSource{
						PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
							ClaimName: pvc.Name,
							ReadOnly:  false,
						},
					},
				},
			},
		},
	}
	if node.Name != "" {
		// Pin the pod directly to the requested node, bypassing the scheduler.
		framework.Logf("Forcing node name %s", node.Name)
		pod.Spec.NodeName = node.Name
	}
	e2epod.SetNodeSelection(&pod.Spec, node)
	return cs.CoreV1().Pods(ns).Create(context.TODO(), pod, metav1.CreateOptions{})
}


// checkNodePublishVolume goes through all calls made to the mock driver and checks
// that at least one NodePublishVolume call carried exactly the expected volume
// context: pod info (if enabled), the ephemeral flag (if CSI inline volumes are
// enabled) and a non-empty service account token (if enabled). Attributes of
// disabled features must not be present in that call.
func checkNodePublishVolume(ctx context.Context, getCalls func(ctx context.Context) ([]drivers.MockCSICall, error), pod *v1.Pod, expectPodInfo, ephemeralVolume, csiInlineVolumesEnabled, csiServiceAccountTokenEnabled bool) error {
	expectedAttributes := map[string]string{}
	unexpectedAttributeKeys := sets.New[string]()
	if expectPodInfo {
		expectedAttributes["csi.storage.k8s.io/pod.name"] = pod.Name
		expectedAttributes["csi.storage.k8s.io/pod.namespace"] = pod.Namespace
		expectedAttributes["csi.storage.k8s.io/pod.uid"] = string(pod.UID)
		expectedAttributes["csi.storage.k8s.io/serviceAccount.name"] = "default"
	} else {
		unexpectedAttributeKeys.Insert("csi.storage.k8s.io/pod.name")
		unexpectedAttributeKeys.Insert("csi.storage.k8s.io/pod.namespace")
		unexpectedAttributeKeys.Insert("csi.storage.k8s.io/pod.uid")
		unexpectedAttributeKeys.Insert("csi.storage.k8s.io/serviceAccount.name")
	}
	if csiInlineVolumesEnabled {
		// The ephemeral attribute is only passed when CSI inline volumes are enabled.
		expectedAttributes["csi.storage.k8s.io/ephemeral"] = strconv.FormatBool(ephemeralVolume)
	} else {
		unexpectedAttributeKeys.Insert("csi.storage.k8s.io/ephemeral")
	}

	if csiServiceAccountTokenEnabled {
		expectedAttributes["csi.storage.k8s.io/serviceAccount.tokens"] = "<nonempty>"
	} else {
		unexpectedAttributeKeys.Insert("csi.storage.k8s.io/serviceAccount.tokens")
	}

	calls, err := getCalls(ctx)
	if err != nil {
		return err
	}

	var volumeContexts []map[string]string
	for _, call := range calls {
		if call.Method != "NodePublishVolume" {
			continue
		}

		volumeCtx := call.Request.VolumeContext
		// Remember the context so that the number of inspected calls can be reported below.
		volumeContexts = append(volumeContexts, volumeCtx)

		// Check that this NodePublishVolume call carried all expected attributes.
		foundAttributes := sets.NewString()
		for k, v := range expectedAttributes {
			vv, found := volumeCtx[k]
			if found && (v == vv || (v == "<nonempty>" && len(vv) != 0)) {
				foundAttributes.Insert(k)
			}
		}
		if foundAttributes.Len() != len(expectedAttributes) {
			framework.Logf("Skipping the NodePublishVolume call: expected attribute %+v, got %+v", format.Object(expectedAttributes, 1), format.Object(volumeCtx, 1))
			continue
		}

		// Check that the call carried no attributes of disabled features.
		unexpectedAttributes := make(map[string]string)
		for k := range volumeCtx {
			if unexpectedAttributeKeys.Has(k) {
				unexpectedAttributes[k] = volumeCtx[k]
			}
		}
		if len(unexpectedAttributes) != 0 {
			framework.Logf("Skipping the NodePublishVolume call because it contains unexpected attributes %+v", format.Object(unexpectedAttributes, 1))
			continue
		}

		return nil
	}

	if len(volumeContexts) == 0 {
		return fmt.Errorf("NodePublishVolume was never called")
	}

	return fmt.Errorf("NodePublishVolume was called %d times, but no call had expected attributes %s or calls have unwanted attributes key %+v", len(volumeContexts), format.Object(expectedAttributes, 1), unexpectedAttributeKeys.UnsortedList())
}


// createFSGroupRequestPreHook creates a hook that records the fsGroup passed in
// NodeStageVolumeRequest and NodePublishVolumeRequest into the given variables.
func createFSGroupRequestPreHook(nodeStageFsGroup, nodePublishFsGroup *string) *drivers.Hooks {
	return &drivers.Hooks{
		Pre: func(ctx context.Context, fullMethod string, request interface{}) (reply interface{}, err error) {
			nodeStageRequest, ok := request.(*csipbv1.NodeStageVolumeRequest)
			if ok {
				mountVolume := nodeStageRequest.GetVolumeCapability().GetMount()
				if mountVolume != nil {
					*nodeStageFsGroup = mountVolume.VolumeMountGroup
				}
			}
			nodePublishRequest, ok := request.(*csipbv1.NodePublishVolumeRequest)
			if ok {
				mountVolume := nodePublishRequest.GetVolumeCapability().GetMount()
				if mountVolume != nil {
					*nodePublishFsGroup = mountVolume.VolumeMountGroup
				}
			}
			return nil, nil
		},
	}
}
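
// Illustrative usage only (not part of the original file): a test could pass the
// hook to the mock driver through testParameters and assert on the recorded
// fsGroup values after the pod has started. All names below exist in this file;
// the surrounding test context (f, ctx) is assumed.
//
//	var nodeStageFsGroup, nodePublishFsGroup string
//	m := newMockDriverSetup(f)
//	m.init(ctx, testParameters{
//		registerDriver:         true,
//		enableVolumeMountGroup: true,
//		hooks:                  createFSGroupRequestPreHook(&nodeStageFsGroup, &nodePublishFsGroup),
//	})
//	ginkgo.DeferCleanup(m.cleanup)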

// createPreHook counts invocations of the given method (matched as a substring of
// the full gRPC method name) and calls the callback with the current count before
// each matching call. The callback's error, if any, is returned to the CSI caller.
func createPreHook(method string, callback func(counter int64) error) *drivers.Hooks {
	var counter int64

	return &drivers.Hooks{
		Pre: func() func(ctx context.Context, fullMethod string, request interface{}) (reply interface{}, err error) {
			return func(ctx context.Context, fullMethod string, request interface{}) (reply interface{}, err error) {
				if strings.Contains(fullMethod, method) {
					counter := atomic.AddInt64(&counter, 1)
					return nil, callback(counter)
				}
				return nil, nil
			}
		}(),
	}
}
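
// Illustrative usage only (not part of the original file): a sketch of how a test
// might make the mock driver fail the first two CreateVolume calls and then
// recover. The plain error below stands in for whatever gRPC status error a real
// test would return.
//
//	hooks := createPreHook("CreateVolume", func(counter int64) error {
//		if counter < 3 {
//			return errors.New("simulated transient provisioning failure")
//		}
//		return nil
//	})
//	m.init(ctx, testParameters{registerDriver: true, hooks: hooks})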

// compareCSICalls compares the calls received by the mock driver with the expected
// call sequence. Only methods listed in trackedCalls are considered, and immediately
// repeated calls with the same method and error code (e.g. caused by exponential
// backoff retries) are collapsed into one. It returns all received calls, the number
// of expected calls matched so far, and an error if the sequences diverge. Errors
// while retrieving the driver log are treated as transient: they are logged and an
// empty result is returned so the caller can retry.
func compareCSICalls(ctx context.Context, trackedCalls []string, expectedCallSequence []csiCall, getCalls func(ctx context.Context) ([]drivers.MockCSICall, error)) ([]drivers.MockCSICall, int, error) {
	allCalls, err := getCalls(ctx)
	if err != nil {
		framework.Logf("intermittent (?) log retrieval error, proceeding without output: %v", err)
		return nil, 0, nil
	}

	// Remove all repeated and untracked calls.
	tracked := sets.NewString(trackedCalls...)
	var calls []drivers.MockCSICall
	var last drivers.MockCSICall
	for _, c := range allCalls {
		if !tracked.Has(c.Method) {
			continue
		}
		if c.Method != last.Method || c.FullError.Code != last.FullError.Code {
			last = c
			calls = append(calls, c)
		}
		// Otherwise the call is a repeat of the previous one and is skipped.
	}

	for i, c := range calls {
		if i >= len(expectedCallSequence) {
			// Log all unexpected calls first; the error is returned below, outside the loop.
			framework.Logf("Unexpected CSI driver call: %s (%v)", c.Method, c.FullError)
			continue
		}

		// Compare the current call with the expected one.
		expectedCall := expectedCallSequence[i]
		if c.Method != expectedCall.expectedMethod || c.FullError.Code != expectedCall.expectedError {
			return allCalls, i, fmt.Errorf("Unexpected CSI call %d: expected %s (%d), got %s (%d)", i, expectedCall.expectedMethod, expectedCall.expectedError, c.Method, c.FullError.Code)
		}

		// Compare the secrets, if the expected call specifies any.
		if expectedCall.expectedSecret != nil {
			if !reflect.DeepEqual(expectedCall.expectedSecret, c.Request.Secrets) {
				return allCalls, i, fmt.Errorf("Unexpected secret: expected %v, got %v", expectedCall.expectedSecret, c.Request.Secrets)
			}
		}
	}
	if len(calls) > len(expectedCallSequence) {
		return allCalls, len(expectedCallSequence), fmt.Errorf("Received %d unexpected CSI driver calls", len(calls)-len(expectedCallSequence))
	}

	return allCalls, len(calls), nil
}


// createSELinuxMountPreHook creates a hook that records the mount options passed in
// NodeStageVolumeRequest and NodePublishVolumeRequest and counts how often each of
// the four node calls is received.
func createSELinuxMountPreHook(nodeStageMountOpts, nodePublishMountOpts *[]string, stageCalls, unstageCalls, publishCalls, unpublishCalls *atomic.Int32) *drivers.Hooks {
	return &drivers.Hooks{
		Pre: func(ctx context.Context, fullMethod string, request interface{}) (reply interface{}, err error) {
			switch req := request.(type) {
			case *csipbv1.NodeStageVolumeRequest:
				stageCalls.Add(1)
				mountVolume := req.GetVolumeCapability().GetMount()
				if mountVolume != nil {
					*nodeStageMountOpts = mountVolume.MountFlags
				}
			case *csipbv1.NodePublishVolumeRequest:
				publishCalls.Add(1)
				mountVolume := req.GetVolumeCapability().GetMount()
				if mountVolume != nil {
					*nodePublishMountOpts = mountVolume.MountFlags
				}
			case *csipbv1.NodeUnstageVolumeRequest:
				unstageCalls.Add(1)
			case *csipbv1.NodeUnpublishVolumeRequest:
				unpublishCalls.Add(1)
			}
			return nil, nil
		},
	}
}


// podRunning returns a condition that checks whether the given pod is running. It
// fails with errPodCompleted once the pod has reached a terminal phase, so that
// callers stop waiting.
func podRunning(ctx context.Context, c clientset.Interface, podName, namespace string) wait.ConditionFunc {
	return func() (bool, error) {
		pod, err := c.CoreV1().Pods(namespace).Get(ctx, podName, metav1.GetOptions{})
		if err != nil {
			return false, err
		}
		switch pod.Status.Phase {
		case v1.PodRunning:
			return true, nil
		case v1.PodFailed, v1.PodSucceeded:
			return false, errPodCompleted
		}
		return false, nil
	}
}

// podHasStorage returns a condition that watches FailedScheduling events of the
// given pod. It cannot confirm that enough storage is available; it can only detect
// when there is not, in which case it aborts the wait with errNotEnoughSpace.
func podHasStorage(ctx context.Context, c clientset.Interface, podName, namespace string, when time.Time) wait.ConditionFunc {
	// List only FailedScheduling events for this pod.
	expectedEvent := fields.Set{
		"involvedObject.kind":      "Pod",
		"involvedObject.name":      podName,
		"involvedObject.namespace": namespace,
		"reason":                   "FailedScheduling",
	}.AsSelector().String()
	options := metav1.ListOptions{
		FieldSelector: expectedEvent,
	}

	return func() (bool, error) {
		events, err := c.CoreV1().Events(namespace).List(ctx, options)
		if err != nil {
			return false, fmt.Errorf("got error while getting events: %w", err)
		}
		for _, event := range events.Items {
			if strings.Contains(event.Message, errReasonNotEnoughSpace) {
				return false, errNotEnoughSpace
			}
		}
		return false, nil
	}
}

func anyOf(conditions ...wait.ConditionFunc) wait.ConditionFunc {
	return func() (bool, error) {
		for _, condition := range conditions {
			done, err := condition()
			if err != nil {
				return false, err
			}
			if done {
				return true, nil
			}
		}
		return false, nil
	}
}
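
// Illustrative usage only (not part of the original file): wait until the pod is
// either running or known to be unschedulable because no node has enough free
// storage, combining the helpers defined above. The pod variable and surrounding
// test context are assumptions.
//
//	err := wait.PollImmediate(poll, csiPodRunningTimeout, anyOf(
//		podRunning(ctx, f.ClientSet, pod.Name, pod.Namespace),
//		podHasStorage(ctx, f.ClientSet, pod.Name, pod.Namespace, time.Now()),
//	))
//	if errors.Is(err, errNotEnoughSpace) {
//		// The expected "not enough free storage" outcome.
//	}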

// waitForMaxVolumeCondition waits until the pod reports a PodScheduled=False
// condition with a non-empty reason and message, which is how the scheduler reports
// that the node's volume attach limit has been reached.
func waitForMaxVolumeCondition(pod *v1.Pod, cs clientset.Interface) error {
	waitErr := wait.PollImmediate(10*time.Second, csiPodUnschedulableTimeout, func() (bool, error) {
		pod, err := cs.CoreV1().Pods(pod.Namespace).Get(context.TODO(), pod.Name, metav1.GetOptions{})
		if err != nil {
			return false, err
		}
		for _, c := range pod.Status.Conditions {
			// The exact reason and message cannot be enforced here, so only
			// check that the pod is reported as unschedulable.
			if c.Type == v1.PodScheduled && c.Status == v1.ConditionFalse && c.Reason != "" && c.Message != "" {
				return true, nil
			}
		}
		return false, nil
	})
	if waitErr != nil {
		return fmt.Errorf("error waiting for pod %s/%s to have max volume condition: %v", pod.Namespace, pod.Name, waitErr)
	}
	return nil
}