...

Source file src/k8s.io/kubernetes/pkg/kubelet/volume_host.go

Documentation: k8s.io/kubernetes/pkg/kubelet

     1  /*
     2  Copyright 2016 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package kubelet
    18  
    19  import (
    20  	"fmt"
    21  	"net"
    22  	"runtime"
    23  
    24  	"k8s.io/klog/v2"
    25  	"k8s.io/mount-utils"
    26  	utilexec "k8s.io/utils/exec"
    27  
    28  	authenticationv1 "k8s.io/api/authentication/v1"
    29  	v1 "k8s.io/api/core/v1"
    30  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    31  	"k8s.io/apimachinery/pkg/types"
    32  	"k8s.io/apimachinery/pkg/util/wait"
    33  	"k8s.io/client-go/informers"
    34  	clientset "k8s.io/client-go/kubernetes"
    35  	storagelisters "k8s.io/client-go/listers/storage/v1"
    36  	"k8s.io/client-go/tools/cache"
    37  	"k8s.io/client-go/tools/record"
    38  	cloudprovider "k8s.io/cloud-provider"
    39  	"k8s.io/kubernetes/pkg/kubelet/clustertrustbundle"
    40  	"k8s.io/kubernetes/pkg/kubelet/configmap"
    41  	"k8s.io/kubernetes/pkg/kubelet/secret"
    42  	"k8s.io/kubernetes/pkg/kubelet/token"
    43  	"k8s.io/kubernetes/pkg/volume"
    44  	"k8s.io/kubernetes/pkg/volume/util"
    45  	"k8s.io/kubernetes/pkg/volume/util/hostutil"
    46  	"k8s.io/kubernetes/pkg/volume/util/subpath"
    47  )
    48  
    49  // NewInitializedVolumePluginMgr returns a new instance of
    50  // volume.VolumePluginMgr initialized with kubelets implementation of the
    51  // volume.VolumeHost interface.
    52  //
    53  // kubelet - used by VolumeHost methods to expose kubelet specific parameters
    54  // plugins - used to initialize volumePluginMgr
    55  func NewInitializedVolumePluginMgr(
    56  	kubelet *Kubelet,
    57  	secretManager secret.Manager,
    58  	configMapManager configmap.Manager,
    59  	tokenManager *token.Manager,
    60  	clusterTrustBundleManager clustertrustbundle.Manager,
    61  	plugins []volume.VolumePlugin,
    62  	prober volume.DynamicPluginProber) (*volume.VolumePluginMgr, error) {
    63  
    64  	// Initialize csiDriverLister before calling InitPlugins
    65  	var informerFactory informers.SharedInformerFactory
    66  	var csiDriverLister storagelisters.CSIDriverLister
    67  	var csiDriversSynced cache.InformerSynced
    68  	const resyncPeriod = 0
    69  	// Don't initialize if kubeClient is nil
    70  	if kubelet.kubeClient != nil {
    71  		informerFactory = informers.NewSharedInformerFactory(kubelet.kubeClient, resyncPeriod)
    72  		csiDriverInformer := informerFactory.Storage().V1().CSIDrivers()
    73  		csiDriverLister = csiDriverInformer.Lister()
    74  		csiDriversSynced = csiDriverInformer.Informer().HasSynced
    75  
    76  	} else {
    77  		klog.InfoS("KubeClient is nil. Skip initialization of CSIDriverLister")
    78  	}
    79  
    80  	kvh := &kubeletVolumeHost{
    81  		kubelet:                   kubelet,
    82  		volumePluginMgr:           volume.VolumePluginMgr{},
    83  		secretManager:             secretManager,
    84  		configMapManager:          configMapManager,
    85  		tokenManager:              tokenManager,
    86  		clusterTrustBundleManager: clusterTrustBundleManager,
    87  		informerFactory:           informerFactory,
    88  		csiDriverLister:           csiDriverLister,
    89  		csiDriversSynced:          csiDriversSynced,
    90  		exec:                      utilexec.New(),
    91  	}
    92  
    93  	if err := kvh.volumePluginMgr.InitPlugins(plugins, prober, kvh); err != nil {
    94  		return nil, fmt.Errorf(
    95  			"could not initialize volume plugins for KubeletVolumePluginMgr: %v",
    96  			err)
    97  	}
    98  
    99  	return &kvh.volumePluginMgr, nil
   100  }
   101  
   102  // Compile-time check to ensure kubeletVolumeHost implements the VolumeHost interface
   103  var _ volume.VolumeHost = &kubeletVolumeHost{}
   104  var _ volume.KubeletVolumeHost = &kubeletVolumeHost{}
   105  
   106  func (kvh *kubeletVolumeHost) GetPluginDir(pluginName string) string {
   107  	return kvh.kubelet.getPluginDir(pluginName)
   108  }
   109  
   110  type kubeletVolumeHost struct {
   111  	kubelet                   *Kubelet
   112  	volumePluginMgr           volume.VolumePluginMgr
   113  	secretManager             secret.Manager
   114  	tokenManager              *token.Manager
   115  	configMapManager          configmap.Manager
   116  	clusterTrustBundleManager clustertrustbundle.Manager
   117  	informerFactory           informers.SharedInformerFactory
   118  	csiDriverLister           storagelisters.CSIDriverLister
   119  	csiDriversSynced          cache.InformerSynced
   120  	exec                      utilexec.Interface
   121  }
   122  
   123  func (kvh *kubeletVolumeHost) SetKubeletError(err error) {
   124  	kvh.kubelet.runtimeState.setStorageState(err)
   125  }
   126  
   127  func (kvh *kubeletVolumeHost) GetVolumeDevicePluginDir(pluginName string) string {
   128  	return kvh.kubelet.getVolumeDevicePluginDir(pluginName)
   129  }
   130  
   131  func (kvh *kubeletVolumeHost) GetPodsDir() string {
   132  	return kvh.kubelet.getPodsDir()
   133  }
   134  
   135  func (kvh *kubeletVolumeHost) GetPodVolumeDir(podUID types.UID, pluginName string, volumeName string) string {
   136  	dir := kvh.kubelet.getPodVolumeDir(podUID, pluginName, volumeName)
   137  	if runtime.GOOS == "windows" {
   138  		dir = util.GetWindowsPath(dir)
   139  	}
   140  	return dir
   141  }
   142  
   143  func (kvh *kubeletVolumeHost) GetPodVolumeDeviceDir(podUID types.UID, pluginName string) string {
   144  	return kvh.kubelet.getPodVolumeDeviceDir(podUID, pluginName)
   145  }
   146  
   147  func (kvh *kubeletVolumeHost) GetPodPluginDir(podUID types.UID, pluginName string) string {
   148  	return kvh.kubelet.getPodPluginDir(podUID, pluginName)
   149  }
   150  
   151  func (kvh *kubeletVolumeHost) GetKubeClient() clientset.Interface {
   152  	return kvh.kubelet.kubeClient
   153  }
   154  
   155  func (kvh *kubeletVolumeHost) GetSubpather() subpath.Interface {
   156  	return kvh.kubelet.subpather
   157  }
   158  
   159  func (kvh *kubeletVolumeHost) GetHostUtil() hostutil.HostUtils {
   160  	return kvh.kubelet.hostutil
   161  }
   162  
   163  func (kvh *kubeletVolumeHost) GetInformerFactory() informers.SharedInformerFactory {
   164  	return kvh.informerFactory
   165  }
   166  
   167  func (kvh *kubeletVolumeHost) CSIDriverLister() storagelisters.CSIDriverLister {
   168  	return kvh.csiDriverLister
   169  }
   170  
   171  func (kvh *kubeletVolumeHost) CSIDriversSynced() cache.InformerSynced {
   172  	return kvh.csiDriversSynced
   173  }
   174  
   175  // WaitForCacheSync is a helper function that waits for cache sync for CSIDriverLister
   176  func (kvh *kubeletVolumeHost) WaitForCacheSync() error {
   177  	if kvh.csiDriversSynced == nil {
   178  		klog.ErrorS(nil, "CsiDriversSynced not found on KubeletVolumeHost")
   179  		return fmt.Errorf("csiDriversSynced not found on KubeletVolumeHost")
   180  	}
   181  
   182  	synced := []cache.InformerSynced{kvh.csiDriversSynced}
   183  	if !cache.WaitForCacheSync(wait.NeverStop, synced...) {
   184  		klog.InfoS("Failed to wait for cache sync for CSIDriverLister")
   185  		return fmt.Errorf("failed to wait for cache sync for CSIDriverLister")
   186  	}
   187  
   188  	return nil
   189  }
   190  
   191  func (kvh *kubeletVolumeHost) NewWrapperMounter(
   192  	volName string,
   193  	spec volume.Spec,
   194  	pod *v1.Pod,
   195  	opts volume.VolumeOptions) (volume.Mounter, error) {
   196  	// The name of wrapper volume is set to "wrapped_{wrapped_volume_name}"
   197  	wrapperVolumeName := "wrapped_" + volName
   198  	if spec.Volume != nil {
   199  		spec.Volume.Name = wrapperVolumeName
   200  	}
   201  
   202  	return kvh.kubelet.newVolumeMounterFromPlugins(&spec, pod, opts)
   203  }
   204  
   205  func (kvh *kubeletVolumeHost) NewWrapperUnmounter(volName string, spec volume.Spec, podUID types.UID) (volume.Unmounter, error) {
   206  	// The name of wrapper volume is set to "wrapped_{wrapped_volume_name}"
   207  	wrapperVolumeName := "wrapped_" + volName
   208  	if spec.Volume != nil {
   209  		spec.Volume.Name = wrapperVolumeName
   210  	}
   211  
   212  	plugin, err := kvh.kubelet.volumePluginMgr.FindPluginBySpec(&spec)
   213  	if err != nil {
   214  		return nil, err
   215  	}
   216  
   217  	return plugin.NewUnmounter(spec.Name(), podUID)
   218  }
   219  
   220  func (kvh *kubeletVolumeHost) GetCloudProvider() cloudprovider.Interface {
   221  	return kvh.kubelet.cloud
   222  }
   223  
   224  func (kvh *kubeletVolumeHost) GetMounter(pluginName string) mount.Interface {
   225  	return kvh.kubelet.mounter
   226  }
   227  
   228  func (kvh *kubeletVolumeHost) GetHostName() string {
   229  	return kvh.kubelet.hostname
   230  }
   231  
   232  func (kvh *kubeletVolumeHost) GetHostIP() (net.IP, error) {
   233  	hostIPs, err := kvh.kubelet.GetHostIPs()
   234  	if err != nil {
   235  		return nil, err
   236  	}
   237  	return hostIPs[0], err
   238  }
   239  
   240  func (kvh *kubeletVolumeHost) GetNodeAllocatable() (v1.ResourceList, error) {
   241  	node, err := kvh.kubelet.getNodeAnyWay()
   242  	if err != nil {
   243  		return nil, fmt.Errorf("error retrieving node: %v", err)
   244  	}
   245  	return node.Status.Allocatable, nil
   246  }
   247  
   248  func (kvh *kubeletVolumeHost) GetSecretFunc() func(namespace, name string) (*v1.Secret, error) {
   249  	if kvh.secretManager != nil {
   250  		return kvh.secretManager.GetSecret
   251  	}
   252  	return func(namespace, name string) (*v1.Secret, error) {
   253  		return nil, fmt.Errorf("not supported due to running kubelet in standalone mode")
   254  	}
   255  }
   256  
   257  func (kvh *kubeletVolumeHost) GetConfigMapFunc() func(namespace, name string) (*v1.ConfigMap, error) {
   258  	if kvh.configMapManager != nil {
   259  		return kvh.configMapManager.GetConfigMap
   260  	}
   261  	return func(namespace, name string) (*v1.ConfigMap, error) {
   262  		return nil, fmt.Errorf("not supported due to running kubelet in standalone mode")
   263  	}
   264  }
   265  
   266  func (kvh *kubeletVolumeHost) GetServiceAccountTokenFunc() func(namespace, name string, tr *authenticationv1.TokenRequest) (*authenticationv1.TokenRequest, error) {
   267  	return kvh.tokenManager.GetServiceAccountToken
   268  }
   269  
   270  func (kvh *kubeletVolumeHost) DeleteServiceAccountTokenFunc() func(podUID types.UID) {
   271  	return kvh.tokenManager.DeleteServiceAccountToken
   272  }
   273  
   274  func (kvh *kubeletVolumeHost) GetTrustAnchorsByName(name string, allowMissing bool) ([]byte, error) {
   275  	return kvh.clusterTrustBundleManager.GetTrustAnchorsByName(name, allowMissing)
   276  }
   277  
   278  func (kvh *kubeletVolumeHost) GetTrustAnchorsBySigner(signerName string, labelSelector *metav1.LabelSelector, allowMissing bool) ([]byte, error) {
   279  	return kvh.clusterTrustBundleManager.GetTrustAnchorsBySigner(signerName, labelSelector, allowMissing)
   280  }
   281  
   282  func (kvh *kubeletVolumeHost) GetNodeLabels() (map[string]string, error) {
   283  	node, err := kvh.kubelet.GetNode()
   284  	if err != nil {
   285  		return nil, fmt.Errorf("error retrieving node: %v", err)
   286  	}
   287  	return node.Labels, nil
   288  }
   289  
   290  func (kvh *kubeletVolumeHost) GetAttachedVolumesFromNodeStatus() (map[v1.UniqueVolumeName]string, error) {
   291  	node, err := kvh.kubelet.GetNode()
   292  	if err != nil {
   293  		return nil, fmt.Errorf("error retrieving node: %v", err)
   294  	}
   295  	attachedVolumes := node.Status.VolumesAttached
   296  	result := map[v1.UniqueVolumeName]string{}
   297  	for i := range attachedVolumes {
   298  		attachedVolume := attachedVolumes[i]
   299  		result[attachedVolume.Name] = attachedVolume.DevicePath
   300  	}
   301  	return result, nil
   302  }
   303  
   304  func (kvh *kubeletVolumeHost) GetNodeName() types.NodeName {
   305  	return kvh.kubelet.nodeName
   306  }
   307  
   308  func (kvh *kubeletVolumeHost) GetEventRecorder() record.EventRecorder {
   309  	return kvh.kubelet.recorder
   310  }
   311  
   312  func (kvh *kubeletVolumeHost) GetExec(pluginName string) utilexec.Interface {
   313  	return kvh.exec
   314  }
   315  

View as plain text