    17  package csi
    19  import (
    20  	"context"
    21  	"crypto/sha256"
    22  	"errors"
    23  	"fmt"
    24  	"os"
    25  	"path/filepath"
    26  	"strings"
    27  	"time"
    29  	v1 "k8s.io/api/core/v1"
    30  	storage "k8s.io/api/storage/v1"
    31  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    32  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    33  	"k8s.io/apimachinery/pkg/types"
    34  	"k8s.io/apimachinery/pkg/util/wait"
    35  	"k8s.io/apimachinery/pkg/watch"
    36  	utilfeature "k8s.io/apiserver/pkg/util/feature"
    37  	"k8s.io/client-go/kubernetes"
    38  	"k8s.io/klog/v2"
    39  	"k8s.io/kubernetes/pkg/features"
    40  	"k8s.io/kubernetes/pkg/util/filesystem"
    41  	"k8s.io/kubernetes/pkg/volume"
    42  	"k8s.io/kubernetes/pkg/volume/util"
    43  	volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
    44  	"k8s.io/utils/clock"
    45  )
    47  const globalMountInGlobalPath = "globalmount"
    49  type csiAttacher struct {
    50  	plugin       *csiPlugin
    51  	k8s          kubernetes.Interface
    52  	watchTimeout time.Duration
    54  	csiClient csiClient
    55  }
    57  type verifyAttachDetachStatus func(attach *storage.VolumeAttachment, volumeHandle string) (bool, error)
    59  // volume.Attacher methods
    60  var _ volume.Attacher = &csiAttacher{}
    62  var _ volume.Detacher = &csiAttacher{}
    64  var _ volume.DeviceMounter = &csiAttacher{}
    66  func (c *csiAttacher) Attach(spec *volume.Spec, nodeName types.NodeName) (string, error) {
    67  	_, ok := c.plugin.host.(volume.KubeletVolumeHost)
    68  	if ok {
    69  		return "", errors.New("attaching volumes from the kubelet is not supported")
    70  	}
    72  	if spec == nil {
    73  		klog.Error(log("attacher.Attach missing volume.Spec"))
    74  		return "", errors.New("missing spec")
    75  	}
    77  	pvSrc, err := getPVSourceFromSpec(spec)
    78  	if err != nil {
    79  		return "", errors.New(log("attacher.Attach failed to get CSIPersistentVolumeSource: %v", err))
    80  	}
    82  	node := string(nodeName)
    83  	attachID := getAttachmentName(pvSrc.VolumeHandle, pvSrc.Driver, node)
    85  	attachment, err := c.plugin.volumeAttachmentLister.Get(attachID)
    86  	if err != nil && !apierrors.IsNotFound(err) {
    87  		return "", errors.New(log("failed to get volume attachment from lister: %v", err))
    88  	}
    90  	if attachment == nil {
    91  		var vaSrc storage.VolumeAttachmentSource
    92  		if spec.InlineVolumeSpecForCSIMigration {
    93  			// inline PV scenario - use PV spec to populate VA source.
    94  			// The volume spec will be populated by CSI translation API
    95  			// for inline volumes. This allows fields required by the CSI
    96  			// attacher such as AccessMode and MountOptions (in addition to
    97  			// fields in the CSI persistent volume source) to be populated
    98  			// as part of CSI translation for inline volumes.
    99  			vaSrc = storage.VolumeAttachmentSource{
   100  				InlineVolumeSpec: &spec.PersistentVolume.Spec,
   101  			}
   102  		} else {
   103  			// regular PV scenario - use PV name to populate VA source
   104  			pvName := spec.PersistentVolume.GetName()
   105  			vaSrc = storage.VolumeAttachmentSource{
   106  				PersistentVolumeName: &pvName,
   107  			}
   108  		}
   110  		attachment := &storage.VolumeAttachment{
   111  			ObjectMeta: metav1.ObjectMeta{
   112  				Name: attachID,
   113  			},
   114  			Spec: storage.VolumeAttachmentSpec{
   115  				NodeName: node,
   116  				Attacher: pvSrc.Driver,
   117  				Source:   vaSrc,
   118  			},
   119  		}
   121  		_, err = c.k8s.StorageV1().VolumeAttachments().Create(context.TODO(), attachment, metav1.CreateOptions{})
   122  		if err != nil {
   123  			if !apierrors.IsAlreadyExists(err) {
   124  				return "", errors.New(log("attacher.Attach failed: %v", err))
   125  			}
   126  			klog.V(4).Info(log("attachment [%v] for volume [%v] already exists (will not be recreated)", attachID, pvSrc.VolumeHandle))
   127  		} else {
   128  			klog.V(4).Info(log("attachment [%v] for volume [%v] created successfully", attachID, pvSrc.VolumeHandle))
   129  		}
   130  	}
   132  	// Attach and detach functionality is exclusive to the CSI plugin that runs in the AttachDetachController,
   133  	// and has access to a VolumeAttachment lister that can be polled for the current status.
   134  	if err := c.waitForVolumeAttachmentWithLister(spec, pvSrc.VolumeHandle, attachID, c.watchTimeout); err != nil {
   135  		return "", err
   136  	}
   138  	klog.V(4).Info(log("attacher.Attach finished OK with VolumeAttachment object [%s]", attachID))
   140  	// Don't return attachID as a devicePath. We can reconstruct the attachID using getAttachmentName()
   141  	return "", nil
   142  }
   144  func (c *csiAttacher) WaitForAttach(spec *volume.Spec, _ string, pod *v1.Pod, timeout time.Duration) (string, error) {
   145  	source, err := getPVSourceFromSpec(spec)
   146  	if err != nil {
   147  		return "", errors.New(log("attacher.WaitForAttach failed to extract CSI volume source: %v", err))
   148  	}
   150  	attachID := getAttachmentName(source.VolumeHandle, source.Driver, string(c.plugin.host.GetNodeName()))
   152  	return c.waitForVolumeAttachment(source.VolumeHandle, attachID, timeout)
   153  }
   155  func (c *csiAttacher) waitForVolumeAttachment(volumeHandle, attachID string, timeout time.Duration) (string, error) {
   156  	klog.V(4).Info(log("probing for updates from CSI driver for [attachment.ID=%v]", attachID))
   158  	timer := time.NewTimer(timeout) // TODO (vladimirvivien) investigate making this configurable
   159  	defer timer.Stop()
   161  	return c.waitForVolumeAttachmentInternal(volumeHandle, attachID, timer, timeout)
   162  }
   164  func (c *csiAttacher) waitForVolumeAttachmentInternal(volumeHandle, attachID string, timer *time.Timer, timeout time.Duration) (string, error) {
   166  	klog.V(4).Info(log("probing VolumeAttachment [id=%v]", attachID))
   167  	attach, err := c.k8s.StorageV1().VolumeAttachments().Get(context.TODO(), attachID, metav1.GetOptions{})
   168  	if err != nil {
   169  		klog.Error(log("attacher.WaitForAttach failed for volume [%s] (will continue to try): %v", volumeHandle, err))
   170  		return "", fmt.Errorf("volume %v has GET error for volume attachment %v: %v", volumeHandle, attachID, err)
   171  	}
   172  	err = c.waitForVolumeAttachDetachStatus(attach, volumeHandle, attachID, timer, timeout, verifyAttachmentStatus)
   173  	if err != nil {
   174  		return "", err
   175  	}
   176  	return attach.Name, nil
   177  }
   179  func (c *csiAttacher) waitForVolumeAttachmentWithLister(spec *volume.Spec, volumeHandle, attachID string, timeout time.Duration) error {
   180  	klog.V(4).Info(log("probing VolumeAttachment [id=%v]", attachID))
   182  	verifyStatus := func() (bool, error) {
   183  		volumeAttachment, err := c.plugin.volumeAttachmentLister.Get(attachID)
   184  		if err != nil {
   185  			// Ignore "not found" errors in case the VolumeAttachment was just created and hasn't yet made it into the lister.
   186  			if !apierrors.IsNotFound(err) {
   187  				klog.Error(log("unexpected error waiting for volume attachment, %v", err))
   188  				return false, err
   189  			}
   191  			// The VolumeAttachment is not available yet and we will have to try again.
   192  			return false, nil
   193  		}
   195  		successful, err := verifyAttachmentStatus(volumeAttachment, volumeHandle)
   196  		if err != nil {
   197  			return false, err
   198  		}
   199  		return successful, nil
   200  	}
   202  	return c.waitForVolumeAttachDetachStatusWithLister(spec, volumeHandle, attachID, timeout, verifyStatus, "Attach")
   203  }
   205  func (c *csiAttacher) VolumesAreAttached(specs []*volume.Spec, nodeName types.NodeName) (map[*volume.Spec]bool, error) {
   206  	klog.V(4).Info(log("probing attachment status for %d volume(s) ", len(specs)))
   208  	attached := make(map[*volume.Spec]bool)
   210  	for _, spec := range specs {
   211  		if spec == nil {
   212  			klog.Error(log("attacher.VolumesAreAttached missing volume.Spec"))
   213  			return nil, errors.New("missing spec")
   214  		}
   215  		pvSrc, err := getPVSourceFromSpec(spec)
   216  		if err != nil {
   217  			attached[spec] = false
   218  			klog.Error(log("attacher.VolumesAreAttached failed to get CSIPersistentVolumeSource: %v", err))
   219  			continue
   220  		}
   221  		driverName := pvSrc.Driver
   222  		volumeHandle := pvSrc.VolumeHandle
   224  		skip, err := c.plugin.skipAttach(driverName)
   225  		if err != nil {
   226  			klog.Error(log("Failed to check CSIDriver for %s: %s", driverName, err))
   227  		} else {
   228  			if skip {
   229  				// This volume is not attachable, pretend it's attached
   230  				attached[spec] = true
   231  				continue
   232  			}
   233  		}
   235  		attachID := getAttachmentName(volumeHandle, driverName, string(nodeName))
   236  		var attach *storage.VolumeAttachment
   237  		if c.plugin.volumeAttachmentLister != nil {
   238  			attach, err = c.plugin.volumeAttachmentLister.Get(attachID)
   239  			if err == nil {
   240  				attached[spec] = attach.Status.Attached
   241  				continue
   242  			}
   243  			klog.V(4).Info(log("attacher.VolumesAreAttached failed in AttachmentLister for attach.ID=%v: %v. Probing the API server.", attachID, err))
   244  		}
   245  		// The cache lookup is not setup or the object is not found in the cache.
   246  		// Get the object from the API server.
   247  		klog.V(4).Info(log("probing attachment status for VolumeAttachment %v", attachID))
   248  		attach, err = c.k8s.StorageV1().VolumeAttachments().Get(context.TODO(), attachID, metav1.GetOptions{})
   249  		if err != nil {
   250  			attached[spec] = false
   251  			klog.Error(log("attacher.VolumesAreAttached failed for attach.ID=%v: %v", attachID, err))
   252  			continue
   253  		}
   254  		klog.V(4).Info(log("attacher.VolumesAreAttached attachment [%v] has status.attached=%t", attachID, attach.Status.Attached))
   255  		attached[spec] = attach.Status.Attached
   256  	}
   258  	return attached, nil
   259  }
   261  func (c *csiAttacher) GetDeviceMountPath(spec *volume.Spec) (string, error) {
   262  	klog.V(4).Info(log("attacher.GetDeviceMountPath(%v)", spec))
   263  	deviceMountPath, err := makeDeviceMountPath(c.plugin, spec)
   264  	if err != nil {
   265  		return "", errors.New(log("attacher.GetDeviceMountPath failed to make device mount path: %v", err))
   266  	}
   267  	klog.V(4).Infof("attacher.GetDeviceMountPath succeeded, deviceMountPath: %s", deviceMountPath)
   268  	return deviceMountPath, nil
   269  }
   271  func (c *csiAttacher) MountDevice(spec *volume.Spec, devicePath string, deviceMountPath string, deviceMounterArgs volume.DeviceMounterArgs) error {
   272  	klog.V(4).Infof(log("attacher.MountDevice(%s, %s)", devicePath, deviceMountPath))
   274  	if deviceMountPath == "" {
   275  		return errors.New(log("attacher.MountDevice failed, deviceMountPath is empty"))
   276  	}
   278  	// Setup
   279  	if spec == nil {
   280  		return errors.New(log("attacher.MountDevice failed, spec is nil"))
   281  	}
   282  	csiSource, err := getPVSourceFromSpec(spec)
   283  	if err != nil {
   284  		return errors.New(log("attacher.MountDevice failed to get CSIPersistentVolumeSource: %v", err))
   285  	}
   287  	// lets check if node/unstage is supported
   288  	if c.csiClient == nil {
   289  		c.csiClient, err = newCsiDriverClient(csiDriverName(csiSource.Driver))
   290  		if err != nil {
   291  			// Treat the absence of the CSI driver as a transient error
   292  			// See https://github.com/kubernetes/kubernetes/issues/120268
   293  			return volumetypes.NewTransientOperationFailure(log("attacher.MountDevice failed to create newCsiDriverClient: %v", err))
   294  		}
   295  	}
   296  	csi := c.csiClient
   298  	ctx, cancel := createCSIOperationContext(spec, c.watchTimeout)
   299  	defer cancel()
   300  	// Check whether "STAGE_UNSTAGE_VOLUME" is set
   301  	stageUnstageSet, err := csi.NodeSupportsStageUnstage(ctx)
   302  	if err != nil {
   303  		return err
   304  	}
   306  	// Get secrets and publish context required for mountDevice
   307  	nodeName := string(c.plugin.host.GetNodeName())
   308  	publishContext, err := c.plugin.getPublishContext(c.k8s, csiSource.VolumeHandle, csiSource.Driver, nodeName)
   310  	if err != nil {
   311  		return volumetypes.NewTransientOperationFailure(err.Error())
   312  	}
   314  	nodeStageSecrets := map[string]string{}
   315  	// we only require secrets if csiSource has them and volume has NodeStage capability
   316  	if csiSource.NodeStageSecretRef != nil && stageUnstageSet {
   317  		nodeStageSecrets, err = getCredentialsFromSecret(c.k8s, csiSource.NodeStageSecretRef)
   318  		if err != nil {
   319  			err = fmt.Errorf("fetching NodeStageSecretRef %s/%s failed: %v",
   320  				csiSource.NodeStageSecretRef.Namespace, csiSource.NodeStageSecretRef.Name, err)
   321  			// if we failed to fetch secret then that could be a transient error
   322  			return volumetypes.NewTransientOperationFailure(err.Error())
   323  		}
   324  	}
   326  	var mountOptions []string
   327  	if spec.PersistentVolume != nil && spec.PersistentVolume.Spec.MountOptions != nil {
   328  		mountOptions = spec.PersistentVolume.Spec.MountOptions
   329  	}
   331  	var seLinuxSupported bool
   332  	if utilfeature.DefaultFeatureGate.Enabled(features.SELinuxMountReadWriteOncePod) {
   333  		support, err := c.plugin.SupportsSELinuxContextMount(spec)
   334  		if err != nil {
   335  			return errors.New(log("failed to query for SELinuxMount support: %s", err))
   336  		}
   337  		if support && deviceMounterArgs.SELinuxLabel != "" {
   338  			mountOptions = util.AddSELinuxMountOption(mountOptions, deviceMounterArgs.SELinuxLabel)
   339  			seLinuxSupported = true
   340  		}
   341  	}
   343  	// Store volume metadata for UnmountDevice. Keep it around even if the
   344  	// driver does not support NodeStage, UnmountDevice still needs it.
   345  	if err = filesystem.MkdirAllWithPathCheck(deviceMountPath, 0750); err != nil {
   346  		return errors.New(log("attacher.MountDevice failed to create dir %#v:  %v", deviceMountPath, err))
   347  	}
   349  	klog.V(4).Info(log("created target path successfully [%s]", deviceMountPath))
   350  	dataDir := filepath.Dir(deviceMountPath)
   351  	data := map[string]string{
   352  		volDataKey.volHandle:  csiSource.VolumeHandle,
   353  		volDataKey.driverName: csiSource.Driver,
   354  	}
   356  	if utilfeature.DefaultFeatureGate.Enabled(features.SELinuxMountReadWriteOncePod) && seLinuxSupported {
   357  		data[volDataKey.seLinuxMountContext] = deviceMounterArgs.SELinuxLabel
   358  	}
   360  	err = saveVolumeData(dataDir, volDataFileName, data)
   361  	defer func() {
   362  		// Only if there was an error and volume operation was considered
   363  		// finished, we should remove the directory.
   364  		if err != nil && volumetypes.IsOperationFinishedError(err) {
   365  			// clean up metadata
   366  			klog.Errorf(log("attacher.MountDevice failed: %v", err))
   367  			if err := removeMountDir(c.plugin, deviceMountPath); err != nil {
   368  				klog.Error(log("attacher.MountDevice failed to remove mount dir after error [%s]: %v", deviceMountPath, err))
   369  			}
   370  		}
   371  	}()
   373  	if err != nil {
   374  		errMsg := log("failed to save volume info data: %v", err)
   375  		klog.Error(errMsg)
   376  		return errors.New(errMsg)
   377  	}
   379  	if !stageUnstageSet {
   380  		klog.Infof(log("attacher.MountDevice STAGE_UNSTAGE_VOLUME capability not set. Skipping MountDevice..."))
   381  		// defer does *not* remove the metadata file and it's correct - UnmountDevice needs it there.
   382  		return nil
   383  	}
   385  	//TODO (vladimirvivien) implement better AccessModes mapping between k8s and CSI
   386  	accessMode := v1.ReadWriteOnce
   387  	if spec.PersistentVolume.Spec.AccessModes != nil {
   388  		accessMode = spec.PersistentVolume.Spec.AccessModes[0]
   389  	}
   391  	var nodeStageFSGroupArg *int64
   392  	driverSupportsCSIVolumeMountGroup, err := csi.NodeSupportsVolumeMountGroup(ctx)
   393  	if err != nil {
   394  		return volumetypes.NewTransientOperationFailure(log("attacher.MountDevice failed to determine if the node service has VOLUME_MOUNT_GROUP capability: %v", err))
   395  	}
   397  	if driverSupportsCSIVolumeMountGroup {
   398  		klog.V(3).Infof("Driver %s supports applying FSGroup (has VOLUME_MOUNT_GROUP node capability). Delegating FSGroup application to the driver through NodeStageVolume.", csiSource.Driver)
   399  		nodeStageFSGroupArg = deviceMounterArgs.FsGroup
   400  	}
   402  	fsType := csiSource.FSType
   403  	err = csi.NodeStageVolume(ctx,
   404  		csiSource.VolumeHandle,
   405  		publishContext,
   406  		deviceMountPath,
   407  		fsType,
   408  		accessMode,
   409  		nodeStageSecrets,
   410  		csiSource.VolumeAttributes,
   411  		mountOptions,
   412  		nodeStageFSGroupArg)
   414  	if err != nil {
   415  		return err
   416  	}
   418  	klog.V(4).Infof(log("attacher.MountDevice successfully requested NodeStageVolume [%s]", deviceMountPath))
   419  	return err
   420  }
   422  var _ volume.Detacher = &csiAttacher{}
   424  var _ volume.DeviceUnmounter = &csiAttacher{}
   426  func (c *csiAttacher) Detach(volumeName string, nodeName types.NodeName) error {
   427  	_, ok := c.plugin.host.(volume.KubeletVolumeHost)
   428  	if ok {
   429  		return errors.New("detaching volumes from the kubelet is not supported")
   430  	}
   432  	var attachID string
   433  	var volID string
   435  	if volumeName == "" {
   436  		klog.Error(log("detacher.Detach missing value for parameter volumeName"))
   437  		return errors.New("missing expected parameter volumeName")
   438  	}
   440  	// volumeName in format driverName<SEP>volumeHandle generated by plugin.GetVolumeName()
   441  	parts := strings.Split(volumeName, volNameSep)
   442  	if len(parts) != 2 {
   443  		klog.Error(log("detacher.Detach insufficient info encoded in volumeName"))
   444  		return errors.New("volumeName missing expected data")
   445  	}
   447  	driverName := parts[0]
   448  	volID = parts[1]
   449  	attachID = getAttachmentName(volID, driverName, string(nodeName))
   451  	if err := c.k8s.StorageV1().VolumeAttachments().Delete(context.TODO(), attachID, metav1.DeleteOptions{}); err != nil {
   452  		if apierrors.IsNotFound(err) {
   453  			// object deleted or never existed, done
   454  			klog.V(4).Info(log("VolumeAttachment object [%v] for volume [%v] not found, object deleted", attachID, volID))
   455  			return nil
   456  		}
   457  		return errors.New(log("detacher.Detach failed to delete VolumeAttachment [%s]: %v", attachID, err))
   458  	}
   460  	klog.V(4).Info(log("detacher deleted ok VolumeAttachment.ID=%s", attachID))
   462  	// Attach and detach functionality is exclusive to the CSI plugin that runs in the AttachDetachController,
   463  	// and has access to a VolumeAttachment lister that can be polled for the current status.
   464  	return c.waitForVolumeDetachmentWithLister(volID, attachID, c.watchTimeout)
   465  }
   467  func (c *csiAttacher) waitForVolumeDetachmentWithLister(volumeHandle, attachID string, timeout time.Duration) error {
   468  	klog.V(4).Info(log("probing VolumeAttachment [id=%v]", attachID))
   470  	verifyStatus := func() (bool, error) {
   471  		volumeAttachment, err := c.plugin.volumeAttachmentLister.Get(attachID)
   472  		if err != nil {
   473  			if !apierrors.IsNotFound(err) {
   474  				return false, errors.New(log("detacher.WaitForDetach failed for volume [%s] (will continue to try): %v", volumeHandle, err))
   475  			}
   477  			// Detachment successful.
   478  			klog.V(4).Info(log("VolumeAttachment object [%v] for volume [%v] not found, object deleted", attachID, volumeHandle))
   479  			return true, nil
   480  		}
   482  		// Detachment is only "successful" once the VolumeAttachment is deleted, however we perform
   483  		// this check to make sure the object does not contain any detach errors.
   484  		successful, err := verifyDetachmentStatus(volumeAttachment, volumeHandle)
   485  		if err != nil {
   486  			return false, err
   487  		}
   488  		return successful, nil
   489  	}
   491  	return c.waitForVolumeAttachDetachStatusWithLister(nil, volumeHandle, attachID, timeout, verifyStatus, "Detach")
   492  }
   494  func (c *csiAttacher) waitForVolumeAttachDetachStatusWithLister(spec *volume.Spec, volumeHandle, attachID string, timeout time.Duration, verifyStatus func() (bool, error), operation string) error {
   495  	var (
   496  		initBackoff = 500 * time.Millisecond
   497  		// This is approximately the duration between consecutive ticks after two minutes (CSI timeout).
   498  		maxBackoff    = 7 * time.Second
   499  		resetDuration = time.Minute
   500  		backoffFactor = 1.05
   501  		jitter        = 0.1
   502  		clock         = &clock.RealClock{}
   503  	)
   504  	backoffMgr := wait.NewExponentialBackoffManager(initBackoff, maxBackoff, resetDuration, backoffFactor, jitter, clock)
   506  	ctx, cancel := context.WithTimeout(context.Background(), timeout)
   507  	defer cancel()
   509  	// Get driver name from spec for better log messages. During detach spec can be nil, and it's ok for driver to be unknown.
   510  	csiDriverName, err := GetCSIDriverName(spec)
   511  	if err != nil {
   512  		csiDriverName = "unknown"
   513  		klog.V(4).Info(log("Could not find CSI driver name in spec for volume [%v]", volumeHandle))
   514  	}
   516  	for {
   517  		t := backoffMgr.Backoff()
   518  		select {
   519  		case <-t.C():
   520  			successful, err := verifyStatus()
   521  			if err != nil {
   522  				return err
   523  			}
   524  			if successful {
   525  				return nil
   526  			}
   527  		case <-ctx.Done():
   528  			t.Stop()
   529  			klog.Error(log("%s timeout after %v [volume=%v; attachment.ID=%v]", operation, timeout, volumeHandle, attachID))
   530  			return fmt.Errorf("timed out waiting for external-attacher of %v CSI driver to %v volume %v", csiDriverName, strings.ToLower(operation), volumeHandle)
   531  		}
   532  	}
   533  }
   535  func (c *csiAttacher) waitForVolumeAttachDetachStatus(attach *storage.VolumeAttachment, volumeHandle, attachID string,
   536  	timer *time.Timer, timeout time.Duration, verifyStatus verifyAttachDetachStatus) error {
   537  	successful, err := verifyStatus(attach, volumeHandle)
   538  	if err != nil {
   539  		return err
   540  	}
   541  	if successful {
   542  		return nil
   543  	}
   545  	watcher, err := c.k8s.StorageV1().VolumeAttachments().Watch(context.TODO(), metav1.SingleObject(metav1.ObjectMeta{Name: attachID, ResourceVersion: attach.ResourceVersion}))
   546  	if err != nil {
   547  		return fmt.Errorf("watch error:%v for volume %v", err, volumeHandle)
   548  	}
   550  	ch := watcher.ResultChan()
   551  	defer watcher.Stop()
   552  	for {
   553  		select {
   554  		case event, ok := <-ch:
   555  			if !ok {
   556  				klog.Errorf("[attachment.ID=%v] watch channel had been closed", attachID)
   557  				return errors.New("volume attachment watch channel had been closed")
   558  			}
   560  			switch event.Type {
   561  			case watch.Added, watch.Modified:
   562  				attach, _ := event.Object.(*storage.VolumeAttachment)
   563  				successful, err := verifyStatus(attach, volumeHandle)
   564  				if err != nil {
   565  					return err
   566  				}
   567  				if successful {
   568  					return nil
   569  				}
   570  			case watch.Deleted:
   571  				// set attach nil to get different results
   572  				// for detachment, a deleted event means successful detachment, should return success
   573  				// for attachment, should return fail
   574  				if successful, err := verifyStatus(nil, volumeHandle); !successful {
   575  					return err
   576  				}
   577  				klog.V(4).Info(log("VolumeAttachment object [%v] for volume [%v] has been deleted", attachID, volumeHandle))
   578  				return nil
   580  			case watch.Error:
   581  				klog.Warningf("waitForVolumeAttachDetachInternal received watch error: %v", event)
   582  			}
   584  		case <-timer.C:
   585  			klog.Error(log("attachdetacher.WaitForDetach timeout after %v [volume=%v; attachment.ID=%v]", timeout, volumeHandle, attachID))
   586  			return fmt.Errorf("attachdetachment timeout for volume %v", volumeHandle)
   587  		}
   588  	}
   589  }
   591  func (c *csiAttacher) UnmountDevice(deviceMountPath string) error {
   592  	klog.V(4).Info(log("attacher.UnmountDevice(%s)", deviceMountPath))
   594  	// Setup
   595  	var driverName, volID string
   596  	dataDir := filepath.Dir(deviceMountPath)
   597  	data, err := loadVolumeData(dataDir, volDataFileName)
   598  	if err == nil {
   599  		driverName = data[volDataKey.driverName]
   600  		volID = data[volDataKey.volHandle]
   601  	} else {
   602  		if errors.Is(err, os.ErrNotExist) {
   603  			klog.V(4).Info(log("attacher.UnmountDevice skipped because volume data file [%s] does not exist", dataDir))
   604  			return nil
   605  		}
   607  		klog.Errorf(log("attacher.UnmountDevice failed to get driver and volume name from device mount path: %v", err))
   608  		return err
   609  	}
   611  	if c.csiClient == nil {
   612  		c.csiClient, err = newCsiDriverClient(csiDriverName(driverName))
   613  		if err != nil {
   614  			// Treat the absence of the CSI driver as a transient error
   615  			// See https://github.com/kubernetes/kubernetes/issues/120268
   616  			return volumetypes.NewTransientOperationFailure(log("attacher.UnmountDevice failed to create newCsiDriverClient: %v", err))
   617  		}
   618  	}
   619  	csi := c.csiClient
   621  	// could not get whether this is migrated because there is no spec
   622  	ctx, cancel := createCSIOperationContext(nil, csiTimeout)
   623  	defer cancel()
   624  	// Check whether "STAGE_UNSTAGE_VOLUME" is set
   625  	stageUnstageSet, err := csi.NodeSupportsStageUnstage(ctx)
   626  	if err != nil {
   627  		return errors.New(log("attacher.UnmountDevice failed to check whether STAGE_UNSTAGE_VOLUME set: %v", err))
   628  	}
   629  	if !stageUnstageSet {
   630  		klog.Infof(log("attacher.UnmountDevice STAGE_UNSTAGE_VOLUME capability not set. Skipping UnmountDevice..."))
   631  		// Just	delete the global directory + json file
   632  		if err := removeMountDir(c.plugin, deviceMountPath); err != nil {
   633  			return errors.New(log("failed to clean up global mount %s: %s", dataDir, err))
   634  		}
   636  		return nil
   637  	}
   639  	// Start UnmountDevice
   640  	err = csi.NodeUnstageVolume(ctx,
   641  		volID,
   642  		deviceMountPath)
   644  	if err != nil {
   645  		return errors.New(log("attacher.UnmountDevice failed: %v", err))
   646  	}
   648  	// Delete the global directory + json file
   649  	if err := removeMountDir(c.plugin, deviceMountPath); err != nil {
   650  		return errors.New(log("failed to clean up global mount %s: %s", dataDir, err))
   651  	}
   653  	klog.V(4).Infof(log("attacher.UnmountDevice successfully requested NodeUnStageVolume [%s]", deviceMountPath))
   654  	return nil
   655  }
   657  // getAttachmentName returns csi-<sha256(volName,csiDriverName,NodeName)>
   658  func getAttachmentName(volName, csiDriverName, nodeName string) string {
   659  	result := sha256.Sum256([]byte(fmt.Sprintf("%s%s%s", volName, csiDriverName, nodeName)))
   660  	return fmt.Sprintf("csi-%x", result)
   661  }
   663  func makeDeviceMountPath(plugin *csiPlugin, spec *volume.Spec) (string, error) {
   664  	if spec == nil {
   665  		return "", errors.New("makeDeviceMountPath failed, spec is nil")
   666  	}
   668  	driver, err := GetCSIDriverName(spec)
   669  	if err != nil {
   670  		return "", err
   671  	}
   672  	if driver == "" {
   673  		return "", errors.New("makeDeviceMountPath failed, csi source driver name is empty")
   674  	}
   676  	csiSource, err := getPVSourceFromSpec(spec)
   677  	if err != nil {
   678  		return "", errors.New(log("makeDeviceMountPath failed to get CSIPersistentVolumeSource: %v", err))
   679  	}
   681  	if csiSource.VolumeHandle == "" {
   682  		return "", errors.New("makeDeviceMountPath failed, CSIPersistentVolumeSource volume handle is empty")
   683  	}
   685  	result := sha256.Sum256([]byte(fmt.Sprintf("%s", csiSource.VolumeHandle)))
   686  	volSha := fmt.Sprintf("%x", result)
   687  	return filepath.Join(plugin.host.GetPluginDir(plugin.GetPluginName()), driver, volSha, globalMountInGlobalPath), nil
   688  }
   690  func verifyAttachmentStatus(attachment *storage.VolumeAttachment, volumeHandle string) (bool, error) {
   691  	// when we received a deleted event during attachment, fail fast
   692  	if attachment == nil {
   693  		klog.Error(log("VolumeAttachment [%s] has been deleted, will not continue to wait for attachment", volumeHandle))
   694  		return false, errors.New("volume attachment has been deleted")
   695  	}
   696  	// if being deleted, fail fast
   697  	if attachment.GetDeletionTimestamp() != nil {
   698  		klog.Error(log("VolumeAttachment [%s] has deletion timestamp, will not continue to wait for attachment", attachment.Name))
   699  		return false, errors.New("volume attachment is being deleted")
   700  	}
   701  	// attachment OK
   702  	if attachment.Status.Attached {
   703  		return true, nil
   704  	}
   705  	// driver reports attach error
   706  	attachErr := attachment.Status.AttachError
   707  	if attachErr != nil {
   708  		klog.Error(log("attachment for %v failed: %v", volumeHandle, attachErr.Message))
   709  		return false, errors.New(attachErr.Message)
   710  	}
   711  	return false, nil
   712  }
   714  func verifyDetachmentStatus(attachment *storage.VolumeAttachment, volumeHandle string) (bool, error) {
   715  	// when we received a deleted event during detachment
   716  	// it means we have successfully detached it.
   717  	if attachment == nil {
   718  		return true, nil
   719  	}
   720  	// driver reports detach error
   721  	detachErr := attachment.Status.DetachError
   722  	if detachErr != nil {
   723  		klog.Error(log("detachment for VolumeAttachment for volume [%s] failed: %v", volumeHandle, detachErr.Message))
   724  		return false, errors.New(detachErr.Message)
   725  	}
   726  	return false, nil
   727  }

