
Source file src/k8s.io/kubernetes/pkg/volume/csi/csi_client_test.go

Documentation: k8s.io/kubernetes/pkg/volume/csi

     1  /*
     2  Copyright 2017 The Kubernetes Authors.
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     8      http://www.apache.org/licenses/LICENSE-2.0
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    17  package csi
    19  import (
    20  	"context"
    21  	"errors"
    22  	"io"
    23  	"os"
    24  	"path/filepath"
    25  	"reflect"
    26  	"strconv"
    27  	"testing"
    29  	csipbv1 "github.com/container-storage-interface/spec/lib/go/csi"
    30  	"github.com/stretchr/testify/assert"
    32  	api "k8s.io/api/core/v1"
    33  	"k8s.io/apimachinery/pkg/api/resource"
    34  	utilfeature "k8s.io/apiserver/pkg/util/feature"
    35  	utiltesting "k8s.io/client-go/util/testing"
    36  	featuregatetesting "k8s.io/component-base/featuregate/testing"
    37  	"k8s.io/kubernetes/pkg/features"
    38  	"k8s.io/kubernetes/pkg/volume"
    39  	"k8s.io/kubernetes/pkg/volume/csi/fake"
    40  	volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
    41  )
    43  type fakeCsiDriverClient struct {
    44  	t          *testing.T
    45  	nodeClient *fake.NodeClient
    46  }
    48  func newFakeCsiDriverClient(t *testing.T, stagingCapable bool) *fakeCsiDriverClient {
    49  	return &fakeCsiDriverClient{
    50  		t:          t,
    51  		nodeClient: fake.NewNodeClient(stagingCapable),
    52  	}
    53  }
    55  func newFakeCsiDriverClientWithExpansion(t *testing.T, stagingCapable bool, expansionSet bool) *fakeCsiDriverClient {
    56  	return &fakeCsiDriverClient{
    57  		t:          t,
    58  		nodeClient: fake.NewNodeClientWithExpansion(stagingCapable, expansionSet),
    59  	}
    60  }
    62  func newFakeCsiDriverClientWithVolumeStats(t *testing.T, volumeStatsSet bool) *fakeCsiDriverClient {
    63  	return &fakeCsiDriverClient{
    64  		t:          t,
    65  		nodeClient: fake.NewNodeClientWithVolumeStats(volumeStatsSet),
    66  	}
    67  }
    69  func newFakeCsiDriverClientWithVolumeStatsAndCondition(t *testing.T, volumeStatsSet, volumeConditionSet bool) *fakeCsiDriverClient {
    70  	return &fakeCsiDriverClient{
    71  		t:          t,
    72  		nodeClient: fake.NewNodeClientWithVolumeStatsAndCondition(volumeStatsSet, volumeConditionSet),
    73  	}
    74  }
    76  func newFakeCsiDriverClientWithVolumeMountGroup(t *testing.T, stagingCapable, volumeMountGroupSet bool) *fakeCsiDriverClient {
    77  	return &fakeCsiDriverClient{
    78  		t:          t,
    79  		nodeClient: fake.NewNodeClientWithVolumeMountGroup(stagingCapable, volumeMountGroupSet),
    80  	}
    81  }
    83  func (c *fakeCsiDriverClient) NodeGetInfo(ctx context.Context) (
    84  	nodeID string,
    85  	maxVolumePerNode int64,
    86  	accessibleTopology map[string]string,
    87  	err error) {
    88  	resp, err := c.nodeClient.NodeGetInfo(ctx, &csipbv1.NodeGetInfoRequest{})
    89  	topology := resp.GetAccessibleTopology()
    90  	if topology != nil {
    91  		accessibleTopology = topology.Segments
    92  	}
    93  	return resp.GetNodeId(), resp.GetMaxVolumesPerNode(), accessibleTopology, err
    94  }
    96  func (c *fakeCsiDriverClient) NodeGetVolumeStats(ctx context.Context, volID string, targetPath string) (
    97  	usageCountMap *volume.Metrics, err error) {
    98  	c.t.Log("calling fake.NodeGetVolumeStats...")
    99  	req := &csipbv1.NodeGetVolumeStatsRequest{
   100  		VolumeId:   volID,
   101  		VolumePath: targetPath,
   102  	}
   104  	c.nodeClient.SetNodeVolumeStatsResp(getRawVolumeInfo())
   105  	resp, err := c.nodeClient.NodeGetVolumeStats(ctx, req)
   106  	if err != nil {
   107  		return nil, err
   108  	}
   110  	usages := resp.GetUsage()
   111  	if usages == nil {
   112  		return nil, nil
   113  	}
   115  	metrics := &volume.Metrics{}
   117  	isSupportNodeVolumeCondition, err := c.nodeSupportsVolumeCondition(ctx)
   118  	if err != nil {
   119  		return nil, err
   120  	}
   122  	if utilfeature.DefaultFeatureGate.Enabled(features.CSIVolumeHealth) && isSupportNodeVolumeCondition {
   123  		abnormal, message := resp.VolumeCondition.GetAbnormal(), resp.VolumeCondition.GetMessage()
   124  		metrics.Abnormal, metrics.Message = &abnormal, &message
   125  	}
   127  	for _, usage := range usages {
   128  		if usage == nil {
   129  			continue
   130  		}
   131  		unit := usage.GetUnit()
   132  		switch unit {
   133  		case csipbv1.VolumeUsage_BYTES:
   134  			metrics.Available = resource.NewQuantity(usage.GetAvailable(), resource.BinarySI)
   135  			metrics.Capacity = resource.NewQuantity(usage.GetTotal(), resource.BinarySI)
   136  			metrics.Used = resource.NewQuantity(usage.GetUsed(), resource.BinarySI)
   137  		case csipbv1.VolumeUsage_INODES:
   138  			metrics.InodesFree = resource.NewQuantity(usage.GetAvailable(), resource.BinarySI)
   139  			metrics.Inodes = resource.NewQuantity(usage.GetTotal(), resource.BinarySI)
   140  			metrics.InodesUsed = resource.NewQuantity(usage.GetUsed(), resource.BinarySI)
   141  		}
   142  	}
   143  	return metrics, nil
   144  }
   146  func (c *fakeCsiDriverClient) NodeSupportsVolumeStats(ctx context.Context) (bool, error) {
   147  	c.t.Log("calling fake.NodeSupportsVolumeStats...")
   148  	return c.nodeSupportsCapability(ctx, csipbv1.NodeServiceCapability_RPC_GET_VOLUME_STATS)
   149  }
   151  func (c *fakeCsiDriverClient) NodePublishVolume(
   152  	ctx context.Context,
   153  	volID string,
   154  	readOnly bool,
   155  	stagingTargetPath string,
   156  	targetPath string,
   157  	accessMode api.PersistentVolumeAccessMode,
   158  	publishContext map[string]string,
   159  	volumeContext map[string]string,
   160  	secrets map[string]string,
   161  	fsType string,
   162  	mountOptions []string,
   163  	fsGroup *int64,
   164  ) error {
   165  	c.t.Log("calling fake.NodePublishVolume...")
   166  	req := &csipbv1.NodePublishVolumeRequest{
   167  		VolumeId:          volID,
   168  		TargetPath:        targetPath,
   169  		StagingTargetPath: stagingTargetPath,
   170  		Readonly:          readOnly,
   171  		PublishContext:    publishContext,
   172  		VolumeContext:     volumeContext,
   173  		Secrets:           secrets,
   174  		VolumeCapability: &csipbv1.VolumeCapability{
   175  			AccessMode: &csipbv1.VolumeCapability_AccessMode{
   176  				Mode: asCSIAccessModeV1(accessMode),
   177  			},
   178  		},
   179  	}
   181  	if fsType == fsTypeBlockName {
   182  		req.VolumeCapability.AccessType = &csipbv1.VolumeCapability_Block{
   183  			Block: &csipbv1.VolumeCapability_BlockVolume{},
   184  		}
   185  	} else {
   186  		mountVolume := &csipbv1.VolumeCapability_MountVolume{
   187  			FsType:     fsType,
   188  			MountFlags: mountOptions,
   189  		}
   190  		if fsGroup != nil {
   191  			mountVolume.VolumeMountGroup = strconv.FormatInt(*fsGroup, 10 /* base */)
   192  		}
   193  		req.VolumeCapability.AccessType = &csipbv1.VolumeCapability_Mount{
   194  			Mount: mountVolume,
   195  		}
   196  	}
   198  	_, err := c.nodeClient.NodePublishVolume(ctx, req)
   199  	if err != nil && !isFinalError(err) {
   200  		return volumetypes.NewUncertainProgressError(err.Error())
   201  	}
   202  	return err
   203  }
   205  func (c *fakeCsiDriverClient) NodeUnpublishVolume(ctx context.Context, volID string, targetPath string) error {
   206  	c.t.Log("calling fake.NodeUnpublishVolume...")
   207  	req := &csipbv1.NodeUnpublishVolumeRequest{
   208  		VolumeId:   volID,
   209  		TargetPath: targetPath,
   210  	}
   212  	_, err := c.nodeClient.NodeUnpublishVolume(ctx, req)
   213  	return err
   214  }
   216  func (c *fakeCsiDriverClient) NodeStageVolume(ctx context.Context,
   217  	volID string,
   218  	publishContext map[string]string,
   219  	stagingTargetPath string,
   220  	fsType string,
   221  	accessMode api.PersistentVolumeAccessMode,
   222  	secrets map[string]string,
   223  	volumeContext map[string]string,
   224  	mountOptions []string,
   225  	fsGroup *int64,
   226  ) error {
   227  	c.t.Log("calling fake.NodeStageVolume...")
   228  	req := &csipbv1.NodeStageVolumeRequest{
   229  		VolumeId:          volID,
   230  		PublishContext:    publishContext,
   231  		StagingTargetPath: stagingTargetPath,
   232  		VolumeCapability: &csipbv1.VolumeCapability{
   233  			AccessMode: &csipbv1.VolumeCapability_AccessMode{
   234  				Mode: asCSIAccessModeV1(accessMode),
   235  			},
   236  		},
   237  		Secrets:       secrets,
   238  		VolumeContext: volumeContext,
   239  	}
   240  	if fsType == fsTypeBlockName {
   241  		req.VolumeCapability.AccessType = &csipbv1.VolumeCapability_Block{
   242  			Block: &csipbv1.VolumeCapability_BlockVolume{},
   243  		}
   244  	} else {
   245  		mountVolume := &csipbv1.VolumeCapability_MountVolume{
   246  			FsType:     fsType,
   247  			MountFlags: mountOptions,
   248  		}
   249  		if fsGroup != nil {
   250  			mountVolume.VolumeMountGroup = strconv.FormatInt(*fsGroup, 10 /* base */)
   251  		}
   252  		req.VolumeCapability.AccessType = &csipbv1.VolumeCapability_Mount{
   253  			Mount: mountVolume,
   254  		}
   255  	}
   257  	_, err := c.nodeClient.NodeStageVolume(ctx, req)
   258  	if err != nil && !isFinalError(err) {
   259  		return volumetypes.NewUncertainProgressError(err.Error())
   260  	}
   261  	return err
   262  }
   264  func (c *fakeCsiDriverClient) NodeUnstageVolume(ctx context.Context, volID, stagingTargetPath string) error {
   265  	c.t.Log("calling fake.NodeUnstageVolume...")
   266  	req := &csipbv1.NodeUnstageVolumeRequest{
   267  		VolumeId:          volID,
   268  		StagingTargetPath: stagingTargetPath,
   269  	}
   270  	_, err := c.nodeClient.NodeUnstageVolume(ctx, req)
   271  	return err
   272  }
   274  func (c *fakeCsiDriverClient) NodeSupportsNodeExpand(ctx context.Context) (bool, error) {
   275  	c.t.Log("calling fake.NodeSupportsNodeExpand...")
   276  	return c.nodeSupportsCapability(ctx, csipbv1.NodeServiceCapability_RPC_EXPAND_VOLUME)
   277  }
   279  func (c *fakeCsiDriverClient) NodeSupportsStageUnstage(ctx context.Context) (bool, error) {
   280  	c.t.Log("calling fake.NodeGetCapabilities for NodeSupportsStageUnstage...")
   281  	return c.nodeSupportsCapability(ctx, csipbv1.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME)
   282  }
   284  func (c *fakeCsiDriverClient) NodeSupportsVolumeMountGroup(ctx context.Context) (bool, error) {
   285  	c.t.Log("calling fake.NodeGetCapabilities for NodeSupportsVolumeMountGroup...")
   286  	req := &csipbv1.NodeGetCapabilitiesRequest{}
   287  	resp, err := c.nodeClient.NodeGetCapabilities(ctx, req)
   288  	if err != nil {
   289  		return false, err
   290  	}
   292  	capabilities := resp.GetCapabilities()
   294  	volumeMountGroupSet := false
   295  	if capabilities == nil {
   296  		return false, nil
   297  	}
   298  	for _, capability := range capabilities {
   299  		if capability.GetRpc().GetType() == csipbv1.NodeServiceCapability_RPC_VOLUME_MOUNT_GROUP {
   300  			volumeMountGroupSet = true
   301  		}
   302  	}
   303  	return volumeMountGroupSet, nil
   304  }
   306  func (c *fakeCsiDriverClient) NodeExpandVolume(ctx context.Context, opts csiResizeOptions) (resource.Quantity, error) {
   307  	c.t.Log("calling fake.NodeExpandVolume")
   308  	req := &csipbv1.NodeExpandVolumeRequest{
   309  		VolumeId:          opts.volumeID,
   310  		VolumePath:        opts.volumePath,
   311  		StagingTargetPath: opts.stagingTargetPath,
   312  		CapacityRange:     &csipbv1.CapacityRange{RequiredBytes: opts.newSize.Value()},
   313  		VolumeCapability: &csipbv1.VolumeCapability{
   314  			AccessMode: &csipbv1.VolumeCapability_AccessMode{
   315  				Mode: asCSIAccessModeV1(opts.accessMode),
   316  			},
   317  		},
   318  	}
   319  	if opts.fsType == fsTypeBlockName {
   320  		req.VolumeCapability.AccessType = &csipbv1.VolumeCapability_Block{
   321  			Block: &csipbv1.VolumeCapability_BlockVolume{},
   322  		}
   323  	} else {
   324  		req.VolumeCapability.AccessType = &csipbv1.VolumeCapability_Mount{
   325  			Mount: &csipbv1.VolumeCapability_MountVolume{
   326  				FsType:     opts.fsType,
   327  				MountFlags: opts.mountOptions,
   328  			},
   329  		}
   330  	}
   331  	resp, err := c.nodeClient.NodeExpandVolume(ctx, req)
   332  	if err != nil {
   333  		return opts.newSize, err
   334  	}
   335  	updatedQuantity := resource.NewQuantity(resp.CapacityBytes, resource.BinarySI)
   336  	return *updatedQuantity, nil
   337  }
   339  func (c *fakeCsiDriverClient) nodeSupportsVolumeCondition(ctx context.Context) (bool, error) {
   340  	c.t.Log("calling fake.nodeSupportsVolumeCondition...")
   341  	return c.nodeSupportsCapability(ctx, csipbv1.NodeServiceCapability_RPC_VOLUME_CONDITION)
   342  }
   344  func (c *fakeCsiDriverClient) NodeSupportsSingleNodeMultiWriterAccessMode(ctx context.Context) (bool, error) {
   345  	c.t.Log("calling fake.NodeSupportsSingleNodeMultiWriterAccessMode...")
   346  	return c.nodeSupportsCapability(ctx, csipbv1.NodeServiceCapability_RPC_SINGLE_NODE_MULTI_WRITER)
   347  }
   349  func (c *fakeCsiDriverClient) nodeSupportsCapability(ctx context.Context, capabilityType csipbv1.NodeServiceCapability_RPC_Type) (bool, error) {
   350  	capabilities, err := c.nodeGetCapabilities(ctx)
   351  	if err != nil {
   352  		return false, err
   353  	}
   355  	for _, capability := range capabilities {
   356  		if capability.GetRpc().GetType() == capabilityType {
   357  			return true, nil
   358  		}
   359  	}
   360  	return false, nil
   361  }
   363  func (c *fakeCsiDriverClient) nodeGetCapabilities(ctx context.Context) ([]*csipbv1.NodeServiceCapability, error) {
   364  	req := &csipbv1.NodeGetCapabilitiesRequest{}
   365  	resp, err := c.nodeClient.NodeGetCapabilities(ctx, req)
   366  	if err != nil {
   367  		return []*csipbv1.NodeServiceCapability{}, err
   368  	}
   369  	return resp.GetCapabilities(), nil
   370  }
   372  func setupClient(t *testing.T, stageUnstageSet bool) csiClient {
   373  	return newFakeCsiDriverClient(t, stageUnstageSet)
   374  }
   376  func setupClientWithExpansion(t *testing.T, stageUnstageSet bool, expansionSet bool) csiClient {
   377  	return newFakeCsiDriverClientWithExpansion(t, stageUnstageSet, expansionSet)
   378  }
   380  func setupClientWithVolumeStatsAndCondition(t *testing.T, volumeStatsSet, volumeConditionSet bool) csiClient {
   381  	return newFakeCsiDriverClientWithVolumeStatsAndCondition(t, volumeStatsSet, volumeConditionSet)
   382  }
   384  func setupClientWithVolumeStats(t *testing.T, volumeStatsSet bool) csiClient {
   385  	return newFakeCsiDriverClientWithVolumeStats(t, volumeStatsSet)
   386  }
   388  func setupClientWithVolumeMountGroup(t *testing.T, stageUnstageSet bool, volumeMountGroupSet bool) csiClient {
   389  	return newFakeCsiDriverClientWithVolumeMountGroup(t, stageUnstageSet, volumeMountGroupSet)
   390  }
   392  func checkErr(t *testing.T, expectedAnError bool, actualError error) {
   393  	t.Helper()
   395  	errOccurred := actualError != nil
   397  	if expectedAnError && !errOccurred {
   398  		t.Error("expected an error")
   399  	}
   401  	if !expectedAnError && errOccurred {
   402  		t.Errorf("expected no error, got: %v", actualError)
   403  	}
   404  }
   406  func TestClientNodeGetInfo(t *testing.T) {
   407  	testCases := []struct {
   408  		name                       string
   409  		expectedNodeID             string
   410  		expectedMaxVolumePerNode   int64
   411  		expectedAccessibleTopology map[string]string
   412  		mustFail                   bool
   413  		err                        error
   414  	}{
   415  		{
   416  			name:                       "test ok",
   417  			expectedNodeID:             "node1",
   418  			expectedMaxVolumePerNode:   16,
   419  			expectedAccessibleTopology: map[string]string{"com.example.csi-topology/zone": "zone1"},
   420  		},
   421  		{
   422  			name:     "grpc error",
   423  			mustFail: true,
   424  			err:      errors.New("grpc error"),
   425  		},
   426  	}
   428  	for _, tc := range testCases {
   429  		t.Logf("test case: %s", tc.name)
   431  		fakeCloser := fake.NewCloser(t)
   432  		client := &csiDriverClient{
   433  			driverName: "Fake Driver Name",
   434  			nodeV1ClientCreator: func(addr csiAddr, m *MetricsManager) (csipbv1.NodeClient, io.Closer, error) {
   435  				nodeClient := fake.NewNodeClient(false /* stagingCapable */)
   436  				nodeClient.SetNextError(tc.err)
   437  				nodeClient.SetNodeGetInfoResp(&csipbv1.NodeGetInfoResponse{
   438  					NodeId:            tc.expectedNodeID,
   439  					MaxVolumesPerNode: tc.expectedMaxVolumePerNode,
   440  					AccessibleTopology: &csipbv1.Topology{
   441  						Segments: tc.expectedAccessibleTopology,
   442  					},
   443  				})
   444  				return nodeClient, fakeCloser, nil
   445  			},
   446  		}
   448  		nodeID, maxVolumePerNode, accessibleTopology, err := client.NodeGetInfo(context.Background())
   449  		checkErr(t, tc.mustFail, err)
   451  		if nodeID != tc.expectedNodeID {
   452  			t.Errorf("expected nodeID: %v; got: %v", tc.expectedNodeID, nodeID)
   453  		}
   455  		if maxVolumePerNode != tc.expectedMaxVolumePerNode {
   456  			t.Errorf("expected maxVolumePerNode: %v; got: %v", tc.expectedMaxVolumePerNode, maxVolumePerNode)
   457  		}
   459  		if !reflect.DeepEqual(accessibleTopology, tc.expectedAccessibleTopology) {
   460  			t.Errorf("expected accessibleTopology: %v; got: %v", tc.expectedAccessibleTopology, accessibleTopology)
   461  		}
   463  		if !tc.mustFail {
   464  			fakeCloser.Check()
   465  		}
   466  	}
   467  }
   469  func TestClientNodePublishVolume(t *testing.T) {
   470  	var testFSGroup int64 = 3000
   472  	tmpDir, err := utiltesting.MkTmpdir("csi-test")
   473  	if err != nil {
   474  		t.Fatalf("can't create temp dir: %v", err)
   475  	}
   476  	defer os.RemoveAll(tmpDir)
   477  	testPath := filepath.Join(tmpDir, "path")
   479  	testCases := []struct {
   480  		name                     string
   481  		volID                    string
   482  		targetPath               string
   483  		fsType                   string
   484  		fsGroup                  *int64
   485  		expectedVolumeMountGroup string
   486  		mustFail                 bool
   487  		err                      error
   488  	}{
   489  		{name: "test ok", volID: "vol-test", targetPath: testPath},
   490  		{name: "missing volID", targetPath: testPath, mustFail: true},
   491  		{name: "missing target path", volID: "vol-test", mustFail: true},
   492  		{name: "bad fs", volID: "vol-test", targetPath: testPath, fsType: "badfs", mustFail: true},
   493  		{name: "grpc error", volID: "vol-test", targetPath: testPath, mustFail: true, err: errors.New("grpc error")},
   494  		{name: "fsgroup", volID: "vol-test", targetPath: testPath, fsGroup: &testFSGroup, expectedVolumeMountGroup: "3000"},
   495  	}
   497  	for _, tc := range testCases {
   498  		t.Logf("test case: %s", tc.name)
   500  		nodeClient := fake.NewNodeClient(false /* stagingCapable */)
   501  		nodeClient.SetNextError(tc.err)
   502  		fakeCloser := fake.NewCloser(t)
   503  		client := &csiDriverClient{
   504  			driverName: "Fake Driver Name",
   505  			nodeV1ClientCreator: func(addr csiAddr, m *MetricsManager) (csipbv1.NodeClient, io.Closer, error) {
   506  				return nodeClient, fakeCloser, nil
   507  			},
   508  		}
   510  		err := client.NodePublishVolume(
   511  			context.Background(),
   512  			tc.volID,
   513  			false,
   514  			"",
   515  			tc.targetPath,
   516  			api.ReadWriteOnce,
   517  			map[string]string{"device": "/dev/null"},
   518  			map[string]string{"attr0": "val0"},
   519  			map[string]string{},
   520  			tc.fsType,
   521  			[]string{},
   522  			tc.fsGroup,
   523  		)
   524  		checkErr(t, tc.mustFail, err)
   526  		volumeMountGroup := nodeClient.GetNodePublishedVolumes()[tc.volID].VolumeMountGroup
   527  		if volumeMountGroup != tc.expectedVolumeMountGroup {
   528  			t.Errorf("Expected VolumeMountGroup in NodePublishVolumeRequest to be %q, got: %q", tc.expectedVolumeMountGroup, volumeMountGroup)
   529  		}
   531  		if !tc.mustFail {
   532  			fakeCloser.Check()
   533  		}
   534  	}
   535  }
   537  func TestClientNodeUnpublishVolume(t *testing.T) {
   538  	tmpDir, err := utiltesting.MkTmpdir("csi-test")
   539  	if err != nil {
   540  		t.Fatalf("can't create temp dir: %v", err)
   541  	}
   542  	defer os.RemoveAll(tmpDir)
   543  	testPath := filepath.Join(tmpDir, "path")
   545  	testCases := []struct {
   546  		name       string
   547  		volID      string
   548  		targetPath string
   549  		mustFail   bool
   550  		err        error
   551  	}{
   552  		{name: "test ok", volID: "vol-test", targetPath: testPath},
   553  		{name: "missing volID", targetPath: testPath, mustFail: true},
   554  		{name: "missing target path", volID: testPath, mustFail: true},
   555  		{name: "grpc error", volID: "vol-test", targetPath: testPath, mustFail: true, err: errors.New("grpc error")},
   556  	}
   558  	for _, tc := range testCases {
   559  		t.Logf("test case: %s", tc.name)
   560  		fakeCloser := fake.NewCloser(t)
   561  		client := &csiDriverClient{
   562  			driverName: "Fake Driver Name",
   563  			nodeV1ClientCreator: func(addr csiAddr, m *MetricsManager) (csipbv1.NodeClient, io.Closer, error) {
   564  				nodeClient := fake.NewNodeClient(false /* stagingCapable */)
   565  				nodeClient.SetNextError(tc.err)
   566  				return nodeClient, fakeCloser, nil
   567  			},
   568  		}
   570  		err := client.NodeUnpublishVolume(context.Background(), tc.volID, tc.targetPath)
   571  		checkErr(t, tc.mustFail, err)
   573  		if !tc.mustFail {
   574  			fakeCloser.Check()
   575  		}
   576  	}
   577  }
   579  func TestClientNodeStageVolume(t *testing.T) {
   580  	var testFSGroup int64 = 3000
   582  	tmpDir, err := utiltesting.MkTmpdir("csi-test")
   583  	if err != nil {
   584  		t.Fatalf("can't create temp dir: %v", err)
   585  	}
   586  	defer os.RemoveAll(tmpDir)
   587  	testPath := filepath.Join(tmpDir, "/test/path")
   589  	testCases := []struct {
   590  		name                     string
   591  		volID                    string
   592  		stagingTargetPath        string
   593  		fsType                   string
   594  		secrets                  map[string]string
   595  		mountOptions             []string
   596  		fsGroup                  *int64
   597  		expectedVolumeMountGroup string
   598  		mustFail                 bool
   599  		err                      error
   600  	}{
   601  		{name: "test ok", volID: "vol-test", stagingTargetPath: testPath, fsType: "ext4", mountOptions: []string{"unvalidated"}},
   602  		{name: "missing volID", stagingTargetPath: testPath, mustFail: true},
   603  		{name: "missing target path", volID: "vol-test", mustFail: true},
   604  		{name: "bad fs", volID: "vol-test", stagingTargetPath: testPath, fsType: "badfs", mustFail: true},
   605  		{name: "grpc error", volID: "vol-test", stagingTargetPath: testPath, mustFail: true, err: errors.New("grpc error")},
   606  		{name: "fsgroup", volID: "vol-test", stagingTargetPath: testPath, fsGroup: &testFSGroup, expectedVolumeMountGroup: "3000"},
   607  	}
   609  	for _, tc := range testCases {
   610  		t.Logf("Running test case: %s", tc.name)
   612  		nodeClient := fake.NewNodeClientWithVolumeMountGroup(true /* stagingCapable */, true /* volumeMountGroupCapable */)
   613  		nodeClient.SetNextError(tc.err)
   614  		fakeCloser := fake.NewCloser(t)
   615  		client := &csiDriverClient{
   616  			driverName: "Fake Driver Name",
   617  			nodeV1ClientCreator: func(addr csiAddr, m *MetricsManager) (csipbv1.NodeClient, io.Closer, error) {
   618  				return nodeClient, fakeCloser, nil
   619  			},
   620  		}
   622  		err := client.NodeStageVolume(
   623  			context.Background(),
   624  			tc.volID,
   625  			map[string]string{"device": "/dev/null"},
   626  			tc.stagingTargetPath,
   627  			tc.fsType,
   628  			api.ReadWriteOnce,
   629  			tc.secrets,
   630  			map[string]string{"attr0": "val0"},
   631  			tc.mountOptions,
   632  			tc.fsGroup,
   633  		)
   634  		checkErr(t, tc.mustFail, err)
   636  		volumeMountGroup := nodeClient.GetNodeStagedVolumes()[tc.volID].VolumeMountGroup
   637  		if volumeMountGroup != tc.expectedVolumeMountGroup {
   638  			t.Errorf("expected VolumeMountGroup parameter in NodePublishVolumeRequest to be %q, got: %q", tc.expectedVolumeMountGroup, volumeMountGroup)
   639  		}
   641  		if !tc.mustFail {
   642  			fakeCloser.Check()
   643  		}
   644  	}
   645  }
   647  func TestClientNodeUnstageVolume(t *testing.T) {
   648  	tmpDir, err := utiltesting.MkTmpdir("csi-test")
   649  	if err != nil {
   650  		t.Fatalf("can't create temp dir: %v", err)
   651  	}
   652  	defer os.RemoveAll(tmpDir)
   653  	testPath := filepath.Join(tmpDir, "/test/path")
   655  	testCases := []struct {
   656  		name              string
   657  		volID             string
   658  		stagingTargetPath string
   659  		mustFail          bool
   660  		err               error
   661  	}{
   662  		{name: "test ok", volID: "vol-test", stagingTargetPath: testPath},
   663  		{name: "missing volID", stagingTargetPath: testPath, mustFail: true},
   664  		{name: "missing target path", volID: "vol-test", mustFail: true},
   665  		{name: "grpc error", volID: "vol-test", stagingTargetPath: testPath, mustFail: true, err: errors.New("grpc error")},
   666  	}
   668  	for _, tc := range testCases {
   669  		t.Logf("Running test case: %s", tc.name)
   670  		fakeCloser := fake.NewCloser(t)
   671  		client := &csiDriverClient{
   672  			driverName: "Fake Driver Name",
   673  			nodeV1ClientCreator: func(addr csiAddr, m *MetricsManager) (csipbv1.NodeClient, io.Closer, error) {
   674  				nodeClient := fake.NewNodeClient(false /* stagingCapable */)
   675  				nodeClient.SetNextError(tc.err)
   676  				return nodeClient, fakeCloser, nil
   677  			},
   678  		}
   680  		err := client.NodeUnstageVolume(
   681  			context.Background(),
   682  			tc.volID, tc.stagingTargetPath,
   683  		)
   684  		checkErr(t, tc.mustFail, err)
   686  		if !tc.mustFail {
   687  			fakeCloser.Check()
   688  		}
   689  	}
   690  }
   692  func TestClientNodeSupportsStageUnstage(t *testing.T) {
   693  	testClientNodeSupportsCapabilities(t,
   694  		func(client *csiDriverClient) (bool, error) {
   695  			return client.NodeSupportsStageUnstage(context.Background())
   696  		},
   697  		func(stagingCapable bool) *fake.NodeClient {
   698  			// Creates a staging-capable client
   699  			return fake.NewNodeClient(stagingCapable)
   700  		})
   701  }
   703  func TestClientNodeSupportsNodeExpand(t *testing.T) {
   704  	testClientNodeSupportsCapabilities(t,
   705  		func(client *csiDriverClient) (bool, error) {
   706  			return client.NodeSupportsNodeExpand(context.Background())
   707  		},
   708  		func(expansionCapable bool) *fake.NodeClient {
   709  			return fake.NewNodeClientWithExpansion(false /* stageCapable */, expansionCapable)
   710  		})
   711  }
   713  func TestClientNodeSupportsVolumeStats(t *testing.T) {
   714  	testClientNodeSupportsCapabilities(t,
   715  		func(client *csiDriverClient) (bool, error) {
   716  			return client.NodeSupportsVolumeStats(context.Background())
   717  		},
   718  		func(volumeStatsCapable bool) *fake.NodeClient {
   719  			return fake.NewNodeClientWithVolumeStats(volumeStatsCapable)
   720  		})
   721  }
   723  func TestClientNodeSupportsVolumeMountGroup(t *testing.T) {
   724  	testClientNodeSupportsCapabilities(t,
   725  		func(client *csiDriverClient) (bool, error) {
   726  			return client.NodeSupportsVolumeMountGroup(context.Background())
   727  		},
   728  		func(volumeMountGroupCapable bool) *fake.NodeClient {
   729  			return fake.NewNodeClientWithVolumeMountGroup(false /* stagingCapable */, volumeMountGroupCapable)
   730  		})
   731  }
   733  func testClientNodeSupportsCapabilities(
   734  	t *testing.T,
   735  	capabilityMethodToTest func(*csiDriverClient) (bool, error),
   736  	nodeClientGenerator func(bool) *fake.NodeClient) {
   738  	testCases := []struct {
   739  		name    string
   740  		capable bool
   741  	}{
   742  		{name: "positive", capable: true},
   743  		{name: "negative", capable: false},
   744  	}
   746  	for _, tc := range testCases {
   747  		t.Logf("Running test case: %s", tc.name)
   748  		fakeCloser := fake.NewCloser(t)
   749  		client := &csiDriverClient{
   750  			driverName: "Fake Driver Name",
   751  			nodeV1ClientCreator: func(addr csiAddr, m *MetricsManager) (csipbv1.NodeClient, io.Closer, error) {
   752  				nodeClient := nodeClientGenerator(tc.capable)
   753  				return nodeClient, fakeCloser, nil
   754  			},
   755  		}
   757  		got, _ := capabilityMethodToTest(client)
   759  		if got != tc.capable {
   760  			t.Errorf("Expected capability support to be %v, got: %v", tc.capable, got)
   761  		}
   763  		fakeCloser.Check()
   764  	}
   765  }
   767  func TestNodeExpandVolume(t *testing.T) {
   768  	testCases := []struct {
   769  		name       string
   770  		volID      string
   771  		volumePath string
   772  		newSize    resource.Quantity
   773  		mustFail   bool
   774  		err        error
   775  	}{
   776  		{
   777  			name:       "with all correct values",
   778  			volID:      "vol-abcde",
   779  			volumePath: "/foo/bar",
   780  			newSize:    resource.MustParse("10Gi"),
   781  			mustFail:   false,
   782  		},
   783  		{
   784  			name:       "with missing volume-id",
   785  			volumePath: "/foo/bar",
   786  			newSize:    resource.MustParse("10Gi"),
   787  			mustFail:   true,
   788  		},
   789  		{
   790  			name:     "with missing volume path",
   791  			volID:    "vol-1234",
   792  			newSize:  resource.MustParse("10Gi"),
   793  			mustFail: true,
   794  		},
   795  		{
   796  			name:       "with invalid quantity",
   797  			volID:      "vol-1234",
   798  			volumePath: "/foo/bar",
   799  			newSize:    *resource.NewQuantity(-10, resource.DecimalSI),
   800  			mustFail:   true,
   801  		},
   802  	}
   804  	for _, tc := range testCases {
   805  		t.Logf("Running test cases : %s", tc.name)
   806  		fakeCloser := fake.NewCloser(t)
   807  		client := &csiDriverClient{
   808  			driverName: "Fake Driver Name",
   809  			nodeV1ClientCreator: func(addr csiAddr, m *MetricsManager) (csipbv1.NodeClient, io.Closer, error) {
   810  				nodeClient := fake.NewNodeClient(false /* stagingCapable */)
   811  				nodeClient.SetNextError(tc.err)
   812  				return nodeClient, fakeCloser, nil
   813  			},
   814  		}
   815  		opts := csiResizeOptions{volumeID: tc.volID, volumePath: tc.volumePath, newSize: tc.newSize}
   816  		_, err := client.NodeExpandVolume(context.Background(), opts)
   817  		checkErr(t, tc.mustFail, err)
   818  		if !tc.mustFail {
   819  			fakeCloser.Check()
   820  		}
   821  	}
   822  }
