...

Source file src/k8s.io/kubernetes/pkg/volume/util/operationexecutor/operation_generator.go

Documentation: k8s.io/kubernetes/pkg/volume/util/operationexecutor

     1  /*
     2  Copyright 2016 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package operationexecutor
    18  
    19  import (
    20  	"context"
    21  	goerrors "errors"
    22  	"fmt"
    23  	"os"
    24  	"path/filepath"
    25  	"strings"
    26  	"time"
    27  
    28  	"k8s.io/apimachinery/pkg/api/resource"
    29  
    30  	v1 "k8s.io/api/core/v1"
    31  	"k8s.io/apimachinery/pkg/api/errors"
    32  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    33  	"k8s.io/apimachinery/pkg/types"
    34  	utilfeature "k8s.io/apiserver/pkg/util/feature"
    35  	clientset "k8s.io/client-go/kubernetes"
    36  	"k8s.io/client-go/tools/record"
    37  	volerr "k8s.io/cloud-provider/volume/errors"
    38  	storagehelpers "k8s.io/component-helpers/storage/volume"
    39  	csitrans "k8s.io/csi-translation-lib"
    40  	"k8s.io/klog/v2"
    41  	v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
    42  	"k8s.io/kubernetes/pkg/features"
    43  	kevents "k8s.io/kubernetes/pkg/kubelet/events"
    44  	"k8s.io/kubernetes/pkg/volume"
    45  	"k8s.io/kubernetes/pkg/volume/util"
    46  	"k8s.io/kubernetes/pkg/volume/util/hostutil"
    47  	volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
    48  	"k8s.io/kubernetes/pkg/volume/util/volumepathhandler"
    49  )
    50  
    51  const (
    52  	unknownVolumePlugin                  string = "UnknownVolumePlugin"
    53  	unknownAttachableVolumePlugin        string = "UnknownAttachableVolumePlugin"
    54  	DetachOperationName                  string = "volume_detach"
    55  	VerifyControllerAttachedVolumeOpName string = "verify_controller_attached_volume"
    56  )
    57  
    58  // InTreeToCSITranslator contains methods required to check migratable status
    59  // and perform translations from InTree PVs and Inline to CSI
    60  type InTreeToCSITranslator interface {
    61  	IsPVMigratable(pv *v1.PersistentVolume) bool
    62  	IsInlineMigratable(vol *v1.Volume) bool
    63  	IsMigratableIntreePluginByName(inTreePluginName string) bool
    64  	GetInTreePluginNameFromSpec(pv *v1.PersistentVolume, vol *v1.Volume) (string, error)
    65  	GetCSINameFromInTreeName(pluginName string) (string, error)
    66  	TranslateInTreePVToCSI(pv *v1.PersistentVolume) (*v1.PersistentVolume, error)
    67  	TranslateInTreeInlineVolumeToCSI(volume *v1.Volume, podNamespace string) (*v1.PersistentVolume, error)
    68  }
    69  
    70  var _ OperationGenerator = &operationGenerator{}
    71  
    72  type operationGenerator struct {
    73  	// Used to fetch objects from the API server like Node in the
    74  	// VerifyControllerAttachedVolume operation.
    75  	kubeClient clientset.Interface
    76  
    77  	// volumePluginMgr is the volume plugin manager used to create volume
    78  	// plugin objects.
    79  	volumePluginMgr *volume.VolumePluginMgr
    80  
    81  	// recorder is used to record events in the API server
    82  	recorder record.EventRecorder
    83  
    84  	// blkUtil provides volume path related operations for block volume
    85  	blkUtil volumepathhandler.BlockVolumePathHandler
    86  
    87  	translator InTreeToCSITranslator
    88  }
    89  
    90  type inTreeResizeResponse struct {
    91  	pvc *v1.PersistentVolumeClaim
    92  	pv  *v1.PersistentVolume
    93  
    94  	err error
    95  	// indicates that resize operation was called on underlying volume driver
    96  	// mainly useful for testing.
    97  	resizeCalled bool
    98  }
    99  
   100  // NewOperationGenerator is returns instance of operationGenerator
   101  func NewOperationGenerator(kubeClient clientset.Interface,
   102  	volumePluginMgr *volume.VolumePluginMgr,
   103  	recorder record.EventRecorder,
   104  	blkUtil volumepathhandler.BlockVolumePathHandler) OperationGenerator {
   105  
   106  	return &operationGenerator{
   107  		kubeClient:      kubeClient,
   108  		volumePluginMgr: volumePluginMgr,
   109  		recorder:        recorder,
   110  		blkUtil:         blkUtil,
   111  		translator:      csitrans.New(),
   112  	}
   113  }
   114  
   115  // OperationGenerator interface that extracts out the functions from operation_executor to make it dependency injectable
   116  type OperationGenerator interface {
   117  	// Generates the MountVolume function needed to perform the mount of a volume plugin
   118  	GenerateMountVolumeFunc(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorldMounterUpdater ActualStateOfWorldMounterUpdater, isRemount bool) volumetypes.GeneratedOperations
   119  
   120  	// Generates the UnmountVolume function needed to perform the unmount of a volume plugin
   121  	GenerateUnmountVolumeFunc(volumeToUnmount MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, podsDir string) (volumetypes.GeneratedOperations, error)
   122  
   123  	// Generates the AttachVolume function needed to perform attach of a volume plugin
   124  	GenerateAttachVolumeFunc(logger klog.Logger, volumeToAttach VolumeToAttach, actualStateOfWorld ActualStateOfWorldAttacherUpdater) volumetypes.GeneratedOperations
   125  
   126  	// Generates the DetachVolume function needed to perform the detach of a volume plugin
   127  	GenerateDetachVolumeFunc(logger klog.Logger, volumeToDetach AttachedVolume, verifySafeToDetach bool, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error)
   128  
   129  	// Generates the VolumesAreAttached function needed to verify if volume plugins are attached
   130  	GenerateVolumesAreAttachedFunc(attachedVolumes []AttachedVolume, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error)
   131  
   132  	// Generates the UnMountDevice function needed to perform the unmount of a device
   133  	GenerateUnmountDeviceFunc(deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, mounter hostutil.HostUtils) (volumetypes.GeneratedOperations, error)
   134  
   135  	// Generates the function needed to check if the attach_detach controller has attached the volume plugin
   136  	GenerateVerifyControllerAttachedVolumeFunc(logger klog.Logger, volumeToMount VolumeToMount, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error)
   137  
   138  	// Generates the MapVolume function needed to perform the map of a volume plugin
   139  	GenerateMapVolumeFunc(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorldMounterUpdater ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error)
   140  
   141  	// Generates the UnmapVolume function needed to perform the unmap of a volume plugin
   142  	GenerateUnmapVolumeFunc(volumeToUnmount MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error)
   143  
   144  	// Generates the UnmapDevice function needed to perform the unmap of a device
   145  	GenerateUnmapDeviceFunc(deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, mounter hostutil.HostUtils) (volumetypes.GeneratedOperations, error)
   146  
   147  	// GetVolumePluginMgr returns volume plugin manager
   148  	GetVolumePluginMgr() *volume.VolumePluginMgr
   149  
   150  	// GetCSITranslator returns the CSI Translation Library
   151  	GetCSITranslator() InTreeToCSITranslator
   152  
   153  	GenerateBulkVolumeVerifyFunc(
   154  		map[types.NodeName][]*volume.Spec,
   155  		string,
   156  		map[*volume.Spec]v1.UniqueVolumeName, ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error)
   157  
   158  	GenerateExpandVolumeFunc(*v1.PersistentVolumeClaim, *v1.PersistentVolume) (volumetypes.GeneratedOperations, error)
   159  
   160  	GenerateExpandAndRecoverVolumeFunc(*v1.PersistentVolumeClaim, *v1.PersistentVolume, string) (volumetypes.GeneratedOperations, error)
   161  
   162  	// Generates the volume file system resize function, which can resize volume's file system to expected size without unmounting the volume.
   163  	// Along with volumeToMount and actualStateOfWorld, the function expects current size of volume on the node as an argument. The current
   164  	// size here always refers to capacity last recorded in actualStateOfWorld from pvc.Status.Capacity
   165  	GenerateExpandInUseVolumeFunc(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater, currentSize resource.Quantity) (volumetypes.GeneratedOperations, error)
   166  }
   167  
   168  type inTreeResizeOpts struct {
   169  	resizerName  string
   170  	pvc          *v1.PersistentVolumeClaim
   171  	pv           *v1.PersistentVolume
   172  	volumeSpec   *volume.Spec
   173  	volumePlugin volume.ExpandableVolumePlugin
   174  }
   175  
   176  type nodeResizeOperationOpts struct {
   177  	vmt                VolumeToMount
   178  	pvc                *v1.PersistentVolumeClaim
   179  	pv                 *v1.PersistentVolume
   180  	pluginResizeOpts   volume.NodeResizeOptions
   181  	volumePlugin       volume.NodeExpandableVolumePlugin
   182  	actualStateOfWorld ActualStateOfWorldMounterUpdater
   183  }
   184  
   185  func (og *operationGenerator) GenerateVolumesAreAttachedFunc(
   186  	attachedVolumes []AttachedVolume,
   187  	nodeName types.NodeName,
   188  	actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) {
   189  	// volumesPerPlugin maps from a volume plugin to a list of volume specs which belong
   190  	// to this type of plugin
   191  	volumesPerPlugin := make(map[string][]*volume.Spec)
   192  	// volumeSpecMap maps from a volume spec to its unique volumeName which will be used
   193  	// when calling MarkVolumeAsDetached
   194  	volumeSpecMap := make(map[*volume.Spec]v1.UniqueVolumeName)
   195  
   196  	// Iterate each volume spec and put them into a map index by the pluginName
   197  	for _, volumeAttached := range attachedVolumes {
   198  		if volumeAttached.VolumeSpec == nil {
   199  			klog.Errorf("VerifyVolumesAreAttached.GenerateVolumesAreAttachedFunc: nil spec for volume %s", volumeAttached.VolumeName)
   200  			continue
   201  		}
   202  		volumePlugin, err :=
   203  			og.volumePluginMgr.FindPluginBySpec(volumeAttached.VolumeSpec)
   204  		if err != nil || volumePlugin == nil {
   205  			klog.Errorf(volumeAttached.GenerateErrorDetailed("VolumesAreAttached.FindPluginBySpec failed", err).Error())
   206  			continue
   207  		}
   208  		volumeSpecList, pluginExists := volumesPerPlugin[volumePlugin.GetPluginName()]
   209  		if !pluginExists {
   210  			volumeSpecList = []*volume.Spec{}
   211  		}
   212  		volumeSpecList = append(volumeSpecList, volumeAttached.VolumeSpec)
   213  		volumesPerPlugin[volumePlugin.GetPluginName()] = volumeSpecList
   214  		// Migration: VolumeSpecMap contains original VolumeName for use in ActualStateOfWorld
   215  		volumeSpecMap[volumeAttached.VolumeSpec] = volumeAttached.VolumeName
   216  	}
   217  
   218  	volumesAreAttachedFunc := func() volumetypes.OperationContext {
   219  
   220  		// For each volume plugin, pass the list of volume specs to VolumesAreAttached to check
   221  		// whether the volumes are still attached.
   222  		for pluginName, volumesSpecs := range volumesPerPlugin {
   223  			attachableVolumePlugin, err :=
   224  				og.volumePluginMgr.FindAttachablePluginByName(pluginName)
   225  			if err != nil || attachableVolumePlugin == nil {
   226  				klog.Errorf(
   227  					"VolumeAreAttached.FindAttachablePluginBySpec failed for plugin %q with: %v",
   228  					pluginName,
   229  					err)
   230  				continue
   231  			}
   232  
   233  			volumeAttacher, newAttacherErr := attachableVolumePlugin.NewAttacher()
   234  			if newAttacherErr != nil {
   235  				klog.Errorf(
   236  					"VolumesAreAttached.NewAttacher failed for getting plugin %q with: %v",
   237  					pluginName,
   238  					newAttacherErr)
   239  				continue
   240  			}
   241  
   242  			attached, areAttachedErr := volumeAttacher.VolumesAreAttached(volumesSpecs, nodeName)
   243  			if areAttachedErr != nil {
   244  				klog.Errorf(
   245  					"VolumesAreAttached failed for checking on node %q with: %v",
   246  					nodeName,
   247  					areAttachedErr)
   248  				continue
   249  			}
   250  
   251  			for spec, check := range attached {
   252  				if !check {
   253  					actualStateOfWorld.MarkVolumeAsDetached(volumeSpecMap[spec], nodeName)
   254  					klog.V(1).Infof("VerifyVolumesAreAttached determined volume %q (spec.Name: %q) is no longer attached to node %q, therefore it was marked as detached.",
   255  						volumeSpecMap[spec], spec.Name(), nodeName)
   256  				}
   257  			}
   258  		}
   259  
   260  		// It is hard to differentiate migrated status for all volumes for verify_volumes_are_attached_per_node
   261  		return volumetypes.NewOperationContext(nil, nil, false)
   262  	}
   263  
   264  	return volumetypes.GeneratedOperations{
   265  		OperationName:     "verify_volumes_are_attached_per_node",
   266  		OperationFunc:     volumesAreAttachedFunc,
   267  		CompleteFunc:      util.OperationCompleteHook(util.GetFullQualifiedPluginNameForVolume("<n/a>", nil), "verify_volumes_are_attached_per_node"),
   268  		EventRecorderFunc: nil, // nil because we do not want to generate event on error
   269  	}, nil
   270  }
   271  
   272  func (og *operationGenerator) GenerateBulkVolumeVerifyFunc(
   273  	pluginNodeVolumes map[types.NodeName][]*volume.Spec,
   274  	pluginName string,
   275  	volumeSpecMap map[*volume.Spec]v1.UniqueVolumeName,
   276  	actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) {
   277  
   278  	// Migration: All inputs already should be translated by caller for this
   279  	// function except volumeSpecMap which contains original volume names for
   280  	// use with actualStateOfWorld
   281  
   282  	bulkVolumeVerifyFunc := func() volumetypes.OperationContext {
   283  		attachableVolumePlugin, err :=
   284  			og.volumePluginMgr.FindAttachablePluginByName(pluginName)
   285  		if err != nil || attachableVolumePlugin == nil {
   286  			klog.Errorf(
   287  				"BulkVerifyVolume.FindAttachablePluginBySpec failed for plugin %q with: %v",
   288  				pluginName,
   289  				err)
   290  			return volumetypes.NewOperationContext(nil, nil, false)
   291  		}
   292  
   293  		volumeAttacher, newAttacherErr := attachableVolumePlugin.NewAttacher()
   294  
   295  		if newAttacherErr != nil {
   296  			klog.Errorf(
   297  				"BulkVerifyVolume.NewAttacher failed for getting plugin %q with: %v",
   298  				attachableVolumePlugin,
   299  				newAttacherErr)
   300  			return volumetypes.NewOperationContext(nil, nil, false)
   301  		}
   302  		bulkVolumeVerifier, ok := volumeAttacher.(volume.BulkVolumeVerifier)
   303  
   304  		if !ok {
   305  			klog.Errorf("BulkVerifyVolume failed to type assert attacher %q", bulkVolumeVerifier)
   306  			return volumetypes.NewOperationContext(nil, nil, false)
   307  		}
   308  
   309  		attached, bulkAttachErr := bulkVolumeVerifier.BulkVerifyVolumes(pluginNodeVolumes)
   310  		if bulkAttachErr != nil {
   311  			klog.Errorf("BulkVerifyVolume.BulkVerifyVolumes Error checking volumes are attached with %v", bulkAttachErr)
   312  			return volumetypes.NewOperationContext(nil, nil, false)
   313  		}
   314  
   315  		for nodeName, volumeSpecs := range pluginNodeVolumes {
   316  			for _, volumeSpec := range volumeSpecs {
   317  				nodeVolumeSpecs, nodeChecked := attached[nodeName]
   318  
   319  				if !nodeChecked {
   320  					klog.V(2).Infof("VerifyVolumesAreAttached.BulkVerifyVolumes failed for node %q and leaving volume %q as attached",
   321  						nodeName,
   322  						volumeSpec.Name())
   323  					continue
   324  				}
   325  
   326  				check := nodeVolumeSpecs[volumeSpec]
   327  
   328  				if !check {
   329  					klog.V(2).Infof("VerifyVolumesAreAttached.BulkVerifyVolumes failed for node %q and volume %q",
   330  						nodeName,
   331  						volumeSpec.Name())
   332  					actualStateOfWorld.MarkVolumeAsDetached(volumeSpecMap[volumeSpec], nodeName)
   333  				}
   334  			}
   335  		}
   336  
   337  		// It is hard to differentiate migrated status for all volumes for verify_volumes_are_attached
   338  		return volumetypes.NewOperationContext(nil, nil, false)
   339  	}
   340  
   341  	return volumetypes.GeneratedOperations{
   342  		OperationName:     "verify_volumes_are_attached",
   343  		OperationFunc:     bulkVolumeVerifyFunc,
   344  		CompleteFunc:      util.OperationCompleteHook(util.GetFullQualifiedPluginNameForVolume(pluginName, nil), "verify_volumes_are_attached"),
   345  		EventRecorderFunc: nil, // nil because we do not want to generate event on error
   346  	}, nil
   347  
   348  }
   349  
   350  func (og *operationGenerator) GenerateAttachVolumeFunc(
   351  	logger klog.Logger,
   352  	volumeToAttach VolumeToAttach,
   353  	actualStateOfWorld ActualStateOfWorldAttacherUpdater) volumetypes.GeneratedOperations {
   354  
   355  	attachVolumeFunc := func() volumetypes.OperationContext {
   356  		attachableVolumePlugin, err :=
   357  			og.volumePluginMgr.FindAttachablePluginBySpec(volumeToAttach.VolumeSpec)
   358  
   359  		migrated := getMigratedStatusBySpec(volumeToAttach.VolumeSpec)
   360  
   361  		if err != nil || attachableVolumePlugin == nil {
   362  			eventErr, detailedErr := volumeToAttach.GenerateError("AttachVolume.FindAttachablePluginBySpec failed", err)
   363  			return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
   364  		}
   365  
   366  		volumeAttacher, newAttacherErr := attachableVolumePlugin.NewAttacher()
   367  		if newAttacherErr != nil {
   368  			eventErr, detailedErr := volumeToAttach.GenerateError("AttachVolume.NewAttacher failed", newAttacherErr)
   369  			return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
   370  		}
   371  
   372  		// Execute attach
   373  		devicePath, attachErr := volumeAttacher.Attach(
   374  			volumeToAttach.VolumeSpec, volumeToAttach.NodeName)
   375  
   376  		if attachErr != nil {
   377  			uncertainNode := volumeToAttach.NodeName
   378  			if derr, ok := attachErr.(*volerr.DanglingAttachError); ok {
   379  				uncertainNode = derr.CurrentNode
   380  			}
   381  			addErr := actualStateOfWorld.MarkVolumeAsUncertain(
   382  				logger,
   383  				volumeToAttach.VolumeName,
   384  				volumeToAttach.VolumeSpec,
   385  				uncertainNode)
   386  			if addErr != nil {
   387  				klog.Errorf("AttachVolume.MarkVolumeAsUncertain fail to add the volume %q to actual state with %s", volumeToAttach.VolumeName, addErr)
   388  			}
   389  
   390  			// On failure, return error. Caller will log and retry.
   391  			eventErr, detailedErr := volumeToAttach.GenerateError("AttachVolume.Attach failed", attachErr)
   392  			return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
   393  		}
   394  
   395  		// Successful attach event is useful for user debugging
   396  		simpleMsg, _ := volumeToAttach.GenerateMsg("AttachVolume.Attach succeeded", "")
   397  		for _, pod := range volumeToAttach.ScheduledPods {
   398  			og.recorder.Eventf(pod, v1.EventTypeNormal, kevents.SuccessfulAttachVolume, simpleMsg)
   399  		}
   400  		klog.Infof(volumeToAttach.GenerateMsgDetailed("AttachVolume.Attach succeeded", ""))
   401  
   402  		// Update actual state of world
   403  		addVolumeNodeErr := actualStateOfWorld.MarkVolumeAsAttached(
   404  			logger, v1.UniqueVolumeName(""), volumeToAttach.VolumeSpec, volumeToAttach.NodeName, devicePath)
   405  		if addVolumeNodeErr != nil {
   406  			// On failure, return error. Caller will log and retry.
   407  			eventErr, detailedErr := volumeToAttach.GenerateError("AttachVolume.MarkVolumeAsAttached failed", addVolumeNodeErr)
   408  			return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
   409  		}
   410  
   411  		return volumetypes.NewOperationContext(nil, nil, migrated)
   412  	}
   413  
   414  	eventRecorderFunc := func(err *error) {
   415  		if *err != nil {
   416  			for _, pod := range volumeToAttach.ScheduledPods {
   417  				og.recorder.Eventf(pod, v1.EventTypeWarning, kevents.FailedAttachVolume, (*err).Error())
   418  			}
   419  		}
   420  	}
   421  
   422  	attachableVolumePluginName := unknownAttachableVolumePlugin
   423  
   424  	// Get attacher plugin
   425  	attachableVolumePlugin, err :=
   426  		og.volumePluginMgr.FindAttachablePluginBySpec(volumeToAttach.VolumeSpec)
   427  	// It's ok to ignore the error, returning error is not expected from this function.
   428  	// If an error case occurred during the function generation, this error case(skipped one) will also trigger an error
   429  	// while the generated function is executed. And those errors will be handled during the execution of the generated
   430  	// function with a back off policy.
   431  	if err == nil && attachableVolumePlugin != nil {
   432  		attachableVolumePluginName = attachableVolumePlugin.GetPluginName()
   433  	}
   434  
   435  	return volumetypes.GeneratedOperations{
   436  		OperationName:     "volume_attach",
   437  		OperationFunc:     attachVolumeFunc,
   438  		EventRecorderFunc: eventRecorderFunc,
   439  		CompleteFunc:      util.OperationCompleteHook(util.GetFullQualifiedPluginNameForVolume(attachableVolumePluginName, volumeToAttach.VolumeSpec), "volume_attach"),
   440  	}
   441  }
   442  
   443  func (og *operationGenerator) GetVolumePluginMgr() *volume.VolumePluginMgr {
   444  	return og.volumePluginMgr
   445  }
   446  
   447  func (og *operationGenerator) GetCSITranslator() InTreeToCSITranslator {
   448  	return og.translator
   449  }
   450  
   451  func (og *operationGenerator) GenerateDetachVolumeFunc(
   452  	logger klog.Logger,
   453  	volumeToDetach AttachedVolume,
   454  	verifySafeToDetach bool,
   455  	actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) {
   456  	var volumeName string
   457  	var attachableVolumePlugin volume.AttachableVolumePlugin
   458  	var pluginName string
   459  	var err error
   460  
   461  	if volumeToDetach.VolumeSpec != nil {
   462  		attachableVolumePlugin, err = findDetachablePluginBySpec(volumeToDetach.VolumeSpec, og.volumePluginMgr)
   463  		if err != nil || attachableVolumePlugin == nil {
   464  			return volumetypes.GeneratedOperations{}, volumeToDetach.GenerateErrorDetailed("DetachVolume.findDetachablePluginBySpec failed", err)
   465  		}
   466  
   467  		volumeName, err =
   468  			attachableVolumePlugin.GetVolumeName(volumeToDetach.VolumeSpec)
   469  		if err != nil {
   470  			return volumetypes.GeneratedOperations{}, volumeToDetach.GenerateErrorDetailed("DetachVolume.GetVolumeName failed", err)
   471  		}
   472  	} else {
   473  		// Get attacher plugin and the volumeName by splitting the volume unique name in case
   474  		// there's no VolumeSpec: this happens only on attach/detach controller crash recovery
   475  		// when a pod has been deleted during the controller downtime
   476  		pluginName, volumeName, err = util.SplitUniqueName(volumeToDetach.VolumeName)
   477  		if err != nil {
   478  			return volumetypes.GeneratedOperations{}, volumeToDetach.GenerateErrorDetailed("DetachVolume.SplitUniqueName failed", err)
   479  		}
   480  
   481  		attachableVolumePlugin, err = og.volumePluginMgr.FindAttachablePluginByName(pluginName)
   482  		if err != nil || attachableVolumePlugin == nil {
   483  			return volumetypes.GeneratedOperations{}, volumeToDetach.GenerateErrorDetailed("DetachVolume.FindAttachablePluginByName failed", err)
   484  		}
   485  
   486  	}
   487  
   488  	if pluginName == "" {
   489  		pluginName = attachableVolumePlugin.GetPluginName()
   490  	}
   491  
   492  	volumeDetacher, err := attachableVolumePlugin.NewDetacher()
   493  	if err != nil {
   494  		return volumetypes.GeneratedOperations{}, volumeToDetach.GenerateErrorDetailed("DetachVolume.NewDetacher failed", err)
   495  	}
   496  
   497  	detachVolumeFunc := func() volumetypes.OperationContext {
   498  		var err error
   499  		if verifySafeToDetach {
   500  			err = og.verifyVolumeIsSafeToDetach(volumeToDetach)
   501  		}
   502  		if err == nil {
   503  			err = volumeDetacher.Detach(volumeName, volumeToDetach.NodeName)
   504  		}
   505  
   506  		migrated := getMigratedStatusBySpec(volumeToDetach.VolumeSpec)
   507  
   508  		if err != nil {
   509  			// On failure, mark the volume as uncertain. Attach() must succeed before adding the volume back
   510  			// to node status as attached.
   511  			uncertainError := actualStateOfWorld.MarkVolumeAsUncertain(
   512  				logger, volumeToDetach.VolumeName, volumeToDetach.VolumeSpec, volumeToDetach.NodeName)
   513  			if uncertainError != nil {
   514  				klog.Errorf("DetachVolume.MarkVolumeAsUncertain failed to add the volume %q to actual state after detach error: %s", volumeToDetach.VolumeName, uncertainError)
   515  			}
   516  			eventErr, detailedErr := volumeToDetach.GenerateError("DetachVolume.Detach failed", err)
   517  			return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
   518  		}
   519  
   520  		klog.Infof(volumeToDetach.GenerateMsgDetailed("DetachVolume.Detach succeeded", ""))
   521  
   522  		// Update actual state of world
   523  		actualStateOfWorld.MarkVolumeAsDetached(
   524  			volumeToDetach.VolumeName, volumeToDetach.NodeName)
   525  
   526  		return volumetypes.NewOperationContext(nil, nil, migrated)
   527  	}
   528  
   529  	return volumetypes.GeneratedOperations{
   530  		OperationName:     DetachOperationName,
   531  		OperationFunc:     detachVolumeFunc,
   532  		CompleteFunc:      util.OperationCompleteHook(util.GetFullQualifiedPluginNameForVolume(pluginName, volumeToDetach.VolumeSpec), DetachOperationName),
   533  		EventRecorderFunc: nil, // nil because we do not want to generate event on error
   534  	}, nil
   535  }
   536  
   537  func (og *operationGenerator) GenerateMountVolumeFunc(
   538  	waitForAttachTimeout time.Duration,
   539  	volumeToMount VolumeToMount,
   540  	actualStateOfWorld ActualStateOfWorldMounterUpdater,
   541  	isRemount bool) volumetypes.GeneratedOperations {
   542  
   543  	volumePluginName := unknownVolumePlugin
   544  	volumePlugin, err :=
   545  		og.volumePluginMgr.FindPluginBySpec(volumeToMount.VolumeSpec)
   546  	if err == nil && volumePlugin != nil {
   547  		volumePluginName = volumePlugin.GetPluginName()
   548  	}
   549  
   550  	mountVolumeFunc := func() volumetypes.OperationContext {
   551  		// Get mounter plugin
   552  		volumePlugin, err := og.volumePluginMgr.FindPluginBySpec(volumeToMount.VolumeSpec)
   553  
   554  		migrated := getMigratedStatusBySpec(volumeToMount.VolumeSpec)
   555  
   556  		if err != nil || volumePlugin == nil {
   557  			eventErr, detailedErr := volumeToMount.GenerateError("MountVolume.FindPluginBySpec failed", err)
   558  			return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
   559  		}
   560  
   561  		affinityErr := checkNodeAffinity(og, volumeToMount)
   562  		if affinityErr != nil {
   563  			eventErr, detailedErr := volumeToMount.GenerateError("MountVolume.NodeAffinity check failed", affinityErr)
   564  			return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
   565  		}
   566  
   567  		volumeMounter, newMounterErr := volumePlugin.NewMounter(
   568  			volumeToMount.VolumeSpec,
   569  			volumeToMount.Pod,
   570  			volume.VolumeOptions{})
   571  		if newMounterErr != nil {
   572  			eventErr, detailedErr := volumeToMount.GenerateError("MountVolume.NewMounter initialization failed", newMounterErr)
   573  			return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
   574  		}
   575  
   576  		mountCheckError := checkMountOptionSupport(og, volumeToMount, volumePlugin)
   577  		if mountCheckError != nil {
   578  			eventErr, detailedErr := volumeToMount.GenerateError("MountVolume.MountOptionSupport check failed", mountCheckError)
   579  			return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
   580  		}
   581  
   582  		// Enforce ReadWriteOncePod access mode if it is the only one present. This is also enforced during scheduling.
   583  		if actualStateOfWorld.IsVolumeMountedElsewhere(volumeToMount.VolumeName, volumeToMount.PodName) &&
   584  			// Because we do not know what access mode the pod intends to use if there are multiple.
   585  			len(volumeToMount.VolumeSpec.PersistentVolume.Spec.AccessModes) == 1 &&
   586  			v1helper.ContainsAccessMode(volumeToMount.VolumeSpec.PersistentVolume.Spec.AccessModes, v1.ReadWriteOncePod) {
   587  
   588  			err = goerrors.New("volume uses the ReadWriteOncePod access mode and is already in use by another pod")
   589  			eventErr, detailedErr := volumeToMount.GenerateError("MountVolume.SetUp failed", err)
   590  			return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
   591  		}
   592  
   593  		// Get attacher, if possible
   594  		attachableVolumePlugin, _ :=
   595  			og.volumePluginMgr.FindAttachablePluginBySpec(volumeToMount.VolumeSpec)
   596  		var volumeAttacher volume.Attacher
   597  		if attachableVolumePlugin != nil {
   598  			volumeAttacher, _ = attachableVolumePlugin.NewAttacher()
   599  		}
   600  
   601  		// get deviceMounter, if possible
   602  		deviceMountableVolumePlugin, _ := og.volumePluginMgr.FindDeviceMountablePluginBySpec(volumeToMount.VolumeSpec)
   603  		var volumeDeviceMounter volume.DeviceMounter
   604  		if deviceMountableVolumePlugin != nil {
   605  			volumeDeviceMounter, _ = deviceMountableVolumePlugin.NewDeviceMounter()
   606  		}
   607  
   608  		var fsGroup *int64
   609  		var fsGroupChangePolicy *v1.PodFSGroupChangePolicy
   610  		if podSc := volumeToMount.Pod.Spec.SecurityContext; podSc != nil {
   611  			if podSc.FSGroup != nil {
   612  				fsGroup = podSc.FSGroup
   613  			}
   614  			if podSc.FSGroupChangePolicy != nil {
   615  				fsGroupChangePolicy = podSc.FSGroupChangePolicy
   616  			}
   617  		}
   618  
   619  		devicePath := volumeToMount.DevicePath
   620  		if volumeAttacher != nil {
   621  			// Wait for attachable volumes to finish attaching
   622  			klog.InfoS(volumeToMount.GenerateMsgDetailed("MountVolume.WaitForAttach entering", fmt.Sprintf("DevicePath %q", volumeToMount.DevicePath)), "pod", klog.KObj(volumeToMount.Pod))
   623  
   624  			devicePath, err = volumeAttacher.WaitForAttach(
   625  				volumeToMount.VolumeSpec, devicePath, volumeToMount.Pod, waitForAttachTimeout)
   626  			if err != nil {
   627  				// On failure, return error. Caller will log and retry.
   628  				eventErr, detailedErr := volumeToMount.GenerateError("MountVolume.WaitForAttach failed", err)
   629  				return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
   630  			}
   631  
   632  			klog.InfoS(volumeToMount.GenerateMsgDetailed("MountVolume.WaitForAttach succeeded", fmt.Sprintf("DevicePath %q", devicePath)), "pod", klog.KObj(volumeToMount.Pod))
   633  		}
   634  
   635  		var resizeError error
   636  		resizeOptions := volume.NodeResizeOptions{
   637  			DevicePath: devicePath,
   638  		}
   639  
   640  		if volumeDeviceMounter != nil && actualStateOfWorld.GetDeviceMountState(volumeToMount.VolumeName) != DeviceGloballyMounted {
   641  			deviceMountPath, err :=
   642  				volumeDeviceMounter.GetDeviceMountPath(volumeToMount.VolumeSpec)
   643  			if err != nil {
   644  				// On failure, return error. Caller will log and retry.
   645  				eventErr, detailedErr := volumeToMount.GenerateError("MountVolume.GetDeviceMountPath failed", err)
   646  				return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
   647  			}
   648  
   649  			// Mount device to global mount path
   650  			err = volumeDeviceMounter.MountDevice(
   651  				volumeToMount.VolumeSpec,
   652  				devicePath,
   653  				deviceMountPath,
   654  				volume.DeviceMounterArgs{FsGroup: fsGroup, SELinuxLabel: volumeToMount.SELinuxLabel},
   655  			)
   656  			if err != nil {
   657  				og.checkForFailedMount(volumeToMount, err)
   658  				og.markDeviceErrorState(volumeToMount, devicePath, deviceMountPath, err, actualStateOfWorld)
   659  				// On failure, return error. Caller will log and retry.
   660  				eventErr, detailedErr := volumeToMount.GenerateError("MountVolume.MountDevice failed", err)
   661  				return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
   662  			}
   663  
   664  			klog.InfoS(volumeToMount.GenerateMsgDetailed("MountVolume.MountDevice succeeded", fmt.Sprintf("device mount path %q", deviceMountPath)), "pod", klog.KObj(volumeToMount.Pod))
   665  
   666  			// Update actual state of world to reflect volume is globally mounted
   667  			markDeviceMountedErr := actualStateOfWorld.MarkDeviceAsMounted(
   668  				volumeToMount.VolumeName, devicePath, deviceMountPath, volumeToMount.SELinuxLabel)
   669  			if markDeviceMountedErr != nil {
   670  				// On failure, return error. Caller will log and retry.
   671  				eventErr, detailedErr := volumeToMount.GenerateError("MountVolume.MarkDeviceAsMounted failed", markDeviceMountedErr)
   672  				return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
   673  			}
   674  			// set staging path for volume expansion
   675  			resizeOptions.DeviceStagePath = deviceMountPath
   676  		}
   677  
   678  		if volumeDeviceMounter != nil && resizeOptions.DeviceStagePath == "" {
   679  			deviceStagePath, err := volumeDeviceMounter.GetDeviceMountPath(volumeToMount.VolumeSpec)
   680  			if err != nil {
   681  				// On failure, return error. Caller will log and retry.
   682  				eventErr, detailedErr := volumeToMount.GenerateError("MountVolume.GetDeviceMountPath failed for expansion", err)
   683  				return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
   684  			}
   685  			resizeOptions.DeviceStagePath = deviceStagePath
   686  		}
   687  
   688  		// Execute mount
   689  		mountErr := volumeMounter.SetUp(volume.MounterArgs{
   690  			FsUser:              util.FsUserFrom(volumeToMount.Pod),
   691  			FsGroup:             fsGroup,
   692  			DesiredSize:         volumeToMount.DesiredSizeLimit,
   693  			FSGroupChangePolicy: fsGroupChangePolicy,
   694  			SELinuxLabel:        volumeToMount.SELinuxLabel,
   695  		})
   696  		// Update actual state of world
   697  		markOpts := MarkVolumeOpts{
   698  			PodName:             volumeToMount.PodName,
   699  			PodUID:              volumeToMount.Pod.UID,
   700  			VolumeName:          volumeToMount.VolumeName,
   701  			Mounter:             volumeMounter,
   702  			OuterVolumeSpecName: volumeToMount.OuterVolumeSpecName,
   703  			VolumeGidVolume:     volumeToMount.VolumeGidValue,
   704  			VolumeSpec:          volumeToMount.VolumeSpec,
   705  			VolumeMountState:    VolumeMounted,
   706  			SELinuxMountContext: volumeToMount.SELinuxLabel,
   707  		}
   708  		if mountErr != nil {
   709  			og.checkForFailedMount(volumeToMount, mountErr)
   710  			og.markVolumeErrorState(volumeToMount, markOpts, mountErr, actualStateOfWorld)
   711  			// On failure, return error. Caller will log and retry.
   712  			eventErr, detailedErr := volumeToMount.GenerateError("MountVolume.SetUp failed", mountErr)
   713  			return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
   714  		}
   715  
   716  		detailedMsg := volumeToMount.GenerateMsgDetailed("MountVolume.SetUp succeeded", "")
   717  		verbosity := klog.Level(1)
   718  		if isRemount {
   719  			verbosity = klog.Level(4)
   720  		}
   721  		klog.V(verbosity).InfoS(detailedMsg, "pod", klog.KObj(volumeToMount.Pod))
   722  		resizeOptions.DeviceMountPath = volumeMounter.GetPath()
   723  
   724  		_, resizeError = og.expandVolumeDuringMount(volumeToMount, actualStateOfWorld, resizeOptions)
   725  		if resizeError != nil {
   726  			klog.Errorf("MountVolume.NodeExpandVolume failed with %v", resizeError)
   727  			eventErr, detailedErr := volumeToMount.GenerateError("MountVolume.Setup failed while expanding volume", resizeError)
   728  			// At this point, MountVolume.Setup already succeeded, we should add volume into actual state
   729  			// so that reconciler can clean up volume when needed. However, volume resize failed,
   730  			// we should not mark the volume as mounted to avoid pod starts using it.
   731  			// Considering the above situations, we mark volume as uncertain here so that reconciler will trigger
   732  			// volume tear down when pod is deleted, and also makes sure pod will not start using it.
   733  			if err := actualStateOfWorld.MarkVolumeMountAsUncertain(markOpts); err != nil {
   734  				klog.Errorf(volumeToMount.GenerateErrorDetailed("MountVolume.MarkVolumeMountAsUncertain failed", err).Error())
   735  			}
   736  			return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
   737  		}
   738  
   739  		// record total time it takes to mount a volume. This is end to end time that includes waiting for volume to attach, node to be update
   740  		// plugin call to succeed
   741  		mountRequestTime := volumeToMount.MountRequestTime
   742  		totalTimeTaken := time.Since(mountRequestTime).Seconds()
   743  		util.RecordOperationLatencyMetric(util.GetFullQualifiedPluginNameForVolume(volumePluginName, volumeToMount.VolumeSpec), "overall_volume_mount", totalTimeTaken)
   744  
   745  		markVolMountedErr := actualStateOfWorld.MarkVolumeAsMounted(markOpts)
   746  		if markVolMountedErr != nil {
   747  			// On failure, return error. Caller will log and retry.
   748  			eventErr, detailedErr := volumeToMount.GenerateError("MountVolume.MarkVolumeAsMounted failed", markVolMountedErr)
   749  			return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
   750  		}
   751  		return volumetypes.NewOperationContext(nil, nil, migrated)
   752  	}
   753  
   754  	eventRecorderFunc := func(err *error) {
   755  		if *err != nil {
   756  			og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeWarning, kevents.FailedMountVolume, (*err).Error())
   757  		}
   758  	}
   759  
   760  	return volumetypes.GeneratedOperations{
   761  		OperationName:     "volume_mount",
   762  		OperationFunc:     mountVolumeFunc,
   763  		EventRecorderFunc: eventRecorderFunc,
   764  		CompleteFunc:      util.OperationCompleteHook(util.GetFullQualifiedPluginNameForVolume(volumePluginName, volumeToMount.VolumeSpec), "volume_mount"),
   765  	}
   766  }
   767  
   768  func (og *operationGenerator) checkForFailedMount(volumeToMount VolumeToMount, mountError error) {
   769  	pv := volumeToMount.VolumeSpec.PersistentVolume
   770  	if pv == nil {
   771  		return
   772  	}
   773  
   774  	if volumetypes.IsFilesystemMismatchError(mountError) {
   775  		simpleMsg, _ := volumeToMount.GenerateMsg("MountVolume failed", mountError.Error())
   776  		og.recorder.Eventf(pv, v1.EventTypeWarning, kevents.FailedMountOnFilesystemMismatch, simpleMsg)
   777  	}
   778  }
   779  
   780  func (og *operationGenerator) markDeviceErrorState(volumeToMount VolumeToMount, devicePath, deviceMountPath string, mountError error, actualStateOfWorld ActualStateOfWorldMounterUpdater) {
   781  	if volumetypes.IsOperationFinishedError(mountError) &&
   782  		actualStateOfWorld.GetDeviceMountState(volumeToMount.VolumeName) == DeviceMountUncertain {
   783  
   784  		if actualStateOfWorld.IsVolumeDeviceReconstructed(volumeToMount.VolumeName) {
   785  			klog.V(2).InfoS("MountVolume.markDeviceErrorState leaving volume uncertain", "volumeName", volumeToMount.VolumeName)
   786  			return
   787  		}
   788  
   789  		// Only devices which were uncertain can be marked as unmounted
   790  		markDeviceUnmountError := actualStateOfWorld.MarkDeviceAsUnmounted(volumeToMount.VolumeName)
   791  		if markDeviceUnmountError != nil {
   792  			klog.Errorf(volumeToMount.GenerateErrorDetailed("MountDevice.MarkDeviceAsUnmounted failed", markDeviceUnmountError).Error())
   793  		}
   794  		return
   795  	}
   796  
   797  	if volumetypes.IsUncertainProgressError(mountError) &&
   798  		actualStateOfWorld.GetDeviceMountState(volumeToMount.VolumeName) == DeviceNotMounted {
   799  		// only devices which are not mounted can be marked as uncertain. We do not want to mark a device
   800  		// which was previously marked as mounted here as uncertain.
   801  		markDeviceUncertainError := actualStateOfWorld.MarkDeviceAsUncertain(volumeToMount.VolumeName, devicePath, deviceMountPath, volumeToMount.SELinuxLabel)
   802  		if markDeviceUncertainError != nil {
   803  			klog.Errorf(volumeToMount.GenerateErrorDetailed("MountDevice.MarkDeviceAsUncertain failed", markDeviceUncertainError).Error())
   804  		}
   805  	}
   806  
   807  }
   808  
   809  func (og *operationGenerator) markVolumeErrorState(volumeToMount VolumeToMount, markOpts MarkVolumeOpts, mountError error, actualStateOfWorld ActualStateOfWorldMounterUpdater) {
   810  	if volumetypes.IsOperationFinishedError(mountError) &&
   811  		actualStateOfWorld.GetVolumeMountState(volumeToMount.VolumeName, markOpts.PodName) == VolumeMountUncertain {
   812  		// if volume was previously reconstructed we are not going to change its state as unmounted even
   813  		// if mount operation fails.
   814  		if actualStateOfWorld.IsVolumeReconstructed(volumeToMount.VolumeName, volumeToMount.PodName) {
   815  			klog.V(3).InfoS("MountVolume.markVolumeErrorState leaving volume uncertain", "volumeName", volumeToMount.VolumeName)
   816  			return
   817  		}
   818  
   819  		t := actualStateOfWorld.MarkVolumeAsUnmounted(volumeToMount.PodName, volumeToMount.VolumeName)
   820  		if t != nil {
   821  			klog.Errorf(volumeToMount.GenerateErrorDetailed("MountVolume.MarkVolumeAsUnmounted failed", t).Error())
   822  		}
   823  		return
   824  
   825  	}
   826  
   827  	if volumetypes.IsUncertainProgressError(mountError) &&
   828  		actualStateOfWorld.GetVolumeMountState(volumeToMount.VolumeName, markOpts.PodName) == VolumeNotMounted {
   829  		t := actualStateOfWorld.MarkVolumeMountAsUncertain(markOpts)
   830  		if t != nil {
   831  			klog.Errorf(volumeToMount.GenerateErrorDetailed("MountVolume.MarkVolumeMountAsUncertain failed", t).Error())
   832  		}
   833  	}
   834  }
   835  
   836  func (og *operationGenerator) GenerateUnmountVolumeFunc(
   837  	volumeToUnmount MountedVolume,
   838  	actualStateOfWorld ActualStateOfWorldMounterUpdater,
   839  	podsDir string) (volumetypes.GeneratedOperations, error) {
   840  	// Get mountable plugin
   841  	volumePlugin, err := og.volumePluginMgr.FindPluginByName(volumeToUnmount.PluginName)
   842  	if err != nil || volumePlugin == nil {
   843  		return volumetypes.GeneratedOperations{}, volumeToUnmount.GenerateErrorDetailed("UnmountVolume.FindPluginByName failed", err)
   844  	}
   845  	volumeUnmounter, newUnmounterErr := volumePlugin.NewUnmounter(
   846  		volumeToUnmount.InnerVolumeSpecName, volumeToUnmount.PodUID)
   847  	if newUnmounterErr != nil {
   848  		return volumetypes.GeneratedOperations{}, volumeToUnmount.GenerateErrorDetailed("UnmountVolume.NewUnmounter failed", newUnmounterErr)
   849  	}
   850  
   851  	unmountVolumeFunc := func() volumetypes.OperationContext {
   852  		subpather := og.volumePluginMgr.Host.GetSubpather()
   853  
   854  		migrated := getMigratedStatusBySpec(volumeToUnmount.VolumeSpec)
   855  
   856  		// Remove all bind-mounts for subPaths
   857  		podDir := filepath.Join(podsDir, string(volumeToUnmount.PodUID))
   858  		if err := subpather.CleanSubPaths(podDir, volumeToUnmount.InnerVolumeSpecName); err != nil {
   859  			eventErr, detailedErr := volumeToUnmount.GenerateError("error cleaning subPath mounts", err)
   860  			return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
   861  		}
   862  
   863  		// Execute unmount
   864  		unmountErr := volumeUnmounter.TearDown()
   865  		if unmountErr != nil {
   866  			// Mark the volume as uncertain, so SetUp is called for new pods. Teardown may be already in progress.
   867  			opts := MarkVolumeOpts{
   868  				PodName:             volumeToUnmount.PodName,
   869  				PodUID:              volumeToUnmount.PodUID,
   870  				VolumeName:          volumeToUnmount.VolumeName,
   871  				OuterVolumeSpecName: volumeToUnmount.OuterVolumeSpecName,
   872  				VolumeGidVolume:     volumeToUnmount.VolumeGidValue,
   873  				VolumeSpec:          volumeToUnmount.VolumeSpec,
   874  				VolumeMountState:    VolumeMountUncertain,
   875  			}
   876  			markMountUncertainErr := actualStateOfWorld.MarkVolumeMountAsUncertain(opts)
   877  			if markMountUncertainErr != nil {
   878  				// There is nothing else we can do. Hope that UnmountVolume will be re-tried shortly.
   879  				klog.Errorf(volumeToUnmount.GenerateErrorDetailed("UnmountVolume.MarkVolumeMountAsUncertain failed", markMountUncertainErr).Error())
   880  			}
   881  
   882  			// On failure, return error. Caller will log and retry.
   883  			eventErr, detailedErr := volumeToUnmount.GenerateError("UnmountVolume.TearDown failed", unmountErr)
   884  			return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
   885  		}
   886  
   887  		klog.Infof(
   888  			"UnmountVolume.TearDown succeeded for volume %q (OuterVolumeSpecName: %q) pod %q (UID: %q). InnerVolumeSpecName %q. PluginName %q, VolumeGidValue %q",
   889  			volumeToUnmount.VolumeName,
   890  			volumeToUnmount.OuterVolumeSpecName,
   891  			volumeToUnmount.PodName,
   892  			volumeToUnmount.PodUID,
   893  			volumeToUnmount.InnerVolumeSpecName,
   894  			volumeToUnmount.PluginName,
   895  			volumeToUnmount.VolumeGidValue)
   896  
   897  		// Update actual state of world
   898  		markVolMountedErr := actualStateOfWorld.MarkVolumeAsUnmounted(
   899  			volumeToUnmount.PodName, volumeToUnmount.VolumeName)
   900  		if markVolMountedErr != nil {
   901  			// On failure, just log and exit
   902  			klog.Errorf(volumeToUnmount.GenerateErrorDetailed("UnmountVolume.MarkVolumeAsUnmounted failed", markVolMountedErr).Error())
   903  		}
   904  
   905  		return volumetypes.NewOperationContext(nil, nil, migrated)
   906  	}
   907  
   908  	return volumetypes.GeneratedOperations{
   909  		OperationName:     "volume_unmount",
   910  		OperationFunc:     unmountVolumeFunc,
   911  		CompleteFunc:      util.OperationCompleteHook(util.GetFullQualifiedPluginNameForVolume(volumePlugin.GetPluginName(), volumeToUnmount.VolumeSpec), "volume_unmount"),
   912  		EventRecorderFunc: nil, // nil because we do not want to generate event on error
   913  	}, nil
   914  }
   915  
   916  func (og *operationGenerator) GenerateUnmountDeviceFunc(
   917  	deviceToDetach AttachedVolume,
   918  	actualStateOfWorld ActualStateOfWorldMounterUpdater,
   919  	hostutil hostutil.HostUtils) (volumetypes.GeneratedOperations, error) {
   920  	// Get DeviceMounter plugin
   921  	deviceMountableVolumePlugin, err :=
   922  		og.volumePluginMgr.FindDeviceMountablePluginByName(deviceToDetach.PluginName)
   923  	if err != nil || deviceMountableVolumePlugin == nil {
   924  		return volumetypes.GeneratedOperations{}, deviceToDetach.GenerateErrorDetailed("UnmountDevice.FindDeviceMountablePluginByName failed", err)
   925  	}
   926  
   927  	volumeDeviceUnmounter, err := deviceMountableVolumePlugin.NewDeviceUnmounter()
   928  	if err != nil {
   929  		return volumetypes.GeneratedOperations{}, deviceToDetach.GenerateErrorDetailed("UnmountDevice.NewDeviceUnmounter failed", err)
   930  	}
   931  
   932  	volumeDeviceMounter, err := deviceMountableVolumePlugin.NewDeviceMounter()
   933  	if err != nil {
   934  		return volumetypes.GeneratedOperations{}, deviceToDetach.GenerateErrorDetailed("UnmountDevice.NewDeviceMounter failed", err)
   935  	}
   936  
   937  	unmountDeviceFunc := func() volumetypes.OperationContext {
   938  
   939  		migrated := getMigratedStatusBySpec(deviceToDetach.VolumeSpec)
   940  
   941  		//deviceMountPath := deviceToDetach.DeviceMountPath
   942  		deviceMountPath, err :=
   943  			volumeDeviceMounter.GetDeviceMountPath(deviceToDetach.VolumeSpec)
   944  		if err != nil {
   945  			// On failure other than "does not exist", return error. Caller will log and retry.
   946  			if !strings.Contains(err.Error(), "does not exist") {
   947  				eventErr, detailedErr := deviceToDetach.GenerateError("GetDeviceMountPath failed", err)
   948  				return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
   949  			}
   950  			// If the mount path could not be found, don't fail the unmount, but instead log a warning and proceed,
   951  			// using the value from deviceToDetach.DeviceMountPath, so that the device can be marked as unmounted
   952  			deviceMountPath = deviceToDetach.DeviceMountPath
   953  			klog.Warningf(deviceToDetach.GenerateMsgDetailed(fmt.Sprintf(
   954  				"GetDeviceMountPath failed, but unmount operation will proceed using deviceMountPath=%s: %v", deviceMountPath, err), ""))
   955  		}
   956  		refs, err := deviceMountableVolumePlugin.GetDeviceMountRefs(deviceMountPath)
   957  
   958  		if err != nil || util.HasMountRefs(deviceMountPath, refs) {
   959  			if err == nil {
   960  				err = fmt.Errorf("the device mount path %q is still mounted by other references %v", deviceMountPath, refs)
   961  			}
   962  			eventErr, detailedErr := deviceToDetach.GenerateError("GetDeviceMountRefs check failed", err)
   963  			return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
   964  		}
   965  		// Execute unmount
   966  		unmountDeviceErr := volumeDeviceUnmounter.UnmountDevice(deviceMountPath)
   967  		if unmountDeviceErr != nil {
   968  			// Mark the device as uncertain, so MountDevice is called for new pods. UnmountDevice may be already in progress.
   969  			markDeviceUncertainErr := actualStateOfWorld.MarkDeviceAsUncertain(deviceToDetach.VolumeName, deviceToDetach.DevicePath, deviceMountPath, deviceToDetach.SELinuxMountContext)
   970  			if markDeviceUncertainErr != nil {
   971  				// There is nothing else we can do. Hope that UnmountDevice will be re-tried shortly.
   972  				klog.Errorf(deviceToDetach.GenerateErrorDetailed("UnmountDevice.MarkDeviceAsUncertain failed", markDeviceUncertainErr).Error())
   973  			}
   974  
   975  			// On failure, return error. Caller will log and retry.
   976  			eventErr, detailedErr := deviceToDetach.GenerateError("UnmountDevice failed", unmountDeviceErr)
   977  			return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
   978  		}
   979  		// Before logging that UnmountDevice succeeded and moving on,
   980  		// use hostutil.PathIsDevice to check if the path is a device,
   981  		// if so use hostutil.DeviceOpened to check if the device is in use anywhere
   982  		// else on the system. Retry if it returns true.
   983  		deviceOpened, deviceOpenedErr := isDeviceOpened(deviceToDetach, hostutil)
   984  		if deviceOpenedErr != nil {
   985  			return volumetypes.NewOperationContext(nil, deviceOpenedErr, migrated)
   986  		}
   987  		// The device is still in use elsewhere. Caller will log and retry.
   988  		if deviceOpened {
   989  			// Mark the device as uncertain, so MountDevice is called for new pods.
   990  			markDeviceUncertainErr := actualStateOfWorld.MarkDeviceAsUncertain(deviceToDetach.VolumeName, deviceToDetach.DevicePath, deviceMountPath, deviceToDetach.SELinuxMountContext)
   991  			if markDeviceUncertainErr != nil {
   992  				// There is nothing else we can do. Hope that UnmountDevice will be re-tried shortly.
   993  				klog.Errorf(deviceToDetach.GenerateErrorDetailed("UnmountDevice.MarkDeviceAsUncertain failed", markDeviceUncertainErr).Error())
   994  			}
   995  			eventErr, detailedErr := deviceToDetach.GenerateError(
   996  				"UnmountDevice failed",
   997  				goerrors.New("the device is in use when it was no longer expected to be in use"))
   998  			return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
   999  		}
  1000  
  1001  		klog.Info(deviceToDetach.GenerateMsgDetailed("UnmountDevice succeeded", ""))
  1002  
  1003  		// Update actual state of world
  1004  		markDeviceUnmountedErr := actualStateOfWorld.MarkDeviceAsUnmounted(
  1005  			deviceToDetach.VolumeName)
  1006  		if markDeviceUnmountedErr != nil {
  1007  			// On failure, return error. Caller will log and retry.
  1008  			eventErr, detailedErr := deviceToDetach.GenerateError("MarkDeviceAsUnmounted failed", markDeviceUnmountedErr)
  1009  			return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
  1010  		}
  1011  
  1012  		return volumetypes.NewOperationContext(nil, nil, migrated)
  1013  	}
  1014  
  1015  	return volumetypes.GeneratedOperations{
  1016  		OperationName:     "unmount_device",
  1017  		OperationFunc:     unmountDeviceFunc,
  1018  		CompleteFunc:      util.OperationCompleteHook(util.GetFullQualifiedPluginNameForVolume(deviceMountableVolumePlugin.GetPluginName(), deviceToDetach.VolumeSpec), "unmount_device"),
  1019  		EventRecorderFunc: nil, // nil because we do not want to generate event on error
  1020  	}, nil
  1021  }
  1022  
  1023  // GenerateMapVolumeFunc marks volume as mounted based on following steps.
  1024  // If plugin is attachable, call WaitForAttach() and then mark the device
  1025  // as mounted. On next step, SetUpDevice is called without dependent of
  1026  // plugin type, but this method mainly is targeted for none attachable plugin.
  1027  // After setup is done, create symbolic links on both global map path and pod
  1028  // device map path. Once symbolic links are created, take fd lock by
  1029  // loopback for the device to avoid silent volume replacement. This lock
  1030  // will be released once no one uses the device.
  1031  // If all steps are completed, the volume is marked as mounted.
  1032  func (og *operationGenerator) GenerateMapVolumeFunc(
  1033  	waitForAttachTimeout time.Duration,
  1034  	volumeToMount VolumeToMount,
  1035  	actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) {
  1036  
  1037  	// Get block volume mapper plugin
  1038  	blockVolumePlugin, err :=
  1039  		og.volumePluginMgr.FindMapperPluginBySpec(volumeToMount.VolumeSpec)
  1040  	if err != nil {
  1041  		return volumetypes.GeneratedOperations{}, volumeToMount.GenerateErrorDetailed("MapVolume.FindMapperPluginBySpec failed", err)
  1042  	}
  1043  
  1044  	if blockVolumePlugin == nil {
  1045  		return volumetypes.GeneratedOperations{}, volumeToMount.GenerateErrorDetailed("MapVolume.FindMapperPluginBySpec failed to find BlockVolumeMapper plugin. Volume plugin is nil.", nil)
  1046  	}
  1047  
  1048  	affinityErr := checkNodeAffinity(og, volumeToMount)
  1049  	if affinityErr != nil {
  1050  		eventErr, detailedErr := volumeToMount.GenerateError("MapVolume.NodeAffinity check failed", affinityErr)
  1051  		og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeWarning, kevents.FailedMountVolume, eventErr.Error())
  1052  		return volumetypes.GeneratedOperations{}, detailedErr
  1053  	}
  1054  	blockVolumeMapper, newMapperErr := blockVolumePlugin.NewBlockVolumeMapper(
  1055  		volumeToMount.VolumeSpec,
  1056  		volumeToMount.Pod,
  1057  		volume.VolumeOptions{})
  1058  	if newMapperErr != nil {
  1059  		eventErr, detailedErr := volumeToMount.GenerateError("MapVolume.NewBlockVolumeMapper initialization failed", newMapperErr)
  1060  		og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeWarning, kevents.FailedMapVolume, eventErr.Error())
  1061  		return volumetypes.GeneratedOperations{}, detailedErr
  1062  	}
  1063  
  1064  	// Get attacher, if possible
  1065  	attachableVolumePlugin, _ :=
  1066  		og.volumePluginMgr.FindAttachablePluginBySpec(volumeToMount.VolumeSpec)
  1067  	var volumeAttacher volume.Attacher
  1068  	if attachableVolumePlugin != nil {
  1069  		volumeAttacher, _ = attachableVolumePlugin.NewAttacher()
  1070  	}
  1071  
  1072  	mapVolumeFunc := func() (operationContext volumetypes.OperationContext) {
  1073  		var devicePath string
  1074  		var stagingPath string
  1075  
  1076  		migrated := getMigratedStatusBySpec(volumeToMount.VolumeSpec)
  1077  
  1078  		// Enforce ReadWriteOncePod access mode. This is also enforced during scheduling.
  1079  		if actualStateOfWorld.IsVolumeMountedElsewhere(volumeToMount.VolumeName, volumeToMount.PodName) &&
  1080  			// Because we do not know what access mode the pod intends to use if there are multiple.
  1081  			len(volumeToMount.VolumeSpec.PersistentVolume.Spec.AccessModes) == 1 &&
  1082  			v1helper.ContainsAccessMode(volumeToMount.VolumeSpec.PersistentVolume.Spec.AccessModes, v1.ReadWriteOncePod) {
  1083  
  1084  			err = goerrors.New("volume uses the ReadWriteOncePod access mode and is already in use by another pod")
  1085  			eventErr, detailedErr := volumeToMount.GenerateError("MapVolume.SetUpDevice failed", err)
  1086  			return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
  1087  		}
  1088  
  1089  		// Set up global map path under the given plugin directory using symbolic link
  1090  		globalMapPath, err :=
  1091  			blockVolumeMapper.GetGlobalMapPath(volumeToMount.VolumeSpec)
  1092  		if err != nil {
  1093  			// On failure, return error. Caller will log and retry.
  1094  			eventErr, detailedErr := volumeToMount.GenerateError("MapVolume.GetGlobalMapPath failed", err)
  1095  			return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
  1096  		}
  1097  		if volumeAttacher != nil {
  1098  			// Wait for attachable volumes to finish attaching
  1099  			klog.InfoS(volumeToMount.GenerateMsgDetailed("MapVolume.WaitForAttach entering", fmt.Sprintf("DevicePath %q", volumeToMount.DevicePath)), "pod", klog.KObj(volumeToMount.Pod))
  1100  
  1101  			devicePath, err = volumeAttacher.WaitForAttach(
  1102  				volumeToMount.VolumeSpec, volumeToMount.DevicePath, volumeToMount.Pod, waitForAttachTimeout)
  1103  			if err != nil {
  1104  				// On failure, return error. Caller will log and retry.
  1105  				eventErr, detailedErr := volumeToMount.GenerateError("MapVolume.WaitForAttach failed", err)
  1106  				return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
  1107  			}
  1108  
  1109  			klog.InfoS(volumeToMount.GenerateMsgDetailed("MapVolume.WaitForAttach succeeded", fmt.Sprintf("DevicePath %q", devicePath)), "pod", klog.KObj(volumeToMount.Pod))
  1110  
  1111  		}
  1112  		// Call SetUpDevice if blockVolumeMapper implements CustomBlockVolumeMapper
  1113  		if customBlockVolumeMapper, ok := blockVolumeMapper.(volume.CustomBlockVolumeMapper); ok && actualStateOfWorld.GetDeviceMountState(volumeToMount.VolumeName) != DeviceGloballyMounted {
  1114  			var mapErr error
  1115  			stagingPath, mapErr = customBlockVolumeMapper.SetUpDevice()
  1116  			if mapErr != nil {
  1117  				og.markDeviceErrorState(volumeToMount, devicePath, globalMapPath, mapErr, actualStateOfWorld)
  1118  				// On failure, return error. Caller will log and retry.
  1119  				eventErr, detailedErr := volumeToMount.GenerateError("MapVolume.SetUpDevice failed", mapErr)
  1120  				return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
  1121  			}
  1122  		}
  1123  
  1124  		// Update actual state of world to reflect volume is globally mounted
  1125  		markedDevicePath := devicePath
  1126  		markDeviceMappedErr := actualStateOfWorld.MarkDeviceAsMounted(
  1127  			volumeToMount.VolumeName, markedDevicePath, globalMapPath, "")
  1128  		if markDeviceMappedErr != nil {
  1129  			// On failure, return error. Caller will log and retry.
  1130  			eventErr, detailedErr := volumeToMount.GenerateError("MapVolume.MarkDeviceAsMounted failed", markDeviceMappedErr)
  1131  			return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
  1132  		}
  1133  
  1134  		markVolumeOpts := MarkVolumeOpts{
  1135  			PodName:             volumeToMount.PodName,
  1136  			PodUID:              volumeToMount.Pod.UID,
  1137  			VolumeName:          volumeToMount.VolumeName,
  1138  			BlockVolumeMapper:   blockVolumeMapper,
  1139  			OuterVolumeSpecName: volumeToMount.OuterVolumeSpecName,
  1140  			VolumeGidVolume:     volumeToMount.VolumeGidValue,
  1141  			VolumeSpec:          volumeToMount.VolumeSpec,
  1142  			VolumeMountState:    VolumeMounted,
  1143  		}
  1144  
  1145  		// Call MapPodDevice if blockVolumeMapper implements CustomBlockVolumeMapper
  1146  		if customBlockVolumeMapper, ok := blockVolumeMapper.(volume.CustomBlockVolumeMapper); ok {
  1147  			// Execute driver specific map
  1148  			pluginDevicePath, mapErr := customBlockVolumeMapper.MapPodDevice()
  1149  			if mapErr != nil {
  1150  				// On failure, return error. Caller will log and retry.
  1151  				og.markVolumeErrorState(volumeToMount, markVolumeOpts, mapErr, actualStateOfWorld)
  1152  				eventErr, detailedErr := volumeToMount.GenerateError("MapVolume.MapPodDevice failed", mapErr)
  1153  				return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
  1154  			}
  1155  
  1156  			// From now on, the volume is mapped. Mark it as uncertain on error,
  1157  			// so it is is unmapped when corresponding pod is deleted.
  1158  			defer func() {
  1159  				if operationContext.EventErr != nil {
  1160  					errText := operationContext.EventErr.Error()
  1161  					og.markVolumeErrorState(volumeToMount, markVolumeOpts, volumetypes.NewUncertainProgressError(errText), actualStateOfWorld)
  1162  				}
  1163  			}()
  1164  
  1165  			// if pluginDevicePath is provided, assume attacher may not provide device
  1166  			// or attachment flow uses SetupDevice to get device path
  1167  			if len(pluginDevicePath) != 0 {
  1168  				devicePath = pluginDevicePath
  1169  			}
  1170  			if len(devicePath) == 0 {
  1171  				eventErr, detailedErr := volumeToMount.GenerateError("MapVolume failed", goerrors.New("device path of the volume is empty"))
  1172  				return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
  1173  			}
  1174  		}
  1175  
  1176  		// When kubelet is containerized, devicePath may be a symlink at a place unavailable to
  1177  		// kubelet, so evaluate it on the host and expect that it links to a device in /dev,
  1178  		// which will be available to containerized kubelet. If still it does not exist,
  1179  		// AttachFileDevice will fail. If kubelet is not containerized, eval it anyway.
  1180  		kvh, ok := og.GetVolumePluginMgr().Host.(volume.KubeletVolumeHost)
  1181  		if !ok {
  1182  			eventErr, detailedErr := volumeToMount.GenerateError("MapVolume type assertion error", fmt.Errorf("volume host does not implement KubeletVolumeHost interface"))
  1183  			return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
  1184  		}
  1185  		hu := kvh.GetHostUtil()
  1186  		devicePath, err = hu.EvalHostSymlinks(devicePath)
  1187  		if err != nil {
  1188  			eventErr, detailedErr := volumeToMount.GenerateError("MapVolume.EvalHostSymlinks failed", err)
  1189  			return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
  1190  		}
  1191  
  1192  		// Update actual state of world with the devicePath again, if devicePath has changed from markedDevicePath
  1193  		// TODO: This can be improved after #82492 is merged and ASW has state.
  1194  		if markedDevicePath != devicePath {
  1195  			markDeviceMappedErr := actualStateOfWorld.MarkDeviceAsMounted(
  1196  				volumeToMount.VolumeName, devicePath, globalMapPath, "")
  1197  			if markDeviceMappedErr != nil {
  1198  				// On failure, return error. Caller will log and retry.
  1199  				eventErr, detailedErr := volumeToMount.GenerateError("MapVolume.MarkDeviceAsMounted failed", markDeviceMappedErr)
  1200  				return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
  1201  			}
  1202  		}
  1203  
  1204  		// Execute common map
  1205  		volumeMapPath, volName := blockVolumeMapper.GetPodDeviceMapPath()
  1206  		mapErr := util.MapBlockVolume(og.blkUtil, devicePath, globalMapPath, volumeMapPath, volName, volumeToMount.Pod.UID)
  1207  		if mapErr != nil {
  1208  			// On failure, return error. Caller will log and retry.
  1209  			eventErr, detailedErr := volumeToMount.GenerateError("MapVolume.MapBlockVolume failed", mapErr)
  1210  			return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
  1211  		}
  1212  
  1213  		// Device mapping for global map path succeeded
  1214  		simpleMsg, detailedMsg := volumeToMount.GenerateMsg("MapVolume.MapPodDevice succeeded", fmt.Sprintf("globalMapPath %q", globalMapPath))
  1215  		verbosity := klog.Level(4)
  1216  		og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeNormal, kevents.SuccessfulMountVolume, simpleMsg)
  1217  		klog.V(verbosity).InfoS(detailedMsg, "pod", klog.KObj(volumeToMount.Pod))
  1218  
  1219  		// Device mapping for pod device map path succeeded
  1220  		simpleMsg, detailedMsg = volumeToMount.GenerateMsg("MapVolume.MapPodDevice succeeded", fmt.Sprintf("volumeMapPath %q", volumeMapPath))
  1221  		verbosity = klog.Level(1)
  1222  		og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeNormal, kevents.SuccessfulMountVolume, simpleMsg)
  1223  		klog.V(verbosity).InfoS(detailedMsg, "pod", klog.KObj(volumeToMount.Pod))
  1224  
  1225  		resizeOptions := volume.NodeResizeOptions{
  1226  			DevicePath:      devicePath,
  1227  			DeviceStagePath: stagingPath,
  1228  		}
  1229  		_, resizeError := og.expandVolumeDuringMount(volumeToMount, actualStateOfWorld, resizeOptions)
  1230  		if resizeError != nil {
  1231  			klog.Errorf("MapVolume.NodeExpandVolume failed with %v", resizeError)
  1232  			eventErr, detailedErr := volumeToMount.GenerateError("MapVolume.MarkVolumeAsMounted failed while expanding volume", resizeError)
  1233  			// At this point, MountVolume.Setup already succeeded, we should add volume into actual state
  1234  			// so that reconciler can clean up volume when needed. However, if nodeExpandVolume failed,
  1235  			// we should not mark the volume as mounted to avoid pod starts using it.
  1236  			// Considering the above situations, we mark volume as uncertain here so that reconciler will trigger
  1237  			// volume tear down when pod is deleted, and also makes sure pod will not start using it.
  1238  			if err := actualStateOfWorld.MarkVolumeMountAsUncertain(markVolumeOpts); err != nil {
  1239  				klog.Errorf(volumeToMount.GenerateErrorDetailed("MountVolume.MarkVolumeMountAsUncertain failed", err).Error())
  1240  			}
  1241  			return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
  1242  		}
  1243  
  1244  		markVolMountedErr := actualStateOfWorld.MarkVolumeAsMounted(markVolumeOpts)
  1245  		if markVolMountedErr != nil {
  1246  			// On failure, return error. Caller will log and retry.
  1247  			eventErr, detailedErr := volumeToMount.GenerateError("MapVolume.MarkVolumeAsMounted failed", markVolMountedErr)
  1248  			return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
  1249  		}
  1250  
  1251  		return volumetypes.NewOperationContext(nil, nil, migrated)
  1252  	}
  1253  
  1254  	eventRecorderFunc := func(err *error) {
  1255  		if *err != nil {
  1256  			og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeWarning, kevents.FailedMapVolume, (*err).Error())
  1257  		}
  1258  	}
  1259  
  1260  	return volumetypes.GeneratedOperations{
  1261  		OperationName:     "map_volume",
  1262  		OperationFunc:     mapVolumeFunc,
  1263  		EventRecorderFunc: eventRecorderFunc,
  1264  		CompleteFunc:      util.OperationCompleteHook(util.GetFullQualifiedPluginNameForVolume(blockVolumePlugin.GetPluginName(), volumeToMount.VolumeSpec), "map_volume"),
  1265  	}, nil
  1266  }
  1267  
  1268  // GenerateUnmapVolumeFunc marks volume as unmonuted based on following steps.
  1269  // Remove symbolic links from pod device map path dir and  global map path dir.
  1270  // Once those cleanups are done, remove pod device map path dir.
  1271  // If all steps are completed, the volume is marked as unmounted.
  1272  func (og *operationGenerator) GenerateUnmapVolumeFunc(
  1273  	volumeToUnmount MountedVolume,
  1274  	actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) {
  1275  
  1276  	// Get block volume unmapper plugin
  1277  	blockVolumePlugin, err :=
  1278  		og.volumePluginMgr.FindMapperPluginByName(volumeToUnmount.PluginName)
  1279  	if err != nil {
  1280  		return volumetypes.GeneratedOperations{}, volumeToUnmount.GenerateErrorDetailed("UnmapVolume.FindMapperPluginByName failed", err)
  1281  	}
  1282  	if blockVolumePlugin == nil {
  1283  		return volumetypes.GeneratedOperations{}, volumeToUnmount.GenerateErrorDetailed("UnmapVolume.FindMapperPluginByName failed to find BlockVolumeMapper plugin. Volume plugin is nil.", nil)
  1284  	}
  1285  	blockVolumeUnmapper, newUnmapperErr := blockVolumePlugin.NewBlockVolumeUnmapper(
  1286  		volumeToUnmount.InnerVolumeSpecName, volumeToUnmount.PodUID)
  1287  	if newUnmapperErr != nil {
  1288  		return volumetypes.GeneratedOperations{}, volumeToUnmount.GenerateErrorDetailed("UnmapVolume.NewUnmapper failed", newUnmapperErr)
  1289  	}
  1290  
  1291  	unmapVolumeFunc := func() volumetypes.OperationContext {
  1292  
  1293  		migrated := getMigratedStatusBySpec(volumeToUnmount.VolumeSpec)
  1294  
  1295  		// pods/{podUid}/volumeDevices/{escapeQualifiedPluginName}/{volumeName}
  1296  		podDeviceUnmapPath, volName := blockVolumeUnmapper.GetPodDeviceMapPath()
  1297  		// plugins/kubernetes.io/{PluginName}/volumeDevices/{volumePluginDependentPath}/{podUID}
  1298  		globalUnmapPath, err := blockVolumeUnmapper.GetGlobalMapPath(volumeToUnmount.VolumeSpec)
  1299  		if err != nil {
  1300  			// On failure, return error. Caller will log and retry.
  1301  			eventErr, detailedErr := volumeToUnmount.GenerateError("UnmapVolume.GetGlobalMapPath failed", err)
  1302  			return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
  1303  		}
  1304  
  1305  		// Mark the device as uncertain to make sure kubelet calls UnmapDevice again in all the "return err"
  1306  		// cases below. The volume is marked as fully un-mapped at the end of this function, when everything
  1307  		// succeeds.
  1308  		markVolumeOpts := MarkVolumeOpts{
  1309  			PodName:             volumeToUnmount.PodName,
  1310  			PodUID:              volumeToUnmount.PodUID,
  1311  			VolumeName:          volumeToUnmount.VolumeName,
  1312  			OuterVolumeSpecName: volumeToUnmount.OuterVolumeSpecName,
  1313  			VolumeGidVolume:     volumeToUnmount.VolumeGidValue,
  1314  			VolumeSpec:          volumeToUnmount.VolumeSpec,
  1315  			VolumeMountState:    VolumeMountUncertain,
  1316  		}
  1317  		markVolumeUncertainErr := actualStateOfWorld.MarkVolumeMountAsUncertain(markVolumeOpts)
  1318  		if markVolumeUncertainErr != nil {
  1319  			// On failure, return error. Caller will log and retry.
  1320  			eventErr, detailedErr := volumeToUnmount.GenerateError("UnmapVolume.MarkDeviceAsUncertain failed", markVolumeUncertainErr)
  1321  			return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
  1322  		}
  1323  
  1324  		// Execute common unmap
  1325  		unmapErr := util.UnmapBlockVolume(og.blkUtil, globalUnmapPath, podDeviceUnmapPath, volName, volumeToUnmount.PodUID)
  1326  		if unmapErr != nil {
  1327  			// On failure, return error. Caller will log and retry.
  1328  			eventErr, detailedErr := volumeToUnmount.GenerateError("UnmapVolume.UnmapBlockVolume failed", unmapErr)
  1329  			return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
  1330  		}
  1331  
  1332  		// Call UnmapPodDevice if blockVolumeUnmapper implements CustomBlockVolumeUnmapper
  1333  		if customBlockVolumeUnmapper, ok := blockVolumeUnmapper.(volume.CustomBlockVolumeUnmapper); ok {
  1334  			// Execute plugin specific unmap
  1335  			unmapErr = customBlockVolumeUnmapper.UnmapPodDevice()
  1336  			if unmapErr != nil {
  1337  				// On failure, return error. Caller will log and retry.
  1338  				eventErr, detailedErr := volumeToUnmount.GenerateError("UnmapVolume.UnmapPodDevice failed", unmapErr)
  1339  				return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
  1340  			}
  1341  		}
  1342  
  1343  		klog.Infof(
  1344  			"UnmapVolume succeeded for volume %q (OuterVolumeSpecName: %q) pod %q (UID: %q). InnerVolumeSpecName %q. PluginName %q, VolumeGidValue %q",
  1345  			volumeToUnmount.VolumeName,
  1346  			volumeToUnmount.OuterVolumeSpecName,
  1347  			volumeToUnmount.PodName,
  1348  			volumeToUnmount.PodUID,
  1349  			volumeToUnmount.InnerVolumeSpecName,
  1350  			volumeToUnmount.PluginName,
  1351  			volumeToUnmount.VolumeGidValue)
  1352  
  1353  		// Update actual state of world
  1354  		markVolUnmountedErr := actualStateOfWorld.MarkVolumeAsUnmounted(
  1355  			volumeToUnmount.PodName, volumeToUnmount.VolumeName)
  1356  		if markVolUnmountedErr != nil {
  1357  			// On failure, just log and exit
  1358  			klog.Errorf(volumeToUnmount.GenerateErrorDetailed("UnmapVolume.MarkVolumeAsUnmounted failed", markVolUnmountedErr).Error())
  1359  		}
  1360  
  1361  		return volumetypes.NewOperationContext(nil, nil, migrated)
  1362  	}
  1363  
  1364  	return volumetypes.GeneratedOperations{
  1365  		OperationName:     "unmap_volume",
  1366  		OperationFunc:     unmapVolumeFunc,
  1367  		CompleteFunc:      util.OperationCompleteHook(util.GetFullQualifiedPluginNameForVolume(blockVolumePlugin.GetPluginName(), volumeToUnmount.VolumeSpec), "unmap_volume"),
  1368  		EventRecorderFunc: nil, // nil because we do not want to generate event on error
  1369  	}, nil
  1370  }
  1371  
  1372  // GenerateUnmapDeviceFunc marks device as unmounted based on following steps.
  1373  // Check under globalMapPath dir if there isn't pod's symbolic links in it.
  1374  // If symbolic link isn't there, the device isn't referenced from Pods.
  1375  // Call plugin TearDownDevice to clean-up device connection, stored data under
  1376  // globalMapPath, these operations depend on plugin implementation.
  1377  // Once TearDownDevice is completed, remove globalMapPath dir.
  1378  // After globalMapPath is removed, fd lock by loopback for the device can
  1379  // be released safely because no one can consume the device at this point.
  1380  // At last, device open status will be checked just in case.
  1381  // If all steps are completed, the device is marked as unmounted.
  1382  func (og *operationGenerator) GenerateUnmapDeviceFunc(
  1383  	deviceToDetach AttachedVolume,
  1384  	actualStateOfWorld ActualStateOfWorldMounterUpdater,
  1385  	hostutil hostutil.HostUtils) (volumetypes.GeneratedOperations, error) {
  1386  
  1387  	blockVolumePlugin, err :=
  1388  		og.volumePluginMgr.FindMapperPluginByName(deviceToDetach.PluginName)
  1389  	if err != nil {
  1390  		return volumetypes.GeneratedOperations{}, deviceToDetach.GenerateErrorDetailed("UnmapDevice.FindMapperPluginByName failed", err)
  1391  	}
  1392  
  1393  	if blockVolumePlugin == nil {
  1394  		return volumetypes.GeneratedOperations{}, deviceToDetach.GenerateErrorDetailed("UnmapDevice.FindMapperPluginByName failed to find BlockVolumeMapper plugin. Volume plugin is nil.", nil)
  1395  	}
  1396  
  1397  	blockVolumeUnmapper, newUnmapperErr := blockVolumePlugin.NewBlockVolumeUnmapper(
  1398  		deviceToDetach.VolumeSpec.Name(),
  1399  		"" /* podUID */)
  1400  	if newUnmapperErr != nil {
  1401  		return volumetypes.GeneratedOperations{}, deviceToDetach.GenerateErrorDetailed("UnmapDevice.NewUnmapper failed", newUnmapperErr)
  1402  	}
  1403  
  1404  	unmapDeviceFunc := func() volumetypes.OperationContext {
  1405  		migrated := getMigratedStatusBySpec(deviceToDetach.VolumeSpec)
  1406  		// Search under globalMapPath dir if all symbolic links from pods have been removed already.
  1407  		// If symbolic links are there, pods may still refer the volume.
  1408  		globalMapPath := deviceToDetach.DeviceMountPath
  1409  		refs, err := og.blkUtil.GetDeviceBindMountRefs(deviceToDetach.DevicePath, globalMapPath)
  1410  		if err != nil {
  1411  			if os.IsNotExist(err) {
  1412  				// Looks like SetupDevice did not complete. Fall through to TearDownDevice and mark the device as unmounted.
  1413  				refs = nil
  1414  			} else {
  1415  				eventErr, detailedErr := deviceToDetach.GenerateError("UnmapDevice.GetDeviceBindMountRefs check failed", err)
  1416  				return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
  1417  			}
  1418  		}
  1419  		if len(refs) > 0 {
  1420  			err = fmt.Errorf("the device %q is still referenced from other Pods %v", globalMapPath, refs)
  1421  			eventErr, detailedErr := deviceToDetach.GenerateError("UnmapDevice failed", err)
  1422  			return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
  1423  		}
  1424  
  1425  		// Mark the device as uncertain to make sure kubelet calls UnmapDevice again in all the "return err"
  1426  		// cases below. The volume is marked as fully un-mapped at the end of this function, when everything
  1427  		// succeeds.
  1428  		markDeviceUncertainErr := actualStateOfWorld.MarkDeviceAsUncertain(
  1429  			deviceToDetach.VolumeName, deviceToDetach.DevicePath, globalMapPath, "" /* seLinuxMountContext */)
  1430  		if markDeviceUncertainErr != nil {
  1431  			// On failure, return error. Caller will log and retry.
  1432  			eventErr, detailedErr := deviceToDetach.GenerateError("UnmapDevice.MarkDeviceAsUncertain failed", markDeviceUncertainErr)
  1433  			return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
  1434  		}
  1435  
  1436  		// Call TearDownDevice if blockVolumeUnmapper implements CustomBlockVolumeUnmapper
  1437  		if customBlockVolumeUnmapper, ok := blockVolumeUnmapper.(volume.CustomBlockVolumeUnmapper); ok {
  1438  			// Execute tear down device
  1439  			unmapErr := customBlockVolumeUnmapper.TearDownDevice(globalMapPath, deviceToDetach.DevicePath)
  1440  			if unmapErr != nil {
  1441  				// On failure, return error. Caller will log and retry.
  1442  				eventErr, detailedErr := deviceToDetach.GenerateError("UnmapDevice.TearDownDevice failed", unmapErr)
  1443  				return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
  1444  			}
  1445  		}
  1446  
  1447  		// Plugin finished TearDownDevice(). Now globalMapPath dir and plugin's stored data
  1448  		// on the dir are unnecessary, clean up it.
  1449  		removeMapPathErr := og.blkUtil.RemoveMapPath(globalMapPath)
  1450  		if removeMapPathErr != nil {
  1451  			// On failure, return error. Caller will log and retry.
  1452  			eventErr, detailedErr := deviceToDetach.GenerateError("UnmapDevice.RemoveMapPath failed", removeMapPathErr)
  1453  			return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
  1454  		}
  1455  
  1456  		// Before logging that UnmapDevice succeeded and moving on,
  1457  		// use hostutil.PathIsDevice to check if the path is a device,
  1458  		// if so use hostutil.DeviceOpened to check if the device is in use anywhere
  1459  		// else on the system. Retry if it returns true.
  1460  		deviceOpened, deviceOpenedErr := isDeviceOpened(deviceToDetach, hostutil)
  1461  		if deviceOpenedErr != nil {
  1462  			return volumetypes.NewOperationContext(nil, deviceOpenedErr, migrated)
  1463  		}
  1464  		// The device is still in use elsewhere. Caller will log and retry.
  1465  		if deviceOpened {
  1466  			eventErr, detailedErr := deviceToDetach.GenerateError(
  1467  				"UnmapDevice failed",
  1468  				fmt.Errorf("the device is in use when it was no longer expected to be in use"))
  1469  			return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
  1470  		}
  1471  
  1472  		klog.Infof(deviceToDetach.GenerateMsgDetailed("UnmapDevice succeeded", ""))
  1473  
  1474  		// Update actual state of world
  1475  		markDeviceUnmountedErr := actualStateOfWorld.MarkDeviceAsUnmounted(
  1476  			deviceToDetach.VolumeName)
  1477  		if markDeviceUnmountedErr != nil {
  1478  			// On failure, return error. Caller will log and retry.
  1479  			eventErr, detailedErr := deviceToDetach.GenerateError("MarkDeviceAsUnmounted failed", markDeviceUnmountedErr)
  1480  			return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
  1481  		}
  1482  
  1483  		return volumetypes.NewOperationContext(nil, nil, migrated)
  1484  	}
  1485  
  1486  	return volumetypes.GeneratedOperations{
  1487  		OperationName:     "unmap_device",
  1488  		OperationFunc:     unmapDeviceFunc,
  1489  		CompleteFunc:      util.OperationCompleteHook(util.GetFullQualifiedPluginNameForVolume(blockVolumePlugin.GetPluginName(), deviceToDetach.VolumeSpec), "unmap_device"),
  1490  		EventRecorderFunc: nil, // nil because we do not want to generate event on error
  1491  	}, nil
  1492  }
  1493  
  1494  func (og *operationGenerator) GenerateVerifyControllerAttachedVolumeFunc(
  1495  	logger klog.Logger,
  1496  	volumeToMount VolumeToMount,
  1497  	nodeName types.NodeName,
  1498  	actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) {
  1499  	volumePlugin, err :=
  1500  		og.volumePluginMgr.FindPluginBySpec(volumeToMount.VolumeSpec)
  1501  	if err != nil || volumePlugin == nil {
  1502  		return volumetypes.GeneratedOperations{}, volumeToMount.GenerateErrorDetailed("VerifyControllerAttachedVolume.FindPluginBySpec failed", err)
  1503  	}
  1504  
  1505  	// For attachable volume types, lets check if volume is attached by reading from node lister.
  1506  	// This would avoid exponential back-off and creation of goroutine unnecessarily. We still
  1507  	// verify status of attached volume by directly reading from API server later on.This is necessarily
  1508  	// to ensure any race conditions because of cached state in the informer.
  1509  	if volumeToMount.PluginIsAttachable {
  1510  		cachedAttachedVolumes, _ := og.volumePluginMgr.Host.GetAttachedVolumesFromNodeStatus()
  1511  		if cachedAttachedVolumes != nil {
  1512  			_, volumeFound := cachedAttachedVolumes[volumeToMount.VolumeName]
  1513  			if !volumeFound {
  1514  				return volumetypes.GeneratedOperations{}, NewMountPreConditionFailedError(fmt.Sprintf("volume %s is not yet in node's status", volumeToMount.VolumeName))
  1515  			}
  1516  		}
  1517  	}
  1518  
  1519  	verifyControllerAttachedVolumeFunc := func() volumetypes.OperationContext {
  1520  		migrated := getMigratedStatusBySpec(volumeToMount.VolumeSpec)
  1521  		claimSize := actualStateOfWorld.GetClaimSize(volumeToMount.VolumeName)
  1522  
  1523  		// only fetch claimSize if it was not set previously
  1524  		if volumeToMount.VolumeSpec.PersistentVolume != nil && claimSize == nil && !volumeToMount.VolumeSpec.InlineVolumeSpecForCSIMigration {
  1525  			pv := volumeToMount.VolumeSpec.PersistentVolume
  1526  			pvc, err := og.kubeClient.CoreV1().PersistentVolumeClaims(pv.Spec.ClaimRef.Namespace).Get(context.TODO(), pv.Spec.ClaimRef.Name, metav1.GetOptions{})
  1527  			if err != nil {
  1528  				eventErr, detailedErr := volumeToMount.GenerateError("VerifyControllerAttachedVolume fetching pvc failed", err)
  1529  				return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
  1530  			}
  1531  			pvcStatusSize := pvc.Status.Capacity.Storage()
  1532  			if pvcStatusSize != nil {
  1533  				claimSize = pvcStatusSize
  1534  			}
  1535  		}
  1536  
  1537  		if !volumeToMount.PluginIsAttachable {
  1538  			// If the volume does not implement the attacher interface, it is
  1539  			// assumed to be attached and the actual state of the world is
  1540  			// updated accordingly.
  1541  
  1542  			addVolumeNodeErr := actualStateOfWorld.MarkVolumeAsAttached(
  1543  				logger, volumeToMount.VolumeName, volumeToMount.VolumeSpec, nodeName, "" /* devicePath */)
  1544  			if addVolumeNodeErr != nil {
  1545  				// On failure, return error. Caller will log and retry.
  1546  				eventErr, detailedErr := volumeToMount.GenerateError("VerifyControllerAttachedVolume.MarkVolumeAsAttachedByUniqueVolumeName failed", addVolumeNodeErr)
  1547  				return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
  1548  			}
  1549  			actualStateOfWorld.InitializeClaimSize(logger, volumeToMount.VolumeName, claimSize)
  1550  			return volumetypes.NewOperationContext(nil, nil, migrated)
  1551  		}
  1552  
  1553  		if !volumeToMount.ReportedInUse {
  1554  			// If the given volume has not yet been added to the list of
  1555  			// VolumesInUse in the node's volume status, do not proceed, return
  1556  			// error. Caller will log and retry. The node status is updated
  1557  			// periodically by kubelet, so it may take as much as 10 seconds
  1558  			// before this clears.
  1559  			// Issue #28141 to enable on demand status updates.
  1560  			eventErr, detailedErr := volumeToMount.GenerateError("Volume has not been added to the list of VolumesInUse in the node's volume status", nil)
  1561  			return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
  1562  		}
  1563  
  1564  		// Fetch current node object
  1565  		node, fetchErr := og.kubeClient.CoreV1().Nodes().Get(context.TODO(), string(nodeName), metav1.GetOptions{})
  1566  		if fetchErr != nil {
  1567  			// On failure, return error. Caller will log and retry.
  1568  			eventErr, detailedErr := volumeToMount.GenerateError("VerifyControllerAttachedVolume failed fetching node from API server", fetchErr)
  1569  			return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
  1570  		}
  1571  
  1572  		for _, attachedVolume := range node.Status.VolumesAttached {
  1573  			if attachedVolume.Name == volumeToMount.VolumeName {
  1574  				addVolumeNodeErr := actualStateOfWorld.MarkVolumeAsAttached(
  1575  					logger, v1.UniqueVolumeName(""), volumeToMount.VolumeSpec, nodeName, attachedVolume.DevicePath)
  1576  				klog.InfoS(volumeToMount.GenerateMsgDetailed("Controller attach succeeded", fmt.Sprintf("device path: %q", attachedVolume.DevicePath)), "pod", klog.KObj(volumeToMount.Pod))
  1577  				if addVolumeNodeErr != nil {
  1578  					// On failure, return error. Caller will log and retry.
  1579  					eventErr, detailedErr := volumeToMount.GenerateError("VerifyControllerAttachedVolume.MarkVolumeAsAttached failed", addVolumeNodeErr)
  1580  					return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
  1581  				}
  1582  				actualStateOfWorld.InitializeClaimSize(logger, volumeToMount.VolumeName, claimSize)
  1583  				return volumetypes.NewOperationContext(nil, nil, migrated)
  1584  			}
  1585  		}
  1586  
  1587  		// Volume not attached, return error. Caller will log and retry.
  1588  		eventErr, detailedErr := volumeToMount.GenerateError("Volume not attached according to node status", nil)
  1589  		return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
  1590  	}
  1591  
  1592  	return volumetypes.GeneratedOperations{
  1593  		OperationName:     VerifyControllerAttachedVolumeOpName,
  1594  		OperationFunc:     verifyControllerAttachedVolumeFunc,
  1595  		CompleteFunc:      util.OperationCompleteHook(util.GetFullQualifiedPluginNameForVolume(volumePlugin.GetPluginName(), volumeToMount.VolumeSpec), "verify_controller_attached_volume"),
  1596  		EventRecorderFunc: nil, // nil because we do not want to generate event on error
  1597  	}, nil
  1598  
  1599  }
  1600  
  1601  func (og *operationGenerator) verifyVolumeIsSafeToDetach(
  1602  	volumeToDetach AttachedVolume) error {
  1603  	// Fetch current node object
  1604  	node, fetchErr := og.kubeClient.CoreV1().Nodes().Get(context.TODO(), string(volumeToDetach.NodeName), metav1.GetOptions{})
  1605  	if fetchErr != nil {
  1606  		if errors.IsNotFound(fetchErr) {
  1607  			klog.Warningf(volumeToDetach.GenerateMsgDetailed("Node not found on API server. DetachVolume will skip safe to detach check", ""))
  1608  			return nil
  1609  		}
  1610  
  1611  		// On failure, return error. Caller will log and retry.
  1612  		return volumeToDetach.GenerateErrorDetailed("DetachVolume failed fetching node from API server", fetchErr)
  1613  	}
  1614  
  1615  	for _, inUseVolume := range node.Status.VolumesInUse {
  1616  		if inUseVolume == volumeToDetach.VolumeName {
  1617  			return volumeToDetach.GenerateErrorDetailed(
  1618  				"DetachVolume failed",
  1619  				fmt.Errorf("volume is still in use by node, according to Node status"))
  1620  		}
  1621  	}
  1622  
  1623  	// Volume is not marked as in use by node
  1624  	klog.Infof(volumeToDetach.GenerateMsgDetailed("Verified volume is safe to detach", ""))
  1625  	return nil
  1626  }
  1627  
  1628  func (og *operationGenerator) GenerateExpandVolumeFunc(
  1629  	pvc *v1.PersistentVolumeClaim,
  1630  	pv *v1.PersistentVolume) (volumetypes.GeneratedOperations, error) {
  1631  
  1632  	volumeSpec := volume.NewSpecFromPersistentVolume(pv, false)
  1633  
  1634  	volumePlugin, err := og.volumePluginMgr.FindExpandablePluginBySpec(volumeSpec)
  1635  	if err != nil {
  1636  		return volumetypes.GeneratedOperations{}, fmt.Errorf("error finding plugin for expanding volume: %q with error %v", util.GetPersistentVolumeClaimQualifiedName(pvc), err)
  1637  	}
  1638  
  1639  	if volumePlugin == nil {
  1640  		return volumetypes.GeneratedOperations{}, fmt.Errorf("can not find plugin for expanding volume: %q", util.GetPersistentVolumeClaimQualifiedName(pvc))
  1641  	}
  1642  
  1643  	expandVolumeFunc := func() volumetypes.OperationContext {
  1644  		migrated := false
  1645  
  1646  		newSize := pvc.Spec.Resources.Requests[v1.ResourceStorage]
  1647  		statusSize := pvc.Status.Capacity[v1.ResourceStorage]
  1648  		pvSize := pv.Spec.Capacity[v1.ResourceStorage]
  1649  		if pvSize.Cmp(newSize) < 0 {
  1650  			updatedSize, expandErr := volumePlugin.ExpandVolumeDevice(
  1651  				volumeSpec,
  1652  				newSize,
  1653  				statusSize)
  1654  			if expandErr != nil {
  1655  				detailedErr := fmt.Errorf("error expanding volume %q of plugin %q: %v", util.GetPersistentVolumeClaimQualifiedName(pvc), volumePlugin.GetPluginName(), expandErr)
  1656  				return volumetypes.NewOperationContext(detailedErr, detailedErr, migrated)
  1657  			}
  1658  
  1659  			klog.Infof("ExpandVolume succeeded for volume %s", util.GetPersistentVolumeClaimQualifiedName(pvc))
  1660  
  1661  			newSize = updatedSize
  1662  			// k8s doesn't have transactions, we can't guarantee that after updating PV - updating PVC will be
  1663  			// successful, that is why all PVCs for which pvc.Spec.Size > pvc.Status.Size must be reprocessed
  1664  			// until they reflect user requested size in pvc.Status.Size
  1665  			_, updateErr := util.UpdatePVSize(pv, newSize, og.kubeClient)
  1666  			if updateErr != nil {
  1667  				detailedErr := fmt.Errorf("error updating PV spec capacity for volume %q with : %v", util.GetPersistentVolumeClaimQualifiedName(pvc), updateErr)
  1668  				return volumetypes.NewOperationContext(detailedErr, detailedErr, migrated)
  1669  			}
  1670  
  1671  			klog.Infof("ExpandVolume.UpdatePV succeeded for volume %s", util.GetPersistentVolumeClaimQualifiedName(pvc))
  1672  		}
  1673  
  1674  		fsVolume, _ := util.CheckVolumeModeFilesystem(volumeSpec)
  1675  		// No Cloudprovider resize needed, lets mark resizing as done
  1676  		// Rest of the volume expand controller code will assume PVC as *not* resized until pvc.Status.Size
  1677  		// reflects user requested size.
  1678  		if !volumePlugin.RequiresFSResize() || !fsVolume {
  1679  			klog.V(4).Infof("Controller resizing done for PVC %s", util.GetPersistentVolumeClaimQualifiedName(pvc))
  1680  			_, err := util.MarkResizeFinished(pvc, newSize, og.kubeClient)
  1681  			if err != nil {
  1682  				detailedErr := fmt.Errorf("error marking pvc %s as resized : %v", util.GetPersistentVolumeClaimQualifiedName(pvc), err)
  1683  				return volumetypes.NewOperationContext(detailedErr, detailedErr, migrated)
  1684  			}
  1685  			successMsg := fmt.Sprintf("ExpandVolume succeeded for volume %s", util.GetPersistentVolumeClaimQualifiedName(pvc))
  1686  			og.recorder.Eventf(pvc, v1.EventTypeNormal, kevents.VolumeResizeSuccess, successMsg)
  1687  		} else {
  1688  			_, err := util.MarkForFSResize(pvc, og.kubeClient)
  1689  			if err != nil {
  1690  				detailedErr := fmt.Errorf("error updating pvc %s condition for fs resize : %v", util.GetPersistentVolumeClaimQualifiedName(pvc), err)
  1691  				klog.Warning(detailedErr)
  1692  				return volumetypes.NewOperationContext(nil, nil, migrated)
  1693  			}
  1694  			oldCapacity := pvc.Status.Capacity[v1.ResourceStorage]
  1695  			err = util.AddAnnPreResizeCapacity(pv, oldCapacity, og.kubeClient)
  1696  			if err != nil {
  1697  				detailedErr := fmt.Errorf("error updating pv %s annotation (%s) with pre-resize capacity %s: %v", pv.ObjectMeta.Name, util.AnnPreResizeCapacity, oldCapacity.String(), err)
  1698  				klog.Warning(detailedErr)
  1699  				return volumetypes.NewOperationContext(nil, nil, migrated)
  1700  			}
  1701  
  1702  		}
  1703  		return volumetypes.NewOperationContext(nil, nil, migrated)
  1704  	}
  1705  
  1706  	eventRecorderFunc := func(err *error) {
  1707  		if *err != nil {
  1708  			og.recorder.Eventf(pvc, v1.EventTypeWarning, kevents.VolumeResizeFailed, (*err).Error())
  1709  		}
  1710  	}
  1711  
  1712  	return volumetypes.GeneratedOperations{
  1713  		OperationName:     "expand_volume",
  1714  		OperationFunc:     expandVolumeFunc,
  1715  		EventRecorderFunc: eventRecorderFunc,
  1716  		CompleteFunc:      util.OperationCompleteHook(util.GetFullQualifiedPluginNameForVolume(volumePlugin.GetPluginName(), volumeSpec), "expand_volume"),
  1717  	}, nil
  1718  }
  1719  
  1720  func (og *operationGenerator) GenerateExpandAndRecoverVolumeFunc(
  1721  	pvc *v1.PersistentVolumeClaim,
  1722  	pv *v1.PersistentVolume, resizerName string) (volumetypes.GeneratedOperations, error) {
  1723  
  1724  	volumeSpec := volume.NewSpecFromPersistentVolume(pv, false)
  1725  
  1726  	volumePlugin, err := og.volumePluginMgr.FindExpandablePluginBySpec(volumeSpec)
  1727  	if err != nil {
  1728  		return volumetypes.GeneratedOperations{}, fmt.Errorf("error finding plugin for expanding volume: %q with error %v", util.GetPersistentVolumeClaimQualifiedName(pvc), err)
  1729  	}
  1730  
  1731  	if volumePlugin == nil {
  1732  		return volumetypes.GeneratedOperations{}, fmt.Errorf("can not find plugin for expanding volume: %q", util.GetPersistentVolumeClaimQualifiedName(pvc))
  1733  	}
  1734  
  1735  	expandVolumeFunc := func() volumetypes.OperationContext {
  1736  		resizeOpts := inTreeResizeOpts{
  1737  			pvc:          pvc,
  1738  			pv:           pv,
  1739  			resizerName:  resizerName,
  1740  			volumePlugin: volumePlugin,
  1741  			volumeSpec:   volumeSpec,
  1742  		}
  1743  		migrated := false
  1744  		resp := og.expandAndRecoverFunction(resizeOpts)
  1745  		if resp.err != nil {
  1746  			return volumetypes.NewOperationContext(resp.err, resp.err, migrated)
  1747  		}
  1748  		return volumetypes.NewOperationContext(nil, nil, migrated)
  1749  	}
  1750  
  1751  	eventRecorderFunc := func(err *error) {
  1752  		if *err != nil {
  1753  			og.recorder.Eventf(pvc, v1.EventTypeWarning, kevents.VolumeResizeFailed, (*err).Error())
  1754  		}
  1755  	}
  1756  
  1757  	return volumetypes.GeneratedOperations{
  1758  		OperationName:     "expand_volume",
  1759  		OperationFunc:     expandVolumeFunc,
  1760  		EventRecorderFunc: eventRecorderFunc,
  1761  		CompleteFunc:      util.OperationCompleteHook(util.GetFullQualifiedPluginNameForVolume(volumePlugin.GetPluginName(), volumeSpec), "expand_volume"),
  1762  	}, nil
  1763  }
  1764  
  1765  func (og *operationGenerator) expandAndRecoverFunction(resizeOpts inTreeResizeOpts) inTreeResizeResponse {
  1766  	pvc := resizeOpts.pvc
  1767  	pv := resizeOpts.pv
  1768  	resizerName := resizeOpts.resizerName
  1769  	volumePlugin := resizeOpts.volumePlugin
  1770  	volumeSpec := resizeOpts.volumeSpec
  1771  
  1772  	pvcSpecSize := pvc.Spec.Resources.Requests[v1.ResourceStorage]
  1773  	pvcStatusSize := pvc.Status.Capacity[v1.ResourceStorage]
  1774  	pvSize := pv.Spec.Capacity[v1.ResourceStorage]
  1775  
  1776  	resizeResponse := inTreeResizeResponse{
  1777  		pvc:          pvc,
  1778  		pv:           pv,
  1779  		resizeCalled: false,
  1780  	}
  1781  
  1782  	// by default we are expanding to fulfill size requested in pvc.Spec.Resources
  1783  	newSize := pvcSpecSize
  1784  
  1785  	var resizeStatus v1.ClaimResourceStatus
  1786  	if status, ok := pvc.Status.AllocatedResourceStatuses[v1.ResourceStorage]; ok {
  1787  		resizeStatus = status
  1788  	}
  1789  
  1790  	var allocatedSize *resource.Quantity
  1791  	t, ok := pvc.Status.AllocatedResources[v1.ResourceStorage]
  1792  	if ok {
  1793  		allocatedSize = &t
  1794  	}
  1795  	var err error
  1796  
  1797  	if pvSize.Cmp(pvcSpecSize) < 0 {
  1798  		// pv is not of requested size yet and hence will require expanding
  1799  
  1800  		switch resizeStatus {
  1801  		case v1.PersistentVolumeClaimControllerResizeInProgress,
  1802  			v1.PersistentVolumeClaimNodeResizePending,
  1803  			v1.PersistentVolumeClaimNodeResizeInProgress,
  1804  			v1.PersistentVolumeClaimNodeResizeFailed:
  1805  			if allocatedSize != nil {
  1806  				newSize = *allocatedSize
  1807  			}
  1808  		default:
  1809  			newSize = pvcSpecSize
  1810  		}
  1811  	} else {
  1812  		// PV has already been expanded and hence we can be here for following reasons:
  1813  		//   1. If expansion is pending on the node and this was just a spurious update event
  1814  		//      we don't need to do anything and let kubelet handle it.
  1815  		//   2. It could be that - although we successfully expanded the volume, we failed to
  1816  		//      record our work in API objects, in which case - we should resume resizing operation
  1817  		//      and let API objects be updated.
  1818  		//   3. Controller successfully expanded the volume, but expansion is failing on the node
  1819  		//      and before kubelet can retry failed node expansion - controller must verify if it is
  1820  		//      safe to do so.
  1821  		//   4. While expansion was still pending on the node, user reduced the pvc size.
  1822  		switch resizeStatus {
  1823  		case v1.PersistentVolumeClaimNodeResizeInProgress,
  1824  			v1.PersistentVolumeClaimNodeResizePending:
  1825  			// we don't need to do any work. We could be here because of a spurious update event.
  1826  			// This is case #1
  1827  			return resizeResponse
  1828  		case v1.PersistentVolumeClaimNodeResizeFailed:
  1829  			// This is case#3
  1830  			pvc, err = og.markForPendingNodeExpansion(pvc, pv)
  1831  			resizeResponse.pvc = pvc
  1832  			resizeResponse.err = err
  1833  			return resizeResponse
  1834  		case v1.PersistentVolumeClaimControllerResizeInProgress,
  1835  			v1.PersistentVolumeClaimControllerResizeFailed:
  1836  			// This is case#2 or it could also be case#4 when user manually shrunk the PVC
  1837  			// after expanding it.
  1838  			if allocatedSize != nil {
  1839  				newSize = *allocatedSize
  1840  			}
  1841  		default:
  1842  			// It is impossible for ResizeStatus to be "" and allocatedSize to be not nil but somehow
  1843  			// if we do end up in this state, it is safest to resume expansion to last recorded size in
  1844  			// allocatedSize variable.
  1845  			if resizeStatus == "" && allocatedSize != nil {
  1846  				newSize = *allocatedSize
  1847  			} else {
  1848  				newSize = pvcSpecSize
  1849  			}
  1850  		}
  1851  	}
  1852  
  1853  	pvc, err = util.MarkControllerReisizeInProgress(pvc, resizerName, newSize, og.kubeClient)
  1854  	if err != nil {
  1855  		msg := fmt.Errorf("error updating pvc %s with resize in progress: %v", util.GetPersistentVolumeClaimQualifiedName(pvc), err)
  1856  		resizeResponse.err = msg
  1857  		resizeResponse.pvc = pvc
  1858  		return resizeResponse
  1859  	}
  1860  
  1861  	updatedSize, err := volumePlugin.ExpandVolumeDevice(volumeSpec, newSize, pvcStatusSize)
  1862  	resizeResponse.resizeCalled = true
  1863  
  1864  	if err != nil {
  1865  		msg := fmt.Errorf("error expanding pvc %s: %v", util.GetPersistentVolumeClaimQualifiedName(pvc), err)
  1866  		resizeResponse.err = msg
  1867  		resizeResponse.pvc = pvc
  1868  		return resizeResponse
  1869  	}
  1870  
  1871  	// update PV size
  1872  	var updateErr error
  1873  	pv, updateErr = util.UpdatePVSize(pv, updatedSize, og.kubeClient)
  1874  	// if updating PV failed, we are going to leave the PVC in ControllerExpansionInProgress state, so as expansion can be retried to previously set allocatedSize value.
  1875  	if updateErr != nil {
  1876  		msg := fmt.Errorf("error updating pv for pvc %s: %v", util.GetPersistentVolumeClaimQualifiedName(pvc), updateErr)
  1877  		resizeResponse.err = msg
  1878  		return resizeResponse
  1879  	}
  1880  	resizeResponse.pv = pv
  1881  
  1882  	fsVolume, _ := util.CheckVolumeModeFilesystem(volumeSpec)
  1883  
  1884  	if !volumePlugin.RequiresFSResize() || !fsVolume {
  1885  		pvc, err = util.MarkResizeFinished(pvc, updatedSize, og.kubeClient)
  1886  		if err != nil {
  1887  			msg := fmt.Errorf("error marking pvc %s as resized: %v", util.GetPersistentVolumeClaimQualifiedName(pvc), err)
  1888  			resizeResponse.err = msg
  1889  			return resizeResponse
  1890  		}
  1891  		resizeResponse.pvc = pvc
  1892  		successMsg := fmt.Sprintf("ExpandVolume succeeded for volume %s", util.GetPersistentVolumeClaimQualifiedName(pvc))
  1893  		og.recorder.Eventf(pvc, v1.EventTypeNormal, kevents.VolumeResizeSuccess, successMsg)
  1894  	} else {
  1895  		pvc, err = og.markForPendingNodeExpansion(pvc, pv)
  1896  		resizeResponse.pvc = pvc
  1897  		if err != nil {
  1898  			msg := fmt.Errorf("error marking pvc %s for node expansion: %v", util.GetPersistentVolumeClaimQualifiedName(pvc), err)
  1899  			resizeResponse.err = msg
  1900  			return resizeResponse
  1901  		}
  1902  	}
  1903  	return resizeResponse
  1904  }
  1905  
  1906  func (og *operationGenerator) markForPendingNodeExpansion(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume) (*v1.PersistentVolumeClaim, error) {
  1907  	var err error
  1908  	pvc, err = util.MarkForFSResize(pvc, og.kubeClient)
  1909  	if err != nil {
  1910  		msg := fmt.Errorf("error marking pvc %s for node expansion: %v", util.GetPersistentVolumeClaimQualifiedName(pvc), err)
  1911  		return pvc, msg
  1912  	}
  1913  	// store old PVC capacity in pv, so as if PVC gets deleted while node expansion was pending
  1914  	// we can restore size of pvc from PV annotation and still perform expansion on the node
  1915  	oldCapacity := pvc.Status.Capacity[v1.ResourceStorage]
  1916  	err = util.AddAnnPreResizeCapacity(pv, oldCapacity, og.kubeClient)
  1917  	if err != nil {
  1918  		detailedErr := fmt.Errorf("error updating pv %s annotation (%s) with pre-resize capacity %s: %v", pv.ObjectMeta.Name, util.AnnPreResizeCapacity, oldCapacity.String(), err)
  1919  		klog.Warning(detailedErr)
  1920  		return pvc, detailedErr
  1921  	}
  1922  	return pvc, nil
  1923  }
  1924  
  1925  func (og *operationGenerator) GenerateExpandInUseVolumeFunc(
  1926  	volumeToMount VolumeToMount,
  1927  	actualStateOfWorld ActualStateOfWorldMounterUpdater, currentSize resource.Quantity) (volumetypes.GeneratedOperations, error) {
  1928  
  1929  	volumePlugin, err :=
  1930  		og.volumePluginMgr.FindPluginBySpec(volumeToMount.VolumeSpec)
  1931  	if err != nil || volumePlugin == nil {
  1932  		return volumetypes.GeneratedOperations{}, volumeToMount.GenerateErrorDetailed("NodeExpandVolume.FindPluginBySpec failed", err)
  1933  	}
  1934  
  1935  	fsResizeFunc := func() volumetypes.OperationContext {
  1936  		var resizeDone bool
  1937  		var eventErr, detailedErr error
  1938  		migrated := false
  1939  
  1940  		if currentSize.IsZero() || volumeToMount.DesiredPersistentVolumeSize.IsZero() {
  1941  			err := fmt.Errorf("current or new size of the volume is not set")
  1942  			eventErr, detailedErr = volumeToMount.GenerateError("NodeExpandvolume.expansion failed", err)
  1943  			return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
  1944  		}
  1945  
  1946  		resizeOptions := volume.NodeResizeOptions{
  1947  			VolumeSpec: volumeToMount.VolumeSpec,
  1948  			DevicePath: volumeToMount.DevicePath,
  1949  			OldSize:    currentSize,
  1950  			NewSize:    volumeToMount.DesiredPersistentVolumeSize,
  1951  		}
  1952  		fsVolume, err := util.CheckVolumeModeFilesystem(volumeToMount.VolumeSpec)
  1953  		if err != nil {
  1954  			eventErr, detailedErr = volumeToMount.GenerateError("NodeExpandvolume.CheckVolumeModeFilesystem failed", err)
  1955  			return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
  1956  		}
  1957  
  1958  		if fsVolume {
  1959  			volumeMounter, newMounterErr := volumePlugin.NewMounter(
  1960  				volumeToMount.VolumeSpec,
  1961  				volumeToMount.Pod,
  1962  				volume.VolumeOptions{})
  1963  			if newMounterErr != nil {
  1964  				eventErr, detailedErr = volumeToMount.GenerateError("NodeExpandVolume.NewMounter initialization failed", newMounterErr)
  1965  				return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
  1966  			}
  1967  
  1968  			resizeOptions.DeviceMountPath = volumeMounter.GetPath()
  1969  
  1970  			deviceMountableVolumePlugin, _ := og.volumePluginMgr.FindDeviceMountablePluginBySpec(volumeToMount.VolumeSpec)
  1971  			var volumeDeviceMounter volume.DeviceMounter
  1972  			if deviceMountableVolumePlugin != nil {
  1973  				volumeDeviceMounter, _ = deviceMountableVolumePlugin.NewDeviceMounter()
  1974  			}
  1975  
  1976  			if volumeDeviceMounter != nil {
  1977  				deviceStagePath, err := volumeDeviceMounter.GetDeviceMountPath(volumeToMount.VolumeSpec)
  1978  				if err != nil {
  1979  					eventErr, detailedErr = volumeToMount.GenerateError("NodeExpandVolume.GetDeviceMountPath failed", err)
  1980  					return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
  1981  				}
  1982  				resizeOptions.DeviceStagePath = deviceStagePath
  1983  			}
  1984  		} else {
  1985  			// Get block volume mapper plugin
  1986  			blockVolumePlugin, err :=
  1987  				og.volumePluginMgr.FindMapperPluginBySpec(volumeToMount.VolumeSpec)
  1988  			if err != nil {
  1989  				eventErr, detailedErr = volumeToMount.GenerateError("MapVolume.FindMapperPluginBySpec failed", err)
  1990  				return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
  1991  			}
  1992  
  1993  			if blockVolumePlugin == nil {
  1994  				eventErr, detailedErr = volumeToMount.GenerateError("MapVolume.FindMapperPluginBySpec failed to find BlockVolumeMapper plugin. Volume plugin is nil.", nil)
  1995  				return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
  1996  			}
  1997  
  1998  			blockVolumeMapper, newMapperErr := blockVolumePlugin.NewBlockVolumeMapper(
  1999  				volumeToMount.VolumeSpec,
  2000  				volumeToMount.Pod,
  2001  				volume.VolumeOptions{})
  2002  			if newMapperErr != nil {
  2003  				eventErr, detailedErr = volumeToMount.GenerateError("MapVolume.NewBlockVolumeMapper initialization failed", newMapperErr)
  2004  				return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
  2005  			}
  2006  
  2007  			// if plugin supports custom mappers lets add DeviceStagePath
  2008  			if customBlockVolumeMapper, ok := blockVolumeMapper.(volume.CustomBlockVolumeMapper); ok {
  2009  				resizeOptions.DeviceStagePath = customBlockVolumeMapper.GetStagingPath()
  2010  			}
  2011  		}
  2012  
  2013  		// if we are doing online expansion then volume is already published
  2014  		resizeDone, eventErr, detailedErr = og.doOnlineExpansion(volumeToMount, actualStateOfWorld, resizeOptions)
  2015  		if eventErr != nil || detailedErr != nil {
  2016  			return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
  2017  		}
  2018  		if resizeDone {
  2019  			return volumetypes.NewOperationContext(nil, nil, migrated)
  2020  		}
  2021  		// This is a placeholder error - we should NEVER reach here.
  2022  		err = fmt.Errorf("volume resizing failed for unknown reason")
  2023  		eventErr, detailedErr = volumeToMount.GenerateError("NodeExpandVolume.NodeExpandVolume failed to resize volume", err)
  2024  		return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
  2025  	}
  2026  
  2027  	eventRecorderFunc := func(err *error) {
  2028  		if *err != nil {
  2029  			og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeWarning, kevents.VolumeResizeFailed, (*err).Error())
  2030  		}
  2031  	}
  2032  
  2033  	return volumetypes.GeneratedOperations{
  2034  		OperationName:     "volume_fs_resize",
  2035  		OperationFunc:     fsResizeFunc,
  2036  		EventRecorderFunc: eventRecorderFunc,
  2037  		CompleteFunc:      util.OperationCompleteHook(util.GetFullQualifiedPluginNameForVolume(volumePlugin.GetPluginName(), volumeToMount.VolumeSpec), "volume_fs_resize"),
  2038  	}, nil
  2039  }
  2040  
  2041  func (og *operationGenerator) doOnlineExpansion(volumeToMount VolumeToMount,
  2042  	actualStateOfWorld ActualStateOfWorldMounterUpdater,
  2043  	resizeOptions volume.NodeResizeOptions) (bool, error, error) {
  2044  
  2045  	resizeDone, err := og.nodeExpandVolume(volumeToMount, actualStateOfWorld, resizeOptions)
  2046  	if err != nil {
  2047  		e1, e2 := volumeToMount.GenerateError("NodeExpandVolume.NodeExpandVolume failed", err)
  2048  		klog.Errorf(e2.Error())
  2049  		return false, e1, e2
  2050  	}
  2051  	if resizeDone {
  2052  		markingDone := actualStateOfWorld.MarkVolumeAsResized(volumeToMount.VolumeName, &resizeOptions.NewSize)
  2053  		if !markingDone {
  2054  			// On failure, return error. Caller will log and retry.
  2055  			genericFailureError := fmt.Errorf("unable to mark volume as resized")
  2056  			e1, e2 := volumeToMount.GenerateError("NodeExpandVolume.MarkVolumeAsResized failed", genericFailureError)
  2057  			return false, e1, e2
  2058  		}
  2059  		return true, nil, nil
  2060  	}
  2061  	return false, nil, nil
  2062  }
  2063  
  2064  func (og *operationGenerator) expandVolumeDuringMount(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater, rsOpts volume.NodeResizeOptions) (bool, error) {
  2065  	supportsExpansion, expandablePlugin := og.checkIfSupportsNodeExpansion(volumeToMount)
  2066  	if supportsExpansion {
  2067  		pv := volumeToMount.VolumeSpec.PersistentVolume
  2068  		pvc, err := og.kubeClient.CoreV1().PersistentVolumeClaims(pv.Spec.ClaimRef.Namespace).Get(context.TODO(), pv.Spec.ClaimRef.Name, metav1.GetOptions{})
  2069  		if err != nil {
  2070  			// Return error rather than leave the file system un-resized, caller will log and retry
  2071  			return false, fmt.Errorf("mountVolume.NodeExpandVolume get PVC failed : %v", err)
  2072  		}
  2073  
  2074  		pvcStatusCap := pvc.Status.Capacity[v1.ResourceStorage]
  2075  		pvSpecCap := pv.Spec.Capacity[v1.ResourceStorage]
  2076  		if pvcStatusCap.Cmp(pvSpecCap) < 0 {
  2077  			if volumeToMount.VolumeSpec.ReadOnly {
  2078  				simpleMsg, detailedMsg := volumeToMount.GenerateMsg("MountVolume.NodeExpandVolume failed", "requested read-only file system")
  2079  				klog.Warningf(detailedMsg)
  2080  				og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeWarning, kevents.FileSystemResizeFailed, simpleMsg)
  2081  				og.recorder.Eventf(pvc, v1.EventTypeWarning, kevents.FileSystemResizeFailed, simpleMsg)
  2082  				return true, nil
  2083  			}
  2084  
  2085  			rsOpts.NewSize = pvSpecCap
  2086  			rsOpts.OldSize = pvcStatusCap
  2087  			resizeOp := nodeResizeOperationOpts{
  2088  				vmt:                volumeToMount,
  2089  				pvc:                pvc,
  2090  				pv:                 pv,
  2091  				pluginResizeOpts:   rsOpts,
  2092  				volumePlugin:       expandablePlugin,
  2093  				actualStateOfWorld: actualStateOfWorld,
  2094  			}
  2095  			if og.checkForRecoveryFromExpansion(pvc, volumeToMount) {
  2096  				nodeExpander := newNodeExpander(resizeOp, og.kubeClient, og.recorder)
  2097  				resizeFinished, err, _ := nodeExpander.expandOnPlugin()
  2098  				return resizeFinished, err
  2099  			} else {
  2100  				return og.legacyCallNodeExpandOnPlugin(resizeOp)
  2101  			}
  2102  		}
  2103  	}
  2104  	return true, nil
  2105  }
  2106  
  2107  func (og *operationGenerator) checkIfSupportsNodeExpansion(volumeToMount VolumeToMount) (bool, volume.NodeExpandableVolumePlugin) {
  2108  	if volumeToMount.VolumeSpec != nil &&
  2109  		volumeToMount.VolumeSpec.InlineVolumeSpecForCSIMigration {
  2110  		klog.V(4).Infof("This volume %s is a migrated inline volume and is not resizable", volumeToMount.VolumeName)
  2111  		return false, nil
  2112  	}
  2113  
  2114  	// Get expander, if possible
  2115  	expandableVolumePlugin, _ :=
  2116  		og.volumePluginMgr.FindNodeExpandablePluginBySpec(volumeToMount.VolumeSpec)
  2117  	if expandableVolumePlugin != nil &&
  2118  		expandableVolumePlugin.RequiresFSResize() &&
  2119  		volumeToMount.VolumeSpec.PersistentVolume != nil {
  2120  		return true, expandableVolumePlugin
  2121  	}
  2122  	return false, nil
  2123  }
  2124  
  2125  func (og *operationGenerator) nodeExpandVolume(
  2126  	volumeToMount VolumeToMount,
  2127  	actualStateOfWorld ActualStateOfWorldMounterUpdater,
  2128  	rsOpts volume.NodeResizeOptions) (bool, error) {
  2129  
  2130  	supportsExpansion, expandableVolumePlugin := og.checkIfSupportsNodeExpansion(volumeToMount)
  2131  
  2132  	if supportsExpansion {
  2133  		// lets use sizes handed over to us by caller for comparison
  2134  		if rsOpts.NewSize.Cmp(rsOpts.OldSize) > 0 {
  2135  			pv := volumeToMount.VolumeSpec.PersistentVolume
  2136  			pvc, err := og.kubeClient.CoreV1().PersistentVolumeClaims(pv.Spec.ClaimRef.Namespace).Get(context.TODO(), pv.Spec.ClaimRef.Name, metav1.GetOptions{})
  2137  			if err != nil {
  2138  				// Return error rather than leave the file system un-resized, caller will log and retry
  2139  				return false, fmt.Errorf("mountVolume.NodeExpandVolume get PVC failed : %v", err)
  2140  			}
  2141  
  2142  			if volumeToMount.VolumeSpec.ReadOnly {
  2143  				simpleMsg, detailedMsg := volumeToMount.GenerateMsg("MountVolume.NodeExpandVolume failed", "requested read-only file system")
  2144  				klog.Warningf(detailedMsg)
  2145  				og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeWarning, kevents.FileSystemResizeFailed, simpleMsg)
  2146  				og.recorder.Eventf(pvc, v1.EventTypeWarning, kevents.FileSystemResizeFailed, simpleMsg)
  2147  				return true, nil
  2148  			}
  2149  			resizeOp := nodeResizeOperationOpts{
  2150  				vmt:                volumeToMount,
  2151  				pvc:                pvc,
  2152  				pv:                 pv,
  2153  				pluginResizeOpts:   rsOpts,
  2154  				volumePlugin:       expandableVolumePlugin,
  2155  				actualStateOfWorld: actualStateOfWorld,
  2156  			}
  2157  
  2158  			if og.checkForRecoveryFromExpansion(pvc, volumeToMount) {
  2159  				nodeExpander := newNodeExpander(resizeOp, og.kubeClient, og.recorder)
  2160  				resizeFinished, err, _ := nodeExpander.expandOnPlugin()
  2161  				return resizeFinished, err
  2162  			} else {
  2163  				return og.legacyCallNodeExpandOnPlugin(resizeOp)
  2164  			}
  2165  		}
  2166  	}
  2167  	return true, nil
  2168  }
  2169  
  2170  func (og *operationGenerator) checkForRecoveryFromExpansion(pvc *v1.PersistentVolumeClaim, volumeToMount VolumeToMount) bool {
  2171  	resizeStatus := pvc.Status.AllocatedResourceStatuses[v1.ResourceStorage]
  2172  	allocatedResource := pvc.Status.AllocatedResources
  2173  	featureGateStatus := utilfeature.DefaultFeatureGate.Enabled(features.RecoverVolumeExpansionFailure)
  2174  
  2175  	if !featureGateStatus {
  2176  		return false
  2177  	}
  2178  
  2179  	// Even though RecoverVolumeExpansionFailure feature gate is enabled, it appears that we are running with older version
  2180  	// of resize controller, which will not populate allocatedResource and resizeStatus. This can happen because of version skew
  2181  	// and hence we are going to keep expanding using older logic.
  2182  	if resizeStatus == "" && allocatedResource == nil {
  2183  		_, detailedMsg := volumeToMount.GenerateMsg("MountVolume.NodeExpandVolume running with", "older external resize controller")
  2184  		klog.Warningf(detailedMsg)
  2185  		return false
  2186  	}
  2187  	return true
  2188  }
  2189  
  2190  // legacyCallNodeExpandOnPlugin is old version of calling node expansion on plugin, which does not support
  2191  // recovery from volume expansion failure
  2192  // TODO: Removing this code when RecoverVolumeExpansionFailure feature goes GA.
  2193  func (og *operationGenerator) legacyCallNodeExpandOnPlugin(resizeOp nodeResizeOperationOpts) (bool, error) {
  2194  	pvc := resizeOp.pvc
  2195  	volumeToMount := resizeOp.vmt
  2196  	rsOpts := resizeOp.pluginResizeOpts
  2197  	actualStateOfWorld := resizeOp.actualStateOfWorld
  2198  	expandableVolumePlugin := resizeOp.volumePlugin
  2199  
  2200  	pvcStatusCap := pvc.Status.Capacity[v1.ResourceStorage]
  2201  
  2202  	nodeName := volumeToMount.Pod.Spec.NodeName
  2203  
  2204  	var err error
  2205  
  2206  	// File system resize was requested, proceed
  2207  	klog.V(4).InfoS(volumeToMount.GenerateMsgDetailed("MountVolume.NodeExpandVolume entering", fmt.Sprintf("DevicePath %q", volumeToMount.DevicePath)), "pod", klog.KObj(volumeToMount.Pod))
  2208  
  2209  	rsOpts.VolumeSpec = volumeToMount.VolumeSpec
  2210  
  2211  	_, resizeErr := expandableVolumePlugin.NodeExpand(rsOpts)
  2212  	if resizeErr != nil {
  2213  		// This is a workaround for now, until RecoverFromVolumeExpansionFailure feature goes GA.
  2214  		// If RecoverFromVolumeExpansionFailure feature is enabled, we will not ever hit this state, because
  2215  		// we will wait for VolumeExpansionPendingOnNode before trying to expand volume in kubelet.
  2216  		if volumetypes.IsOperationNotSupportedError(resizeErr) {
  2217  			klog.V(4).InfoS(volumeToMount.GenerateMsgDetailed("MountVolume.NodeExpandVolume failed", "NodeExpandVolume not supported"), "pod", klog.KObj(volumeToMount.Pod))
  2218  			return true, nil
  2219  		}
  2220  
  2221  		// if driver returned FailedPrecondition error that means
  2222  		// volume expansion should not be retried on this node but
  2223  		// expansion operation should not block mounting
  2224  		if volumetypes.IsFailedPreconditionError(resizeErr) {
  2225  			actualStateOfWorld.MarkForInUseExpansionError(volumeToMount.VolumeName)
  2226  			klog.Errorf(volumeToMount.GenerateErrorDetailed("MountVolume.NodeExapndVolume failed", resizeErr).Error())
  2227  			return true, nil
  2228  		}
  2229  		return false, resizeErr
  2230  	}
  2231  
  2232  	simpleMsg, detailedMsg := volumeToMount.GenerateMsg("MountVolume.NodeExpandVolume succeeded", nodeName)
  2233  	og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeNormal, kevents.FileSystemResizeSuccess, simpleMsg)
  2234  	og.recorder.Eventf(pvc, v1.EventTypeNormal, kevents.FileSystemResizeSuccess, simpleMsg)
  2235  	klog.InfoS(detailedMsg, "pod", klog.KObj(volumeToMount.Pod))
  2236  
  2237  	// if PVC already has new size, there is no need to update it.
  2238  	if pvcStatusCap.Cmp(rsOpts.NewSize) >= 0 {
  2239  		return true, nil
  2240  	}
  2241  
  2242  	// File system resize succeeded, now update the PVC's Capacity to match the PV's
  2243  	_, err = util.MarkFSResizeFinished(pvc, rsOpts.NewSize, og.kubeClient)
  2244  	if err != nil {
  2245  		// On retry, NodeExpandVolume will be called again but do nothing
  2246  		return false, fmt.Errorf("mountVolume.NodeExpandVolume update PVC status failed : %v", err)
  2247  	}
  2248  	return true, nil
  2249  }
  2250  
  2251  func checkMountOptionSupport(og *operationGenerator, volumeToMount VolumeToMount, plugin volume.VolumePlugin) error {
  2252  	mountOptions := util.MountOptionFromSpec(volumeToMount.VolumeSpec)
  2253  
  2254  	if len(mountOptions) > 0 && !plugin.SupportsMountOption() {
  2255  		return fmt.Errorf("mount options are not supported for this volume type")
  2256  	}
  2257  	return nil
  2258  }
  2259  
  2260  // checkNodeAffinity looks at the PV node affinity, and checks if the node has the same corresponding labels
  2261  // This ensures that we don't mount a volume that doesn't belong to this node
  2262  func checkNodeAffinity(og *operationGenerator, volumeToMount VolumeToMount) error {
  2263  	pv := volumeToMount.VolumeSpec.PersistentVolume
  2264  	if pv != nil {
  2265  		nodeLabels, err := og.volumePluginMgr.Host.GetNodeLabels()
  2266  		if err != nil {
  2267  			return err
  2268  		}
  2269  		err = storagehelpers.CheckNodeAffinity(pv, nodeLabels)
  2270  		if err != nil {
  2271  			return err
  2272  		}
  2273  	}
  2274  	return nil
  2275  }
  2276  
  2277  // isDeviceOpened checks the device status if the device is in use anywhere else on the system
  2278  func isDeviceOpened(deviceToDetach AttachedVolume, hostUtil hostutil.HostUtils) (bool, error) {
  2279  	isDevicePath, devicePathErr := hostUtil.PathIsDevice(deviceToDetach.DevicePath)
  2280  	var deviceOpened bool
  2281  	var deviceOpenedErr error
  2282  	if !isDevicePath && devicePathErr == nil ||
  2283  		(devicePathErr != nil && strings.Contains(devicePathErr.Error(), "does not exist")) {
  2284  		// not a device path or path doesn't exist
  2285  		//TODO: refer to #36092
  2286  		klog.V(3).Infof("The path isn't device path or doesn't exist. Skip checking device path: %s", deviceToDetach.DevicePath)
  2287  		deviceOpened = false
  2288  	} else if devicePathErr != nil {
  2289  		return false, deviceToDetach.GenerateErrorDetailed("PathIsDevice failed", devicePathErr)
  2290  	} else {
  2291  		deviceOpened, deviceOpenedErr = hostUtil.DeviceOpened(deviceToDetach.DevicePath)
  2292  		if deviceOpenedErr != nil {
  2293  			return false, deviceToDetach.GenerateErrorDetailed("DeviceOpened failed", deviceOpenedErr)
  2294  		}
  2295  	}
  2296  	return deviceOpened, nil
  2297  }
  2298  
  2299  // findDetachablePluginBySpec is a variant of VolumePluginMgr.FindAttachablePluginByName() function.
  2300  // The difference is that it bypass the CanAttach() check for CSI plugin, i.e. it assumes all CSI plugin supports detach.
  2301  // The intention here is that a CSI plugin volume can end up in an Uncertain state,  so that a detach
  2302  // operation will help it to detach no matter it actually has the ability to attach/detach.
  2303  func findDetachablePluginBySpec(spec *volume.Spec, pm *volume.VolumePluginMgr) (volume.AttachableVolumePlugin, error) {
  2304  	volumePlugin, err := pm.FindPluginBySpec(spec)
  2305  	if err != nil {
  2306  		return nil, err
  2307  	}
  2308  	if attachableVolumePlugin, ok := volumePlugin.(volume.AttachableVolumePlugin); ok {
  2309  		if attachableVolumePlugin.GetPluginName() == "kubernetes.io/csi" {
  2310  			return attachableVolumePlugin, nil
  2311  		}
  2312  		if canAttach, err := attachableVolumePlugin.CanAttach(spec); err != nil {
  2313  			return nil, err
  2314  		} else if canAttach {
  2315  			return attachableVolumePlugin, nil
  2316  		}
  2317  	}
  2318  	return nil, nil
  2319  }
  2320  
  2321  func getMigratedStatusBySpec(spec *volume.Spec) bool {
  2322  	migrated := false
  2323  	if spec != nil {
  2324  		migrated = spec.Migrated
  2325  	}
  2326  	return migrated
  2327  }
  2328  

View as plain text