...

Source file src/k8s.io/kubernetes/pkg/volume/csi/csi_attacher.go

Documentation: k8s.io/kubernetes/pkg/volume/csi

     1  /*
     2  Copyright 2017 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package csi
    18  
    19  import (
    20  	"context"
    21  	"crypto/sha256"
    22  	"errors"
    23  	"fmt"
    24  	"os"
    25  	"path/filepath"
    26  	"strings"
    27  	"time"
    28  
    29  	v1 "k8s.io/api/core/v1"
    30  	storage "k8s.io/api/storage/v1"
    31  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    32  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    33  	"k8s.io/apimachinery/pkg/types"
    34  	"k8s.io/apimachinery/pkg/util/wait"
    35  	"k8s.io/apimachinery/pkg/watch"
    36  	utilfeature "k8s.io/apiserver/pkg/util/feature"
    37  	"k8s.io/client-go/kubernetes"
    38  	"k8s.io/klog/v2"
    39  	"k8s.io/kubernetes/pkg/features"
    40  	"k8s.io/kubernetes/pkg/util/filesystem"
    41  	"k8s.io/kubernetes/pkg/volume"
    42  	"k8s.io/kubernetes/pkg/volume/util"
    43  	volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
    44  	"k8s.io/utils/clock"
    45  )
    46  
    47  const globalMountInGlobalPath = "globalmount"
    48  
    49  type csiAttacher struct {
    50  	plugin       *csiPlugin
    51  	k8s          kubernetes.Interface
    52  	watchTimeout time.Duration
    53  
    54  	csiClient csiClient
    55  }
    56  
    57  type verifyAttachDetachStatus func(attach *storage.VolumeAttachment, volumeHandle string) (bool, error)
    58  
    59  // volume.Attacher methods
    60  var _ volume.Attacher = &csiAttacher{}
    61  
    62  var _ volume.Detacher = &csiAttacher{}
    63  
    64  var _ volume.DeviceMounter = &csiAttacher{}
    65  
    66  func (c *csiAttacher) Attach(spec *volume.Spec, nodeName types.NodeName) (string, error) {
    67  	_, ok := c.plugin.host.(volume.KubeletVolumeHost)
    68  	if ok {
    69  		return "", errors.New("attaching volumes from the kubelet is not supported")
    70  	}
    71  
    72  	if spec == nil {
    73  		klog.Error(log("attacher.Attach missing volume.Spec"))
    74  		return "", errors.New("missing spec")
    75  	}
    76  
    77  	pvSrc, err := getPVSourceFromSpec(spec)
    78  	if err != nil {
    79  		return "", errors.New(log("attacher.Attach failed to get CSIPersistentVolumeSource: %v", err))
    80  	}
    81  
    82  	node := string(nodeName)
    83  	attachID := getAttachmentName(pvSrc.VolumeHandle, pvSrc.Driver, node)
    84  
    85  	attachment, err := c.plugin.volumeAttachmentLister.Get(attachID)
    86  	if err != nil && !apierrors.IsNotFound(err) {
    87  		return "", errors.New(log("failed to get volume attachment from lister: %v", err))
    88  	}
    89  
    90  	if attachment == nil {
    91  		var vaSrc storage.VolumeAttachmentSource
    92  		if spec.InlineVolumeSpecForCSIMigration {
    93  			// inline PV scenario - use PV spec to populate VA source.
    94  			// The volume spec will be populated by CSI translation API
    95  			// for inline volumes. This allows fields required by the CSI
    96  			// attacher such as AccessMode and MountOptions (in addition to
    97  			// fields in the CSI persistent volume source) to be populated
    98  			// as part of CSI translation for inline volumes.
    99  			vaSrc = storage.VolumeAttachmentSource{
   100  				InlineVolumeSpec: &spec.PersistentVolume.Spec,
   101  			}
   102  		} else {
   103  			// regular PV scenario - use PV name to populate VA source
   104  			pvName := spec.PersistentVolume.GetName()
   105  			vaSrc = storage.VolumeAttachmentSource{
   106  				PersistentVolumeName: &pvName,
   107  			}
   108  		}
   109  
   110  		attachment := &storage.VolumeAttachment{
   111  			ObjectMeta: metav1.ObjectMeta{
   112  				Name: attachID,
   113  			},
   114  			Spec: storage.VolumeAttachmentSpec{
   115  				NodeName: node,
   116  				Attacher: pvSrc.Driver,
   117  				Source:   vaSrc,
   118  			},
   119  		}
   120  
   121  		_, err = c.k8s.StorageV1().VolumeAttachments().Create(context.TODO(), attachment, metav1.CreateOptions{})
   122  		if err != nil {
   123  			if !apierrors.IsAlreadyExists(err) {
   124  				return "", errors.New(log("attacher.Attach failed: %v", err))
   125  			}
   126  			klog.V(4).Info(log("attachment [%v] for volume [%v] already exists (will not be recreated)", attachID, pvSrc.VolumeHandle))
   127  		} else {
   128  			klog.V(4).Info(log("attachment [%v] for volume [%v] created successfully", attachID, pvSrc.VolumeHandle))
   129  		}
   130  	}
   131  
   132  	// Attach and detach functionality is exclusive to the CSI plugin that runs in the AttachDetachController,
   133  	// and has access to a VolumeAttachment lister that can be polled for the current status.
   134  	if err := c.waitForVolumeAttachmentWithLister(spec, pvSrc.VolumeHandle, attachID, c.watchTimeout); err != nil {
   135  		return "", err
   136  	}
   137  
   138  	klog.V(4).Info(log("attacher.Attach finished OK with VolumeAttachment object [%s]", attachID))
   139  
   140  	// Don't return attachID as a devicePath. We can reconstruct the attachID using getAttachmentName()
   141  	return "", nil
   142  }
   143  
   144  func (c *csiAttacher) WaitForAttach(spec *volume.Spec, _ string, pod *v1.Pod, timeout time.Duration) (string, error) {
   145  	source, err := getPVSourceFromSpec(spec)
   146  	if err != nil {
   147  		return "", errors.New(log("attacher.WaitForAttach failed to extract CSI volume source: %v", err))
   148  	}
   149  
   150  	attachID := getAttachmentName(source.VolumeHandle, source.Driver, string(c.plugin.host.GetNodeName()))
   151  
   152  	return c.waitForVolumeAttachment(source.VolumeHandle, attachID, timeout)
   153  }
   154  
   155  func (c *csiAttacher) waitForVolumeAttachment(volumeHandle, attachID string, timeout time.Duration) (string, error) {
   156  	klog.V(4).Info(log("probing for updates from CSI driver for [attachment.ID=%v]", attachID))
   157  
   158  	timer := time.NewTimer(timeout) // TODO (vladimirvivien) investigate making this configurable
   159  	defer timer.Stop()
   160  
   161  	return c.waitForVolumeAttachmentInternal(volumeHandle, attachID, timer, timeout)
   162  }
   163  
   164  func (c *csiAttacher) waitForVolumeAttachmentInternal(volumeHandle, attachID string, timer *time.Timer, timeout time.Duration) (string, error) {
   165  
   166  	klog.V(4).Info(log("probing VolumeAttachment [id=%v]", attachID))
   167  	attach, err := c.k8s.StorageV1().VolumeAttachments().Get(context.TODO(), attachID, metav1.GetOptions{})
   168  	if err != nil {
   169  		klog.Error(log("attacher.WaitForAttach failed for volume [%s] (will continue to try): %v", volumeHandle, err))
   170  		return "", fmt.Errorf("volume %v has GET error for volume attachment %v: %v", volumeHandle, attachID, err)
   171  	}
   172  	err = c.waitForVolumeAttachDetachStatus(attach, volumeHandle, attachID, timer, timeout, verifyAttachmentStatus)
   173  	if err != nil {
   174  		return "", err
   175  	}
   176  	return attach.Name, nil
   177  }
   178  
   179  func (c *csiAttacher) waitForVolumeAttachmentWithLister(spec *volume.Spec, volumeHandle, attachID string, timeout time.Duration) error {
   180  	klog.V(4).Info(log("probing VolumeAttachment [id=%v]", attachID))
   181  
   182  	verifyStatus := func() (bool, error) {
   183  		volumeAttachment, err := c.plugin.volumeAttachmentLister.Get(attachID)
   184  		if err != nil {
   185  			// Ignore "not found" errors in case the VolumeAttachment was just created and hasn't yet made it into the lister.
   186  			if !apierrors.IsNotFound(err) {
   187  				klog.Error(log("unexpected error waiting for volume attachment, %v", err))
   188  				return false, err
   189  			}
   190  
   191  			// The VolumeAttachment is not available yet and we will have to try again.
   192  			return false, nil
   193  		}
   194  
   195  		successful, err := verifyAttachmentStatus(volumeAttachment, volumeHandle)
   196  		if err != nil {
   197  			return false, err
   198  		}
   199  		return successful, nil
   200  	}
   201  
   202  	return c.waitForVolumeAttachDetachStatusWithLister(spec, volumeHandle, attachID, timeout, verifyStatus, "Attach")
   203  }
   204  
   205  func (c *csiAttacher) VolumesAreAttached(specs []*volume.Spec, nodeName types.NodeName) (map[*volume.Spec]bool, error) {
   206  	klog.V(4).Info(log("probing attachment status for %d volume(s) ", len(specs)))
   207  
   208  	attached := make(map[*volume.Spec]bool)
   209  
   210  	for _, spec := range specs {
   211  		if spec == nil {
   212  			klog.Error(log("attacher.VolumesAreAttached missing volume.Spec"))
   213  			return nil, errors.New("missing spec")
   214  		}
   215  		pvSrc, err := getPVSourceFromSpec(spec)
   216  		if err != nil {
   217  			attached[spec] = false
   218  			klog.Error(log("attacher.VolumesAreAttached failed to get CSIPersistentVolumeSource: %v", err))
   219  			continue
   220  		}
   221  		driverName := pvSrc.Driver
   222  		volumeHandle := pvSrc.VolumeHandle
   223  
   224  		skip, err := c.plugin.skipAttach(driverName)
   225  		if err != nil {
   226  			klog.Error(log("Failed to check CSIDriver for %s: %s", driverName, err))
   227  		} else {
   228  			if skip {
   229  				// This volume is not attachable, pretend it's attached
   230  				attached[spec] = true
   231  				continue
   232  			}
   233  		}
   234  
   235  		attachID := getAttachmentName(volumeHandle, driverName, string(nodeName))
   236  		var attach *storage.VolumeAttachment
   237  		if c.plugin.volumeAttachmentLister != nil {
   238  			attach, err = c.plugin.volumeAttachmentLister.Get(attachID)
   239  			if err == nil {
   240  				attached[spec] = attach.Status.Attached
   241  				continue
   242  			}
   243  			klog.V(4).Info(log("attacher.VolumesAreAttached failed in AttachmentLister for attach.ID=%v: %v. Probing the API server.", attachID, err))
   244  		}
   245  		// The cache lookup is not setup or the object is not found in the cache.
   246  		// Get the object from the API server.
   247  		klog.V(4).Info(log("probing attachment status for VolumeAttachment %v", attachID))
   248  		attach, err = c.k8s.StorageV1().VolumeAttachments().Get(context.TODO(), attachID, metav1.GetOptions{})
   249  		if err != nil {
   250  			attached[spec] = false
   251  			klog.Error(log("attacher.VolumesAreAttached failed for attach.ID=%v: %v", attachID, err))
   252  			continue
   253  		}
   254  		klog.V(4).Info(log("attacher.VolumesAreAttached attachment [%v] has status.attached=%t", attachID, attach.Status.Attached))
   255  		attached[spec] = attach.Status.Attached
   256  	}
   257  
   258  	return attached, nil
   259  }
   260  
   261  func (c *csiAttacher) GetDeviceMountPath(spec *volume.Spec) (string, error) {
   262  	klog.V(4).Info(log("attacher.GetDeviceMountPath(%v)", spec))
   263  	deviceMountPath, err := makeDeviceMountPath(c.plugin, spec)
   264  	if err != nil {
   265  		return "", errors.New(log("attacher.GetDeviceMountPath failed to make device mount path: %v", err))
   266  	}
   267  	klog.V(4).Infof("attacher.GetDeviceMountPath succeeded, deviceMountPath: %s", deviceMountPath)
   268  	return deviceMountPath, nil
   269  }
   270  
   271  func (c *csiAttacher) MountDevice(spec *volume.Spec, devicePath string, deviceMountPath string, deviceMounterArgs volume.DeviceMounterArgs) error {
   272  	klog.V(4).Infof(log("attacher.MountDevice(%s, %s)", devicePath, deviceMountPath))
   273  
   274  	if deviceMountPath == "" {
   275  		return errors.New(log("attacher.MountDevice failed, deviceMountPath is empty"))
   276  	}
   277  
   278  	// Setup
   279  	if spec == nil {
   280  		return errors.New(log("attacher.MountDevice failed, spec is nil"))
   281  	}
   282  	csiSource, err := getPVSourceFromSpec(spec)
   283  	if err != nil {
   284  		return errors.New(log("attacher.MountDevice failed to get CSIPersistentVolumeSource: %v", err))
   285  	}
   286  
   287  	// lets check if node/unstage is supported
   288  	if c.csiClient == nil {
   289  		c.csiClient, err = newCsiDriverClient(csiDriverName(csiSource.Driver))
   290  		if err != nil {
   291  			// Treat the absence of the CSI driver as a transient error
   292  			// See https://github.com/kubernetes/kubernetes/issues/120268
   293  			return volumetypes.NewTransientOperationFailure(log("attacher.MountDevice failed to create newCsiDriverClient: %v", err))
   294  		}
   295  	}
   296  	csi := c.csiClient
   297  
   298  	ctx, cancel := createCSIOperationContext(spec, c.watchTimeout)
   299  	defer cancel()
   300  	// Check whether "STAGE_UNSTAGE_VOLUME" is set
   301  	stageUnstageSet, err := csi.NodeSupportsStageUnstage(ctx)
   302  	if err != nil {
   303  		return err
   304  	}
   305  
   306  	// Get secrets and publish context required for mountDevice
   307  	nodeName := string(c.plugin.host.GetNodeName())
   308  	publishContext, err := c.plugin.getPublishContext(c.k8s, csiSource.VolumeHandle, csiSource.Driver, nodeName)
   309  
   310  	if err != nil {
   311  		return volumetypes.NewTransientOperationFailure(err.Error())
   312  	}
   313  
   314  	nodeStageSecrets := map[string]string{}
   315  	// we only require secrets if csiSource has them and volume has NodeStage capability
   316  	if csiSource.NodeStageSecretRef != nil && stageUnstageSet {
   317  		nodeStageSecrets, err = getCredentialsFromSecret(c.k8s, csiSource.NodeStageSecretRef)
   318  		if err != nil {
   319  			err = fmt.Errorf("fetching NodeStageSecretRef %s/%s failed: %v",
   320  				csiSource.NodeStageSecretRef.Namespace, csiSource.NodeStageSecretRef.Name, err)
   321  			// if we failed to fetch secret then that could be a transient error
   322  			return volumetypes.NewTransientOperationFailure(err.Error())
   323  		}
   324  	}
   325  
   326  	var mountOptions []string
   327  	if spec.PersistentVolume != nil && spec.PersistentVolume.Spec.MountOptions != nil {
   328  		mountOptions = spec.PersistentVolume.Spec.MountOptions
   329  	}
   330  
   331  	var seLinuxSupported bool
   332  	if utilfeature.DefaultFeatureGate.Enabled(features.SELinuxMountReadWriteOncePod) {
   333  		support, err := c.plugin.SupportsSELinuxContextMount(spec)
   334  		if err != nil {
   335  			return errors.New(log("failed to query for SELinuxMount support: %s", err))
   336  		}
   337  		if support && deviceMounterArgs.SELinuxLabel != "" {
   338  			mountOptions = util.AddSELinuxMountOption(mountOptions, deviceMounterArgs.SELinuxLabel)
   339  			seLinuxSupported = true
   340  		}
   341  	}
   342  
   343  	// Store volume metadata for UnmountDevice. Keep it around even if the
   344  	// driver does not support NodeStage, UnmountDevice still needs it.
   345  	if err = filesystem.MkdirAllWithPathCheck(deviceMountPath, 0750); err != nil {
   346  		return errors.New(log("attacher.MountDevice failed to create dir %#v:  %v", deviceMountPath, err))
   347  	}
   348  
   349  	klog.V(4).Info(log("created target path successfully [%s]", deviceMountPath))
   350  	dataDir := filepath.Dir(deviceMountPath)
   351  	data := map[string]string{
   352  		volDataKey.volHandle:  csiSource.VolumeHandle,
   353  		volDataKey.driverName: csiSource.Driver,
   354  	}
   355  
   356  	if utilfeature.DefaultFeatureGate.Enabled(features.SELinuxMountReadWriteOncePod) && seLinuxSupported {
   357  		data[volDataKey.seLinuxMountContext] = deviceMounterArgs.SELinuxLabel
   358  	}
   359  
   360  	err = saveVolumeData(dataDir, volDataFileName, data)
   361  	defer func() {
   362  		// Only if there was an error and volume operation was considered
   363  		// finished, we should remove the directory.
   364  		if err != nil && volumetypes.IsOperationFinishedError(err) {
   365  			// clean up metadata
   366  			klog.Errorf(log("attacher.MountDevice failed: %v", err))
   367  			if err := removeMountDir(c.plugin, deviceMountPath); err != nil {
   368  				klog.Error(log("attacher.MountDevice failed to remove mount dir after error [%s]: %v", deviceMountPath, err))
   369  			}
   370  		}
   371  	}()
   372  
   373  	if err != nil {
   374  		errMsg := log("failed to save volume info data: %v", err)
   375  		klog.Error(errMsg)
   376  		return errors.New(errMsg)
   377  	}
   378  
   379  	if !stageUnstageSet {
   380  		klog.Infof(log("attacher.MountDevice STAGE_UNSTAGE_VOLUME capability not set. Skipping MountDevice..."))
   381  		// defer does *not* remove the metadata file and it's correct - UnmountDevice needs it there.
   382  		return nil
   383  	}
   384  
   385  	//TODO (vladimirvivien) implement better AccessModes mapping between k8s and CSI
   386  	accessMode := v1.ReadWriteOnce
   387  	if spec.PersistentVolume.Spec.AccessModes != nil {
   388  		accessMode = spec.PersistentVolume.Spec.AccessModes[0]
   389  	}
   390  
   391  	var nodeStageFSGroupArg *int64
   392  	driverSupportsCSIVolumeMountGroup, err := csi.NodeSupportsVolumeMountGroup(ctx)
   393  	if err != nil {
   394  		return volumetypes.NewTransientOperationFailure(log("attacher.MountDevice failed to determine if the node service has VOLUME_MOUNT_GROUP capability: %v", err))
   395  	}
   396  
   397  	if driverSupportsCSIVolumeMountGroup {
   398  		klog.V(3).Infof("Driver %s supports applying FSGroup (has VOLUME_MOUNT_GROUP node capability). Delegating FSGroup application to the driver through NodeStageVolume.", csiSource.Driver)
   399  		nodeStageFSGroupArg = deviceMounterArgs.FsGroup
   400  	}
   401  
   402  	fsType := csiSource.FSType
   403  	err = csi.NodeStageVolume(ctx,
   404  		csiSource.VolumeHandle,
   405  		publishContext,
   406  		deviceMountPath,
   407  		fsType,
   408  		accessMode,
   409  		nodeStageSecrets,
   410  		csiSource.VolumeAttributes,
   411  		mountOptions,
   412  		nodeStageFSGroupArg)
   413  
   414  	if err != nil {
   415  		return err
   416  	}
   417  
   418  	klog.V(4).Infof(log("attacher.MountDevice successfully requested NodeStageVolume [%s]", deviceMountPath))
   419  	return err
   420  }
   421  
   422  var _ volume.Detacher = &csiAttacher{}
   423  
   424  var _ volume.DeviceUnmounter = &csiAttacher{}
   425  
   426  func (c *csiAttacher) Detach(volumeName string, nodeName types.NodeName) error {
   427  	_, ok := c.plugin.host.(volume.KubeletVolumeHost)
   428  	if ok {
   429  		return errors.New("detaching volumes from the kubelet is not supported")
   430  	}
   431  
   432  	var attachID string
   433  	var volID string
   434  
   435  	if volumeName == "" {
   436  		klog.Error(log("detacher.Detach missing value for parameter volumeName"))
   437  		return errors.New("missing expected parameter volumeName")
   438  	}
   439  
   440  	// volumeName in format driverName<SEP>volumeHandle generated by plugin.GetVolumeName()
   441  	parts := strings.Split(volumeName, volNameSep)
   442  	if len(parts) != 2 {
   443  		klog.Error(log("detacher.Detach insufficient info encoded in volumeName"))
   444  		return errors.New("volumeName missing expected data")
   445  	}
   446  
   447  	driverName := parts[0]
   448  	volID = parts[1]
   449  	attachID = getAttachmentName(volID, driverName, string(nodeName))
   450  
   451  	if err := c.k8s.StorageV1().VolumeAttachments().Delete(context.TODO(), attachID, metav1.DeleteOptions{}); err != nil {
   452  		if apierrors.IsNotFound(err) {
   453  			// object deleted or never existed, done
   454  			klog.V(4).Info(log("VolumeAttachment object [%v] for volume [%v] not found, object deleted", attachID, volID))
   455  			return nil
   456  		}
   457  		return errors.New(log("detacher.Detach failed to delete VolumeAttachment [%s]: %v", attachID, err))
   458  	}
   459  
   460  	klog.V(4).Info(log("detacher deleted ok VolumeAttachment.ID=%s", attachID))
   461  
   462  	// Attach and detach functionality is exclusive to the CSI plugin that runs in the AttachDetachController,
   463  	// and has access to a VolumeAttachment lister that can be polled for the current status.
   464  	return c.waitForVolumeDetachmentWithLister(volID, attachID, c.watchTimeout)
   465  }
   466  
   467  func (c *csiAttacher) waitForVolumeDetachmentWithLister(volumeHandle, attachID string, timeout time.Duration) error {
   468  	klog.V(4).Info(log("probing VolumeAttachment [id=%v]", attachID))
   469  
   470  	verifyStatus := func() (bool, error) {
   471  		volumeAttachment, err := c.plugin.volumeAttachmentLister.Get(attachID)
   472  		if err != nil {
   473  			if !apierrors.IsNotFound(err) {
   474  				return false, errors.New(log("detacher.WaitForDetach failed for volume [%s] (will continue to try): %v", volumeHandle, err))
   475  			}
   476  
   477  			// Detachment successful.
   478  			klog.V(4).Info(log("VolumeAttachment object [%v] for volume [%v] not found, object deleted", attachID, volumeHandle))
   479  			return true, nil
   480  		}
   481  
   482  		// Detachment is only "successful" once the VolumeAttachment is deleted, however we perform
   483  		// this check to make sure the object does not contain any detach errors.
   484  		successful, err := verifyDetachmentStatus(volumeAttachment, volumeHandle)
   485  		if err != nil {
   486  			return false, err
   487  		}
   488  		return successful, nil
   489  	}
   490  
   491  	return c.waitForVolumeAttachDetachStatusWithLister(nil, volumeHandle, attachID, timeout, verifyStatus, "Detach")
   492  }
   493  
   494  func (c *csiAttacher) waitForVolumeAttachDetachStatusWithLister(spec *volume.Spec, volumeHandle, attachID string, timeout time.Duration, verifyStatus func() (bool, error), operation string) error {
   495  	var (
   496  		initBackoff = 500 * time.Millisecond
   497  		// This is approximately the duration between consecutive ticks after two minutes (CSI timeout).
   498  		maxBackoff    = 7 * time.Second
   499  		resetDuration = time.Minute
   500  		backoffFactor = 1.05
   501  		jitter        = 0.1
   502  		clock         = &clock.RealClock{}
   503  	)
   504  	backoffMgr := wait.NewExponentialBackoffManager(initBackoff, maxBackoff, resetDuration, backoffFactor, jitter, clock)
   505  
   506  	ctx, cancel := context.WithTimeout(context.Background(), timeout)
   507  	defer cancel()
   508  
   509  	// Get driver name from spec for better log messages. During detach spec can be nil, and it's ok for driver to be unknown.
   510  	csiDriverName, err := GetCSIDriverName(spec)
   511  	if err != nil {
   512  		csiDriverName = "unknown"
   513  		klog.V(4).Info(log("Could not find CSI driver name in spec for volume [%v]", volumeHandle))
   514  	}
   515  
   516  	for {
   517  		t := backoffMgr.Backoff()
   518  		select {
   519  		case <-t.C():
   520  			successful, err := verifyStatus()
   521  			if err != nil {
   522  				return err
   523  			}
   524  			if successful {
   525  				return nil
   526  			}
   527  		case <-ctx.Done():
   528  			t.Stop()
   529  			klog.Error(log("%s timeout after %v [volume=%v; attachment.ID=%v]", operation, timeout, volumeHandle, attachID))
   530  			return fmt.Errorf("timed out waiting for external-attacher of %v CSI driver to %v volume %v", csiDriverName, strings.ToLower(operation), volumeHandle)
   531  		}
   532  	}
   533  }
   534  
   535  func (c *csiAttacher) waitForVolumeAttachDetachStatus(attach *storage.VolumeAttachment, volumeHandle, attachID string,
   536  	timer *time.Timer, timeout time.Duration, verifyStatus verifyAttachDetachStatus) error {
   537  	successful, err := verifyStatus(attach, volumeHandle)
   538  	if err != nil {
   539  		return err
   540  	}
   541  	if successful {
   542  		return nil
   543  	}
   544  
   545  	watcher, err := c.k8s.StorageV1().VolumeAttachments().Watch(context.TODO(), metav1.SingleObject(metav1.ObjectMeta{Name: attachID, ResourceVersion: attach.ResourceVersion}))
   546  	if err != nil {
   547  		return fmt.Errorf("watch error:%v for volume %v", err, volumeHandle)
   548  	}
   549  
   550  	ch := watcher.ResultChan()
   551  	defer watcher.Stop()
   552  	for {
   553  		select {
   554  		case event, ok := <-ch:
   555  			if !ok {
   556  				klog.Errorf("[attachment.ID=%v] watch channel had been closed", attachID)
   557  				return errors.New("volume attachment watch channel had been closed")
   558  			}
   559  
   560  			switch event.Type {
   561  			case watch.Added, watch.Modified:
   562  				attach, _ := event.Object.(*storage.VolumeAttachment)
   563  				successful, err := verifyStatus(attach, volumeHandle)
   564  				if err != nil {
   565  					return err
   566  				}
   567  				if successful {
   568  					return nil
   569  				}
   570  			case watch.Deleted:
   571  				// set attach nil to get different results
   572  				// for detachment, a deleted event means successful detachment, should return success
   573  				// for attachment, should return fail
   574  				if successful, err := verifyStatus(nil, volumeHandle); !successful {
   575  					return err
   576  				}
   577  				klog.V(4).Info(log("VolumeAttachment object [%v] for volume [%v] has been deleted", attachID, volumeHandle))
   578  				return nil
   579  
   580  			case watch.Error:
   581  				klog.Warningf("waitForVolumeAttachDetachInternal received watch error: %v", event)
   582  			}
   583  
   584  		case <-timer.C:
   585  			klog.Error(log("attachdetacher.WaitForDetach timeout after %v [volume=%v; attachment.ID=%v]", timeout, volumeHandle, attachID))
   586  			return fmt.Errorf("attachdetachment timeout for volume %v", volumeHandle)
   587  		}
   588  	}
   589  }
   590  
   591  func (c *csiAttacher) UnmountDevice(deviceMountPath string) error {
   592  	klog.V(4).Info(log("attacher.UnmountDevice(%s)", deviceMountPath))
   593  
   594  	// Setup
   595  	var driverName, volID string
   596  	dataDir := filepath.Dir(deviceMountPath)
   597  	data, err := loadVolumeData(dataDir, volDataFileName)
   598  	if err == nil {
   599  		driverName = data[volDataKey.driverName]
   600  		volID = data[volDataKey.volHandle]
   601  	} else {
   602  		if errors.Is(err, os.ErrNotExist) {
   603  			klog.V(4).Info(log("attacher.UnmountDevice skipped because volume data file [%s] does not exist", dataDir))
   604  			return nil
   605  		}
   606  
   607  		klog.Errorf(log("attacher.UnmountDevice failed to get driver and volume name from device mount path: %v", err))
   608  		return err
   609  	}
   610  
   611  	if c.csiClient == nil {
   612  		c.csiClient, err = newCsiDriverClient(csiDriverName(driverName))
   613  		if err != nil {
   614  			// Treat the absence of the CSI driver as a transient error
   615  			// See https://github.com/kubernetes/kubernetes/issues/120268
   616  			return volumetypes.NewTransientOperationFailure(log("attacher.UnmountDevice failed to create newCsiDriverClient: %v", err))
   617  		}
   618  	}
   619  	csi := c.csiClient
   620  
   621  	// could not get whether this is migrated because there is no spec
   622  	ctx, cancel := createCSIOperationContext(nil, csiTimeout)
   623  	defer cancel()
   624  	// Check whether "STAGE_UNSTAGE_VOLUME" is set
   625  	stageUnstageSet, err := csi.NodeSupportsStageUnstage(ctx)
   626  	if err != nil {
   627  		return errors.New(log("attacher.UnmountDevice failed to check whether STAGE_UNSTAGE_VOLUME set: %v", err))
   628  	}
   629  	if !stageUnstageSet {
   630  		klog.Infof(log("attacher.UnmountDevice STAGE_UNSTAGE_VOLUME capability not set. Skipping UnmountDevice..."))
   631  		// Just	delete the global directory + json file
   632  		if err := removeMountDir(c.plugin, deviceMountPath); err != nil {
   633  			return errors.New(log("failed to clean up global mount %s: %s", dataDir, err))
   634  		}
   635  
   636  		return nil
   637  	}
   638  
   639  	// Start UnmountDevice
   640  	err = csi.NodeUnstageVolume(ctx,
   641  		volID,
   642  		deviceMountPath)
   643  
   644  	if err != nil {
   645  		return errors.New(log("attacher.UnmountDevice failed: %v", err))
   646  	}
   647  
   648  	// Delete the global directory + json file
   649  	if err := removeMountDir(c.plugin, deviceMountPath); err != nil {
   650  		return errors.New(log("failed to clean up global mount %s: %s", dataDir, err))
   651  	}
   652  
   653  	klog.V(4).Infof(log("attacher.UnmountDevice successfully requested NodeUnStageVolume [%s]", deviceMountPath))
   654  	return nil
   655  }
   656  
   // getAttachmentName builds the VolumeAttachment object name: the prefix
   // "csi-" followed by the lowercase hex SHA-256 digest of the three inputs
   // concatenated, with no separator, in the order volName, csiDriverName,
   // nodeName.
   func getAttachmentName(volName, csiDriverName, nodeName string) string {
   	digest := sha256.Sum256([]byte(volName + csiDriverName + nodeName))
   	return fmt.Sprintf("csi-%x", digest)
   }
   662  
   663  func makeDeviceMountPath(plugin *csiPlugin, spec *volume.Spec) (string, error) {
   664  	if spec == nil {
   665  		return "", errors.New("makeDeviceMountPath failed, spec is nil")
   666  	}
   667  
   668  	driver, err := GetCSIDriverName(spec)
   669  	if err != nil {
   670  		return "", err
   671  	}
   672  	if driver == "" {
   673  		return "", errors.New("makeDeviceMountPath failed, csi source driver name is empty")
   674  	}
   675  
   676  	csiSource, err := getPVSourceFromSpec(spec)
   677  	if err != nil {
   678  		return "", errors.New(log("makeDeviceMountPath failed to get CSIPersistentVolumeSource: %v", err))
   679  	}
   680  
   681  	if csiSource.VolumeHandle == "" {
   682  		return "", errors.New("makeDeviceMountPath failed, CSIPersistentVolumeSource volume handle is empty")
   683  	}
   684  
   685  	result := sha256.Sum256([]byte(fmt.Sprintf("%s", csiSource.VolumeHandle)))
   686  	volSha := fmt.Sprintf("%x", result)
   687  	return filepath.Join(plugin.host.GetPluginDir(plugin.GetPluginName()), driver, volSha, globalMountInGlobalPath), nil
   688  }
   689  
   690  func verifyAttachmentStatus(attachment *storage.VolumeAttachment, volumeHandle string) (bool, error) {
   691  	// when we received a deleted event during attachment, fail fast
   692  	if attachment == nil {
   693  		klog.Error(log("VolumeAttachment [%s] has been deleted, will not continue to wait for attachment", volumeHandle))
   694  		return false, errors.New("volume attachment has been deleted")
   695  	}
   696  	// if being deleted, fail fast
   697  	if attachment.GetDeletionTimestamp() != nil {
   698  		klog.Error(log("VolumeAttachment [%s] has deletion timestamp, will not continue to wait for attachment", attachment.Name))
   699  		return false, errors.New("volume attachment is being deleted")
   700  	}
   701  	// attachment OK
   702  	if attachment.Status.Attached {
   703  		return true, nil
   704  	}
   705  	// driver reports attach error
   706  	attachErr := attachment.Status.AttachError
   707  	if attachErr != nil {
   708  		klog.Error(log("attachment for %v failed: %v", volumeHandle, attachErr.Message))
   709  		return false, errors.New(attachErr.Message)
   710  	}
   711  	return false, nil
   712  }
   713  
   714  func verifyDetachmentStatus(attachment *storage.VolumeAttachment, volumeHandle string) (bool, error) {
   715  	// when we received a deleted event during detachment
   716  	// it means we have successfully detached it.
   717  	if attachment == nil {
   718  		return true, nil
   719  	}
   720  	// driver reports detach error
   721  	detachErr := attachment.Status.DetachError
   722  	if detachErr != nil {
   723  		klog.Error(log("detachment for VolumeAttachment for volume [%s] failed: %v", volumeHandle, detachErr.Message))
   724  		return false, errors.New(detachErr.Message)
   725  	}
   726  	return false, nil
   727  }
   728  

View as plain text