1
16
17 package csi
18
19 import (
20 "crypto/sha256"
21 "encoding/json"
22 "errors"
23 "fmt"
24 "os"
25 "path/filepath"
26
27 authenticationv1 "k8s.io/api/authentication/v1"
28 api "k8s.io/api/core/v1"
29 storage "k8s.io/api/storage/v1"
30 apierrors "k8s.io/apimachinery/pkg/api/errors"
31 "k8s.io/apimachinery/pkg/types"
32 utilfeature "k8s.io/apiserver/pkg/util/feature"
33 "k8s.io/client-go/kubernetes"
34 "k8s.io/klog/v2"
35 "k8s.io/kubernetes/pkg/features"
36 "k8s.io/kubernetes/pkg/volume"
37 "k8s.io/kubernetes/pkg/volume/util"
38 volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
39 "k8s.io/mount-utils"
40 utilstrings "k8s.io/utils/strings"
41 )
42
43
44 var (
45 volDataKey = struct {
46 specVolID,
47 volHandle,
48 driverName,
49 nodeName,
50 attachmentID,
51 volumeLifecycleMode,
52 seLinuxMountContext string
53 }{
54 "specVolID",
55 "volumeHandle",
56 "driverName",
57 "nodeName",
58 "attachmentID",
59 "volumeLifecycleMode",
60 "seLinuxMountContext",
61 }
62 )
63
64 type csiMountMgr struct {
65 csiClientGetter
66 k8s kubernetes.Interface
67 plugin *csiPlugin
68 driverName csiDriverName
69 volumeLifecycleMode storage.VolumeLifecycleMode
70 volumeID string
71 specVolumeID string
72 readOnly bool
73 needSELinuxRelabel bool
74 spec *volume.Spec
75 pod *api.Pod
76 podUID types.UID
77 publishContext map[string]string
78 kubeVolHost volume.KubeletVolumeHost
79 volume.MetricsProvider
80 }
81
82
83 var _ volume.Volume = &csiMountMgr{}
84
85 func (c *csiMountMgr) GetPath() string {
86 dir := GetCSIMounterPath(filepath.Join(getTargetPath(c.podUID, c.specVolumeID, c.plugin.host)))
87 klog.V(4).Info(log("mounter.GetPath generated [%s]", dir))
88 return dir
89 }
90
91 func getTargetPath(uid types.UID, specVolumeID string, host volume.VolumeHost) string {
92 specVolID := utilstrings.EscapeQualifiedName(specVolumeID)
93 return host.GetPodVolumeDir(uid, utilstrings.EscapeQualifiedName(CSIPluginName), specVolID)
94 }
95
96
97 var _ volume.Mounter = &csiMountMgr{}
98
99 func (c *csiMountMgr) SetUp(mounterArgs volume.MounterArgs) error {
100 return c.SetUpAt(c.GetPath(), mounterArgs)
101 }
102
103 func (c *csiMountMgr) SetUpAt(dir string, mounterArgs volume.MounterArgs) error {
104 klog.V(4).Infof(log("Mounter.SetUpAt(%s)", dir))
105
106 csi, err := c.csiClientGetter.Get()
107 if err != nil {
108
109
110 return volumetypes.NewTransientOperationFailure(log("mounter.SetUpAt failed to get CSI client: %v", err))
111 }
112
113 ctx, cancel := createCSIOperationContext(c.spec, csiTimeout)
114 defer cancel()
115
116 volSrc, pvSrc, err := getSourceFromSpec(c.spec)
117 if err != nil {
118 return errors.New(log("mounter.SetupAt failed to get CSI persistent source: %v", err))
119 }
120
121
122
123 if err := c.supportsVolumeLifecycleMode(); err != nil {
124 return volumetypes.NewTransientOperationFailure(log("mounter.SetupAt failed to check volume lifecycle mode: %s", err))
125 }
126
127 fsGroupPolicy, err := c.getFSGroupPolicy()
128 if err != nil {
129 return volumetypes.NewTransientOperationFailure(log("mounter.SetupAt failed to check fsGroup policy: %s", err))
130 }
131
132 driverName := c.driverName
133 volumeHandle := c.volumeID
134 readOnly := c.readOnly
135 accessMode := api.ReadWriteOnce
136
137 var (
138 fsType string
139 volAttribs map[string]string
140 nodePublishSecrets map[string]string
141 publishContext map[string]string
142 mountOptions []string
143 deviceMountPath string
144 secretRef *api.SecretReference
145 )
146
147 switch {
148 case volSrc != nil:
149 if c.volumeLifecycleMode != storage.VolumeLifecycleEphemeral {
150 return fmt.Errorf("unexpected volume mode: %s", c.volumeLifecycleMode)
151 }
152 if volSrc.FSType != nil {
153 fsType = *volSrc.FSType
154 }
155
156 volAttribs = volSrc.VolumeAttributes
157
158 if volSrc.NodePublishSecretRef != nil {
159 secretName := volSrc.NodePublishSecretRef.Name
160 ns := c.pod.Namespace
161 secretRef = &api.SecretReference{Name: secretName, Namespace: ns}
162 }
163 case pvSrc != nil:
164 if c.volumeLifecycleMode != storage.VolumeLifecyclePersistent {
165 return fmt.Errorf("unexpected driver mode: %s", c.volumeLifecycleMode)
166 }
167
168 fsType = pvSrc.FSType
169
170 volAttribs = pvSrc.VolumeAttributes
171
172 if pvSrc.NodePublishSecretRef != nil {
173 secretRef = pvSrc.NodePublishSecretRef
174 }
175
176
177 if c.spec.PersistentVolume.Spec.AccessModes != nil {
178 accessMode = c.spec.PersistentVolume.Spec.AccessModes[0]
179 }
180
181 mountOptions = c.spec.PersistentVolume.Spec.MountOptions
182
183
184 stageUnstageSet, err := csi.NodeSupportsStageUnstage(ctx)
185 if err != nil {
186 return errors.New(log("mounter.SetUpAt failed to check for STAGE_UNSTAGE_VOLUME capability: %v", err))
187 }
188
189 if stageUnstageSet {
190 deviceMountPath, err = makeDeviceMountPath(c.plugin, c.spec)
191 if err != nil {
192 return errors.New(log("mounter.SetUpAt failed to make device mount path: %v", err))
193 }
194 }
195
196
197 if c.publishContext == nil {
198 nodeName := string(c.plugin.host.GetNodeName())
199 c.publishContext, err = c.plugin.getPublishContext(c.k8s, volumeHandle, string(driverName), nodeName)
200 if err != nil {
201
202 return volumetypes.NewTransientOperationFailure(log("mounter.SetUpAt failed to fetch publishContext: %v", err))
203 }
204 publishContext = c.publishContext
205 }
206
207 default:
208 return fmt.Errorf("volume source not found in volume.Spec")
209 }
210
211
212 parentDir := filepath.Dir(dir)
213 if err := os.MkdirAll(parentDir, 0750); err != nil {
214 return errors.New(log("mounter.SetUpAt failed to create dir %#v: %v", parentDir, err))
215 }
216 klog.V(4).Info(log("created target path successfully [%s]", parentDir))
217
218 nodePublishSecrets = map[string]string{}
219 if secretRef != nil {
220 nodePublishSecrets, err = getCredentialsFromSecret(c.k8s, secretRef)
221 if err != nil {
222 return volumetypes.NewTransientOperationFailure(fmt.Sprintf("fetching NodePublishSecretRef %s/%s failed: %v",
223 secretRef.Namespace, secretRef.Name, err))
224 }
225
226 }
227
228
229 podInfoEnabled, err := c.plugin.podInfoEnabled(string(c.driverName))
230 if err != nil {
231 return volumetypes.NewTransientOperationFailure(log("mounter.SetUpAt failed to assemble volume attributes: %v", err))
232 }
233 if podInfoEnabled {
234 volAttribs = mergeMap(volAttribs, getPodInfoAttrs(c.pod, c.volumeLifecycleMode))
235 }
236
237
238 serviceAccountTokenAttrs, err := c.podServiceAccountTokenAttrs()
239 if err != nil {
240 return volumetypes.NewTransientOperationFailure(log("mounter.SetUpAt failed to get service accoount token attributes: %v", err))
241 }
242 volAttribs = mergeMap(volAttribs, serviceAccountTokenAttrs)
243
244 driverSupportsCSIVolumeMountGroup := false
245 var nodePublishFSGroupArg *int64
246 driverSupportsCSIVolumeMountGroup, err = csi.NodeSupportsVolumeMountGroup(ctx)
247 if err != nil {
248 return volumetypes.NewTransientOperationFailure(log("mounter.SetUpAt failed to determine if the node service has VOLUME_MOUNT_GROUP capability: %v", err))
249 }
250
251 if driverSupportsCSIVolumeMountGroup {
252 klog.V(3).Infof("Driver %s supports applying FSGroup (has VOLUME_MOUNT_GROUP node capability). Delegating FSGroup application to the driver through NodePublishVolume.", c.driverName)
253 nodePublishFSGroupArg = mounterArgs.FsGroup
254 }
255
256 var selinuxLabelMount bool
257 if utilfeature.DefaultFeatureGate.Enabled(features.SELinuxMountReadWriteOncePod) {
258 support, err := c.plugin.SupportsSELinuxContextMount(c.spec)
259 if err != nil {
260 return errors.New(log("failed to query for SELinuxMount support: %s", err))
261 }
262 if support && mounterArgs.SELinuxLabel != "" {
263 mountOptions = util.AddSELinuxMountOption(mountOptions, mounterArgs.SELinuxLabel)
264 selinuxLabelMount = true
265 }
266 }
267
268
269
270 nodeName := string(c.plugin.host.GetNodeName())
271 volData := map[string]string{
272 volDataKey.specVolID: c.spec.Name(),
273 volDataKey.volHandle: volumeHandle,
274 volDataKey.driverName: string(c.driverName),
275 volDataKey.nodeName: nodeName,
276 volDataKey.volumeLifecycleMode: string(c.volumeLifecycleMode),
277 volDataKey.attachmentID: getAttachmentName(volumeHandle, string(c.driverName), nodeName),
278 }
279
280 if utilfeature.DefaultFeatureGate.Enabled(features.SELinuxMountReadWriteOncePod) && selinuxLabelMount {
281 volData[volDataKey.seLinuxMountContext] = mounterArgs.SELinuxLabel
282 }
283
284 err = saveVolumeData(parentDir, volDataFileName, volData)
285 defer func() {
286
287
288 if err != nil && volumetypes.IsOperationFinishedError(err) {
289
290 if removeerr := removeMountDir(c.plugin, dir); removeerr != nil {
291 klog.Error(log("mounter.SetUpAt failed to remove mount dir after error [%s]: %v", dir, removeerr))
292 }
293 }
294 }()
295 if err != nil {
296 errorMsg := log("mounter.SetUpAt failed to save volume info data: %v", err)
297 klog.Error(errorMsg)
298 return volumetypes.NewTransientOperationFailure(errorMsg)
299 }
300
301 err = csi.NodePublishVolume(
302 ctx,
303 volumeHandle,
304 readOnly,
305 deviceMountPath,
306 dir,
307 accessMode,
308 publishContext,
309 volAttribs,
310 nodePublishSecrets,
311 fsType,
312 mountOptions,
313 nodePublishFSGroupArg,
314 )
315
316 if err != nil {
317
318 if volumetypes.IsOperationFinishedError(err) {
319 if removeMountDirErr := removeMountDir(c.plugin, dir); removeMountDirErr != nil {
320 klog.Error(log("mounter.SetupAt failed to remove mount dir after a NodePublish() error [%s]: %v", dir, removeMountDirErr))
321 }
322 }
323 return err
324 }
325
326 if !selinuxLabelMount {
327 c.needSELinuxRelabel, err = c.kubeVolHost.GetHostUtil().GetSELinuxSupport(dir)
328 if err != nil {
329
330 return volumetypes.NewUncertainProgressError(fmt.Sprintf("error checking for SELinux support: %s", err))
331 }
332 }
333
334 if !driverSupportsCSIVolumeMountGroup && c.supportsFSGroup(fsType, mounterArgs.FsGroup, fsGroupPolicy) {
335
336
337
338 err := volume.SetVolumeOwnership(c, dir, mounterArgs.FsGroup, mounterArgs.FSGroupChangePolicy, util.FSGroupCompleteHook(c.plugin, c.spec))
339 if err != nil {
340
341
342
343
344 return volumetypes.NewUncertainProgressError(fmt.Sprintf("applyFSGroup failed for vol %s: %v", c.volumeID, err))
345 }
346 klog.V(4).Info(log("mounter.SetupAt fsGroup [%d] applied successfully to %s", *mounterArgs.FsGroup, c.volumeID))
347 }
348
349 klog.V(4).Infof(log("mounter.SetUp successfully requested NodePublish [%s]", dir))
350 return nil
351 }
352
353 func (c *csiMountMgr) podServiceAccountTokenAttrs() (map[string]string, error) {
354 if c.plugin.serviceAccountTokenGetter == nil {
355 return nil, errors.New("ServiceAccountTokenGetter is nil")
356 }
357
358 csiDriver, err := c.plugin.csiDriverLister.Get(string(c.driverName))
359 if err != nil {
360 if apierrors.IsNotFound(err) {
361 klog.V(5).Infof(log("CSIDriver %q not found, not adding service account token information", c.driverName))
362 return nil, nil
363 }
364 return nil, err
365 }
366
367 if len(csiDriver.Spec.TokenRequests) == 0 {
368 return nil, nil
369 }
370
371 outputs := map[string]authenticationv1.TokenRequestStatus{}
372 for _, tokenRequest := range csiDriver.Spec.TokenRequests {
373 audience := tokenRequest.Audience
374 audiences := []string{audience}
375 if audience == "" {
376 audiences = []string{}
377 }
378 tr, err := c.plugin.serviceAccountTokenGetter(c.pod.Namespace, c.pod.Spec.ServiceAccountName, &authenticationv1.TokenRequest{
379 Spec: authenticationv1.TokenRequestSpec{
380 Audiences: audiences,
381 ExpirationSeconds: tokenRequest.ExpirationSeconds,
382 BoundObjectRef: &authenticationv1.BoundObjectReference{
383 APIVersion: "v1",
384 Kind: "Pod",
385 Name: c.pod.Name,
386 UID: c.pod.UID,
387 },
388 },
389 })
390 if err != nil {
391 return nil, err
392 }
393
394 outputs[audience] = tr.Status
395 }
396
397 klog.V(4).Infof(log("Fetched service account token attrs for CSIDriver %q", c.driverName))
398 tokens, _ := json.Marshal(outputs)
399 return map[string]string{
400 "csi.storage.k8s.io/serviceAccount.tokens": string(tokens),
401 }, nil
402 }
403
404 func (c *csiMountMgr) GetAttributes() volume.Attributes {
405 return volume.Attributes{
406 ReadOnly: c.readOnly,
407 Managed: !c.readOnly,
408 SELinuxRelabel: c.needSELinuxRelabel,
409 }
410 }
411
412
413 var _ volume.Unmounter = &csiMountMgr{}
414
415 func (c *csiMountMgr) TearDown() error {
416 return c.TearDownAt(c.GetPath())
417 }
418 func (c *csiMountMgr) TearDownAt(dir string) error {
419 klog.V(4).Infof(log("Unmounter.TearDownAt(%s)", dir))
420
421 volID := c.volumeID
422 csi, err := c.csiClientGetter.Get()
423 if err != nil {
424
425
426 return volumetypes.NewTransientOperationFailure(log("Unmounter.TearDownAt failed to get CSI client: %v", err))
427 }
428
429
430 ctx, cancel := createCSIOperationContext(c.spec, csiTimeout)
431 defer cancel()
432
433 if err := csi.NodeUnpublishVolume(ctx, volID, dir); err != nil {
434 return errors.New(log("Unmounter.TearDownAt failed: %v", err))
435 }
436
437
438
439
440
441
442
443
444
445
446
447 if err := removeMountDir(c.plugin, dir); err != nil {
448 return errors.New(log("Unmounter.TearDownAt failed to clean mount dir [%s]: %v", dir, err))
449 }
450 klog.V(4).Infof(log("Unmounter.TearDownAt successfully unmounted dir [%s]", dir))
451
452 return nil
453 }
454
455 func (c *csiMountMgr) supportsFSGroup(fsType string, fsGroup *int64, driverPolicy storage.FSGroupPolicy) bool {
456 if fsGroup == nil || driverPolicy == storage.NoneFSGroupPolicy || c.readOnly {
457 return false
458 }
459
460 if driverPolicy == storage.FileFSGroupPolicy {
461 return true
462 }
463
464 if fsType == "" {
465 klog.V(4).Info(log("mounter.SetupAt WARNING: skipping fsGroup, fsType not provided"))
466 return false
467 }
468
469 if c.spec.PersistentVolume != nil {
470 if c.spec.PersistentVolume.Spec.AccessModes == nil {
471 klog.V(4).Info(log("mounter.SetupAt WARNING: skipping fsGroup, access modes not provided"))
472 return false
473 }
474 if !hasReadWriteOnce(c.spec.PersistentVolume.Spec.AccessModes) {
475 klog.V(4).Info(log("mounter.SetupAt WARNING: skipping fsGroup, only support ReadWriteOnce access mode"))
476 return false
477 }
478 return true
479 } else if c.spec.Volume != nil && c.spec.Volume.CSI != nil {
480
481 return true
482 }
483
484 klog.V(4).Info(log("mounter.SetupAt WARNING: skipping fsGroup, unsupported volume type"))
485 return false
486 }
487
488
489
490 func (c *csiMountMgr) getFSGroupPolicy() (storage.FSGroupPolicy, error) {
491
492
493
494
495 var csiDriver *storage.CSIDriver
496 driver := string(c.driverName)
497 if c.plugin.csiDriverLister != nil {
498 c, err := c.plugin.getCSIDriver(driver)
499 if err != nil && !apierrors.IsNotFound(err) {
500
501 return storage.ReadWriteOnceWithFSTypeFSGroupPolicy, err
502 }
503 csiDriver = c
504 }
505
506
507 if csiDriver == nil {
508 return storage.ReadWriteOnceWithFSTypeFSGroupPolicy, nil
509 }
510
511 if csiDriver.Spec.FSGroupPolicy == nil || *csiDriver.Spec.FSGroupPolicy == "" {
512 return storage.ReadWriteOnceWithFSTypeFSGroupPolicy, errors.New(log("expected valid fsGroupPolicy, received nil value or empty string"))
513 }
514 return *csiDriver.Spec.FSGroupPolicy, nil
515 }
516
517
518
519 func (c *csiMountMgr) supportsVolumeLifecycleMode() error {
520
521
522
523
524 var csiDriver *storage.CSIDriver
525 driver := string(c.driverName)
526 if c.plugin.csiDriverLister != nil {
527 c, err := c.plugin.getCSIDriver(driver)
528 if err != nil && !apierrors.IsNotFound(err) {
529
530 return err
531 }
532 csiDriver = c
533 }
534
535
536
537 switch {
538 case csiDriver == nil && c.volumeLifecycleMode == storage.VolumeLifecyclePersistent:
539
540 return nil
541 case csiDriver == nil:
542 return fmt.Errorf("volume mode %q not supported by driver %s (no CSIDriver object)", c.volumeLifecycleMode, driver)
543 case containsVolumeMode(csiDriver.Spec.VolumeLifecycleModes, c.volumeLifecycleMode):
544
545 return nil
546 default:
547 return fmt.Errorf("volume mode %q not supported by driver %s (only supports %q)", c.volumeLifecycleMode, driver, csiDriver.Spec.VolumeLifecycleModes)
548 }
549 }
550
551
552 func containsVolumeMode(modes []storage.VolumeLifecycleMode, mode storage.VolumeLifecycleMode) bool {
553 for _, m := range modes {
554 if m == mode {
555 return true
556 }
557 }
558 return false
559 }
560
561
562 func isDirMounted(plug *csiPlugin, dir string) (bool, error) {
563 mounter := plug.host.GetMounter(plug.GetPluginName())
564 notMnt, err := mounter.IsLikelyNotMountPoint(dir)
565 if err != nil && !os.IsNotExist(err) {
566 klog.Error(log("isDirMounted IsLikelyNotMountPoint test failed for dir [%v]", dir))
567 return false, err
568 }
569 return !notMnt, nil
570 }
571
572 func isCorruptedDir(dir string) bool {
573 _, pathErr := mount.PathExists(dir)
574 return pathErr != nil && mount.IsCorruptedMnt(pathErr)
575 }
576
577
578 func removeMountDir(plug *csiPlugin, mountPath string) error {
579 klog.V(4).Info(log("removing mount path [%s]", mountPath))
580
581 mnt, err := isDirMounted(plug, mountPath)
582 if err != nil {
583 return err
584 }
585 if !mnt {
586 klog.V(4).Info(log("dir not mounted, deleting it [%s]", mountPath))
587 if err := os.Remove(mountPath); err != nil && !os.IsNotExist(err) {
588 return errors.New(log("failed to remove dir [%s]: %v", mountPath, err))
589 }
590
591 volPath := filepath.Dir(mountPath)
592 dataFile := filepath.Join(volPath, volDataFileName)
593 klog.V(4).Info(log("also deleting volume info data file [%s]", dataFile))
594 if err := os.Remove(dataFile); err != nil && !os.IsNotExist(err) {
595 return errors.New(log("failed to delete volume data file [%s]: %v", dataFile, err))
596 }
597
598 klog.V(4).Info(log("deleting volume path [%s]", volPath))
599 if err := os.Remove(volPath); err != nil && !os.IsNotExist(err) {
600 return errors.New(log("failed to delete volume path [%s]: %v", volPath, err))
601 }
602 }
603 return nil
604 }
605
606
607 func makeVolumeHandle(podUID, volSourceSpecName string) string {
608 result := sha256.Sum256([]byte(fmt.Sprintf("%s%s", podUID, volSourceSpecName)))
609 return fmt.Sprintf("csi-%x", result)
610 }
611
612 func mergeMap(first, second map[string]string) map[string]string {
613 if first == nil {
614 return second
615 }
616 for k, v := range second {
617 first[k] = v
618 }
619 return first
620 }
621
View as plain text