...

Source file src/k8s.io/kubernetes/pkg/volume/csi/csi_plugin.go

Documentation: k8s.io/kubernetes/pkg/volume/csi

     1  /*
     2  Copyright 2017 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package csi
    18  
    19  import (
    20  	"context"
    21  	"errors"
    22  	"fmt"
    23  	"os"
    24  	"path/filepath"
    25  	"strings"
    26  	"time"
    27  
    28  	"k8s.io/klog/v2"
    29  
    30  	authenticationv1 "k8s.io/api/authentication/v1"
    31  	api "k8s.io/api/core/v1"
    32  	storage "k8s.io/api/storage/v1"
    33  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    34  	meta "k8s.io/apimachinery/pkg/apis/meta/v1"
    35  	"k8s.io/apimachinery/pkg/types"
    36  	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    37  	utilversion "k8s.io/apimachinery/pkg/util/version"
    38  	"k8s.io/apimachinery/pkg/util/wait"
    39  	utilfeature "k8s.io/apiserver/pkg/util/feature"
    40  	clientset "k8s.io/client-go/kubernetes"
    41  	storagelisters "k8s.io/client-go/listers/storage/v1"
    42  	csitranslationplugins "k8s.io/csi-translation-lib/plugins"
    43  	"k8s.io/kubernetes/pkg/features"
    44  	"k8s.io/kubernetes/pkg/kubelet/util"
    45  	"k8s.io/kubernetes/pkg/volume"
    46  	"k8s.io/kubernetes/pkg/volume/csi/nodeinfomanager"
    47  	volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
    48  )
    49  
    50  const (
    51  	// CSIPluginName is the name of the in-tree CSI Plugin
    52  	CSIPluginName = "kubernetes.io/csi"
    53  
    54  	csiTimeout      = 2 * time.Minute
    55  	volNameSep      = "^"
    56  	volDataFileName = "vol_data.json"
    57  	fsTypeBlockName = "block"
    58  
    59  	// CsiResyncPeriod is default resync period duration
    60  	// TODO: increase to something useful
    61  	CsiResyncPeriod = time.Minute
    62  )
    63  
// csiPlugin is the in-tree volume plugin that fronts CSI drivers. Its fields
// are populated by Init.
type csiPlugin struct {
	// host is the volume host passed to Init; it is nil until Init runs.
	host volume.VolumeHost
	// csiDriverLister is taken from the host in Init when a kube client is
	// available and the host is an attach/detach or kubelet volume host.
	csiDriverLister storagelisters.CSIDriverLister
	// serviceAccountTokenGetter is set in Init only for a kubelet volume host.
	serviceAccountTokenGetter func(namespace, name string, tr *authenticationv1.TokenRequest) (*authenticationv1.TokenRequest, error)
	// volumeAttachmentLister is taken from an attach/detach volume host and is
	// explicitly reset to nil for a kubelet volume host.
	volumeAttachmentLister storagelisters.VolumeAttachmentLister
}
    70  
    71  // ProbeVolumePlugins returns implemented plugins
    72  func ProbeVolumePlugins() []volume.VolumePlugin {
    73  	p := &csiPlugin{
    74  		host: nil,
    75  	}
    76  	return []volume.VolumePlugin{p}
    77  }
    78  
    79  // volume.VolumePlugin methods
    80  var _ volume.VolumePlugin = &csiPlugin{}
    81  
    82  // RegistrationHandler is the handler which is fed to the pluginwatcher API.
    83  type RegistrationHandler struct {
    84  }
    85  
    86  // TODO (verult) consider using a struct instead of global variables
    87  // csiDrivers map keep track of all registered CSI drivers on the node and their
    88  // corresponding sockets
    89  var csiDrivers = &DriversStore{}
    90  
    91  var nim nodeinfomanager.Interface
    92  
    93  // PluginHandler is the plugin registration handler interface passed to the
    94  // pluginwatcher module in kubelet
    95  var PluginHandler = &RegistrationHandler{}
    96  
    97  // ValidatePlugin is called by kubelet's plugin watcher upon detection
    98  // of a new registration socket opened by CSI Driver registrar side car.
    99  func (h *RegistrationHandler) ValidatePlugin(pluginName string, endpoint string, versions []string) error {
   100  	klog.Infof(log("Trying to validate a new CSI Driver with name: %s endpoint: %s versions: %s",
   101  		pluginName, endpoint, strings.Join(versions, ",")))
   102  
   103  	_, err := h.validateVersions("ValidatePlugin", pluginName, endpoint, versions)
   104  	if err != nil {
   105  		return fmt.Errorf("validation failed for CSI Driver %s at endpoint %s: %v", pluginName, endpoint, err)
   106  	}
   107  
   108  	return err
   109  }
   110  
   111  // RegisterPlugin is called when a plugin can be registered
   112  func (h *RegistrationHandler) RegisterPlugin(pluginName string, endpoint string, versions []string, pluginClientTimeout *time.Duration) error {
   113  	klog.Infof(log("Register new plugin with name: %s at endpoint: %s", pluginName, endpoint))
   114  
   115  	highestSupportedVersion, err := h.validateVersions("RegisterPlugin", pluginName, endpoint, versions)
   116  	if err != nil {
   117  		return err
   118  	}
   119  
   120  	// Storing endpoint of newly registered CSI driver into the map, where CSI driver name will be the key
   121  	// all other CSI components will be able to get the actual socket of CSI drivers by its name.
   122  	csiDrivers.Set(pluginName, Driver{
   123  		endpoint:                endpoint,
   124  		highestSupportedVersion: highestSupportedVersion,
   125  	})
   126  
   127  	// Get node info from the driver.
   128  	csi, err := newCsiDriverClient(csiDriverName(pluginName))
   129  	if err != nil {
   130  		return err
   131  	}
   132  
   133  	var timeout time.Duration
   134  	if pluginClientTimeout == nil {
   135  		timeout = csiTimeout
   136  	} else {
   137  		timeout = *pluginClientTimeout
   138  	}
   139  
   140  	ctx, cancel := context.WithTimeout(context.Background(), timeout)
   141  	defer cancel()
   142  
   143  	driverNodeID, maxVolumePerNode, accessibleTopology, err := csi.NodeGetInfo(ctx)
   144  	if err != nil {
   145  		if unregErr := unregisterDriver(pluginName); unregErr != nil {
   146  			klog.Error(log("registrationHandler.RegisterPlugin failed to unregister plugin due to previous error: %v", unregErr))
   147  		}
   148  		return err
   149  	}
   150  
   151  	err = nim.InstallCSIDriver(pluginName, driverNodeID, maxVolumePerNode, accessibleTopology)
   152  	if err != nil {
   153  		if unregErr := unregisterDriver(pluginName); unregErr != nil {
   154  			klog.Error(log("registrationHandler.RegisterPlugin failed to unregister plugin due to previous error: %v", unregErr))
   155  		}
   156  		return err
   157  	}
   158  
   159  	return nil
   160  }
   161  
   162  func (h *RegistrationHandler) validateVersions(callerName, pluginName string, endpoint string, versions []string) (*utilversion.Version, error) {
   163  	if len(versions) == 0 {
   164  		return nil, errors.New(log("%s for CSI driver %q failed. Plugin returned an empty list for supported versions", callerName, pluginName))
   165  	}
   166  
   167  	// Validate version
   168  	// CSI currently only has version 0.x and 1.x (see https://github.com/container-storage-interface/spec/releases).
   169  	// Therefore any driver claiming version 2.x+ is ignored as an unsupported versions.
   170  	// Future 1.x versions of CSI are supposed to be backwards compatible so this version of Kubernetes will work with any 1.x driver
   171  	// (or 0.x), but it may not work with 2.x drivers (because 2.x does not have to be backwards compatible with 1.x).
   172  	// CSI v0.x is no longer supported as of Kubernetes v1.17 in accordance with deprecation policy set out in Kubernetes v1.13.
   173  	newDriverHighestVersion, err := utilversion.HighestSupportedVersion(versions)
   174  	if err != nil {
   175  		return nil, errors.New(log("%s for CSI driver %q failed. None of the versions specified %q are supported. err=%v", callerName, pluginName, versions, err))
   176  	}
   177  
   178  	existingDriver, driverExists := csiDrivers.Get(pluginName)
   179  	if driverExists {
   180  		if !existingDriver.highestSupportedVersion.LessThan(newDriverHighestVersion) {
   181  			return nil, errors.New(log("%s for CSI driver %q failed. Another driver with the same name is already registered with a higher supported version: %q", callerName, pluginName, existingDriver.highestSupportedVersion))
   182  		}
   183  	}
   184  
   185  	return newDriverHighestVersion, nil
   186  }
   187  
   188  // DeRegisterPlugin is called when a plugin removed its socket, signaling
   189  // it is no longer available
   190  func (h *RegistrationHandler) DeRegisterPlugin(pluginName string) {
   191  	klog.Info(log("registrationHandler.DeRegisterPlugin request for plugin %s", pluginName))
   192  	if err := unregisterDriver(pluginName); err != nil {
   193  		klog.Error(log("registrationHandler.DeRegisterPlugin failed: %v", err))
   194  	}
   195  }
   196  
   197  func (p *csiPlugin) Init(host volume.VolumeHost) error {
   198  	p.host = host
   199  
   200  	csiClient := host.GetKubeClient()
   201  	if csiClient == nil {
   202  		klog.Warning(log("kubeclient not set, assuming standalone kubelet"))
   203  	} else {
   204  		// set CSIDriverLister and volumeAttachmentLister
   205  		adcHost, ok := host.(volume.AttachDetachVolumeHost)
   206  		if ok {
   207  			p.csiDriverLister = adcHost.CSIDriverLister()
   208  			if p.csiDriverLister == nil {
   209  				klog.Error(log("CSIDriverLister not found on AttachDetachVolumeHost"))
   210  			}
   211  			p.volumeAttachmentLister = adcHost.VolumeAttachmentLister()
   212  			if p.volumeAttachmentLister == nil {
   213  				klog.Error(log("VolumeAttachmentLister not found on AttachDetachVolumeHost"))
   214  			}
   215  		}
   216  		kletHost, ok := host.(volume.KubeletVolumeHost)
   217  		if ok {
   218  			p.csiDriverLister = kletHost.CSIDriverLister()
   219  			if p.csiDriverLister == nil {
   220  				klog.Error(log("CSIDriverLister not found on KubeletVolumeHost"))
   221  			}
   222  			p.serviceAccountTokenGetter = host.GetServiceAccountTokenFunc()
   223  			if p.serviceAccountTokenGetter == nil {
   224  				klog.Error(log("ServiceAccountTokenGetter not found on KubeletVolumeHost"))
   225  			}
   226  			// We don't run the volumeAttachmentLister in the kubelet context
   227  			p.volumeAttachmentLister = nil
   228  		}
   229  	}
   230  
   231  	var migratedPlugins = map[string](func() bool){
   232  		csitranslationplugins.GCEPDInTreePluginName: func() bool {
   233  			return true
   234  		},
   235  		csitranslationplugins.AWSEBSInTreePluginName: func() bool {
   236  			return true
   237  		},
   238  		csitranslationplugins.CinderInTreePluginName: func() bool {
   239  			return true
   240  		},
   241  		csitranslationplugins.AzureDiskInTreePluginName: func() bool {
   242  			return true
   243  		},
   244  		csitranslationplugins.AzureFileInTreePluginName: func() bool {
   245  			return true
   246  		},
   247  		csitranslationplugins.VSphereInTreePluginName: func() bool {
   248  			return true
   249  		},
   250  		csitranslationplugins.PortworxVolumePluginName: func() bool {
   251  			return utilfeature.DefaultFeatureGate.Enabled(features.CSIMigrationPortworx)
   252  		},
   253  		csitranslationplugins.RBDVolumePluginName: func() bool {
   254  			return utilfeature.DefaultFeatureGate.Enabled(features.CSIMigrationRBD)
   255  		},
   256  	}
   257  
   258  	// Initializing the label management channels
   259  	nim = nodeinfomanager.NewNodeInfoManager(host.GetNodeName(), host, migratedPlugins)
   260  
   261  	// This function prevents Kubelet from posting Ready status until CSINode
   262  	// is both installed and initialized
   263  	if err := initializeCSINode(host); err != nil {
   264  		return errors.New(log("failed to initialize CSINode: %v", err))
   265  	}
   266  
   267  	return nil
   268  }
   269  
   270  func initializeCSINode(host volume.VolumeHost) error {
   271  	kvh, ok := host.(volume.KubeletVolumeHost)
   272  	if !ok {
   273  		klog.V(4).Info("Cast from VolumeHost to KubeletVolumeHost failed. Skipping CSINode initialization, not running on kubelet")
   274  		return nil
   275  	}
   276  	kubeClient := host.GetKubeClient()
   277  	if kubeClient == nil {
   278  		// Kubelet running in standalone mode. Skip CSINode initialization
   279  		klog.Warning("Skipping CSINode initialization, kubelet running in standalone mode")
   280  		return nil
   281  	}
   282  
   283  	kvh.SetKubeletError(errors.New("CSINode is not yet initialized"))
   284  
   285  	go func() {
   286  		defer utilruntime.HandleCrash()
   287  
   288  		// First wait indefinitely to talk to Kube APIServer
   289  		nodeName := host.GetNodeName()
   290  		err := waitForAPIServerForever(kubeClient, nodeName)
   291  		if err != nil {
   292  			klog.Fatalf("Failed to initialize CSINode while waiting for API server to report ok: %v", err)
   293  		}
   294  
   295  		// Backoff parameters tuned to retry over 140 seconds. Will fail and restart the Kubelet
   296  		// after max retry steps.
   297  		initBackoff := wait.Backoff{
   298  			Steps:    6,
   299  			Duration: 15 * time.Millisecond,
   300  			Factor:   6.0,
   301  			Jitter:   0.1,
   302  		}
   303  		err = wait.ExponentialBackoff(initBackoff, func() (bool, error) {
   304  			klog.V(4).Infof("Initializing migrated drivers on CSINode")
   305  			err := nim.InitializeCSINodeWithAnnotation()
   306  			if err != nil {
   307  				kvh.SetKubeletError(fmt.Errorf("failed to initialize CSINode: %v", err))
   308  				klog.Errorf("Failed to initialize CSINode: %v", err)
   309  				return false, nil
   310  			}
   311  
   312  			// Successfully initialized drivers, allow Kubelet to post Ready
   313  			kvh.SetKubeletError(nil)
   314  			return true, nil
   315  		})
   316  		if err != nil {
   317  			// 2 releases after CSIMigration and all CSIMigrationX (where X is a volume plugin)
   318  			// are permanently enabled the apiserver/controllers can assume that the kubelet is
   319  			// using CSI for all Migrated volume plugins. Then all the CSINode initialization
   320  			// code can be dropped from Kubelet.
   321  			// Kill the Kubelet process and allow it to restart to retry initialization
   322  			klog.Fatalf("Failed to initialize CSINode after retrying: %v", err)
   323  		}
   324  	}()
   325  	return nil
   326  }
   327  
// GetPluginName returns CSIPluginName, the name of the in-tree CSI plugin.
func (p *csiPlugin) GetPluginName() string {
	return CSIPluginName
}
   331  
   332  // GetvolumeName returns a concatenated string of CSIVolumeSource.Driver<volNameSe>CSIVolumeSource.VolumeHandle
   333  // That string value is used in Detach() to extract driver name and volumeName.
   334  func (p *csiPlugin) GetVolumeName(spec *volume.Spec) (string, error) {
   335  	csi, err := getPVSourceFromSpec(spec)
   336  	if err != nil {
   337  		return "", errors.New(log("plugin.GetVolumeName failed to extract volume source from spec: %v", err))
   338  	}
   339  
   340  	// return driverName<separator>volumeHandle
   341  	return fmt.Sprintf("%s%s%s", csi.Driver, volNameSep, csi.VolumeHandle), nil
   342  }
   343  
   344  func (p *csiPlugin) CanSupport(spec *volume.Spec) bool {
   345  	// TODO (vladimirvivien) CanSupport should also take into account
   346  	// the availability/registration of specified Driver in the volume source
   347  	if spec == nil {
   348  		return false
   349  	}
   350  	return (spec.PersistentVolume != nil && spec.PersistentVolume.Spec.CSI != nil) ||
   351  		(spec.Volume != nil && spec.Volume.CSI != nil)
   352  }
   353  
   354  func (p *csiPlugin) RequiresRemount(spec *volume.Spec) bool {
   355  	if p.csiDriverLister == nil {
   356  		return false
   357  	}
   358  	driverName, err := GetCSIDriverName(spec)
   359  	if err != nil {
   360  		klog.V(5).Info(log("Failed to mark %q as republish required, err: %v", spec.Name(), err))
   361  		return false
   362  	}
   363  	csiDriver, err := p.getCSIDriver(driverName)
   364  	if err != nil {
   365  		klog.V(5).Info(log("Failed to mark %q as republish required, err: %v", spec.Name(), err))
   366  		return false
   367  	}
   368  	return *csiDriver.Spec.RequiresRepublish
   369  }
   370  
   371  func (p *csiPlugin) NewMounter(
   372  	spec *volume.Spec,
   373  	pod *api.Pod,
   374  	_ volume.VolumeOptions) (volume.Mounter, error) {
   375  
   376  	volSrc, pvSrc, err := getSourceFromSpec(spec)
   377  	if err != nil {
   378  		return nil, err
   379  	}
   380  
   381  	var (
   382  		driverName   string
   383  		volumeHandle string
   384  		readOnly     bool
   385  	)
   386  
   387  	switch {
   388  	case volSrc != nil:
   389  		volumeHandle = makeVolumeHandle(string(pod.UID), spec.Name())
   390  		driverName = volSrc.Driver
   391  		if volSrc.ReadOnly != nil {
   392  			readOnly = *volSrc.ReadOnly
   393  		}
   394  	case pvSrc != nil:
   395  		driverName = pvSrc.Driver
   396  		volumeHandle = pvSrc.VolumeHandle
   397  		readOnly = spec.ReadOnly
   398  	default:
   399  		return nil, errors.New(log("volume source not found in volume.Spec"))
   400  	}
   401  
   402  	volumeLifecycleMode, err := p.getVolumeLifecycleMode(spec)
   403  	if err != nil {
   404  		return nil, err
   405  	}
   406  
   407  	k8s := p.host.GetKubeClient()
   408  	if k8s == nil {
   409  		return nil, errors.New(log("failed to get a kubernetes client"))
   410  	}
   411  
   412  	kvh, ok := p.host.(volume.KubeletVolumeHost)
   413  	if !ok {
   414  		return nil, errors.New(log("cast from VolumeHost to KubeletVolumeHost failed"))
   415  	}
   416  
   417  	mounter := &csiMountMgr{
   418  		plugin:              p,
   419  		k8s:                 k8s,
   420  		spec:                spec,
   421  		pod:                 pod,
   422  		podUID:              pod.UID,
   423  		driverName:          csiDriverName(driverName),
   424  		volumeLifecycleMode: volumeLifecycleMode,
   425  		volumeID:            volumeHandle,
   426  		specVolumeID:        spec.Name(),
   427  		readOnly:            readOnly,
   428  		kubeVolHost:         kvh,
   429  	}
   430  	mounter.csiClientGetter.driverName = csiDriverName(driverName)
   431  
   432  	dir := mounter.GetPath()
   433  	mounter.MetricsProvider = NewMetricsCsi(volumeHandle, dir, csiDriverName(driverName))
   434  	klog.V(4).Info(log("mounter created successfully"))
   435  	return mounter, nil
   436  }
   437  
   438  func (p *csiPlugin) NewUnmounter(specName string, podUID types.UID) (volume.Unmounter, error) {
   439  	klog.V(4).Infof(log("setting up unmounter for [name=%v, podUID=%v]", specName, podUID))
   440  
   441  	kvh, ok := p.host.(volume.KubeletVolumeHost)
   442  	if !ok {
   443  		return nil, errors.New(log("cast from VolumeHost to KubeletVolumeHost failed"))
   444  	}
   445  
   446  	unmounter := &csiMountMgr{
   447  		plugin:       p,
   448  		podUID:       podUID,
   449  		specVolumeID: specName,
   450  		kubeVolHost:  kvh,
   451  	}
   452  
   453  	// load volume info from file
   454  	dir := unmounter.GetPath()
   455  	dataDir := filepath.Dir(dir) // dropoff /mount at end
   456  	data, err := loadVolumeData(dataDir, volDataFileName)
   457  	if err != nil {
   458  		return nil, errors.New(log("unmounter failed to load volume data file [%s]: %v", dir, err))
   459  	}
   460  	unmounter.driverName = csiDriverName(data[volDataKey.driverName])
   461  	unmounter.volumeID = data[volDataKey.volHandle]
   462  	unmounter.csiClientGetter.driverName = unmounter.driverName
   463  
   464  	return unmounter, nil
   465  }
   466  
   467  func (p *csiPlugin) ConstructVolumeSpec(volumeName, mountPath string) (volume.ReconstructedVolume, error) {
   468  	klog.V(4).Info(log("plugin.ConstructVolumeSpec [pv.Name=%v, path=%v]", volumeName, mountPath))
   469  
   470  	volData, err := loadVolumeData(mountPath, volDataFileName)
   471  	if err != nil {
   472  		return volume.ReconstructedVolume{}, errors.New(log("plugin.ConstructVolumeSpec failed loading volume data using [%s]: %v", mountPath, err))
   473  	}
   474  	klog.V(4).Info(log("plugin.ConstructVolumeSpec extracted [%#v]", volData))
   475  
   476  	var ret volume.ReconstructedVolume
   477  	if utilfeature.DefaultFeatureGate.Enabled(features.SELinuxMountReadWriteOncePod) {
   478  		ret.SELinuxMountContext = volData[volDataKey.seLinuxMountContext]
   479  	}
   480  
   481  	// If mode is VolumeLifecycleEphemeral, use constructVolSourceSpec
   482  	// to construct volume source spec. If mode is VolumeLifecyclePersistent,
   483  	// use constructPVSourceSpec to construct volume construct pv source spec.
   484  	if storage.VolumeLifecycleMode(volData[volDataKey.volumeLifecycleMode]) == storage.VolumeLifecycleEphemeral {
   485  		ret.Spec = p.constructVolSourceSpec(volData[volDataKey.specVolID], volData[volDataKey.driverName])
   486  		return ret, nil
   487  	}
   488  
   489  	ret.Spec = p.constructPVSourceSpec(volData[volDataKey.specVolID], volData[volDataKey.driverName], volData[volDataKey.volHandle])
   490  	return ret, nil
   491  }
   492  
   493  // constructVolSourceSpec constructs volume.Spec with CSIVolumeSource
   494  func (p *csiPlugin) constructVolSourceSpec(volSpecName, driverName string) *volume.Spec {
   495  	vol := &api.Volume{
   496  		Name: volSpecName,
   497  		VolumeSource: api.VolumeSource{
   498  			CSI: &api.CSIVolumeSource{
   499  				Driver: driverName,
   500  			},
   501  		},
   502  	}
   503  	return volume.NewSpecFromVolume(vol)
   504  }
   505  
   506  // constructPVSourceSpec constructs volume.Spec with CSIPersistentVolumeSource
   507  func (p *csiPlugin) constructPVSourceSpec(volSpecName, driverName, volumeHandle string) *volume.Spec {
   508  	fsMode := api.PersistentVolumeFilesystem
   509  	pv := &api.PersistentVolume{
   510  		ObjectMeta: meta.ObjectMeta{
   511  			Name: volSpecName,
   512  		},
   513  		Spec: api.PersistentVolumeSpec{
   514  			PersistentVolumeSource: api.PersistentVolumeSource{
   515  				CSI: &api.CSIPersistentVolumeSource{
   516  					Driver:       driverName,
   517  					VolumeHandle: volumeHandle,
   518  				},
   519  			},
   520  			VolumeMode: &fsMode,
   521  		},
   522  	}
   523  	return volume.NewSpecFromPersistentVolume(pv, false)
   524  }
   525  
// SupportsMountOption always reports true for the CSI plugin.
func (p *csiPlugin) SupportsMountOption() bool {
	// TODO (vladimirvivien) use CSI VolumeCapability.MountVolume.mount_flags
	// to probe for the result for this method
	// (bswartz) Until the CSI spec supports probing, our only option is to
	// make plugins register their support for mount options or lack thereof
	// directly with kubernetes.
	return true
}
   534  
// SupportsBulkVolumeVerification always reports false for the CSI plugin.
func (p *csiPlugin) SupportsBulkVolumeVerification() bool {
	return false
}
   538  
   539  func (p *csiPlugin) SupportsSELinuxContextMount(spec *volume.Spec) (bool, error) {
   540  	if utilfeature.DefaultFeatureGate.Enabled(features.SELinuxMountReadWriteOncePod) {
   541  		driver, err := GetCSIDriverName(spec)
   542  		if err != nil {
   543  			return false, err
   544  		}
   545  		csiDriver, err := p.getCSIDriver(driver)
   546  		if err != nil {
   547  			if apierrors.IsNotFound(err) {
   548  				return false, nil
   549  			}
   550  			return false, err
   551  		}
   552  		if csiDriver.Spec.SELinuxMount != nil {
   553  			return *csiDriver.Spec.SELinuxMount, nil
   554  		}
   555  		return false, nil
   556  	}
   557  	return false, nil
   558  }
   559  
   560  // volume.AttachableVolumePlugin methods
   561  var _ volume.AttachableVolumePlugin = &csiPlugin{}
   562  
   563  var _ volume.DeviceMountableVolumePlugin = &csiPlugin{}
   564  
// NewAttacher returns the attacher produced by newAttacherDetacher.
func (p *csiPlugin) NewAttacher() (volume.Attacher, error) {
	return p.newAttacherDetacher()
}
   568  
// NewDeviceMounter returns the value produced by NewAttacher, used as a
// device mounter.
func (p *csiPlugin) NewDeviceMounter() (volume.DeviceMounter, error) {
	return p.NewAttacher()
}
   572  
// NewDetacher returns the detacher produced by newAttacherDetacher.
func (p *csiPlugin) NewDetacher() (volume.Detacher, error) {
	return p.newAttacherDetacher()
}
   576  
   577  func (p *csiPlugin) CanAttach(spec *volume.Spec) (bool, error) {
   578  	volumeLifecycleMode, err := p.getVolumeLifecycleMode(spec)
   579  	if err != nil {
   580  		return false, err
   581  	}
   582  
   583  	if volumeLifecycleMode == storage.VolumeLifecycleEphemeral {
   584  		klog.V(5).Info(log("plugin.CanAttach = false, ephemeral mode detected for spec %v", spec.Name()))
   585  		return false, nil
   586  	}
   587  
   588  	pvSrc, err := getCSISourceFromSpec(spec)
   589  	if err != nil {
   590  		return false, err
   591  	}
   592  
   593  	driverName := pvSrc.Driver
   594  
   595  	skipAttach, err := p.skipAttach(driverName)
   596  	if err != nil {
   597  		return false, err
   598  	}
   599  
   600  	return !skipAttach, nil
   601  }
   602  
   603  // CanDeviceMount returns true if the spec supports device mount
   604  func (p *csiPlugin) CanDeviceMount(spec *volume.Spec) (bool, error) {
   605  	volumeLifecycleMode, err := p.getVolumeLifecycleMode(spec)
   606  	if err != nil {
   607  		return false, err
   608  	}
   609  
   610  	if volumeLifecycleMode == storage.VolumeLifecycleEphemeral {
   611  		klog.V(5).Info(log("plugin.CanDeviceMount skipped ephemeral mode detected for spec %v", spec.Name()))
   612  		return false, nil
   613  	}
   614  
   615  	// Persistent volumes support device mount.
   616  	return true, nil
   617  }
   618  
// NewDeviceUnmounter returns the value produced by NewDetacher, used as a
// device unmounter.
func (p *csiPlugin) NewDeviceUnmounter() (volume.DeviceUnmounter, error) {
	return p.NewDetacher()
}
   622  
   623  func (p *csiPlugin) GetDeviceMountRefs(deviceMountPath string) ([]string, error) {
   624  	m := p.host.GetMounter(p.GetPluginName())
   625  	return m.GetMountRefs(deviceMountPath)
   626  }
   627  
   628  // BlockVolumePlugin methods
   629  var _ volume.BlockVolumePlugin = &csiPlugin{}
   630  
   631  func (p *csiPlugin) NewBlockVolumeMapper(spec *volume.Spec, podRef *api.Pod, opts volume.VolumeOptions) (volume.BlockVolumeMapper, error) {
   632  	pvSource, err := getCSISourceFromSpec(spec)
   633  	if err != nil {
   634  		return nil, err
   635  	}
   636  	readOnly, err := getReadOnlyFromSpec(spec)
   637  	if err != nil {
   638  		return nil, err
   639  	}
   640  
   641  	klog.V(4).Info(log("setting up block mapper for [volume=%v,driver=%v]", pvSource.VolumeHandle, pvSource.Driver))
   642  
   643  	k8s := p.host.GetKubeClient()
   644  	if k8s == nil {
   645  		return nil, errors.New(log("failed to get a kubernetes client"))
   646  	}
   647  
   648  	mapper := &csiBlockMapper{
   649  		k8s:        k8s,
   650  		plugin:     p,
   651  		volumeID:   pvSource.VolumeHandle,
   652  		driverName: csiDriverName(pvSource.Driver),
   653  		readOnly:   readOnly,
   654  		spec:       spec,
   655  		specName:   spec.Name(),
   656  		pod:        podRef,
   657  		podUID:     podRef.UID,
   658  	}
   659  	mapper.csiClientGetter.driverName = csiDriverName(pvSource.Driver)
   660  
   661  	// Save volume info in pod dir
   662  	dataDir := getVolumeDeviceDataDir(spec.Name(), p.host)
   663  
   664  	if err := os.MkdirAll(dataDir, 0750); err != nil {
   665  		return nil, errors.New(log("failed to create data dir %s:  %v", dataDir, err))
   666  	}
   667  	klog.V(4).Info(log("created path successfully [%s]", dataDir))
   668  
   669  	blockPath, err := mapper.GetGlobalMapPath(spec)
   670  	if err != nil {
   671  		return nil, errors.New(log("failed to get device path: %v", err))
   672  	}
   673  
   674  	mapper.MetricsProvider = NewMetricsCsi(pvSource.VolumeHandle, blockPath+"/"+string(podRef.UID), csiDriverName(pvSource.Driver))
   675  
   676  	// persist volume info data for teardown
   677  	node := string(p.host.GetNodeName())
   678  	attachID := getAttachmentName(pvSource.VolumeHandle, pvSource.Driver, node)
   679  	volData := map[string]string{
   680  		volDataKey.specVolID:    spec.Name(),
   681  		volDataKey.volHandle:    pvSource.VolumeHandle,
   682  		volDataKey.driverName:   pvSource.Driver,
   683  		volDataKey.nodeName:     node,
   684  		volDataKey.attachmentID: attachID,
   685  	}
   686  
   687  	err = saveVolumeData(dataDir, volDataFileName, volData)
   688  	defer func() {
   689  		// Only if there was an error and volume operation was considered
   690  		// finished, we should remove the directory.
   691  		if err != nil && volumetypes.IsOperationFinishedError(err) {
   692  			// attempt to cleanup volume mount dir.
   693  			if err = removeMountDir(p, dataDir); err != nil {
   694  				klog.Error(log("attacher.MountDevice failed to remove mount dir after error [%s]: %v", dataDir, err))
   695  			}
   696  		}
   697  	}()
   698  	if err != nil {
   699  		errorMsg := log("csi.NewBlockVolumeMapper failed to save volume info data: %v", err)
   700  		klog.Error(errorMsg)
   701  		return nil, errors.New(errorMsg)
   702  	}
   703  
   704  	return mapper, nil
   705  }
   706  
   707  func (p *csiPlugin) NewBlockVolumeUnmapper(volName string, podUID types.UID) (volume.BlockVolumeUnmapper, error) {
   708  	klog.V(4).Infof(log("setting up block unmapper for [Spec=%v, podUID=%v]", volName, podUID))
   709  	unmapper := &csiBlockMapper{
   710  		plugin:   p,
   711  		podUID:   podUID,
   712  		specName: volName,
   713  	}
   714  
   715  	// load volume info from file
   716  	dataDir := getVolumeDeviceDataDir(unmapper.specName, p.host)
   717  	data, err := loadVolumeData(dataDir, volDataFileName)
   718  	if err != nil {
   719  		return nil, errors.New(log("unmapper failed to load volume data file [%s]: %v", dataDir, err))
   720  	}
   721  	unmapper.driverName = csiDriverName(data[volDataKey.driverName])
   722  	unmapper.volumeID = data[volDataKey.volHandle]
   723  	unmapper.csiClientGetter.driverName = unmapper.driverName
   724  
   725  	return unmapper, nil
   726  }
   727  
   728  func (p *csiPlugin) ConstructBlockVolumeSpec(podUID types.UID, specVolName, mapPath string) (*volume.Spec, error) {
   729  	klog.V(4).Infof("plugin.ConstructBlockVolumeSpec [podUID=%s, specVolName=%s, path=%s]", string(podUID), specVolName, mapPath)
   730  
   731  	dataDir := getVolumeDeviceDataDir(specVolName, p.host)
   732  	volData, err := loadVolumeData(dataDir, volDataFileName)
   733  	if err != nil {
   734  		return nil, errors.New(log("plugin.ConstructBlockVolumeSpec failed loading volume data using [%s]: %v", mapPath, err))
   735  	}
   736  
   737  	klog.V(4).Info(log("plugin.ConstructBlockVolumeSpec extracted [%#v]", volData))
   738  
   739  	blockMode := api.PersistentVolumeBlock
   740  	pv := &api.PersistentVolume{
   741  		ObjectMeta: meta.ObjectMeta{
   742  			Name: volData[volDataKey.specVolID],
   743  		},
   744  		Spec: api.PersistentVolumeSpec{
   745  			PersistentVolumeSource: api.PersistentVolumeSource{
   746  				CSI: &api.CSIPersistentVolumeSource{
   747  					Driver:       volData[volDataKey.driverName],
   748  					VolumeHandle: volData[volDataKey.volHandle],
   749  				},
   750  			},
   751  			VolumeMode: &blockMode,
   752  		},
   753  	}
   754  
   755  	return volume.NewSpecFromPersistentVolume(pv, false), nil
   756  }
   757  
   758  // skipAttach looks up CSIDriver object associated with driver name
   759  // to determine if driver requires attachment volume operation
   760  func (p *csiPlugin) skipAttach(driver string) (bool, error) {
   761  	csiDriver, err := p.getCSIDriver(driver)
   762  	if err != nil {
   763  		if apierrors.IsNotFound(err) {
   764  			// Don't skip attach if CSIDriver does not exist
   765  			return false, nil
   766  		}
   767  		return false, err
   768  	}
   769  	if csiDriver.Spec.AttachRequired != nil && *csiDriver.Spec.AttachRequired == false {
   770  		return true, nil
   771  	}
   772  	return false, nil
   773  }
   774  
   775  func (p *csiPlugin) getCSIDriver(driver string) (*storage.CSIDriver, error) {
   776  	kletHost, ok := p.host.(volume.KubeletVolumeHost)
   777  	if ok {
   778  		if err := kletHost.WaitForCacheSync(); err != nil {
   779  			return nil, err
   780  		}
   781  	}
   782  
   783  	if p.csiDriverLister == nil {
   784  		return nil, errors.New("CSIDriver lister does not exist")
   785  	}
   786  	csiDriver, err := p.csiDriverLister.Get(driver)
   787  	return csiDriver, err
   788  }
   789  
   790  // getVolumeLifecycleMode returns the mode for the specified spec: {persistent|ephemeral}.
   791  // 1) If mode cannot be determined, it will default to "persistent".
   792  // 2) If Mode cannot be resolved to either {persistent | ephemeral}, an error is returned
   793  // See https://github.com/kubernetes/enhancements/blob/master/keps/sig-storage/596-csi-inline-volumes/README.md
   794  func (p *csiPlugin) getVolumeLifecycleMode(spec *volume.Spec) (storage.VolumeLifecycleMode, error) {
   795  	// 1) if volume.Spec.Volume.CSI != nil -> mode is ephemeral
   796  	// 2) if volume.Spec.PersistentVolume.Spec.CSI != nil -> persistent
   797  	volSrc, _, err := getSourceFromSpec(spec)
   798  	if err != nil {
   799  		return "", err
   800  	}
   801  
   802  	if volSrc != nil {
   803  		return storage.VolumeLifecycleEphemeral, nil
   804  	}
   805  	return storage.VolumeLifecyclePersistent, nil
   806  }
   807  
   808  func (p *csiPlugin) getPublishContext(client clientset.Interface, handle, driver, nodeName string) (map[string]string, error) {
   809  	skip, err := p.skipAttach(driver)
   810  	if err != nil {
   811  		return nil, err
   812  	}
   813  	if skip {
   814  		return nil, nil
   815  	}
   816  
   817  	attachID := getAttachmentName(handle, driver, nodeName)
   818  
   819  	// search for attachment by VolumeAttachment.Spec.Source.PersistentVolumeName
   820  	attachment, err := client.StorageV1().VolumeAttachments().Get(context.TODO(), attachID, meta.GetOptions{})
   821  	if err != nil {
   822  		return nil, err // This err already has enough context ("VolumeAttachment xyz not found")
   823  	}
   824  
   825  	if attachment == nil {
   826  		err = errors.New("no existing VolumeAttachment found")
   827  		return nil, err
   828  	}
   829  	return attachment.Status.AttachmentMetadata, nil
   830  }
   831  
   832  func (p *csiPlugin) newAttacherDetacher() (*csiAttacher, error) {
   833  	k8s := p.host.GetKubeClient()
   834  	if k8s == nil {
   835  		return nil, errors.New(log("unable to get kubernetes client from host"))
   836  	}
   837  
   838  	return &csiAttacher{
   839  		plugin:       p,
   840  		k8s:          k8s,
   841  		watchTimeout: csiTimeout,
   842  	}, nil
   843  }
   844  
   845  // podInfoEnabled  check CSIDriver enabled pod info flag
   846  func (p *csiPlugin) podInfoEnabled(driverName string) (bool, error) {
   847  	csiDriver, err := p.getCSIDriver(driverName)
   848  	if err != nil {
   849  		if apierrors.IsNotFound(err) {
   850  			klog.V(4).Infof(log("CSIDriver %q not found, not adding pod information", driverName))
   851  			return false, nil
   852  		}
   853  		return false, err
   854  	}
   855  
   856  	// if PodInfoOnMount is not set or false we do not set pod attributes
   857  	if csiDriver.Spec.PodInfoOnMount == nil || *csiDriver.Spec.PodInfoOnMount == false {
   858  		klog.V(4).Infof(log("CSIDriver %q does not require pod information", driverName))
   859  		return false, nil
   860  	}
   861  	return true, nil
   862  }
   863  
   864  func unregisterDriver(driverName string) error {
   865  	csiDrivers.Delete(driverName)
   866  
   867  	if err := nim.UninstallCSIDriver(driverName); err != nil {
   868  		return errors.New(log("Error uninstalling CSI driver: %v", err))
   869  	}
   870  
   871  	return nil
   872  }
   873  
   874  // waitForAPIServerForever waits forever to get a CSINode instance as a proxy
   875  // for a healthy APIServer
   876  func waitForAPIServerForever(client clientset.Interface, nodeName types.NodeName) error {
   877  	var lastErr error
   878  	// Served object is discarded so no risk to have stale object with benefit to
   879  	// reduce the load on APIServer and etcd.
   880  	opts := meta.GetOptions{}
   881  	util.FromApiserverCache(&opts)
   882  	err := wait.PollImmediateInfinite(time.Second, func() (bool, error) {
   883  		// Get a CSINode from API server to make sure 1) kubelet can reach API server
   884  		// and 2) it has enough permissions. Kubelet may have restricted permissions
   885  		// when it's bootstrapping TLS.
   886  		// https://kubernetes.io/docs/reference/command-line-tools-reference/kubelet-tls-bootstrapping/
   887  		_, lastErr = client.StorageV1().CSINodes().Get(context.TODO(), string(nodeName), opts)
   888  		if lastErr == nil || apierrors.IsNotFound(lastErr) {
   889  			// API server contacted
   890  			return true, nil
   891  		}
   892  		klog.V(2).Infof("Failed to contact API server when waiting for CSINode publishing: %s", lastErr)
   893  		return false, nil
   894  	})
   895  	if err != nil {
   896  		// In theory this is unreachable, but just in case:
   897  		return fmt.Errorf("%v: %v", err, lastErr)
   898  	}
   899  
   900  	return nil
   901  }
   902  

View as plain text