1
16
17 package service
18
19 import (
20 "fmt"
21 "path"
22 "strconv"
23
24 "google.golang.org/grpc/codes"
25 "google.golang.org/grpc/status"
26
27 "golang.org/x/net/context"
28
29 "github.com/container-storage-interface/spec/lib/go/csi"
30 )
31
32 func (s *service) NodeStageVolume(
33 ctx context.Context,
34 req *csi.NodeStageVolumeRequest) (
35 *csi.NodeStageVolumeResponse, error) {
36
37 device, ok := req.PublishContext["device"]
38 if !ok {
39 if s.config.DisableAttach {
40 device = "mock device"
41 } else {
42 return nil, status.Error(
43 codes.InvalidArgument,
44 "stage volume info 'device' key required")
45 }
46 }
47
48 if len(req.GetVolumeId()) == 0 {
49 return nil, status.Error(codes.InvalidArgument, "Volume ID cannot be empty")
50 }
51
52 if len(req.GetStagingTargetPath()) == 0 {
53 return nil, status.Error(codes.InvalidArgument, "Staging Target Path cannot be empty")
54 }
55
56 if req.GetVolumeCapability() == nil {
57 return nil, status.Error(codes.InvalidArgument, "Volume Capability cannot be empty")
58 }
59
60 exists, err := s.config.IO.DirExists(req.StagingTargetPath)
61 if err != nil {
62 return nil, status.Error(codes.Internal, err.Error())
63 }
64 if !exists {
65 status.Errorf(codes.Internal, "staging target path %s does not exist", req.StagingTargetPath)
66 }
67
68 s.volsRWL.Lock()
69 defer s.volsRWL.Unlock()
70
71 i, v := s.findVolNoLock("id", req.VolumeId)
72 if i < 0 {
73 return nil, status.Error(codes.NotFound, req.VolumeId)
74 }
75
76
77
78 nodeStgPathKey := path.Join(s.nodeID, req.StagingTargetPath)
79
80
81 if v.VolumeContext[nodeStgPathKey] != "" {
82
83
84 return &csi.NodeStageVolumeResponse{}, nil
85 }
86
87
88 v.VolumeContext[nodeStgPathKey] = device
89 s.vols[i] = v
90
91 if hookVal, hookMsg := s.execHook("NodeStageVolumeEnd"); hookVal != codes.OK {
92 return nil, status.Errorf(hookVal, hookMsg)
93 }
94
95 return &csi.NodeStageVolumeResponse{}, nil
96 }
97
98 func (s *service) NodeUnstageVolume(
99 ctx context.Context,
100 req *csi.NodeUnstageVolumeRequest) (
101 *csi.NodeUnstageVolumeResponse, error) {
102
103 if len(req.GetVolumeId()) == 0 {
104 return nil, status.Error(codes.InvalidArgument, "Volume ID cannot be empty")
105 }
106
107 if len(req.GetStagingTargetPath()) == 0 {
108 return nil, status.Error(codes.InvalidArgument, "Staging Target Path cannot be empty")
109 }
110
111 s.volsRWL.Lock()
112 defer s.volsRWL.Unlock()
113
114 i, v := s.findVolNoLock("id", req.VolumeId)
115 if i < 0 {
116 return nil, status.Error(codes.NotFound, req.VolumeId)
117 }
118
119
120
121 nodeStgPathKey := path.Join(s.nodeID, req.StagingTargetPath)
122
123
124 if v.VolumeContext[nodeStgPathKey] == "" {
125 return &csi.NodeUnstageVolumeResponse{}, nil
126 }
127
128
129 delete(v.VolumeContext, nodeStgPathKey)
130 s.vols[i] = v
131
132 if hookVal, hookMsg := s.execHook("NodeUnstageVolumeEnd"); hookVal != codes.OK {
133 return nil, status.Errorf(hookVal, hookMsg)
134 }
135 return &csi.NodeUnstageVolumeResponse{}, nil
136 }
137
138 func (s *service) NodePublishVolume(
139 ctx context.Context,
140 req *csi.NodePublishVolumeRequest) (
141 *csi.NodePublishVolumeResponse, error) {
142
143 if hookVal, hookMsg := s.execHook("NodePublishVolumeStart"); hookVal != codes.OK {
144 return nil, status.Errorf(hookVal, hookMsg)
145 }
146 ephemeralVolume := req.GetVolumeContext()["csi.storage.k8s.io/ephemeral"] == "true"
147 device, ok := req.PublishContext["device"]
148 if !ok {
149 if ephemeralVolume || s.config.DisableAttach {
150 device = "mock device"
151 } else {
152 return nil, status.Error(
153 codes.InvalidArgument,
154 "stage volume info 'device' key required")
155 }
156 }
157
158 if len(req.GetVolumeId()) == 0 {
159 return nil, status.Error(codes.InvalidArgument, "Volume ID cannot be empty")
160 }
161
162 if len(req.GetTargetPath()) == 0 {
163 return nil, status.Error(codes.InvalidArgument, "Target Path cannot be empty")
164 }
165
166 if req.GetVolumeCapability() == nil {
167 return nil, status.Error(codes.InvalidArgument, "Volume Capability cannot be empty")
168 }
169
170
171
172 exists, err := s.config.IO.DirExists(req.TargetPath)
173 if err != nil {
174 return nil, status.Error(codes.Internal, err.Error())
175 }
176 if !s.config.PermissiveTargetPath && exists {
177 status.Errorf(codes.Internal, "target path %s does exist", req.TargetPath)
178 }
179
180 s.volsRWL.Lock()
181 defer s.volsRWL.Unlock()
182
183 i, v := s.findVolNoLock("id", req.VolumeId)
184 if i < 0 && !ephemeralVolume {
185 return nil, status.Error(codes.NotFound, req.VolumeId)
186 }
187 if i >= 0 && ephemeralVolume {
188 return nil, status.Error(codes.AlreadyExists, req.VolumeId)
189 }
190
191
192
193 nodeMntPathKey := path.Join(s.nodeID, req.TargetPath)
194
195
196 if v.VolumeContext[nodeMntPathKey] != "" {
197
198
199
200 if req.Readonly {
201 return nil, status.Error(codes.AlreadyExists, req.VolumeId)
202 }
203
204 return &csi.NodePublishVolumeResponse{}, nil
205 }
206
207
208 if ephemeralVolume {
209 MockVolumes[req.VolumeId] = Volume{
210 ISEphemeral: true,
211 }
212 } else {
213 if req.GetTargetPath() != "" {
214 exists, err := s.config.IO.DirExists(req.GetTargetPath())
215 if err != nil {
216 return nil, status.Error(codes.Internal, err.Error())
217 }
218 if !exists {
219
220 if err = s.config.IO.Mkdir(req.TargetPath); err != nil {
221 msg := fmt.Sprintf("NodePublishVolume: could not create target dir %q: %v", req.TargetPath, err)
222 return nil, status.Error(codes.Internal, msg)
223 }
224 }
225 v.VolumeContext[nodeMntPathKey] = req.GetTargetPath()
226 } else {
227 v.VolumeContext[nodeMntPathKey] = device
228 }
229 s.vols[i] = v
230 }
231 if hookVal, hookMsg := s.execHook("NodePublishVolumeEnd"); hookVal != codes.OK {
232 return nil, status.Errorf(hookVal, hookMsg)
233 }
234
235 return &csi.NodePublishVolumeResponse{}, nil
236 }
237
238 func (s *service) NodeUnpublishVolume(
239 ctx context.Context,
240 req *csi.NodeUnpublishVolumeRequest) (
241 *csi.NodeUnpublishVolumeResponse, error) {
242
243 if len(req.GetVolumeId()) == 0 {
244 return nil, status.Error(codes.InvalidArgument, "Volume ID cannot be empty")
245 }
246 if len(req.GetTargetPath()) == 0 {
247 return nil, status.Error(codes.InvalidArgument, "Target Path cannot be empty")
248 }
249 if hookVal, hookMsg := s.execHook("NodeUnpublishVolumeStart"); hookVal != codes.OK {
250 return nil, status.Errorf(hookVal, hookMsg)
251 }
252
253 s.volsRWL.Lock()
254 defer s.volsRWL.Unlock()
255
256 ephemeralVolume := MockVolumes[req.VolumeId].ISEphemeral
257 i, v := s.findVolNoLock("id", req.VolumeId)
258 if i < 0 && !ephemeralVolume {
259 return nil, status.Error(codes.NotFound, req.VolumeId)
260 }
261
262 if ephemeralVolume {
263 delete(MockVolumes, req.VolumeId)
264 } else {
265
266
267 nodeMntPathKey := path.Join(s.nodeID, req.TargetPath)
268
269
270 if v.VolumeContext[nodeMntPathKey] == "" {
271 return &csi.NodeUnpublishVolumeResponse{}, nil
272 }
273
274
275 err := s.config.IO.RemoveAll(v.VolumeContext[nodeMntPathKey])
276 if err != nil {
277 return nil, status.Errorf(codes.Internal, "Unable to delete previously created target directory")
278 }
279
280
281 delete(v.VolumeContext, nodeMntPathKey)
282 s.vols[i] = v
283 }
284 if hookVal, hookMsg := s.execHook("NodeUnpublishVolumeEnd"); hookVal != codes.OK {
285 return nil, status.Errorf(hookVal, hookMsg)
286 }
287
288 return &csi.NodeUnpublishVolumeResponse{}, nil
289 }
290
291 func (s *service) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandVolumeRequest) (*csi.NodeExpandVolumeResponse, error) {
292 if len(req.GetVolumeId()) == 0 {
293 return nil, status.Error(codes.InvalidArgument, "Volume ID cannot be empty")
294 }
295 if len(req.GetVolumePath()) == 0 {
296 return nil, status.Error(codes.InvalidArgument, "Volume Path cannot be empty")
297 }
298 if hookVal, hookMsg := s.execHook("NodeExpandVolumeStart"); hookVal != codes.OK {
299 return nil, status.Errorf(hookVal, hookMsg)
300 }
301
302 s.volsRWL.Lock()
303 defer s.volsRWL.Unlock()
304
305 i, v := s.findVolNoLock("id", req.VolumeId)
306 if i < 0 {
307 return nil, status.Error(codes.NotFound, req.VolumeId)
308 }
309
310
311 resp := &csi.NodeExpandVolumeResponse{}
312 var requestCapacity int64 = 0
313 if req.GetCapacityRange() != nil {
314 requestCapacity = req.CapacityRange.GetRequiredBytes()
315 resp.CapacityBytes = requestCapacity
316 }
317
318
319 fsCapacityKey := path.Join(s.nodeID, req.GetVolumePath(), "size")
320
321 if requestCapacity > 0 {
322 v.VolumeContext[fsCapacityKey] = strconv.FormatInt(requestCapacity, 10)
323 s.vols[i] = v
324 }
325 if hookVal, hookMsg := s.execHook("NodeExpandVolumeEnd"); hookVal != codes.OK {
326 return nil, status.Errorf(hookVal, hookMsg)
327 }
328
329 return resp, nil
330 }
331
332 func (s *service) NodeGetCapabilities(
333 ctx context.Context,
334 req *csi.NodeGetCapabilitiesRequest) (
335 *csi.NodeGetCapabilitiesResponse, error) {
336
337 if hookVal, hookMsg := s.execHook("NodeGetCapabilities"); hookVal != codes.OK {
338 return nil, status.Errorf(hookVal, hookMsg)
339 }
340 capabilities := []*csi.NodeServiceCapability{
341 {
342 Type: &csi.NodeServiceCapability_Rpc{
343 Rpc: &csi.NodeServiceCapability_RPC{
344 Type: csi.NodeServiceCapability_RPC_UNKNOWN,
345 },
346 },
347 },
348 {
349 Type: &csi.NodeServiceCapability_Rpc{
350 Rpc: &csi.NodeServiceCapability_RPC{
351 Type: csi.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME,
352 },
353 },
354 },
355 {
356 Type: &csi.NodeServiceCapability_Rpc{
357 Rpc: &csi.NodeServiceCapability_RPC{
358 Type: csi.NodeServiceCapability_RPC_GET_VOLUME_STATS,
359 },
360 },
361 },
362 {
363 Type: &csi.NodeServiceCapability_Rpc{
364 Rpc: &csi.NodeServiceCapability_RPC{
365 Type: csi.NodeServiceCapability_RPC_VOLUME_CONDITION,
366 },
367 },
368 },
369 }
370 if s.config.NodeExpansionRequired {
371 capabilities = append(capabilities, &csi.NodeServiceCapability{
372 Type: &csi.NodeServiceCapability_Rpc{
373 Rpc: &csi.NodeServiceCapability_RPC{
374 Type: csi.NodeServiceCapability_RPC_EXPAND_VOLUME,
375 },
376 },
377 })
378 }
379
380 if s.config.VolumeMountGroupRequired {
381 capabilities = append(capabilities, &csi.NodeServiceCapability{
382 Type: &csi.NodeServiceCapability_Rpc{
383 Rpc: &csi.NodeServiceCapability_RPC{
384 Type: csi.NodeServiceCapability_RPC_VOLUME_MOUNT_GROUP,
385 },
386 },
387 })
388 }
389
390 return &csi.NodeGetCapabilitiesResponse{
391 Capabilities: capabilities,
392 }, nil
393 }
394
395 func (s *service) NodeGetInfo(ctx context.Context,
396 req *csi.NodeGetInfoRequest) (*csi.NodeGetInfoResponse, error) {
397 if hookVal, hookMsg := s.execHook("NodeGetInfo"); hookVal != codes.OK {
398 return nil, status.Errorf(hookVal, hookMsg)
399 }
400 csiNodeResponse := &csi.NodeGetInfoResponse{
401 NodeId: s.nodeID,
402 }
403 if s.config.AttachLimit > 0 {
404 csiNodeResponse.MaxVolumesPerNode = s.config.AttachLimit
405 }
406 if s.config.EnableTopology {
407 csiNodeResponse.AccessibleTopology = &csi.Topology{
408 Segments: map[string]string{
409 TopologyKey: TopologyValue,
410 },
411 }
412 }
413 return csiNodeResponse, nil
414 }
415
416 func (s *service) NodeGetVolumeStats(ctx context.Context,
417 req *csi.NodeGetVolumeStatsRequest) (*csi.NodeGetVolumeStatsResponse, error) {
418
419 resp := &csi.NodeGetVolumeStatsResponse{
420 VolumeCondition: &csi.VolumeCondition{},
421 }
422
423 if len(req.GetVolumeId()) == 0 {
424 return nil, status.Error(codes.InvalidArgument, "Volume ID cannot be empty")
425 }
426
427 if len(req.GetVolumePath()) == 0 {
428 return nil, status.Error(codes.InvalidArgument, "Volume Path cannot be empty")
429 }
430
431 i, v := s.findVolNoLock("id", req.VolumeId)
432 if i < 0 {
433 resp.VolumeCondition.Abnormal = true
434 resp.VolumeCondition.Message = "Volume not found"
435 return resp, status.Error(codes.NotFound, req.VolumeId)
436 }
437
438 nodeMntPathKey := path.Join(s.nodeID, req.VolumePath)
439
440 _, exists := v.VolumeContext[nodeMntPathKey]
441 if !exists {
442 msg := fmt.Sprintf("volume %q doest not exist on the specified path %q", req.VolumeId, req.VolumePath)
443 resp.VolumeCondition.Abnormal = true
444 resp.VolumeCondition.Message = msg
445 return resp, status.Errorf(codes.NotFound, msg)
446 }
447
448 if hookVal, hookMsg := s.execHook("NodeGetVolumeStatsEnd"); hookVal != codes.OK {
449 return nil, status.Errorf(hookVal, hookMsg)
450 }
451
452 resp.Usage = []*csi.VolumeUsage{
453 {
454 Total: v.GetCapacityBytes(),
455 Unit: csi.VolumeUsage_BYTES,
456 },
457 }
458
459 return resp, nil
460 }
461
View as plain text