1
16
17
35
36 package drivers
37
38 import (
39 "context"
40 "encoding/json"
41 "errors"
42 "fmt"
43 "strconv"
44 "strings"
45 "sync"
46 "time"
47
48 "github.com/onsi/ginkgo/v2"
49 spb "google.golang.org/genproto/googleapis/rpc/status"
50 "google.golang.org/grpc/codes"
51 grpcstatus "google.golang.org/grpc/status"
52
53 appsv1 "k8s.io/api/apps/v1"
54 v1 "k8s.io/api/core/v1"
55 rbacv1 "k8s.io/api/rbac/v1"
56 storagev1 "k8s.io/api/storage/v1"
57 apierrors "k8s.io/apimachinery/pkg/api/errors"
58 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
59 "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
60 "k8s.io/apimachinery/pkg/util/sets"
61 "k8s.io/apimachinery/pkg/util/wait"
62 clientset "k8s.io/client-go/kubernetes"
63 "k8s.io/klog/v2"
64 "k8s.io/kubernetes/test/e2e/feature"
65 "k8s.io/kubernetes/test/e2e/framework"
66 e2enode "k8s.io/kubernetes/test/e2e/framework/node"
67 e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
68 e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
69 e2evolume "k8s.io/kubernetes/test/e2e/framework/volume"
70 mockdriver "k8s.io/kubernetes/test/e2e/storage/drivers/csi-test/driver"
71 mockservice "k8s.io/kubernetes/test/e2e/storage/drivers/csi-test/mock/service"
72 "k8s.io/kubernetes/test/e2e/storage/drivers/proxy"
73 storageframework "k8s.io/kubernetes/test/e2e/storage/framework"
74 "k8s.io/kubernetes/test/e2e/storage/utils"
75
76 "google.golang.org/grpc"
77 )
78
const (
	// GCEPDCSIDriverName is the driver name used for the GCE PD CSI driver;
	// it is the Name in that driver's DriverInfo and the name awaited during
	// node registration.
	GCEPDCSIDriverName = "pd.csi.storage.gke.io"
	// GCEPDCSIZoneTopologyKey is the topology key listed in the GCE PD
	// driver's TopologyKeys.
	GCEPDCSIZoneTopologyKey = "topology.gke.io/zone"

	// grpcCallPrefix marks log lines that carry a JSON-encoded gRPC call
	// record. LogGRPC writes it and GetCalls searches for it when parsing
	// driver pod logs.
	grpcCallPrefix = "gRPCCall:"
)
88
89
// hostpathCSIDriver is the test driver for the CSI hostpath driver.
type hostpathCSIDriver struct {
	// driverInfo is handed out (by pointer) through GetDriverInfo.
	driverInfo storageframework.DriverInfo
	// manifests are the manifest files deployed by PrepareTest.
	manifests []string
	// volumeAttributes are the attribute sets returned by GetVolume; when the
	// slice is empty, CSI inline volume patterns are skipped.
	volumeAttributes []map[string]string
}
95
96 func initHostPathCSIDriver(name string, capabilities map[storageframework.Capability]bool, volumeAttributes []map[string]string, manifests ...string) storageframework.TestDriver {
97 return &hostpathCSIDriver{
98 driverInfo: storageframework.DriverInfo{
99 Name: name,
100 MaxFileSize: storageframework.FileSizeMedium,
101 SupportedFsType: sets.NewString(
102 "",
103 ),
104 SupportedSizeRange: e2evolume.SizeRange{
105 Min: "1Mi",
106 },
107 Capabilities: capabilities,
108 StressTestOptions: &storageframework.StressTestOptions{
109 NumPods: 10,
110 NumRestarts: 10,
111 },
112 VolumeSnapshotStressTestOptions: &storageframework.VolumeSnapshotStressTestOptions{
113 NumPods: 10,
114 NumSnapshots: 10,
115 },
116 PerformanceTestOptions: &storageframework.PerformanceTestOptions{
117 ProvisioningOptions: &storageframework.PerformanceTestProvisioningOptions{
118 VolumeSize: "1Mi",
119 Count: 300,
120
121
122 ExpectedMetrics: &storageframework.Metrics{
123 AvgLatency: 2 * time.Minute,
124 Throughput: 0.5,
125 },
126 },
127 },
128 },
129 manifests: manifests,
130 volumeAttributes: volumeAttributes,
131 }
132 }
133
134 var _ storageframework.TestDriver = &hostpathCSIDriver{}
135 var _ storageframework.DynamicPVTestDriver = &hostpathCSIDriver{}
136 var _ storageframework.SnapshottableTestDriver = &hostpathCSIDriver{}
137 var _ storageframework.EphemeralTestDriver = &hostpathCSIDriver{}
138
139
140 func InitHostPathCSIDriver() storageframework.TestDriver {
141 capabilities := map[storageframework.Capability]bool{
142 storageframework.CapPersistence: true,
143 storageframework.CapSnapshotDataSource: true,
144 storageframework.CapMultiPODs: true,
145 storageframework.CapBlock: true,
146 storageframework.CapPVCDataSource: true,
147 storageframework.CapControllerExpansion: true,
148 storageframework.CapOfflineExpansion: true,
149 storageframework.CapOnlineExpansion: true,
150 storageframework.CapSingleNodeVolume: true,
151 storageframework.CapReadWriteOncePod: true,
152 storageframework.CapMultiplePVsSameID: true,
153 storageframework.CapFSResizeFromSourceNotSupported: true,
154
155
156
157
158
159 storageframework.CapVolumeLimits: true,
160 }
161 return initHostPathCSIDriver("csi-hostpath",
162 capabilities,
163
164 []map[string]string{
165 {"foo": "bar"},
166 },
167 "test/e2e/testing-manifests/storage-csi/external-attacher/rbac.yaml",
168 "test/e2e/testing-manifests/storage-csi/external-provisioner/rbac.yaml",
169 "test/e2e/testing-manifests/storage-csi/external-snapshotter/csi-snapshotter/rbac-csi-snapshotter.yaml",
170 "test/e2e/testing-manifests/storage-csi/external-health-monitor/external-health-monitor-controller/rbac.yaml",
171 "test/e2e/testing-manifests/storage-csi/external-resizer/rbac.yaml",
172 "test/e2e/testing-manifests/storage-csi/hostpath/hostpath/csi-hostpath-driverinfo.yaml",
173 "test/e2e/testing-manifests/storage-csi/hostpath/hostpath/csi-hostpath-plugin.yaml",
174 "test/e2e/testing-manifests/storage-csi/hostpath/hostpath/e2e-test-rbac.yaml",
175 )
176 }
177
// GetDriverInfo returns a pointer to the driver's own DriverInfo (not a copy),
// so changes made through the pointer are visible to the driver.
func (h *hostpathCSIDriver) GetDriverInfo() *storageframework.DriverInfo {
	return &h.driverInfo
}
181
182 func (h *hostpathCSIDriver) SkipUnsupportedTest(pattern storageframework.TestPattern) {
183 if pattern.VolType == storageframework.CSIInlineVolume && len(h.volumeAttributes) == 0 {
184 e2eskipper.Skipf("%s has no volume attributes defined, doesn't support ephemeral inline volumes", h.driverInfo.Name)
185 }
186 }
187
188 func (h *hostpathCSIDriver) GetDynamicProvisionStorageClass(ctx context.Context, config *storageframework.PerTestConfig, fsType string) *storagev1.StorageClass {
189 provisioner := config.GetUniqueDriverName()
190 parameters := map[string]string{}
191 ns := config.Framework.Namespace.Name
192
193 return storageframework.GetStorageClass(provisioner, parameters, nil, ns)
194 }
195
196 func (h *hostpathCSIDriver) GetVolume(config *storageframework.PerTestConfig, volumeNumber int) (map[string]string, bool, bool) {
197 return h.volumeAttributes[volumeNumber%len(h.volumeAttributes)], false , false
198 }
199
// GetCSIDriverName returns the per-test unique driver name taken from config,
// the same name PrepareTest passes to the driver via --drivername.
func (h *hostpathCSIDriver) GetCSIDriverName(config *storageframework.PerTestConfig) string {
	return config.GetUniqueDriverName()
}
203
204 func (h *hostpathCSIDriver) GetSnapshotClass(ctx context.Context, config *storageframework.PerTestConfig, parameters map[string]string) *unstructured.Unstructured {
205 snapshotter := config.GetUniqueDriverName()
206 ns := config.Framework.Namespace.Name
207
208 return utils.GenerateSnapshotClassSpec(snapshotter, parameters, ns)
209 }
210
211 func (h *hostpathCSIDriver) PrepareTest(ctx context.Context, f *framework.Framework) *storageframework.PerTestConfig {
212
213 driverNamespace := utils.CreateDriverNamespace(ctx, f)
214 driverns := driverNamespace.Name
215 testns := f.Namespace.Name
216
217 ginkgo.By(fmt.Sprintf("deploying %s driver", h.driverInfo.Name))
218 cancelLogging := utils.StartPodLogs(ctx, f, driverNamespace)
219 cs := f.ClientSet
220
221
222 node, err := e2enode.GetRandomReadySchedulableNode(ctx, cs)
223 framework.ExpectNoError(err)
224 config := &storageframework.PerTestConfig{
225 Driver: h,
226 Prefix: "hostpath",
227 Framework: f,
228 ClientNodeSelection: e2epod.NodeSelection{Name: node.Name},
229 DriverNamespace: driverNamespace,
230 }
231
232 o := utils.PatchCSIOptions{
233 OldDriverName: h.driverInfo.Name,
234 NewDriverName: config.GetUniqueDriverName(),
235 DriverContainerName: "hostpath",
236 DriverContainerArguments: []string{"--drivername=" + config.GetUniqueDriverName(),
237
238
239
240 "--maxvolumespernode=10",
241
242
243 "--check-volume-lifecycle=true",
244 },
245 ProvisionerContainerName: "csi-provisioner",
246 SnapshotterContainerName: "csi-snapshotter",
247 NodeName: node.Name,
248 }
249
250 err = utils.CreateFromManifests(ctx, config.Framework, driverNamespace, func(item interface{}) error {
251 if err := utils.PatchCSIDeployment(config.Framework, o, item); err != nil {
252 return err
253 }
254
255
256
257
258
259
260
261
262 switch item := item.(type) {
263 case *appsv1.StatefulSet:
264 var containers []v1.Container
265 for _, container := range item.Spec.Template.Spec.Containers {
266 switch container.Name {
267 case "csi-external-health-monitor-agent", "csi-external-health-monitor-controller":
268
269 default:
270
271 containers = append(containers, container)
272 }
273 }
274 item.Spec.Template.Spec.Containers = containers
275 }
276 return nil
277 }, h.manifests...)
278
279 if err != nil {
280 framework.Failf("deploying %s driver: %v", h.driverInfo.Name, err)
281 }
282
283 cleanupFunc := generateDriverCleanupFunc(
284 f,
285 h.driverInfo.Name,
286 testns,
287 driverns,
288 cancelLogging)
289 ginkgo.DeferCleanup(cleanupFunc)
290
291 return config
292 }
293
294
// mockCSIDriver is the test driver for the CSI mock driver. Most fields are
// copied from CSIMockDriverOpts by InitMockCSIDriver and consumed by
// PrepareTest.
type mockCSIDriver struct {
	// driverInfo is handed out (by pointer) through GetDriverInfo.
	driverInfo storageframework.DriverInfo
	// manifests are the manifest files deployed by PrepareTest.
	manifests []string
	// podInfo and storageCapacity are passed through to the deployment patch
	// options unchanged.
	podInfo         *bool
	storageCapacity *bool
	// attachable is the inverse of CSIMockDriverOpts.DisableAttach.
	attachable bool
	// attachLimit is only passed on the command line when it is > 0.
	attachLimit            int
	enableTopology         bool
	enableNodeExpansion    bool
	// hooks are invoked by interceptGRPC around each gRPC call.
	hooks                  Hooks
	tokenRequests          []storagev1.TokenRequest
	requiresRepublish      *bool
	fsGroupPolicy          *storagev1.FSGroupPolicy
	enableVolumeMountGroup bool
	// embedded selects running the mock driver inside the test process
	// (connected through a proxy pod) instead of inside the cluster.
	embedded bool
	// calls records the gRPC calls seen by interceptGRPC.
	calls MockCSICalls
	// embeddedCSIDriver is set by PrepareTest in embedded mode.
	embeddedCSIDriver  *mockdriver.CSIDriver
	enableSELinuxMount *bool
	// enableRecoverExpansionFailure turns on the csi-resizer feature
	// "RecoverVolumeExpansionFailure=true" in PrepareTest.
	enableRecoverExpansionFailure bool

	// clientSet and driverNamespace are set by PrepareTest and used by
	// GetCalls to read the driver pod's logs.
	clientSet       clientset.Interface
	driverNamespace *v1.Namespace
}
319
320
321
322
323
324
325
// Hooks lets a test observe or replace gRPC calls handled by the mock driver.
// Both members are optional; interceptGRPC checks each for nil before use.
type Hooks struct {
	// Pre is called before the real handler. If it returns a non-nil reply
	// or a non-nil error, that result is returned to the caller and neither
	// the handler nor Post is invoked.
	Pre func(ctx context.Context, method string, request interface{}) (reply interface{}, err error)

	// Post is called after the real handler with the handler's reply and
	// error; whatever Post returns replaces them.
	Post func(ctx context.Context, method string, request, reply interface{}, err error) (finalReply interface{}, finalErr error)
}
335
336
// MockCSITestDriver is the interface returned by InitMockCSIDriver: a
// DynamicPVTestDriver that can also report the gRPC calls made to the mock
// driver.
type MockCSITestDriver interface {
	storageframework.DynamicPVTestDriver

	// GetCalls returns the gRPC calls recorded so far, or an error when they
	// cannot be retrieved.
	GetCalls(ctx context.Context) ([]MockCSICall, error)
}
344
345
// CSIMockDriverOpts configures the mock driver created by InitMockCSIDriver.
type CSIMockDriverOpts struct {
	// RegisterDriver adds the csi-mock-driverinfo.yaml manifest.
	RegisterDriver bool
	// DisableAttach omits the attacher manifest and makes the driver
	// non-attachable.
	DisableAttach   bool
	PodInfo         *bool
	StorageCapacity *bool
	AttachLimit     int
	EnableTopology  bool
	// EnableResizing adds the resizer manifest.
	EnableResizing      bool
	EnableNodeExpansion bool
	// EnableSnapshot adds the snapshotter manifest.
	EnableSnapshot                bool
	EnableVolumeMountGroup        bool
	TokenRequests                 []storagev1.TokenRequest
	RequiresRepublish             *bool
	FSGroupPolicy                 *storagev1.FSGroupPolicy
	EnableSELinuxMount            *bool
	EnableRecoverExpansionFailure bool

	// Embedded runs the mock driver inside the test process and deploys the
	// csi-mock-proxy.yaml manifest instead of csi-mock-driver.yaml.
	Embedded bool

	// Hooks are called around each gRPC call handled by the mock driver.
	// They are invoked from interceptGRPC, which PrepareTest installs only
	// when Embedded is set.
	Hooks Hooks
}
374
375
// MockCSICall is one recorded gRPC call, decoded from the JSON record that
// LogGRPC produces (directly, or parsed back from the driver log by GetCalls).
type MockCSICall struct {
	// json is the raw JSON text the call was decoded from.
	json string

	// Method is the gRPC method name; LogGRPC and GetCalls reduce it to the
	// part after the last "/".
	Method string
	// Request holds the subset of request fields that is decoded.
	Request struct {
		VolumeContext map[string]string `json:"volume_context"`
		Secrets       map[string]string `json:"secrets"`
	}
	// FullError holds the code and message of the call's error status.
	FullError struct {
		Code    codes.Code `json:"code"`
		Message string     `json:"message"`
	}
	// Error is the error text of the call; LogGRPC leaves it empty when the
	// call succeeded.
	Error string
}
390
391
// MockCSICalls is a mutex-protected list of recorded MockCSICall entries.
// It contains a sync.Mutex and therefore must not be copied after first use.
type MockCSICalls struct {
	calls []MockCSICall
	// mutex guards calls.
	mutex sync.Mutex
}
396
397
398 func (c *MockCSICalls) Get() []MockCSICall {
399 c.mutex.Lock()
400 defer c.mutex.Unlock()
401
402 return c.calls[:]
403 }
404
405
406 func (c *MockCSICalls) Add(call MockCSICall) {
407 c.mutex.Lock()
408 defer c.mutex.Unlock()
409
410 c.calls = append(c.calls, call)
411 }
412
413
414 func (c *MockCSICalls) LogGRPC(method string, request, reply interface{}, err error) {
415
416
417 logMessage := struct {
418 Method string
419 Request interface{}
420 Response interface{}
421
422
423 Error string
424
425 FullError *spb.Status
426 }{
427 Method: method,
428 Request: request,
429 Response: reply,
430 }
431
432 if err != nil {
433 logMessage.Error = err.Error()
434 logMessage.FullError = grpcstatus.Convert(err).Proto()
435 }
436
437 msg, _ := json.Marshal(logMessage)
438 call := MockCSICall{
439 json: string(msg),
440 }
441 json.Unmarshal(msg, &call)
442
443 klog.Infof("%s %s", grpcCallPrefix, string(msg))
444
445
446 methodParts := strings.Split(call.Method, "/")
447 call.Method = methodParts[len(methodParts)-1]
448
449 c.Add(call)
450 }
451
452 var _ storageframework.TestDriver = &mockCSIDriver{}
453 var _ storageframework.DynamicPVTestDriver = &mockCSIDriver{}
454 var _ storageframework.SnapshottableTestDriver = &mockCSIDriver{}
455
456
457 func InitMockCSIDriver(driverOpts CSIMockDriverOpts) MockCSITestDriver {
458 driverManifests := []string{
459 "test/e2e/testing-manifests/storage-csi/external-attacher/rbac.yaml",
460 "test/e2e/testing-manifests/storage-csi/external-provisioner/rbac.yaml",
461 "test/e2e/testing-manifests/storage-csi/external-resizer/rbac.yaml",
462 "test/e2e/testing-manifests/storage-csi/external-snapshotter/csi-snapshotter/rbac-csi-snapshotter.yaml",
463 "test/e2e/testing-manifests/storage-csi/mock/csi-mock-rbac.yaml",
464 "test/e2e/testing-manifests/storage-csi/mock/csi-storageclass.yaml",
465 }
466 if driverOpts.Embedded {
467 driverManifests = append(driverManifests, "test/e2e/testing-manifests/storage-csi/mock/csi-mock-proxy.yaml")
468 } else {
469 driverManifests = append(driverManifests, "test/e2e/testing-manifests/storage-csi/mock/csi-mock-driver.yaml")
470 }
471
472 if driverOpts.RegisterDriver {
473 driverManifests = append(driverManifests, "test/e2e/testing-manifests/storage-csi/mock/csi-mock-driverinfo.yaml")
474 }
475
476 if !driverOpts.DisableAttach {
477 driverManifests = append(driverManifests, "test/e2e/testing-manifests/storage-csi/mock/csi-mock-driver-attacher.yaml")
478 }
479
480 if driverOpts.EnableResizing {
481 driverManifests = append(driverManifests, "test/e2e/testing-manifests/storage-csi/mock/csi-mock-driver-resizer.yaml")
482 }
483
484 if driverOpts.EnableSnapshot {
485 driverManifests = append(driverManifests, "test/e2e/testing-manifests/storage-csi/mock/csi-mock-driver-snapshotter.yaml")
486 }
487
488 return &mockCSIDriver{
489 driverInfo: storageframework.DriverInfo{
490 Name: "csi-mock",
491 MaxFileSize: storageframework.FileSizeMedium,
492 SupportedFsType: sets.NewString(
493 "",
494 ),
495 Capabilities: map[storageframework.Capability]bool{
496 storageframework.CapPersistence: false,
497 storageframework.CapFsGroup: false,
498 storageframework.CapExec: false,
499 storageframework.CapVolumeLimits: true,
500 storageframework.CapMultiplePVsSameID: true,
501 },
502 },
503 manifests: driverManifests,
504 podInfo: driverOpts.PodInfo,
505 storageCapacity: driverOpts.StorageCapacity,
506 enableTopology: driverOpts.EnableTopology,
507 attachable: !driverOpts.DisableAttach,
508 attachLimit: driverOpts.AttachLimit,
509 enableNodeExpansion: driverOpts.EnableNodeExpansion,
510 tokenRequests: driverOpts.TokenRequests,
511 requiresRepublish: driverOpts.RequiresRepublish,
512 fsGroupPolicy: driverOpts.FSGroupPolicy,
513 enableVolumeMountGroup: driverOpts.EnableVolumeMountGroup,
514 enableSELinuxMount: driverOpts.EnableSELinuxMount,
515 enableRecoverExpansionFailure: driverOpts.EnableRecoverExpansionFailure,
516 embedded: driverOpts.Embedded,
517 hooks: driverOpts.Hooks,
518 }
519 }
520
// GetDriverInfo returns a pointer to the driver's own DriverInfo (not a copy).
func (m *mockCSIDriver) GetDriverInfo() *storageframework.DriverInfo {
	return &m.driverInfo
}
524
// SkipUnsupportedTest is a no-op: the mock driver never skips a test pattern.
func (m *mockCSIDriver) SkipUnsupportedTest(pattern storageframework.TestPattern) {
}
527
528 func (m *mockCSIDriver) GetDynamicProvisionStorageClass(ctx context.Context, config *storageframework.PerTestConfig, fsType string) *storagev1.StorageClass {
529 provisioner := config.GetUniqueDriverName()
530 parameters := map[string]string{}
531 ns := config.Framework.Namespace.Name
532
533 return storageframework.GetStorageClass(provisioner, parameters, nil, ns)
534 }
535
536 func (m *mockCSIDriver) GetSnapshotClass(ctx context.Context, config *storageframework.PerTestConfig, parameters map[string]string) *unstructured.Unstructured {
537 snapshotter := m.driverInfo.Name + "-" + config.Framework.UniqueName
538 ns := config.Framework.Namespace.Name
539
540 return utils.GenerateSnapshotClassSpec(snapshotter, parameters, ns)
541 }
542
// PrepareTest deploys the mock driver (or, in embedded mode, the proxy pod
// plus an in-process mock driver) into a freshly created driver namespace and
// returns the per-test configuration.
//
// It stores the client set and driver namespace on m for later use by
// GetCalls, pins the deployment and the test's client pods to one randomly
// chosen ready, schedulable node, and registers cleanup with
// ginkgo.DeferCleanup.
func (m *mockCSIDriver) PrepareTest(ctx context.Context, f *framework.Framework) *storageframework.PerTestConfig {
	m.clientSet = f.ClientSet

	// The driver gets its own namespace, separate from the test namespace.
	m.driverNamespace = utils.CreateDriverNamespace(ctx, f)
	driverns := m.driverNamespace.Name
	testns := f.Namespace.Name

	if m.embedded {
		ginkgo.By("deploying csi mock proxy")
	} else {
		ginkgo.By("deploying csi mock driver")
	}
	cancelLogging := utils.StartPodLogs(ctx, f, m.driverNamespace)
	cs := f.ClientSet

	// Node that the deployment is patched to use and that client pods select.
	node, err := e2enode.GetRandomReadySchedulableNode(ctx, cs)
	framework.ExpectNoError(err)

	// embeddedCleanup stays a no-op unless the embedded driver is started.
	embeddedCleanup := func() {}
	containerArgs := []string{}
	if m.embedded {
		// Embedded mode: the mock driver runs in this process and is
		// configured directly through mockservice.Config instead of through
		// container arguments. It is reached via a listener obtained from
		// proxy.Listen for port 9000 of the "mock" container in the pod
		// below.
		podname := "csi-mockplugin-0"
		containername := "mock"

		// This ctx shadows the function's ctx inside this block only. It is
		// derived from context.Background(), so it is not tied to the test
		// context; embeddedCleanup cancels it.
		ctx, cancel := context.WithCancel(context.Background())
		serviceConfig := mockservice.Config{
			DisableAttach:            !m.attachable,
			DriverName:               "csi-mock-" + f.UniqueName,
			AttachLimit:              int64(m.attachLimit),
			NodeExpansionRequired:    m.enableNodeExpansion,
			VolumeMountGroupRequired: m.enableVolumeMountGroup,
			EnableTopology:           m.enableTopology,
			// IO uses the "busybox" container of the same pod.
			IO: proxy.PodDirIO{
				F:             f,
				Namespace:     m.driverNamespace.Name,
				PodName:       podname,
				ContainerName: "busybox",
			},
		}
		s := mockservice.New(serviceConfig)
		// The same service object serves all three CSI roles.
		servers := &mockdriver.CSIDriverServers{
			Controller: s,
			Identity:   s,
			Node:       s,
		}
		m.embeddedCSIDriver = mockdriver.NewCSIDriver(servers)
		l, err := proxy.Listen(ctx, f.ClientSet, f.ClientConfig(),
			proxy.Addr{
				Namespace:     m.driverNamespace.Name,
				PodName:       podname,
				ContainerName: containername,
				Port:          9000,
			},
		)

		framework.ExpectNoError(err, "start connecting to proxy pod")
		// interceptGRPC is passed to Start so that calls are recorded and
		// the hooks get invoked.
		err = m.embeddedCSIDriver.Start(l, m.interceptGRPC)
		framework.ExpectNoError(err, "start mock driver")

		embeddedCleanup = func() {
			// Stop the driver, close the listener, then cancel the context
			// created above.
			m.embeddedCSIDriver.Stop()
			l.Close()
			cancel()
		}
	} else {
		// In-cluster mode: the driver container is configured through
		// command line arguments.
		containerArgs = append(containerArgs, "--drivername=csi-mock-"+f.UniqueName)

		if m.attachable {
			containerArgs = append(containerArgs, "--enable-attach")
		}

		if m.enableTopology {
			containerArgs = append(containerArgs, "--enable-topology")
		}

		if m.attachLimit > 0 {
			containerArgs = append(containerArgs, "--attach-limit", strconv.Itoa(m.attachLimit))
		}

		if m.enableNodeExpansion {
			containerArgs = append(containerArgs, "--node-expand-required=true")
		}
	}

	config := &storageframework.PerTestConfig{
		Driver:              m,
		Prefix:              "mock",
		Framework:           f,
		ClientNodeSelection: e2epod.NodeSelection{Name: node.Name},
		DriverNamespace:     m.driverNamespace,
	}

	// Patch options applied to every manifest item: rename the driver to the
	// per-test unique name and carry over the configured driver settings.
	o := utils.PatchCSIOptions{
		OldDriverName:            "csi-mock",
		NewDriverName:            "csi-mock-" + f.UniqueName,
		DriverContainerName:      "mock",
		DriverContainerArguments: containerArgs,
		ProvisionerContainerName: "csi-provisioner",
		NodeName:                 node.Name,
		PodInfo:                  m.podInfo,
		StorageCapacity:          m.storageCapacity,
		CanAttach:                &m.attachable,
		VolumeLifecycleModes: &[]storagev1.VolumeLifecycleMode{
			storagev1.VolumeLifecyclePersistent,
			storagev1.VolumeLifecycleEphemeral,
		},
		TokenRequests:     m.tokenRequests,
		RequiresRepublish: m.requiresRepublish,
		FSGroupPolicy:     m.fsGroupPolicy,
		SELinuxMount:      m.enableSELinuxMount,
		Features:          map[string][]string{},
	}

	if m.enableRecoverExpansionFailure {
		o.Features["csi-resizer"] = []string{"RecoverVolumeExpansionFailure=true"}
	}
	// Here ctx is the function's ctx again, not the one from the embedded
	// block.
	err = utils.CreateFromManifests(ctx, f, m.driverNamespace, func(item interface{}) error {
		if err := utils.PatchCSIDeployment(config.Framework, o, item); err != nil {
			return err
		}

		switch item := item.(type) {
		case *rbacv1.ClusterRole:
			if strings.HasPrefix(item.Name, "external-snapshotter-runner") {
				// Additionally grant the snapshotter role get/list on
				// secrets.
				item.Rules = append(item.Rules, rbacv1.PolicyRule{
					APIGroups: []string{""},
					Resources: []string{"secrets"},
					Verbs:     []string{"get", "list"},
				})
			}
		}

		return nil
	}, m.manifests...)

	if err != nil {
		framework.Failf("deploying csi mock driver: %v", err)
	}

	driverCleanupFunc := generateDriverCleanupFunc(
		f,
		"mock",
		testns,
		driverns,
		cancelLogging)

	// Cleanup order: the embedded driver is stopped first, then the
	// namespaces are removed.
	ginkgo.DeferCleanup(func(ctx context.Context) {
		embeddedCleanup()
		driverCleanupFunc(ctx)
	})

	return config
}
716
717 func (m *mockCSIDriver) interceptGRPC(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
718 defer func() {
719
720
721
722 m.calls.LogGRPC(info.FullMethod, req, resp, err)
723 }()
724
725 if m.hooks.Pre != nil {
726 resp, err = m.hooks.Pre(ctx, info.FullMethod, req)
727 if resp != nil || err != nil {
728 return
729 }
730 }
731 resp, err = handler(ctx, req)
732 if m.hooks.Post != nil {
733 resp, err = m.hooks.Post(ctx, info.FullMethod, req, resp, err)
734 }
735 return
736 }
737
738 func (m *mockCSIDriver) GetCalls(ctx context.Context) ([]MockCSICall, error) {
739 if m.embedded {
740 return m.calls.Get(), nil
741 }
742
743 if m.driverNamespace == nil {
744 return nil, errors.New("PrepareTest not called yet")
745 }
746
747
748 driverPodName := "csi-mockplugin-0"
749
750 driverContainerName := "mock"
751
752
753 log, err := e2epod.GetPodLogs(ctx, m.clientSet, m.driverNamespace.Name, driverPodName, driverContainerName)
754 if err != nil {
755 return nil, fmt.Errorf("could not load CSI driver logs: %w", err)
756 }
757
758 logLines := strings.Split(log, "\n")
759 var calls []MockCSICall
760 for _, line := range logLines {
761 index := strings.Index(line, grpcCallPrefix)
762 if index == -1 {
763 continue
764 }
765 line = line[index+len(grpcCallPrefix):]
766 call := MockCSICall{
767 json: string(line),
768 }
769 err := json.Unmarshal([]byte(line), &call)
770 if err != nil {
771 framework.Logf("Could not parse CSI driver log line %q: %s", line, err)
772 continue
773 }
774
775
776 methodParts := strings.Split(call.Method, "/")
777 call.Method = methodParts[len(methodParts)-1]
778
779 calls = append(calls, call)
780 }
781 return calls, nil
782 }
783
784
// gcePDCSIDriver is the test driver for the GCE PD CSI driver.
type gcePDCSIDriver struct {
	// driverInfo is handed out (by pointer) through GetDriverInfo.
	driverInfo storageframework.DriverInfo
}
788
789 var _ storageframework.TestDriver = &gcePDCSIDriver{}
790 var _ storageframework.DynamicPVTestDriver = &gcePDCSIDriver{}
791 var _ storageframework.SnapshottableTestDriver = &gcePDCSIDriver{}
792
793
794 func InitGcePDCSIDriver() storageframework.TestDriver {
795 return &gcePDCSIDriver{
796 driverInfo: storageframework.DriverInfo{
797 Name: GCEPDCSIDriverName,
798 TestTags: []interface{}{framework.WithSerial()},
799 MaxFileSize: storageframework.FileSizeMedium,
800 SupportedSizeRange: e2evolume.SizeRange{
801 Min: "5Gi",
802 },
803 SupportedFsType: sets.NewString(
804 "",
805 "ext2",
806 "ext3",
807 "ext4",
808 "xfs",
809 ),
810 SupportedMountOption: sets.NewString("debug", "nouid32"),
811 Capabilities: map[storageframework.Capability]bool{
812 storageframework.CapPersistence: true,
813 storageframework.CapBlock: true,
814 storageframework.CapFsGroup: true,
815 storageframework.CapExec: true,
816 storageframework.CapMultiPODs: true,
817
818
819 storageframework.CapVolumeLimits: false,
820 storageframework.CapTopology: true,
821 storageframework.CapControllerExpansion: true,
822 storageframework.CapOfflineExpansion: true,
823 storageframework.CapOnlineExpansion: true,
824 storageframework.CapNodeExpansion: true,
825 storageframework.CapSnapshotDataSource: true,
826 storageframework.CapReadWriteOncePod: true,
827 storageframework.CapMultiplePVsSameID: true,
828 storageframework.CapFSResizeFromSourceNotSupported: true,
829 },
830 RequiredAccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce},
831 TopologyKeys: []string{GCEPDCSIZoneTopologyKey},
832 StressTestOptions: &storageframework.StressTestOptions{
833 NumPods: 10,
834 NumRestarts: 10,
835 },
836 VolumeSnapshotStressTestOptions: &storageframework.VolumeSnapshotStressTestOptions{
837
838
839
840
841 NumPods: 20,
842 NumSnapshots: 2,
843 },
844 },
845 }
846 }
847
// GetDriverInfo returns a pointer to the driver's own DriverInfo (not a copy).
func (g *gcePDCSIDriver) GetDriverInfo() *storageframework.DriverInfo {
	return &g.driverInfo
}
851
852 func (g *gcePDCSIDriver) SkipUnsupportedTest(pattern storageframework.TestPattern) {
853 e2eskipper.SkipUnlessProviderIs("gce", "gke")
854 if pattern.FsType == "xfs" {
855 e2eskipper.SkipUnlessNodeOSDistroIs("ubuntu", "custom")
856 }
857 for _, tag := range pattern.TestTags {
858 if framework.TagsEqual(tag, feature.Windows) {
859 e2eskipper.Skipf("Skipping tests for windows since CSI does not support it yet")
860 }
861 }
862 }
863
864 func (g *gcePDCSIDriver) GetDynamicProvisionStorageClass(ctx context.Context, config *storageframework.PerTestConfig, fsType string) *storagev1.StorageClass {
865 ns := config.Framework.Namespace.Name
866 provisioner := g.driverInfo.Name
867
868 parameters := map[string]string{"type": "pd-standard"}
869 if fsType != "" {
870 parameters["csi.storage.k8s.io/fstype"] = fsType
871 }
872 delayedBinding := storagev1.VolumeBindingWaitForFirstConsumer
873
874 return storageframework.GetStorageClass(provisioner, parameters, &delayedBinding, ns)
875 }
876
877 func (g *gcePDCSIDriver) GetSnapshotClass(ctx context.Context, config *storageframework.PerTestConfig, parameters map[string]string) *unstructured.Unstructured {
878 snapshotter := g.driverInfo.Name
879 ns := config.Framework.Namespace.Name
880
881 return utils.GenerateSnapshotClassSpec(snapshotter, parameters, ns)
882 }
883
884 func (g *gcePDCSIDriver) PrepareTest(ctx context.Context, f *framework.Framework) *storageframework.PerTestConfig {
885 testns := f.Namespace.Name
886 cfg := &storageframework.PerTestConfig{
887 Driver: g,
888 Prefix: "gcepd",
889 Framework: f,
890 }
891
892 if framework.ProviderIs("gke") {
893 framework.Logf("The csi gce-pd driver is automatically installed in GKE. Skipping driver installation.")
894 return cfg
895 }
896
897
898 deploy, err := f.ClientSet.AppsV1().Deployments("gce-pd-csi-driver").Get(ctx, "csi-gce-pd-controller", metav1.GetOptions{})
899 if err == nil && deploy != nil {
900 framework.Logf("The csi gce-pd driver is already installed.")
901 return cfg
902 }
903 ginkgo.By("deploying csi gce-pd driver")
904
905 driverNamespace := utils.CreateDriverNamespace(ctx, f)
906 driverns := driverNamespace.Name
907
908 cancelLogging := utils.StartPodLogs(ctx, f, driverNamespace)
909
910
911
912
913
914
915
916
917
918
919
920
921 createGCESecrets(f.ClientSet, driverns)
922
923 manifests := []string{
924 "test/e2e/testing-manifests/storage-csi/external-attacher/rbac.yaml",
925 "test/e2e/testing-manifests/storage-csi/external-provisioner/rbac.yaml",
926 "test/e2e/testing-manifests/storage-csi/gce-pd/csi-controller-rbac.yaml",
927 "test/e2e/testing-manifests/storage-csi/gce-pd/node_ds.yaml",
928 "test/e2e/testing-manifests/storage-csi/gce-pd/controller_ss.yaml",
929 }
930
931 err = utils.CreateFromManifests(ctx, f, driverNamespace, nil, manifests...)
932 if err != nil {
933 framework.Failf("deploying csi gce-pd driver: %v", err)
934 }
935
936 if err = WaitForCSIDriverRegistrationOnAllNodes(ctx, GCEPDCSIDriverName, f.ClientSet); err != nil {
937 framework.Failf("waiting for csi driver node registration on: %v", err)
938 }
939
940 cleanupFunc := generateDriverCleanupFunc(
941 f,
942 "gce-pd",
943 testns,
944 driverns,
945 cancelLogging)
946 ginkgo.DeferCleanup(cleanupFunc)
947
948 return &storageframework.PerTestConfig{
949 Driver: g,
950 Prefix: "gcepd",
951 Framework: f,
952 DriverNamespace: driverNamespace,
953 }
954 }
955
956
957
958 func WaitForCSIDriverRegistrationOnAllNodes(ctx context.Context, driverName string, cs clientset.Interface) error {
959 nodes, err := e2enode.GetReadySchedulableNodes(ctx, cs)
960 if err != nil {
961 return err
962 }
963 for _, node := range nodes.Items {
964 if err := WaitForCSIDriverRegistrationOnNode(ctx, node.Name, driverName, cs); err != nil {
965 return err
966 }
967 }
968 return nil
969 }
970
971
972 func WaitForCSIDriverRegistrationOnNode(ctx context.Context, nodeName string, driverName string, cs clientset.Interface) error {
973 framework.Logf("waiting for CSIDriver %v to register on node %v", driverName, nodeName)
974
975
976 backoff := wait.Backoff{
977 Duration: 2 * time.Second,
978 Factor: 1.5,
979 Steps: 12,
980 }
981
982 waitErr := wait.ExponentialBackoff(backoff, func() (bool, error) {
983 csiNode, err := cs.StorageV1().CSINodes().Get(ctx, nodeName, metav1.GetOptions{})
984 if err != nil && !apierrors.IsNotFound(err) {
985 return false, err
986 }
987 for _, driver := range csiNode.Spec.Drivers {
988 if driver.Name == driverName {
989 return true, nil
990 }
991 }
992 return false, nil
993 })
994 if waitErr != nil {
995 return fmt.Errorf("error waiting for CSI driver %s registration on node %s: %v", driverName, nodeName, waitErr)
996 }
997 return nil
998 }
999
// tryFunc runs f and converts a panic raised by f into an error instead of
// letting it propagate. It returns nil when f is nil or completes normally.
//
// The result is named so that the deferred recover handler can set the value
// the caller receives. With an unnamed result the recovered panic would be
// assigned to a local the caller never sees, and tryFunc would always return
// nil.
func tryFunc(f func()) (err error) {
	if f == nil {
		return nil
	}
	defer func() {
		if recovered := recover(); recovered != nil {
			err = fmt.Errorf("%v", recovered)
		}
	}()
	f()
	return nil
}
1013
1014 func generateDriverCleanupFunc(
1015 f *framework.Framework,
1016 driverName, testns, driverns string,
1017 cancelLogging func()) func(ctx context.Context) {
1018
1019
1020
1021 cleanupFunc := func(ctx context.Context) {
1022 ginkgo.By(fmt.Sprintf("deleting the test namespace: %s", testns))
1023
1024
1025 _ = tryFunc(func() { f.DeleteNamespace(ctx, testns) })
1026
1027 ginkgo.By(fmt.Sprintf("uninstalling csi %s driver", driverName))
1028 _ = tryFunc(cancelLogging)
1029
1030 ginkgo.By(fmt.Sprintf("deleting the driver namespace: %s", driverns))
1031 _ = tryFunc(func() { f.DeleteNamespace(ctx, driverns) })
1032 }
1033
1034 return cleanupFunc
1035 }
1036
View as plain text