// VolumeStatsOptions bundles the per-case volume inputs used by the volume
// stats and volume health tests in this file.
type VolumeStatsOptions struct {
	// VolumeSpec is the volume spec the tests hand to getCSISourceFromSpec to
	// obtain the CSI source (and with it the volume handle).
	VolumeSpec *volume.Spec

	// VolumeID identifies the volume; this could just be the volume ID.
	// NOTE(review): in the tests visible here it is only used in failure
	// messages, the handle passed to NodeGetVolumeStats comes from VolumeSpec.
	VolumeID string

	// DeviceMountPath is the location where the device is mounted on the node.
	// If the volume type is attachable this would be the global mount path,
	// otherwise it would be the location where the volume was mounted for the pod.
	DeviceMountPath string
}
   836  func TestVolumeHealthEnable(t *testing.T) {
   837  	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIVolumeHealth, true)()
   838  	spec := volume.NewSpecFromPersistentVolume(makeTestPV("test-pv", 10, "metrics", "test-vol"), false)
   839  	tests := []struct {
   840  		name               string
   841  		volumeStatsSet     bool
   842  		volumeConditionSet bool
   843  		volumeData         VolumeStatsOptions
   844  		success            bool
   845  	}{
   846  		{
   847  			name:               "when nodeVolumeStats=on, VolumeID=on, DeviceMountPath=on, VolumeCondition=on",
   848  			volumeStatsSet:     true,
   849  			volumeConditionSet: true,
   850  			volumeData: VolumeStatsOptions{
   851  				VolumeSpec:      spec,
   852  				VolumeID:        "volume1",
   853  				DeviceMountPath: "/foo/bar",
   854  			},
   855  			success: true,
   856  		},
   857  		{
   858  			name:               "when nodeVolumeStats=on, VolumeID=on, DeviceMountPath=on, VolumeCondition=off",
   859  			volumeStatsSet:     true,
   860  			volumeConditionSet: false,
   861  			volumeData: VolumeStatsOptions{
   862  				VolumeSpec:      spec,
   863  				VolumeID:        "volume1",
   864  				DeviceMountPath: "/foo/bar",
   865  			},
   866  			success: true,
   867  		},
   868  	}
   870  	for _, tc := range tests {
   871  		t.Run(tc.name, func(t *testing.T) {
   872  			ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
   873  			defer cancel()
   874  			csiSource, _ := getCSISourceFromSpec(tc.volumeData.VolumeSpec)
   875  			csClient := setupClientWithVolumeStatsAndCondition(t, tc.volumeStatsSet, tc.volumeConditionSet)
   876  			metrics, err := csClient.NodeGetVolumeStats(ctx, csiSource.VolumeHandle, tc.volumeData.DeviceMountPath)
   877  			if tc.success {
   878  				assert.Nil(t, err)
   879  			}
   881  			if metrics == nil {
   882  				t.Errorf("csi.NodeGetVolumeStats returned nil metrics for volume %s", tc.volumeData.VolumeID)
   883  			} else {
   884  				if tc.volumeConditionSet {
   885  					assert.NotNil(t, metrics.Abnormal)
   886  					assert.NotNil(t, metrics.Message)
   887  				} else {
   888  					assert.Nil(t, metrics.Abnormal)
   889  					assert.Nil(t, metrics.Message)
   890  				}
   891  			}
   893  		})
   894  	}
   895  }
   897  func TestVolumeHealthDisable(t *testing.T) {
   898  	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIVolumeHealth, false)()
   899  	spec := volume.NewSpecFromPersistentVolume(makeTestPV("test-pv", 10, "metrics", "test-vol"), false)
   900  	tests := []struct {
   901  		name           string
   902  		volumeStatsSet bool
   903  		volumeData     VolumeStatsOptions
   904  		success        bool
   905  	}{
   906  		{
   907  			name:           "when nodeVolumeStats=on, VolumeID=on, DeviceMountPath=on, VolumeCondition=off",
   908  			volumeStatsSet: true,
   909  			volumeData: VolumeStatsOptions{
   910  				VolumeSpec:      spec,
   911  				VolumeID:        "volume1",
   912  				DeviceMountPath: "/foo/bar",
   913  			},
   914  			success: true,
   915  		},
   916  	}
   917  	for _, tc := range tests {
   918  		t.Run(tc.name, func(t *testing.T) {
   919  			ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
   920  			defer cancel()
   921  			csiSource, _ := getCSISourceFromSpec(tc.volumeData.VolumeSpec)
   922  			csClient := setupClientWithVolumeStatsAndCondition(t, tc.volumeStatsSet, false)
   923  			metrics, err := csClient.NodeGetVolumeStats(ctx, csiSource.VolumeHandle, tc.volumeData.DeviceMountPath)
   924  			if tc.success {
   925  				assert.Nil(t, err)
   926  			}
   928  			if metrics == nil {
   929  				t.Errorf("csi.NodeGetVolumeStats returned nil metrics for volume %s", tc.volumeData.VolumeID)
   930  			} else {
   931  				assert.Nil(t, metrics.Abnormal)
   932  				assert.Nil(t, metrics.Message)
   933  			}
   934  		})
   935  	}
   936  }
   938  func TestVolumeStats(t *testing.T) {
   939  	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.CSIVolumeHealth, true)()
   940  	spec := volume.NewSpecFromPersistentVolume(makeTestPV("test-pv", 10, "metrics", "test-vol"), false)
   941  	tests := []struct {
   942  		name               string
   943  		volumeStatsSet     bool
   944  		volumeConditionSet bool
   945  		volumeData         VolumeStatsOptions
   946  		success            bool
   947  	}{
   948  		{
   949  			name:           "when nodeVolumeStats=on, VolumeID=on, DeviceMountPath=on",
   950  			volumeStatsSet: true,
   951  			volumeData: VolumeStatsOptions{
   952  				VolumeSpec:      spec,
   953  				VolumeID:        "volume1",
   954  				DeviceMountPath: "/foo/bar",
   955  			},
   956  			success: true,
   957  		},
   959  		{
   960  			name:           "when nodeVolumeStats=off, VolumeID=on, DeviceMountPath=on",
   961  			volumeStatsSet: false,
   962  			volumeData: VolumeStatsOptions{
   963  				VolumeSpec:      spec,
   964  				VolumeID:        "volume1",
   965  				DeviceMountPath: "/foo/bar",
   966  			},
   967  			success: false,
   968  		},
   970  		{
   971  			name:           "when nodeVolumeStats=on, VolumeID=off, DeviceMountPath=on",
   972  			volumeStatsSet: true,
   973  			volumeData: VolumeStatsOptions{
   974  				VolumeSpec:      spec,
   975  				VolumeID:        "",
   976  				DeviceMountPath: "/foo/bar",
   977  			},
   978  			success: false,
   979  		},
   981  		{
   982  			name:           "when nodeVolumeStats=on, VolumeID=on, DeviceMountPath=off",
   983  			volumeStatsSet: true,
   984  			volumeData: VolumeStatsOptions{
   985  				VolumeSpec:      spec,
   986  				VolumeID:        "volume1",
   987  				DeviceMountPath: "",
   988  			},
   989  			success: false,
   990  		},
   991  		{
   992  			name:           "when nodeVolumeStats=on, VolumeID=on, DeviceMountPath=off",
   993  			volumeStatsSet: true,
   994  			volumeData: VolumeStatsOptions{
   995  				VolumeSpec:      spec,
   996  				VolumeID:        "",
   997  				DeviceMountPath: "",
   998  			},
   999  			success: false,
  1000  		},
  1001  	}
  1002  	for _, tc := range tests {
  1003  		t.Run(tc.name, func(t *testing.T) {
  1004  			ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
  1005  			defer cancel()
  1006  			csiSource, _ := getCSISourceFromSpec(tc.volumeData.VolumeSpec)
  1007  			csClient := setupClientWithVolumeStats(t, tc.volumeStatsSet)
  1008  			_, err := csClient.NodeGetVolumeStats(ctx, csiSource.VolumeHandle, tc.volumeData.DeviceMountPath)
  1009  			if err != nil && tc.success {
  1010  				t.Errorf("For %s : expected %v got %v", tc.name, tc.success, err)
  1011  			}
  1012  		})
  1013  	}
  1015  }
  1017  func TestAccessModeMapping(t *testing.T) {
  1018  	tests := []struct {
  1019  		name                     string
  1020  		singleNodeMultiWriterSet bool
  1021  		accessMode               api.PersistentVolumeAccessMode
  1022  		expectedMappedAccessMode csipbv1.VolumeCapability_AccessMode_Mode
  1023  	}{
  1024  		{
  1025  			name:                     "with ReadWriteOnce and incapable driver",
  1026  			singleNodeMultiWriterSet: false,
  1027  			accessMode:               api.ReadWriteOnce,
  1028  			expectedMappedAccessMode: csipbv1.VolumeCapability_AccessMode_SINGLE_NODE_WRITER,
  1029  		},
  1030  		{
  1031  			name:                     "with ReadOnlyMany and incapable driver",
  1032  			singleNodeMultiWriterSet: false,
  1033  			accessMode:               api.ReadOnlyMany,
  1034  			expectedMappedAccessMode: csipbv1.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY,
  1035  		},
  1036  		{
  1037  			name:                     "with ReadWriteMany and incapable driver",
  1038  			singleNodeMultiWriterSet: false,
  1039  			accessMode:               api.ReadWriteMany,
  1040  			expectedMappedAccessMode: csipbv1.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER,
  1041  		},
  1042  		{
  1043  			name:                     "with ReadWriteOncePod and incapable driver",
  1044  			singleNodeMultiWriterSet: false,
  1045  			accessMode:               api.ReadWriteOncePod,
  1046  			expectedMappedAccessMode: csipbv1.VolumeCapability_AccessMode_SINGLE_NODE_WRITER,
  1047  		},
  1048  		{
  1049  			name:                     "with ReadWriteOnce and capable driver",
  1050  			singleNodeMultiWriterSet: true,
  1051  			accessMode:               api.ReadWriteOnce,
  1052  			expectedMappedAccessMode: csipbv1.VolumeCapability_AccessMode_SINGLE_NODE_MULTI_WRITER,
  1053  		},
  1054  		{
  1055  			name:                     "with ReadOnlyMany and capable driver",
  1056  			singleNodeMultiWriterSet: true,
  1057  			accessMode:               api.ReadOnlyMany,
  1058  			expectedMappedAccessMode: csipbv1.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY,
  1059  		},
  1060  		{
  1061  			name:                     "with ReadWriteMany and capable driver",
  1062  			singleNodeMultiWriterSet: true,
  1063  			accessMode:               api.ReadWriteMany,
  1064  			expectedMappedAccessMode: csipbv1.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER,
  1065  		},
  1066  		{
  1067  			name:                     "with ReadWriteOncePod and capable driver",
  1068  			singleNodeMultiWriterSet: true,
  1069  			accessMode:               api.ReadWriteOncePod,
  1070  			expectedMappedAccessMode: csipbv1.VolumeCapability_AccessMode_SINGLE_NODE_SINGLE_WRITER,
  1071  		},
  1072  	}
  1073  	for _, tc := range tests {
  1074  		t.Run(tc.name, func(t *testing.T) {
  1075  			fakeCloser := fake.NewCloser(t)
  1076  			client := &csiDriverClient{
  1077  				driverName: "Fake Driver Name",
  1078  				nodeV1ClientCreator: func(addr csiAddr, m *MetricsManager) (csipbv1.NodeClient, io.Closer, error) {
  1079  					nodeClient := fake.NewNodeClientWithSingleNodeMultiWriter(tc.singleNodeMultiWriterSet)
  1080  					return nodeClient, fakeCloser, nil
  1081  				},
  1082  			}
  1084  			accessModeMapper, err := client.getNodeV1AccessModeMapper(context.Background())
  1085  			if err != nil {
  1086  				t.Error(err)
  1087  			}
  1089  			mappedAccessMode := accessModeMapper(tc.accessMode)
  1090  			if mappedAccessMode != tc.expectedMappedAccessMode {
  1091  				t.Errorf("expected access mode: %v; got: %v", tc.expectedMappedAccessMode, mappedAccessMode)
  1092  			}
  1093  		})
  1094  	}
  1095  }

View as plain text