...

Source file src/k8s.io/kubernetes/pkg/volume/csi/csi_block.go

Documentation: k8s.io/kubernetes/pkg/volume/csi

     1  /*
     2  Copyright 2018 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  /*
    18  This file defines block volume related methods for CSI driver.
    19  CSI driver is responsible for staging/publishing volumes to their staging/publish paths.
    20  Mapping and unmapping of a device in a publish path to its global map path and its
    21  pod device map path are done by operation_executor through MapBlockVolume/UnmapBlockVolume
    22  (MapBlockVolume and UnmapBlockVolume take care of lock, symlink, and bind mount).
    23  
    24  Summary of block volume related CSI driver's methods are as follows:
    25   - GetGlobalMapPath returns a global map path,
    26   - GetPodDeviceMapPath returns a pod device map path and filename,
    27   - SetUpDevice calls CSI's NodeStageVolume and stages a volume to its staging path,
    28   - MapPodDevice calls CSI's NodePublishVolume and publishes a volume to its publish path,
    29   - UnmapPodDevice calls CSI's NodeUnpublishVolume and unpublishes a volume from its publish path,
    30   - TearDownDevice calls CSI's NodeUnstageVolume and unstages a volume from its staging path.
    31  
    32  These methods are called in the following sequences:
    33   - operation_executor.MountVolume
    34     - csi.GetGlobalMapPath
    35     - csi.SetUpDevice
    36       - NodeStageVolume
    37     - ASW.MarkDeviceAsMounted
    38     - csi.GetPodDeviceMapPath
    39     - csi.MapPodDevice
    40       - NodePublishVolume
    41     - util.MapBlockVolume
    42     - ASW.MarkVolumeAsMounted
    43  
    44   - operation_executor.UnmountVolume
    45     - csi.GetPodDeviceMapPath
    46     - util.UnmapBlockVolume
    47     - csi.UnmapPodDevice
    48       - NodeUnpublishVolume
    49     - ASW.MarkVolumeAsUnmounted
    50  
    51   - operation_executor.UnmountDevice
    52     - csi.TearDownDevice
    53       - NodeUnstageVolume
    54     - ASW.MarkDeviceAsUnmounted
    55  
    56  After successful MountVolume for block volume, directory structure will be like below:
    57    /dev/loopX ... Descriptor lock(Loopback device to mapFile under global map path)
    58    /var/lib/kubelet/plugins/kubernetes.io/csi/volumeDevices/{specName}/dev/ ... Global map path
    59    /var/lib/kubelet/plugins/kubernetes.io/csi/volumeDevices/{specName}/dev/{podUID} ... MapFile(Bind mount to publish Path)
    60    /var/lib/kubelet/plugins/kubernetes.io/csi/volumeDevices/staging/{specName} ... Staging path
    61    /var/lib/kubelet/plugins/kubernetes.io/csi/volumeDevices/publish/{specName}/{podUID} ... Publish path
    62    /var/lib/kubelet/pods/{podUID}/volumeDevices/kubernetes.io~csi/ ... Pod device map path
    63    /var/lib/kubelet/pods/{podUID}/volumeDevices/kubernetes.io~csi/{specName} ... MapFile(Symlink to publish path)
    64  */
    65  
    66  package csi
    67  
    68  import (
    69  	"context"
    70  	"errors"
    71  	"fmt"
    72  	"os"
    73  	"path/filepath"
    74  
    75  	v1 "k8s.io/api/core/v1"
    76  	storage "k8s.io/api/storage/v1"
    77  	meta "k8s.io/apimachinery/pkg/apis/meta/v1"
    78  	"k8s.io/apimachinery/pkg/types"
    79  	"k8s.io/client-go/kubernetes"
    80  	"k8s.io/klog/v2"
    81  	"k8s.io/kubernetes/pkg/util/removeall"
    82  	"k8s.io/kubernetes/pkg/volume"
    83  	volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
    84  	utilstrings "k8s.io/utils/strings"
    85  )
    86  
