...

Source file src/k8s.io/kubernetes/pkg/volume/portworx/portworx_util.go

Documentation: k8s.io/kubernetes/pkg/volume/portworx

     1  /*
     2  Copyright 2017 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package portworx
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"net"
    23  	"strconv"
    24  
    25  	osdapi "github.com/libopenstorage/openstorage/api"
    26  	osdclient "github.com/libopenstorage/openstorage/api/client"
    27  	volumeclient "github.com/libopenstorage/openstorage/api/client/volume"
    28  	osdspec "github.com/libopenstorage/openstorage/api/spec"
    29  	volumeapi "github.com/libopenstorage/openstorage/volume"
    30  	v1 "k8s.io/api/core/v1"
    31  	"k8s.io/apimachinery/pkg/api/resource"
    32  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    33  	volumehelpers "k8s.io/cloud-provider/volume/helpers"
    34  	"k8s.io/klog/v2"
    35  
    36  	api "k8s.io/kubernetes/pkg/apis/core"
    37  	"k8s.io/kubernetes/pkg/volume"
    38  )
    39  
    40  const (
    41  	osdMgmtDefaultPort = 9001
    42  	osdDriverVersion   = "v1"
    43  	pxdDriverName      = "pxd"
    44  	pvcClaimLabel      = "pvc"
    45  	pvcNamespaceLabel  = "namespace"
    46  	pxServiceName      = "portworx-service"
    47  	pxDriverName       = "pxd-sched"
    48  )
    49  
        // portworxVolumeUtil carries out Portworx volume operations (create, delete,
        // attach, detach, mount, unmount, resize) through an openstorage API client.
    50  type portworxVolumeUtil struct {
        	// portworxClient is the cached API client. getPortworxDriver and
        	// getLocalPortworxDriver both re-validate it before reuse and overwrite
        	// it when they create a new client.
    51  	portworxClient *osdclient.Client
    52  }
    53  
    54  // CreateVolume creates a Portworx volume.
    55  func (util *portworxVolumeUtil) CreateVolume(p *portworxVolumeProvisioner) (string, int64, map[string]string, error) {
    56  	driver, err := util.getPortworxDriver(p.plugin.host)
    57  	if err != nil || driver == nil {
    58  		klog.Errorf("Failed to get portworx driver. Err: %v", err)
    59  		return "", 0, nil, err
    60  	}
    61  
    62  	klog.Infof("Creating Portworx volume for PVC: %v", p.options.PVC.Name)
    63  
    64  	capacity := p.options.PVC.Spec.Resources.Requests[v1.ResourceName(v1.ResourceStorage)]
    65  	// Portworx Volumes are specified in GiB
    66  	requestGiB, err := volumehelpers.RoundUpToGiB(capacity)
    67  	if err != nil {
    68  		return "", 0, nil, err
    69  	}
    70  
    71  	// Perform a best-effort parsing of parameters. Portworx 1.2.9 and later parses volume parameters from
    72  	// spec.VolumeLabels. So even if below SpecFromOpts() fails to parse certain parameters or
    73  	// doesn't support new parameters, the server-side processing will parse it correctly.
    74  	// We still need to call SpecFromOpts() here to handle cases where someone is running Portworx 1.2.8 and lower.
    75  	specHandler := osdspec.NewSpecHandler()
    76  	spec, locator, source, _ := specHandler.SpecFromOpts(p.options.Parameters)
    77  	if spec == nil {
    78  		spec = specHandler.DefaultSpec()
    79  	}
    80  
    81  	// Pass all parameters as volume labels for Portworx server-side processing
    82  	if spec.VolumeLabels == nil {
    83  		spec.VolumeLabels = make(map[string]string, 0)
    84  	}
    85  
    86  	for k, v := range p.options.Parameters {
    87  		spec.VolumeLabels[k] = v
    88  	}
    89  
    90  	// Update the requested size in the spec
    91  	spec.Size = uint64(requestGiB * volumehelpers.GiB)
    92  
    93  	// Change the Portworx Volume name to PV name
    94  	if locator == nil {
    95  		locator = &osdapi.VolumeLocator{
    96  			VolumeLabels: make(map[string]string),
    97  		}
    98  	}
    99  	locator.Name = p.options.PVName
   100  
   101  	// Add claim Name as a part of Portworx Volume Labels
   102  	locator.VolumeLabels[pvcClaimLabel] = p.options.PVC.Name
   103  	locator.VolumeLabels[pvcNamespaceLabel] = p.options.PVC.Namespace
   104  
   105  	for k, v := range p.options.PVC.Annotations {
   106  		if _, present := spec.VolumeLabels[k]; present {
   107  			klog.Warningf("not saving annotation: %s=%s in spec labels due to an existing key", k, v)
   108  			continue
   109  		}
   110  		spec.VolumeLabels[k] = v
   111  	}
   112  
   113  	volumeID, err := driver.Create(locator, source, spec)
   114  	if err != nil {
   115  		klog.Errorf("Error creating Portworx Volume : %v", err)
   116  		return "", 0, nil, err
   117  	}
   118  
   119  	klog.Infof("Successfully created Portworx volume for PVC: %v", p.options.PVC.Name)
   120  	return volumeID, requestGiB, nil, err
   121  }
   122  
   123  // DeleteVolume deletes a Portworx volume
   124  func (util *portworxVolumeUtil) DeleteVolume(d *portworxVolumeDeleter) error {
   125  	driver, err := util.getPortworxDriver(d.plugin.host)
   126  	if err != nil || driver == nil {
   127  		klog.Errorf("Failed to get portworx driver. Err: %v", err)
   128  		return err
   129  	}
   130  
   131  	err = driver.Delete(d.volumeID)
   132  	if err != nil {
   133  		klog.Errorf("Error deleting Portworx Volume (%v): %v", d.volName, err)
   134  		return err
   135  	}
   136  	return nil
   137  }
   138  
   139  // AttachVolume attaches a Portworx Volume
   140  func (util *portworxVolumeUtil) AttachVolume(m *portworxVolumeMounter, attachOptions map[string]string) (string, error) {
   141  	driver, err := util.getLocalPortworxDriver(m.plugin.host)
   142  	if err != nil || driver == nil {
   143  		klog.Errorf("Failed to get portworx driver. Err: %v", err)
   144  		return "", err
   145  	}
   146  
   147  	devicePath, err := driver.Attach(m.volName, attachOptions)
   148  	if err != nil {
   149  		klog.Errorf("Error attaching Portworx Volume (%v): %v", m.volName, err)
   150  		return "", err
   151  	}
   152  	return devicePath, nil
   153  }
   154  
   155  // DetachVolume detaches a Portworx Volume
   156  func (util *portworxVolumeUtil) DetachVolume(u *portworxVolumeUnmounter) error {
   157  	driver, err := util.getLocalPortworxDriver(u.plugin.host)
   158  	if err != nil || driver == nil {
   159  		klog.Errorf("Failed to get portworx driver. Err: %v", err)
   160  		return err
   161  	}
   162  
   163  	err = driver.Detach(u.volName, false /*doNotForceDetach*/)
   164  	if err != nil {
   165  		klog.Errorf("Error detaching Portworx Volume (%v): %v", u.volName, err)
   166  		return err
   167  	}
   168  	return nil
   169  }
   170  
   171  // MountVolume mounts a Portworx Volume on the specified mountPath
   172  func (util *portworxVolumeUtil) MountVolume(m *portworxVolumeMounter, mountPath string) error {
   173  	driver, err := util.getLocalPortworxDriver(m.plugin.host)
   174  	if err != nil || driver == nil {
   175  		klog.Errorf("Failed to get portworx driver. Err: %v", err)
   176  		return err
   177  	}
   178  
   179  	err = driver.Mount(m.volName, mountPath)
   180  	if err != nil {
   181  		klog.Errorf("Error mounting Portworx Volume (%v) on Path (%v): %v", m.volName, mountPath, err)
   182  		return err
   183  	}
   184  	return nil
   185  }
   186  
   187  // UnmountVolume unmounts a Portworx Volume
   188  func (util *portworxVolumeUtil) UnmountVolume(u *portworxVolumeUnmounter, mountPath string) error {
   189  	driver, err := util.getLocalPortworxDriver(u.plugin.host)
   190  	if err != nil || driver == nil {
   191  		klog.Errorf("Failed to get portworx driver. Err: %v", err)
   192  		return err
   193  	}
   194  
   195  	err = driver.Unmount(u.volName, mountPath)
   196  	if err != nil {
   197  		klog.Errorf("Error unmounting Portworx Volume (%v) on Path (%v): %v", u.volName, mountPath, err)
   198  		return err
   199  	}
   200  	return nil
   201  }
   202  
   203  func (util *portworxVolumeUtil) ResizeVolume(spec *volume.Spec, newSize resource.Quantity, volumeHost volume.VolumeHost) error {
   204  	driver, err := util.getPortworxDriver(volumeHost)
   205  	if err != nil || driver == nil {
   206  		klog.Errorf("Failed to get portworx driver. Err: %v", err)
   207  		return err
   208  	}
   209  
   210  	vols, err := driver.Inspect([]string{spec.Name()})
   211  	if err != nil {
   212  		return err
   213  	}
   214  
   215  	if len(vols) != 1 {
   216  		return fmt.Errorf("failed to inspect Portworx volume: %s. Found: %d volumes", spec.Name(), len(vols))
   217  	}
   218  
   219  	vol := vols[0]
   220  	tBytes, err := volumehelpers.RoundUpToB(newSize)
   221  	if err != nil {
   222  		return err
   223  	}
   224  	newSizeInBytes := uint64(tBytes)
   225  	if vol.Spec.Size >= newSizeInBytes {
   226  		klog.Infof("Portworx volume: %s already at size: %d greater than or equal to new "+
   227  			"requested size: %d. Skipping resize.", spec.Name(), vol.Spec.Size, newSizeInBytes)
   228  		return nil
   229  	}
   230  
   231  	vol.Spec.Size = newSizeInBytes
   232  	err = driver.Set(spec.Name(), vol.Locator, vol.Spec)
   233  	if err != nil {
   234  		return err
   235  	}
   236  
   237  	// check if the volume's size actually got updated
   238  	vols, err = driver.Inspect([]string{spec.Name()})
   239  	if err != nil {
   240  		return err
   241  	}
   242  
   243  	if len(vols) != 1 {
   244  		return fmt.Errorf("failed to inspect resized Portworx volume: %s. Found: %d volumes", spec.Name(), len(vols))
   245  	}
   246  
   247  	updatedVol := vols[0]
   248  	if updatedVol.Spec.Size < vol.Spec.Size {
   249  		return fmt.Errorf("Portworx volume: %s doesn't match expected size after resize. expected:%v actual:%v",
   250  			spec.Name(), vol.Spec.Size, updatedVol.Spec.Size)
   251  	}
   252  
   253  	return nil
   254  }
   255  
   256  func isClientValid(client *osdclient.Client) (bool, error) {
   257  	if client == nil {
   258  		return false, nil
   259  	}
   260  
   261  	_, err := client.Versions(osdapi.OsdVolumePath)
   262  	if err != nil {
   263  		klog.Errorf("portworx client failed driver versions check. Err: %v", err)
   264  		return false, err
   265  	}
   266  
   267  	return true, nil
   268  }
   269  
   270  func createDriverClient(hostname string, port int32) (*osdclient.Client, error) {
   271  	client, err := volumeclient.NewDriverClient(fmt.Sprintf("http://%s", net.JoinHostPort(hostname, strconv.Itoa(int(port)))),
   272  		pxdDriverName, osdDriverVersion, pxDriverName)
   273  	if err != nil {
   274  		return nil, err
   275  	}
   276  
   277  	isValid, err := isClientValid(client)
   278  	if isValid {
   279  		return client, nil
   280  	}
   281  	return nil, err
   282  }
   283  
   284  // getPortworxDriver returns a Portworx volume driver which can be used for cluster wide operations.
   285  //
   286  //	Operations like create and delete volume don't need to be restricted to local volume host since
   287  //	any node in the Portworx cluster can coordinate the create/delete request and forward the operations to
   288  //	the Portworx node that will own/owns the data.
   289  func (util *portworxVolumeUtil) getPortworxDriver(volumeHost volume.VolumeHost) (volumeapi.VolumeDriver, error) {
   290  	// check if existing saved client is valid
   291  	if isValid, _ := isClientValid(util.portworxClient); isValid {
   292  		return volumeclient.VolumeDriver(util.portworxClient), nil
   293  	}
   294  
   295  	// create new client
   296  	var err error
   297  	util.portworxClient, err = createDriverClient(volumeHost.GetHostName(), osdMgmtDefaultPort) // for backward compatibility
   298  	if err != nil || util.portworxClient == nil {
   299  		// Create client from portworx k8s service.
   300  		svc, err := getPortworxService(volumeHost)
   301  		if err != nil {
   302  			return nil, err
   303  		}
   304  
   305  		// The port here is always the default one since  it's the service port
   306  		util.portworxClient, err = createDriverClient(svc.Spec.ClusterIP, osdMgmtDefaultPort)
   307  		if err != nil || util.portworxClient == nil {
   308  			klog.Errorf("Failed to connect to portworx service. Err: %v", err)
   309  			return nil, err
   310  		}
   311  
   312  		klog.Infof("Using portworx cluster service at: %v:%d as api endpoint",
   313  			svc.Spec.ClusterIP, osdMgmtDefaultPort)
   314  	} else {
   315  		klog.Infof("Using portworx service at: %v:%d as api endpoint",
   316  			volumeHost.GetHostName(), osdMgmtDefaultPort)
   317  	}
   318  
   319  	return volumeclient.VolumeDriver(util.portworxClient), nil
   320  }
   321  
   322  // getLocalPortworxDriver returns driver connected to Portworx API server on volume host.
   323  //
   324  //	This is required to force certain operations (mount, unmount, detach, attach) to
   325  //	go to the volume host instead of the k8s service which might route it to any host. This pertains to how
   326  //	Portworx mounts and attaches a volume to the running container. The node getting these requests needs to
   327  //	see the pod container mounts (specifically /var/lib/kubelet/pods/<pod_id>)
   328  func (util *portworxVolumeUtil) getLocalPortworxDriver(volumeHost volume.VolumeHost) (volumeapi.VolumeDriver, error) {
   329  	if util.portworxClient != nil {
   330  		// check if existing saved client is valid
   331  		if isValid, _ := isClientValid(util.portworxClient); isValid {
   332  			return volumeclient.VolumeDriver(util.portworxClient), nil
   333  		}
   334  	}
   335  
   336  	// Lookup port
   337  	svc, err := getPortworxService(volumeHost)
   338  	if err != nil {
   339  		return nil, err
   340  	}
   341  
   342  	osgMgmtPort := lookupPXAPIPortFromService(svc)
   343  	util.portworxClient, err = createDriverClient(volumeHost.GetHostName(), osgMgmtPort)
   344  	if err != nil {
   345  		return nil, err
   346  	}
   347  
   348  	klog.Infof("Using portworx local service at: %v:%d as api endpoint",
   349  		volumeHost.GetHostName(), osgMgmtPort)
   350  	return volumeclient.VolumeDriver(util.portworxClient), nil
   351  }
   352  
   353  // lookupPXAPIPortFromService goes over all the ports in the given service and returns the target
   354  // port for osdMgmtDefaultPort
   355  func lookupPXAPIPortFromService(svc *v1.Service) int32 {
   356  	for _, p := range svc.Spec.Ports {
   357  		if p.Port == osdMgmtDefaultPort {
   358  			return p.TargetPort.IntVal
   359  		}
   360  	}
   361  	return osdMgmtDefaultPort // default
   362  }
   363  
   364  // getPortworxService returns the portworx cluster service from the API server
   365  func getPortworxService(host volume.VolumeHost) (*v1.Service, error) {
   366  	kubeClient := host.GetKubeClient()
   367  	if kubeClient == nil {
   368  		err := fmt.Errorf("failed to get kubeclient when creating portworx client")
   369  		klog.Errorf(err.Error())
   370  		return nil, err
   371  	}
   372  
   373  	opts := metav1.GetOptions{}
   374  	svc, err := kubeClient.CoreV1().Services(api.NamespaceSystem).Get(context.TODO(), pxServiceName, opts)
   375  	if err != nil {
   376  		klog.Errorf("Failed to get service. Err: %v", err)
   377  		return nil, err
   378  	}
   379  
   380  	if svc == nil {
   381  		err = fmt.Errorf("service: %v not found. Consult Portworx docs to deploy it", pxServiceName)
   382  		klog.Errorf(err.Error())
   383  		return nil, err
   384  	}
   385  
   386  	return svc, nil
   387  }
   388  

View as plain text