
Source file src/k8s.io/kubernetes/pkg/volume/csi/csi_client.go

Documentation: k8s.io/kubernetes/pkg/volume/csi

     1  /*
     2  Copyright 2017 The Kubernetes Authors.
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     8      http://www.apache.org/licenses/LICENSE-2.0
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    17  package csi
    19  import (
    20  	"context"
    21  	"errors"
    22  	"fmt"
    23  	"io"
    24  	"net"
    25  	"strconv"
    26  	"sync"
    28  	csipbv1 "github.com/container-storage-interface/spec/lib/go/csi"
    29  	"google.golang.org/grpc"
    30  	"google.golang.org/grpc/codes"
    31  	"google.golang.org/grpc/credentials/insecure"
    32  	"google.golang.org/grpc/status"
    33  	api "k8s.io/api/core/v1"
    34  	"k8s.io/apimachinery/pkg/api/resource"
    35  	utilfeature "k8s.io/apiserver/pkg/util/feature"
    36  	"k8s.io/klog/v2"
    37  	"k8s.io/kubernetes/pkg/features"
    38  	"k8s.io/kubernetes/pkg/volume"
    39  	volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
    40  )
    42  type csiClient interface {
    43  	NodeGetInfo(ctx context.Context) (
    44  		nodeID string,
    45  		maxVolumePerNode int64,
    46  		accessibleTopology map[string]string,
    47  		err error)
    49  	// The caller is responsible for checking whether the driver supports
    50  	// applying FSGroup by calling NodeSupportsVolumeMountGroup().
    51  	// If the driver does not, fsGroup must be set to nil.
    52  	NodePublishVolume(
    53  		ctx context.Context,
    54  		volumeid string,
    55  		readOnly bool,
    56  		stagingTargetPath string,
    57  		targetPath string,
    58  		accessMode api.PersistentVolumeAccessMode,
    59  		publishContext map[string]string,
    60  		volumeContext map[string]string,
    61  		secrets map[string]string,
    62  		fsType string,
    63  		mountOptions []string,
    64  		fsGroup *int64,
    65  	) error
    67  	NodeExpandVolume(ctx context.Context, rsOpts csiResizeOptions) (resource.Quantity, error)
    68  	NodeUnpublishVolume(
    69  		ctx context.Context,
    70  		volID string,
    71  		targetPath string,
    72  	) error
    74  	// The caller is responsible for checking whether the driver supports
    75  	// applying FSGroup by calling NodeSupportsVolumeMountGroup().
    76  	// If the driver does not, fsGroup must be set to nil.
    77  	NodeStageVolume(ctx context.Context,
    78  		volID string,
    79  		publishVolumeInfo map[string]string,
    80  		stagingTargetPath string,
    81  		fsType string,
    82  		accessMode api.PersistentVolumeAccessMode,
    83  		secrets map[string]string,
    84  		volumeContext map[string]string,
    85  		mountOptions []string,
    86  		fsGroup *int64,
    87  	) error
    89  	NodeGetVolumeStats(
    90  		ctx context.Context,
    91  		volID string,
    92  		targetPath string,
    93  	) (*volume.Metrics, error)
    94  	NodeUnstageVolume(ctx context.Context, volID, stagingTargetPath string) error
    95  	NodeSupportsStageUnstage(ctx context.Context) (bool, error)
    96  	NodeSupportsNodeExpand(ctx context.Context) (bool, error)
    97  	NodeSupportsVolumeStats(ctx context.Context) (bool, error)
    98  	NodeSupportsSingleNodeMultiWriterAccessMode(ctx context.Context) (bool, error)
    99  	NodeSupportsVolumeMountGroup(ctx context.Context) (bool, error)
   100  }
   102  // Strongly typed address
   103  type csiAddr string
   105  // Strongly typed driver name
   106  type csiDriverName string
   108  // csiClient encapsulates all csi-plugin methods
   109  type csiDriverClient struct {
   110  	driverName          csiDriverName
   111  	addr                csiAddr
   112  	metricsManager      *MetricsManager
   113  	nodeV1ClientCreator nodeV1ClientCreator
   114  }
   116  type csiResizeOptions struct {
   117  	volumeID          string
   118  	volumePath        string
   119  	stagingTargetPath string
   120  	fsType            string
   121  	accessMode        api.PersistentVolumeAccessMode
   122  	newSize           resource.Quantity
   123  	mountOptions      []string
   124  	secrets           map[string]string
   125  }
   127  var _ csiClient = &csiDriverClient{}
   129  type nodeV1ClientCreator func(addr csiAddr, metricsManager *MetricsManager) (
   130  	nodeClient csipbv1.NodeClient,
   131  	closer io.Closer,
   132  	err error,
   133  )
   135  type nodeV1AccessModeMapper func(am api.PersistentVolumeAccessMode) csipbv1.VolumeCapability_AccessMode_Mode
   137  // newV1NodeClient creates a new NodeClient with the internally used gRPC
   138  // connection set up. It also returns a closer which must be called to close
   139  // the gRPC connection when the NodeClient is not used anymore.
   140  // This is the default implementation for the nodeV1ClientCreator, used in
   141  // newCsiDriverClient.
   142  func newV1NodeClient(addr csiAddr, metricsManager *MetricsManager) (nodeClient csipbv1.NodeClient, closer io.Closer, err error) {
   143  	var conn *grpc.ClientConn
   144  	conn, err = newGrpcConn(addr, metricsManager)
   145  	if err != nil {
   146  		return nil, nil, err
   147  	}
   149  	nodeClient = csipbv1.NewNodeClient(conn)
   150  	return nodeClient, conn, nil
   151  }
   153  func newCsiDriverClient(driverName csiDriverName) (*csiDriverClient, error) {
   154  	if driverName == "" {
   155  		return nil, fmt.Errorf("driver name is empty")
   156  	}
   158  	existingDriver, driverExists := csiDrivers.Get(string(driverName))
   159  	if !driverExists {
   160  		return nil, fmt.Errorf("driver name %s not found in the list of registered CSI drivers", driverName)
   161  	}
   163  	nodeV1ClientCreator := newV1NodeClient
   164  	return &csiDriverClient{
   165  		driverName:          driverName,
   166  		addr:                csiAddr(existingDriver.endpoint),
   167  		nodeV1ClientCreator: nodeV1ClientCreator,
   168  		metricsManager:      NewCSIMetricsManager(string(driverName)),
   169  	}, nil
   170  }
   172  func (c *csiDriverClient) NodeGetInfo(ctx context.Context) (
   173  	nodeID string,
   174  	maxVolumePerNode int64,
   175  	accessibleTopology map[string]string,
   176  	err error) {
   177  	klog.V(4).InfoS(log("calling NodeGetInfo rpc"))
   179  	var getNodeInfoError error
   180  	nodeID, maxVolumePerNode, accessibleTopology, getNodeInfoError = c.nodeGetInfoV1(ctx)
   181  	if getNodeInfoError != nil {
   182  		klog.InfoS("Error calling CSI NodeGetInfo()", "err", getNodeInfoError.Error())
   183  	}
   184  	return nodeID, maxVolumePerNode, accessibleTopology, getNodeInfoError
   185  }
   187  func (c *csiDriverClient) nodeGetInfoV1(ctx context.Context) (
   188  	nodeID string,
   189  	maxVolumePerNode int64,
   190  	accessibleTopology map[string]string,
   191  	err error) {
   193  	nodeClient, closer, err := c.nodeV1ClientCreator(c.addr, c.metricsManager)
   194  	if err != nil {
   195  		return "", 0, nil, err
   196  	}
   197  	defer closer.Close()
   199  	res, err := nodeClient.NodeGetInfo(ctx, &csipbv1.NodeGetInfoRequest{})
   200  	if err != nil {
   201  		return "", 0, nil, err
   202  	}
   204  	topology := res.GetAccessibleTopology()
   205  	if topology != nil {
   206  		accessibleTopology = topology.Segments
   207  	}
   208  	return res.GetNodeId(), res.GetMaxVolumesPerNode(), accessibleTopology, nil
   209  }
   211  func (c *csiDriverClient) NodePublishVolume(
   212  	ctx context.Context,
   213  	volID string,
   214  	readOnly bool,
   215  	stagingTargetPath string,
   216  	targetPath string,
   217  	accessMode api.PersistentVolumeAccessMode,
   218  	publishContext map[string]string,
   219  	volumeContext map[string]string,
   220  	secrets map[string]string,
   221  	fsType string,
   222  	mountOptions []string,
   223  	fsGroup *int64,
   224  ) error {
   225  	klog.V(4).InfoS(log("calling NodePublishVolume rpc"), "volID", volID, "targetPath", targetPath)
   226  	if volID == "" {
   227  		return errors.New("missing volume id")
   228  	}
   229  	if targetPath == "" {
   230  		return errors.New("missing target path")
   231  	}
   233  	if c.nodeV1ClientCreator == nil {
   234  		return errors.New("failed to call NodePublishVolume. nodeV1ClientCreator is nil")
   235  	}
   237  	accessModeMapper, err := c.getNodeV1AccessModeMapper(ctx)
   238  	if err != nil {
   239  		return err
   240  	}
   242  	nodeClient, closer, err := c.nodeV1ClientCreator(c.addr, c.metricsManager)
   243  	if err != nil {
   244  		return err
   245  	}
   246  	defer closer.Close()
   248  	req := &csipbv1.NodePublishVolumeRequest{
   249  		VolumeId:       volID,
   250  		TargetPath:     targetPath,
   251  		Readonly:       readOnly,
   252  		PublishContext: publishContext,
   253  		VolumeContext:  volumeContext,
   254  		Secrets:        secrets,
   255  		VolumeCapability: &csipbv1.VolumeCapability{
   256  			AccessMode: &csipbv1.VolumeCapability_AccessMode{
   257  				Mode: accessModeMapper(accessMode),
   258  			},
   259  		},
   260  	}
   261  	if stagingTargetPath != "" {
   262  		req.StagingTargetPath = stagingTargetPath
   263  	}
   265  	if fsType == fsTypeBlockName {
   266  		req.VolumeCapability.AccessType = &csipbv1.VolumeCapability_Block{
   267  			Block: &csipbv1.VolumeCapability_BlockVolume{},
   268  		}
   269  	} else {
   270  		mountVolume := &csipbv1.VolumeCapability_MountVolume{
   271  			FsType:     fsType,
   272  			MountFlags: mountOptions,
   273  		}
   274  		if fsGroup != nil {
   275  			mountVolume.VolumeMountGroup = strconv.FormatInt(*fsGroup, 10 /* base */)
   276  		}
   277  		req.VolumeCapability.AccessType = &csipbv1.VolumeCapability_Mount{
   278  			Mount: mountVolume,
   279  		}
   280  	}
   282  	_, err = nodeClient.NodePublishVolume(ctx, req)
   283  	if err != nil && !isFinalError(err) {
   284  		return volumetypes.NewUncertainProgressError(err.Error())
   285  	}
   286  	return err
   287  }
   289  func (c *csiDriverClient) NodeExpandVolume(ctx context.Context, opts csiResizeOptions) (resource.Quantity, error) {
   290  	if c.nodeV1ClientCreator == nil {
   291  		return opts.newSize, fmt.Errorf("version of CSI driver does not support volume expansion")
   292  	}
   294  	if opts.volumeID == "" {
   295  		return opts.newSize, errors.New("missing volume id")
   296  	}
   297  	if opts.volumePath == "" {
   298  		return opts.newSize, errors.New("missing volume path")
   299  	}
   301  	if opts.newSize.Value() < 0 {
   302  		return opts.newSize, errors.New("size can not be less than 0")
   303  	}
   305  	accessModeMapper, err := c.getNodeV1AccessModeMapper(ctx)
   306  	if err != nil {
   307  		return opts.newSize, err
   308  	}
   310  	nodeClient, closer, err := c.nodeV1ClientCreator(c.addr, c.metricsManager)
   311  	if err != nil {
   312  		return opts.newSize, err
   313  	}
   314  	defer closer.Close()
   316  	req := &csipbv1.NodeExpandVolumeRequest{
   317  		VolumeId:      opts.volumeID,
   318  		VolumePath:    opts.volumePath,
   319  		CapacityRange: &csipbv1.CapacityRange{RequiredBytes: opts.newSize.Value()},
   320  		VolumeCapability: &csipbv1.VolumeCapability{
   321  			AccessMode: &csipbv1.VolumeCapability_AccessMode{
   322  				Mode: accessModeMapper(opts.accessMode),
   323  			},
   324  		},
   325  		Secrets: opts.secrets,
   326  	}
   328  	// not all CSI drivers support NodeStageUnstage and hence the StagingTargetPath
   329  	// should only be set when available
   330  	if opts.stagingTargetPath != "" {
   331  		req.StagingTargetPath = opts.stagingTargetPath
   332  	}
   334  	if opts.fsType == fsTypeBlockName {
   335  		req.VolumeCapability.AccessType = &csipbv1.VolumeCapability_Block{
   336  			Block: &csipbv1.VolumeCapability_BlockVolume{},
   337  		}
   338  	} else {
   339  		req.VolumeCapability.AccessType = &csipbv1.VolumeCapability_Mount{
   340  			Mount: &csipbv1.VolumeCapability_MountVolume{
   341  				FsType:     opts.fsType,
   342  				MountFlags: opts.mountOptions,
   343  			},
   344  		}
   345  	}
   347  	resp, err := nodeClient.NodeExpandVolume(ctx, req)
   348  	if err != nil {
   349  		if !isFinalError(err) {
   350  			return opts.newSize, volumetypes.NewUncertainProgressError(err.Error())
   351  		}
   352  		return opts.newSize, err
   353  	}
   355  	updatedQuantity := resource.NewQuantity(resp.CapacityBytes, resource.BinarySI)
   356  	return *updatedQuantity, nil
   357  }
   359  func (c *csiDriverClient) NodeUnpublishVolume(ctx context.Context, volID string, targetPath string) error {
   360  	klog.V(4).InfoS(log("calling NodeUnpublishVolume rpc"), "volID", volID, "targetPath", targetPath)
   361  	if volID == "" {
   362  		return errors.New("missing volume id")
   363  	}
   364  	if targetPath == "" {
   365  		return errors.New("missing target path")
   366  	}
   367  	if c.nodeV1ClientCreator == nil {
   368  		return errors.New("nodeV1ClientCreate is nil")
   369  	}
   371  	nodeClient, closer, err := c.nodeV1ClientCreator(c.addr, c.metricsManager)
   372  	if err != nil {
   373  		return err
   374  	}
   375  	defer closer.Close()
   377  	req := &csipbv1.NodeUnpublishVolumeRequest{
   378  		VolumeId:   volID,
   379  		TargetPath: targetPath,
   380  	}
   382  	_, err = nodeClient.NodeUnpublishVolume(ctx, req)
   383  	return err
   384  }
   386  func (c *csiDriverClient) NodeStageVolume(ctx context.Context,
   387  	volID string,
   388  	publishContext map[string]string,
   389  	stagingTargetPath string,
   390  	fsType string,
   391  	accessMode api.PersistentVolumeAccessMode,
   392  	secrets map[string]string,
   393  	volumeContext map[string]string,
   394  	mountOptions []string,
   395  	fsGroup *int64,
   396  ) error {
   397  	klog.V(4).InfoS(log("calling NodeStageVolume rpc"), "volID", volID, "stagingTargetPath", stagingTargetPath)
   398  	if volID == "" {
   399  		return errors.New("missing volume id")
   400  	}
   401  	if stagingTargetPath == "" {
   402  		return errors.New("missing staging target path")
   403  	}
   404  	if c.nodeV1ClientCreator == nil {
   405  		return errors.New("nodeV1ClientCreate is nil")
   406  	}
   408  	accessModeMapper, err := c.getNodeV1AccessModeMapper(ctx)
   409  	if err != nil {
   410  		return err
   411  	}
   413  	nodeClient, closer, err := c.nodeV1ClientCreator(c.addr, c.metricsManager)
   414  	if err != nil {
   415  		return err
   416  	}
   417  	defer closer.Close()
   419  	req := &csipbv1.NodeStageVolumeRequest{
   420  		VolumeId:          volID,
   421  		PublishContext:    publishContext,
   422  		StagingTargetPath: stagingTargetPath,
   423  		VolumeCapability: &csipbv1.VolumeCapability{
   424  			AccessMode: &csipbv1.VolumeCapability_AccessMode{
   425  				Mode: accessModeMapper(accessMode),
   426  			},
   427  		},
   428  		Secrets:       secrets,
   429  		VolumeContext: volumeContext,
   430  	}
   432  	if fsType == fsTypeBlockName {
   433  		req.VolumeCapability.AccessType = &csipbv1.VolumeCapability_Block{
   434  			Block: &csipbv1.VolumeCapability_BlockVolume{},
   435  		}
   436  	} else {
   437  		mountVolume := &csipbv1.VolumeCapability_MountVolume{
   438  			FsType:     fsType,
   439  			MountFlags: mountOptions,
   440  		}
   441  		if fsGroup != nil {
   442  			mountVolume.VolumeMountGroup = strconv.FormatInt(*fsGroup, 10 /* base */)
   443  		}
   444  		req.VolumeCapability.AccessType = &csipbv1.VolumeCapability_Mount{
   445  			Mount: mountVolume,
   446  		}
   447  	}
   449  	_, err = nodeClient.NodeStageVolume(ctx, req)
   450  	if err != nil && !isFinalError(err) {
   451  		return volumetypes.NewUncertainProgressError(err.Error())
   452  	}
   453  	return err
   454  }
   456  func (c *csiDriverClient) NodeUnstageVolume(ctx context.Context, volID, stagingTargetPath string) error {
   457  	klog.V(4).InfoS(log("calling NodeUnstageVolume rpc"), "volID", volID, "stagingTargetPath", stagingTargetPath)
   458  	if volID == "" {
   459  		return errors.New("missing volume id")
   460  	}
   461  	if stagingTargetPath == "" {
   462  		return errors.New("missing staging target path")
   463  	}
   464  	if c.nodeV1ClientCreator == nil {
   465  		return errors.New("nodeV1ClientCreate is nil")
   466  	}
   468  	nodeClient, closer, err := c.nodeV1ClientCreator(c.addr, c.metricsManager)
   469  	if err != nil {
   470  		return err
   471  	}
   472  	defer closer.Close()
   474  	req := &csipbv1.NodeUnstageVolumeRequest{
   475  		VolumeId:          volID,
   476  		StagingTargetPath: stagingTargetPath,
   477  	}
   478  	_, err = nodeClient.NodeUnstageVolume(ctx, req)
   479  	return err
   480  }
   482  func (c *csiDriverClient) NodeSupportsNodeExpand(ctx context.Context) (bool, error) {
   483  	return c.nodeSupportsCapability(ctx, csipbv1.NodeServiceCapability_RPC_EXPAND_VOLUME)
   484  }
   486  func (c *csiDriverClient) NodeSupportsStageUnstage(ctx context.Context) (bool, error) {
   487  	return c.nodeSupportsCapability(ctx, csipbv1.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME)
   488  }
   490  func (c *csiDriverClient) getNodeV1AccessModeMapper(ctx context.Context) (nodeV1AccessModeMapper, error) {
   491  	supported, err := c.NodeSupportsSingleNodeMultiWriterAccessMode(ctx)
   492  	if err != nil {
   493  		return nil, err
   494  	}
   495  	if supported {
   496  		return asSingleNodeMultiWriterCapableCSIAccessModeV1, nil
   497  	}
   498  	return asCSIAccessModeV1, nil
   499  }
   501  func asCSIAccessModeV1(am api.PersistentVolumeAccessMode) csipbv1.VolumeCapability_AccessMode_Mode {
   502  	switch am {
   503  	case api.ReadWriteOnce:
   504  		return csipbv1.VolumeCapability_AccessMode_SINGLE_NODE_WRITER
   505  	case api.ReadOnlyMany:
   506  		return csipbv1.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY
   507  	case api.ReadWriteMany:
   508  		return csipbv1.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER
   509  	// This mapping exists to enable CSI drivers that lack the
   510  	// SINGLE_NODE_MULTI_WRITER capability to work with the
   511  	// ReadWriteOncePod access mode.
   512  	case api.ReadWriteOncePod:
   513  		return csipbv1.VolumeCapability_AccessMode_SINGLE_NODE_WRITER
   514  	}
   515  	return csipbv1.VolumeCapability_AccessMode_UNKNOWN
   516  }
   518  func asSingleNodeMultiWriterCapableCSIAccessModeV1(am api.PersistentVolumeAccessMode) csipbv1.VolumeCapability_AccessMode_Mode {
   519  	switch am {
   520  	case api.ReadWriteOnce:
   521  		return csipbv1.VolumeCapability_AccessMode_SINGLE_NODE_MULTI_WRITER
   522  	case api.ReadOnlyMany:
   523  		return csipbv1.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY
   524  	case api.ReadWriteMany:
   525  		return csipbv1.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER
   526  	case api.ReadWriteOncePod:
   527  		return csipbv1.VolumeCapability_AccessMode_SINGLE_NODE_SINGLE_WRITER
   528  	}
   529  	return csipbv1.VolumeCapability_AccessMode_UNKNOWN
   530  }
   532  func newGrpcConn(addr csiAddr, metricsManager *MetricsManager) (*grpc.ClientConn, error) {
   533  	network := "unix"
   534  	klog.V(4).InfoS(log("creating new gRPC connection"), "protocol", network, "endpoint", addr)
   536  	return grpc.Dial(
   537  		string(addr),
   538  		grpc.WithAuthority("localhost"),
   539  		grpc.WithTransportCredentials(insecure.NewCredentials()),
   540  		grpc.WithContextDialer(func(ctx context.Context, target string) (net.Conn, error) {
   541  			return (&net.Dialer{}).DialContext(ctx, network, target)
   542  		}),
   543  		grpc.WithChainUnaryInterceptor(metricsManager.RecordMetricsInterceptor),
   544  	)
   545  }
   547  // CSI client getter with cache.
   548  // This provides a method to initialize CSI client with driver name and caches
   549  // it for later use. When CSI clients have not been discovered yet (e.g.
   550  // on kubelet restart), client initialization will fail. Users of CSI client (e.g.
   551  // mounter manager and block mapper) can use this to delay CSI client
   552  // initialization until needed.
   553  type csiClientGetter struct {
   554  	sync.RWMutex
   555  	csiClient  csiClient
   556  	driverName csiDriverName
   557  }
   559  func (c *csiClientGetter) Get() (csiClient, error) {
   560  	c.RLock()
   561  	if c.csiClient != nil {
   562  		c.RUnlock()
   563  		return c.csiClient, nil
   564  	}
   565  	c.RUnlock()
   566  	c.Lock()
   567  	defer c.Unlock()
   568  	// Double-checking locking criterion.
   569  	if c.csiClient != nil {
   570  		return c.csiClient, nil
   571  	}
   572  	csi, err := newCsiDriverClient(c.driverName)
   573  	if err != nil {
   574  		return nil, err
   575  	}
   576  	c.csiClient = csi
   577  	return c.csiClient, nil
   578  }
   580  func (c *csiDriverClient) NodeSupportsVolumeStats(ctx context.Context) (bool, error) {
   581  	return c.nodeSupportsCapability(ctx, csipbv1.NodeServiceCapability_RPC_GET_VOLUME_STATS)
   582  }
   584  func (c *csiDriverClient) NodeSupportsSingleNodeMultiWriterAccessMode(ctx context.Context) (bool, error) {
   585  	return c.nodeSupportsCapability(ctx, csipbv1.NodeServiceCapability_RPC_SINGLE_NODE_MULTI_WRITER)
   586  }
   588  func (c *csiDriverClient) NodeGetVolumeStats(ctx context.Context, volID string, targetPath string) (*volume.Metrics, error) {
   589  	klog.V(4).InfoS(log("calling NodeGetVolumeStats rpc"), "volID", volID, "targetPath", targetPath)
   590  	if volID == "" {
   591  		return nil, errors.New("missing volume id")
   592  	}
   593  	if targetPath == "" {
   594  		return nil, errors.New("missing target path")
   595  	}
   596  	if c.nodeV1ClientCreator == nil {
   597  		return nil, errors.New("nodeV1ClientCreate is nil")
   598  	}
   600  	nodeClient, closer, err := c.nodeV1ClientCreator(c.addr, c.metricsManager)
   601  	if err != nil {
   602  		return nil, err
   603  	}
   604  	defer closer.Close()
   606  	req := &csipbv1.NodeGetVolumeStatsRequest{
   607  		VolumeId:   volID,
   608  		VolumePath: targetPath,
   609  	}
   611  	resp, err := nodeClient.NodeGetVolumeStats(ctx, req)
   612  	if err != nil {
   613  		return nil, err
   614  	}
   615  	usages := resp.GetUsage()
   616  	if usages == nil {
   617  		return nil, fmt.Errorf("failed to get usage from response. usage is nil")
   618  	}
   619  	metrics := &volume.Metrics{
   620  		Used:       resource.NewQuantity(int64(0), resource.BinarySI),
   621  		Capacity:   resource.NewQuantity(int64(0), resource.BinarySI),
   622  		Available:  resource.NewQuantity(int64(0), resource.BinarySI),
   623  		InodesUsed: resource.NewQuantity(int64(0), resource.BinarySI),
   624  		Inodes:     resource.NewQuantity(int64(0), resource.BinarySI),
   625  		InodesFree: resource.NewQuantity(int64(0), resource.BinarySI),
   626  	}
   628  	if utilfeature.DefaultFeatureGate.Enabled(features.CSIVolumeHealth) {
   629  		isSupportNodeVolumeCondition, err := c.nodeSupportsVolumeCondition(ctx)
   630  		if err != nil {
   631  			return nil, err
   632  		}
   634  		if isSupportNodeVolumeCondition {
   635  			abnormal, message := resp.VolumeCondition.GetAbnormal(), resp.VolumeCondition.GetMessage()
   636  			metrics.Abnormal, metrics.Message = &abnormal, &message
   637  		}
   638  	}
   640  	for _, usage := range usages {
   641  		if usage == nil {
   642  			continue
   643  		}
   644  		unit := usage.GetUnit()
   645  		switch unit {
   646  		case csipbv1.VolumeUsage_BYTES:
   647  			metrics.Available = resource.NewQuantity(usage.GetAvailable(), resource.BinarySI)
   648  			metrics.Capacity = resource.NewQuantity(usage.GetTotal(), resource.BinarySI)
   649  			metrics.Used = resource.NewQuantity(usage.GetUsed(), resource.BinarySI)
   650  		case csipbv1.VolumeUsage_INODES:
   651  			metrics.InodesFree = resource.NewQuantity(usage.GetAvailable(), resource.BinarySI)
   652  			metrics.Inodes = resource.NewQuantity(usage.GetTotal(), resource.BinarySI)
   653  			metrics.InodesUsed = resource.NewQuantity(usage.GetUsed(), resource.BinarySI)
   654  		default:
   655  			klog.ErrorS(nil, "unknown unit in VolumeUsage", "unit", unit.String())
   656  		}
   658  	}
   659  	return metrics, nil
   660  }
   662  func (c *csiDriverClient) nodeSupportsVolumeCondition(ctx context.Context) (bool, error) {
   663  	return c.nodeSupportsCapability(ctx, csipbv1.NodeServiceCapability_RPC_VOLUME_CONDITION)
   664  }
   666  func (c *csiDriverClient) NodeSupportsVolumeMountGroup(ctx context.Context) (bool, error) {
   667  	return c.nodeSupportsCapability(ctx, csipbv1.NodeServiceCapability_RPC_VOLUME_MOUNT_GROUP)
   668  }
   670  func (c *csiDriverClient) nodeSupportsCapability(ctx context.Context, capabilityType csipbv1.NodeServiceCapability_RPC_Type) (bool, error) {
   671  	klog.V(4).Info(log("calling NodeGetCapabilities rpc to determine if the node service has %s capability", capabilityType))
   672  	capabilities, err := c.nodeGetCapabilities(ctx)
   673  	if err != nil {
   674  		return false, err
   675  	}
   677  	for _, capability := range capabilities {
   678  		if capability == nil || capability.GetRpc() == nil {
   679  			continue
   680  		}
   681  		if capability.GetRpc().GetType() == capabilityType {
   682  			return true, nil
   683  		}
   684  	}
   685  	return false, nil
   686  }
   688  func (c *csiDriverClient) nodeGetCapabilities(ctx context.Context) ([]*csipbv1.NodeServiceCapability, error) {
   689  	if c.nodeV1ClientCreator == nil {
   690  		return []*csipbv1.NodeServiceCapability{}, errors.New("nodeV1ClientCreate is nil")
   691  	}
   693  	nodeClient, closer, err := c.nodeV1ClientCreator(c.addr, c.metricsManager)
   694  	if err != nil {
   695  		return []*csipbv1.NodeServiceCapability{}, err
   696  	}
   697  	defer closer.Close()
   699  	req := &csipbv1.NodeGetCapabilitiesRequest{}
   700  	resp, err := nodeClient.NodeGetCapabilities(ctx, req)
   701  	if err != nil {
   702  		return []*csipbv1.NodeServiceCapability{}, err
   703  	}
   704  	return resp.GetCapabilities(), nil
   705  }
   707  func isFinalError(err error) bool {
   708  	// Sources:
   709  	// https://github.com/grpc/grpc/blob/master/doc/statuscodes.md
   710  	// https://github.com/container-storage-interface/spec/blob/master/spec.md
   711  	st, ok := status.FromError(err)
   712  	if !ok {
   713  		// This is not gRPC error. The operation must have failed before gRPC
   714  		// method was called, otherwise we would get gRPC error.
   715  		// We don't know if any previous volume operation is in progress, be on the safe side.
   716  		return false
   717  	}
   718  	switch st.Code() {
   719  	case codes.Canceled, // gRPC: Client Application cancelled the request
   720  		codes.DeadlineExceeded,  // gRPC: Timeout
   721  		codes.Unavailable,       // gRPC: Server shutting down, TCP connection broken - previous volume operation may be still in progress.
   722  		codes.ResourceExhausted, // gRPC: Server temporarily out of resources - previous volume operation may be still in progress.
   723  		codes.Aborted:           // CSI: Operation pending for volume
   724  		return false
   725  	}
   726  	// All other errors mean that operation either did not
   727  	// even start or failed. It is for sure not in progress.
   728  	return true
   729  }

View as plain text