...

Source file src/k8s.io/kubernetes/pkg/volume/portworx/portworx.go

Documentation: k8s.io/kubernetes/pkg/volume/portworx

     1  /*
     2  Copyright 2017 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package portworx
    18  
    19  import (
    20  	"fmt"
    21  	"net"
    22  	"os"
    23  	"strconv"
    24  
    25  	"k8s.io/klog/v2"
    26  	"k8s.io/mount-utils"
    27  	utilstrings "k8s.io/utils/strings"
    28  
    29  	volumeclient "github.com/libopenstorage/openstorage/api/client/volume"
    30  	v1 "k8s.io/api/core/v1"
    31  	"k8s.io/apimachinery/pkg/api/resource"
    32  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    33  	"k8s.io/apimachinery/pkg/types"
    34  	utilfeature "k8s.io/apiserver/pkg/util/feature"
    35  	"k8s.io/kubernetes/pkg/features"
    36  	"k8s.io/kubernetes/pkg/volume"
    37  	"k8s.io/kubernetes/pkg/volume/util"
    38  )
    39  
    40  const (
    41  	attachContextKey = "context"
    42  	attachHostKey    = "host"
    43  )
    44  
    45  // ProbeVolumePlugins is the primary entrypoint for volume plugins.
    46  func ProbeVolumePlugins() []volume.VolumePlugin {
    47  	return []volume.VolumePlugin{&portworxVolumePlugin{nil, nil}}
    48  }
    49  
    50  type portworxVolumePlugin struct {
    51  	host volume.VolumeHost
    52  	util *portworxVolumeUtil
    53  }
    54  
    55  var _ volume.VolumePlugin = &portworxVolumePlugin{}
    56  var _ volume.PersistentVolumePlugin = &portworxVolumePlugin{}
    57  var _ volume.DeletableVolumePlugin = &portworxVolumePlugin{}
    58  var _ volume.ProvisionableVolumePlugin = &portworxVolumePlugin{}
    59  var _ volume.ExpandableVolumePlugin = &portworxVolumePlugin{}
    60  
    61  const (
    62  	portworxVolumePluginName = "kubernetes.io/portworx-volume"
    63  )
    64  
    65  func getPath(uid types.UID, volName string, host volume.VolumeHost) string {
    66  	return host.GetPodVolumeDir(uid, utilstrings.EscapeQualifiedName(portworxVolumePluginName), volName)
    67  }
    68  
    69  func (plugin *portworxVolumePlugin) IsMigratedToCSI() bool {
    70  	return utilfeature.DefaultFeatureGate.Enabled(features.CSIMigrationPortworx)
    71  }
    72  
    73  func (plugin *portworxVolumePlugin) Init(host volume.VolumeHost) error {
    74  	client, err := volumeclient.NewDriverClient(
    75  		fmt.Sprintf("http://%s", net.JoinHostPort(host.GetHostName(), strconv.Itoa(osdMgmtDefaultPort))),
    76  		pxdDriverName, osdDriverVersion, pxDriverName)
    77  	if err != nil {
    78  		return err
    79  	}
    80  
    81  	plugin.host = host
    82  	plugin.util = &portworxVolumeUtil{
    83  		portworxClient: client,
    84  	}
    85  
    86  	return nil
    87  }
    88  
    89  func (plugin *portworxVolumePlugin) GetPluginName() string {
    90  	return portworxVolumePluginName
    91  }
    92  
    93  func (plugin *portworxVolumePlugin) GetVolumeName(spec *volume.Spec) (string, error) {
    94  	volumeSource, _, err := getVolumeSource(spec)
    95  	if err != nil {
    96  		return "", err
    97  	}
    98  
    99  	return volumeSource.VolumeID, nil
   100  }
   101  
   102  func (plugin *portworxVolumePlugin) CanSupport(spec *volume.Spec) bool {
   103  	return (spec.PersistentVolume != nil && spec.PersistentVolume.Spec.PortworxVolume != nil) ||
   104  		(spec.Volume != nil && spec.Volume.PortworxVolume != nil)
   105  }
   106  
   107  func (plugin *portworxVolumePlugin) RequiresRemount(spec *volume.Spec) bool {
   108  	return false
   109  }
   110  
   111  func (plugin *portworxVolumePlugin) GetAccessModes() []v1.PersistentVolumeAccessMode {
   112  	return []v1.PersistentVolumeAccessMode{
   113  		v1.ReadWriteOnce,
   114  		v1.ReadWriteMany,
   115  	}
   116  }
   117  
   118  func (plugin *portworxVolumePlugin) NewMounter(spec *volume.Spec, pod *v1.Pod, _ volume.VolumeOptions) (volume.Mounter, error) {
   119  	return plugin.newMounterInternal(spec, pod.UID, plugin.util, plugin.host.GetMounter(plugin.GetPluginName()))
   120  }
   121  
   122  func (plugin *portworxVolumePlugin) newMounterInternal(spec *volume.Spec, podUID types.UID, manager portworxManager, mounter mount.Interface) (volume.Mounter, error) {
   123  	pwx, readOnly, err := getVolumeSource(spec)
   124  	if err != nil {
   125  		return nil, err
   126  	}
   127  
   128  	volumeID := pwx.VolumeID
   129  	fsType := pwx.FSType
   130  
   131  	return &portworxVolumeMounter{
   132  		portworxVolume: &portworxVolume{
   133  			podUID:          podUID,
   134  			volName:         spec.Name(),
   135  			volumeID:        volumeID,
   136  			manager:         manager,
   137  			mounter:         mounter,
   138  			plugin:          plugin,
   139  			MetricsProvider: volume.NewMetricsStatFS(getPath(podUID, spec.Name(), plugin.host)),
   140  		},
   141  		fsType:      fsType,
   142  		readOnly:    readOnly,
   143  		diskMounter: util.NewSafeFormatAndMountFromHost(plugin.GetPluginName(), plugin.host)}, nil
   144  }
   145  
   146  func (plugin *portworxVolumePlugin) NewUnmounter(volName string, podUID types.UID) (volume.Unmounter, error) {
   147  	return plugin.newUnmounterInternal(volName, podUID, plugin.util, plugin.host.GetMounter(plugin.GetPluginName()))
   148  }
   149  
   150  func (plugin *portworxVolumePlugin) newUnmounterInternal(volName string, podUID types.UID, manager portworxManager,
   151  	mounter mount.Interface) (volume.Unmounter, error) {
   152  	return &portworxVolumeUnmounter{
   153  		&portworxVolume{
   154  			podUID:          podUID,
   155  			volName:         volName,
   156  			manager:         manager,
   157  			mounter:         mounter,
   158  			plugin:          plugin,
   159  			MetricsProvider: volume.NewMetricsStatFS(getPath(podUID, volName, plugin.host)),
   160  		}}, nil
   161  }
   162  
   163  func (plugin *portworxVolumePlugin) NewDeleter(logger klog.Logger, spec *volume.Spec) (volume.Deleter, error) {
   164  	return plugin.newDeleterInternal(spec, plugin.util)
   165  }
   166  
   167  func (plugin *portworxVolumePlugin) newDeleterInternal(spec *volume.Spec, manager portworxManager) (volume.Deleter, error) {
   168  	if spec.PersistentVolume != nil && spec.PersistentVolume.Spec.PortworxVolume == nil {
   169  		return nil, fmt.Errorf("spec.PersistentVolumeSource.PortworxVolume is nil")
   170  	}
   171  
   172  	return &portworxVolumeDeleter{
   173  		portworxVolume: &portworxVolume{
   174  			volName:  spec.Name(),
   175  			volumeID: spec.PersistentVolume.Spec.PortworxVolume.VolumeID,
   176  			manager:  manager,
   177  			plugin:   plugin,
   178  		}}, nil
   179  }
   180  
   181  func (plugin *portworxVolumePlugin) NewProvisioner(logger klog.Logger, options volume.VolumeOptions) (volume.Provisioner, error) {
   182  	return plugin.newProvisionerInternal(options, plugin.util)
   183  }
   184  
   185  func (plugin *portworxVolumePlugin) newProvisionerInternal(options volume.VolumeOptions, manager portworxManager) (volume.Provisioner, error) {
   186  	return &portworxVolumeProvisioner{
   187  		portworxVolume: &portworxVolume{
   188  			manager: manager,
   189  			plugin:  plugin,
   190  		},
   191  		options: options,
   192  	}, nil
   193  }
   194  
   195  func (plugin *portworxVolumePlugin) RequiresFSResize() bool {
   196  	return false
   197  }
   198  
   199  func (plugin *portworxVolumePlugin) ExpandVolumeDevice(
   200  	spec *volume.Spec,
   201  	newSize resource.Quantity,
   202  	oldSize resource.Quantity) (resource.Quantity, error) {
   203  	klog.V(4).Infof("Expanding: %s from %v to %v", spec.Name(), oldSize, newSize)
   204  	err := plugin.util.ResizeVolume(spec, newSize, plugin.host)
   205  	if err != nil {
   206  		return oldSize, err
   207  	}
   208  
   209  	klog.V(4).Infof("Successfully resized %s to %v", spec.Name(), newSize)
   210  	return newSize, nil
   211  }
   212  
   213  func (plugin *portworxVolumePlugin) ConstructVolumeSpec(volumeName, mountPath string) (volume.ReconstructedVolume, error) {
   214  	portworxVolume := &v1.Volume{
   215  		Name: volumeName,
   216  		VolumeSource: v1.VolumeSource{
   217  			PortworxVolume: &v1.PortworxVolumeSource{
   218  				VolumeID: volumeName,
   219  			},
   220  		},
   221  	}
   222  	return volume.ReconstructedVolume{
   223  		Spec: volume.NewSpecFromVolume(portworxVolume),
   224  	}, nil
   225  }
   226  
   227  func (plugin *portworxVolumePlugin) SupportsMountOption() bool {
   228  	return false
   229  }
   230  
   231  func (plugin *portworxVolumePlugin) SupportsBulkVolumeVerification() bool {
   232  	return false
   233  }
   234  
   235  func (plugin *portworxVolumePlugin) SupportsSELinuxContextMount(spec *volume.Spec) (bool, error) {
   236  	return false, nil
   237  }
   238  
   239  func getVolumeSource(
   240  	spec *volume.Spec) (*v1.PortworxVolumeSource, bool, error) {
   241  	if spec.Volume != nil && spec.Volume.PortworxVolume != nil {
   242  		return spec.Volume.PortworxVolume, spec.Volume.PortworxVolume.ReadOnly, nil
   243  	} else if spec.PersistentVolume != nil &&
   244  		spec.PersistentVolume.Spec.PortworxVolume != nil {
   245  		return spec.PersistentVolume.Spec.PortworxVolume, spec.ReadOnly, nil
   246  	}
   247  
   248  	return nil, false, fmt.Errorf("Spec does not reference a Portworx Volume type")
   249  }
   250  
   251  // Abstract interface to PD operations.
   252  type portworxManager interface {
   253  	// Creates a volume
   254  	CreateVolume(provisioner *portworxVolumeProvisioner) (volumeID string, volumeSizeGB int64, labels map[string]string, err error)
   255  	// Deletes a volume
   256  	DeleteVolume(deleter *portworxVolumeDeleter) error
   257  	// Attach a volume
   258  	AttachVolume(mounter *portworxVolumeMounter, attachOptions map[string]string) (string, error)
   259  	// Detach a volume
   260  	DetachVolume(unmounter *portworxVolumeUnmounter) error
   261  	// Mount a volume
   262  	MountVolume(mounter *portworxVolumeMounter, mountDir string) error
   263  	// Unmount a volume
   264  	UnmountVolume(unmounter *portworxVolumeUnmounter, mountDir string) error
   265  	// Resize a volume
   266  	ResizeVolume(spec *volume.Spec, newSize resource.Quantity, host volume.VolumeHost) error
   267  }
   268  
   269  // portworxVolume volumes are portworx block devices
   270  // that are attached to the kubelet's host machine and exposed to the pod.
   271  type portworxVolume struct {
   272  	volName string
   273  	podUID  types.UID
   274  	// Unique id of the PD, used to find the disk resource in the provider.
   275  	volumeID string
   276  	// Utility interface that provides API calls to the provider to attach/detach disks.
   277  	manager portworxManager
   278  	// Mounter interface that provides system calls to mount the global path to the pod local path.
   279  	mounter mount.Interface
   280  	plugin  *portworxVolumePlugin
   281  	volume.MetricsProvider
   282  }
   283  
   284  type portworxVolumeMounter struct {
   285  	*portworxVolume
   286  	// Filesystem type, optional.
   287  	fsType string
   288  	// Specifies whether the disk will be attached as read-only.
   289  	readOnly bool
   290  	// diskMounter provides the interface that is used to mount the actual block device.
   291  	diskMounter *mount.SafeFormatAndMount
   292  }
   293  
   294  var _ volume.Mounter = &portworxVolumeMounter{}
   295  
   296  func (b *portworxVolumeMounter) GetAttributes() volume.Attributes {
   297  	return volume.Attributes{
   298  		ReadOnly:       b.readOnly,
   299  		Managed:        !b.readOnly,
   300  		SELinuxRelabel: false,
   301  	}
   302  }
   303  
   304  // SetUp attaches the disk and bind mounts to the volume path.
   305  func (b *portworxVolumeMounter) SetUp(mounterArgs volume.MounterArgs) error {
   306  	return b.SetUpAt(b.GetPath(), mounterArgs)
   307  }
   308  
   309  // SetUpAt attaches the disk and bind mounts to the volume path.
   310  func (b *portworxVolumeMounter) SetUpAt(dir string, mounterArgs volume.MounterArgs) error {
   311  	notMnt, err := b.mounter.IsLikelyNotMountPoint(dir)
   312  	klog.Infof("Portworx Volume set up. Dir: %s %v %v", dir, !notMnt, err)
   313  	if err != nil && !os.IsNotExist(err) {
   314  		klog.Errorf("Cannot validate mountpoint: %s", dir)
   315  		return err
   316  	}
   317  	if !notMnt {
   318  		return nil
   319  	}
   320  
   321  	attachOptions := make(map[string]string)
   322  	attachOptions[attachContextKey] = dir
   323  	attachOptions[attachHostKey] = b.plugin.host.GetHostName()
   324  	if _, err := b.manager.AttachVolume(b, attachOptions); err != nil {
   325  		return err
   326  	}
   327  
   328  	klog.V(4).Infof("Portworx Volume %s attached", b.volumeID)
   329  
   330  	if err := os.MkdirAll(dir, 0750); err != nil {
   331  		return err
   332  	}
   333  
   334  	if err := b.manager.MountVolume(b, dir); err != nil {
   335  		return err
   336  	}
   337  	if !b.readOnly {
   338  		volume.SetVolumeOwnership(b, dir, mounterArgs.FsGroup, mounterArgs.FSGroupChangePolicy, util.FSGroupCompleteHook(b.plugin, nil))
   339  	}
   340  	klog.Infof("Portworx Volume %s setup at %s", b.volumeID, dir)
   341  	return nil
   342  }
   343  
   344  func (pwx *portworxVolume) GetPath() string {
   345  	return getPath(pwx.podUID, pwx.volName, pwx.plugin.host)
   346  }
   347  
   348  type portworxVolumeUnmounter struct {
   349  	*portworxVolume
   350  }
   351  
   352  var _ volume.Unmounter = &portworxVolumeUnmounter{}
   353  
   354  // Unmounts the bind mount, and detaches the disk only if the PD
   355  // resource was the last reference to that disk on the kubelet.
   356  func (c *portworxVolumeUnmounter) TearDown() error {
   357  	return c.TearDownAt(c.GetPath())
   358  }
   359  
   360  // Unmounts the bind mount, and detaches the disk only if the PD
   361  // resource was the last reference to that disk on the kubelet.
   362  func (c *portworxVolumeUnmounter) TearDownAt(dir string) error {
   363  	klog.Infof("Portworx Volume TearDown of %s", dir)
   364  
   365  	if err := c.manager.UnmountVolume(c, dir); err != nil {
   366  		return err
   367  	}
   368  
   369  	// Call Portworx Detach Volume.
   370  	if err := c.manager.DetachVolume(c); err != nil {
   371  		return err
   372  	}
   373  
   374  	return nil
   375  }
   376  
   377  type portworxVolumeDeleter struct {
   378  	*portworxVolume
   379  }
   380  
   381  var _ volume.Deleter = &portworxVolumeDeleter{}
   382  
   383  func (d *portworxVolumeDeleter) GetPath() string {
   384  	return getPath(d.podUID, d.volName, d.plugin.host)
   385  }
   386  
   387  func (d *portworxVolumeDeleter) Delete() error {
   388  	return d.manager.DeleteVolume(d)
   389  }
   390  
   391  type portworxVolumeProvisioner struct {
   392  	*portworxVolume
   393  	options volume.VolumeOptions
   394  }
   395  
   396  var _ volume.Provisioner = &portworxVolumeProvisioner{}
   397  
   398  func (c *portworxVolumeProvisioner) Provision(selectedNode *v1.Node, allowedTopologies []v1.TopologySelectorTerm) (*v1.PersistentVolume, error) {
   399  	if !util.ContainsAllAccessModes(c.plugin.GetAccessModes(), c.options.PVC.Spec.AccessModes) {
   400  		return nil, fmt.Errorf("invalid AccessModes %v: only AccessModes %v are supported", c.options.PVC.Spec.AccessModes, c.plugin.GetAccessModes())
   401  	}
   402  
   403  	if util.CheckPersistentVolumeClaimModeBlock(c.options.PVC) {
   404  		return nil, fmt.Errorf("%s does not support block volume provisioning", c.plugin.GetPluginName())
   405  	}
   406  
   407  	volumeID, sizeGiB, labels, err := c.manager.CreateVolume(c)
   408  	if err != nil {
   409  		return nil, err
   410  	}
   411  
   412  	pv := &v1.PersistentVolume{
   413  		ObjectMeta: metav1.ObjectMeta{
   414  			Name:   c.options.PVName,
   415  			Labels: map[string]string{},
   416  			Annotations: map[string]string{
   417  				util.VolumeDynamicallyCreatedByKey: "portworx-volume-dynamic-provisioner",
   418  			},
   419  		},
   420  		Spec: v1.PersistentVolumeSpec{
   421  			PersistentVolumeReclaimPolicy: c.options.PersistentVolumeReclaimPolicy,
   422  			AccessModes:                   c.options.PVC.Spec.AccessModes,
   423  			Capacity: v1.ResourceList{
   424  				v1.ResourceName(v1.ResourceStorage): resource.MustParse(fmt.Sprintf("%dGi", sizeGiB)),
   425  			},
   426  			PersistentVolumeSource: v1.PersistentVolumeSource{
   427  				PortworxVolume: &v1.PortworxVolumeSource{
   428  					VolumeID: volumeID,
   429  				},
   430  			},
   431  		},
   432  	}
   433  
   434  	if len(labels) != 0 {
   435  		if pv.Labels == nil {
   436  			pv.Labels = make(map[string]string)
   437  		}
   438  		for k, v := range labels {
   439  			pv.Labels[k] = v
   440  		}
   441  	}
   442  
   443  	if len(c.options.PVC.Spec.AccessModes) == 0 {
   444  		pv.Spec.AccessModes = c.plugin.GetAccessModes()
   445  	}
   446  
   447  	return pv, nil
   448  }
   449  

View as plain text