// csiBlockMapper holds the state needed to stage, publish, unpublish and
// unstage one CSI raw block volume. It is asserted in this file to satisfy
// both the block volume mapper and unmapper interfaces.
type csiBlockMapper struct {
	// csiClientGetter provides Get(), which the methods below use to obtain
	// the CSI client for the Node* calls.
	csiClientGetter
	// k8s is the API client used to read VolumeAttachment objects and the
	// secrets referenced by the volume source.
	k8s        kubernetes.Interface
	// plugin gives access to the volume host (paths, node name, mounter) and
	// to per-driver settings such as skipAttach and podInfoEnabled.
	plugin     *csiPlugin
	// driverName is the driver name passed to plugin.podInfoEnabled.
	driverName csiDriverName
	// specName is the path component used for the volume's staging, publish
	// and map directories.
	specName   string
	// volumeID is the volume identifier sent with the publish, unpublish and
	// unstage calls.
	volumeID   string
	// readOnly is forwarded to NodePublishVolume.
	readOnly   bool
	// spec is the volume spec from which the CSI source and access modes are read.
	spec       *volume.Spec
	// pod is used to build the pod-info volume attributes when enabled.
	pod        *v1.Pod
	// podUID names the per-pod publish path and pod device directory.
	podUID     types.UID
	// MetricsProvider is embedded; SupportsMetrics reports true for this type.
	volume.MetricsProvider
}
   100  
   101  var _ volume.BlockVolumeMapper = &csiBlockMapper{}
   102  var _ volume.CustomBlockVolumeMapper = &csiBlockMapper{}
   103  
   104  // GetGlobalMapPath returns a global map path (on the node) to a device file which will be symlinked to
   105  // Example: plugins/kubernetes.io/csi/volumeDevices/{specName}/dev
   106  func (m *csiBlockMapper) GetGlobalMapPath(spec *volume.Spec) (string, error) {
   107  	dir := getVolumeDevicePluginDir(m.specName, m.plugin.host)
   108  	klog.V(4).Infof(log("blockMapper.GetGlobalMapPath = %s", dir))
   109  	return dir, nil
   110  }
   111  
   112  // GetStagingPath returns a staging path for a directory (on the node) that should be used on NodeStageVolume/NodeUnstageVolume
   113  // Example: plugins/kubernetes.io/csi/volumeDevices/staging/{specName}
   114  func (m *csiBlockMapper) GetStagingPath() string {
   115  	return filepath.Join(m.plugin.host.GetVolumeDevicePluginDir(CSIPluginName), "staging", m.specName)
   116  }
   117  
   118  // SupportsMetrics returns true for csiBlockMapper as it initializes the
   119  // MetricsProvider.
   120  func (m *csiBlockMapper) SupportsMetrics() bool {
   121  	return true
   122  }
   123  
   124  // getPublishDir returns path to a directory, where the volume is published to each pod.
   125  // Example: plugins/kubernetes.io/csi/volumeDevices/publish/{specName}
   126  func (m *csiBlockMapper) getPublishDir() string {
   127  	return filepath.Join(m.plugin.host.GetVolumeDevicePluginDir(CSIPluginName), "publish", m.specName)
   128  }
   129  
   130  // getPublishPath returns a publish path for a file (on the node) that should be used on NodePublishVolume/NodeUnpublishVolume
   131  // Example: plugins/kubernetes.io/csi/volumeDevices/publish/{specName}/{podUID}
   132  func (m *csiBlockMapper) getPublishPath() string {
   133  	return filepath.Join(m.getPublishDir(), string(m.podUID))
   134  }
   135  
   136  // GetPodDeviceMapPath returns pod's device file which will be mapped to a volume
   137  // returns: pods/{podUID}/volumeDevices/kubernetes.io~csi, {specName}
   138  func (m *csiBlockMapper) GetPodDeviceMapPath() (string, string) {
   139  	path := m.plugin.host.GetPodVolumeDeviceDir(m.podUID, utilstrings.EscapeQualifiedName(CSIPluginName))
   140  	klog.V(4).Infof(log("blockMapper.GetPodDeviceMapPath [path=%s; name=%s]", path, m.specName))
   141  	return path, m.specName
   142  }
   143  
   144  // stageVolumeForBlock stages a block volume to stagingPath
   145  func (m *csiBlockMapper) stageVolumeForBlock(
   146  	ctx context.Context,
   147  	csi csiClient,
   148  	accessMode v1.PersistentVolumeAccessMode,
   149  	csiSource *v1.CSIPersistentVolumeSource,
   150  	attachment *storage.VolumeAttachment,
   151  ) (string, error) {
   152  	klog.V(4).Infof(log("blockMapper.stageVolumeForBlock called"))
   153  
   154  	stagingPath := m.GetStagingPath()
   155  	klog.V(4).Infof(log("blockMapper.stageVolumeForBlock stagingPath set [%s]", stagingPath))
   156  
   157  	// Check whether "STAGE_UNSTAGE_VOLUME" is set
   158  	stageUnstageSet, err := csi.NodeSupportsStageUnstage(ctx)
   159  	if err != nil {
   160  		return "", errors.New(log("blockMapper.stageVolumeForBlock failed to check STAGE_UNSTAGE_VOLUME capability: %v", err))
   161  	}
   162  	if !stageUnstageSet {
   163  		klog.Infof(log("blockMapper.stageVolumeForBlock STAGE_UNSTAGE_VOLUME capability not set. Skipping MountDevice..."))
   164  		return "", nil
   165  	}
   166  	publishVolumeInfo := map[string]string{}
   167  	if attachment != nil {
   168  		publishVolumeInfo = attachment.Status.AttachmentMetadata
   169  	}
   170  	nodeStageSecrets := map[string]string{}
   171  	if csiSource.NodeStageSecretRef != nil {
   172  		nodeStageSecrets, err = getCredentialsFromSecret(m.k8s, csiSource.NodeStageSecretRef)
   173  		if err != nil {
   174  			return "", fmt.Errorf("failed to get NodeStageSecretRef %s/%s: %v",
   175  				csiSource.NodeStageSecretRef.Namespace, csiSource.NodeStageSecretRef.Name, err)
   176  		}
   177  	}
   178  
   179  	// Creating a stagingPath directory before call to NodeStageVolume
   180  	if err := os.MkdirAll(stagingPath, 0750); err != nil {
   181  		return "", errors.New(log("blockMapper.stageVolumeForBlock failed to create dir %s: %v", stagingPath, err))
   182  	}
   183  	klog.V(4).Info(log("blockMapper.stageVolumeForBlock created stagingPath directory successfully [%s]", stagingPath))
   184  
   185  	// Request to stage a block volume to stagingPath.
   186  	// Expected implementation for driver is creating driver specific resource on stagingPath and
   187  	// attaching the block volume to the node.
   188  	err = csi.NodeStageVolume(ctx,
   189  		csiSource.VolumeHandle,
   190  		publishVolumeInfo,
   191  		stagingPath,
   192  		fsTypeBlockName,
   193  		accessMode,
   194  		nodeStageSecrets,
   195  		csiSource.VolumeAttributes,
   196  		nil, /* MountOptions */
   197  		nil /* fsGroup */)
   198  
   199  	if err != nil {
   200  		return "", err
   201  	}
   202  
   203  	klog.V(4).Infof(log("blockMapper.stageVolumeForBlock successfully requested NodeStageVolume [%s]", stagingPath))
   204  	return stagingPath, nil
   205  }
   206  
   207  // publishVolumeForBlock publishes a block volume to publishPath
   208  func (m *csiBlockMapper) publishVolumeForBlock(
   209  	ctx context.Context,
   210  	csi csiClient,
   211  	accessMode v1.PersistentVolumeAccessMode,
   212  	csiSource *v1.CSIPersistentVolumeSource,
   213  	attachment *storage.VolumeAttachment,
   214  ) (string, error) {
   215  	klog.V(4).Infof(log("blockMapper.publishVolumeForBlock called"))
   216  
   217  	publishVolumeInfo := map[string]string{}
   218  	if attachment != nil {
   219  		publishVolumeInfo = attachment.Status.AttachmentMetadata
   220  	}
   221  
   222  	// Inject pod information into volume_attributes
   223  	volAttribs := csiSource.VolumeAttributes
   224  	podInfoEnabled, err := m.plugin.podInfoEnabled(string(m.driverName))
   225  	if err != nil {
   226  		return "", errors.New(log("blockMapper.publishVolumeForBlock failed to assemble volume attributes: %v", err))
   227  	}
   228  	volumeLifecycleMode, err := m.plugin.getVolumeLifecycleMode(m.spec)
   229  	if err != nil {
   230  		return "", errors.New(log("blockMapper.publishVolumeForBlock failed to get VolumeLifecycleMode: %v", err))
   231  	}
   232  	if podInfoEnabled {
   233  		volAttribs = mergeMap(volAttribs, getPodInfoAttrs(m.pod, volumeLifecycleMode))
   234  	}
   235  
   236  	nodePublishSecrets := map[string]string{}
   237  	if csiSource.NodePublishSecretRef != nil {
   238  		nodePublishSecrets, err = getCredentialsFromSecret(m.k8s, csiSource.NodePublishSecretRef)
   239  		if err != nil {
   240  			return "", errors.New(log("blockMapper.publishVolumeForBlock failed to get NodePublishSecretRef %s/%s: %v",
   241  				csiSource.NodePublishSecretRef.Namespace, csiSource.NodePublishSecretRef.Name, err))
   242  		}
   243  	}
   244  
   245  	publishPath := m.getPublishPath()
   246  	// Setup a parent directory for publishPath before call to NodePublishVolume
   247  	publishDir := filepath.Dir(publishPath)
   248  	if err := os.MkdirAll(publishDir, 0750); err != nil {
   249  		return "", errors.New(log("blockMapper.publishVolumeForBlock failed to create dir %s:  %v", publishDir, err))
   250  	}
   251  	klog.V(4).Info(log("blockMapper.publishVolumeForBlock created directory for publishPath successfully [%s]", publishDir))
   252  
   253  	// Request to publish a block volume to publishPath.
   254  	// Expectation for driver is to place a block volume on the publishPath, by bind-mounting the device file on the publishPath or
   255  	// creating device file on the publishPath.
   256  	// Parent directory for publishPath is created by k8s, but driver is responsible for creating publishPath itself.
   257  	// If driver doesn't implement NodeStageVolume, attaching the block volume to the node may be done, here.
   258  	err = csi.NodePublishVolume(
   259  		ctx,
   260  		m.volumeID,
   261  		m.readOnly,
   262  		m.GetStagingPath(),
   263  		publishPath,
   264  		accessMode,
   265  		publishVolumeInfo,
   266  		volAttribs,
   267  		nodePublishSecrets,
   268  		fsTypeBlockName,
   269  		[]string{}, /* mountOptions */
   270  		nil,        /* fsGroup */
   271  	)
   272  
   273  	if err != nil {
   274  		return "", err
   275  	}
   276  
   277  	return publishPath, nil
   278  }
   279  
   280  // SetUpDevice ensures the device is attached returns path where the device is located.
   281  func (m *csiBlockMapper) SetUpDevice() (string, error) {
   282  	klog.V(4).Infof(log("blockMapper.SetUpDevice called"))
   283  
   284  	// Get csiSource from spec
   285  	if m.spec == nil {
   286  		return "", errors.New(log("blockMapper.SetUpDevice spec is nil"))
   287  	}
   288  
   289  	csiSource, err := getCSISourceFromSpec(m.spec)
   290  	if err != nil {
   291  		return "", errors.New(log("blockMapper.SetUpDevice failed to get CSI persistent source: %v", err))
   292  	}
   293  
   294  	driverName := csiSource.Driver
   295  	skip, err := m.plugin.skipAttach(driverName)
   296  	if err != nil {
   297  		return "", errors.New(log("blockMapper.SetupDevice failed to check CSIDriver for %s: %v", driverName, err))
   298  	}
   299  
   300  	var attachment *storage.VolumeAttachment
   301  	if !skip {
   302  		// Search for attachment by VolumeAttachment.Spec.Source.PersistentVolumeName
   303  		nodeName := string(m.plugin.host.GetNodeName())
   304  		attachID := getAttachmentName(csiSource.VolumeHandle, csiSource.Driver, nodeName)
   305  		attachment, err = m.k8s.StorageV1().VolumeAttachments().Get(context.TODO(), attachID, meta.GetOptions{})
   306  		if err != nil {
   307  			return "", errors.New(log("blockMapper.SetupDevice failed to get volume attachment [id=%v]: %v", attachID, err))
   308  		}
   309  	}
   310  
   311  	//TODO (vladimirvivien) implement better AccessModes mapping between k8s and CSI
   312  	accessMode := v1.ReadWriteOnce
   313  	if m.spec.PersistentVolume.Spec.AccessModes != nil {
   314  		accessMode = m.spec.PersistentVolume.Spec.AccessModes[0]
   315  	}
   316  
   317  	ctx, cancel := context.WithTimeout(context.Background(), csiTimeout)
   318  	defer cancel()
   319  
   320  	csiClient, err := m.csiClientGetter.Get()
   321  	if err != nil {
   322  		// Treat the absence of the CSI driver as a transient error
   323  		// See https://github.com/kubernetes/kubernetes/issues/120268
   324  		return "", volumetypes.NewTransientOperationFailure(log("blockMapper.SetUpDevice failed to get CSI client: %v", err))
   325  	}
   326  
   327  	// Call NodeStageVolume
   328  	stagingPath, err := m.stageVolumeForBlock(ctx, csiClient, accessMode, csiSource, attachment)
   329  	if err != nil {
   330  		if volumetypes.IsOperationFinishedError(err) {
   331  			cleanupErr := m.cleanupOrphanDeviceFiles()
   332  			if cleanupErr != nil {
   333  				// V(4) for not so serious error
   334  				klog.V(4).Infof("Failed to clean up block volume directory %s", cleanupErr)
   335  			}
   336  		}
   337  		return "", err
   338  	}
   339  
   340  	return stagingPath, nil
   341  }
   342  
   343  func (m *csiBlockMapper) MapPodDevice() (string, error) {
   344  	klog.V(4).Infof(log("blockMapper.MapPodDevice called"))
   345  
   346  	// Get csiSource from spec
   347  	if m.spec == nil {
   348  		return "", errors.New(log("blockMapper.MapPodDevice spec is nil"))
   349  	}
   350  
   351  	csiSource, err := getCSISourceFromSpec(m.spec)
   352  	if err != nil {
   353  		return "", errors.New(log("blockMapper.MapPodDevice failed to get CSI persistent source: %v", err))
   354  	}
   355  
   356  	driverName := csiSource.Driver
   357  	skip, err := m.plugin.skipAttach(driverName)
   358  	if err != nil {
   359  		return "", errors.New(log("blockMapper.MapPodDevice failed to check CSIDriver for %s: %v", driverName, err))
   360  	}
   361  
   362  	var attachment *storage.VolumeAttachment
   363  	if !skip {
   364  		// Search for attachment by VolumeAttachment.Spec.Source.PersistentVolumeName
   365  		nodeName := string(m.plugin.host.GetNodeName())
   366  		attachID := getAttachmentName(csiSource.VolumeHandle, csiSource.Driver, nodeName)
   367  		attachment, err = m.k8s.StorageV1().VolumeAttachments().Get(context.TODO(), attachID, meta.GetOptions{})
   368  		if err != nil {
   369  			return "", errors.New(log("blockMapper.MapPodDevice failed to get volume attachment [id=%v]: %v", attachID, err))
   370  		}
   371  	}
   372  
   373  	//TODO (vladimirvivien) implement better AccessModes mapping between k8s and CSI
   374  	accessMode := v1.ReadWriteOnce
   375  	if m.spec.PersistentVolume.Spec.AccessModes != nil {
   376  		accessMode = m.spec.PersistentVolume.Spec.AccessModes[0]
   377  	}
   378  
   379  	ctx, cancel := createCSIOperationContext(m.spec, csiTimeout)
   380  	defer cancel()
   381  
   382  	csiClient, err := m.csiClientGetter.Get()
   383  	if err != nil {
   384  		// Treat the absence of the CSI driver as a transient error
   385  		// See https://github.com/kubernetes/kubernetes/issues/120268
   386  		return "", volumetypes.NewTransientOperationFailure(log("blockMapper.MapPodDevice failed to get CSI client: %v", err))
   387  	}
   388  
   389  	// Call NodePublishVolume
   390  	publishPath, err := m.publishVolumeForBlock(ctx, csiClient, accessMode, csiSource, attachment)
   391  	if err != nil {
   392  		return "", err
   393  	}
   394  
   395  	return publishPath, nil
   396  }
   397  
   398  var _ volume.BlockVolumeUnmapper = &csiBlockMapper{}
   399  var _ volume.CustomBlockVolumeUnmapper = &csiBlockMapper{}
   400  
   401  // unpublishVolumeForBlock unpublishes a block volume from publishPath
   402  func (m *csiBlockMapper) unpublishVolumeForBlock(ctx context.Context, csi csiClient, publishPath string) error {
   403  	// Request to unpublish a block volume from publishPath.
   404  	// Expectation for driver is to remove block volume from the publishPath, by unmounting bind-mounted device file
   405  	// or deleting device file.
   406  	// Driver is responsible for deleting publishPath itself.
   407  	// If driver doesn't implement NodeUnstageVolume, detaching the block volume from the node may be done, here.
   408  	if err := csi.NodeUnpublishVolume(ctx, m.volumeID, publishPath); err != nil {
   409  		return errors.New(log("blockMapper.unpublishVolumeForBlock failed: %v", err))
   410  	}
   411  	klog.V(4).Infof(log("blockMapper.unpublishVolumeForBlock NodeUnpublished successfully [%s]", publishPath))
   412  
   413  	return nil
   414  }
   415  
   416  // unstageVolumeForBlock unstages a block volume from stagingPath
   417  func (m *csiBlockMapper) unstageVolumeForBlock(ctx context.Context, csi csiClient, stagingPath string) error {
   418  	// Check whether "STAGE_UNSTAGE_VOLUME" is set
   419  	stageUnstageSet, err := csi.NodeSupportsStageUnstage(ctx)
   420  	if err != nil {
   421  		return errors.New(log("blockMapper.unstageVolumeForBlock failed to check STAGE_UNSTAGE_VOLUME capability: %v", err))
   422  	}
   423  	if !stageUnstageSet {
   424  		klog.Infof(log("blockMapper.unstageVolumeForBlock STAGE_UNSTAGE_VOLUME capability not set. Skipping unstageVolumeForBlock ..."))
   425  		return nil
   426  	}
   427  
   428  	// Request to unstage a block volume from stagingPath.
   429  	// Expected implementation for driver is removing driver specific resource in stagingPath and
   430  	// detaching the block volume from the node.
   431  	if err := csi.NodeUnstageVolume(ctx, m.volumeID, stagingPath); err != nil {
   432  		return errors.New(log("blockMapper.unstageVolumeForBlock failed: %v", err))
   433  	}
   434  	klog.V(4).Infof(log("blockMapper.unstageVolumeForBlock NodeUnstageVolume successfully [%s]", stagingPath))
   435  
   436  	// Remove stagingPath directory and its contents
   437  	if err := os.RemoveAll(stagingPath); err != nil {
   438  		return errors.New(log("blockMapper.unstageVolumeForBlock failed to remove staging path after NodeUnstageVolume() error [%s]: %v", stagingPath, err))
   439  	}
   440  
   441  	return nil
   442  }
   443  
   444  // TearDownDevice removes traces of the SetUpDevice.
   445  func (m *csiBlockMapper) TearDownDevice(globalMapPath, devicePath string) error {
   446  	ctx, cancel := createCSIOperationContext(m.spec, csiTimeout)
   447  	defer cancel()
   448  
   449  	csiClient, err := m.csiClientGetter.Get()
   450  	if err != nil {
   451  		// Treat the absence of the CSI driver as a transient error
   452  		// See https://github.com/kubernetes/kubernetes/issues/120268
   453  		return volumetypes.NewTransientOperationFailure(log("blockMapper.TearDownDevice failed to get CSI client: %v", err))
   454  	}
   455  
   456  	// Call NodeUnstageVolume
   457  	stagingPath := m.GetStagingPath()
   458  	if _, err := os.Stat(stagingPath); err != nil {
   459  		if os.IsNotExist(err) {
   460  			klog.V(4).Infof(log("blockMapper.TearDownDevice stagingPath(%s) has already been deleted, skip calling NodeUnstageVolume", stagingPath))
   461  		} else {
   462  			return err
   463  		}
   464  	} else {
   465  		err := m.unstageVolumeForBlock(ctx, csiClient, stagingPath)
   466  		if err != nil {
   467  			return err
   468  		}
   469  	}
   470  	if err = m.cleanupOrphanDeviceFiles(); err != nil {
   471  		// V(4) for not so serious error
   472  		klog.V(4).Infof("Failed to clean up block volume directory %s", err)
   473  	}
   474  
   475  	return nil
   476  }
   477  
   478  // Clean up any orphan files / directories when a block volume is being unstaged.
   479  // At this point we can be sure that there is no pod using the volume and all
   480  // files are indeed orphaned.
   481  func (m *csiBlockMapper) cleanupOrphanDeviceFiles() error {
   482  	// Remove artifacts of NodePublish.
   483  	// publishDir: xxx/plugins/kubernetes.io/csi/volumeDevices/publish/<volume name>
   484  	// Each PublishVolume() created a subdirectory there. Since everything should be
   485  	// already unpublished at this point, the directory should be empty by now.
   486  	publishDir := m.getPublishDir()
   487  	if err := os.Remove(publishDir); err != nil && !os.IsNotExist(err) {
   488  		return errors.New(log("failed to remove publish directory [%s]: %v", publishDir, err))
   489  	}
   490  
   491  	// Remove artifacts of NodeStage.
   492  	// stagingPath: xxx/plugins/kubernetes.io/csi/volumeDevices/staging/<volume name>
   493  	stagingPath := m.GetStagingPath()
   494  	if err := os.Remove(stagingPath); err != nil && !os.IsNotExist(err) {
   495  		return errors.New(log("failed to delete volume staging path [%s]: %v", stagingPath, err))
   496  	}
   497  
   498  	// Remove everything under xxx/plugins/kubernetes.io/csi/volumeDevices/<volume name>.
   499  	// At this point it contains only "data/vol_data.json" and empty "dev/".
   500  	volumeDir := getVolumePluginDir(m.specName, m.plugin.host)
   501  	mounter := m.plugin.host.GetMounter(m.plugin.GetPluginName())
   502  	if err := removeall.RemoveAllOneFilesystem(mounter, volumeDir); err != nil {
   503  		return err
   504  	}
   505  
   506  	return nil
   507  }
   508  
   509  // UnmapPodDevice unmaps the block device path.
   510  func (m *csiBlockMapper) UnmapPodDevice() error {
   511  	publishPath := m.getPublishPath()
   512  
   513  	csiClient, err := m.csiClientGetter.Get()
   514  	if err != nil {
   515  		// Treat the absence of the CSI driver as a transient error
   516  		// See https://github.com/kubernetes/kubernetes/issues/120268
   517  		return volumetypes.NewTransientOperationFailure(log("blockMapper.UnmapPodDevice failed to get CSI client: %v", err))
   518  	}
   519  
   520  	ctx, cancel := createCSIOperationContext(m.spec, csiTimeout)
   521  	defer cancel()
   522  
   523  	// Call NodeUnpublishVolume.
   524  	// Even if publishPath does not exist - previous NodePublish may have timed out
   525  	// and Kubernetes makes sure that the operation is finished.
   526  	return m.unpublishVolumeForBlock(ctx, csiClient, publishPath)
   527  }
   528  

View as plain text