...

Source file src/k8s.io/kubernetes/pkg/kubelet/volumemanager/reconciler/reconstruct_common.go

Documentation: k8s.io/kubernetes/pkg/kubelet/volumemanager/reconciler

     1  /*
     2  Copyright 2022 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package reconciler
    18  
    19  import (
    20  	"fmt"
    21  	"io/fs"
    22  	"os"
    23  	"path/filepath"
    24  	"time"
    25  
    26  	"github.com/go-logr/logr"
    27  	v1 "k8s.io/api/core/v1"
    28  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    29  	"k8s.io/apimachinery/pkg/types"
    30  	"k8s.io/klog/v2"
    31  	"k8s.io/kubernetes/pkg/kubelet/config"
    32  	"k8s.io/kubernetes/pkg/kubelet/volumemanager/metrics"
    33  	volumepkg "k8s.io/kubernetes/pkg/volume"
    34  	"k8s.io/kubernetes/pkg/volume/util"
    35  	"k8s.io/kubernetes/pkg/volume/util/operationexecutor"
    36  	volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
    37  	utilpath "k8s.io/utils/path"
    38  	utilstrings "k8s.io/utils/strings"
    39  )
    40  
    41  // these interfaces are necessary to keep the structures private
    42  // and at the same time log them correctly in structured logs.
    43  var _ logr.Marshaler = podVolume{}
    44  var _ logr.Marshaler = reconstructedVolume{}
    45  var _ logr.Marshaler = globalVolumeInfo{}
    46  
    47  type podVolume struct {
    48  	podName        volumetypes.UniquePodName
    49  	volumeSpecName string
    50  	volumePath     string
    51  	pluginName     string
    52  	volumeMode     v1.PersistentVolumeMode
    53  }
    54  
    55  func (p podVolume) MarshalLog() interface{} {
    56  	return struct {
    57  		PodName        string `json:"podName"`
    58  		VolumeSpecName string `json:"volumeSpecName"`
    59  		VolumePath     string `json:"volumePath"`
    60  		PluginName     string `json:"pluginName"`
    61  		VolumeMode     string `json:"volumeMode"`
    62  	}{
    63  		PodName:        string(p.podName),
    64  		VolumeSpecName: p.volumeSpecName,
    65  		VolumePath:     p.volumePath,
    66  		PluginName:     p.pluginName,
    67  		VolumeMode:     string(p.volumeMode),
    68  	}
    69  }
    70  
    71  type reconstructedVolume struct {
    72  	volumeName          v1.UniqueVolumeName
    73  	podName             volumetypes.UniquePodName
    74  	volumeSpec          *volumepkg.Spec
    75  	outerVolumeSpecName string
    76  	pod                 *v1.Pod
    77  	volumeGidValue      string
    78  	devicePath          string
    79  	mounter             volumepkg.Mounter
    80  	deviceMounter       volumepkg.DeviceMounter
    81  	blockVolumeMapper   volumepkg.BlockVolumeMapper
    82  	seLinuxMountContext string
    83  }
    84  
    85  func (rv reconstructedVolume) MarshalLog() interface{} {
    86  	return struct {
    87  		VolumeName          string `json:"volumeName"`
    88  		PodName             string `json:"podName"`
    89  		VolumeSpecName      string `json:"volumeSpecName"`
    90  		OuterVolumeSpecName string `json:"outerVolumeSpecName"`
    91  		PodUID              string `json:"podUID"`
    92  		VolumeGIDValue      string `json:"volumeGIDValue"`
    93  		DevicePath          string `json:"devicePath"`
    94  		SeLinuxMountContext string `json:"seLinuxMountContext"`
    95  	}{
    96  		VolumeName:          string(rv.volumeName),
    97  		PodName:             string(rv.podName),
    98  		VolumeSpecName:      rv.volumeSpec.Name(),
    99  		OuterVolumeSpecName: rv.outerVolumeSpecName,
   100  		PodUID:              string(rv.pod.UID),
   101  		VolumeGIDValue:      rv.volumeGidValue,
   102  		DevicePath:          rv.devicePath,
   103  		SeLinuxMountContext: rv.seLinuxMountContext,
   104  	}
   105  }
   106  
   107  // globalVolumeInfo stores reconstructed volume information
   108  // for each pod that was using that volume.
   109  type globalVolumeInfo struct {
   110  	volumeName        v1.UniqueVolumeName
   111  	volumeSpec        *volumepkg.Spec
   112  	devicePath        string
   113  	mounter           volumepkg.Mounter
   114  	deviceMounter     volumepkg.DeviceMounter
   115  	blockVolumeMapper volumepkg.BlockVolumeMapper
   116  	podVolumes        map[volumetypes.UniquePodName]*reconstructedVolume
   117  }
   118  
   119  func (gvi globalVolumeInfo) MarshalLog() interface{} {
   120  	podVolumes := make(map[volumetypes.UniquePodName]v1.UniqueVolumeName)
   121  	for podName, volume := range gvi.podVolumes {
   122  		podVolumes[podName] = volume.volumeName
   123  	}
   124  
   125  	return struct {
   126  		VolumeName     string                                            `json:"volumeName"`
   127  		VolumeSpecName string                                            `json:"volumeSpecName"`
   128  		DevicePath     string                                            `json:"devicePath"`
   129  		PodVolumes     map[volumetypes.UniquePodName]v1.UniqueVolumeName `json:"podVolumes"`
   130  	}{
   131  		VolumeName:     string(gvi.volumeName),
   132  		VolumeSpecName: gvi.volumeSpec.Name(),
   133  		DevicePath:     gvi.devicePath,
   134  		PodVolumes:     podVolumes,
   135  	}
   136  }
   137  
   138  func (rc *reconciler) updateLastSyncTime() {
   139  	rc.timeOfLastSyncLock.Lock()
   140  	defer rc.timeOfLastSyncLock.Unlock()
   141  	rc.timeOfLastSync = time.Now()
   142  }
   143  
   144  func (rc *reconciler) StatesHasBeenSynced() bool {
   145  	rc.timeOfLastSyncLock.Lock()
   146  	defer rc.timeOfLastSyncLock.Unlock()
   147  	return !rc.timeOfLastSync.IsZero()
   148  }
   149  
   150  func (gvi *globalVolumeInfo) addPodVolume(rcv *reconstructedVolume) {
   151  	if gvi.podVolumes == nil {
   152  		gvi.podVolumes = map[volumetypes.UniquePodName]*reconstructedVolume{}
   153  	}
   154  	gvi.podVolumes[rcv.podName] = rcv
   155  }
   156  
   157  func (rc *reconciler) cleanupMounts(volume podVolume) {
   158  	klog.V(2).InfoS("Reconciler sync states: could not find volume information in desired state, clean up the mount points", "podName", volume.podName, "volumeSpecName", volume.volumeSpecName)
   159  	mountedVolume := operationexecutor.MountedVolume{
   160  		PodName: volume.podName,
   161  		// VolumeName should be generated by `GetUniqueVolumeNameFromSpec` or `GetUniqueVolumeNameFromSpecWithPod`.
   162  		// However, since we don't have the volume information in asw when cleanup mounts, it doesn't matter what we put here.
   163  		VolumeName:          v1.UniqueVolumeName(volume.volumeSpecName),
   164  		InnerVolumeSpecName: volume.volumeSpecName,
   165  		PluginName:          volume.pluginName,
   166  		PodUID:              types.UID(volume.podName),
   167  	}
   168  	metrics.ForceCleanedFailedVolumeOperationsTotal.Inc()
   169  	// TODO: Currently cleanupMounts only includes UnmountVolume operation. In the next PR, we will add
   170  	// to unmount both volume and device in the same routine.
   171  	err := rc.operationExecutor.UnmountVolume(mountedVolume, rc.actualStateOfWorld, rc.kubeletPodsDir)
   172  	if err != nil {
   173  		metrics.ForceCleanedFailedVolumeOperationsErrorsTotal.Inc()
   174  		klog.ErrorS(err, mountedVolume.GenerateErrorDetailed("volumeHandler.UnmountVolumeHandler for UnmountVolume failed", err).Error())
   175  		return
   176  	}
   177  }
   178  
   179  // getDeviceMountPath returns device mount path for block volume which
   180  // implements BlockVolumeMapper or filesystem volume which implements
   181  // DeviceMounter
   182  func getDeviceMountPath(gvi *globalVolumeInfo) (string, error) {
   183  	if gvi.blockVolumeMapper != nil {
   184  		// for block gvi, we return its global map path
   185  		return gvi.blockVolumeMapper.GetGlobalMapPath(gvi.volumeSpec)
   186  	} else if gvi.deviceMounter != nil {
   187  		// for filesystem gvi, we return its device mount path if the plugin implements DeviceMounter
   188  		return gvi.deviceMounter.GetDeviceMountPath(gvi.volumeSpec)
   189  	} else {
   190  		return "", fmt.Errorf("blockVolumeMapper or deviceMounter required")
   191  	}
   192  }
   193  
   194  // getVolumesFromPodDir scans through the volumes directories under the given pod directory.
   195  // It returns a list of pod volume information including pod's uid, volume's plugin name, mount path,
   196  // and volume spec name.
   197  func getVolumesFromPodDir(podDir string) ([]podVolume, error) {
   198  	podsDirInfo, err := os.ReadDir(podDir)
   199  	if err != nil {
   200  		return nil, err
   201  	}
   202  	volumes := []podVolume{}
   203  	for i := range podsDirInfo {
   204  		if !podsDirInfo[i].IsDir() {
   205  			continue
   206  		}
   207  		podName := podsDirInfo[i].Name()
   208  		podDir := filepath.Join(podDir, podName)
   209  
   210  		// Find filesystem volume information
   211  		// ex. filesystem volume: /pods/{podUid}/volumes/{escapeQualifiedPluginName}/{volumeName}
   212  		volumesDirs := map[v1.PersistentVolumeMode]string{
   213  			v1.PersistentVolumeFilesystem: filepath.Join(podDir, config.DefaultKubeletVolumesDirName),
   214  		}
   215  		// Find block volume information
   216  		// ex. block volume: /pods/{podUid}/volumeDevices/{escapeQualifiedPluginName}/{volumeName}
   217  		volumesDirs[v1.PersistentVolumeBlock] = filepath.Join(podDir, config.DefaultKubeletVolumeDevicesDirName)
   218  
   219  		for volumeMode, volumesDir := range volumesDirs {
   220  			var volumesDirInfo []fs.DirEntry
   221  			if volumesDirInfo, err = os.ReadDir(volumesDir); err != nil {
   222  				// Just skip the loop because given volumesDir doesn't exist depending on volumeMode
   223  				continue
   224  			}
   225  			for _, volumeDir := range volumesDirInfo {
   226  				pluginName := volumeDir.Name()
   227  				volumePluginPath := filepath.Join(volumesDir, pluginName)
   228  				volumePluginDirs, err := utilpath.ReadDirNoStat(volumePluginPath)
   229  				if err != nil {
   230  					klog.ErrorS(err, "Could not read volume plugin directory", "volumePluginPath", volumePluginPath)
   231  					continue
   232  				}
   233  				unescapePluginName := utilstrings.UnescapeQualifiedName(pluginName)
   234  				for _, volumeName := range volumePluginDirs {
   235  					volumePath := filepath.Join(volumePluginPath, volumeName)
   236  					klog.V(5).InfoS("Volume path from volume plugin directory", "podName", podName, "volumePath", volumePath)
   237  					volumes = append(volumes, podVolume{
   238  						podName:        volumetypes.UniquePodName(podName),
   239  						volumeSpecName: volumeName,
   240  						volumePath:     volumePath,
   241  						pluginName:     unescapePluginName,
   242  						volumeMode:     volumeMode,
   243  					})
   244  				}
   245  			}
   246  		}
   247  	}
   248  	for _, volume := range volumes {
   249  		klog.V(4).InfoS("Get volume from pod directory", "path", podDir, "volume", volume)
   250  	}
   251  	return volumes, nil
   252  }
   253  
   254  // Reconstruct volume data structure by reading the pod's volume directories
   255  func (rc *reconciler) reconstructVolume(volume podVolume) (rvolume *reconstructedVolume, rerr error) {
   256  	metrics.ReconstructVolumeOperationsTotal.Inc()
   257  	defer func() {
   258  		if rerr != nil {
   259  			metrics.ReconstructVolumeOperationsErrorsTotal.Inc()
   260  		}
   261  	}()
   262  
   263  	// plugin initializations
   264  	plugin, err := rc.volumePluginMgr.FindPluginByName(volume.pluginName)
   265  	if err != nil {
   266  		return nil, err
   267  	}
   268  
   269  	// Create pod object
   270  	pod := &v1.Pod{
   271  		ObjectMeta: metav1.ObjectMeta{
   272  			UID: types.UID(volume.podName),
   273  		},
   274  	}
   275  	mapperPlugin, err := rc.volumePluginMgr.FindMapperPluginByName(volume.pluginName)
   276  	if err != nil {
   277  		return nil, err
   278  	}
   279  	if volume.volumeMode == v1.PersistentVolumeBlock && mapperPlugin == nil {
   280  		return nil, fmt.Errorf("could not find block volume plugin %q (spec.Name: %q) pod %q (UID: %q)", volume.pluginName, volume.volumeSpecName, volume.podName, pod.UID)
   281  	}
   282  
   283  	reconstructed, err := rc.operationExecutor.ReconstructVolumeOperation(
   284  		volume.volumeMode,
   285  		plugin,
   286  		mapperPlugin,
   287  		pod.UID,
   288  		volume.podName,
   289  		volume.volumeSpecName,
   290  		volume.volumePath,
   291  		volume.pluginName)
   292  	if err != nil {
   293  		return nil, err
   294  	}
   295  	volumeSpec := reconstructed.Spec
   296  	if volumeSpec == nil {
   297  		return nil, fmt.Errorf("failed to reconstruct volume for plugin %q (spec.Name: %q) pod %q (UID: %q): got nil", volume.pluginName, volume.volumeSpecName, volume.podName, pod.UID)
   298  	}
   299  
   300  	// We have to find the plugins by volume spec (NOT by plugin name) here
   301  	// in order to correctly reconstruct ephemeral volume types.
   302  	// Searching by spec checks whether the volume is actually attachable
   303  	// (i.e. has a PV) whereas searching by plugin name can only tell whether
   304  	// the plugin supports attachable volumes.
   305  	deviceMountablePlugin, err := rc.volumePluginMgr.FindDeviceMountablePluginBySpec(volumeSpec)
   306  	if err != nil {
   307  		return nil, err
   308  	}
   309  
   310  	// The unique volume name used depends on whether the volume is attachable/device-mountable
   311  	// (needsNameFromSpec = true) or not.
   312  	needsNameFromSpec := deviceMountablePlugin != nil
   313  	if !needsNameFromSpec {
   314  		// Check attach-ability of a volume only as a fallback to avoid calling
   315  		// FindAttachablePluginBySpec for CSI volumes - it needs a connection to the API server,
   316  		// but it may not be available at this stage of kubelet startup.
   317  		// All CSI volumes are device-mountable, so they won't reach this code.
   318  		attachablePlugin, err := rc.volumePluginMgr.FindAttachablePluginBySpec(volumeSpec)
   319  		if err != nil {
   320  			return nil, err
   321  		}
   322  		needsNameFromSpec = attachablePlugin != nil
   323  	}
   324  
   325  	var uniqueVolumeName v1.UniqueVolumeName
   326  	if needsNameFromSpec {
   327  		uniqueVolumeName, err = util.GetUniqueVolumeNameFromSpec(plugin, volumeSpec)
   328  		if err != nil {
   329  			return nil, err
   330  		}
   331  	} else {
   332  		uniqueVolumeName = util.GetUniqueVolumeNameFromSpecWithPod(volume.podName, plugin, volumeSpec)
   333  	}
   334  
   335  	var volumeMapper volumepkg.BlockVolumeMapper
   336  	var volumeMounter volumepkg.Mounter
   337  	var deviceMounter volumepkg.DeviceMounter
   338  
   339  	if volume.volumeMode == v1.PersistentVolumeBlock {
   340  		var newMapperErr error
   341  		volumeMapper, newMapperErr = mapperPlugin.NewBlockVolumeMapper(
   342  			volumeSpec,
   343  			pod,
   344  			volumepkg.VolumeOptions{})
   345  		if newMapperErr != nil {
   346  			return nil, fmt.Errorf(
   347  				"reconstructVolume.NewBlockVolumeMapper failed for volume %q (spec.Name: %q) pod %q (UID: %q) with: %v",
   348  				uniqueVolumeName,
   349  				volumeSpec.Name(),
   350  				volume.podName,
   351  				pod.UID,
   352  				newMapperErr)
   353  		}
   354  	} else {
   355  		var err error
   356  		volumeMounter, err = plugin.NewMounter(
   357  			volumeSpec,
   358  			pod,
   359  			volumepkg.VolumeOptions{})
   360  		if err != nil {
   361  			return nil, fmt.Errorf(
   362  				"reconstructVolume.NewMounter failed for volume %q (spec.Name: %q) pod %q (UID: %q) with: %v",
   363  				uniqueVolumeName,
   364  				volumeSpec.Name(),
   365  				volume.podName,
   366  				pod.UID,
   367  				err)
   368  		}
   369  		if deviceMountablePlugin != nil {
   370  			deviceMounter, err = deviceMountablePlugin.NewDeviceMounter()
   371  			if err != nil {
   372  				return nil, fmt.Errorf("reconstructVolume.NewDeviceMounter failed for volume %q (spec.Name: %q) pod %q (UID: %q) with: %v",
   373  					uniqueVolumeName,
   374  					volumeSpec.Name(),
   375  					volume.podName,
   376  					pod.UID,
   377  					err)
   378  			}
   379  		}
   380  	}
   381  
   382  	reconstructedVolume := &reconstructedVolume{
   383  		volumeName: uniqueVolumeName,
   384  		podName:    volume.podName,
   385  		volumeSpec: volumeSpec,
   386  		// volume.volumeSpecName is actually InnerVolumeSpecName. It will not be used
   387  		// for volume cleanup.
   388  		// in case pod is added back to desired state, outerVolumeSpecName will be updated from dsw information.
   389  		// See issue #103143 and its fix for details.
   390  		outerVolumeSpecName: volume.volumeSpecName,
   391  		pod:                 pod,
   392  		deviceMounter:       deviceMounter,
   393  		volumeGidValue:      "",
   394  		// devicePath is updated during updateStates() by checking node status's VolumesAttached data.
   395  		// TODO: get device path directly from the volume mount path.
   396  		devicePath:          "",
   397  		mounter:             volumeMounter,
   398  		blockVolumeMapper:   volumeMapper,
   399  		seLinuxMountContext: reconstructed.SELinuxMountContext,
   400  	}
   401  	return reconstructedVolume, nil
   402  }
   403  

View as plain text