...

Source file src/k8s.io/kubernetes/pkg/volume/cephfs/cephfs.go

Documentation: k8s.io/kubernetes/pkg/volume/cephfs

     1  /*
     2  Copyright 2015 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package cephfs
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"os"
    23  	"os/exec"
    24  	"path/filepath"
    25  	"runtime"
    26  	"strings"
    27  
    28  	"k8s.io/klog/v2"
    29  	"k8s.io/mount-utils"
    30  	utilstrings "k8s.io/utils/strings"
    31  
    32  	v1 "k8s.io/api/core/v1"
    33  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    34  	"k8s.io/apimachinery/pkg/types"
    35  	"k8s.io/kubernetes/pkg/volume"
    36  	"k8s.io/kubernetes/pkg/volume/util"
    37  )
    38  
    39  // ProbeVolumePlugins is the primary entrypoint for volume plugins.
    40  func ProbeVolumePlugins() []volume.VolumePlugin {
    41  	return []volume.VolumePlugin{&cephfsPlugin{nil}}
    42  }
    43  
    44  type cephfsPlugin struct {
    45  	host volume.VolumeHost
    46  }
    47  
    48  var _ volume.VolumePlugin = &cephfsPlugin{}
    49  
    50  const (
    51  	cephfsPluginName = "kubernetes.io/cephfs"
    52  )
    53  
    54  func (plugin *cephfsPlugin) Init(host volume.VolumeHost) error {
    55  	plugin.host = host
    56  	return nil
    57  }
    58  
    59  func (plugin *cephfsPlugin) GetPluginName() string {
    60  	return cephfsPluginName
    61  }
    62  
    63  func (plugin *cephfsPlugin) GetVolumeName(spec *volume.Spec) (string, error) {
    64  	mon, _, _, _, _, err := getVolumeSource(spec)
    65  	if err != nil {
    66  		return "", err
    67  	}
    68  
    69  	return fmt.Sprintf("%v", mon), nil
    70  }
    71  
    72  func (plugin *cephfsPlugin) CanSupport(spec *volume.Spec) bool {
    73  	return (spec.Volume != nil && spec.Volume.CephFS != nil) || (spec.PersistentVolume != nil && spec.PersistentVolume.Spec.CephFS != nil)
    74  }
    75  
    76  func (plugin *cephfsPlugin) RequiresRemount(spec *volume.Spec) bool {
    77  	return false
    78  }
    79  
    80  func (plugin *cephfsPlugin) SupportsMountOption() bool {
    81  	return true
    82  }
    83  
    84  func (plugin *cephfsPlugin) SupportsBulkVolumeVerification() bool {
    85  	return false
    86  }
    87  
    88  func (plugin *cephfsPlugin) SupportsSELinuxContextMount(spec *volume.Spec) (bool, error) {
    89  	return false, nil
    90  }
    91  
    92  func (plugin *cephfsPlugin) GetAccessModes() []v1.PersistentVolumeAccessMode {
    93  	return []v1.PersistentVolumeAccessMode{
    94  		v1.ReadWriteOnce,
    95  		v1.ReadOnlyMany,
    96  		v1.ReadWriteMany,
    97  	}
    98  }
    99  
   100  func (plugin *cephfsPlugin) NewMounter(spec *volume.Spec, pod *v1.Pod, _ volume.VolumeOptions) (volume.Mounter, error) {
   101  	secretName, secretNs, err := getSecretNameAndNamespace(spec, pod.Namespace)
   102  	if err != nil {
   103  		return nil, err
   104  	}
   105  	secret := ""
   106  	if len(secretName) > 0 && len(secretNs) > 0 {
   107  		// if secret is provideded, retrieve it
   108  		kubeClient := plugin.host.GetKubeClient()
   109  		if kubeClient == nil {
   110  			return nil, fmt.Errorf("cannot get kube client")
   111  		}
   112  		secrets, err := kubeClient.CoreV1().Secrets(secretNs).Get(context.TODO(), secretName, metav1.GetOptions{})
   113  		if err != nil {
   114  			err = fmt.Errorf("couldn't get secret %v/%v err: %w", secretNs, secretName, err)
   115  			return nil, err
   116  		}
   117  		for name, data := range secrets.Data {
   118  			secret = string(data)
   119  			klog.V(4).Infof("found ceph secret info: %s", name)
   120  		}
   121  	}
   122  	return plugin.newMounterInternal(spec, pod.UID, plugin.host.GetMounter(plugin.GetPluginName()), secret)
   123  }
   124  
   125  func (plugin *cephfsPlugin) newMounterInternal(spec *volume.Spec, podUID types.UID, mounter mount.Interface, secret string) (volume.Mounter, error) {
   126  	mon, path, id, secretFile, readOnly, err := getVolumeSource(spec)
   127  	if err != nil {
   128  		return nil, err
   129  	}
   130  
   131  	if id == "" {
   132  		id = "admin"
   133  	}
   134  	if path == "" {
   135  		path = "/"
   136  	}
   137  	if !strings.HasPrefix(path, "/") {
   138  		path = "/" + path
   139  	}
   140  
   141  	if secretFile == "" {
   142  		secretFile = "/etc/ceph/" + id + ".secret"
   143  	}
   144  
   145  	return &cephfsMounter{
   146  		cephfs: &cephfs{
   147  			podUID:       podUID,
   148  			volName:      spec.Name(),
   149  			mon:          mon,
   150  			path:         path,
   151  			secret:       secret,
   152  			id:           id,
   153  			secretFile:   secretFile,
   154  			readonly:     readOnly,
   155  			mounter:      mounter,
   156  			plugin:       plugin,
   157  			mountOptions: util.MountOptionFromSpec(spec),
   158  		},
   159  	}, nil
   160  }
   161  
   162  func (plugin *cephfsPlugin) NewUnmounter(volName string, podUID types.UID) (volume.Unmounter, error) {
   163  	return plugin.newUnmounterInternal(volName, podUID, plugin.host.GetMounter(plugin.GetPluginName()))
   164  }
   165  
   166  func (plugin *cephfsPlugin) newUnmounterInternal(volName string, podUID types.UID, mounter mount.Interface) (volume.Unmounter, error) {
   167  	return &cephfsUnmounter{
   168  		cephfs: &cephfs{
   169  			podUID:  podUID,
   170  			volName: volName,
   171  			mounter: mounter,
   172  			plugin:  plugin},
   173  	}, nil
   174  }
   175  
   176  func (plugin *cephfsPlugin) ConstructVolumeSpec(volumeName, mountPath string) (volume.ReconstructedVolume, error) {
   177  	cephfsVolume := &v1.Volume{
   178  		Name: volumeName,
   179  		VolumeSource: v1.VolumeSource{
   180  			CephFS: &v1.CephFSVolumeSource{
   181  				Monitors: []string{},
   182  				Path:     mountPath,
   183  			},
   184  		},
   185  	}
   186  	return volume.ReconstructedVolume{
   187  		Spec: volume.NewSpecFromVolume(cephfsVolume),
   188  	}, nil
   189  }
   190  
   191  // CephFS volumes represent a bare host file or directory mount of an CephFS export.
   192  type cephfs struct {
   193  	volName    string
   194  	podUID     types.UID
   195  	mon        []string
   196  	path       string
   197  	id         string
   198  	secret     string `datapolicy:"token"`
   199  	secretFile string
   200  	readonly   bool
   201  	mounter    mount.Interface
   202  	plugin     *cephfsPlugin
   203  	volume.MetricsNil
   204  	mountOptions []string
   205  }
   206  
   207  type cephfsMounter struct {
   208  	*cephfs
   209  }
   210  
   211  var _ volume.Mounter = &cephfsMounter{}
   212  
   213  func (cephfsVolume *cephfsMounter) GetAttributes() volume.Attributes {
   214  	return volume.Attributes{
   215  		ReadOnly:       cephfsVolume.readonly,
   216  		Managed:        false,
   217  		SELinuxRelabel: false,
   218  	}
   219  }
   220  
   221  // SetUp attaches the disk and bind mounts to the volume path.
   222  func (cephfsVolume *cephfsMounter) SetUp(mounterArgs volume.MounterArgs) error {
   223  	return cephfsVolume.SetUpAt(cephfsVolume.GetPath(), mounterArgs)
   224  }
   225  
   226  // SetUpAt attaches the disk and bind mounts to the volume path.
   227  func (cephfsVolume *cephfsMounter) SetUpAt(dir string, mounterArgs volume.MounterArgs) error {
   228  	notMnt, err := cephfsVolume.mounter.IsLikelyNotMountPoint(dir)
   229  	klog.V(4).Infof("CephFS mount set up: %s %v %v", dir, !notMnt, err)
   230  	if err != nil && !os.IsNotExist(err) {
   231  		return err
   232  	}
   233  	if !notMnt {
   234  		return nil
   235  	}
   236  
   237  	if err := os.MkdirAll(dir, 0750); err != nil {
   238  		return err
   239  	}
   240  
   241  	// check whether it belongs to fuse, if not, default to use kernel mount.
   242  	if cephfsVolume.checkFuseMount() {
   243  		klog.V(4).Info("CephFS fuse mount.")
   244  		err = cephfsVolume.execFuseMount(dir)
   245  		// cleanup no matter if fuse mount fail.
   246  		keyringPath := cephfsVolume.GetKeyringPath()
   247  		_, StatErr := os.Stat(keyringPath)
   248  		if !os.IsNotExist(StatErr) {
   249  			os.RemoveAll(keyringPath)
   250  		}
   251  		if err == nil {
   252  			// cephfs fuse mount succeeded.
   253  			return nil
   254  		}
   255  		// if cephfs fuse mount failed, fallback to kernel mount.
   256  		klog.V(2).Infof("CephFS fuse mount failed: %v, fallback to kernel mount.", err)
   257  
   258  	}
   259  	klog.V(4).Info("CephFS kernel mount.")
   260  
   261  	err = cephfsVolume.execMount(dir)
   262  	if err != nil {
   263  		// cleanup upon failure.
   264  		mount.CleanupMountPoint(dir, cephfsVolume.mounter, false)
   265  		return err
   266  	}
   267  	return nil
   268  }
   269  
   270  type cephfsUnmounter struct {
   271  	*cephfs
   272  }
   273  
   274  var _ volume.Unmounter = &cephfsUnmounter{}
   275  
   276  // TearDown unmounts the bind mount
   277  func (cephfsVolume *cephfsUnmounter) TearDown() error {
   278  	return cephfsVolume.TearDownAt(cephfsVolume.GetPath())
   279  }
   280  
   281  // TearDownAt unmounts the bind mount
   282  func (cephfsVolume *cephfsUnmounter) TearDownAt(dir string) error {
   283  	return mount.CleanupMountPoint(dir, cephfsVolume.mounter, false)
   284  }
   285  
   286  // GetPath creates global mount path
   287  func (cephfsVolume *cephfs) GetPath() string {
   288  	name := cephfsPluginName
   289  	return cephfsVolume.plugin.host.GetPodVolumeDir(cephfsVolume.podUID, utilstrings.EscapeQualifiedName(name), cephfsVolume.volName)
   290  }
   291  
   292  // GetKeyringPath creates cephfuse keyring path
   293  func (cephfsVolume *cephfs) GetKeyringPath() string {
   294  	name := cephfsPluginName
   295  	volumeDir := cephfsVolume.plugin.host.GetPodVolumeDir(cephfsVolume.podUID, utilstrings.EscapeQualifiedName(name), cephfsVolume.volName)
   296  	volumeKeyringDir := volumeDir + "~keyring"
   297  	return volumeKeyringDir
   298  }
   299  
   300  func (cephfsVolume *cephfs) execMount(mountpoint string) error {
   301  	// cephfs mount option
   302  	cephSensitiveOpt := []string{"name=" + cephfsVolume.id}
   303  	// override secretfile if secret is provided
   304  	if cephfsVolume.secret != "" {
   305  		cephSensitiveOpt = append(cephSensitiveOpt, "secret="+cephfsVolume.secret)
   306  	} else {
   307  		cephSensitiveOpt = append(cephSensitiveOpt, "secretfile="+cephfsVolume.secretFile)
   308  	}
   309  	// build option array
   310  	opt := []string{}
   311  	if cephfsVolume.readonly {
   312  		opt = append(opt, "ro")
   313  	}
   314  
   315  	// build src like mon1:6789,mon2:6789,mon3:6789:/
   316  	src := strings.Join(cephfsVolume.mon, ",") + ":" + cephfsVolume.path
   317  
   318  	opt = util.JoinMountOptions(cephfsVolume.mountOptions, opt)
   319  	if err := cephfsVolume.mounter.MountSensitive(src, mountpoint, "ceph", opt, cephSensitiveOpt); err != nil {
   320  		return fmt.Errorf("CephFS: mount failed: %v", err)
   321  	}
   322  
   323  	return nil
   324  }
   325  
   326  func (cephfsVolume *cephfsMounter) checkFuseMount() bool {
   327  	execute := cephfsVolume.plugin.host.GetExec(cephfsVolume.plugin.GetPluginName())
   328  	switch runtime.GOOS {
   329  	case "linux":
   330  		if _, err := execute.Command("/usr/bin/test", "-x", "/sbin/mount.fuse.ceph").CombinedOutput(); err == nil {
   331  			klog.V(4).Info("/sbin/mount.fuse.ceph exists, it should be fuse mount.")
   332  			return true
   333  		}
   334  		return false
   335  	}
   336  	return false
   337  }
   338  
   339  func (cephfsVolume *cephfs) execFuseMount(mountpoint string) error {
   340  	// cephfs keyring file
   341  	keyringFile := ""
   342  	// override secretfile if secret is provided
   343  	if cephfsVolume.secret != "" {
   344  		// TODO: cephfs fuse currently doesn't support secret option,
   345  		// remove keyring file create once secret option is supported.
   346  		klog.V(4).Info("cephfs mount begin using fuse.")
   347  
   348  		keyringPath := cephfsVolume.GetKeyringPath()
   349  		if err := os.MkdirAll(keyringPath, 0750); err != nil {
   350  			return err
   351  		}
   352  
   353  		payload := make(map[string]util.FileProjection, 1)
   354  		var fileProjection util.FileProjection
   355  
   356  		keyring := fmt.Sprintf("[client.%s]\nkey = %s\n", cephfsVolume.id, cephfsVolume.secret)
   357  
   358  		fileProjection.Data = []byte(keyring)
   359  		fileProjection.Mode = int32(0644)
   360  		fileName := cephfsVolume.id + ".keyring"
   361  
   362  		payload[fileName] = fileProjection
   363  
   364  		writerContext := fmt.Sprintf("cephfuse:%v.keyring", cephfsVolume.id)
   365  		writer, err := util.NewAtomicWriter(keyringPath, writerContext)
   366  		if err != nil {
   367  			klog.Errorf("failed to create atomic writer: %v", err)
   368  			return err
   369  		}
   370  
   371  		err = writer.Write(payload, nil /*setPerms*/)
   372  		if err != nil {
   373  			klog.Errorf("failed to write payload to dir: %v", err)
   374  			return err
   375  		}
   376  
   377  		keyringFile = filepath.Join(keyringPath, fileName)
   378  
   379  	} else {
   380  		keyringFile = cephfsVolume.secretFile
   381  	}
   382  	// build src like mon1:6789,mon2:6789,mon3:6789:/
   383  	src := strings.Join(cephfsVolume.mon, ",")
   384  
   385  	mountArgs := []string{}
   386  	mountArgs = append(mountArgs, "-k")
   387  	mountArgs = append(mountArgs, keyringFile)
   388  	mountArgs = append(mountArgs, "-m")
   389  	mountArgs = append(mountArgs, src)
   390  	mountArgs = append(mountArgs, mountpoint)
   391  	mountArgs = append(mountArgs, "-r")
   392  	mountArgs = append(mountArgs, cephfsVolume.path)
   393  	mountArgs = append(mountArgs, "--id")
   394  	mountArgs = append(mountArgs, cephfsVolume.id)
   395  
   396  	// build option array
   397  	opt := []string{}
   398  	if cephfsVolume.readonly {
   399  		opt = append(opt, "ro")
   400  	}
   401  	opt = util.JoinMountOptions(cephfsVolume.mountOptions, opt)
   402  	if len(opt) > 0 {
   403  		mountArgs = append(mountArgs, "-o")
   404  		mountArgs = append(mountArgs, strings.Join(opt, ","))
   405  	}
   406  
   407  	klog.V(4).Infof("Mounting cmd ceph-fuse with arguments (%s)", mountArgs)
   408  	command := exec.Command("ceph-fuse", mountArgs...)
   409  	output, err := command.CombinedOutput()
   410  	if err != nil || !(strings.Contains(string(output), "starting fuse")) {
   411  		return fmt.Errorf("Ceph-fuse failed: %v\narguments: %s\nOutput: %s", err, mountArgs, string(output))
   412  	}
   413  
   414  	return nil
   415  }
   416  
   417  func getVolumeSource(spec *volume.Spec) ([]string, string, string, string, bool, error) {
   418  	if spec.Volume != nil && spec.Volume.CephFS != nil {
   419  		mon := spec.Volume.CephFS.Monitors
   420  		path := spec.Volume.CephFS.Path
   421  		user := spec.Volume.CephFS.User
   422  		secretFile := spec.Volume.CephFS.SecretFile
   423  		readOnly := spec.Volume.CephFS.ReadOnly
   424  		return mon, path, user, secretFile, readOnly, nil
   425  	} else if spec.PersistentVolume != nil &&
   426  		spec.PersistentVolume.Spec.CephFS != nil {
   427  		mon := spec.PersistentVolume.Spec.CephFS.Monitors
   428  		path := spec.PersistentVolume.Spec.CephFS.Path
   429  		user := spec.PersistentVolume.Spec.CephFS.User
   430  		secretFile := spec.PersistentVolume.Spec.CephFS.SecretFile
   431  		readOnly := spec.PersistentVolume.Spec.CephFS.ReadOnly
   432  		return mon, path, user, secretFile, readOnly, nil
   433  	}
   434  
   435  	return nil, "", "", "", false, fmt.Errorf("Spec does not reference a CephFS volume type")
   436  }
   437  
   438  func getSecretNameAndNamespace(spec *volume.Spec, defaultNamespace string) (string, string, error) {
   439  	if spec.Volume != nil && spec.Volume.CephFS != nil {
   440  		localSecretRef := spec.Volume.CephFS.SecretRef
   441  		if localSecretRef != nil {
   442  			return localSecretRef.Name, defaultNamespace, nil
   443  		}
   444  		return "", "", nil
   445  
   446  	} else if spec.PersistentVolume != nil &&
   447  		spec.PersistentVolume.Spec.CephFS != nil {
   448  		secretRef := spec.PersistentVolume.Spec.CephFS.SecretRef
   449  		secretNs := defaultNamespace
   450  		if secretRef != nil {
   451  			if len(secretRef.Namespace) != 0 {
   452  				secretNs = secretRef.Namespace
   453  			}
   454  			return secretRef.Name, secretNs, nil
   455  		}
   456  		return "", "", nil
   457  	}
   458  	return "", "", fmt.Errorf("Spec does not reference an CephFS volume type")
   459  }
   460  

View as plain text