1
16
17 package service
18
19 import (
20 "fmt"
21 "path"
22 "reflect"
23 "strconv"
24
25 "github.com/container-storage-interface/spec/lib/go/csi"
26 "golang.org/x/net/context"
27 "google.golang.org/grpc/codes"
28 "google.golang.org/grpc/status"
29
30 "k8s.io/klog/v2"
31 )
32
33 const (
34 MaxStorageCapacity = tib
35 ReadOnlyKey = "readonly"
36 )
37
38 func (s *service) CreateVolume(
39 ctx context.Context,
40 req *csi.CreateVolumeRequest) (
41 *csi.CreateVolumeResponse, error) {
42
43 if len(req.Name) == 0 {
44 return nil, status.Error(codes.InvalidArgument, "Volume Name cannot be empty")
45 }
46 if req.VolumeCapabilities == nil {
47 return nil, status.Error(codes.InvalidArgument, "Volume Capabilities cannot be empty")
48 }
49
50
51 if i, v := s.findVolByName(ctx, req.Name); i >= 0 {
52
53
54 if v.GetCapacityBytes() < req.GetCapacityRange().GetRequiredBytes() {
55 return nil, status.Error(codes.AlreadyExists,
56 fmt.Sprintf("Volume with name %s already exists", req.GetName()))
57 }
58 return &csi.CreateVolumeResponse{Volume: &v}, nil
59 }
60
61
62 capacity := gib100
63 if cr := req.CapacityRange; cr != nil {
64 if rb := cr.RequiredBytes; rb > 0 {
65 capacity = rb
66 }
67 if lb := cr.LimitBytes; lb > 0 {
68 capacity = lb
69 }
70 }
71
72 if capacity >= MaxStorageCapacity {
73 return nil, status.Errorf(codes.OutOfRange, "Requested capacity %d exceeds maximum allowed %d", capacity, MaxStorageCapacity)
74 }
75
76 var v csi.Volume
77
78 if req.GetVolumeContentSource() != nil {
79 switch req.GetVolumeContentSource().GetType().(type) {
80 case *csi.VolumeContentSource_Snapshot:
81 sid := req.GetVolumeContentSource().GetSnapshot().GetSnapshotId()
82
83 if snapID, _ := s.snapshots.FindSnapshot("id", sid); snapID >= 0 {
84 v = s.newVolumeFromSnapshot(req.Name, capacity, snapID)
85 } else {
86 return nil, status.Errorf(codes.NotFound, "Requested source snapshot %s not found", sid)
87 }
88 case *csi.VolumeContentSource_Volume:
89 vid := req.GetVolumeContentSource().GetVolume().GetVolumeId()
90
91 if volID, _ := s.findVolNoLock("id", vid); volID >= 0 {
92 v = s.newVolumeFromVolume(req.Name, capacity, volID)
93 } else {
94 return nil, status.Errorf(codes.NotFound, "Requested source volume %s not found", vid)
95 }
96 }
97 } else {
98 v = s.newVolume(req.Name, capacity)
99 }
100
101
102 s.volsRWL.Lock()
103 defer s.volsRWL.Unlock()
104 s.vols = append(s.vols, v)
105 MockVolumes[v.GetVolumeId()] = Volume{
106 VolumeCSI: v,
107 NodeID: "",
108 ISStaged: false,
109 ISPublished: false,
110 StageTargetPath: "",
111 TargetPath: "",
112 }
113
114 if hookVal, hookMsg := s.execHook("CreateVolumeEnd"); hookVal != codes.OK {
115 return nil, status.Errorf(hookVal, hookMsg)
116 }
117
118 return &csi.CreateVolumeResponse{Volume: &v}, nil
119 }
120
121 func (s *service) DeleteVolume(
122 ctx context.Context,
123 req *csi.DeleteVolumeRequest) (
124 *csi.DeleteVolumeResponse, error) {
125
126 s.volsRWL.Lock()
127 defer s.volsRWL.Unlock()
128
129
130 if len(req.VolumeId) == 0 {
131 return nil, status.Error(codes.InvalidArgument, "Volume ID cannot be empty")
132 }
133
134 if hookVal, hookMsg := s.execHook("DeleteVolumeStart"); hookVal != codes.OK {
135 return nil, status.Errorf(hookVal, hookMsg)
136 }
137
138
139 i, _ := s.findVolNoLock("id", req.VolumeId)
140 if i < 0 {
141 return &csi.DeleteVolumeResponse{}, nil
142 }
143
144
145
146
147 copy(s.vols[i:], s.vols[i+1:])
148 s.vols[len(s.vols)-1] = csi.Volume{}
149 s.vols = s.vols[:len(s.vols)-1]
150 klog.V(5).InfoS("mock delete volume", "volumeID", req.VolumeId)
151
152 if hookVal, hookMsg := s.execHook("DeleteVolumeEnd"); hookVal != codes.OK {
153 return nil, status.Errorf(hookVal, hookMsg)
154 }
155 return &csi.DeleteVolumeResponse{}, nil
156 }
157
158 func (s *service) ControllerPublishVolume(
159 ctx context.Context,
160 req *csi.ControllerPublishVolumeRequest) (
161 *csi.ControllerPublishVolumeResponse, error) {
162
163 if s.config.DisableAttach {
164 return nil, status.Error(codes.Unimplemented, "ControllerPublish is not supported")
165 }
166
167 if len(req.VolumeId) == 0 {
168 return nil, status.Error(codes.InvalidArgument, "Volume ID cannot be empty")
169 }
170 if len(req.NodeId) == 0 {
171 return nil, status.Error(codes.InvalidArgument, "Node ID cannot be empty")
172 }
173 if req.VolumeCapability == nil {
174 return nil, status.Error(codes.InvalidArgument, "Volume Capabilities cannot be empty")
175 }
176
177 if req.NodeId != s.nodeID {
178 return nil, status.Errorf(codes.NotFound, "Not matching Node ID %s to Mock Node ID %s", req.NodeId, s.nodeID)
179 }
180
181 if hookVal, hookMsg := s.execHook("ControllerPublishVolumeStart"); hookVal != codes.OK {
182 return nil, status.Errorf(hookVal, hookMsg)
183 }
184
185 s.volsRWL.Lock()
186 defer s.volsRWL.Unlock()
187
188 i, v := s.findVolNoLock("id", req.VolumeId)
189 if i < 0 {
190 return nil, status.Error(codes.NotFound, req.VolumeId)
191 }
192
193
194
195
196 devPathKey := path.Join(req.NodeId, "dev")
197
198
199 if device := v.VolumeContext[devPathKey]; device != "" {
200 var volRo bool
201 var roVal string
202 if ro, ok := v.VolumeContext[ReadOnlyKey]; ok {
203 roVal = ro
204 }
205
206 if roVal == "true" {
207 volRo = true
208 } else {
209 volRo = false
210 }
211
212
213 if req.GetReadonly() != volRo {
214 return nil, status.Error(codes.AlreadyExists, "Volume published but has incompatible readonly flag")
215 }
216
217 return &csi.ControllerPublishVolumeResponse{
218 PublishContext: map[string]string{
219 "device": device,
220 "readonly": roVal,
221 },
222 }, nil
223 }
224
225
226 if s.config.AttachLimit > 0 && s.getAttachCount(devPathKey) >= s.config.AttachLimit {
227 return nil, status.Errorf(codes.ResourceExhausted, "Cannot attach any more volumes to this node")
228 }
229
230 var roVal string
231 if req.GetReadonly() {
232 roVal = "true"
233 } else {
234 roVal = "false"
235 }
236
237
238 device := "/dev/mock"
239 v.VolumeContext[devPathKey] = device
240 v.VolumeContext[ReadOnlyKey] = roVal
241 s.vols[i] = v
242
243 if volInfo, ok := MockVolumes[req.VolumeId]; ok {
244 volInfo.ISControllerPublished = true
245 MockVolumes[req.VolumeId] = volInfo
246 }
247
248 if hookVal, hookMsg := s.execHook("ControllerPublishVolumeEnd"); hookVal != codes.OK {
249 return nil, status.Errorf(hookVal, hookMsg)
250 }
251
252 return &csi.ControllerPublishVolumeResponse{
253 PublishContext: map[string]string{
254 "device": device,
255 "readonly": roVal,
256 },
257 }, nil
258 }
259
260 func (s *service) ControllerUnpublishVolume(
261 ctx context.Context,
262 req *csi.ControllerUnpublishVolumeRequest) (
263 *csi.ControllerUnpublishVolumeResponse, error) {
264
265 if s.config.DisableAttach {
266 return nil, status.Error(codes.Unimplemented, "ControllerPublish is not supported")
267 }
268
269 if len(req.VolumeId) == 0 {
270 return nil, status.Error(codes.InvalidArgument, "Volume ID cannot be empty")
271 }
272 nodeID := req.NodeId
273 if len(nodeID) == 0 {
274
275 nodeID = s.nodeID
276 }
277
278 if req.NodeId != s.nodeID {
279 return nil, status.Errorf(codes.NotFound, "Node ID %s does not match to expected Node ID %s", req.NodeId, s.nodeID)
280 }
281
282 if hookVal, hookMsg := s.execHook("ControllerUnpublishVolumeStart"); hookVal != codes.OK {
283 return nil, status.Errorf(hookVal, hookMsg)
284 }
285
286 s.volsRWL.Lock()
287 defer s.volsRWL.Unlock()
288
289 i, v := s.findVolNoLock("id", req.VolumeId)
290 if i < 0 {
291
292
293 return &csi.ControllerUnpublishVolumeResponse{}, nil
294 }
295
296
297
298
299 devPathKey := path.Join(nodeID, "dev")
300
301
302 if v.VolumeContext[devPathKey] == "" {
303 return &csi.ControllerUnpublishVolumeResponse{}, nil
304 }
305
306
307 delete(v.VolumeContext, devPathKey)
308 delete(v.VolumeContext, ReadOnlyKey)
309 s.vols[i] = v
310
311 if hookVal, hookMsg := s.execHook("ControllerUnpublishVolumeEnd"); hookVal != codes.OK {
312 return nil, status.Errorf(hookVal, hookMsg)
313 }
314
315 return &csi.ControllerUnpublishVolumeResponse{}, nil
316 }
317
318 func (s *service) ValidateVolumeCapabilities(
319 ctx context.Context,
320 req *csi.ValidateVolumeCapabilitiesRequest) (
321 *csi.ValidateVolumeCapabilitiesResponse, error) {
322
323 if len(req.GetVolumeId()) == 0 {
324 return nil, status.Error(codes.InvalidArgument, "Volume ID cannot be empty")
325 }
326 if len(req.VolumeCapabilities) == 0 {
327 return nil, status.Error(codes.InvalidArgument, req.VolumeId)
328 }
329 i, _ := s.findVolNoLock("id", req.VolumeId)
330 if i < 0 {
331 return nil, status.Error(codes.NotFound, req.VolumeId)
332 }
333
334 if hookVal, hookMsg := s.execHook("ValidateVolumeCapabilities"); hookVal != codes.OK {
335 return nil, status.Errorf(hookVal, hookMsg)
336 }
337
338 return &csi.ValidateVolumeCapabilitiesResponse{
339 Confirmed: &csi.ValidateVolumeCapabilitiesResponse_Confirmed{
340 VolumeContext: req.GetVolumeContext(),
341 VolumeCapabilities: req.GetVolumeCapabilities(),
342 Parameters: req.GetParameters(),
343 },
344 }, nil
345 }
346
347 func (s *service) ControllerGetVolume(
348 ctx context.Context,
349 req *csi.ControllerGetVolumeRequest) (
350 *csi.ControllerGetVolumeResponse, error) {
351
352 if hookVal, hookMsg := s.execHook("GetVolumeStart"); hookVal != codes.OK {
353 return nil, status.Errorf(hookVal, hookMsg)
354 }
355
356 resp := &csi.ControllerGetVolumeResponse{
357 Status: &csi.ControllerGetVolumeResponse_VolumeStatus{
358 VolumeCondition: &csi.VolumeCondition{},
359 },
360 }
361 i, v := s.findVolByID(ctx, req.VolumeId)
362 if i < 0 {
363 resp.Status.VolumeCondition.Abnormal = true
364 resp.Status.VolumeCondition.Message = "volume not found"
365 return resp, status.Error(codes.NotFound, req.VolumeId)
366 }
367
368 resp.Volume = &v
369 if !s.config.DisableAttach {
370 resp.Status.PublishedNodeIds = []string{
371 s.nodeID,
372 }
373 }
374
375 if hookVal, hookMsg := s.execHook("GetVolumeEnd"); hookVal != codes.OK {
376 return nil, status.Errorf(hookVal, hookMsg)
377 }
378
379 return resp, nil
380 }
381
382 func (s *service) ListVolumes(
383 ctx context.Context,
384 req *csi.ListVolumesRequest) (
385 *csi.ListVolumesResponse, error) {
386
387 if hookVal, hookMsg := s.execHook("ListVolumesStart"); hookVal != codes.OK {
388 return nil, status.Errorf(hookVal, hookMsg)
389 }
390
391
392
393
394 var vols []csi.Volume
395 func() {
396 s.volsRWL.RLock()
397 defer s.volsRWL.RUnlock()
398 vols = make([]csi.Volume, len(s.vols))
399 copy(vols, s.vols)
400 }()
401
402 var (
403 ulenVols = int32(len(vols))
404 maxEntries = req.MaxEntries
405 startingToken int32
406 )
407
408 if v := req.StartingToken; v != "" {
409 i, err := strconv.ParseUint(v, 10, 32)
410 if err != nil {
411 return nil, status.Errorf(
412 codes.Aborted,
413 "startingToken=%s: %v",
414 v, err)
415 }
416 startingToken = int32(i)
417 }
418
419 if startingToken > ulenVols {
420 return nil, status.Errorf(
421 codes.Aborted,
422 "startingToken=%d > len(vols)=%d",
423 startingToken, ulenVols)
424 }
425
426
427 rem := ulenVols - startingToken
428
429
430
431 if maxEntries == 0 || maxEntries > rem {
432 maxEntries = rem
433 }
434
435 var (
436 i int
437 j = startingToken
438 entries = make(
439 []*csi.ListVolumesResponse_Entry,
440 maxEntries)
441 )
442
443 for i = 0; i < len(entries); i++ {
444 volumeStatus := &csi.ListVolumesResponse_VolumeStatus{
445 VolumeCondition: &csi.VolumeCondition{},
446 }
447
448 if !s.config.DisableAttach {
449 volumeStatus.PublishedNodeIds = []string{
450 s.nodeID,
451 }
452 }
453
454 entries[i] = &csi.ListVolumesResponse_Entry{
455 Volume: &vols[j],
456 Status: volumeStatus,
457 }
458 j++
459 }
460
461 var nextToken string
462 if n := startingToken + int32(i); n < ulenVols {
463 nextToken = fmt.Sprintf("%d", n)
464 }
465
466 if hookVal, hookMsg := s.execHook("ListVolumesEnd"); hookVal != codes.OK {
467 return nil, status.Errorf(hookVal, hookMsg)
468 }
469
470 return &csi.ListVolumesResponse{
471 Entries: entries,
472 NextToken: nextToken,
473 }, nil
474 }
475
476 func (s *service) GetCapacity(
477 ctx context.Context,
478 req *csi.GetCapacityRequest) (
479 *csi.GetCapacityResponse, error) {
480
481 if hookVal, hookMsg := s.execHook("GetCapacity"); hookVal != codes.OK {
482 return nil, status.Errorf(hookVal, hookMsg)
483 }
484
485 return &csi.GetCapacityResponse{
486 AvailableCapacity: MaxStorageCapacity,
487 }, nil
488 }
489
490 func (s *service) ControllerGetCapabilities(
491 ctx context.Context,
492 req *csi.ControllerGetCapabilitiesRequest) (
493 *csi.ControllerGetCapabilitiesResponse, error) {
494
495 if hookVal, hookMsg := s.execHook("ControllerGetCapabilitiesStart"); hookVal != codes.OK {
496 return nil, status.Errorf(hookVal, hookMsg)
497 }
498
499 caps := []*csi.ControllerServiceCapability{
500 {
501 Type: &csi.ControllerServiceCapability_Rpc{
502 Rpc: &csi.ControllerServiceCapability_RPC{
503 Type: csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME,
504 },
505 },
506 },
507 {
508 Type: &csi.ControllerServiceCapability_Rpc{
509 Rpc: &csi.ControllerServiceCapability_RPC{
510 Type: csi.ControllerServiceCapability_RPC_LIST_VOLUMES,
511 },
512 },
513 },
514 {
515 Type: &csi.ControllerServiceCapability_Rpc{
516 Rpc: &csi.ControllerServiceCapability_RPC{
517 Type: csi.ControllerServiceCapability_RPC_LIST_VOLUMES_PUBLISHED_NODES,
518 },
519 },
520 },
521 {
522 Type: &csi.ControllerServiceCapability_Rpc{
523 Rpc: &csi.ControllerServiceCapability_RPC{
524 Type: csi.ControllerServiceCapability_RPC_GET_CAPACITY,
525 },
526 },
527 },
528 {
529 Type: &csi.ControllerServiceCapability_Rpc{
530 Rpc: &csi.ControllerServiceCapability_RPC{
531 Type: csi.ControllerServiceCapability_RPC_LIST_SNAPSHOTS,
532 },
533 },
534 },
535 {
536 Type: &csi.ControllerServiceCapability_Rpc{
537 Rpc: &csi.ControllerServiceCapability_RPC{
538 Type: csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT,
539 },
540 },
541 },
542 {
543 Type: &csi.ControllerServiceCapability_Rpc{
544 Rpc: &csi.ControllerServiceCapability_RPC{
545 Type: csi.ControllerServiceCapability_RPC_PUBLISH_READONLY,
546 },
547 },
548 },
549 {
550 Type: &csi.ControllerServiceCapability_Rpc{
551 Rpc: &csi.ControllerServiceCapability_RPC{
552 Type: csi.ControllerServiceCapability_RPC_CLONE_VOLUME,
553 },
554 },
555 },
556 {
557 Type: &csi.ControllerServiceCapability_Rpc{
558 Rpc: &csi.ControllerServiceCapability_RPC{
559 Type: csi.ControllerServiceCapability_RPC_GET_VOLUME,
560 },
561 },
562 },
563 {
564 Type: &csi.ControllerServiceCapability_Rpc{
565 Rpc: &csi.ControllerServiceCapability_RPC{
566 Type: csi.ControllerServiceCapability_RPC_VOLUME_CONDITION,
567 },
568 },
569 },
570 }
571
572 if !s.config.DisableAttach {
573 caps = append(caps, &csi.ControllerServiceCapability{
574 Type: &csi.ControllerServiceCapability_Rpc{
575 Rpc: &csi.ControllerServiceCapability_RPC{
576 Type: csi.ControllerServiceCapability_RPC_PUBLISH_UNPUBLISH_VOLUME,
577 },
578 },
579 })
580 }
581
582 if !s.config.DisableControllerExpansion {
583 caps = append(caps, &csi.ControllerServiceCapability{
584 Type: &csi.ControllerServiceCapability_Rpc{
585 Rpc: &csi.ControllerServiceCapability_RPC{
586 Type: csi.ControllerServiceCapability_RPC_EXPAND_VOLUME,
587 },
588 },
589 })
590 }
591
592 if hookVal, hookMsg := s.execHook("ControllerGetCapabilitiesEnd"); hookVal != codes.OK {
593 return nil, status.Errorf(hookVal, hookMsg)
594 }
595
596 return &csi.ControllerGetCapabilitiesResponse{
597 Capabilities: caps,
598 }, nil
599 }
600
601 func (s *service) CreateSnapshot(ctx context.Context,
602 req *csi.CreateSnapshotRequest) (*csi.CreateSnapshotResponse, error) {
603
604 if len(req.GetName()) == 0 {
605 return nil, status.Error(codes.InvalidArgument, "Snapshot Name cannot be empty")
606 }
607 if len(req.GetSourceVolumeId()) == 0 {
608 return nil, status.Error(codes.InvalidArgument, "Snapshot SourceVolumeId cannot be empty")
609 }
610
611
612 if i, v := s.snapshots.FindSnapshot("name", req.GetName()); i >= 0 {
613
614 if v.SnapshotCSI.GetSourceVolumeId() != req.GetSourceVolumeId() || !reflect.DeepEqual(v.Parameters, req.GetParameters()) {
615 return nil, status.Error(codes.AlreadyExists,
616 fmt.Sprintf("Snapshot with name %s already exists", req.GetName()))
617 }
618 return &csi.CreateSnapshotResponse{Snapshot: &v.SnapshotCSI}, nil
619 }
620
621
622 snapshot := s.newSnapshot(req.GetName(), req.GetSourceVolumeId(), req.GetParameters())
623 s.snapshots.Add(snapshot)
624
625 if hookVal, hookMsg := s.execHook("CreateSnapshotEnd"); hookVal != codes.OK {
626 return nil, status.Errorf(hookVal, hookMsg)
627 }
628
629 return &csi.CreateSnapshotResponse{Snapshot: &snapshot.SnapshotCSI}, nil
630 }
631
632 func (s *service) DeleteSnapshot(ctx context.Context,
633 req *csi.DeleteSnapshotRequest) (*csi.DeleteSnapshotResponse, error) {
634
635
636 if len(req.SnapshotId) == 0 {
637 return nil, status.Error(codes.InvalidArgument, "Snapshot ID cannot be empty")
638 }
639
640 if hookVal, hookMsg := s.execHook("DeleteSnapshotStart"); hookVal != codes.OK {
641 return nil, status.Errorf(hookVal, hookMsg)
642 }
643
644
645 i, _ := s.snapshots.FindSnapshot("id", req.SnapshotId)
646 if i < 0 {
647 return &csi.DeleteSnapshotResponse{}, nil
648 }
649
650
651
652
653 s.snapshots.Delete(i)
654 klog.V(5).InfoS("mock delete snapshot", "snapshotId", req.SnapshotId)
655
656 if hookVal, hookMsg := s.execHook("DeleteSnapshotEnd"); hookVal != codes.OK {
657 return nil, status.Errorf(hookVal, hookMsg)
658 }
659
660 return &csi.DeleteSnapshotResponse{}, nil
661 }
662
663 func (s *service) ListSnapshots(ctx context.Context,
664 req *csi.ListSnapshotsRequest) (*csi.ListSnapshotsResponse, error) {
665
666 if hookVal, hookMsg := s.execHook("ListSnapshots"); hookVal != codes.OK {
667 return nil, status.Errorf(hookVal, hookMsg)
668 }
669
670
671 if len(req.GetSnapshotId()) != 0 {
672 return getSnapshotById(s, req)
673 }
674
675
676 if len(req.GetSourceVolumeId()) != 0 {
677 return getSnapshotByVolumeId(s, req)
678 }
679
680
681 return getAllSnapshots(s, req)
682 }
683
684 func (s *service) ControllerExpandVolume(
685 ctx context.Context,
686 req *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error) {
687 if len(req.VolumeId) == 0 {
688 return nil, status.Error(codes.InvalidArgument, "Volume ID cannot be empty")
689 }
690
691 if req.CapacityRange == nil {
692 return nil, status.Error(codes.InvalidArgument, "Request capacity cannot be empty")
693 }
694
695 if hookVal, hookMsg := s.execHook("ControllerExpandVolumeStart"); hookVal != codes.OK {
696 return nil, status.Errorf(hookVal, hookMsg)
697 }
698
699 s.volsRWL.Lock()
700 defer s.volsRWL.Unlock()
701
702 i, v := s.findVolNoLock("id", req.VolumeId)
703 if i < 0 {
704 return nil, status.Error(codes.NotFound, req.VolumeId)
705 }
706
707 if s.config.DisableOnlineExpansion && MockVolumes[v.GetVolumeId()].ISControllerPublished {
708 return nil, status.Error(codes.FailedPrecondition, "volume is published and online volume expansion is not supported")
709 }
710
711 requestBytes := req.CapacityRange.RequiredBytes
712
713 if v.CapacityBytes > requestBytes {
714 return nil, status.Error(codes.InvalidArgument, "cannot change volume capacity to a smaller size")
715 }
716
717 resp := &csi.ControllerExpandVolumeResponse{
718 CapacityBytes: requestBytes,
719 NodeExpansionRequired: s.config.NodeExpansionRequired,
720 }
721
722
723 if v.CapacityBytes == requestBytes {
724 klog.V(5).InfoS("volume capacity sufficient, no need to expand", "requested", requestBytes, "current", v.CapacityBytes, "volumeID", v.VolumeId)
725 return resp, nil
726 }
727
728
729 v.CapacityBytes = requestBytes
730 s.vols[i] = v
731
732 if hookVal, hookMsg := s.execHook("ControllerExpandVolumeEnd"); hookVal != codes.OK {
733 return nil, status.Errorf(hookVal, hookMsg)
734 }
735
736 return resp, nil
737 }
738
739 func getSnapshotById(s *service, req *csi.ListSnapshotsRequest) (*csi.ListSnapshotsResponse, error) {
740 if len(req.GetSnapshotId()) != 0 {
741 i, snapshot := s.snapshots.FindSnapshot("id", req.GetSnapshotId())
742 if i < 0 {
743 return &csi.ListSnapshotsResponse{}, nil
744 }
745
746 if len(req.GetSourceVolumeId()) != 0 {
747 if snapshot.SnapshotCSI.GetSourceVolumeId() != req.GetSourceVolumeId() {
748 return &csi.ListSnapshotsResponse{}, nil
749 }
750 }
751
752 return &csi.ListSnapshotsResponse{
753 Entries: []*csi.ListSnapshotsResponse_Entry{
754 {
755 Snapshot: &snapshot.SnapshotCSI,
756 },
757 },
758 }, nil
759 }
760 return nil, nil
761 }
762
763 func getSnapshotByVolumeId(s *service, req *csi.ListSnapshotsRequest) (*csi.ListSnapshotsResponse, error) {
764 if len(req.GetSourceVolumeId()) != 0 {
765 i, snapshot := s.snapshots.FindSnapshot("sourceVolumeId", req.SourceVolumeId)
766 if i < 0 {
767 return &csi.ListSnapshotsResponse{}, nil
768 }
769 return &csi.ListSnapshotsResponse{
770 Entries: []*csi.ListSnapshotsResponse_Entry{
771 {
772 Snapshot: &snapshot.SnapshotCSI,
773 },
774 },
775 }, nil
776 }
777 return nil, nil
778 }
779
780 func getAllSnapshots(s *service, req *csi.ListSnapshotsRequest) (*csi.ListSnapshotsResponse, error) {
781
782
783
784 readyToUse := true
785 snapshots := s.snapshots.List(readyToUse)
786
787 var (
788 ulenSnapshots = int32(len(snapshots))
789 maxEntries = req.MaxEntries
790 startingToken int32
791 )
792
793 if v := req.StartingToken; v != "" {
794 i, err := strconv.ParseUint(v, 10, 32)
795 if err != nil {
796 return nil, status.Errorf(
797 codes.Aborted,
798 "startingToken=%s: %v",
799 v, err)
800 }
801 startingToken = int32(i)
802 }
803
804 if startingToken > ulenSnapshots {
805 return nil, status.Errorf(
806 codes.Aborted,
807 "startingToken=%d > len(snapshots)=%d",
808 startingToken, ulenSnapshots)
809 }
810
811
812 rem := ulenSnapshots - startingToken
813
814
815
816 if maxEntries == 0 || maxEntries > rem {
817 maxEntries = rem
818 }
819
820 var (
821 i int
822 j = startingToken
823 entries = make(
824 []*csi.ListSnapshotsResponse_Entry,
825 maxEntries)
826 )
827
828 for i = 0; i < len(entries); i++ {
829 entries[i] = &csi.ListSnapshotsResponse_Entry{
830 Snapshot: &snapshots[j],
831 }
832 j++
833 }
834
835 var nextToken string
836 if n := startingToken + int32(i); n < ulenSnapshots {
837 nextToken = fmt.Sprintf("%d", n)
838 }
839
840 return &csi.ListSnapshotsResponse{
841 Entries: entries,
842 NextToken: nextToken,
843 }, nil
844 }
845
View as plain text