1
16
17 package fake
18
19 import (
20 "context"
21 "errors"
22 "fmt"
23 "os"
24 "strings"
25
26 csipb "github.com/container-storage-interface/spec/lib/go/csi"
27 "google.golang.org/grpc"
28 "google.golang.org/grpc/codes"
29 "google.golang.org/grpc/status"
30 )
31
const (
	// NodePublishTimeOut_VolumeID is the volume ID for which the fake
	// NodeClient.NodePublishVolume returns a DeadlineExceeded status error.
	NodePublishTimeOut_VolumeID = "node-publish-timeout"

	// NodeStageTimeOut_VolumeID is the volume ID for which the fake
	// NodeClient.NodeStageVolume returns a DeadlineExceeded status error.
	NodeStageTimeOut_VolumeID = "node-stage-timeout"
)
39
40
// IdentityClient is a fake CSI identity client whose RPC methods are stubs.
type IdentityClient struct {
	// nextErr is the error stored by SetNextError.
	// NOTE(review): none of the IdentityClient methods in this file read it.
	nextErr error
}
44
45
// SetNextError stores err on the client as the error to inject.
// NOTE(review): the IdentityClient RPC stubs in this file always return
// (nil, nil) and do not consult the stored error.
func (f *IdentityClient) SetNextError(err error) {
	f.nextErr = err
}
49
50
// GetPluginInfo is a stub: it ignores its arguments and always returns a nil
// response and a nil error.
func (f *IdentityClient) GetPluginInfo(ctx context.Context, in *csipb.GetPluginInfoRequest, opts ...grpc.CallOption) (*csipb.GetPluginInfoResponse, error) {
	return nil, nil
}
54
55
// GetPluginCapabilities is a stub: it ignores its arguments and always returns
// a nil response and a nil error.
func (f *IdentityClient) GetPluginCapabilities(ctx context.Context, in *csipb.GetPluginCapabilitiesRequest, opts ...grpc.CallOption) (*csipb.GetPluginCapabilitiesResponse, error) {
	return nil, nil
}
59
60
// Probe is a stub: it ignores its arguments and always returns a nil response
// and a nil error.
func (f *IdentityClient) Probe(ctx context.Context, in *csipb.ProbeRequest, opts ...grpc.CallOption) (*csipb.ProbeResponse, error) {
	return nil, nil
}
64
// CSIVolume records the details of a volume that the fake NodeClient has been
// asked to publish or stage.
type CSIVolume struct {
	// VolumeHandle is the volume ID from a publish request.
	VolumeHandle string
	// VolumeContext is the volume context supplied with the request.
	VolumeContext map[string]string
	// Path is the target path (publish) or the staging target path (stage).
	Path string
	// DeviceMountPath is the staging target path of a publish request.
	DeviceMountPath string
	// FSType is the filesystem type taken from the mount capability, if any.
	FSType string
	// MountFlags are the mount flags taken from the mount capability, if any.
	MountFlags []string
	// VolumeMountGroup is the volume mount group taken from the mount
	// capability, if any.
	VolumeMountGroup string
}
74
75
// NodeClient is a fake CSI node client that records published and staged
// volumes in memory and advertises a configurable set of capabilities.
type NodeClient struct {
	// nodePublishedVolumes holds published volumes keyed by volume ID.
	nodePublishedVolumes map[string]CSIVolume
	// nodeStagedVolumes holds staged volumes keyed by volume ID.
	nodeStagedVolumes map[string]CSIVolume
	// The *Set flags select which RPC capabilities NodeGetCapabilities reports.
	stageUnstageSet          bool
	expansionSet             bool
	volumeStatsSet           bool
	volumeConditionSet       bool
	singleNodeMultiWriterSet bool
	volumeMountGroupSet      bool
	// nodeGetInfoResp is returned verbatim by NodeGetInfo.
	nodeGetInfoResp *csipb.NodeGetInfoResponse
	// nodeVolumeStatsResp, when non-nil, is returned by NodeGetVolumeStats.
	nodeVolumeStatsResp *csipb.NodeGetVolumeStatsResponse
	// FakeNodeExpansionRequest is the last request accepted by NodeExpandVolume.
	FakeNodeExpansionRequest *csipb.NodeExpandVolumeRequest
	// nextErr, when non-nil, is returned by the node RPC methods that check it
	// (NodeGetCapabilities does not).
	nextErr error
}
90
91
92 func NewNodeClient(stageUnstageSet bool) *NodeClient {
93 return &NodeClient{
94 nodePublishedVolumes: make(map[string]CSIVolume),
95 nodeStagedVolumes: make(map[string]CSIVolume),
96 stageUnstageSet: stageUnstageSet,
97 volumeStatsSet: true,
98 }
99 }
100
101 func NewNodeClientWithExpansion(stageUnstageSet bool, expansionSet bool) *NodeClient {
102 return &NodeClient{
103 nodePublishedVolumes: make(map[string]CSIVolume),
104 nodeStagedVolumes: make(map[string]CSIVolume),
105 stageUnstageSet: stageUnstageSet,
106 expansionSet: expansionSet,
107 }
108 }
109
110 func NewNodeClientWithVolumeStats(volumeStatsSet bool) *NodeClient {
111 return &NodeClient{
112 volumeStatsSet: volumeStatsSet,
113 }
114 }
115
116 func NewNodeClientWithVolumeStatsAndCondition(volumeStatsSet, volumeConditionSet bool) *NodeClient {
117 return &NodeClient{
118 volumeStatsSet: volumeStatsSet,
119 volumeConditionSet: volumeConditionSet,
120 }
121 }
122
123 func NewNodeClientWithSingleNodeMultiWriter(singleNodeMultiWriterSet bool) *NodeClient {
124 return &NodeClient{
125 nodePublishedVolumes: make(map[string]CSIVolume),
126 nodeStagedVolumes: make(map[string]CSIVolume),
127 stageUnstageSet: true,
128 volumeStatsSet: true,
129 singleNodeMultiWriterSet: singleNodeMultiWriterSet,
130 }
131 }
132
133 func NewNodeClientWithVolumeMountGroup(stageUnstageSet, volumeMountGroupSet bool) *NodeClient {
134 return &NodeClient{
135 nodePublishedVolumes: make(map[string]CSIVolume),
136 nodeStagedVolumes: make(map[string]CSIVolume),
137 stageUnstageSet: stageUnstageSet,
138 volumeMountGroupSet: volumeMountGroupSet,
139 }
140 }
141
142
// SetNextError stores err as the error to inject. While it is non-nil, the
// node RPC methods that check it return it instead of doing any work. The
// stored error is not cleared after use; pass nil to reset it.
func (f *NodeClient) SetNextError(err error) {
	f.nextErr = err
}
146
// SetNodeGetInfoResp sets the response that NodeGetInfo returns.
func (f *NodeClient) SetNodeGetInfoResp(resp *csipb.NodeGetInfoResponse) {
	f.nodeGetInfoResp = resp
}
150
// SetNodeVolumeStatsResp sets the response that NodeGetVolumeStats returns for
// valid requests. When resp is nil, NodeGetVolumeStats returns an empty
// response instead.
func (f *NodeClient) SetNodeVolumeStatsResp(resp *csipb.NodeGetVolumeStatsResponse) {
	f.nodeVolumeStatsResp = resp
}
154
155
// GetNodePublishedVolumes returns the published volumes keyed by volume ID.
// The client's internal map is returned directly, not a copy.
func (f *NodeClient) GetNodePublishedVolumes() map[string]CSIVolume {
	return f.nodePublishedVolumes
}
159
160
161 func (f *NodeClient) AddNodePublishedVolume(volID, deviceMountPath string, volumeContext map[string]string) {
162 f.nodePublishedVolumes[volID] = CSIVolume{
163 Path: deviceMountPath,
164 VolumeContext: volumeContext,
165 }
166 }
167
168
// GetNodeStagedVolumes returns the staged volumes keyed by volume ID. The
// client's internal map is returned directly, not a copy.
func (f *NodeClient) GetNodeStagedVolumes() map[string]CSIVolume {
	return f.nodeStagedVolumes
}
172
173
174 func (f *NodeClient) AddNodeStagedVolume(volID, deviceMountPath string, volumeContext map[string]string) {
175 f.nodeStagedVolumes[volID] = CSIVolume{
176 Path: deviceMountPath,
177 VolumeContext: volumeContext,
178 }
179 }
180
181
182 func (f *NodeClient) NodePublishVolume(ctx context.Context, req *csipb.NodePublishVolumeRequest, opts ...grpc.CallOption) (*csipb.NodePublishVolumeResponse, error) {
183 if f.nextErr != nil {
184 return nil, f.nextErr
185 }
186
187 if req.GetVolumeId() == "" {
188 return nil, errors.New("missing volume id")
189 }
190 if req.GetTargetPath() == "" {
191 return nil, errors.New("missing target path")
192 }
193 fsTypes := "block|ext4|xfs|zfs"
194 fsType := req.GetVolumeCapability().GetMount().GetFsType()
195 if !strings.Contains(fsTypes, fsType) {
196 return nil, errors.New("invalid fstype")
197 }
198
199 if req.GetVolumeId() == NodePublishTimeOut_VolumeID {
200 timeoutErr := status.Errorf(codes.DeadlineExceeded, "timeout exceeded")
201 return nil, timeoutErr
202 }
203
204
205
206 if req.VolumeCapability.GetBlock() != nil {
207 if err := os.WriteFile(req.TargetPath, []byte{}, 0644); err != nil {
208 return nil, fmt.Errorf("cannot create target path %s for block file: %s", req.TargetPath, err)
209 }
210 } else {
211 if err := os.MkdirAll(req.TargetPath, 0755); err != nil {
212 return nil, fmt.Errorf("cannot create target directory %s for mount: %s", req.TargetPath, err)
213 }
214 }
215
216 publishedVolume := CSIVolume{
217 VolumeHandle: req.GetVolumeId(),
218 Path: req.GetTargetPath(),
219 DeviceMountPath: req.GetStagingTargetPath(),
220 VolumeContext: req.GetVolumeContext(),
221 }
222 if req.GetVolumeCapability().GetMount() != nil {
223 publishedVolume.FSType = req.GetVolumeCapability().GetMount().FsType
224 publishedVolume.MountFlags = req.GetVolumeCapability().GetMount().MountFlags
225 publishedVolume.VolumeMountGroup = req.GetVolumeCapability().GetMount().VolumeMountGroup
226 }
227 f.nodePublishedVolumes[req.GetVolumeId()] = publishedVolume
228 return &csipb.NodePublishVolumeResponse{}, nil
229 }
230
231
232 func (f *NodeClient) NodeUnpublishVolume(ctx context.Context, req *csipb.NodeUnpublishVolumeRequest, opts ...grpc.CallOption) (*csipb.NodeUnpublishVolumeResponse, error) {
233 if f.nextErr != nil {
234 return nil, f.nextErr
235 }
236
237 if req.GetVolumeId() == "" {
238 return nil, errors.New("missing volume id")
239 }
240 if req.GetTargetPath() == "" {
241 return nil, errors.New("missing target path")
242 }
243 delete(f.nodePublishedVolumes, req.GetVolumeId())
244
245
246 if err := os.Remove(req.TargetPath); err != nil && !os.IsNotExist(err) {
247 return nil, fmt.Errorf("failed to remove publish path %s: %s", req.TargetPath, err)
248 }
249
250 return &csipb.NodeUnpublishVolumeResponse{}, nil
251 }
252
253
254 func (f *NodeClient) NodeStageVolume(ctx context.Context, req *csipb.NodeStageVolumeRequest, opts ...grpc.CallOption) (*csipb.NodeStageVolumeResponse, error) {
255 if f.nextErr != nil {
256 return nil, f.nextErr
257 }
258
259 if req.GetVolumeId() == "" {
260 return nil, errors.New("missing volume id")
261 }
262 if req.GetStagingTargetPath() == "" {
263 return nil, errors.New("missing staging target path")
264 }
265
266 csiVol := CSIVolume{
267 Path: req.GetStagingTargetPath(),
268 VolumeContext: req.GetVolumeContext(),
269 }
270
271 fsType := ""
272 fsTypes := "block|ext4|xfs|zfs"
273 mounted := req.GetVolumeCapability().GetMount()
274 if mounted != nil {
275 fsType = mounted.GetFsType()
276 csiVol.MountFlags = mounted.GetMountFlags()
277 csiVol.VolumeMountGroup = mounted.VolumeMountGroup
278 }
279 if !strings.Contains(fsTypes, fsType) {
280 return nil, errors.New("invalid fstype")
281 }
282
283 if req.GetVolumeId() == NodeStageTimeOut_VolumeID {
284 timeoutErr := status.Errorf(codes.DeadlineExceeded, "timeout exceeded")
285 return nil, timeoutErr
286 }
287
288 f.nodeStagedVolumes[req.GetVolumeId()] = csiVol
289 return &csipb.NodeStageVolumeResponse{}, nil
290 }
291
292
293 func (f *NodeClient) NodeUnstageVolume(ctx context.Context, req *csipb.NodeUnstageVolumeRequest, opts ...grpc.CallOption) (*csipb.NodeUnstageVolumeResponse, error) {
294 if f.nextErr != nil {
295 return nil, f.nextErr
296 }
297
298 if req.GetVolumeId() == "" {
299 return nil, errors.New("missing volume id")
300 }
301 if req.GetStagingTargetPath() == "" {
302 return nil, errors.New("missing staging target path")
303 }
304
305 delete(f.nodeStagedVolumes, req.GetVolumeId())
306 return &csipb.NodeUnstageVolumeResponse{}, nil
307 }
308
309
310 func (f *NodeClient) NodeExpandVolume(ctx context.Context, req *csipb.NodeExpandVolumeRequest, opts ...grpc.CallOption) (*csipb.NodeExpandVolumeResponse, error) {
311 if f.nextErr != nil {
312 return nil, f.nextErr
313 }
314
315 if req.GetVolumeId() == "" {
316 return nil, errors.New("missing volume id")
317 }
318 if req.GetVolumePath() == "" {
319 return nil, errors.New("missing volume path")
320 }
321
322 if req.GetCapacityRange().RequiredBytes <= 0 {
323 return nil, errors.New("required bytes should be greater than 0")
324 }
325
326 f.FakeNodeExpansionRequest = req
327
328 resp := &csipb.NodeExpandVolumeResponse{
329 CapacityBytes: req.GetCapacityRange().RequiredBytes,
330 }
331 return resp, nil
332 }
333
334
335 func (f *NodeClient) NodeGetInfo(ctx context.Context, in *csipb.NodeGetInfoRequest, opts ...grpc.CallOption) (*csipb.NodeGetInfoResponse, error) {
336 if f.nextErr != nil {
337 return nil, f.nextErr
338 }
339 return f.nodeGetInfoResp, nil
340 }
341
342
343 func (f *NodeClient) NodeGetCapabilities(ctx context.Context, in *csipb.NodeGetCapabilitiesRequest, opts ...grpc.CallOption) (*csipb.NodeGetCapabilitiesResponse, error) {
344 resp := &csipb.NodeGetCapabilitiesResponse{
345 Capabilities: []*csipb.NodeServiceCapability{},
346 }
347 if f.stageUnstageSet {
348 resp.Capabilities = append(resp.Capabilities, &csipb.NodeServiceCapability{
349 Type: &csipb.NodeServiceCapability_Rpc{
350 Rpc: &csipb.NodeServiceCapability_RPC{
351 Type: csipb.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME,
352 },
353 },
354 })
355 }
356 if f.expansionSet {
357 resp.Capabilities = append(resp.Capabilities, &csipb.NodeServiceCapability{
358 Type: &csipb.NodeServiceCapability_Rpc{
359 Rpc: &csipb.NodeServiceCapability_RPC{
360 Type: csipb.NodeServiceCapability_RPC_EXPAND_VOLUME,
361 },
362 },
363 })
364 }
365
366 if f.volumeStatsSet {
367 resp.Capabilities = append(resp.Capabilities, &csipb.NodeServiceCapability{
368 Type: &csipb.NodeServiceCapability_Rpc{
369 Rpc: &csipb.NodeServiceCapability_RPC{
370 Type: csipb.NodeServiceCapability_RPC_GET_VOLUME_STATS,
371 },
372 },
373 })
374 }
375
376 if f.volumeConditionSet {
377 resp.Capabilities = append(resp.Capabilities, &csipb.NodeServiceCapability{
378 Type: &csipb.NodeServiceCapability_Rpc{
379 Rpc: &csipb.NodeServiceCapability_RPC{
380 Type: csipb.NodeServiceCapability_RPC_VOLUME_CONDITION,
381 },
382 },
383 })
384 }
385
386 if f.singleNodeMultiWriterSet {
387 resp.Capabilities = append(resp.Capabilities, &csipb.NodeServiceCapability{
388 Type: &csipb.NodeServiceCapability_Rpc{
389 Rpc: &csipb.NodeServiceCapability_RPC{
390 Type: csipb.NodeServiceCapability_RPC_SINGLE_NODE_MULTI_WRITER,
391 },
392 },
393 })
394 }
395
396 if f.volumeMountGroupSet {
397 resp.Capabilities = append(resp.Capabilities, &csipb.NodeServiceCapability{
398 Type: &csipb.NodeServiceCapability_Rpc{
399 Rpc: &csipb.NodeServiceCapability_RPC{
400 Type: csipb.NodeServiceCapability_RPC_VOLUME_MOUNT_GROUP,
401 },
402 },
403 })
404 }
405 return resp, nil
406 }
407
408
414
415
416 func (f *NodeClient) NodeGetVolumeStats(ctx context.Context, req *csipb.NodeGetVolumeStatsRequest, opts ...grpc.CallOption) (*csipb.NodeGetVolumeStatsResponse, error) {
417 if f.nextErr != nil {
418 return nil, f.nextErr
419 }
420 if req.GetVolumeId() == "" {
421 return nil, errors.New("missing volume id")
422 }
423 if req.GetVolumePath() == "" {
424 return nil, errors.New("missing Volume path")
425 }
426 if f.nodeVolumeStatsResp != nil {
427 return f.nodeVolumeStatsResp, nil
428 }
429 return &csipb.NodeGetVolumeStatsResponse{}, nil
430 }
431
432
// ControllerClient is a fake CSI controller client. Only
// ControllerGetCapabilities has behavior; the remaining RPC methods are stubs.
type ControllerClient struct {
	// nextCapabilities is the capability list returned by
	// ControllerGetCapabilities; a default is filled in on first use when nil.
	nextCapabilities []*csipb.ControllerServiceCapability
	// nextErr, when non-nil, is returned by ControllerGetCapabilities.
	nextErr error
}
437
438
// SetNextError stores err as the error to inject. While it is non-nil,
// ControllerGetCapabilities returns it. The stored error is not cleared after
// use; pass nil to reset it.
func (f *ControllerClient) SetNextError(err error) {
	f.nextErr = err
}
442
443
// SetNextCapabilities sets the capability list that ControllerGetCapabilities
// returns. Passing nil makes ControllerGetCapabilities fall back to its
// default list.
func (f *ControllerClient) SetNextCapabilities(caps []*csipb.ControllerServiceCapability) {
	f.nextCapabilities = caps
}
447
448
449 func (f *ControllerClient) ControllerGetCapabilities(ctx context.Context, in *csipb.ControllerGetCapabilitiesRequest, opts ...grpc.CallOption) (*csipb.ControllerGetCapabilitiesResponse, error) {
450 if f.nextErr != nil {
451 return nil, f.nextErr
452 }
453
454 if f.nextCapabilities == nil {
455 f.nextCapabilities = []*csipb.ControllerServiceCapability{
456 {
457 Type: &csipb.ControllerServiceCapability_Rpc{
458 Rpc: &csipb.ControllerServiceCapability_RPC{
459 Type: csipb.ControllerServiceCapability_RPC_PUBLISH_UNPUBLISH_VOLUME,
460 },
461 },
462 },
463 }
464 }
465 return &csipb.ControllerGetCapabilitiesResponse{
466 Capabilities: f.nextCapabilities,
467 }, nil
468 }
469
470
// CreateVolume is a stub: it ignores its arguments and always returns a nil
// response and a nil error.
func (f *ControllerClient) CreateVolume(ctx context.Context, in *csipb.CreateVolumeRequest, opts ...grpc.CallOption) (*csipb.CreateVolumeResponse, error) {
	return nil, nil
}
474
475
// DeleteVolume is a stub: it ignores its arguments and always returns a nil
// response and a nil error.
func (f *ControllerClient) DeleteVolume(ctx context.Context, in *csipb.DeleteVolumeRequest, opts ...grpc.CallOption) (*csipb.DeleteVolumeResponse, error) {
	return nil, nil
}
479
480
// ControllerPublishVolume is a stub: it ignores its arguments and always
// returns a nil response and a nil error.
func (f *ControllerClient) ControllerPublishVolume(ctx context.Context, in *csipb.ControllerPublishVolumeRequest, opts ...grpc.CallOption) (*csipb.ControllerPublishVolumeResponse, error) {
	return nil, nil
}
484
485
// ControllerUnpublishVolume is a stub: it ignores its arguments and always
// returns a nil response and a nil error.
func (f *ControllerClient) ControllerUnpublishVolume(ctx context.Context, in *csipb.ControllerUnpublishVolumeRequest, opts ...grpc.CallOption) (*csipb.ControllerUnpublishVolumeResponse, error) {
	return nil, nil
}
489
490
// ValidateVolumeCapabilities is a stub: it ignores its arguments and always
// returns a nil response and a nil error.
func (f *ControllerClient) ValidateVolumeCapabilities(ctx context.Context, in *csipb.ValidateVolumeCapabilitiesRequest, opts ...grpc.CallOption) (*csipb.ValidateVolumeCapabilitiesResponse, error) {
	return nil, nil
}
494
495
// ListVolumes is a stub: it ignores its arguments and always returns a nil
// response and a nil error.
func (f *ControllerClient) ListVolumes(ctx context.Context, in *csipb.ListVolumesRequest, opts ...grpc.CallOption) (*csipb.ListVolumesResponse, error) {
	return nil, nil
}
499
500
// GetCapacity is a stub: it ignores its arguments and always returns a nil
// response and a nil error.
func (f *ControllerClient) GetCapacity(ctx context.Context, in *csipb.GetCapacityRequest, opts ...grpc.CallOption) (*csipb.GetCapacityResponse, error) {
	return nil, nil
}
504
View as plain text