...

Source file src/k8s.io/kubernetes/pkg/volume/util/operationexecutor/operation_executor.go

Documentation: k8s.io/kubernetes/pkg/volume/util/operationexecutor

     1  /*
     2  Copyright 2016 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  // Package operationexecutor implements interfaces that enable execution of
    18  // attach, detach, mount, and unmount operations with a
    19  // nestedpendingoperations so that more than one operation is never triggered
    20  // on the same volume for the same pod.
    21  package operationexecutor
    22  
    23  import (
    24  	"errors"
    25  	"fmt"
    26  	"time"
    27  
    28  	"github.com/go-logr/logr"
    29  
    30  	"k8s.io/klog/v2"
    31  
    32  	v1 "k8s.io/api/core/v1"
    33  	"k8s.io/apimachinery/pkg/api/resource"
    34  	"k8s.io/apimachinery/pkg/types"
    35  	"k8s.io/kubernetes/pkg/volume"
    36  	"k8s.io/kubernetes/pkg/volume/util"
    37  	"k8s.io/kubernetes/pkg/volume/util/hostutil"
    38  	"k8s.io/kubernetes/pkg/volume/util/nestedpendingoperations"
    39  	volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
    40  )
    41  
    42  // OperationExecutor defines a set of operations for attaching, detaching,
    43  // mounting, or unmounting a volume that are executed with a NewNestedPendingOperations which
    44  // prevents more than one operation from being triggered on the same volume.
    45  //
    46  // These operations should be idempotent (for example, AttachVolume should
    47  // still succeed if the volume is already attached to the node, etc.). However,
    48  // they depend on the volume plugins to implement this behavior.
    49  //
    50  // Once an operation completes successfully, the actualStateOfWorld is updated
    51  // to indicate the volume is attached/detached/mounted/unmounted.
    52  //
    53  // If the OperationExecutor fails to start the operation because, for example,
    54  // an operation with the same UniqueVolumeName is already pending, a non-nil
    55  // error is returned.
    56  //
    57  // Once the operation is started, since it is executed asynchronously,
    58  // errors are simply logged and the goroutine is terminated without updating
    59  // actualStateOfWorld (callers are responsible for retrying as needed).
    60  //
    61  // Some of these operations may result in calls to the API server; callers are
    62  // responsible for rate limiting on errors.
    63  type OperationExecutor interface {
    64  	// AttachVolume attaches the volume to the node specified in volumeToAttach.
    65  	// It then updates the actual state of the world to reflect that.
    66  	AttachVolume(logger klog.Logger, volumeToAttach VolumeToAttach, actualStateOfWorld ActualStateOfWorldAttacherUpdater) error
    67  
    68  	// VerifyVolumesAreAttachedPerNode verifies the given list of volumes to see whether they are still attached to the node.
    69  	// If any volume is not attached right now, it will update the actual state of the world to reflect that.
    70  	// Note that this operation could be operated concurrently with other attach/detach operations.
    71  	// In theory (but very unlikely in practise), race condition among these operations might mark volume as detached
    72  	// even if it is attached. But reconciler can correct this in a short period of time.
    73  	VerifyVolumesAreAttachedPerNode(AttachedVolumes []AttachedVolume, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) error
    74  
    75  	// VerifyVolumesAreAttached verifies volumes being used in entire cluster and if they are still attached to the node
    76  	// If any volume is not attached right now, it will update actual state of world to reflect that.
    77  	VerifyVolumesAreAttached(volumesToVerify map[types.NodeName][]AttachedVolume, actualStateOfWorld ActualStateOfWorldAttacherUpdater)
    78  
    79  	// DetachVolume detaches the volume from the node specified in
    80  	// volumeToDetach, and updates the actual state of the world to reflect
    81  	// that. If verifySafeToDetach is set, a call is made to the fetch the node
    82  	// object and it is used to verify that the volume does not exist in Node's
    83  	// Status.VolumesInUse list (operation fails with error if it is).
    84  	DetachVolume(logger klog.Logger, volumeToDetach AttachedVolume, verifySafeToDetach bool, actualStateOfWorld ActualStateOfWorldAttacherUpdater) error
    85  
    86  	// If a volume has 'Filesystem' volumeMode, MountVolume mounts the
    87  	// volume to the pod specified in volumeToMount.
    88  	// Specifically it will:
    89  	// * Wait for the device to finish attaching (for attachable volumes only).
    90  	// * Mount device to global mount path (for attachable volumes only).
    91  	// * Update actual state of world to reflect volume is globally mounted (for
    92  	//   attachable volumes only).
    93  	// * Mount the volume to the pod specific path.
    94  	// * Update actual state of world to reflect volume is mounted to the pod
    95  	//   path.
    96  	// The parameter "isRemount" is informational and used to adjust logging
    97  	// verbosity. An initial mount is more log-worthy than a remount, for
    98  	// example.
    99  	//
   100  	// For 'Block' volumeMode, this method creates a symbolic link to
   101  	// the volume from both the pod specified in volumeToMount and global map path.
   102  	// Specifically it will:
   103  	// * Wait for the device to finish attaching (for attachable volumes only).
   104  	// * Update actual state of world to reflect volume is globally mounted/mapped.
   105  	// * Map volume to global map path using symbolic link.
   106  	// * Map the volume to the pod device map path using symbolic link.
   107  	// * Update actual state of world to reflect volume is mounted/mapped to the pod path.
   108  	MountVolume(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater, isRemount bool) error
   109  
   110  	// If a volume has 'Filesystem' volumeMode, UnmountVolume unmounts the
   111  	// volume from the pod specified in volumeToUnmount and updates the actual
   112  	// state of the world to reflect that.
   113  	//
   114  	// For 'Block' volumeMode, this method unmaps symbolic link to the volume
   115  	// from both the pod device map path in volumeToUnmount and global map path.
   116  	// And then, updates the actual state of the world to reflect that.
   117  	UnmountVolume(volumeToUnmount MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, podsDir string) error
   118  
   119  	// If a volume has 'Filesystem' volumeMode, UnmountDevice unmounts the
   120  	// volumes global mount path from the device (for attachable volumes only,
   121  	// freeing it for detach. It then updates the actual state of the world to
   122  	// reflect that.
   123  	//
   124  	// For 'Block' volumeMode, this method checks number of symbolic links under
   125  	// global map path. If number of reference is zero, remove global map path
   126  	// directory and free a volume for detach.
   127  	// It then updates the actual state of the world to reflect that.
   128  	UnmountDevice(deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, hostutil hostutil.HostUtils) error
   129  
   130  	// VerifyControllerAttachedVolume checks if the specified volume is present
   131  	// in the specified nodes AttachedVolumes Status field. It uses kubeClient
   132  	// to fetch the node object.
   133  	// If the volume is found, the actual state of the world is updated to mark
   134  	// the volume as attached.
   135  	// If the volume does not implement the attacher interface, it is assumed to
   136  	// be attached and the actual state of the world is updated accordingly.
   137  	// If the volume is not found or there is an error (fetching the node
   138  	// object, for example) then an error is returned which triggers exponential
   139  	// back off on retries.
   140  	VerifyControllerAttachedVolume(logger klog.Logger, volumeToMount VolumeToMount, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) error
   141  
   142  	// IsOperationPending returns true if an operation for the given volumeName
   143  	// and one of podName or nodeName is pending, otherwise it returns false
   144  	IsOperationPending(volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName, nodeName types.NodeName) bool
   145  	// IsOperationSafeToRetry returns false if an operation for the given volumeName
   146  	// and one of podName or nodeName is pending or in exponential backoff, otherwise it returns true
   147  	IsOperationSafeToRetry(volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName, nodeName types.NodeName, operationName string) bool
   148  	// ExpandInUseVolume will resize volume's file system to expected size without unmounting the volume.
   149  	ExpandInUseVolume(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater, currentSize resource.Quantity) error
   150  	// ReconstructVolumeOperation construct a new volumeSpec and returns it created by plugin
   151  	ReconstructVolumeOperation(volumeMode v1.PersistentVolumeMode, plugin volume.VolumePlugin, mapperPlugin volume.BlockVolumePlugin, uid types.UID, podName volumetypes.UniquePodName, volumeSpecName string, volumePath string, pluginName string) (volume.ReconstructedVolume, error)
   152  }
   153  
   154  // NewOperationExecutor returns a new instance of OperationExecutor.
   155  func NewOperationExecutor(
   156  	operationGenerator OperationGenerator) OperationExecutor {
   157  
   158  	return &operationExecutor{
   159  		pendingOperations: nestedpendingoperations.NewNestedPendingOperations(
   160  			true /* exponentialBackOffOnError */),
   161  		operationGenerator: operationGenerator,
   162  	}
   163  }
   164  
   165  // MarkVolumeOpts is a struct to pass arguments to MountVolume functions
   166  type MarkVolumeOpts struct {
   167  	PodName             volumetypes.UniquePodName
   168  	PodUID              types.UID
   169  	VolumeName          v1.UniqueVolumeName
   170  	Mounter             volume.Mounter
   171  	BlockVolumeMapper   volume.BlockVolumeMapper
   172  	OuterVolumeSpecName string
   173  	VolumeGidVolume     string
   174  	VolumeSpec          *volume.Spec
   175  	VolumeMountState    VolumeMountState
   176  	SELinuxMountContext string
   177  }
   178  
   179  // ActualStateOfWorldMounterUpdater defines a set of operations updating the actual
   180  // state of the world cache after successful mount/unmount.
   181  type ActualStateOfWorldMounterUpdater interface {
   182  	// Marks the specified volume as mounted to the specified pod
   183  	MarkVolumeAsMounted(markVolumeOpts MarkVolumeOpts) error
   184  
   185  	// Marks the specified volume as unmounted from the specified pod
   186  	MarkVolumeAsUnmounted(podName volumetypes.UniquePodName, volumeName v1.UniqueVolumeName) error
   187  
   188  	// MarkVolumeMountAsUncertain marks state of volume mount for the pod uncertain
   189  	MarkVolumeMountAsUncertain(markVolumeOpts MarkVolumeOpts) error
   190  
   191  	// Marks the specified volume as having been globally mounted.
   192  	MarkDeviceAsMounted(volumeName v1.UniqueVolumeName, devicePath, deviceMountPath, seLinuxMountContext string) error
   193  
   194  	// MarkDeviceAsUncertain marks device state in global mount path as uncertain
   195  	MarkDeviceAsUncertain(volumeName v1.UniqueVolumeName, devicePath, deviceMountPath, seLinuxMountContext string) error
   196  
   197  	// Marks the specified volume as having its global mount unmounted.
   198  	MarkDeviceAsUnmounted(volumeName v1.UniqueVolumeName) error
   199  
   200  	// Marks the specified volume's file system resize request is finished.
   201  	MarkVolumeAsResized(volumeName v1.UniqueVolumeName, claimSize *resource.Quantity) bool
   202  
   203  	// GetDeviceMountState returns mount state of the device in global path
   204  	GetDeviceMountState(volumeName v1.UniqueVolumeName) DeviceMountState
   205  
   206  	// GetVolumeMountState returns mount state of the volume for the Pod
   207  	GetVolumeMountState(volumName v1.UniqueVolumeName, podName volumetypes.UniquePodName) VolumeMountState
   208  
   209  	// IsVolumeMountedElsewhere returns whether the supplied volume is mounted in a Pod other than the supplied one
   210  	IsVolumeMountedElsewhere(volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName) bool
   211  
   212  	// MarkForInUseExpansionError marks the volume to have in-use error during expansion.
   213  	// volume expansion must not be retried for this volume
   214  	MarkForInUseExpansionError(volumeName v1.UniqueVolumeName)
   215  
   216  	// CheckAndMarkVolumeAsUncertainViaReconstruction only adds volume to actual state of the world
   217  	// if volume was not already there. This avoid overwriting in any previously stored
   218  	// state. It returns error if there was an error adding the volume to ASOW.
   219  	// It returns true, if this operation resulted in volume being added to ASOW
   220  	// otherwise it returns false.
   221  	CheckAndMarkVolumeAsUncertainViaReconstruction(opts MarkVolumeOpts) (bool, error)
   222  
   223  	// CheckAndMarkDeviceUncertainViaReconstruction only adds device to actual state of the world
   224  	// if device was not already there. This avoids overwriting in any previously stored
   225  	// state. We only supply deviceMountPath because devicePath is already determined from
   226  	// VerifyControllerAttachedVolume function.
   227  	CheckAndMarkDeviceUncertainViaReconstruction(volumeName v1.UniqueVolumeName, deviceMountPath string) bool
   228  
   229  	// IsVolumeReconstructed returns true if volume currently added to actual state of the world
   230  	// was found during reconstruction.
   231  	IsVolumeReconstructed(volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName) bool
   232  
   233  	// IsVolumeDeviceReconstructed returns true if volume device identified by volumeName has been
   234  	// found during reconstruction.
   235  	IsVolumeDeviceReconstructed(volumeName v1.UniqueVolumeName) bool
   236  }
   237  
   238  // ActualStateOfWorldAttacherUpdater defines a set of operations updating the
   239  // actual state of the world cache after successful attach/detach/mount/unmount.
   240  type ActualStateOfWorldAttacherUpdater interface {
   241  	// Marks the specified volume as attached to the specified node.  If the
   242  	// volume name is supplied, that volume name will be used.  If not, the
   243  	// volume name is computed using the result from querying the plugin.
   244  	//
   245  	// TODO: in the future, we should be able to remove the volumeName
   246  	// argument to this method -- since it is used only for attachable
   247  	// volumes.  See issue 29695.
   248  	MarkVolumeAsAttached(logger klog.Logger, volumeName v1.UniqueVolumeName, volumeSpec *volume.Spec, nodeName types.NodeName, devicePath string) error
   249  
   250  	// Marks the specified volume as *possibly* attached to the specified node.
   251  	// If an attach operation fails, the attach/detach controller does not know for certain if the volume is attached or not.
   252  	// If the volume name is supplied, that volume name will be used.  If not, the
   253  	// volume name is computed using the result from querying the plugin.
   254  	MarkVolumeAsUncertain(logger klog.Logger, volumeName v1.UniqueVolumeName, volumeSpec *volume.Spec, nodeName types.NodeName) error
   255  
   256  	// Marks the specified volume as detached from the specified node
   257  	MarkVolumeAsDetached(volumeName v1.UniqueVolumeName, nodeName types.NodeName)
   258  
   259  	// Marks desire to detach the specified volume (remove the volume from the node's
   260  	// volumesToReportAsAttached list)
   261  	RemoveVolumeFromReportAsAttached(volumeName v1.UniqueVolumeName, nodeName types.NodeName) error
   262  
   263  	// Unmarks the desire to detach for the specified volume (add the volume back to
   264  	// the node's volumesToReportAsAttached list)
   265  	AddVolumeToReportAsAttached(logger klog.Logger, volumeName v1.UniqueVolumeName, nodeName types.NodeName)
   266  
   267  	// InitializeClaimSize sets pvc claim size by reading pvc.Status.Capacity
   268  	InitializeClaimSize(logger klog.Logger, volumeName v1.UniqueVolumeName, claimSize *resource.Quantity)
   269  
   270  	GetClaimSize(volumeName v1.UniqueVolumeName) *resource.Quantity
   271  }
   272  
   273  // VolumeLogger defines a set of operations for generating volume-related logging and error msgs
   274  type VolumeLogger interface {
   275  	// Creates a detailed msg that can be used in logs
   276  	// The msg format follows the pattern "<prefixMsg> <volume details> <suffixMsg>",
   277  	// where each implementation provides the volume details
   278  	GenerateMsgDetailed(prefixMsg, suffixMsg string) (detailedMsg string)
   279  
   280  	// Creates a detailed error that can be used in logs.
   281  	// The msg format follows the pattern "<prefixMsg> <volume details>: <err> ",
   282  	GenerateErrorDetailed(prefixMsg string, err error) (detailedErr error)
   283  
   284  	// Creates a simple msg that is user friendly and a detailed msg that can be used in logs
   285  	// The msg format follows the pattern "<prefixMsg> <volume details> <suffixMsg>",
   286  	// where each implementation provides the volume details
   287  	GenerateMsg(prefixMsg, suffixMsg string) (simpleMsg, detailedMsg string)
   288  
   289  	// Creates a simple error that is user friendly and a detailed error that can be used in logs.
   290  	// The msg format follows the pattern "<prefixMsg> <volume details>: <err> ",
   291  	GenerateError(prefixMsg string, err error) (simpleErr, detailedErr error)
   292  }
   293  
   294  // Generates an error string with the format ": <err>" if err exists
   295  func errSuffix(err error) string {
   296  	errStr := ""
   297  	if err != nil {
   298  		errStr = fmt.Sprintf(": %v", err)
   299  	}
   300  	return errStr
   301  }
   302  
   303  // Generate a detailed error msg for logs
   304  func generateVolumeMsgDetailed(prefixMsg, suffixMsg, volumeName, details string) (detailedMsg string) {
   305  	return fmt.Sprintf("%v for volume %q %v %v", prefixMsg, volumeName, details, suffixMsg)
   306  }
   307  
   308  // Generate a simplified error msg for events and a detailed error msg for logs
   309  func generateVolumeMsg(prefixMsg, suffixMsg, volumeName, details string) (simpleMsg, detailedMsg string) {
   310  	simpleMsg = fmt.Sprintf("%v for volume %q %v", prefixMsg, volumeName, suffixMsg)
   311  	return simpleMsg, generateVolumeMsgDetailed(prefixMsg, suffixMsg, volumeName, details)
   312  }
   313  
   314  // VolumeToAttach represents a volume that should be attached to a node.
   315  type VolumeToAttach struct {
   316  	// MultiAttachErrorReported indicates whether the multi-attach error has been reported for the given volume.
   317  	// It is used to prevent reporting the error from being reported more than once for a given volume.
   318  	MultiAttachErrorReported bool
   319  
   320  	// VolumeName is the unique identifier for the volume that should be
   321  	// attached.
   322  	VolumeName v1.UniqueVolumeName
   323  
   324  	// VolumeSpec is a volume spec containing the specification for the volume
   325  	// that should be attached.
   326  	VolumeSpec *volume.Spec
   327  
   328  	// NodeName is the identifier for the node that the volume should be
   329  	// attached to.
   330  	NodeName types.NodeName
   331  
   332  	// scheduledPods is a map containing the set of pods that reference this
   333  	// volume and are scheduled to the underlying node. The key in the map is
   334  	// the name of the pod and the value is a pod object containing more
   335  	// information about the pod.
   336  	ScheduledPods []*v1.Pod
   337  }
   338  
   339  // GenerateMsgDetailed returns detailed msgs for volumes to attach
   340  func (volume *VolumeToAttach) GenerateMsgDetailed(prefixMsg, suffixMsg string) (detailedMsg string) {
   341  	detailedStr := fmt.Sprintf("(UniqueName: %q) from node %q", volume.VolumeName, volume.NodeName)
   342  	volumeSpecName := "nil"
   343  	if volume.VolumeSpec != nil {
   344  		volumeSpecName = volume.VolumeSpec.Name()
   345  	}
   346  	return generateVolumeMsgDetailed(prefixMsg, suffixMsg, volumeSpecName, detailedStr)
   347  }
   348  
   349  // GenerateMsg returns simple and detailed msgs for volumes to attach
   350  func (volume *VolumeToAttach) GenerateMsg(prefixMsg, suffixMsg string) (simpleMsg, detailedMsg string) {
   351  	detailedStr := fmt.Sprintf("(UniqueName: %q) from node %q", volume.VolumeName, volume.NodeName)
   352  	volumeSpecName := "nil"
   353  	if volume.VolumeSpec != nil {
   354  		volumeSpecName = volume.VolumeSpec.Name()
   355  	}
   356  	return generateVolumeMsg(prefixMsg, suffixMsg, volumeSpecName, detailedStr)
   357  }
   358  
   359  // GenerateErrorDetailed returns detailed errors for volumes to attach
   360  func (volume *VolumeToAttach) GenerateErrorDetailed(prefixMsg string, err error) (detailedErr error) {
   361  	return fmt.Errorf(volume.GenerateMsgDetailed(prefixMsg, errSuffix(err)))
   362  }
   363  
   364  // GenerateError returns simple and detailed errors for volumes to attach
   365  func (volume *VolumeToAttach) GenerateError(prefixMsg string, err error) (simpleErr, detailedErr error) {
   366  	simpleMsg, detailedMsg := volume.GenerateMsg(prefixMsg, errSuffix(err))
   367  	return fmt.Errorf(simpleMsg), fmt.Errorf(detailedMsg)
   368  }
   369  
   370  // String combines key fields of the volume for logging in text format.
   371  func (volume *VolumeToAttach) String() string {
   372  	volumeSpecName := "nil"
   373  	if volume.VolumeSpec != nil {
   374  		volumeSpecName = volume.VolumeSpec.Name()
   375  	}
   376  	return fmt.Sprintf("%s (UniqueName: %s) from node %s", volumeSpecName, volume.VolumeName, volume.NodeName)
   377  }
   378  
   379  // MarshalLog combines key fields of the volume for logging in a structured format.
   380  func (volume *VolumeToAttach) MarshalLog() interface{} {
   381  	volumeSpecName := "nil"
   382  	if volume.VolumeSpec != nil {
   383  		volumeSpecName = volume.VolumeSpec.Name()
   384  	}
   385  	return struct {
   386  		VolumeName, UniqueName, NodeName string
   387  	}{
   388  		VolumeName: volumeSpecName,
   389  		UniqueName: string(volume.VolumeName),
   390  		NodeName:   string(volume.NodeName),
   391  	}
   392  }
   393  
   394  var _ fmt.Stringer = &VolumeToAttach{}
   395  var _ logr.Marshaler = &VolumeToAttach{}
   396  
   397  // VolumeToMount represents a volume that should be attached to this node and
   398  // mounted to the PodName.
   399  type VolumeToMount struct {
   400  	// VolumeName is the unique identifier for the volume that should be
   401  	// mounted.
   402  	VolumeName v1.UniqueVolumeName
   403  
   404  	// PodName is the unique identifier for the pod that the volume should be
   405  	// mounted to after it is attached.
   406  	PodName volumetypes.UniquePodName
   407  
   408  	// VolumeSpec is a volume spec containing the specification for the volume
   409  	// that should be mounted. Used to create NewMounter. Used to generate
   410  	// InnerVolumeSpecName.
   411  	VolumeSpec *volume.Spec
   412  
   413  	// outerVolumeSpecName is the podSpec.Volume[x].Name of the volume. If the
   414  	// volume was referenced through a persistent volume claim, this contains
   415  	// the podSpec.Volume[x].Name of the persistent volume claim.
   416  	OuterVolumeSpecName string
   417  
   418  	// Pod to mount the volume to. Used to create NewMounter.
   419  	Pod *v1.Pod
   420  
   421  	// PluginIsAttachable indicates that the plugin for this volume implements
   422  	// the volume.Attacher interface
   423  	PluginIsAttachable bool
   424  
   425  	// PluginIsDeviceMountable indicates that the plugin for this volume implements
   426  	// the volume.DeviceMounter interface
   427  	PluginIsDeviceMountable bool
   428  
   429  	// VolumeGidValue contains the value of the GID annotation, if present.
   430  	VolumeGidValue string
   431  
   432  	// DevicePath contains the path on the node where the volume is attached.
   433  	// For non-attachable volumes this is empty.
   434  	DevicePath string
   435  
   436  	// ReportedInUse indicates that the volume was successfully added to the
   437  	// VolumesInUse field in the node's status.
   438  	ReportedInUse bool
   439  
   440  	// DesiredSizeLimit indicates the desired upper bound on the size of the volume
   441  	// (if so implemented)
   442  	DesiredSizeLimit *resource.Quantity
   443  
   444  	// time at which volume was requested to be mounted
   445  	MountRequestTime time.Time
   446  
   447  	// DesiredPersistentVolumeSize stores desired size of the volume.
   448  	// usually this is the size if pv.Spec.Capacity
   449  	DesiredPersistentVolumeSize resource.Quantity
   450  
   451  	// SELinux label that should be used to mount.
   452  	// The label is set when:
   453  	// * SELinuxMountReadWriteOncePod feature gate is enabled and the volume is RWOP and kubelet knows the SELinux label.
   454  	// * Or, SELinuxMount feature gate is enabled and kubelet knows the SELinux label.
   455  	SELinuxLabel string
   456  }
   457  
   458  // DeviceMountState represents device mount state in a global path.
   459  type DeviceMountState string
   460  
   461  const (
   462  	// DeviceGloballyMounted means device has been globally mounted successfully
   463  	DeviceGloballyMounted DeviceMountState = "DeviceGloballyMounted"
   464  
   465  	// DeviceMountUncertain means device may not be mounted but a mount operation may be
   466  	// in-progress which can cause device mount to succeed.
   467  	DeviceMountUncertain DeviceMountState = "DeviceMountUncertain"
   468  
   469  	// DeviceNotMounted means device has not been mounted globally.
   470  	DeviceNotMounted DeviceMountState = "DeviceNotMounted"
   471  )
   472  
   473  // VolumeMountState represents volume mount state in a path local to the pod.
   474  type VolumeMountState string
   475  
   476  const (
   477  	// VolumeMounted means volume has been mounted in pod's local path
   478  	VolumeMounted VolumeMountState = "VolumeMounted"
   479  
   480  	// VolumeMountUncertain means volume may or may not be mounted in pods' local path
   481  	VolumeMountUncertain VolumeMountState = "VolumeMountUncertain"
   482  
   483  	// VolumeNotMounted means volume has not be mounted in pod's local path
   484  	VolumeNotMounted VolumeMountState = "VolumeNotMounted"
   485  )
   486  
   487  type MountPreConditionFailed struct {
   488  	msg string
   489  }
   490  
   491  func (err *MountPreConditionFailed) Error() string {
   492  	return err.msg
   493  }
   494  
   495  func NewMountPreConditionFailedError(msg string) *MountPreConditionFailed {
   496  	return &MountPreConditionFailed{msg: msg}
   497  }
   498  
   499  func IsMountFailedPreconditionError(err error) bool {
   500  	var failedPreconditionError *MountPreConditionFailed
   501  	return errors.As(err, &failedPreconditionError)
   502  }
   503  
   504  // GenerateMsgDetailed returns detailed msgs for volumes to mount
   505  func (volume *VolumeToMount) GenerateMsgDetailed(prefixMsg, suffixMsg string) (detailedMsg string) {
   506  	detailedStr := fmt.Sprintf("(UniqueName: %q) pod %q (UID: %q)", volume.VolumeName, volume.Pod.Name, volume.Pod.UID)
   507  	volumeSpecName := "nil"
   508  	if volume.VolumeSpec != nil {
   509  		volumeSpecName = volume.VolumeSpec.Name()
   510  	}
   511  	return generateVolumeMsgDetailed(prefixMsg, suffixMsg, volumeSpecName, detailedStr)
   512  }
   513  
   514  // GenerateMsg returns simple and detailed msgs for volumes to mount
   515  func (volume *VolumeToMount) GenerateMsg(prefixMsg, suffixMsg string) (simpleMsg, detailedMsg string) {
   516  	detailedStr := fmt.Sprintf("(UniqueName: %q) pod %q (UID: %q)", volume.VolumeName, volume.Pod.Name, volume.Pod.UID)
   517  	volumeSpecName := "nil"
   518  	if volume.VolumeSpec != nil {
   519  		volumeSpecName = volume.VolumeSpec.Name()
   520  	}
   521  	return generateVolumeMsg(prefixMsg, suffixMsg, volumeSpecName, detailedStr)
   522  }
   523  
   524  // GenerateErrorDetailed returns detailed errors for volumes to mount
   525  func (volume *VolumeToMount) GenerateErrorDetailed(prefixMsg string, err error) (detailedErr error) {
   526  	return fmt.Errorf(volume.GenerateMsgDetailed(prefixMsg, errSuffix(err)))
   527  }
   528  
   529  // GenerateError returns simple and detailed errors for volumes to mount
   530  func (volume *VolumeToMount) GenerateError(prefixMsg string, err error) (simpleErr, detailedErr error) {
   531  	simpleMsg, detailedMsg := volume.GenerateMsg(prefixMsg, errSuffix(err))
   532  	return fmt.Errorf(simpleMsg), fmt.Errorf(detailedMsg)
   533  }
   534  
   535  // AttachedVolume represents a volume that is attached to a node.
   536  type AttachedVolume struct {
   537  	// VolumeName is the unique identifier for the volume that is attached.
   538  	VolumeName v1.UniqueVolumeName
   539  
   540  	// VolumeSpec is the volume spec containing the specification for the
   541  	// volume that is attached.
   542  	VolumeSpec *volume.Spec
   543  
   544  	// NodeName is the identifier for the node that the volume is attached to.
   545  	NodeName types.NodeName
   546  
   547  	// PluginIsAttachable indicates that the plugin for this volume implements
   548  	// the volume.Attacher interface
   549  	PluginIsAttachable bool
   550  
   551  	// DevicePath contains the path on the node where the volume is attached.
   552  	// For non-attachable volumes this is empty.
   553  	DevicePath string
   554  
   555  	// DeviceMountPath contains the path on the node where the device should
   556  	// be mounted after it is attached.
   557  	DeviceMountPath string
   558  
   559  	// PluginName is the Unescaped Qualified name of the volume plugin used to
   560  	// attach and mount this volume.
   561  	PluginName string
   562  
   563  	SELinuxMountContext string
   564  }
   565  
   566  // GenerateMsgDetailed returns detailed msgs for attached volumes
   567  func (volume *AttachedVolume) GenerateMsgDetailed(prefixMsg, suffixMsg string) (detailedMsg string) {
   568  	detailedStr := fmt.Sprintf("(UniqueName: %q) on node %q", volume.VolumeName, volume.NodeName)
   569  	volumeSpecName := "nil"
   570  	if volume.VolumeSpec != nil {
   571  		volumeSpecName = volume.VolumeSpec.Name()
   572  	}
   573  	return generateVolumeMsgDetailed(prefixMsg, suffixMsg, volumeSpecName, detailedStr)
   574  }
   575  
   576  // GenerateMsg returns simple and detailed msgs for attached volumes
   577  func (volume *AttachedVolume) GenerateMsg(prefixMsg, suffixMsg string) (simpleMsg, detailedMsg string) {
   578  	detailedStr := fmt.Sprintf("(UniqueName: %q) on node %q", volume.VolumeName, volume.NodeName)
   579  	volumeSpecName := "nil"
   580  	if volume.VolumeSpec != nil {
   581  		volumeSpecName = volume.VolumeSpec.Name()
   582  	}
   583  	return generateVolumeMsg(prefixMsg, suffixMsg, volumeSpecName, detailedStr)
   584  }
   585  
   586  // GenerateErrorDetailed returns detailed errors for attached volumes
   587  func (volume *AttachedVolume) GenerateErrorDetailed(prefixMsg string, err error) (detailedErr error) {
   588  	return fmt.Errorf(volume.GenerateMsgDetailed(prefixMsg, errSuffix(err)))
   589  }
   590  
   591  // GenerateError returns simple and detailed errors for attached volumes
   592  func (volume *AttachedVolume) GenerateError(prefixMsg string, err error) (simpleErr, detailedErr error) {
   593  	simpleMsg, detailedMsg := volume.GenerateMsg(prefixMsg, errSuffix(err))
   594  	return fmt.Errorf(simpleMsg), fmt.Errorf(detailedMsg)
   595  }
   596  
   597  // String combines key fields of the volume for logging in text format.
   598  func (volume *AttachedVolume) String() string {
   599  	volumeSpecName := "nil"
   600  	if volume.VolumeSpec != nil {
   601  		volumeSpecName = volume.VolumeSpec.Name()
   602  	}
   603  	return fmt.Sprintf("%s (UniqueName: %s) from node %s", volumeSpecName, volume.VolumeName, volume.NodeName)
   604  }
   605  
   606  // MarshalLog combines key fields of the volume for logging in a structured format.
   607  func (volume *AttachedVolume) MarshalLog() interface{} {
   608  	volumeSpecName := "nil"
   609  	if volume.VolumeSpec != nil {
   610  		volumeSpecName = volume.VolumeSpec.Name()
   611  	}
   612  	return struct {
   613  		VolumeName, UniqueName, NodeName string
   614  	}{
   615  		VolumeName: volumeSpecName,
   616  		UniqueName: string(volume.VolumeName),
   617  		NodeName:   string(volume.NodeName),
   618  	}
   619  }
   620  
   621  var _ fmt.Stringer = &AttachedVolume{}
   622  var _ logr.Marshaler = &AttachedVolume{}
   623  
   624  // MountedVolume represents a volume that has successfully been mounted to a pod.
   625  type MountedVolume struct {
   626  	// PodName is the unique identifier of the pod mounted to.
   627  	PodName volumetypes.UniquePodName
   628  
   629  	// VolumeName is the unique identifier of the volume mounted to the pod.
   630  	VolumeName v1.UniqueVolumeName
   631  
   632  	// InnerVolumeSpecName is the volume.Spec.Name() of the volume. If the
   633  	// volume was referenced through a persistent volume claims, this contains
   634  	// the name of the bound persistent volume object.
   635  	// It is the name that plugins use in their pod mount path, i.e.
   636  	// /var/lib/kubelet/pods/{podUID}/volumes/{escapeQualifiedPluginName}/{innerVolumeSpecName}/
   637  	// PVC example,
   638  	//   apiVersion: v1
   639  	//   kind: PersistentVolume
   640  	//   metadata:
   641  	//     name: pv0003				<- InnerVolumeSpecName
   642  	//   spec:
   643  	//     capacity:
   644  	//       storage: 5Gi
   645  	//     accessModes:
   646  	//       - ReadWriteOnce
   647  	//     persistentVolumeReclaimPolicy: Recycle
   648  	//     nfs:
   649  	//       path: /tmp
   650  	//       server: 172.17.0.2
   651  	// Non-PVC example:
   652  	//   apiVersion: v1
   653  	//   kind: Pod
   654  	//   metadata:
   655  	//     name: test-pd
   656  	//   spec:
   657  	//     containers:
   658  	//     - image: registry.k8s.io/test-webserver
   659  	//     	 name: test-container
   660  	//     	 volumeMounts:
   661  	//     	 - mountPath: /test-pd
   662  	//     	   name: test-volume
   663  	//     volumes:
   664  	//     - name: test-volume			<- InnerVolumeSpecName
   665  	//     	 gcePersistentDisk:
   666  	//     	   pdName: my-data-disk
   667  	//     	   fsType: ext4
   668  	InnerVolumeSpecName string
   669  
   670  	// outerVolumeSpecName is the podSpec.Volume[x].Name of the volume. If the
   671  	// volume was referenced through a persistent volume claim, this contains
   672  	// the podSpec.Volume[x].Name of the persistent volume claim.
   673  	// PVC example:
   674  	//   kind: Pod
   675  	//   apiVersion: v1
   676  	//   metadata:
   677  	//     name: mypod
   678  	//   spec:
   679  	//     containers:
   680  	//       - name: myfrontend
   681  	//         image: dockerfile/nginx
   682  	//         volumeMounts:
   683  	//         - mountPath: "/var/www/html"
   684  	//           name: mypd
   685  	//     volumes:
   686  	//       - name: mypd				<- OuterVolumeSpecName
   687  	//         persistentVolumeClaim:
   688  	//           claimName: myclaim
   689  	// Non-PVC example:
   690  	//   apiVersion: v1
   691  	//   kind: Pod
   692  	//   metadata:
   693  	//     name: test-pd
   694  	//   spec:
   695  	//     containers:
   696  	//     - image: registry.k8s.io/test-webserver
   697  	//     	 name: test-container
   698  	//     	 volumeMounts:
   699  	//     	 - mountPath: /test-pd
   700  	//     	   name: test-volume
   701  	//     volumes:
   702  	//     - name: test-volume			<- OuterVolumeSpecName
   703  	//     	 gcePersistentDisk:
   704  	//     	   pdName: my-data-disk
   705  	//     	   fsType: ext4
   706  	OuterVolumeSpecName string
   707  
   708  	// PluginName is the "Unescaped Qualified" name of the volume plugin used to
   709  	// mount and unmount this volume. It can be used to fetch the volume plugin
   710  	// to unmount with, on demand. It is also the name that plugins use, though
   711  	// escaped, in their pod mount path, i.e.
   712  	// /var/lib/kubelet/pods/{podUID}/volumes/{escapeQualifiedPluginName}/{outerVolumeSpecName}/
   713  	PluginName string
   714  
   715  	// PodUID is the UID of the pod mounted to. It is also the string used by
   716  	// plugins in their pod mount path, i.e.
   717  	// /var/lib/kubelet/pods/{podUID}/volumes/{escapeQualifiedPluginName}/{outerVolumeSpecName}/
   718  	PodUID types.UID
   719  
   720  	// Mounter is the volume mounter used to mount this volume. It is required
   721  	// by kubelet to create container.VolumeMap.
   722  	// Mounter is only required for file system volumes and not required for block volumes.
   723  	Mounter volume.Mounter
   724  
   725  	// BlockVolumeMapper is the volume mapper used to map this volume. It is required
   726  	// by kubelet to create container.VolumeMap.
   727  	// BlockVolumeMapper is only required for block volumes and not required for file system volumes.
   728  	BlockVolumeMapper volume.BlockVolumeMapper
   729  
   730  	// VolumeGidValue contains the value of the GID annotation, if present.
   731  	VolumeGidValue string
   732  
   733  	// VolumeSpec is a volume spec containing the specification for the volume
   734  	// that should be mounted.
   735  	VolumeSpec *volume.Spec
   736  
   737  	// DeviceMountPath contains the path on the node where the device should
   738  	// be mounted after it is attached.
   739  	DeviceMountPath string
   740  
   741  	// SELinuxMountContext is value of mount option 'mount -o context=XYZ'.
   742  	// If empty, no such mount option was used.
   743  	SELinuxMountContext string
   744  }
   745  
   746  // GenerateMsgDetailed returns detailed msgs for mounted volumes
   747  func (volume *MountedVolume) GenerateMsgDetailed(prefixMsg, suffixMsg string) (detailedMsg string) {
   748  	detailedStr := fmt.Sprintf("(UniqueName: %q) pod %q (UID: %q)", volume.VolumeName, volume.PodName, volume.PodUID)
   749  	return generateVolumeMsgDetailed(prefixMsg, suffixMsg, volume.OuterVolumeSpecName, detailedStr)
   750  }
   751  
   752  // GenerateMsg returns simple and detailed msgs for mounted volumes
   753  func (volume *MountedVolume) GenerateMsg(prefixMsg, suffixMsg string) (simpleMsg, detailedMsg string) {
   754  	detailedStr := fmt.Sprintf("(UniqueName: %q) pod %q (UID: %q)", volume.VolumeName, volume.PodName, volume.PodUID)
   755  	return generateVolumeMsg(prefixMsg, suffixMsg, volume.OuterVolumeSpecName, detailedStr)
   756  }
   757  
   758  // GenerateErrorDetailed returns simple and detailed errors for mounted volumes
   759  func (volume *MountedVolume) GenerateErrorDetailed(prefixMsg string, err error) (detailedErr error) {
   760  	return fmt.Errorf(volume.GenerateMsgDetailed(prefixMsg, errSuffix(err)))
   761  }
   762  
   763  // GenerateError returns simple and detailed errors for mounted volumes
   764  func (volume *MountedVolume) GenerateError(prefixMsg string, err error) (simpleErr, detailedErr error) {
   765  	simpleMsg, detailedMsg := volume.GenerateMsg(prefixMsg, errSuffix(err))
   766  	return fmt.Errorf(simpleMsg), fmt.Errorf(detailedMsg)
   767  }
   768  
   769  type operationExecutor struct {
   770  	// pendingOperations keeps track of pending attach and detach operations so
   771  	// multiple operations are not started on the same volume
   772  	pendingOperations nestedpendingoperations.NestedPendingOperations
   773  
   774  	// operationGenerator is an interface that provides implementations for
   775  	// generating volume function
   776  	operationGenerator OperationGenerator
   777  }
   778  
   779  func (oe *operationExecutor) IsOperationPending(
   780  	volumeName v1.UniqueVolumeName,
   781  	podName volumetypes.UniquePodName,
   782  	nodeName types.NodeName) bool {
   783  	return oe.pendingOperations.IsOperationPending(volumeName, podName, nodeName)
   784  }
   785  
   786  func (oe *operationExecutor) IsOperationSafeToRetry(
   787  	volumeName v1.UniqueVolumeName,
   788  	podName volumetypes.UniquePodName,
   789  	nodeName types.NodeName,
   790  	operationName string) bool {
   791  	return oe.pendingOperations.IsOperationSafeToRetry(volumeName, podName, nodeName, operationName)
   792  }
   793  
   794  func (oe *operationExecutor) AttachVolume(
   795  	logger klog.Logger,
   796  	volumeToAttach VolumeToAttach,
   797  	actualStateOfWorld ActualStateOfWorldAttacherUpdater) error {
   798  	generatedOperations :=
   799  		oe.operationGenerator.GenerateAttachVolumeFunc(logger, volumeToAttach, actualStateOfWorld)
   800  
   801  	if util.IsMultiAttachAllowed(volumeToAttach.VolumeSpec) {
   802  		return oe.pendingOperations.Run(
   803  			volumeToAttach.VolumeName, "" /* podName */, volumeToAttach.NodeName, generatedOperations)
   804  	}
   805  
   806  	return oe.pendingOperations.Run(
   807  		volumeToAttach.VolumeName, "" /* podName */, "" /* nodeName */, generatedOperations)
   808  }
   809  
   810  func (oe *operationExecutor) DetachVolume(
   811  	logger klog.Logger,
   812  	volumeToDetach AttachedVolume,
   813  	verifySafeToDetach bool,
   814  	actualStateOfWorld ActualStateOfWorldAttacherUpdater) error {
   815  	generatedOperations, err :=
   816  		oe.operationGenerator.GenerateDetachVolumeFunc(logger, volumeToDetach, verifySafeToDetach, actualStateOfWorld)
   817  	if err != nil {
   818  		return err
   819  	}
   820  
   821  	if util.IsMultiAttachAllowed(volumeToDetach.VolumeSpec) {
   822  		return oe.pendingOperations.Run(
   823  			volumeToDetach.VolumeName, "" /* podName */, volumeToDetach.NodeName, generatedOperations)
   824  	}
   825  	return oe.pendingOperations.Run(
   826  		volumeToDetach.VolumeName, "" /* podName */, "" /* nodeName */, generatedOperations)
   827  
   828  }
   829  
   830  func (oe *operationExecutor) VerifyVolumesAreAttached(
   831  	attachedVolumes map[types.NodeName][]AttachedVolume,
   832  	actualStateOfWorld ActualStateOfWorldAttacherUpdater) {
   833  
   834  	// A map of plugin names and nodes on which they exist with volumes they manage
   835  	bulkVerifyPluginsByNode := make(map[string]map[types.NodeName][]*volume.Spec)
   836  	volumeSpecMapByPlugin := make(map[string]map[*volume.Spec]v1.UniqueVolumeName)
   837  
   838  	for node, nodeAttachedVolumes := range attachedVolumes {
   839  		needIndividualVerifyVolumes := []AttachedVolume{}
   840  		for _, volumeAttached := range nodeAttachedVolumes {
   841  			if volumeAttached.VolumeSpec == nil {
   842  				klog.Errorf("VerifyVolumesAreAttached: nil spec for volume %s", volumeAttached.VolumeName)
   843  				continue
   844  			}
   845  
   846  			volumePlugin, err :=
   847  				oe.operationGenerator.GetVolumePluginMgr().FindPluginBySpec(volumeAttached.VolumeSpec)
   848  			if err != nil {
   849  				klog.Errorf(
   850  					"VolumesAreAttached.FindPluginBySpec failed for volume %q (spec.Name: %q) on node %q with error: %v",
   851  					volumeAttached.VolumeName,
   852  					volumeAttached.VolumeSpec.Name(),
   853  					volumeAttached.NodeName,
   854  					err)
   855  				continue
   856  			}
   857  			if volumePlugin == nil {
   858  				// should never happen since FindPluginBySpec always returns error if volumePlugin = nil
   859  				klog.Errorf(
   860  					"Failed to find volume plugin for volume %q (spec.Name: %q) on node %q",
   861  					volumeAttached.VolumeName,
   862  					volumeAttached.VolumeSpec.Name(),
   863  					volumeAttached.NodeName)
   864  				continue
   865  			}
   866  
   867  			pluginName := volumePlugin.GetPluginName()
   868  
   869  			if volumePlugin.SupportsBulkVolumeVerification() {
   870  				pluginNodes, pluginNodesExist := bulkVerifyPluginsByNode[pluginName]
   871  
   872  				if !pluginNodesExist {
   873  					pluginNodes = make(map[types.NodeName][]*volume.Spec)
   874  				}
   875  
   876  				volumeSpecList, nodeExists := pluginNodes[node]
   877  				if !nodeExists {
   878  					volumeSpecList = []*volume.Spec{}
   879  				}
   880  				volumeSpecList = append(volumeSpecList, volumeAttached.VolumeSpec)
   881  				pluginNodes[node] = volumeSpecList
   882  
   883  				bulkVerifyPluginsByNode[pluginName] = pluginNodes
   884  				volumeSpecMap, mapExists := volumeSpecMapByPlugin[pluginName]
   885  
   886  				if !mapExists {
   887  					volumeSpecMap = make(map[*volume.Spec]v1.UniqueVolumeName)
   888  				}
   889  				volumeSpecMap[volumeAttached.VolumeSpec] = volumeAttached.VolumeName
   890  				volumeSpecMapByPlugin[pluginName] = volumeSpecMap
   891  				continue
   892  			}
   893  			// If node doesn't support Bulk volume polling it is best to poll individually
   894  			needIndividualVerifyVolumes = append(needIndividualVerifyVolumes, volumeAttached)
   895  		}
   896  		nodeError := oe.VerifyVolumesAreAttachedPerNode(needIndividualVerifyVolumes, node, actualStateOfWorld)
   897  		if nodeError != nil {
   898  			klog.Errorf("VerifyVolumesAreAttached failed for volumes %v, node %q with error %v", needIndividualVerifyVolumes, node, nodeError)
   899  		}
   900  	}
   901  
   902  	for pluginName, pluginNodeVolumes := range bulkVerifyPluginsByNode {
   903  		generatedOperations, err := oe.operationGenerator.GenerateBulkVolumeVerifyFunc(
   904  			pluginNodeVolumes,
   905  			pluginName,
   906  			volumeSpecMapByPlugin[pluginName],
   907  			actualStateOfWorld)
   908  		if err != nil {
   909  			klog.Errorf("BulkVerifyVolumes.GenerateBulkVolumeVerifyFunc error bulk verifying volumes for plugin %q with  %v", pluginName, err)
   910  		}
   911  
   912  		// Ugly hack to ensure - we don't do parallel bulk polling of same volume plugin
   913  		uniquePluginName := v1.UniqueVolumeName(pluginName)
   914  		err = oe.pendingOperations.Run(uniquePluginName, "" /* Pod Name */, "" /* nodeName */, generatedOperations)
   915  		if err != nil {
   916  			klog.Errorf("BulkVerifyVolumes.Run Error bulk volume verification for plugin %q  with %v", pluginName, err)
   917  		}
   918  	}
   919  }
   920  
   921  func (oe *operationExecutor) VerifyVolumesAreAttachedPerNode(
   922  	attachedVolumes []AttachedVolume,
   923  	nodeName types.NodeName,
   924  	actualStateOfWorld ActualStateOfWorldAttacherUpdater) error {
   925  	generatedOperations, err :=
   926  		oe.operationGenerator.GenerateVolumesAreAttachedFunc(attachedVolumes, nodeName, actualStateOfWorld)
   927  	if err != nil {
   928  		return err
   929  	}
   930  
   931  	// Give an empty UniqueVolumeName so that this operation could be executed concurrently.
   932  	return oe.pendingOperations.Run("" /* volumeName */, "" /* podName */, "" /* nodeName */, generatedOperations)
   933  }
   934  
   935  func (oe *operationExecutor) MountVolume(
   936  	waitForAttachTimeout time.Duration,
   937  	volumeToMount VolumeToMount,
   938  	actualStateOfWorld ActualStateOfWorldMounterUpdater,
   939  	isRemount bool) error {
   940  	fsVolume, err := util.CheckVolumeModeFilesystem(volumeToMount.VolumeSpec)
   941  	if err != nil {
   942  		return err
   943  	}
   944  	var generatedOperations volumetypes.GeneratedOperations
   945  	if fsVolume {
   946  		// Filesystem volume case
   947  		// Mount/remount a volume when a volume is attached
   948  		generatedOperations = oe.operationGenerator.GenerateMountVolumeFunc(
   949  			waitForAttachTimeout, volumeToMount, actualStateOfWorld, isRemount)
   950  
   951  	} else {
   952  		// Block volume case
   953  		// Creates a map to device if a volume is attached
   954  		generatedOperations, err = oe.operationGenerator.GenerateMapVolumeFunc(
   955  			waitForAttachTimeout, volumeToMount, actualStateOfWorld)
   956  	}
   957  	if err != nil {
   958  		return err
   959  	}
   960  	// Avoid executing mount/map from multiple pods referencing the
   961  	// same volume in parallel
   962  	podName := nestedpendingoperations.EmptyUniquePodName
   963  
   964  	// TODO: remove this -- not necessary
   965  	if !volumeToMount.PluginIsAttachable && !volumeToMount.PluginIsDeviceMountable {
   966  		// volume plugins which are Non-attachable and Non-deviceMountable can execute mount for multiple pods
   967  		// referencing the same volume in parallel
   968  		podName = util.GetUniquePodName(volumeToMount.Pod)
   969  	}
   970  
   971  	// TODO mount_device
   972  	return oe.pendingOperations.Run(
   973  		volumeToMount.VolumeName, podName, "" /* nodeName */, generatedOperations)
   974  }
   975  
   976  func (oe *operationExecutor) UnmountVolume(
   977  	volumeToUnmount MountedVolume,
   978  	actualStateOfWorld ActualStateOfWorldMounterUpdater,
   979  	podsDir string) error {
   980  	fsVolume, err := util.CheckVolumeModeFilesystem(volumeToUnmount.VolumeSpec)
   981  	if err != nil {
   982  		return err
   983  	}
   984  	var generatedOperations volumetypes.GeneratedOperations
   985  	if fsVolume {
   986  		// Filesystem volume case
   987  		// Unmount a volume if a volume is mounted
   988  		generatedOperations, err = oe.operationGenerator.GenerateUnmountVolumeFunc(
   989  			volumeToUnmount, actualStateOfWorld, podsDir)
   990  	} else {
   991  		// Block volume case
   992  		// Unmap a volume if a volume is mapped
   993  		generatedOperations, err = oe.operationGenerator.GenerateUnmapVolumeFunc(
   994  			volumeToUnmount, actualStateOfWorld)
   995  	}
   996  	if err != nil {
   997  		return err
   998  	}
   999  	// All volume plugins can execute unmount/unmap for multiple pods referencing the
  1000  	// same volume in parallel
  1001  	podName := volumetypes.UniquePodName(volumeToUnmount.PodUID)
  1002  
  1003  	return oe.pendingOperations.Run(
  1004  		volumeToUnmount.VolumeName, podName, "" /* nodeName */, generatedOperations)
  1005  }
  1006  
  1007  func (oe *operationExecutor) UnmountDevice(
  1008  	deviceToDetach AttachedVolume,
  1009  	actualStateOfWorld ActualStateOfWorldMounterUpdater,
  1010  	hostutil hostutil.HostUtils) error {
  1011  	fsVolume, err := util.CheckVolumeModeFilesystem(deviceToDetach.VolumeSpec)
  1012  	if err != nil {
  1013  		return err
  1014  	}
  1015  	var generatedOperations volumetypes.GeneratedOperations
  1016  	if fsVolume {
  1017  		// Filesystem volume case
  1018  		// Unmount and detach a device if a volume isn't referenced
  1019  		generatedOperations, err = oe.operationGenerator.GenerateUnmountDeviceFunc(
  1020  			deviceToDetach, actualStateOfWorld, hostutil)
  1021  	} else {
  1022  		// Block volume case
  1023  		// Detach a device and remove loopback if a volume isn't referenced
  1024  		generatedOperations, err = oe.operationGenerator.GenerateUnmapDeviceFunc(
  1025  			deviceToDetach, actualStateOfWorld, hostutil)
  1026  	}
  1027  	if err != nil {
  1028  		return err
  1029  	}
  1030  	// Avoid executing unmount/unmap device from multiple pods referencing
  1031  	// the same volume in parallel
  1032  	podName := nestedpendingoperations.EmptyUniquePodName
  1033  
  1034  	return oe.pendingOperations.Run(
  1035  		deviceToDetach.VolumeName, podName, "" /* nodeName */, generatedOperations)
  1036  }
  1037  
  1038  func (oe *operationExecutor) ExpandInUseVolume(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater, currentSize resource.Quantity) error {
  1039  	generatedOperations, err := oe.operationGenerator.GenerateExpandInUseVolumeFunc(volumeToMount, actualStateOfWorld, currentSize)
  1040  	if err != nil {
  1041  		return err
  1042  	}
  1043  	return oe.pendingOperations.Run(volumeToMount.VolumeName, "", "" /* nodeName */, generatedOperations)
  1044  }
  1045  
  1046  func (oe *operationExecutor) VerifyControllerAttachedVolume(
  1047  	logger klog.Logger,
  1048  	volumeToMount VolumeToMount,
  1049  	nodeName types.NodeName,
  1050  	actualStateOfWorld ActualStateOfWorldAttacherUpdater) error {
  1051  	generatedOperations, err :=
  1052  		oe.operationGenerator.GenerateVerifyControllerAttachedVolumeFunc(logger, volumeToMount, nodeName, actualStateOfWorld)
  1053  	if err != nil {
  1054  		return err
  1055  	}
  1056  
  1057  	return oe.pendingOperations.Run(
  1058  		volumeToMount.VolumeName, "" /* podName */, "" /* nodeName */, generatedOperations)
  1059  }
  1060  
  1061  // ReconstructVolumeOperation return a func to create volumeSpec from mount path
  1062  func (oe *operationExecutor) ReconstructVolumeOperation(
  1063  	volumeMode v1.PersistentVolumeMode,
  1064  	plugin volume.VolumePlugin,
  1065  	mapperPlugin volume.BlockVolumePlugin,
  1066  	uid types.UID,
  1067  	podName volumetypes.UniquePodName,
  1068  	volumeSpecName string,
  1069  	volumePath string,
  1070  	pluginName string) (volume.ReconstructedVolume, error) {
  1071  
  1072  	// Filesystem Volume case
  1073  	if volumeMode == v1.PersistentVolumeFilesystem {
  1074  		// Create volumeSpec from mount path
  1075  		klog.V(5).Infof("Starting operationExecutor.ReconstructVolume for file volume on pod %q", podName)
  1076  		reconstructed, err := plugin.ConstructVolumeSpec(volumeSpecName, volumePath)
  1077  		if err != nil {
  1078  			return volume.ReconstructedVolume{}, err
  1079  		}
  1080  		return reconstructed, nil
  1081  	}
  1082  
  1083  	// Block Volume case
  1084  	// Create volumeSpec from mount path
  1085  	klog.V(5).Infof("Starting operationExecutor.ReconstructVolume for block volume on pod %q", podName)
  1086  
  1087  	// volumePath contains volumeName on the path. In the case of block volume, {volumeName} is symbolic link
  1088  	// corresponding to raw block device.
  1089  	// ex. volumePath: pods/{podUid}}/{DefaultKubeletVolumeDevicesDirName}/{escapeQualifiedPluginName}/{volumeName}
  1090  	volumeSpec, err := mapperPlugin.ConstructBlockVolumeSpec(uid, volumeSpecName, volumePath)
  1091  	if err != nil {
  1092  		return volume.ReconstructedVolume{}, err
  1093  	}
  1094  	return volume.ReconstructedVolume{
  1095  		Spec: volumeSpec,
  1096  	}, nil
  1097  }
  1098  

View as plain text