1
16
17
65
66 package csi
67
68 import (
69 "context"
70 "errors"
71 "fmt"
72 "os"
73 "path/filepath"
74
75 v1 "k8s.io/api/core/v1"
76 storage "k8s.io/api/storage/v1"
77 meta "k8s.io/apimachinery/pkg/apis/meta/v1"
78 "k8s.io/apimachinery/pkg/types"
79 "k8s.io/client-go/kubernetes"
80 "k8s.io/klog/v2"
81 "k8s.io/kubernetes/pkg/util/removeall"
82 "k8s.io/kubernetes/pkg/volume"
83 volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
84 utilstrings "k8s.io/utils/strings"
85 )
86
87 type csiBlockMapper struct {
88 csiClientGetter
89 k8s kubernetes.Interface
90 plugin *csiPlugin
91 driverName csiDriverName
92 specName string
93 volumeID string
94 readOnly bool
95 spec *volume.Spec
96 pod *v1.Pod
97 podUID types.UID
98 volume.MetricsProvider
99 }
100
101 var _ volume.BlockVolumeMapper = &csiBlockMapper{}
102 var _ volume.CustomBlockVolumeMapper = &csiBlockMapper{}
103
104
105
106 func (m *csiBlockMapper) GetGlobalMapPath(spec *volume.Spec) (string, error) {
107 dir := getVolumeDevicePluginDir(m.specName, m.plugin.host)
108 klog.V(4).Infof(log("blockMapper.GetGlobalMapPath = %s", dir))
109 return dir, nil
110 }
111
112
113
114 func (m *csiBlockMapper) GetStagingPath() string {
115 return filepath.Join(m.plugin.host.GetVolumeDevicePluginDir(CSIPluginName), "staging", m.specName)
116 }
117
118
119
120 func (m *csiBlockMapper) SupportsMetrics() bool {
121 return true
122 }
123
124
125
126 func (m *csiBlockMapper) getPublishDir() string {
127 return filepath.Join(m.plugin.host.GetVolumeDevicePluginDir(CSIPluginName), "publish", m.specName)
128 }
129
130
131
132 func (m *csiBlockMapper) getPublishPath() string {
133 return filepath.Join(m.getPublishDir(), string(m.podUID))
134 }
135
136
137
138 func (m *csiBlockMapper) GetPodDeviceMapPath() (string, string) {
139 path := m.plugin.host.GetPodVolumeDeviceDir(m.podUID, utilstrings.EscapeQualifiedName(CSIPluginName))
140 klog.V(4).Infof(log("blockMapper.GetPodDeviceMapPath [path=%s; name=%s]", path, m.specName))
141 return path, m.specName
142 }
143
144
145 func (m *csiBlockMapper) stageVolumeForBlock(
146 ctx context.Context,
147 csi csiClient,
148 accessMode v1.PersistentVolumeAccessMode,
149 csiSource *v1.CSIPersistentVolumeSource,
150 attachment *storage.VolumeAttachment,
151 ) (string, error) {
152 klog.V(4).Infof(log("blockMapper.stageVolumeForBlock called"))
153
154 stagingPath := m.GetStagingPath()
155 klog.V(4).Infof(log("blockMapper.stageVolumeForBlock stagingPath set [%s]", stagingPath))
156
157
158 stageUnstageSet, err := csi.NodeSupportsStageUnstage(ctx)
159 if err != nil {
160 return "", errors.New(log("blockMapper.stageVolumeForBlock failed to check STAGE_UNSTAGE_VOLUME capability: %v", err))
161 }
162 if !stageUnstageSet {
163 klog.Infof(log("blockMapper.stageVolumeForBlock STAGE_UNSTAGE_VOLUME capability not set. Skipping MountDevice..."))
164 return "", nil
165 }
166 publishVolumeInfo := map[string]string{}
167 if attachment != nil {
168 publishVolumeInfo = attachment.Status.AttachmentMetadata
169 }
170 nodeStageSecrets := map[string]string{}
171 if csiSource.NodeStageSecretRef != nil {
172 nodeStageSecrets, err = getCredentialsFromSecret(m.k8s, csiSource.NodeStageSecretRef)
173 if err != nil {
174 return "", fmt.Errorf("failed to get NodeStageSecretRef %s/%s: %v",
175 csiSource.NodeStageSecretRef.Namespace, csiSource.NodeStageSecretRef.Name, err)
176 }
177 }
178
179
180 if err := os.MkdirAll(stagingPath, 0750); err != nil {
181 return "", errors.New(log("blockMapper.stageVolumeForBlock failed to create dir %s: %v", stagingPath, err))
182 }
183 klog.V(4).Info(log("blockMapper.stageVolumeForBlock created stagingPath directory successfully [%s]", stagingPath))
184
185
186
187
188 err = csi.NodeStageVolume(ctx,
189 csiSource.VolumeHandle,
190 publishVolumeInfo,
191 stagingPath,
192 fsTypeBlockName,
193 accessMode,
194 nodeStageSecrets,
195 csiSource.VolumeAttributes,
196 nil,
197 nil )
198
199 if err != nil {
200 return "", err
201 }
202
203 klog.V(4).Infof(log("blockMapper.stageVolumeForBlock successfully requested NodeStageVolume [%s]", stagingPath))
204 return stagingPath, nil
205 }
206
207
208 func (m *csiBlockMapper) publishVolumeForBlock(
209 ctx context.Context,
210 csi csiClient,
211 accessMode v1.PersistentVolumeAccessMode,
212 csiSource *v1.CSIPersistentVolumeSource,
213 attachment *storage.VolumeAttachment,
214 ) (string, error) {
215 klog.V(4).Infof(log("blockMapper.publishVolumeForBlock called"))
216
217 publishVolumeInfo := map[string]string{}
218 if attachment != nil {
219 publishVolumeInfo = attachment.Status.AttachmentMetadata
220 }
221
222
223 volAttribs := csiSource.VolumeAttributes
224 podInfoEnabled, err := m.plugin.podInfoEnabled(string(m.driverName))
225 if err != nil {
226 return "", errors.New(log("blockMapper.publishVolumeForBlock failed to assemble volume attributes: %v", err))
227 }
228 volumeLifecycleMode, err := m.plugin.getVolumeLifecycleMode(m.spec)
229 if err != nil {
230 return "", errors.New(log("blockMapper.publishVolumeForBlock failed to get VolumeLifecycleMode: %v", err))
231 }
232 if podInfoEnabled {
233 volAttribs = mergeMap(volAttribs, getPodInfoAttrs(m.pod, volumeLifecycleMode))
234 }
235
236 nodePublishSecrets := map[string]string{}
237 if csiSource.NodePublishSecretRef != nil {
238 nodePublishSecrets, err = getCredentialsFromSecret(m.k8s, csiSource.NodePublishSecretRef)
239 if err != nil {
240 return "", errors.New(log("blockMapper.publishVolumeForBlock failed to get NodePublishSecretRef %s/%s: %v",
241 csiSource.NodePublishSecretRef.Namespace, csiSource.NodePublishSecretRef.Name, err))
242 }
243 }
244
245 publishPath := m.getPublishPath()
246
247 publishDir := filepath.Dir(publishPath)
248 if err := os.MkdirAll(publishDir, 0750); err != nil {
249 return "", errors.New(log("blockMapper.publishVolumeForBlock failed to create dir %s: %v", publishDir, err))
250 }
251 klog.V(4).Info(log("blockMapper.publishVolumeForBlock created directory for publishPath successfully [%s]", publishDir))
252
253
254
255
256
257
258 err = csi.NodePublishVolume(
259 ctx,
260 m.volumeID,
261 m.readOnly,
262 m.GetStagingPath(),
263 publishPath,
264 accessMode,
265 publishVolumeInfo,
266 volAttribs,
267 nodePublishSecrets,
268 fsTypeBlockName,
269 []string{},
270 nil,
271 )
272
273 if err != nil {
274 return "", err
275 }
276
277 return publishPath, nil
278 }
279
280
281 func (m *csiBlockMapper) SetUpDevice() (string, error) {
282 klog.V(4).Infof(log("blockMapper.SetUpDevice called"))
283
284
285 if m.spec == nil {
286 return "", errors.New(log("blockMapper.SetUpDevice spec is nil"))
287 }
288
289 csiSource, err := getCSISourceFromSpec(m.spec)
290 if err != nil {
291 return "", errors.New(log("blockMapper.SetUpDevice failed to get CSI persistent source: %v", err))
292 }
293
294 driverName := csiSource.Driver
295 skip, err := m.plugin.skipAttach(driverName)
296 if err != nil {
297 return "", errors.New(log("blockMapper.SetupDevice failed to check CSIDriver for %s: %v", driverName, err))
298 }
299
300 var attachment *storage.VolumeAttachment
301 if !skip {
302
303 nodeName := string(m.plugin.host.GetNodeName())
304 attachID := getAttachmentName(csiSource.VolumeHandle, csiSource.Driver, nodeName)
305 attachment, err = m.k8s.StorageV1().VolumeAttachments().Get(context.TODO(), attachID, meta.GetOptions{})
306 if err != nil {
307 return "", errors.New(log("blockMapper.SetupDevice failed to get volume attachment [id=%v]: %v", attachID, err))
308 }
309 }
310
311
312 accessMode := v1.ReadWriteOnce
313 if m.spec.PersistentVolume.Spec.AccessModes != nil {
314 accessMode = m.spec.PersistentVolume.Spec.AccessModes[0]
315 }
316
317 ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
318 defer cancel()
319
320 csiClient, err := m.csiClientGetter.Get()
321 if err != nil {
322
323
324 return "", volumetypes.NewTransientOperationFailure(log("blockMapper.SetUpDevice failed to get CSI client: %v", err))
325 }
326
327
328 stagingPath, err := m.stageVolumeForBlock(ctx, csiClient, accessMode, csiSource, attachment)
329 if err != nil {
330 if volumetypes.IsOperationFinishedError(err) {
331 cleanupErr := m.cleanupOrphanDeviceFiles()
332 if cleanupErr != nil {
333
334 klog.V(4).Infof("Failed to clean up block volume directory %s", cleanupErr)
335 }
336 }
337 return "", err
338 }
339
340 return stagingPath, nil
341 }
342
343 func (m *csiBlockMapper) MapPodDevice() (string, error) {
344 klog.V(4).Infof(log("blockMapper.MapPodDevice called"))
345
346
347 if m.spec == nil {
348 return "", errors.New(log("blockMapper.MapPodDevice spec is nil"))
349 }
350
351 csiSource, err := getCSISourceFromSpec(m.spec)
352 if err != nil {
353 return "", errors.New(log("blockMapper.MapPodDevice failed to get CSI persistent source: %v", err))
354 }
355
356 driverName := csiSource.Driver
357 skip, err := m.plugin.skipAttach(driverName)
358 if err != nil {
359 return "", errors.New(log("blockMapper.MapPodDevice failed to check CSIDriver for %s: %v", driverName, err))
360 }
361
362 var attachment *storage.VolumeAttachment
363 if !skip {
364
365 nodeName := string(m.plugin.host.GetNodeName())
366 attachID := getAttachmentName(csiSource.VolumeHandle, csiSource.Driver, nodeName)
367 attachment, err = m.k8s.StorageV1().VolumeAttachments().Get(context.TODO(), attachID, meta.GetOptions{})
368 if err != nil {
369 return "", errors.New(log("blockMapper.MapPodDevice failed to get volume attachment [id=%v]: %v", attachID, err))
370 }
371 }
372
373
374 accessMode := v1.ReadWriteOnce
375 if m.spec.PersistentVolume.Spec.AccessModes != nil {
376 accessMode = m.spec.PersistentVolume.Spec.AccessModes[0]
377 }
378
379 ctx, cancel := createCSIOperationContext(m.spec, csiTimeout)
380 defer cancel()
381
382 csiClient, err := m.csiClientGetter.Get()
383 if err != nil {
384
385
386 return "", volumetypes.NewTransientOperationFailure(log("blockMapper.MapPodDevice failed to get CSI client: %v", err))
387 }
388
389
390 publishPath, err := m.publishVolumeForBlock(ctx, csiClient, accessMode, csiSource, attachment)
391 if err != nil {
392 return "", err
393 }
394
395 return publishPath, nil
396 }
397
398 var _ volume.BlockVolumeUnmapper = &csiBlockMapper{}
399 var _ volume.CustomBlockVolumeUnmapper = &csiBlockMapper{}
400
401
402 func (m *csiBlockMapper) unpublishVolumeForBlock(ctx context.Context, csi csiClient, publishPath string) error {
403
404
405
406
407
408 if err := csi.NodeUnpublishVolume(ctx, m.volumeID, publishPath); err != nil {
409 return errors.New(log("blockMapper.unpublishVolumeForBlock failed: %v", err))
410 }
411 klog.V(4).Infof(log("blockMapper.unpublishVolumeForBlock NodeUnpublished successfully [%s]", publishPath))
412
413 return nil
414 }
415
416
417 func (m *csiBlockMapper) unstageVolumeForBlock(ctx context.Context, csi csiClient, stagingPath string) error {
418
419 stageUnstageSet, err := csi.NodeSupportsStageUnstage(ctx)
420 if err != nil {
421 return errors.New(log("blockMapper.unstageVolumeForBlock failed to check STAGE_UNSTAGE_VOLUME capability: %v", err))
422 }
423 if !stageUnstageSet {
424 klog.Infof(log("blockMapper.unstageVolumeForBlock STAGE_UNSTAGE_VOLUME capability not set. Skipping unstageVolumeForBlock ..."))
425 return nil
426 }
427
428
429
430
431 if err := csi.NodeUnstageVolume(ctx, m.volumeID, stagingPath); err != nil {
432 return errors.New(log("blockMapper.unstageVolumeForBlock failed: %v", err))
433 }
434 klog.V(4).Infof(log("blockMapper.unstageVolumeForBlock NodeUnstageVolume successfully [%s]", stagingPath))
435
436
437 if err := os.RemoveAll(stagingPath); err != nil {
438 return errors.New(log("blockMapper.unstageVolumeForBlock failed to remove staging path after NodeUnstageVolume() error [%s]: %v", stagingPath, err))
439 }
440
441 return nil
442 }
443
444
445 func (m *csiBlockMapper) TearDownDevice(globalMapPath, devicePath string) error {
446 ctx, cancel := createCSIOperationContext(m.spec, csiTimeout)
447 defer cancel()
448
449 csiClient, err := m.csiClientGetter.Get()
450 if err != nil {
451
452
453 return volumetypes.NewTransientOperationFailure(log("blockMapper.TearDownDevice failed to get CSI client: %v", err))
454 }
455
456
457 stagingPath := m.GetStagingPath()
458 if _, err := os.Stat(stagingPath); err != nil {
459 if os.IsNotExist(err) {
460 klog.V(4).Infof(log("blockMapper.TearDownDevice stagingPath(%s) has already been deleted, skip calling NodeUnstageVolume", stagingPath))
461 } else {
462 return err
463 }
464 } else {
465 err := m.unstageVolumeForBlock(ctx, csiClient, stagingPath)
466 if err != nil {
467 return err
468 }
469 }
470 if err = m.cleanupOrphanDeviceFiles(); err != nil {
471
472 klog.V(4).Infof("Failed to clean up block volume directory %s", err)
473 }
474
475 return nil
476 }
477
478
479
480
481 func (m *csiBlockMapper) cleanupOrphanDeviceFiles() error {
482
483
484
485
486 publishDir := m.getPublishDir()
487 if err := os.Remove(publishDir); err != nil && !os.IsNotExist(err) {
488 return errors.New(log("failed to remove publish directory [%s]: %v", publishDir, err))
489 }
490
491
492
493 stagingPath := m.GetStagingPath()
494 if err := os.Remove(stagingPath); err != nil && !os.IsNotExist(err) {
495 return errors.New(log("failed to delete volume staging path [%s]: %v", stagingPath, err))
496 }
497
498
499
500 volumeDir := getVolumePluginDir(m.specName, m.plugin.host)
501 mounter := m.plugin.host.GetMounter(m.plugin.GetPluginName())
502 if err := removeall.RemoveAllOneFilesystem(mounter, volumeDir); err != nil {
503 return err
504 }
505
506 return nil
507 }
508
509
510 func (m *csiBlockMapper) UnmapPodDevice() error {
511 publishPath := m.getPublishPath()
512
513 csiClient, err := m.csiClientGetter.Get()
514 if err != nil {
515
516
517 return volumetypes.NewTransientOperationFailure(log("blockMapper.UnmapPodDevice failed to get CSI client: %v", err))
518 }
519
520 ctx, cancel := createCSIOperationContext(m.spec, csiTimeout)
521 defer cancel()
522
523
524
525
526 return m.unpublishVolumeForBlock(ctx, csiClient, publishPath)
527 }
528
View as plain text