
Source file src/k8s.io/kubernetes/pkg/kubelet/volumemanager/reconciler/reconciler_common.go

Documentation: k8s.io/kubernetes/pkg/kubelet/volumemanager/reconciler

     1  /*
     2  Copyright 2022 The Kubernetes Authors.
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     8      http://www.apache.org/licenses/LICENSE-2.0
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    17  package reconciler
    19  import (
    20  	"fmt"
    21  	"sync"
    22  	"time"
    24  	v1 "k8s.io/api/core/v1"
    25  	"k8s.io/apimachinery/pkg/api/resource"
    26  	"k8s.io/apimachinery/pkg/types"
    27  	clientset "k8s.io/client-go/kubernetes"
    28  	"k8s.io/klog/v2"
    29  	"k8s.io/kubernetes/pkg/kubelet/volumemanager/cache"
    30  	"k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff"
    31  	volumepkg "k8s.io/kubernetes/pkg/volume"
    32  	"k8s.io/kubernetes/pkg/volume/util/hostutil"
    33  	"k8s.io/kubernetes/pkg/volume/util/nestedpendingoperations"
    34  	"k8s.io/kubernetes/pkg/volume/util/operationexecutor"
    35  	"k8s.io/mount-utils"
    36  )
    38  // Reconciler runs a periodic loop to reconcile the desired state of the world
    39  // with the actual state of the world by triggering attach, detach, mount, and
    40  // unmount operations.
    41  // Note: This is distinct from the Reconciler implemented by the attach/detach
    42  // controller. This reconciles state for the kubelet volume manager. That
    43  // reconciles state for the attach/detach controller.
    44  type Reconciler interface {
    45  	// Starts running the reconciliation loop which executes periodically, checks
    46  	// if volumes that should be mounted are mounted and volumes that should
    47  	// be unmounted are unmounted. If not, it will trigger mount/unmount
    48  	// operations to rectify.
    49  	// If attach/detach management is enabled, the manager will also check if
    50  	// volumes that should be attached are attached and volumes that should
    51  	// be detached are detached and trigger attach/detach operations as needed.
    52  	Run(stopCh <-chan struct{})
    54  	// StatesHasBeenSynced returns true only after syncStates process starts to sync
    55  	// states at least once after kubelet starts
    56  	StatesHasBeenSynced() bool
    57  }
    59  // NewReconciler returns a new instance of Reconciler.
    60  //
    61  // controllerAttachDetachEnabled - if true, indicates that the attach/detach
    62  // controller is responsible for managing the attach/detach operations for
    63  // this node, and therefore the volume manager should not
    64  //
    65  // loopSleepDuration - the amount of time the reconciler loop sleeps between
    66  // successive executions
    67  //
    68  // waitForAttachTimeout - the amount of time the Mount function will wait for
    69  // the volume to be attached
    70  //
    71  // nodeName - the Name for this node, used by Attach and Detach methods
    72  //
    73  // desiredStateOfWorld - cache containing the desired state of the world
    74  //
    75  // actualStateOfWorld - cache containing the actual state of the world
    76  //
    77  // populatorHasAddedPods - checker for whether the populator has finished
    78  // adding pods to the desiredStateOfWorld cache at least once after sources
    79  // are all ready (before sources are ready, pods are probably missing)
    80  //
    81  // operationExecutor - used to trigger attach/detach/mount/unmount operations
    82  // safely (prevents more than one operation from being triggered on the same
    83  // volume)
    84  //
    85  // mounter - mounter passed in from kubelet, passed down unmount path
    86  //
    87  // hostutil - hostutil passed in from kubelet
    88  //
    89  // volumePluginMgr - volume plugin manager passed from kubelet
    90  func NewReconciler(
    91  	kubeClient clientset.Interface,
    92  	controllerAttachDetachEnabled bool,
    93  	loopSleepDuration time.Duration,
    94  	waitForAttachTimeout time.Duration,
    95  	nodeName types.NodeName,
    96  	desiredStateOfWorld cache.DesiredStateOfWorld,
    97  	actualStateOfWorld cache.ActualStateOfWorld,
    98  	populatorHasAddedPods func() bool,
    99  	operationExecutor operationexecutor.OperationExecutor,
   100  	mounter mount.Interface,
   101  	hostutil hostutil.HostUtils,
   102  	volumePluginMgr *volumepkg.VolumePluginMgr,
   103  	kubeletPodsDir string) Reconciler {
   104  	return &reconciler{
   105  		kubeClient:                      kubeClient,
   106  		controllerAttachDetachEnabled:   controllerAttachDetachEnabled,
   107  		loopSleepDuration:               loopSleepDuration,
   108  		waitForAttachTimeout:            waitForAttachTimeout,
   109  		nodeName:                        nodeName,
   110  		desiredStateOfWorld:             desiredStateOfWorld,
   111  		actualStateOfWorld:              actualStateOfWorld,
   112  		populatorHasAddedPods:           populatorHasAddedPods,
   113  		operationExecutor:               operationExecutor,
   114  		mounter:                         mounter,
   115  		hostutil:                        hostutil,
   116  		skippedDuringReconstruction:     map[v1.UniqueVolumeName]*globalVolumeInfo{},
   117  		volumePluginMgr:                 volumePluginMgr,
   118  		kubeletPodsDir:                  kubeletPodsDir,
   119  		timeOfLastSync:                  time.Time{},
   120  		volumesFailedReconstruction:     make([]podVolume, 0),
   121  		volumesNeedUpdateFromNodeStatus: make([]v1.UniqueVolumeName, 0),
   122  		volumesNeedReportedInUse:        make([]v1.UniqueVolumeName, 0),
   123  	}
   124  }
   126  type reconciler struct {
   127  	kubeClient                    clientset.Interface
   128  	controllerAttachDetachEnabled bool
   129  	loopSleepDuration             time.Duration
   130  	waitForAttachTimeout          time.Duration
   131  	nodeName                      types.NodeName
   132  	desiredStateOfWorld           cache.DesiredStateOfWorld
   133  	actualStateOfWorld            cache.ActualStateOfWorld
   134  	populatorHasAddedPods         func() bool
   135  	operationExecutor             operationexecutor.OperationExecutor
   136  	mounter                       mount.Interface
   137  	hostutil                      hostutil.HostUtils
   138  	volumePluginMgr               *volumepkg.VolumePluginMgr
   139  	skippedDuringReconstruction   map[v1.UniqueVolumeName]*globalVolumeInfo
   140  	kubeletPodsDir                string
   141  	// lock protects timeOfLastSync for updating and checking
   142  	timeOfLastSyncLock              sync.Mutex
   143  	timeOfLastSync                  time.Time
   144  	volumesFailedReconstruction     []podVolume
   145  	volumesNeedUpdateFromNodeStatus []v1.UniqueVolumeName
   146  	volumesNeedReportedInUse        []v1.UniqueVolumeName
   147  }
   149  func (rc *reconciler) unmountVolumes() {
   150  	// Ensure volumes that should be unmounted are unmounted.
   151  	for _, mountedVolume := range rc.actualStateOfWorld.GetAllMountedVolumes() {
   152  		if !rc.desiredStateOfWorld.PodExistsInVolume(mountedVolume.PodName, mountedVolume.VolumeName, mountedVolume.SELinuxMountContext) {
   153  			// Volume is mounted, unmount it
   154  			klog.V(5).InfoS(mountedVolume.GenerateMsgDetailed("Starting operationExecutor.UnmountVolume", ""))
   155  			err := rc.operationExecutor.UnmountVolume(
   156  				mountedVolume.MountedVolume, rc.actualStateOfWorld, rc.kubeletPodsDir)
   157  			if err != nil && !isExpectedError(err) {
   158  				klog.ErrorS(err, mountedVolume.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.UnmountVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error())
   159  			}
   160  			if err == nil {
   161  				klog.InfoS(mountedVolume.GenerateMsgDetailed("operationExecutor.UnmountVolume started", ""))
   162  			}
   163  		}
   164  	}
   165  }
   167  func (rc *reconciler) mountOrAttachVolumes() {
   168  	// Ensure volumes that should be attached/mounted are attached/mounted.
   169  	for _, volumeToMount := range rc.desiredStateOfWorld.GetVolumesToMount() {
   170  		volMounted, devicePath, err := rc.actualStateOfWorld.PodExistsInVolume(volumeToMount.PodName, volumeToMount.VolumeName, volumeToMount.DesiredPersistentVolumeSize, volumeToMount.SELinuxLabel)
   171  		volumeToMount.DevicePath = devicePath
   172  		if cache.IsSELinuxMountMismatchError(err) {
   173  			// The volume is mounted, but with an unexpected SELinux context.
   174  			// It will get unmounted in unmountVolumes / unmountDetachDevices and
   175  			// then removed from actualStateOfWorld.
   176  			rc.desiredStateOfWorld.AddErrorToPod(volumeToMount.PodName, err.Error())
   177  			continue
   178  		} else if cache.IsVolumeNotAttachedError(err) {
   179  			rc.waitForVolumeAttach(volumeToMount)
   180  		} else if !volMounted || cache.IsRemountRequiredError(err) {
   181  			rc.mountAttachedVolumes(volumeToMount, err)
   182  		} else if cache.IsFSResizeRequiredError(err) {
   183  			fsResizeRequiredErr, _ := err.(cache.FsResizeRequiredError)
   184  			rc.expandVolume(volumeToMount, fsResizeRequiredErr.CurrentSize)
   185  		}
   186  	}
   187  }
   189  func (rc *reconciler) expandVolume(volumeToMount cache.VolumeToMount, currentSize resource.Quantity) {
   190  	klog.V(4).InfoS(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.ExpandInUseVolume", ""), "pod", klog.KObj(volumeToMount.Pod))
   191  	err := rc.operationExecutor.ExpandInUseVolume(volumeToMount.VolumeToMount, rc.actualStateOfWorld, currentSize)
   193  	if err != nil && !isExpectedError(err) {
   194  		klog.ErrorS(err, volumeToMount.GenerateErrorDetailed("operationExecutor.ExpandInUseVolume failed", err).Error(), "pod", klog.KObj(volumeToMount.Pod))
   195  	}
   197  	if err == nil {
   198  		klog.V(4).InfoS(volumeToMount.GenerateMsgDetailed("operationExecutor.ExpandInUseVolume started", ""), "pod", klog.KObj(volumeToMount.Pod))
   199  	}
   200  }
   202  func (rc *reconciler) mountAttachedVolumes(volumeToMount cache.VolumeToMount, podExistError error) {
   203  	// Volume is not mounted, or is already mounted, but requires remounting
   204  	remountingLogStr := ""
   205  	isRemount := cache.IsRemountRequiredError(podExistError)
   206  	if isRemount {
   207  		remountingLogStr = "Volume is already mounted to pod, but remount was requested."
   208  	}
   209  	klog.V(4).InfoS(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.MountVolume", remountingLogStr), "pod", klog.KObj(volumeToMount.Pod))
   210  	err := rc.operationExecutor.MountVolume(
   211  		rc.waitForAttachTimeout,
   212  		volumeToMount.VolumeToMount,
   213  		rc.actualStateOfWorld,
   214  		isRemount)
   215  	if err != nil && !isExpectedError(err) {
   216  		klog.ErrorS(err, volumeToMount.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.MountVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error(), "pod", klog.KObj(volumeToMount.Pod))
   217  	}
   218  	if err == nil {
   219  		if remountingLogStr == "" {
   220  			klog.V(1).InfoS(volumeToMount.GenerateMsgDetailed("operationExecutor.MountVolume started", remountingLogStr), "pod", klog.KObj(volumeToMount.Pod))
   221  		} else {
   222  			klog.V(5).InfoS(volumeToMount.GenerateMsgDetailed("operationExecutor.MountVolume started", remountingLogStr), "pod", klog.KObj(volumeToMount.Pod))
   223  		}
   224  	}
   225  }
   227  func (rc *reconciler) waitForVolumeAttach(volumeToMount cache.VolumeToMount) {
   228  	logger := klog.TODO()
   229  	if rc.controllerAttachDetachEnabled || !volumeToMount.PluginIsAttachable {
   230  		//// lets not spin a goroutine and unnecessarily trigger exponential backoff if this happens
   231  		if volumeToMount.PluginIsAttachable && !volumeToMount.ReportedInUse {
   232  			klog.V(5).InfoS(volumeToMount.GenerateMsgDetailed("operationExecutor.VerifyControllerAttachedVolume failed", " volume not marked in-use"), "pod", klog.KObj(volumeToMount.Pod))
   233  			return
   234  		}
   235  		// Volume is not attached (or doesn't implement attacher), kubelet attach is disabled, wait
   236  		// for controller to finish attaching volume.
   237  		klog.V(5).InfoS(volumeToMount.GenerateMsgDetailed("Starting operationExecutor.VerifyControllerAttachedVolume", ""), "pod", klog.KObj(volumeToMount.Pod))
   238  		err := rc.operationExecutor.VerifyControllerAttachedVolume(
   239  			logger,
   240  			volumeToMount.VolumeToMount,
   241  			rc.nodeName,
   242  			rc.actualStateOfWorld)
   243  		if err != nil && !isExpectedError(err) {
   244  			klog.ErrorS(err, volumeToMount.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.VerifyControllerAttachedVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error(), "pod", klog.KObj(volumeToMount.Pod))
   245  		}
   246  		if err == nil {
   247  			klog.InfoS(volumeToMount.GenerateMsgDetailed("operationExecutor.VerifyControllerAttachedVolume started", ""), "pod", klog.KObj(volumeToMount.Pod))
   248  		}
   249  	} else {
   250  		// Volume is not attached to node, kubelet attach is enabled, volume implements an attacher,
   251  		// so attach it
   252  		volumeToAttach := operationexecutor.VolumeToAttach{
   253  			VolumeName: volumeToMount.VolumeName,
   254  			VolumeSpec: volumeToMount.VolumeSpec,
   255  			NodeName:   rc.nodeName,
   256  		}
   257  		klog.V(5).InfoS(volumeToAttach.GenerateMsgDetailed("Starting operationExecutor.AttachVolume", ""), "pod", klog.KObj(volumeToMount.Pod))
   258  		err := rc.operationExecutor.AttachVolume(logger, volumeToAttach, rc.actualStateOfWorld)
   259  		if err != nil && !isExpectedError(err) {
   260  			klog.ErrorS(err, volumeToMount.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.AttachVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error(), "pod", klog.KObj(volumeToMount.Pod))
   261  		}
   262  		if err == nil {
   263  			klog.InfoS(volumeToMount.GenerateMsgDetailed("operationExecutor.AttachVolume started", ""), "pod", klog.KObj(volumeToMount.Pod))
   264  		}
   265  	}
   266  }
   268  func (rc *reconciler) unmountDetachDevices() {
   269  	for _, attachedVolume := range rc.actualStateOfWorld.GetUnmountedVolumes() {
   270  		// Check IsOperationPending to avoid marking a volume as detached if it's in the process of mounting.
   271  		if !rc.desiredStateOfWorld.VolumeExists(attachedVolume.VolumeName, attachedVolume.SELinuxMountContext) &&
   272  			!rc.operationExecutor.IsOperationPending(attachedVolume.VolumeName, nestedpendingoperations.EmptyUniquePodName, nestedpendingoperations.EmptyNodeName) {
   273  			if attachedVolume.DeviceMayBeMounted() {
   274  				// Volume is globally mounted to device, unmount it
   275  				klog.V(5).InfoS(attachedVolume.GenerateMsgDetailed("Starting operationExecutor.UnmountDevice", ""))
   276  				err := rc.operationExecutor.UnmountDevice(
   277  					attachedVolume.AttachedVolume, rc.actualStateOfWorld, rc.hostutil)
   278  				if err != nil && !isExpectedError(err) {
   279  					klog.ErrorS(err, attachedVolume.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.UnmountDevice failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error())
   280  				}
   281  				if err == nil {
   282  					klog.InfoS(attachedVolume.GenerateMsgDetailed("operationExecutor.UnmountDevice started", ""))
   283  				}
   284  			} else {
   285  				// Volume is attached to node, detach it
   286  				// Kubelet not responsible for detaching or this volume has a non-attachable volume plugin.
   287  				if rc.controllerAttachDetachEnabled || !attachedVolume.PluginIsAttachable {
   288  					rc.actualStateOfWorld.MarkVolumeAsDetached(attachedVolume.VolumeName, attachedVolume.NodeName)
   289  					klog.InfoS(attachedVolume.GenerateMsgDetailed("Volume detached", fmt.Sprintf("DevicePath %q", attachedVolume.DevicePath)))
   290  				} else {
   291  					// Only detach if kubelet detach is enabled
   292  					klog.V(5).InfoS(attachedVolume.GenerateMsgDetailed("Starting operationExecutor.DetachVolume", ""))
   293  					err := rc.operationExecutor.DetachVolume(
   294  						klog.TODO(), attachedVolume.AttachedVolume, false /* verifySafeToDetach */, rc.actualStateOfWorld)
   295  					if err != nil && !isExpectedError(err) {
   296  						klog.ErrorS(err, attachedVolume.GenerateErrorDetailed(fmt.Sprintf("operationExecutor.DetachVolume failed (controllerAttachDetachEnabled %v)", rc.controllerAttachDetachEnabled), err).Error())
   297  					}
   298  					if err == nil {
   299  						klog.InfoS(attachedVolume.GenerateMsgDetailed("operationExecutor.DetachVolume started", ""))
   300  					}
   301  				}
   302  			}
   303  		}
   304  	}
   305  }
   307  // ignore nestedpendingoperations.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
   308  func isExpectedError(err error) bool {
   309  	return nestedpendingoperations.IsAlreadyExists(err) || exponentialbackoff.IsExponentialBackoff(err) || operationexecutor.IsMountFailedPreconditionError(err)
   310  }

View as plain text