/*
Copyright The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
17 package csi
18
19 import (
20 "context"
21 "errors"
22 "io"
23 "os"
24 "path/filepath"
25 "reflect"
26 "strconv"
27 "testing"
28
29 csipbv1 "github.com/container-storage-interface/spec/lib/go/csi"
30 "github.com/stretchr/testify/assert"
31
32 api "k8s.io/api/core/v1"
33 "k8s.io/apimachinery/pkg/api/resource"
34 utilfeature "k8s.io/apiserver/pkg/util/feature"
35 utiltesting "k8s.io/client-go/util/testing"
36 featuregatetesting "k8s.io/component-base/featuregate/testing"
37 "k8s.io/kubernetes/pkg/features"
38 "k8s.io/kubernetes/pkg/volume"
39 "k8s.io/kubernetes/pkg/volume/csi/fake"
40 volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
41 )
42
// fakeCsiDriverClient is a test double for the CSI driver client. It wires a
// fake.NodeClient behind the same method set so tests can exercise node RPC
// translation logic without a real gRPC connection.
type fakeCsiDriverClient struct {
	t          *testing.T       // test handle, used for logging from the fake
	nodeClient *fake.NodeClient // in-memory CSI node service implementation
}
47
48 func newFakeCsiDriverClient(t *testing.T, stagingCapable bool) *fakeCsiDriverClient {
49 return &fakeCsiDriverClient{
50 t: t,
51 nodeClient: fake.NewNodeClient(stagingCapable),
52 }
53 }
54
55 func newFakeCsiDriverClientWithExpansion(t *testing.T, stagingCapable bool, expansionSet bool) *fakeCsiDriverClient {
56 return &fakeCsiDriverClient{
57 t: t,
58 nodeClient: fake.NewNodeClientWithExpansion(stagingCapable, expansionSet),
59 }
60 }
61
62 func newFakeCsiDriverClientWithVolumeStats(t *testing.T, volumeStatsSet bool) *fakeCsiDriverClient {
63 return &fakeCsiDriverClient{
64 t: t,
65 nodeClient: fake.NewNodeClientWithVolumeStats(volumeStatsSet),
66 }
67 }
68
69 func newFakeCsiDriverClientWithVolumeStatsAndCondition(t *testing.T, volumeStatsSet, volumeConditionSet bool) *fakeCsiDriverClient {
70 return &fakeCsiDriverClient{
71 t: t,
72 nodeClient: fake.NewNodeClientWithVolumeStatsAndCondition(volumeStatsSet, volumeConditionSet),
73 }
74 }
75
76 func newFakeCsiDriverClientWithVolumeMountGroup(t *testing.T, stagingCapable, volumeMountGroupSet bool) *fakeCsiDriverClient {
77 return &fakeCsiDriverClient{
78 t: t,
79 nodeClient: fake.NewNodeClientWithVolumeMountGroup(stagingCapable, volumeMountGroupSet),
80 }
81 }
82
83 func (c *fakeCsiDriverClient) NodeGetInfo(ctx context.Context) (
84 nodeID string,
85 maxVolumePerNode int64,
86 accessibleTopology map[string]string,
87 err error) {
88 resp, err := c.nodeClient.NodeGetInfo(ctx, &csipbv1.NodeGetInfoRequest{})
89 topology := resp.GetAccessibleTopology()
90 if topology != nil {
91 accessibleTopology = topology.Segments
92 }
93 return resp.GetNodeId(), resp.GetMaxVolumesPerNode(), accessibleTopology, err
94 }
95
96 func (c *fakeCsiDriverClient) NodeGetVolumeStats(ctx context.Context, volID string, targetPath string) (
97 usageCountMap *volume.Metrics, err error) {
98 c.t.Log("calling fake.NodeGetVolumeStats...")
99 req := &csipbv1.NodeGetVolumeStatsRequest{
100 VolumeId: volID,
101 VolumePath: targetPath,
102 }
103
104 c.nodeClient.SetNodeVolumeStatsResp(getRawVolumeInfo())
105 resp, err := c.nodeClient.NodeGetVolumeStats(ctx, req)
106 if err != nil {
107 return nil, err
108 }
109
110 usages := resp.GetUsage()
111 if usages == nil {
112 return nil, nil
113 }
114
115 metrics := &volume.Metrics{}
116
117 isSupportNodeVolumeCondition, err := c.nodeSupportsVolumeCondition(ctx)
118 if err != nil {
119 return nil, err
120 }
121
122 if utilfeature.DefaultFeatureGate.Enabled(features.CSIVolumeHealth) && isSupportNodeVolumeCondition {
123 abnormal, message := resp.VolumeCondition.GetAbnormal(), resp.VolumeCondition.GetMessage()
124 metrics.Abnormal, metrics.Message = &abnormal, &message
125 }
126
127 for _, usage := range usages {
128 if usage == nil {
129 continue
130 }
131 unit := usage.GetUnit()
132 switch unit {
133 case csipbv1.VolumeUsage_BYTES:
134 metrics.Available = resource.NewQuantity(usage.GetAvailable(), resource.BinarySI)
135 metrics.Capacity = resource.NewQuantity(usage.GetTotal(), resource.BinarySI)
136 metrics.Used = resource.NewQuantity(usage.GetUsed(), resource.BinarySI)
137 case csipbv1.VolumeUsage_INODES:
138 metrics.InodesFree = resource.NewQuantity(usage.GetAvailable(), resource.BinarySI)
139 metrics.Inodes = resource.NewQuantity(usage.GetTotal(), resource.BinarySI)
140 metrics.InodesUsed = resource.NewQuantity(usage.GetUsed(), resource.BinarySI)
141 }
142 }
143 return metrics, nil
144 }
145
146 func (c *fakeCsiDriverClient) NodeSupportsVolumeStats(ctx context.Context) (bool, error) {
147 c.t.Log("calling fake.NodeSupportsVolumeStats...")
148 return c.nodeSupportsCapability(ctx, csipbv1.NodeServiceCapability_RPC_GET_VOLUME_STATS)
149 }
150
151 func (c *fakeCsiDriverClient) NodePublishVolume(
152 ctx context.Context,
153 volID string,
154 readOnly bool,
155 stagingTargetPath string,
156 targetPath string,
157 accessMode api.PersistentVolumeAccessMode,
158 publishContext map[string]string,
159 volumeContext map[string]string,
160 secrets map[string]string,
161 fsType string,
162 mountOptions []string,
163 fsGroup *int64,
164 ) error {
165 c.t.Log("calling fake.NodePublishVolume...")
166 req := &csipbv1.NodePublishVolumeRequest{
167 VolumeId: volID,
168 TargetPath: targetPath,
169 StagingTargetPath: stagingTargetPath,
170 Readonly: readOnly,
171 PublishContext: publishContext,
172 VolumeContext: volumeContext,
173 Secrets: secrets,
174 VolumeCapability: &csipbv1.VolumeCapability{
175 AccessMode: &csipbv1.VolumeCapability_AccessMode{
176 Mode: asCSIAccessModeV1(accessMode),
177 },
178 },
179 }
180
181 if fsType == fsTypeBlockName {
182 req.VolumeCapability.AccessType = &csipbv1.VolumeCapability_Block{
183 Block: &csipbv1.VolumeCapability_BlockVolume{},
184 }
185 } else {
186 mountVolume := &csipbv1.VolumeCapability_MountVolume{
187 FsType: fsType,
188 MountFlags: mountOptions,
189 }
190 if fsGroup != nil {
191 mountVolume.VolumeMountGroup = strconv.FormatInt(*fsGroup, 10 )
192 }
193 req.VolumeCapability.AccessType = &csipbv1.VolumeCapability_Mount{
194 Mount: mountVolume,
195 }
196 }
197
198 _, err := c.nodeClient.NodePublishVolume(ctx, req)
199 if err != nil && !isFinalError(err) {
200 return volumetypes.NewUncertainProgressError(err.Error())
201 }
202 return err
203 }
204
205 func (c *fakeCsiDriverClient) NodeUnpublishVolume(ctx context.Context, volID string, targetPath string) error {
206 c.t.Log("calling fake.NodeUnpublishVolume...")
207 req := &csipbv1.NodeUnpublishVolumeRequest{
208 VolumeId: volID,
209 TargetPath: targetPath,
210 }
211
212 _, err := c.nodeClient.NodeUnpublishVolume(ctx, req)
213 return err
214 }
215
216 func (c *fakeCsiDriverClient) NodeStageVolume(ctx context.Context,
217 volID string,
218 publishContext map[string]string,
219 stagingTargetPath string,
220 fsType string,
221 accessMode api.PersistentVolumeAccessMode,
222 secrets map[string]string,
223 volumeContext map[string]string,
224 mountOptions []string,
225 fsGroup *int64,
226 ) error {
227 c.t.Log("calling fake.NodeStageVolume...")
228 req := &csipbv1.NodeStageVolumeRequest{
229 VolumeId: volID,
230 PublishContext: publishContext,
231 StagingTargetPath: stagingTargetPath,
232 VolumeCapability: &csipbv1.VolumeCapability{
233 AccessMode: &csipbv1.VolumeCapability_AccessMode{
234 Mode: asCSIAccessModeV1(accessMode),
235 },
236 },
237 Secrets: secrets,
238 VolumeContext: volumeContext,
239 }
240 if fsType == fsTypeBlockName {
241 req.VolumeCapability.AccessType = &csipbv1.VolumeCapability_Block{
242 Block: &csipbv1.VolumeCapability_BlockVolume{},
243 }
244 } else {
245 mountVolume := &csipbv1.VolumeCapability_MountVolume{
246 FsType: fsType,
247 MountFlags: mountOptions,
248 }
249 if fsGroup != nil {
250 mountVolume.VolumeMountGroup = strconv.FormatInt(*fsGroup, 10 )
251 }
252 req.VolumeCapability.AccessType = &csipbv1.VolumeCapability_Mount{
253 Mount: mountVolume,
254 }
255 }
256
257 _, err := c.nodeClient.NodeStageVolume(ctx, req)
258 if err != nil && !isFinalError(err) {
259 return volumetypes.NewUncertainProgressError(err.Error())
260 }
261 return err
262 }
263
264 func (c *fakeCsiDriverClient) NodeUnstageVolume(ctx context.Context, volID, stagingTargetPath string) error {
265 c.t.Log("calling fake.NodeUnstageVolume...")
266 req := &csipbv1.NodeUnstageVolumeRequest{
267 VolumeId: volID,
268 StagingTargetPath: stagingTargetPath,
269 }
270 _, err := c.nodeClient.NodeUnstageVolume(ctx, req)
271 return err
272 }
273
274 func (c *fakeCsiDriverClient) NodeSupportsNodeExpand(ctx context.Context) (bool, error) {
275 c.t.Log("calling fake.NodeSupportsNodeExpand...")
276 return c.nodeSupportsCapability(ctx, csipbv1.NodeServiceCapability_RPC_EXPAND_VOLUME)
277 }
278
279 func (c *fakeCsiDriverClient) NodeSupportsStageUnstage(ctx context.Context) (bool, error) {
280 c.t.Log("calling fake.NodeGetCapabilities for NodeSupportsStageUnstage...")
281 return c.nodeSupportsCapability(ctx, csipbv1.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME)
282 }
283
284 func (c *fakeCsiDriverClient) NodeSupportsVolumeMountGroup(ctx context.Context) (bool, error) {
285 c.t.Log("calling fake.NodeGetCapabilities for NodeSupportsVolumeMountGroup...")
286 req := &csipbv1.NodeGetCapabilitiesRequest{}
287 resp, err := c.nodeClient.NodeGetCapabilities(ctx, req)
288 if err != nil {
289 return false, err
290 }
291
292 capabilities := resp.GetCapabilities()
293
294 volumeMountGroupSet := false
295 if capabilities == nil {
296 return false, nil
297 }
298 for _, capability := range capabilities {
299 if capability.GetRpc().GetType() == csipbv1.NodeServiceCapability_RPC_VOLUME_MOUNT_GROUP {
300 volumeMountGroupSet = true
301 }
302 }
303 return volumeMountGroupSet, nil
304 }
305
306 func (c *fakeCsiDriverClient) NodeExpandVolume(ctx context.Context, opts csiResizeOptions) (resource.Quantity, error) {
307 c.t.Log("calling fake.NodeExpandVolume")
308 req := &csipbv1.NodeExpandVolumeRequest{
309 VolumeId: opts.volumeID,
310 VolumePath: opts.volumePath,
311 StagingTargetPath: opts.stagingTargetPath,
312 CapacityRange: &csipbv1.CapacityRange{RequiredBytes: opts.newSize.Value()},
313 VolumeCapability: &csipbv1.VolumeCapability{
314 AccessMode: &csipbv1.VolumeCapability_AccessMode{
315 Mode: asCSIAccessModeV1(opts.accessMode),
316 },
317 },
318 }
319 if opts.fsType == fsTypeBlockName {
320 req.VolumeCapability.AccessType = &csipbv1.VolumeCapability_Block{
321 Block: &csipbv1.VolumeCapability_BlockVolume{},
322 }
323 } else {
324 req.VolumeCapability.AccessType = &csipbv1.VolumeCapability_Mount{
325 Mount: &csipbv1.VolumeCapability_MountVolume{
326 FsType: opts.fsType,
327 MountFlags: opts.mountOptions,
328 },
329 }
330 }
331 resp, err := c.nodeClient.NodeExpandVolume(ctx, req)
332 if err != nil {
333 return opts.newSize, err
334 }
335 updatedQuantity := resource.NewQuantity(resp.CapacityBytes, resource.BinarySI)
336 return *updatedQuantity, nil
337 }
338
339 func (c *fakeCsiDriverClient) nodeSupportsVolumeCondition(ctx context.Context) (bool, error) {
340 c.t.Log("calling fake.nodeSupportsVolumeCondition...")
341 return c.nodeSupportsCapability(ctx, csipbv1.NodeServiceCapability_RPC_VOLUME_CONDITION)
342 }
343
344 func (c *fakeCsiDriverClient) NodeSupportsSingleNodeMultiWriterAccessMode(ctx context.Context) (bool, error) {
345 c.t.Log("calling fake.NodeSupportsSingleNodeMultiWriterAccessMode...")
346 return c.nodeSupportsCapability(ctx, csipbv1.NodeServiceCapability_RPC_SINGLE_NODE_MULTI_WRITER)
347 }
348
349 func (c *fakeCsiDriverClient) nodeSupportsCapability(ctx context.Context, capabilityType csipbv1.NodeServiceCapability_RPC_Type) (bool, error) {
350 capabilities, err := c.nodeGetCapabilities(ctx)
351 if err != nil {
352 return false, err
353 }
354
355 for _, capability := range capabilities {
356 if capability.GetRpc().GetType() == capabilityType {
357 return true, nil
358 }
359 }
360 return false, nil
361 }
362
363 func (c *fakeCsiDriverClient) nodeGetCapabilities(ctx context.Context) ([]*csipbv1.NodeServiceCapability, error) {
364 req := &csipbv1.NodeGetCapabilitiesRequest{}
365 resp, err := c.nodeClient.NodeGetCapabilities(ctx, req)
366 if err != nil {
367 return []*csipbv1.NodeServiceCapability{}, err
368 }
369 return resp.GetCapabilities(), nil
370 }
371
372 func setupClient(t *testing.T, stageUnstageSet bool) csiClient {
373 return newFakeCsiDriverClient(t, stageUnstageSet)
374 }
375
376 func setupClientWithExpansion(t *testing.T, stageUnstageSet bool, expansionSet bool) csiClient {
377 return newFakeCsiDriverClientWithExpansion(t, stageUnstageSet, expansionSet)
378 }
379
380 func setupClientWithVolumeStatsAndCondition(t *testing.T, volumeStatsSet, volumeConditionSet bool) csiClient {
381 return newFakeCsiDriverClientWithVolumeStatsAndCondition(t, volumeStatsSet, volumeConditionSet)
382 }
383
384 func setupClientWithVolumeStats(t *testing.T, volumeStatsSet bool) csiClient {
385 return newFakeCsiDriverClientWithVolumeStats(t, volumeStatsSet)
386 }
387
388 func setupClientWithVolumeMountGroup(t *testing.T, stageUnstageSet bool, volumeMountGroupSet bool) csiClient {
389 return newFakeCsiDriverClientWithVolumeMountGroup(t, stageUnstageSet, volumeMountGroupSet)
390 }
391
// checkErr fails the test when actualError's presence does not match
// expectedAnError: an expected-but-missing error and an unexpected error are
// both reported.
func checkErr(t *testing.T, expectedAnError bool, actualError error) {
	t.Helper()

	gotErr := actualError != nil
	switch {
	case expectedAnError && !gotErr:
		t.Error("expected an error")
	case !expectedAnError && gotErr:
		t.Errorf("expected no error, got: %v", actualError)
	}
}
405
// TestClientNodeGetInfo verifies that csiDriverClient.NodeGetInfo unpacks the
// driver's NodeGetInfoResponse (node ID, max volumes per node, accessible
// topology) and propagates a gRPC error from the fake node client.
func TestClientNodeGetInfo(t *testing.T) {
	testCases := []struct {
		name                       string
		expectedNodeID             string
		expectedMaxVolumePerNode   int64
		expectedAccessibleTopology map[string]string
		mustFail                   bool
		err                        error
	}{
		{
			name:                       "test ok",
			expectedNodeID:             "node1",
			expectedMaxVolumePerNode:   16,
			expectedAccessibleTopology: map[string]string{"com.example.csi-topology/zone": "zone1"},
		},
		{
			name:     "grpc error",
			mustFail: true,
			err:      errors.New("grpc error"),
		},
	}

	for _, tc := range testCases {
		t.Logf("test case: %s", tc.name)

		fakeCloser := fake.NewCloser(t)
		client := &csiDriverClient{
			driverName: "Fake Driver Name",
			nodeV1ClientCreator: func(addr csiAddr, m *MetricsManager) (csipbv1.NodeClient, io.Closer, error) {
				// Seed the fake with the response this case expects and,
				// when set, the error it should surface on the next call.
				nodeClient := fake.NewNodeClient(false /* stagingCapable */)
				nodeClient.SetNextError(tc.err)
				nodeClient.SetNodeGetInfoResp(&csipbv1.NodeGetInfoResponse{
					NodeId:            tc.expectedNodeID,
					MaxVolumesPerNode: tc.expectedMaxVolumePerNode,
					AccessibleTopology: &csipbv1.Topology{
						Segments: tc.expectedAccessibleTopology,
					},
				})
				return nodeClient, fakeCloser, nil
			},
		}

		nodeID, maxVolumePerNode, accessibleTopology, err := client.NodeGetInfo(context.Background())
		checkErr(t, tc.mustFail, err)

		if nodeID != tc.expectedNodeID {
			t.Errorf("expected nodeID: %v; got: %v", tc.expectedNodeID, nodeID)
		}

		if maxVolumePerNode != tc.expectedMaxVolumePerNode {
			t.Errorf("expected maxVolumePerNode: %v; got: %v", tc.expectedMaxVolumePerNode, maxVolumePerNode)
		}

		if !reflect.DeepEqual(accessibleTopology, tc.expectedAccessibleTopology) {
			t.Errorf("expected accessibleTopology: %v; got: %v", tc.expectedAccessibleTopology, accessibleTopology)
		}

		// Successful calls must also close the connection they opened.
		if !tc.mustFail {
			fakeCloser.Check()
		}
	}
}
468
// TestClientNodePublishVolume verifies csiDriverClient.NodePublishVolume:
// request validation (volume ID, target path, fs type), gRPC error
// propagation, and that a provided fsGroup is forwarded to the driver as the
// VolumeMountGroup in the publish request.
func TestClientNodePublishVolume(t *testing.T) {
	var testFSGroup int64 = 3000

	tmpDir, err := utiltesting.MkTmpdir("csi-test")
	if err != nil {
		t.Fatalf("can't create temp dir: %v", err)
	}
	defer os.RemoveAll(tmpDir)
	testPath := filepath.Join(tmpDir, "path")

	testCases := []struct {
		name                     string
		volID                    string
		targetPath               string
		fsType                   string
		fsGroup                  *int64
		expectedVolumeMountGroup string
		mustFail                 bool
		err                      error
	}{
		{name: "test ok", volID: "vol-test", targetPath: testPath},
		{name: "missing volID", targetPath: testPath, mustFail: true},
		{name: "missing target path", volID: "vol-test", mustFail: true},
		{name: "bad fs", volID: "vol-test", targetPath: testPath, fsType: "badfs", mustFail: true},
		{name: "grpc error", volID: "vol-test", targetPath: testPath, mustFail: true, err: errors.New("grpc error")},
		{name: "fsgroup", volID: "vol-test", targetPath: testPath, fsGroup: &testFSGroup, expectedVolumeMountGroup: "3000"},
	}

	for _, tc := range testCases {
		t.Logf("test case: %s", tc.name)

		// The node client is created outside the creator closure so the test
		// can inspect the published volumes afterwards.
		nodeClient := fake.NewNodeClient(false /* stagingCapable */)
		nodeClient.SetNextError(tc.err)
		fakeCloser := fake.NewCloser(t)
		client := &csiDriverClient{
			driverName: "Fake Driver Name",
			nodeV1ClientCreator: func(addr csiAddr, m *MetricsManager) (csipbv1.NodeClient, io.Closer, error) {
				return nodeClient, fakeCloser, nil
			},
		}

		err := client.NodePublishVolume(
			context.Background(),
			tc.volID,
			false,
			"",
			tc.targetPath,
			api.ReadWriteOnce,
			map[string]string{"device": "/dev/null"},
			map[string]string{"attr0": "val0"},
			map[string]string{},
			tc.fsType,
			[]string{},
			tc.fsGroup,
		)
		checkErr(t, tc.mustFail, err)

		// The fsGroup (when set) must arrive at the driver as VolumeMountGroup.
		volumeMountGroup := nodeClient.GetNodePublishedVolumes()[tc.volID].VolumeMountGroup
		if volumeMountGroup != tc.expectedVolumeMountGroup {
			t.Errorf("Expected VolumeMountGroup in NodePublishVolumeRequest to be %q, got: %q", tc.expectedVolumeMountGroup, volumeMountGroup)
		}

		if !tc.mustFail {
			fakeCloser.Check()
		}
	}
}
536
// TestClientNodeUnpublishVolume verifies csiDriverClient.NodeUnpublishVolume:
// request validation (volume ID, target path) and gRPC error propagation.
func TestClientNodeUnpublishVolume(t *testing.T) {
	tmpDir, err := utiltesting.MkTmpdir("csi-test")
	if err != nil {
		t.Fatalf("can't create temp dir: %v", err)
	}
	defer os.RemoveAll(tmpDir)
	testPath := filepath.Join(tmpDir, "path")

	testCases := []struct {
		name       string
		volID      string
		targetPath string
		mustFail   bool
		err        error
	}{
		{name: "test ok", volID: "vol-test", targetPath: testPath},
		{name: "missing volID", targetPath: testPath, mustFail: true},
		{name: "missing target path", volID: testPath, mustFail: true},
		{name: "grpc error", volID: "vol-test", targetPath: testPath, mustFail: true, err: errors.New("grpc error")},
	}

	for _, tc := range testCases {
		t.Logf("test case: %s", tc.name)
		fakeCloser := fake.NewCloser(t)
		client := &csiDriverClient{
			driverName: "Fake Driver Name",
			nodeV1ClientCreator: func(addr csiAddr, m *MetricsManager) (csipbv1.NodeClient, io.Closer, error) {
				nodeClient := fake.NewNodeClient(false /* stagingCapable */)
				nodeClient.SetNextError(tc.err)
				return nodeClient, fakeCloser, nil
			},
		}

		err := client.NodeUnpublishVolume(context.Background(), tc.volID, tc.targetPath)
		checkErr(t, tc.mustFail, err)

		// Successful calls must close the connection they opened.
		if !tc.mustFail {
			fakeCloser.Check()
		}
	}
}
578
579 func TestClientNodeStageVolume(t *testing.T) {
580 var testFSGroup int64 = 3000
581
582 tmpDir, err := utiltesting.MkTmpdir("csi-test")
583 if err != nil {
584 t.Fatalf("can't create temp dir: %v", err)
585 }
586 defer os.RemoveAll(tmpDir)
587 testPath := filepath.Join(tmpDir, "/test/path")
588
589 testCases := []struct {
590 name string
591 volID string
592 stagingTargetPath string
593 fsType string
594 secrets map[string]string
595 mountOptions []string
596 fsGroup *int64
597 expectedVolumeMountGroup string
598 mustFail bool
599 err error
600 }{
601 {name: "test ok", volID: "vol-test", stagingTargetPath: testPath, fsType: "ext4", mountOptions: []string{"unvalidated"}},
602 {name: "missing volID", stagingTargetPath: testPath, mustFail: true},
603 {name: "missing target path", volID: "vol-test", mustFail: true},
604 {name: "bad fs", volID: "vol-test", stagingTargetPath: testPath, fsType: "badfs", mustFail: true},
605 {name: "grpc error", volID: "vol-test", stagingTargetPath: testPath, mustFail: true, err: errors.New("grpc error")},
606 {name: "fsgroup", volID: "vol-test", stagingTargetPath: testPath, fsGroup: &testFSGroup, expectedVolumeMountGroup: "3000"},
607 }
608
609 for _, tc := range testCases {
610 t.Logf("Running test case: %s", tc.name)
611
612 nodeClient := fake.NewNodeClientWithVolumeMountGroup(true , true )
613 nodeClient.SetNextError(tc.err)
614 fakeCloser := fake.NewCloser(t)
615 client := &csiDriverClient{
616 driverName: "Fake Driver Name",
617 nodeV1ClientCreator: func(addr csiAddr, m *MetricsManager) (csipbv1.NodeClient, io.Closer, error) {
618 return nodeClient, fakeCloser, nil
619 },
620 }
621
622 err := client.NodeStageVolume(
623 context.Background(),
624 tc.volID,
625 map[string]string{"device": "/dev/null"},
626 tc.stagingTargetPath,
627 tc.fsType,
628 api.ReadWriteOnce,
629 tc.secrets,
630 map[string]string{"attr0": "val0"},
631 tc.mountOptions,
632 tc.fsGroup,
633 )
634 checkErr(t, tc.mustFail, err)
635
636 volumeMountGroup := nodeClient.GetNodeStagedVolumes()[tc.volID].VolumeMountGroup
637 if volumeMountGroup != tc.expectedVolumeMountGroup {
638 t.Errorf("expected VolumeMountGroup parameter in NodePublishVolumeRequest to be %q, got: %q", tc.expectedVolumeMountGroup, volumeMountGroup)
639 }
640
641 if !tc.mustFail {
642 fakeCloser.Check()
643 }
644 }
645 }
646
// TestClientNodeUnstageVolume verifies csiDriverClient.NodeUnstageVolume:
// request validation (volume ID, staging target path) and gRPC error
// propagation.
func TestClientNodeUnstageVolume(t *testing.T) {
	tmpDir, err := utiltesting.MkTmpdir("csi-test")
	if err != nil {
		t.Fatalf("can't create temp dir: %v", err)
	}
	defer os.RemoveAll(tmpDir)
	testPath := filepath.Join(tmpDir, "/test/path")

	testCases := []struct {
		name              string
		volID             string
		stagingTargetPath string
		mustFail          bool
		err               error
	}{
		{name: "test ok", volID: "vol-test", stagingTargetPath: testPath},
		{name: "missing volID", stagingTargetPath: testPath, mustFail: true},
		{name: "missing target path", volID: "vol-test", mustFail: true},
		{name: "grpc error", volID: "vol-test", stagingTargetPath: testPath, mustFail: true, err: errors.New("grpc error")},
	}

	for _, tc := range testCases {
		t.Logf("Running test case: %s", tc.name)
		fakeCloser := fake.NewCloser(t)
		client := &csiDriverClient{
			driverName: "Fake Driver Name",
			nodeV1ClientCreator: func(addr csiAddr, m *MetricsManager) (csipbv1.NodeClient, io.Closer, error) {
				nodeClient := fake.NewNodeClient(false /* stagingCapable */)
				nodeClient.SetNextError(tc.err)
				return nodeClient, fakeCloser, nil
			},
		}

		err := client.NodeUnstageVolume(
			context.Background(),
			tc.volID, tc.stagingTargetPath,
		)
		checkErr(t, tc.mustFail, err)

		// Successful calls must close the connection they opened.
		if !tc.mustFail {
			fakeCloser.Check()
		}
	}
}
691
692 func TestClientNodeSupportsStageUnstage(t *testing.T) {
693 testClientNodeSupportsCapabilities(t,
694 func(client *csiDriverClient) (bool, error) {
695 return client.NodeSupportsStageUnstage(context.Background())
696 },
697 func(stagingCapable bool) *fake.NodeClient {
698
699 return fake.NewNodeClient(stagingCapable)
700 })
701 }
702
703 func TestClientNodeSupportsNodeExpand(t *testing.T) {
704 testClientNodeSupportsCapabilities(t,
705 func(client *csiDriverClient) (bool, error) {
706 return client.NodeSupportsNodeExpand(context.Background())
707 },
708 func(expansionCapable bool) *fake.NodeClient {
709 return fake.NewNodeClientWithExpansion(false , expansionCapable)
710 })
711 }
712
713 func TestClientNodeSupportsVolumeStats(t *testing.T) {
714 testClientNodeSupportsCapabilities(t,
715 func(client *csiDriverClient) (bool, error) {
716 return client.NodeSupportsVolumeStats(context.Background())
717 },
718 func(volumeStatsCapable bool) *fake.NodeClient {
719 return fake.NewNodeClientWithVolumeStats(volumeStatsCapable)
720 })
721 }
722
723 func TestClientNodeSupportsVolumeMountGroup(t *testing.T) {
724 testClientNodeSupportsCapabilities(t,
725 func(client *csiDriverClient) (bool, error) {
726 return client.NodeSupportsVolumeMountGroup(context.Background())
727 },
728 func(volumeMountGroupCapable bool) *fake.NodeClient {
729 return fake.NewNodeClientWithVolumeMountGroup(false , volumeMountGroupCapable)
730 })
731 }
732
733 func testClientNodeSupportsCapabilities(
734 t *testing.T,
735 capabilityMethodToTest func(*csiDriverClient) (bool, error),
736 nodeClientGenerator func(bool) *fake.NodeClient) {
737
738 testCases := []struct {
739 name string
740 capable bool
741 }{
742 {name: "positive", capable: true},
743 {name: "negative", capable: false},
744 }
745
746 for _, tc := range testCases {
747 t.Logf("Running test case: %s", tc.name)
748 fakeCloser := fake.NewCloser(t)
749 client := &csiDriverClient{
750 driverName: "Fake Driver Name",
751 nodeV1ClientCreator: func(addr csiAddr, m *MetricsManager) (csipbv1.NodeClient, io.Closer, error) {
752 nodeClient := nodeClientGenerator(tc.capable)
753 return nodeClient, fakeCloser, nil
754 },
755 }
756
757 got, _ := capabilityMethodToTest(client)
758
759 if got != tc.capable {
760 t.Errorf("Expected capability support to be %v, got: %v", tc.capable, got)
761 }
762
763 fakeCloser.Check()
764 }
765 }
766
// TestNodeExpandVolume verifies csiDriverClient.NodeExpandVolume input
// validation (volume ID, volume path, positive requested size) and the happy
// path against the fake node client.
func TestNodeExpandVolume(t *testing.T) {
	testCases := []struct {
		name       string
		volID      string
		volumePath string
		newSize    resource.Quantity
		mustFail   bool
		err        error
	}{
		{
			name:       "with all correct values",
			volID:      "vol-abcde",
			volumePath: "/foo/bar",
			newSize:    resource.MustParse("10Gi"),
			mustFail:   false,
		},
		{
			name:       "with missing volume-id",
			volumePath: "/foo/bar",
			newSize:    resource.MustParse("10Gi"),
			mustFail:   true,
		},
		{
			name:     "with missing volume path",
			volID:    "vol-1234",
			newSize:  resource.MustParse("10Gi"),
			mustFail: true,
		},
		{
			name:       "with invalid quantity",
			volID:      "vol-1234",
			volumePath: "/foo/bar",
			newSize:    *resource.NewQuantity(-10, resource.DecimalSI),
			mustFail:   true,
		},
	}

	for _, tc := range testCases {
		t.Logf("Running test cases : %s", tc.name)
		fakeCloser := fake.NewCloser(t)
		client := &csiDriverClient{
			driverName: "Fake Driver Name",
			nodeV1ClientCreator: func(addr csiAddr, m *MetricsManager) (csipbv1.NodeClient, io.Closer, error) {
				nodeClient := fake.NewNodeClient(false /* stagingCapable */)
				nodeClient.SetNextError(tc.err)
				return nodeClient, fakeCloser, nil
			},
		}
		opts := csiResizeOptions{volumeID: tc.volID, volumePath: tc.volumePath, newSize: tc.newSize}
		_, err := client.NodeExpandVolume(context.Background(), opts)
		checkErr(t, tc.mustFail, err)
		// Successful calls must close the connection they opened.
		if !tc.mustFail {
			fakeCloser.Check()
		}
	}
}
823
// VolumeStatsOptions bundles the inputs the volume-stats test cases feed to
// NodeGetVolumeStats.
type VolumeStatsOptions struct {
	VolumeSpec *volume.Spec

	// VolumeID is the CSI volume handle identifying the volume.
	VolumeID string

	// DeviceMountPath is the path where the volume is mounted; it is passed
	// to NodeGetVolumeStats as the volume path.
	DeviceMountPath string
}
835
// TestVolumeHealthEnable verifies that with the CSIVolumeHealth feature gate
// enabled, NodeGetVolumeStats populates metrics.Abnormal/Message only when
// the fake driver advertises the VOLUME_CONDITION capability.
func TestVolumeHealthEnable(t *testing.T) {
	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIVolumeHealth, true)()
	spec := volume.NewSpecFromPersistentVolume(makeTestPV("test-pv", 10, "metrics", "test-vol"), false)
	tests := []struct {
		name               string
		volumeStatsSet     bool
		volumeConditionSet bool
		volumeData         VolumeStatsOptions
		success            bool
	}{
		{
			name:               "when nodeVolumeStats=on, VolumeID=on, DeviceMountPath=on, VolumeCondition=on",
			volumeStatsSet:     true,
			volumeConditionSet: true,
			volumeData: VolumeStatsOptions{
				VolumeSpec:      spec,
				VolumeID:        "volume1",
				DeviceMountPath: "/foo/bar",
			},
			success: true,
		},
		{
			name:               "when nodeVolumeStats=on, VolumeID=on, DeviceMountPath=on, VolumeCondition=off",
			volumeStatsSet:     true,
			volumeConditionSet: false,
			volumeData: VolumeStatsOptions{
				VolumeSpec:      spec,
				VolumeID:        "volume1",
				DeviceMountPath: "/foo/bar",
			},
			success: true,
		},
	}

	for _, tc := range tests {
		t.Run(tc.name, func(t *testing.T) {
			ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
			defer cancel()
			csiSource, _ := getCSISourceFromSpec(tc.volumeData.VolumeSpec)
			csClient := setupClientWithVolumeStatsAndCondition(t, tc.volumeStatsSet, tc.volumeConditionSet)
			metrics, err := csClient.NodeGetVolumeStats(ctx, csiSource.VolumeHandle, tc.volumeData.DeviceMountPath)
			if tc.success {
				assert.Nil(t, err)
			}

			if metrics == nil {
				t.Errorf("csi.NodeGetVolumeStats returned nil metrics for volume %s", tc.volumeData.VolumeID)
			} else {
				// Volume condition should surface in the metrics only when
				// the driver advertises VOLUME_CONDITION.
				if tc.volumeConditionSet {
					assert.NotNil(t, metrics.Abnormal)
					assert.NotNil(t, metrics.Message)
				} else {
					assert.Nil(t, metrics.Abnormal)
					assert.Nil(t, metrics.Message)
				}
			}

		})
	}
}
896
// TestVolumeHealthDisable verifies that with the CSIVolumeHealth feature gate
// disabled, NodeGetVolumeStats never populates metrics.Abnormal/Message.
func TestVolumeHealthDisable(t *testing.T) {
	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIVolumeHealth, false)()
	spec := volume.NewSpecFromPersistentVolume(makeTestPV("test-pv", 10, "metrics", "test-vol"), false)
	tests := []struct {
		name           string
		volumeStatsSet bool
		volumeData     VolumeStatsOptions
		success        bool
	}{
		{
			name:           "when nodeVolumeStats=on, VolumeID=on, DeviceMountPath=on, VolumeCondition=off",
			volumeStatsSet: true,
			volumeData: VolumeStatsOptions{
				VolumeSpec:      spec,
				VolumeID:        "volume1",
				DeviceMountPath: "/foo/bar",
			},
			success: true,
		},
	}
	for _, tc := range tests {
		t.Run(tc.name, func(t *testing.T) {
			ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
			defer cancel()
			csiSource, _ := getCSISourceFromSpec(tc.volumeData.VolumeSpec)
			csClient := setupClientWithVolumeStatsAndCondition(t, tc.volumeStatsSet, false)
			metrics, err := csClient.NodeGetVolumeStats(ctx, csiSource.VolumeHandle, tc.volumeData.DeviceMountPath)
			if tc.success {
				assert.Nil(t, err)
			}

			if metrics == nil {
				t.Errorf("csi.NodeGetVolumeStats returned nil metrics for volume %s", tc.volumeData.VolumeID)
			} else {
				// With the gate off, the condition must never be reported.
				assert.Nil(t, metrics.Abnormal)
				assert.Nil(t, metrics.Message)
			}
		})
	}
}
937
// TestVolumeStats verifies NodeGetVolumeStats success/failure across the
// combinations of driver stats capability, volume ID, and device mount path.
func TestVolumeStats(t *testing.T) {
	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIVolumeHealth, true)()
	spec := volume.NewSpecFromPersistentVolume(makeTestPV("test-pv", 10, "metrics", "test-vol"), false)
	tests := []struct {
		name               string
		volumeStatsSet     bool
		volumeConditionSet bool
		volumeData         VolumeStatsOptions
		success            bool
	}{
		{
			name:           "when nodeVolumeStats=on, VolumeID=on, DeviceMountPath=on",
			volumeStatsSet: true,
			volumeData: VolumeStatsOptions{
				VolumeSpec:      spec,
				VolumeID:        "volume1",
				DeviceMountPath: "/foo/bar",
			},
			success: true,
		},

		{
			name:           "when nodeVolumeStats=off, VolumeID=on, DeviceMountPath=on",
			volumeStatsSet: false,
			volumeData: VolumeStatsOptions{
				VolumeSpec:      spec,
				VolumeID:        "volume1",
				DeviceMountPath: "/foo/bar",
			},
			success: false,
		},

		{
			name:           "when nodeVolumeStats=on, VolumeID=off, DeviceMountPath=on",
			volumeStatsSet: true,
			volumeData: VolumeStatsOptions{
				VolumeSpec:      spec,
				VolumeID:        "",
				DeviceMountPath: "/foo/bar",
			},
			success: false,
		},

		{
			name:           "when nodeVolumeStats=on, VolumeID=on, DeviceMountPath=off",
			volumeStatsSet: true,
			volumeData: VolumeStatsOptions{
				VolumeSpec:      spec,
				VolumeID:        "volume1",
				DeviceMountPath: "",
			},
			success: false,
		},
		{
			name:           "when nodeVolumeStats=on, VolumeID=on, DeviceMountPath=off",
			volumeStatsSet: true,
			volumeData: VolumeStatsOptions{
				VolumeSpec:      spec,
				VolumeID:        "",
				DeviceMountPath: "",
			},
			success: false,
		},
	}
	for _, tc := range tests {
		t.Run(tc.name, func(t *testing.T) {
			ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
			defer cancel()
			csiSource, _ := getCSISourceFromSpec(tc.volumeData.VolumeSpec)
			csClient := setupClientWithVolumeStats(t, tc.volumeStatsSet)
			_, err := csClient.NodeGetVolumeStats(ctx, csiSource.VolumeHandle, tc.volumeData.DeviceMountPath)
			// Only unexpected failures are reported; expected failures pass.
			if err != nil && tc.success {
				t.Errorf("For %s : expected %v got %v", tc.name, tc.success, err)
			}
		})
	}

}
1016
// TestAccessModeMapping verifies the v1 access-mode mapper: Kubernetes access
// modes map to CSI modes, with ReadWriteOnce/ReadWriteOncePod mapping to the
// SINGLE_NODE_* modes only when the driver advertises the
// SINGLE_NODE_MULTI_WRITER capability.
func TestAccessModeMapping(t *testing.T) {
	tests := []struct {
		name                     string
		singleNodeMultiWriterSet bool
		accessMode               api.PersistentVolumeAccessMode
		expectedMappedAccessMode csipbv1.VolumeCapability_AccessMode_Mode
	}{
		{
			name:                     "with ReadWriteOnce and incapable driver",
			singleNodeMultiWriterSet: false,
			accessMode:               api.ReadWriteOnce,
			expectedMappedAccessMode: csipbv1.VolumeCapability_AccessMode_SINGLE_NODE_WRITER,
		},
		{
			name:                     "with ReadOnlyMany and incapable driver",
			singleNodeMultiWriterSet: false,
			accessMode:               api.ReadOnlyMany,
			expectedMappedAccessMode: csipbv1.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY,
		},
		{
			name:                     "with ReadWriteMany and incapable driver",
			singleNodeMultiWriterSet: false,
			accessMode:               api.ReadWriteMany,
			expectedMappedAccessMode: csipbv1.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER,
		},
		{
			name:                     "with ReadWriteOncePod and incapable driver",
			singleNodeMultiWriterSet: false,
			accessMode:               api.ReadWriteOncePod,
			expectedMappedAccessMode: csipbv1.VolumeCapability_AccessMode_SINGLE_NODE_WRITER,
		},
		{
			name:                     "with ReadWriteOnce and capable driver",
			singleNodeMultiWriterSet: true,
			accessMode:               api.ReadWriteOnce,
			expectedMappedAccessMode: csipbv1.VolumeCapability_AccessMode_SINGLE_NODE_MULTI_WRITER,
		},
		{
			name:                     "with ReadOnlyMany and capable driver",
			singleNodeMultiWriterSet: true,
			accessMode:               api.ReadOnlyMany,
			expectedMappedAccessMode: csipbv1.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY,
		},
		{
			name:                     "with ReadWriteMany and capable driver",
			singleNodeMultiWriterSet: true,
			accessMode:               api.ReadWriteMany,
			expectedMappedAccessMode: csipbv1.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER,
		},
		{
			name:                     "with ReadWriteOncePod and capable driver",
			singleNodeMultiWriterSet: true,
			accessMode:               api.ReadWriteOncePod,
			expectedMappedAccessMode: csipbv1.VolumeCapability_AccessMode_SINGLE_NODE_SINGLE_WRITER,
		},
	}
	for _, tc := range tests {
		t.Run(tc.name, func(t *testing.T) {
			fakeCloser := fake.NewCloser(t)
			client := &csiDriverClient{
				driverName: "Fake Driver Name",
				nodeV1ClientCreator: func(addr csiAddr, m *MetricsManager) (csipbv1.NodeClient, io.Closer, error) {
					nodeClient := fake.NewNodeClientWithSingleNodeMultiWriter(tc.singleNodeMultiWriterSet)
					return nodeClient, fakeCloser, nil
				},
			}

			// The mapper itself depends on the driver's advertised capability.
			accessModeMapper, err := client.getNodeV1AccessModeMapper(context.Background())
			if err != nil {
				t.Error(err)
			}

			mappedAccessMode := accessModeMapper(tc.accessMode)
			if mappedAccessMode != tc.expectedMappedAccessMode {
				t.Errorf("expected access mode: %v; got: %v", tc.expectedMappedAccessMode, mappedAccessMode)
			}
		})
	}
}
1096