...

Source file src/k8s.io/kubernetes/pkg/volume/csi/csi_util.go

Documentation: k8s.io/kubernetes/pkg/volume/csi

     1  /*
     2  Copyright 2018 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package csi
    18  
    19  import (
    20  	"context"
    21  	"encoding/json"
    22  	"errors"
    23  	"fmt"
    24  	"os"
    25  	"path/filepath"
    26  	"strconv"
    27  	"time"
    28  
    29  	api "k8s.io/api/core/v1"
    30  	storage "k8s.io/api/storage/v1"
    31  	meta "k8s.io/apimachinery/pkg/apis/meta/v1"
    32  	"k8s.io/client-go/kubernetes"
    33  	"k8s.io/klog/v2"
    34  	"k8s.io/kubernetes/pkg/volume"
    35  	utilstrings "k8s.io/utils/strings"
    36  )
    37  
    38  const (
    39  	// TestInformerSyncPeriod is informer sync period duration for testing
    40  	TestInformerSyncPeriod = 100 * time.Millisecond
    41  	// TestInformerSyncTimeout is informer timeout duration for testing
    42  	TestInformerSyncTimeout = 30 * time.Second
    43  )
    44  
    45  func getCredentialsFromSecret(k8s kubernetes.Interface, secretRef *api.SecretReference) (map[string]string, error) {
    46  	credentials := map[string]string{}
    47  	secret, err := k8s.CoreV1().Secrets(secretRef.Namespace).Get(context.TODO(), secretRef.Name, meta.GetOptions{})
    48  	if err != nil {
    49  		return credentials, errors.New(log("failed to find the secret %s in the namespace %s with error: %v", secretRef.Name, secretRef.Namespace, err))
    50  	}
    51  	for key, value := range secret.Data {
    52  		credentials[key] = string(value)
    53  	}
    54  
    55  	return credentials, nil
    56  }
    57  
    58  // saveVolumeData persists parameter data as json file at the provided location
    59  func saveVolumeData(dir string, fileName string, data map[string]string) error {
    60  	dataFilePath := filepath.Join(dir, fileName)
    61  	klog.V(4).Info(log("saving volume data file [%s]", dataFilePath))
    62  	file, err := os.Create(dataFilePath)
    63  	if err != nil {
    64  		return errors.New(log("failed to save volume data file %s: %v", dataFilePath, err))
    65  	}
    66  	defer file.Close()
    67  	if err := json.NewEncoder(file).Encode(data); err != nil {
    68  		return errors.New(log("failed to save volume data file %s: %v", dataFilePath, err))
    69  	}
    70  	klog.V(4).Info(log("volume data file saved successfully [%s]", dataFilePath))
    71  	return nil
    72  }
    73  
    74  // loadVolumeData loads volume info from specified json file/location
    75  func loadVolumeData(dir string, fileName string) (map[string]string, error) {
    76  	// remove /mount at the end
    77  	dataFileName := filepath.Join(dir, fileName)
    78  	klog.V(4).Info(log("loading volume data file [%s]", dataFileName))
    79  
    80  	file, err := os.Open(dataFileName)
    81  	if err != nil {
    82  		return nil, fmt.Errorf("%s: %w", log("failed to open volume data file [%s]", dataFileName), err)
    83  	}
    84  	defer file.Close()
    85  	data := map[string]string{}
    86  	if err := json.NewDecoder(file).Decode(&data); err != nil {
    87  		return nil, errors.New(log("failed to parse volume data file [%s]: %v", dataFileName, err))
    88  	}
    89  
    90  	return data, nil
    91  }
    92  
    93  func getCSISourceFromSpec(spec *volume.Spec) (*api.CSIPersistentVolumeSource, error) {
    94  	return getPVSourceFromSpec(spec)
    95  }
    96  
    97  func getReadOnlyFromSpec(spec *volume.Spec) (bool, error) {
    98  	if spec.PersistentVolume != nil &&
    99  		spec.PersistentVolume.Spec.CSI != nil {
   100  		return spec.ReadOnly, nil
   101  	}
   102  
   103  	return false, fmt.Errorf("CSIPersistentVolumeSource not defined in spec")
   104  }
   105  
   106  // log prepends log string with `kubernetes.io/csi`
   107  func log(msg string, parts ...interface{}) string {
   108  	return fmt.Sprintf(fmt.Sprintf("%s: %s", CSIPluginName, msg), parts...)
   109  }
   110  
   111  // getVolumePluginDir returns the path where CSI plugin keeps metadata for given volume
   112  func getVolumePluginDir(specVolID string, host volume.VolumeHost) string {
   113  	sanitizedSpecVolID := utilstrings.EscapeQualifiedName(specVolID)
   114  	return filepath.Join(host.GetVolumeDevicePluginDir(CSIPluginName), sanitizedSpecVolID)
   115  }
   116  
   117  // getVolumeDevicePluginDir returns the path where the CSI plugin keeps the
   118  // symlink for a block device associated with a given specVolumeID.
   119  // path: plugins/kubernetes.io/csi/volumeDevices/{specVolumeID}/dev
   120  func getVolumeDevicePluginDir(specVolID string, host volume.VolumeHost) string {
   121  	return filepath.Join(getVolumePluginDir(specVolID, host), "dev")
   122  }
   123  
   124  // getVolumeDeviceDataDir returns the path where the CSI plugin keeps the
   125  // volume data for a block device associated with a given specVolumeID.
   126  // path: plugins/kubernetes.io/csi/volumeDevices/{specVolumeID}/data
   127  func getVolumeDeviceDataDir(specVolID string, host volume.VolumeHost) string {
   128  	return filepath.Join(getVolumePluginDir(specVolID, host), "data")
   129  }
   130  
   131  // hasReadWriteOnce returns true if modes contains v1.ReadWriteOnce
   132  func hasReadWriteOnce(modes []api.PersistentVolumeAccessMode) bool {
   133  	if modes == nil {
   134  		return false
   135  	}
   136  	for _, mode := range modes {
   137  		if mode == api.ReadWriteOnce {
   138  			return true
   139  		}
   140  	}
   141  	return false
   142  }
   143  
   144  // getSourceFromSpec returns either CSIVolumeSource or CSIPersistentVolumeSource, but not both
   145  func getSourceFromSpec(spec *volume.Spec) (*api.CSIVolumeSource, *api.CSIPersistentVolumeSource, error) {
   146  	if spec == nil {
   147  		return nil, nil, fmt.Errorf("volume.Spec nil")
   148  	}
   149  	if spec.Volume != nil && spec.PersistentVolume != nil {
   150  		return nil, nil, fmt.Errorf("volume.Spec has both volume and persistent volume sources")
   151  	}
   152  	if spec.Volume != nil && spec.Volume.CSI != nil {
   153  		return spec.Volume.CSI, nil, nil
   154  	}
   155  	if spec.PersistentVolume != nil &&
   156  		spec.PersistentVolume.Spec.CSI != nil {
   157  		return nil, spec.PersistentVolume.Spec.CSI, nil
   158  	}
   159  
   160  	return nil, nil, fmt.Errorf("volume source not found in volume.Spec")
   161  }
   162  
   163  // getPVSourceFromSpec ensures only CSIPersistentVolumeSource is present in volume.Spec
   164  func getPVSourceFromSpec(spec *volume.Spec) (*api.CSIPersistentVolumeSource, error) {
   165  	volSrc, pvSrc, err := getSourceFromSpec(spec)
   166  	if err != nil {
   167  		return nil, err
   168  	}
   169  	if volSrc != nil {
   170  		return nil, fmt.Errorf("unexpected api.CSIVolumeSource found in volume.Spec")
   171  	}
   172  	return pvSrc, nil
   173  }
   174  
   175  // GetCSIMounterPath returns the mounter path given the base path.
   176  func GetCSIMounterPath(path string) string {
   177  	return filepath.Join(path, "/mount")
   178  }
   179  
   180  // GetCSIDriverName returns the csi driver name
   181  func GetCSIDriverName(spec *volume.Spec) (string, error) {
   182  	volSrc, pvSrc, err := getSourceFromSpec(spec)
   183  	if err != nil {
   184  		return "", err
   185  	}
   186  
   187  	switch {
   188  	case volSrc != nil:
   189  		return volSrc.Driver, nil
   190  	case pvSrc != nil:
   191  		return pvSrc.Driver, nil
   192  	default:
   193  		return "", errors.New(log("volume source not found in volume.Spec"))
   194  	}
   195  }
   196  
   197  func createCSIOperationContext(volumeSpec *volume.Spec, timeout time.Duration) (context.Context, context.CancelFunc) {
   198  	migrated := false
   199  	if volumeSpec != nil {
   200  		migrated = volumeSpec.Migrated
   201  	}
   202  	ctx := context.WithValue(context.Background(), additionalInfoKey, additionalInfo{Migrated: strconv.FormatBool(migrated)})
   203  	return context.WithTimeout(ctx, timeout)
   204  }
   205  
   206  // getPodInfoAttrs returns pod info for NodePublish
   207  func getPodInfoAttrs(pod *api.Pod, volumeMode storage.VolumeLifecycleMode) map[string]string {
   208  	attrs := map[string]string{
   209  		"csi.storage.k8s.io/pod.name":            pod.Name,
   210  		"csi.storage.k8s.io/pod.namespace":       pod.Namespace,
   211  		"csi.storage.k8s.io/pod.uid":             string(pod.UID),
   212  		"csi.storage.k8s.io/serviceAccount.name": pod.Spec.ServiceAccountName,
   213  		"csi.storage.k8s.io/ephemeral":           strconv.FormatBool(volumeMode == storage.VolumeLifecycleEphemeral),
   214  	}
   215  	return attrs
   216  }
   217  

View as plain text