...

Source file src/k8s.io/kubernetes/pkg/volume/csi/fake/fake_client.go

Documentation: k8s.io/kubernetes/pkg/volume/csi/fake

     1  /*
     2  Copyright 2017 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package fake
    18  
    19  import (
    20  	"context"
    21  	"errors"
    22  	"fmt"
    23  	"os"
    24  	"strings"
    25  
    26  	csipb "github.com/container-storage-interface/spec/lib/go/csi"
    27  	"google.golang.org/grpc"
    28  	"google.golang.org/grpc/codes"
    29  	"google.golang.org/grpc/status"
    30  )
    31  
    32  const (
    33  	// NodePublishTimeOut_VolumeID is volume id that will result in NodePublish operation to timeout
    34  	NodePublishTimeOut_VolumeID = "node-publish-timeout"
    35  
    36  	// NodeStageTimeOut_VolumeID is a volume id that will result in NodeStage operation to timeout
    37  	NodeStageTimeOut_VolumeID = "node-stage-timeout"
    38  )
    39  
    40  // IdentityClient is a CSI identity client used for testing
    41  type IdentityClient struct {
    42  	nextErr error
    43  }
    44  
    45  // SetNextError injects expected error
    46  func (f *IdentityClient) SetNextError(err error) {
    47  	f.nextErr = err
    48  }
    49  
    50  // GetPluginInfo returns plugin info
    51  func (f *IdentityClient) GetPluginInfo(ctx context.Context, in *csipb.GetPluginInfoRequest, opts ...grpc.CallOption) (*csipb.GetPluginInfoResponse, error) {
    52  	return nil, nil
    53  }
    54  
    55  // GetPluginCapabilities implements csi method
    56  func (f *IdentityClient) GetPluginCapabilities(ctx context.Context, in *csipb.GetPluginCapabilitiesRequest, opts ...grpc.CallOption) (*csipb.GetPluginCapabilitiesResponse, error) {
    57  	return nil, nil
    58  }
    59  
    60  // Probe implements csi method
    61  func (f *IdentityClient) Probe(ctx context.Context, in *csipb.ProbeRequest, opts ...grpc.CallOption) (*csipb.ProbeResponse, error) {
    62  	return nil, nil
    63  }
    64  
    65  type CSIVolume struct {
    66  	VolumeHandle     string
    67  	VolumeContext    map[string]string
    68  	Path             string
    69  	DeviceMountPath  string
    70  	FSType           string
    71  	MountFlags       []string
    72  	VolumeMountGroup string
    73  }
    74  
    75  // NodeClient returns CSI node client
    76  type NodeClient struct {
    77  	nodePublishedVolumes     map[string]CSIVolume
    78  	nodeStagedVolumes        map[string]CSIVolume
    79  	stageUnstageSet          bool
    80  	expansionSet             bool
    81  	volumeStatsSet           bool
    82  	volumeConditionSet       bool
    83  	singleNodeMultiWriterSet bool
    84  	volumeMountGroupSet      bool
    85  	nodeGetInfoResp          *csipb.NodeGetInfoResponse
    86  	nodeVolumeStatsResp      *csipb.NodeGetVolumeStatsResponse
    87  	FakeNodeExpansionRequest *csipb.NodeExpandVolumeRequest
    88  	nextErr                  error
    89  }
    90  
    91  // NewNodeClient returns fake node client
    92  func NewNodeClient(stageUnstageSet bool) *NodeClient {
    93  	return &NodeClient{
    94  		nodePublishedVolumes: make(map[string]CSIVolume),
    95  		nodeStagedVolumes:    make(map[string]CSIVolume),
    96  		stageUnstageSet:      stageUnstageSet,
    97  		volumeStatsSet:       true,
    98  	}
    99  }
   100  
   101  func NewNodeClientWithExpansion(stageUnstageSet bool, expansionSet bool) *NodeClient {
   102  	return &NodeClient{
   103  		nodePublishedVolumes: make(map[string]CSIVolume),
   104  		nodeStagedVolumes:    make(map[string]CSIVolume),
   105  		stageUnstageSet:      stageUnstageSet,
   106  		expansionSet:         expansionSet,
   107  	}
   108  }
   109  
   110  func NewNodeClientWithVolumeStats(volumeStatsSet bool) *NodeClient {
   111  	return &NodeClient{
   112  		volumeStatsSet: volumeStatsSet,
   113  	}
   114  }
   115  
   116  func NewNodeClientWithVolumeStatsAndCondition(volumeStatsSet, volumeConditionSet bool) *NodeClient {
   117  	return &NodeClient{
   118  		volumeStatsSet:     volumeStatsSet,
   119  		volumeConditionSet: volumeConditionSet,
   120  	}
   121  }
   122  
   123  func NewNodeClientWithSingleNodeMultiWriter(singleNodeMultiWriterSet bool) *NodeClient {
   124  	return &NodeClient{
   125  		nodePublishedVolumes:     make(map[string]CSIVolume),
   126  		nodeStagedVolumes:        make(map[string]CSIVolume),
   127  		stageUnstageSet:          true,
   128  		volumeStatsSet:           true,
   129  		singleNodeMultiWriterSet: singleNodeMultiWriterSet,
   130  	}
   131  }
   132  
   133  func NewNodeClientWithVolumeMountGroup(stageUnstageSet, volumeMountGroupSet bool) *NodeClient {
   134  	return &NodeClient{
   135  		nodePublishedVolumes: make(map[string]CSIVolume),
   136  		nodeStagedVolumes:    make(map[string]CSIVolume),
   137  		stageUnstageSet:      stageUnstageSet,
   138  		volumeMountGroupSet:  volumeMountGroupSet,
   139  	}
   140  }
   141  
   142  // SetNextError injects next expected error
   143  func (f *NodeClient) SetNextError(err error) {
   144  	f.nextErr = err
   145  }
   146  
   147  func (f *NodeClient) SetNodeGetInfoResp(resp *csipb.NodeGetInfoResponse) {
   148  	f.nodeGetInfoResp = resp
   149  }
   150  
   151  func (f *NodeClient) SetNodeVolumeStatsResp(resp *csipb.NodeGetVolumeStatsResponse) {
   152  	f.nodeVolumeStatsResp = resp
   153  }
   154  
   155  // GetNodePublishedVolumes returns node published volumes
   156  func (f *NodeClient) GetNodePublishedVolumes() map[string]CSIVolume {
   157  	return f.nodePublishedVolumes
   158  }
   159  
   160  // AddNodePublishedVolume adds specified volume to nodePublishedVolumes
   161  func (f *NodeClient) AddNodePublishedVolume(volID, deviceMountPath string, volumeContext map[string]string) {
   162  	f.nodePublishedVolumes[volID] = CSIVolume{
   163  		Path:          deviceMountPath,
   164  		VolumeContext: volumeContext,
   165  	}
   166  }
   167  
   168  // GetNodeStagedVolumes returns node staged volumes
   169  func (f *NodeClient) GetNodeStagedVolumes() map[string]CSIVolume {
   170  	return f.nodeStagedVolumes
   171  }
   172  
   173  // AddNodeStagedVolume adds specified volume to nodeStagedVolumes
   174  func (f *NodeClient) AddNodeStagedVolume(volID, deviceMountPath string, volumeContext map[string]string) {
   175  	f.nodeStagedVolumes[volID] = CSIVolume{
   176  		Path:          deviceMountPath,
   177  		VolumeContext: volumeContext,
   178  	}
   179  }
   180  
   181  // NodePublishVolume implements CSI NodePublishVolume
   182  func (f *NodeClient) NodePublishVolume(ctx context.Context, req *csipb.NodePublishVolumeRequest, opts ...grpc.CallOption) (*csipb.NodePublishVolumeResponse, error) {
   183  	if f.nextErr != nil {
   184  		return nil, f.nextErr
   185  	}
   186  
   187  	if req.GetVolumeId() == "" {
   188  		return nil, errors.New("missing volume id")
   189  	}
   190  	if req.GetTargetPath() == "" {
   191  		return nil, errors.New("missing target path")
   192  	}
   193  	fsTypes := "block|ext4|xfs|zfs"
   194  	fsType := req.GetVolumeCapability().GetMount().GetFsType()
   195  	if !strings.Contains(fsTypes, fsType) {
   196  		return nil, errors.New("invalid fstype")
   197  	}
   198  
   199  	if req.GetVolumeId() == NodePublishTimeOut_VolumeID {
   200  		timeoutErr := status.Errorf(codes.DeadlineExceeded, "timeout exceeded")
   201  		return nil, timeoutErr
   202  	}
   203  
   204  	// "Creation of target_path is the responsibility of the SP."
   205  	// Our plugin depends on it.
   206  	if req.VolumeCapability.GetBlock() != nil {
   207  		if err := os.WriteFile(req.TargetPath, []byte{}, 0644); err != nil {
   208  			return nil, fmt.Errorf("cannot create target path %s for block file: %s", req.TargetPath, err)
   209  		}
   210  	} else {
   211  		if err := os.MkdirAll(req.TargetPath, 0755); err != nil {
   212  			return nil, fmt.Errorf("cannot create target directory %s for mount: %s", req.TargetPath, err)
   213  		}
   214  	}
   215  
   216  	publishedVolume := CSIVolume{
   217  		VolumeHandle:    req.GetVolumeId(),
   218  		Path:            req.GetTargetPath(),
   219  		DeviceMountPath: req.GetStagingTargetPath(),
   220  		VolumeContext:   req.GetVolumeContext(),
   221  	}
   222  	if req.GetVolumeCapability().GetMount() != nil {
   223  		publishedVolume.FSType = req.GetVolumeCapability().GetMount().FsType
   224  		publishedVolume.MountFlags = req.GetVolumeCapability().GetMount().MountFlags
   225  		publishedVolume.VolumeMountGroup = req.GetVolumeCapability().GetMount().VolumeMountGroup
   226  	}
   227  	f.nodePublishedVolumes[req.GetVolumeId()] = publishedVolume
   228  	return &csipb.NodePublishVolumeResponse{}, nil
   229  }
   230  
   231  // NodeUnpublishVolume implements csi method
   232  func (f *NodeClient) NodeUnpublishVolume(ctx context.Context, req *csipb.NodeUnpublishVolumeRequest, opts ...grpc.CallOption) (*csipb.NodeUnpublishVolumeResponse, error) {
   233  	if f.nextErr != nil {
   234  		return nil, f.nextErr
   235  	}
   236  
   237  	if req.GetVolumeId() == "" {
   238  		return nil, errors.New("missing volume id")
   239  	}
   240  	if req.GetTargetPath() == "" {
   241  		return nil, errors.New("missing target path")
   242  	}
   243  	delete(f.nodePublishedVolumes, req.GetVolumeId())
   244  
   245  	// "The SP MUST delete the file or directory it created at this path."
   246  	if err := os.Remove(req.TargetPath); err != nil && !os.IsNotExist(err) {
   247  		return nil, fmt.Errorf("failed to remove publish path %s: %s", req.TargetPath, err)
   248  	}
   249  
   250  	return &csipb.NodeUnpublishVolumeResponse{}, nil
   251  }
   252  
   253  // NodeStagevolume implements csi method
   254  func (f *NodeClient) NodeStageVolume(ctx context.Context, req *csipb.NodeStageVolumeRequest, opts ...grpc.CallOption) (*csipb.NodeStageVolumeResponse, error) {
   255  	if f.nextErr != nil {
   256  		return nil, f.nextErr
   257  	}
   258  
   259  	if req.GetVolumeId() == "" {
   260  		return nil, errors.New("missing volume id")
   261  	}
   262  	if req.GetStagingTargetPath() == "" {
   263  		return nil, errors.New("missing staging target path")
   264  	}
   265  
   266  	csiVol := CSIVolume{
   267  		Path:          req.GetStagingTargetPath(),
   268  		VolumeContext: req.GetVolumeContext(),
   269  	}
   270  
   271  	fsType := ""
   272  	fsTypes := "block|ext4|xfs|zfs"
   273  	mounted := req.GetVolumeCapability().GetMount()
   274  	if mounted != nil {
   275  		fsType = mounted.GetFsType()
   276  		csiVol.MountFlags = mounted.GetMountFlags()
   277  		csiVol.VolumeMountGroup = mounted.VolumeMountGroup
   278  	}
   279  	if !strings.Contains(fsTypes, fsType) {
   280  		return nil, errors.New("invalid fstype")
   281  	}
   282  
   283  	if req.GetVolumeId() == NodeStageTimeOut_VolumeID {
   284  		timeoutErr := status.Errorf(codes.DeadlineExceeded, "timeout exceeded")
   285  		return nil, timeoutErr
   286  	}
   287  
   288  	f.nodeStagedVolumes[req.GetVolumeId()] = csiVol
   289  	return &csipb.NodeStageVolumeResponse{}, nil
   290  }
   291  
   292  // NodeUnstageVolume implements csi method
   293  func (f *NodeClient) NodeUnstageVolume(ctx context.Context, req *csipb.NodeUnstageVolumeRequest, opts ...grpc.CallOption) (*csipb.NodeUnstageVolumeResponse, error) {
   294  	if f.nextErr != nil {
   295  		return nil, f.nextErr
   296  	}
   297  
   298  	if req.GetVolumeId() == "" {
   299  		return nil, errors.New("missing volume id")
   300  	}
   301  	if req.GetStagingTargetPath() == "" {
   302  		return nil, errors.New("missing staging target path")
   303  	}
   304  
   305  	delete(f.nodeStagedVolumes, req.GetVolumeId())
   306  	return &csipb.NodeUnstageVolumeResponse{}, nil
   307  }
   308  
   309  // NodeExpandVolume implements csi method
   310  func (f *NodeClient) NodeExpandVolume(ctx context.Context, req *csipb.NodeExpandVolumeRequest, opts ...grpc.CallOption) (*csipb.NodeExpandVolumeResponse, error) {
   311  	if f.nextErr != nil {
   312  		return nil, f.nextErr
   313  	}
   314  
   315  	if req.GetVolumeId() == "" {
   316  		return nil, errors.New("missing volume id")
   317  	}
   318  	if req.GetVolumePath() == "" {
   319  		return nil, errors.New("missing volume path")
   320  	}
   321  
   322  	if req.GetCapacityRange().RequiredBytes <= 0 {
   323  		return nil, errors.New("required bytes should be greater than 0")
   324  	}
   325  
   326  	f.FakeNodeExpansionRequest = req
   327  
   328  	resp := &csipb.NodeExpandVolumeResponse{
   329  		CapacityBytes: req.GetCapacityRange().RequiredBytes,
   330  	}
   331  	return resp, nil
   332  }
   333  
   334  // NodeGetId implements csi method
   335  func (f *NodeClient) NodeGetInfo(ctx context.Context, in *csipb.NodeGetInfoRequest, opts ...grpc.CallOption) (*csipb.NodeGetInfoResponse, error) {
   336  	if f.nextErr != nil {
   337  		return nil, f.nextErr
   338  	}
   339  	return f.nodeGetInfoResp, nil
   340  }
   341  
   342  // NodeGetCapabilities implements csi method
   343  func (f *NodeClient) NodeGetCapabilities(ctx context.Context, in *csipb.NodeGetCapabilitiesRequest, opts ...grpc.CallOption) (*csipb.NodeGetCapabilitiesResponse, error) {
   344  	resp := &csipb.NodeGetCapabilitiesResponse{
   345  		Capabilities: []*csipb.NodeServiceCapability{},
   346  	}
   347  	if f.stageUnstageSet {
   348  		resp.Capabilities = append(resp.Capabilities, &csipb.NodeServiceCapability{
   349  			Type: &csipb.NodeServiceCapability_Rpc{
   350  				Rpc: &csipb.NodeServiceCapability_RPC{
   351  					Type: csipb.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME,
   352  				},
   353  			},
   354  		})
   355  	}
   356  	if f.expansionSet {
   357  		resp.Capabilities = append(resp.Capabilities, &csipb.NodeServiceCapability{
   358  			Type: &csipb.NodeServiceCapability_Rpc{
   359  				Rpc: &csipb.NodeServiceCapability_RPC{
   360  					Type: csipb.NodeServiceCapability_RPC_EXPAND_VOLUME,
   361  				},
   362  			},
   363  		})
   364  	}
   365  
   366  	if f.volumeStatsSet {
   367  		resp.Capabilities = append(resp.Capabilities, &csipb.NodeServiceCapability{
   368  			Type: &csipb.NodeServiceCapability_Rpc{
   369  				Rpc: &csipb.NodeServiceCapability_RPC{
   370  					Type: csipb.NodeServiceCapability_RPC_GET_VOLUME_STATS,
   371  				},
   372  			},
   373  		})
   374  	}
   375  
   376  	if f.volumeConditionSet {
   377  		resp.Capabilities = append(resp.Capabilities, &csipb.NodeServiceCapability{
   378  			Type: &csipb.NodeServiceCapability_Rpc{
   379  				Rpc: &csipb.NodeServiceCapability_RPC{
   380  					Type: csipb.NodeServiceCapability_RPC_VOLUME_CONDITION,
   381  				},
   382  			},
   383  		})
   384  	}
   385  
   386  	if f.singleNodeMultiWriterSet {
   387  		resp.Capabilities = append(resp.Capabilities, &csipb.NodeServiceCapability{
   388  			Type: &csipb.NodeServiceCapability_Rpc{
   389  				Rpc: &csipb.NodeServiceCapability_RPC{
   390  					Type: csipb.NodeServiceCapability_RPC_SINGLE_NODE_MULTI_WRITER,
   391  				},
   392  			},
   393  		})
   394  	}
   395  
   396  	if f.volumeMountGroupSet {
   397  		resp.Capabilities = append(resp.Capabilities, &csipb.NodeServiceCapability{
   398  			Type: &csipb.NodeServiceCapability_Rpc{
   399  				Rpc: &csipb.NodeServiceCapability_RPC{
   400  					Type: csipb.NodeServiceCapability_RPC_VOLUME_MOUNT_GROUP,
   401  				},
   402  			},
   403  		})
   404  	}
   405  	return resp, nil
   406  }
   407  
   408  /*
   409  // NodeGetVolumeStats implements csi method
   410  func (f *NodeClient) NodeGetVolumeStats(ctx context.Context, in *csipb.NodeGetVolumeStatsRequest, opts ...grpc.CallOption) (*csipb.NodeGetVolumeStatsResponse, error) {
   411  	return nil, nil
   412  }
   413  */
   414  
   415  // NodeGetVolumeStats implements csi method
   416  func (f *NodeClient) NodeGetVolumeStats(ctx context.Context, req *csipb.NodeGetVolumeStatsRequest, opts ...grpc.CallOption) (*csipb.NodeGetVolumeStatsResponse, error) {
   417  	if f.nextErr != nil {
   418  		return nil, f.nextErr
   419  	}
   420  	if req.GetVolumeId() == "" {
   421  		return nil, errors.New("missing volume id")
   422  	}
   423  	if req.GetVolumePath() == "" {
   424  		return nil, errors.New("missing Volume path")
   425  	}
   426  	if f.nodeVolumeStatsResp != nil {
   427  		return f.nodeVolumeStatsResp, nil
   428  	}
   429  	return &csipb.NodeGetVolumeStatsResponse{}, nil
   430  }
   431  
   432  // ControllerClient represents a CSI Controller client
   433  type ControllerClient struct {
   434  	nextCapabilities []*csipb.ControllerServiceCapability
   435  	nextErr          error
   436  }
   437  
   438  // SetNextError injects next expected error
   439  func (f *ControllerClient) SetNextError(err error) {
   440  	f.nextErr = err
   441  }
   442  
   443  // SetNextCapabilities injects next expected capabilities
   444  func (f *ControllerClient) SetNextCapabilities(caps []*csipb.ControllerServiceCapability) {
   445  	f.nextCapabilities = caps
   446  }
   447  
   448  // ControllerGetCapabilities implements csi method
   449  func (f *ControllerClient) ControllerGetCapabilities(ctx context.Context, in *csipb.ControllerGetCapabilitiesRequest, opts ...grpc.CallOption) (*csipb.ControllerGetCapabilitiesResponse, error) {
   450  	if f.nextErr != nil {
   451  		return nil, f.nextErr
   452  	}
   453  
   454  	if f.nextCapabilities == nil {
   455  		f.nextCapabilities = []*csipb.ControllerServiceCapability{
   456  			{
   457  				Type: &csipb.ControllerServiceCapability_Rpc{
   458  					Rpc: &csipb.ControllerServiceCapability_RPC{
   459  						Type: csipb.ControllerServiceCapability_RPC_PUBLISH_UNPUBLISH_VOLUME,
   460  					},
   461  				},
   462  			},
   463  		}
   464  	}
   465  	return &csipb.ControllerGetCapabilitiesResponse{
   466  		Capabilities: f.nextCapabilities,
   467  	}, nil
   468  }
   469  
   470  // CreateVolume implements csi method
   471  func (f *ControllerClient) CreateVolume(ctx context.Context, in *csipb.CreateVolumeRequest, opts ...grpc.CallOption) (*csipb.CreateVolumeResponse, error) {
   472  	return nil, nil
   473  }
   474  
   475  // DeleteVolume implements csi method
   476  func (f *ControllerClient) DeleteVolume(ctx context.Context, in *csipb.DeleteVolumeRequest, opts ...grpc.CallOption) (*csipb.DeleteVolumeResponse, error) {
   477  	return nil, nil
   478  }
   479  
   480  // ControllerPublishVolume implements csi method
   481  func (f *ControllerClient) ControllerPublishVolume(ctx context.Context, in *csipb.ControllerPublishVolumeRequest, opts ...grpc.CallOption) (*csipb.ControllerPublishVolumeResponse, error) {
   482  	return nil, nil
   483  }
   484  
   485  // ControllerUnpublishVolume implements csi method
   486  func (f *ControllerClient) ControllerUnpublishVolume(ctx context.Context, in *csipb.ControllerUnpublishVolumeRequest, opts ...grpc.CallOption) (*csipb.ControllerUnpublishVolumeResponse, error) {
   487  	return nil, nil
   488  }
   489  
   490  // ValidateVolumeCapabilities implements csi method
   491  func (f *ControllerClient) ValidateVolumeCapabilities(ctx context.Context, in *csipb.ValidateVolumeCapabilitiesRequest, opts ...grpc.CallOption) (*csipb.ValidateVolumeCapabilitiesResponse, error) {
   492  	return nil, nil
   493  }
   494  
   495  // ListVolumes implements csi method
   496  func (f *ControllerClient) ListVolumes(ctx context.Context, in *csipb.ListVolumesRequest, opts ...grpc.CallOption) (*csipb.ListVolumesResponse, error) {
   497  	return nil, nil
   498  }
   499  
   500  // GetCapacity implements csi method
   501  func (f *ControllerClient) GetCapacity(ctx context.Context, in *csipb.GetCapacityRequest, opts ...grpc.CallOption) (*csipb.GetCapacityResponse, error) {
   502  	return nil, nil
   503  }
   504  

View as plain text