...

Source file src/k8s.io/kubernetes/pkg/kubelet/volumemanager/volume_manager.go

Documentation: k8s.io/kubernetes/pkg/kubelet/volumemanager

     1  /*
     2  Copyright 2016 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package volumemanager
    18  
    19  import (
    20  	"context"
    21  	"errors"
    22  	"fmt"
    23  	"sort"
    24  	"strconv"
    25  	"strings"
    26  	"time"
    27  
    28  	utilfeature "k8s.io/apiserver/pkg/util/feature"
    29  	"k8s.io/klog/v2"
    30  	"k8s.io/mount-utils"
    31  
    32  	v1 "k8s.io/api/core/v1"
    33  	k8stypes "k8s.io/apimachinery/pkg/types"
    34  	"k8s.io/apimachinery/pkg/util/runtime"
    35  	"k8s.io/apimachinery/pkg/util/sets"
    36  	"k8s.io/apimachinery/pkg/util/wait"
    37  	clientset "k8s.io/client-go/kubernetes"
    38  	"k8s.io/client-go/tools/record"
    39  	csitrans "k8s.io/csi-translation-lib"
    40  	"k8s.io/kubernetes/pkg/kubelet/config"
    41  	"k8s.io/kubernetes/pkg/kubelet/container"
    42  	"k8s.io/kubernetes/pkg/kubelet/volumemanager/cache"
    43  	"k8s.io/kubernetes/pkg/kubelet/volumemanager/metrics"
    44  	"k8s.io/kubernetes/pkg/kubelet/volumemanager/populator"
    45  	"k8s.io/kubernetes/pkg/kubelet/volumemanager/reconciler"
    46  	"k8s.io/kubernetes/pkg/volume"
    47  	"k8s.io/kubernetes/pkg/volume/csimigration"
    48  	"k8s.io/kubernetes/pkg/volume/util"
    49  	"k8s.io/kubernetes/pkg/volume/util/hostutil"
    50  	"k8s.io/kubernetes/pkg/volume/util/operationexecutor"
    51  	"k8s.io/kubernetes/pkg/volume/util/types"
    52  	"k8s.io/kubernetes/pkg/volume/util/volumepathhandler"
    53  )
    54  
const (
	// reconcilerLoopSleepPeriod is the amount of time the reconciler loop waits
	// between successive executions.
	reconcilerLoopSleepPeriod = 100 * time.Millisecond

	// desiredStateOfWorldPopulatorLoopSleepPeriod is the amount of time the
	// DesiredStateOfWorldPopulator loop waits between successive executions.
	desiredStateOfWorldPopulatorLoopSleepPeriod = 100 * time.Millisecond

	// podAttachAndMountTimeout is the maximum amount of time the
	// WaitForAttachAndMount call will wait for all volumes in the specified pod
	// to be attached and mounted. Even though cloud operations can take several
	// minutes to complete, we set the timeout to 2 minutes because kubelet
	// will retry in the next sync iteration. This frees the associated
	// goroutine of the pod to process newer updates if needed (e.g., a delete
	// request to the pod).
	// Value is slightly offset from 2 minutes to make timeouts due to this
	// constant recognizable.
	// The same timeout also bounds the polling done by WaitForUnmount.
	podAttachAndMountTimeout = 2*time.Minute + 3*time.Second

	// podAttachAndMountRetryInterval is the polling interval used by the
	// WaitForAttachAndMount and WaitForUnmount calls, i.e. the amount of time
	// they wait before re-checking the state of the pod's volumes.
	podAttachAndMountRetryInterval = 300 * time.Millisecond

	// waitForAttachTimeout is the maximum amount of time an
	// operationexecutor.Mount call will wait for a volume to be attached.
	// Set to 10 minutes because we've seen attach operations take several
	// minutes to complete for some volume plugins in some cases. While this
	// operation is waiting it only blocks other operations on the same device,
	// other devices are not affected.
	waitForAttachTimeout = 10 * time.Minute
)
    87  
// VolumeManager runs a set of asynchronous loops that figure out which volumes
// need to be attached/mounted/unmounted/detached based on the pods scheduled on
// this node and makes it so.
type VolumeManager interface {
	// Run starts the volume manager and all the asynchronous loops that it
	// controls.
	Run(sourcesReady config.SourcesReady, stopCh <-chan struct{})

	// WaitForAttachAndMount processes the volumes referenced in the specified
	// pod and blocks until they are all attached and mounted (reflected in
	// actual state of the world).
	// An error is returned if not all volumes are attached and mounted within
	// the duration defined in podAttachAndMountTimeout.
	WaitForAttachAndMount(ctx context.Context, pod *v1.Pod) error

	// WaitForUnmount processes the volumes referenced in the specified
	// pod and blocks until they are all unmounted (reflected in the actual
	// state of the world).
	// An error is returned if not all volumes are unmounted within
	// the duration defined in podAttachAndMountTimeout.
	WaitForUnmount(ctx context.Context, pod *v1.Pod) error

	// GetMountedVolumesForPod returns a VolumeMap containing the volumes
	// referenced by the specified pod that are successfully attached and
	// mounted. The key in the map is the OuterVolumeSpecName (i.e.
	// pod.Spec.Volumes[x].Name). It returns an empty VolumeMap if pod has no
	// volumes.
	GetMountedVolumesForPod(podName types.UniquePodName) container.VolumeMap

	// GetPossiblyMountedVolumesForPod returns a VolumeMap containing the volumes
	// referenced by the specified pod that are either successfully attached
	// and mounted or are "uncertain", i.e. a volume plugin may be mounting
	// them right now. The key in the map is the OuterVolumeSpecName (i.e.
	// pod.Spec.Volumes[x].Name). It returns an empty VolumeMap if pod has no
	// volumes.
	GetPossiblyMountedVolumesForPod(podName types.UniquePodName) container.VolumeMap

	// GetExtraSupplementalGroupsForPod returns a list of the extra
	// supplemental groups for the Pod. These extra supplemental groups come
	// from annotations on persistent volumes that the pod depends on.
	GetExtraSupplementalGroupsForPod(pod *v1.Pod) []int64

	// GetVolumesInUse returns a list of all volumes that implement the volume.Attacher
	// interface and are currently in use according to the actual and desired
	// state of the world caches. A volume is considered "in use" as soon as it
	// is added to the desired state of world, indicating it *should* be
	// attached to this node and remains "in use" until it is removed from both
	// the desired state of the world and the actual state of the world, or it
	// has been unmounted (as indicated in actual state of world).
	GetVolumesInUse() []v1.UniqueVolumeName

	// ReconcilerStatesHasBeenSynced returns true only after the actual states in
	// the reconciler have been synced at least once after kubelet starts, so that
	// it is safe to update the mounted volume list retrieved from actual state.
	ReconcilerStatesHasBeenSynced() bool

	// VolumeIsAttached returns true if the given volume is attached to this
	// node.
	VolumeIsAttached(volumeName v1.UniqueVolumeName) bool

	// MarkVolumesAsReportedInUse marks the specified volumes as having been
	// successfully reported as "in use" in the node's volume status.
	MarkVolumesAsReportedInUse(volumesReportedAsInUse []v1.UniqueVolumeName)
}
   151  
// PodStateProvider can determine if a pod is going to be terminated.
type PodStateProvider interface {
	ShouldPodContainersBeTerminating(k8stypes.UID) bool
	ShouldPodRuntimeBeRemoved(k8stypes.UID) bool
}
   157  
// PodManager is the subset of methods the manager needs to observe the actual state of the kubelet.
// See k8s.io/kubernetes/pkg/kubelet/pod.Manager for method godoc.
type PodManager interface {
	GetPodByUID(k8stypes.UID) (*v1.Pod, bool)
	GetPods() []*v1.Pod
}
   164  
   165  // NewVolumeManager returns a new concrete instance implementing the
   166  // VolumeManager interface.
   167  //
   168  // kubeClient - kubeClient is the kube API client used by DesiredStateOfWorldPopulator
   169  // to communicate with the API server to fetch PV and PVC objects
   170  //
   171  // volumePluginMgr - the volume plugin manager used to access volume plugins.
   172  // Must be pre-initialized.
   173  func NewVolumeManager(
   174  	controllerAttachDetachEnabled bool,
   175  	nodeName k8stypes.NodeName,
   176  	podManager PodManager,
   177  	podStateProvider PodStateProvider,
   178  	kubeClient clientset.Interface,
   179  	volumePluginMgr *volume.VolumePluginMgr,
   180  	kubeContainerRuntime container.Runtime,
   181  	mounter mount.Interface,
   182  	hostutil hostutil.HostUtils,
   183  	kubeletPodsDir string,
   184  	recorder record.EventRecorder,
   185  	keepTerminatedPodVolumes bool,
   186  	blockVolumePathHandler volumepathhandler.BlockVolumePathHandler) VolumeManager {
   187  
   188  	seLinuxTranslator := util.NewSELinuxLabelTranslator()
   189  	vm := &volumeManager{
   190  		kubeClient:          kubeClient,
   191  		volumePluginMgr:     volumePluginMgr,
   192  		desiredStateOfWorld: cache.NewDesiredStateOfWorld(volumePluginMgr, seLinuxTranslator),
   193  		actualStateOfWorld:  cache.NewActualStateOfWorld(nodeName, volumePluginMgr),
   194  		operationExecutor: operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
   195  			kubeClient,
   196  			volumePluginMgr,
   197  			recorder,
   198  			blockVolumePathHandler)),
   199  	}
   200  
   201  	intreeToCSITranslator := csitrans.New()
   202  	csiMigratedPluginManager := csimigration.NewPluginManager(intreeToCSITranslator, utilfeature.DefaultFeatureGate)
   203  
   204  	vm.intreeToCSITranslator = intreeToCSITranslator
   205  	vm.csiMigratedPluginManager = csiMigratedPluginManager
   206  	vm.desiredStateOfWorldPopulator = populator.NewDesiredStateOfWorldPopulator(
   207  		kubeClient,
   208  		desiredStateOfWorldPopulatorLoopSleepPeriod,
   209  		podManager,
   210  		podStateProvider,
   211  		vm.desiredStateOfWorld,
   212  		vm.actualStateOfWorld,
   213  		kubeContainerRuntime,
   214  		keepTerminatedPodVolumes,
   215  		csiMigratedPluginManager,
   216  		intreeToCSITranslator,
   217  		volumePluginMgr)
   218  	vm.reconciler = reconciler.NewReconciler(
   219  		kubeClient,
   220  		controllerAttachDetachEnabled,
   221  		reconcilerLoopSleepPeriod,
   222  		waitForAttachTimeout,
   223  		nodeName,
   224  		vm.desiredStateOfWorld,
   225  		vm.actualStateOfWorld,
   226  		vm.desiredStateOfWorldPopulator.HasAddedPods,
   227  		vm.operationExecutor,
   228  		mounter,
   229  		hostutil,
   230  		volumePluginMgr,
   231  		kubeletPodsDir)
   232  
   233  	return vm
   234  }
   235  
// volumeManager implements the VolumeManager interface.
type volumeManager struct {
	// kubeClient is the kube API client used by DesiredStateOfWorldPopulator to
	// communicate with the API server to fetch PV and PVC objects
	kubeClient clientset.Interface

	// volumePluginMgr is the volume plugin manager used to access volume
	// plugins. It must be pre-initialized.
	volumePluginMgr *volume.VolumePluginMgr

	// desiredStateOfWorld is a data structure containing the desired state of
	// the world according to the volume manager (i.e. what volumes should be
	// attached and which pods are referencing the volumes).
	// The data structure is populated by the desired state of the world
	// populator using the kubelet pod manager.
	desiredStateOfWorld cache.DesiredStateOfWorld

	// actualStateOfWorld is a data structure containing the actual state of
	// the world according to the manager: i.e. which volumes are attached to
	// this node and what pods the volumes are mounted to.
	// The data structure is populated upon successful completion of attach,
	// detach, mount, and unmount actions triggered by the reconciler.
	actualStateOfWorld cache.ActualStateOfWorld

	// operationExecutor is used to start asynchronous attach, detach, mount,
	// and unmount operations.
	operationExecutor operationexecutor.OperationExecutor

	// reconciler runs an asynchronous periodic loop to reconcile the
	// desiredStateOfWorld with the actualStateOfWorld by triggering attach,
	// detach, mount, and unmount operations using the operationExecutor.
	reconciler reconciler.Reconciler

	// desiredStateOfWorldPopulator runs an asynchronous periodic loop to
	// populate the desiredStateOfWorld using the kubelet PodManager.
	desiredStateOfWorldPopulator populator.DesiredStateOfWorldPopulator

	// csiMigratedPluginManager keeps track of CSI migration status of plugins
	csiMigratedPluginManager csimigration.PluginManager

	// intreeToCSITranslator translates in-tree volume specs to CSI
	intreeToCSITranslator csimigration.InTreeToCSITranslator
}
   279  
// Run launches the volume manager's background loops, each in its own
// goroutine, registers the volume manager metrics, and then blocks until
// stopCh is signalled. The same stopCh is handed to every loop it starts.
func (vm *volumeManager) Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) {
	defer runtime.HandleCrash()

	// The plugin manager loop is only started when an API client is configured.
	if vm.kubeClient != nil {
		// start informer for CSIDriver
		go vm.volumePluginMgr.Run(stopCh)
	}

	go vm.desiredStateOfWorldPopulator.Run(sourcesReady, stopCh)
	klog.V(2).InfoS("The desired_state_of_world populator starts")

	klog.InfoS("Starting Kubelet Volume Manager")
	go vm.reconciler.Run(stopCh)

	metrics.Register(vm.actualStateOfWorld, vm.desiredStateOfWorld, vm.volumePluginMgr)

	// Park here so the caller's goroutine lives as long as the manager does.
	<-stopCh
	klog.InfoS("Shutting down Kubelet Volume Manager")
}
   299  
   300  func (vm *volumeManager) GetMountedVolumesForPod(podName types.UniquePodName) container.VolumeMap {
   301  	podVolumes := make(container.VolumeMap)
   302  	for _, mountedVolume := range vm.actualStateOfWorld.GetMountedVolumesForPod(podName) {
   303  		podVolumes[mountedVolume.OuterVolumeSpecName] = container.VolumeInfo{
   304  			Mounter:             mountedVolume.Mounter,
   305  			BlockVolumeMapper:   mountedVolume.BlockVolumeMapper,
   306  			ReadOnly:            mountedVolume.VolumeSpec.ReadOnly,
   307  			InnerVolumeSpecName: mountedVolume.InnerVolumeSpecName,
   308  		}
   309  	}
   310  	return podVolumes
   311  }
   312  
   313  func (vm *volumeManager) GetPossiblyMountedVolumesForPod(podName types.UniquePodName) container.VolumeMap {
   314  	podVolumes := make(container.VolumeMap)
   315  	for _, mountedVolume := range vm.actualStateOfWorld.GetPossiblyMountedVolumesForPod(podName) {
   316  		podVolumes[mountedVolume.OuterVolumeSpecName] = container.VolumeInfo{
   317  			Mounter:             mountedVolume.Mounter,
   318  			BlockVolumeMapper:   mountedVolume.BlockVolumeMapper,
   319  			ReadOnly:            mountedVolume.VolumeSpec.ReadOnly,
   320  			InnerVolumeSpecName: mountedVolume.InnerVolumeSpecName,
   321  		}
   322  	}
   323  	return podVolumes
   324  }
   325  
   326  func (vm *volumeManager) GetExtraSupplementalGroupsForPod(pod *v1.Pod) []int64 {
   327  	podName := util.GetUniquePodName(pod)
   328  	supplementalGroups := sets.NewString()
   329  
   330  	for _, mountedVolume := range vm.actualStateOfWorld.GetMountedVolumesForPod(podName) {
   331  		if mountedVolume.VolumeGidValue != "" {
   332  			supplementalGroups.Insert(mountedVolume.VolumeGidValue)
   333  		}
   334  	}
   335  
   336  	result := make([]int64, 0, supplementalGroups.Len())
   337  	for _, group := range supplementalGroups.List() {
   338  		iGroup, extra := getExtraSupplementalGid(group, pod)
   339  		if !extra {
   340  			continue
   341  		}
   342  
   343  		result = append(result, int64(iGroup))
   344  	}
   345  
   346  	return result
   347  }
   348  
   349  func (vm *volumeManager) GetVolumesInUse() []v1.UniqueVolumeName {
   350  	// Report volumes in desired state of world and actual state of world so
   351  	// that volumes are marked in use as soon as the decision is made that the
   352  	// volume *should* be attached to this node until it is safely unmounted.
   353  	desiredVolumes := vm.desiredStateOfWorld.GetVolumesToMount()
   354  	allAttachedVolumes := vm.actualStateOfWorld.GetAttachedVolumes()
   355  	volumesToReportInUse := make([]v1.UniqueVolumeName, 0, len(desiredVolumes)+len(allAttachedVolumes))
   356  	desiredVolumesMap := make(map[v1.UniqueVolumeName]bool, len(desiredVolumes)+len(allAttachedVolumes))
   357  
   358  	for _, volume := range desiredVolumes {
   359  		if volume.PluginIsAttachable {
   360  			if _, exists := desiredVolumesMap[volume.VolumeName]; !exists {
   361  				desiredVolumesMap[volume.VolumeName] = true
   362  				volumesToReportInUse = append(volumesToReportInUse, volume.VolumeName)
   363  			}
   364  		}
   365  	}
   366  
   367  	for _, volume := range allAttachedVolumes {
   368  		if volume.PluginIsAttachable {
   369  			if _, exists := desiredVolumesMap[volume.VolumeName]; !exists {
   370  				volumesToReportInUse = append(volumesToReportInUse, volume.VolumeName)
   371  			}
   372  		}
   373  	}
   374  
   375  	sort.Slice(volumesToReportInUse, func(i, j int) bool {
   376  		return string(volumesToReportInUse[i]) < string(volumesToReportInUse[j])
   377  	})
   378  	return volumesToReportInUse
   379  }
   380  
   381  func (vm *volumeManager) ReconcilerStatesHasBeenSynced() bool {
   382  	return vm.reconciler.StatesHasBeenSynced()
   383  }
   384  
   385  func (vm *volumeManager) VolumeIsAttached(
   386  	volumeName v1.UniqueVolumeName) bool {
   387  	return vm.actualStateOfWorld.VolumeExists(volumeName)
   388  }
   389  
   390  func (vm *volumeManager) MarkVolumesAsReportedInUse(
   391  	volumesReportedAsInUse []v1.UniqueVolumeName) {
   392  	vm.desiredStateOfWorld.MarkVolumesReportedInUse(volumesReportedAsInUse)
   393  }
   394  
   395  func (vm *volumeManager) WaitForAttachAndMount(ctx context.Context, pod *v1.Pod) error {
   396  	if pod == nil {
   397  		return nil
   398  	}
   399  
   400  	expectedVolumes := getExpectedVolumes(pod)
   401  	if len(expectedVolumes) == 0 {
   402  		// No volumes to verify
   403  		return nil
   404  	}
   405  
   406  	klog.V(3).InfoS("Waiting for volumes to attach and mount for pod", "pod", klog.KObj(pod))
   407  	uniquePodName := util.GetUniquePodName(pod)
   408  
   409  	// Some pods expect to have Setup called over and over again to update.
   410  	// Remount plugins for which this is true. (Atomically updating volumes,
   411  	// like Downward API, depend on this to update the contents of the volume).
   412  	vm.desiredStateOfWorldPopulator.ReprocessPod(uniquePodName)
   413  
   414  	err := wait.PollUntilContextTimeout(
   415  		ctx,
   416  		podAttachAndMountRetryInterval,
   417  		podAttachAndMountTimeout,
   418  		true,
   419  		vm.verifyVolumesMountedFunc(uniquePodName, expectedVolumes))
   420  
   421  	if err != nil {
   422  		unmountedVolumes :=
   423  			vm.getUnmountedVolumes(uniquePodName, expectedVolumes)
   424  		// Also get unattached volumes and volumes not in dsw for error message
   425  		unattachedVolumes :=
   426  			vm.getUnattachedVolumes(uniquePodName)
   427  		volumesNotInDSW :=
   428  			vm.getVolumesNotInDSW(uniquePodName, expectedVolumes)
   429  
   430  		if len(unmountedVolumes) == 0 {
   431  			return nil
   432  		}
   433  
   434  		return fmt.Errorf(
   435  			"unmounted volumes=%v, unattached volumes=%v, failed to process volumes=%v: %w",
   436  			unmountedVolumes,
   437  			unattachedVolumes,
   438  			volumesNotInDSW,
   439  			err)
   440  	}
   441  
   442  	klog.V(3).InfoS("All volumes are attached and mounted for pod", "pod", klog.KObj(pod))
   443  	return nil
   444  }
   445  
   446  func (vm *volumeManager) WaitForUnmount(ctx context.Context, pod *v1.Pod) error {
   447  	if pod == nil {
   448  		return nil
   449  	}
   450  
   451  	klog.V(3).InfoS("Waiting for volumes to unmount for pod", "pod", klog.KObj(pod))
   452  	uniquePodName := util.GetUniquePodName(pod)
   453  
   454  	vm.desiredStateOfWorldPopulator.ReprocessPod(uniquePodName)
   455  
   456  	err := wait.PollUntilContextTimeout(
   457  		ctx,
   458  		podAttachAndMountRetryInterval,
   459  		podAttachAndMountTimeout,
   460  		true,
   461  		vm.verifyVolumesUnmountedFunc(uniquePodName))
   462  
   463  	if err != nil {
   464  		var mountedVolumes []string
   465  		for _, v := range vm.actualStateOfWorld.GetMountedVolumesForPod(uniquePodName) {
   466  			mountedVolumes = append(mountedVolumes, v.OuterVolumeSpecName)
   467  		}
   468  		sort.Strings(mountedVolumes)
   469  
   470  		if len(mountedVolumes) == 0 {
   471  			return nil
   472  		}
   473  
   474  		return fmt.Errorf(
   475  			"mounted volumes=%v: %w",
   476  			mountedVolumes,
   477  			err)
   478  	}
   479  
   480  	klog.V(3).InfoS("All volumes are unmounted for pod", "pod", klog.KObj(pod))
   481  	return nil
   482  }
   483  
   484  func (vm *volumeManager) getVolumesNotInDSW(uniquePodName types.UniquePodName, expectedVolumes []string) []string {
   485  	volumesNotInDSW := sets.NewString(expectedVolumes...)
   486  
   487  	for _, volumeToMount := range vm.desiredStateOfWorld.GetVolumesToMount() {
   488  		if volumeToMount.PodName == uniquePodName {
   489  			volumesNotInDSW.Delete(volumeToMount.OuterVolumeSpecName)
   490  		}
   491  	}
   492  
   493  	return volumesNotInDSW.List()
   494  }
   495  
   496  // getUnattachedVolumes returns a list of the volumes that are expected to be attached but
   497  // are not currently attached to the node
   498  func (vm *volumeManager) getUnattachedVolumes(uniquePodName types.UniquePodName) []string {
   499  	unattachedVolumes := []string{}
   500  	for _, volumeToMount := range vm.desiredStateOfWorld.GetVolumesToMount() {
   501  		if volumeToMount.PodName == uniquePodName &&
   502  			volumeToMount.PluginIsAttachable &&
   503  			!vm.actualStateOfWorld.VolumeExists(volumeToMount.VolumeName) {
   504  			unattachedVolumes = append(unattachedVolumes, volumeToMount.OuterVolumeSpecName)
   505  		}
   506  	}
   507  	sort.Strings(unattachedVolumes)
   508  
   509  	return unattachedVolumes
   510  }
   511  
   512  // verifyVolumesMountedFunc returns a method that returns true when all expected
   513  // volumes are mounted.
   514  func (vm *volumeManager) verifyVolumesMountedFunc(podName types.UniquePodName, expectedVolumes []string) wait.ConditionWithContextFunc {
   515  	return func(_ context.Context) (done bool, err error) {
   516  		if errs := vm.desiredStateOfWorld.PopPodErrors(podName); len(errs) > 0 {
   517  			return true, errors.New(strings.Join(errs, "; "))
   518  		}
   519  		return len(vm.getUnmountedVolumes(podName, expectedVolumes)) == 0, nil
   520  	}
   521  }
   522  
   523  // verifyVolumesUnmountedFunc returns a method that is true when there are no mounted volumes for this
   524  // pod.
   525  func (vm *volumeManager) verifyVolumesUnmountedFunc(podName types.UniquePodName) wait.ConditionWithContextFunc {
   526  	return func(_ context.Context) (done bool, err error) {
   527  		if errs := vm.desiredStateOfWorld.PopPodErrors(podName); len(errs) > 0 {
   528  			return true, errors.New(strings.Join(errs, "; "))
   529  		}
   530  		return len(vm.actualStateOfWorld.GetMountedVolumesForPod(podName)) == 0, nil
   531  	}
   532  }
   533  
   534  // getUnmountedVolumes fetches the current list of mounted volumes from
   535  // the actual state of the world, and uses it to process the list of
   536  // expectedVolumes. It returns a list of unmounted volumes.
   537  // The list also includes volume that may be mounted in uncertain state.
   538  func (vm *volumeManager) getUnmountedVolumes(podName types.UniquePodName, expectedVolumes []string) []string {
   539  	mountedVolumes := sets.NewString()
   540  	for _, mountedVolume := range vm.actualStateOfWorld.GetMountedVolumesForPod(podName) {
   541  		mountedVolumes.Insert(mountedVolume.OuterVolumeSpecName)
   542  	}
   543  	return filterUnmountedVolumes(mountedVolumes, expectedVolumes)
   544  }
   545  
   546  // filterUnmountedVolumes adds each element of expectedVolumes that is not in
   547  // mountedVolumes to a list of unmountedVolumes and returns it.
   548  func filterUnmountedVolumes(mountedVolumes sets.String, expectedVolumes []string) []string {
   549  	unmountedVolumes := []string{}
   550  	for _, expectedVolume := range expectedVolumes {
   551  		if !mountedVolumes.Has(expectedVolume) {
   552  			unmountedVolumes = append(unmountedVolumes, expectedVolume)
   553  		}
   554  	}
   555  	sort.Strings(unmountedVolumes)
   556  
   557  	return unmountedVolumes
   558  }
   559  
   560  // getExpectedVolumes returns a list of volumes that must be mounted in order to
   561  // consider the volume setup step for this pod satisfied.
   562  func getExpectedVolumes(pod *v1.Pod) []string {
   563  	mounts, devices, _ := util.GetPodVolumeNames(pod)
   564  	return mounts.Union(devices).UnsortedList()
   565  }
   566  
   567  // getExtraSupplementalGid returns the value of an extra supplemental GID as
   568  // defined by an annotation on a volume and a boolean indicating whether the
   569  // volume defined a GID that the pod doesn't already request.
   570  func getExtraSupplementalGid(volumeGidValue string, pod *v1.Pod) (int64, bool) {
   571  	if volumeGidValue == "" {
   572  		return 0, false
   573  	}
   574  
   575  	gid, err := strconv.ParseInt(volumeGidValue, 10, 64)
   576  	if err != nil {
   577  		return 0, false
   578  	}
   579  
   580  	if pod.Spec.SecurityContext != nil {
   581  		for _, existingGid := range pod.Spec.SecurityContext.SupplementalGroups {
   582  			if gid == int64(existingGid) {
   583  				return 0, false
   584  			}
   585  		}
   586  	}
   587  
   588  	return gid, true
   589  }
   590  

View as plain text