1
16
17 package cephfs
18
19 import (
20 "context"
21 "fmt"
22 "os"
23 "os/exec"
24 "path/filepath"
25 "runtime"
26 "strings"
27
28 "k8s.io/klog/v2"
29 "k8s.io/mount-utils"
30 utilstrings "k8s.io/utils/strings"
31
32 v1 "k8s.io/api/core/v1"
33 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
34 "k8s.io/apimachinery/pkg/types"
35 "k8s.io/kubernetes/pkg/volume"
36 "k8s.io/kubernetes/pkg/volume/util"
37 )
38
39
40 func ProbeVolumePlugins() []volume.VolumePlugin {
41 return []volume.VolumePlugin{&cephfsPlugin{nil}}
42 }
43
44 type cephfsPlugin struct {
45 host volume.VolumeHost
46 }
47
48 var _ volume.VolumePlugin = &cephfsPlugin{}
49
50 const (
51 cephfsPluginName = "kubernetes.io/cephfs"
52 )
53
54 func (plugin *cephfsPlugin) Init(host volume.VolumeHost) error {
55 plugin.host = host
56 return nil
57 }
58
59 func (plugin *cephfsPlugin) GetPluginName() string {
60 return cephfsPluginName
61 }
62
63 func (plugin *cephfsPlugin) GetVolumeName(spec *volume.Spec) (string, error) {
64 mon, _, _, _, _, err := getVolumeSource(spec)
65 if err != nil {
66 return "", err
67 }
68
69 return fmt.Sprintf("%v", mon), nil
70 }
71
72 func (plugin *cephfsPlugin) CanSupport(spec *volume.Spec) bool {
73 return (spec.Volume != nil && spec.Volume.CephFS != nil) || (spec.PersistentVolume != nil && spec.PersistentVolume.Spec.CephFS != nil)
74 }
75
76 func (plugin *cephfsPlugin) RequiresRemount(spec *volume.Spec) bool {
77 return false
78 }
79
80 func (plugin *cephfsPlugin) SupportsMountOption() bool {
81 return true
82 }
83
84 func (plugin *cephfsPlugin) SupportsBulkVolumeVerification() bool {
85 return false
86 }
87
88 func (plugin *cephfsPlugin) SupportsSELinuxContextMount(spec *volume.Spec) (bool, error) {
89 return false, nil
90 }
91
92 func (plugin *cephfsPlugin) GetAccessModes() []v1.PersistentVolumeAccessMode {
93 return []v1.PersistentVolumeAccessMode{
94 v1.ReadWriteOnce,
95 v1.ReadOnlyMany,
96 v1.ReadWriteMany,
97 }
98 }
99
100 func (plugin *cephfsPlugin) NewMounter(spec *volume.Spec, pod *v1.Pod, _ volume.VolumeOptions) (volume.Mounter, error) {
101 secretName, secretNs, err := getSecretNameAndNamespace(spec, pod.Namespace)
102 if err != nil {
103 return nil, err
104 }
105 secret := ""
106 if len(secretName) > 0 && len(secretNs) > 0 {
107
108 kubeClient := plugin.host.GetKubeClient()
109 if kubeClient == nil {
110 return nil, fmt.Errorf("cannot get kube client")
111 }
112 secrets, err := kubeClient.CoreV1().Secrets(secretNs).Get(context.TODO(), secretName, metav1.GetOptions{})
113 if err != nil {
114 err = fmt.Errorf("couldn't get secret %v/%v err: %w", secretNs, secretName, err)
115 return nil, err
116 }
117 for name, data := range secrets.Data {
118 secret = string(data)
119 klog.V(4).Infof("found ceph secret info: %s", name)
120 }
121 }
122 return plugin.newMounterInternal(spec, pod.UID, plugin.host.GetMounter(plugin.GetPluginName()), secret)
123 }
124
125 func (plugin *cephfsPlugin) newMounterInternal(spec *volume.Spec, podUID types.UID, mounter mount.Interface, secret string) (volume.Mounter, error) {
126 mon, path, id, secretFile, readOnly, err := getVolumeSource(spec)
127 if err != nil {
128 return nil, err
129 }
130
131 if id == "" {
132 id = "admin"
133 }
134 if path == "" {
135 path = "/"
136 }
137 if !strings.HasPrefix(path, "/") {
138 path = "/" + path
139 }
140
141 if secretFile == "" {
142 secretFile = "/etc/ceph/" + id + ".secret"
143 }
144
145 return &cephfsMounter{
146 cephfs: &cephfs{
147 podUID: podUID,
148 volName: spec.Name(),
149 mon: mon,
150 path: path,
151 secret: secret,
152 id: id,
153 secretFile: secretFile,
154 readonly: readOnly,
155 mounter: mounter,
156 plugin: plugin,
157 mountOptions: util.MountOptionFromSpec(spec),
158 },
159 }, nil
160 }
161
162 func (plugin *cephfsPlugin) NewUnmounter(volName string, podUID types.UID) (volume.Unmounter, error) {
163 return plugin.newUnmounterInternal(volName, podUID, plugin.host.GetMounter(plugin.GetPluginName()))
164 }
165
166 func (plugin *cephfsPlugin) newUnmounterInternal(volName string, podUID types.UID, mounter mount.Interface) (volume.Unmounter, error) {
167 return &cephfsUnmounter{
168 cephfs: &cephfs{
169 podUID: podUID,
170 volName: volName,
171 mounter: mounter,
172 plugin: plugin},
173 }, nil
174 }
175
176 func (plugin *cephfsPlugin) ConstructVolumeSpec(volumeName, mountPath string) (volume.ReconstructedVolume, error) {
177 cephfsVolume := &v1.Volume{
178 Name: volumeName,
179 VolumeSource: v1.VolumeSource{
180 CephFS: &v1.CephFSVolumeSource{
181 Monitors: []string{},
182 Path: mountPath,
183 },
184 },
185 }
186 return volume.ReconstructedVolume{
187 Spec: volume.NewSpecFromVolume(cephfsVolume),
188 }, nil
189 }
190
191
192 type cephfs struct {
193 volName string
194 podUID types.UID
195 mon []string
196 path string
197 id string
198 secret string `datapolicy:"token"`
199 secretFile string
200 readonly bool
201 mounter mount.Interface
202 plugin *cephfsPlugin
203 volume.MetricsNil
204 mountOptions []string
205 }
206
207 type cephfsMounter struct {
208 *cephfs
209 }
210
211 var _ volume.Mounter = &cephfsMounter{}
212
213 func (cephfsVolume *cephfsMounter) GetAttributes() volume.Attributes {
214 return volume.Attributes{
215 ReadOnly: cephfsVolume.readonly,
216 Managed: false,
217 SELinuxRelabel: false,
218 }
219 }
220
221
222 func (cephfsVolume *cephfsMounter) SetUp(mounterArgs volume.MounterArgs) error {
223 return cephfsVolume.SetUpAt(cephfsVolume.GetPath(), mounterArgs)
224 }
225
226
227 func (cephfsVolume *cephfsMounter) SetUpAt(dir string, mounterArgs volume.MounterArgs) error {
228 notMnt, err := cephfsVolume.mounter.IsLikelyNotMountPoint(dir)
229 klog.V(4).Infof("CephFS mount set up: %s %v %v", dir, !notMnt, err)
230 if err != nil && !os.IsNotExist(err) {
231 return err
232 }
233 if !notMnt {
234 return nil
235 }
236
237 if err := os.MkdirAll(dir, 0750); err != nil {
238 return err
239 }
240
241
242 if cephfsVolume.checkFuseMount() {
243 klog.V(4).Info("CephFS fuse mount.")
244 err = cephfsVolume.execFuseMount(dir)
245
246 keyringPath := cephfsVolume.GetKeyringPath()
247 _, StatErr := os.Stat(keyringPath)
248 if !os.IsNotExist(StatErr) {
249 os.RemoveAll(keyringPath)
250 }
251 if err == nil {
252
253 return nil
254 }
255
256 klog.V(2).Infof("CephFS fuse mount failed: %v, fallback to kernel mount.", err)
257
258 }
259 klog.V(4).Info("CephFS kernel mount.")
260
261 err = cephfsVolume.execMount(dir)
262 if err != nil {
263
264 mount.CleanupMountPoint(dir, cephfsVolume.mounter, false)
265 return err
266 }
267 return nil
268 }
269
270 type cephfsUnmounter struct {
271 *cephfs
272 }
273
274 var _ volume.Unmounter = &cephfsUnmounter{}
275
276
277 func (cephfsVolume *cephfsUnmounter) TearDown() error {
278 return cephfsVolume.TearDownAt(cephfsVolume.GetPath())
279 }
280
281
282 func (cephfsVolume *cephfsUnmounter) TearDownAt(dir string) error {
283 return mount.CleanupMountPoint(dir, cephfsVolume.mounter, false)
284 }
285
286
287 func (cephfsVolume *cephfs) GetPath() string {
288 name := cephfsPluginName
289 return cephfsVolume.plugin.host.GetPodVolumeDir(cephfsVolume.podUID, utilstrings.EscapeQualifiedName(name), cephfsVolume.volName)
290 }
291
292
293 func (cephfsVolume *cephfs) GetKeyringPath() string {
294 name := cephfsPluginName
295 volumeDir := cephfsVolume.plugin.host.GetPodVolumeDir(cephfsVolume.podUID, utilstrings.EscapeQualifiedName(name), cephfsVolume.volName)
296 volumeKeyringDir := volumeDir + "~keyring"
297 return volumeKeyringDir
298 }
299
300 func (cephfsVolume *cephfs) execMount(mountpoint string) error {
301
302 cephSensitiveOpt := []string{"name=" + cephfsVolume.id}
303
304 if cephfsVolume.secret != "" {
305 cephSensitiveOpt = append(cephSensitiveOpt, "secret="+cephfsVolume.secret)
306 } else {
307 cephSensitiveOpt = append(cephSensitiveOpt, "secretfile="+cephfsVolume.secretFile)
308 }
309
310 opt := []string{}
311 if cephfsVolume.readonly {
312 opt = append(opt, "ro")
313 }
314
315
316 src := strings.Join(cephfsVolume.mon, ",") + ":" + cephfsVolume.path
317
318 opt = util.JoinMountOptions(cephfsVolume.mountOptions, opt)
319 if err := cephfsVolume.mounter.MountSensitive(src, mountpoint, "ceph", opt, cephSensitiveOpt); err != nil {
320 return fmt.Errorf("CephFS: mount failed: %v", err)
321 }
322
323 return nil
324 }
325
326 func (cephfsVolume *cephfsMounter) checkFuseMount() bool {
327 execute := cephfsVolume.plugin.host.GetExec(cephfsVolume.plugin.GetPluginName())
328 switch runtime.GOOS {
329 case "linux":
330 if _, err := execute.Command("/usr/bin/test", "-x", "/sbin/mount.fuse.ceph").CombinedOutput(); err == nil {
331 klog.V(4).Info("/sbin/mount.fuse.ceph exists, it should be fuse mount.")
332 return true
333 }
334 return false
335 }
336 return false
337 }
338
339 func (cephfsVolume *cephfs) execFuseMount(mountpoint string) error {
340
341 keyringFile := ""
342
343 if cephfsVolume.secret != "" {
344
345
346 klog.V(4).Info("cephfs mount begin using fuse.")
347
348 keyringPath := cephfsVolume.GetKeyringPath()
349 if err := os.MkdirAll(keyringPath, 0750); err != nil {
350 return err
351 }
352
353 payload := make(map[string]util.FileProjection, 1)
354 var fileProjection util.FileProjection
355
356 keyring := fmt.Sprintf("[client.%s]\nkey = %s\n", cephfsVolume.id, cephfsVolume.secret)
357
358 fileProjection.Data = []byte(keyring)
359 fileProjection.Mode = int32(0644)
360 fileName := cephfsVolume.id + ".keyring"
361
362 payload[fileName] = fileProjection
363
364 writerContext := fmt.Sprintf("cephfuse:%v.keyring", cephfsVolume.id)
365 writer, err := util.NewAtomicWriter(keyringPath, writerContext)
366 if err != nil {
367 klog.Errorf("failed to create atomic writer: %v", err)
368 return err
369 }
370
371 err = writer.Write(payload, nil )
372 if err != nil {
373 klog.Errorf("failed to write payload to dir: %v", err)
374 return err
375 }
376
377 keyringFile = filepath.Join(keyringPath, fileName)
378
379 } else {
380 keyringFile = cephfsVolume.secretFile
381 }
382
383 src := strings.Join(cephfsVolume.mon, ",")
384
385 mountArgs := []string{}
386 mountArgs = append(mountArgs, "-k")
387 mountArgs = append(mountArgs, keyringFile)
388 mountArgs = append(mountArgs, "-m")
389 mountArgs = append(mountArgs, src)
390 mountArgs = append(mountArgs, mountpoint)
391 mountArgs = append(mountArgs, "-r")
392 mountArgs = append(mountArgs, cephfsVolume.path)
393 mountArgs = append(mountArgs, "--id")
394 mountArgs = append(mountArgs, cephfsVolume.id)
395
396
397 opt := []string{}
398 if cephfsVolume.readonly {
399 opt = append(opt, "ro")
400 }
401 opt = util.JoinMountOptions(cephfsVolume.mountOptions, opt)
402 if len(opt) > 0 {
403 mountArgs = append(mountArgs, "-o")
404 mountArgs = append(mountArgs, strings.Join(opt, ","))
405 }
406
407 klog.V(4).Infof("Mounting cmd ceph-fuse with arguments (%s)", mountArgs)
408 command := exec.Command("ceph-fuse", mountArgs...)
409 output, err := command.CombinedOutput()
410 if err != nil || !(strings.Contains(string(output), "starting fuse")) {
411 return fmt.Errorf("Ceph-fuse failed: %v\narguments: %s\nOutput: %s", err, mountArgs, string(output))
412 }
413
414 return nil
415 }
416
417 func getVolumeSource(spec *volume.Spec) ([]string, string, string, string, bool, error) {
418 if spec.Volume != nil && spec.Volume.CephFS != nil {
419 mon := spec.Volume.CephFS.Monitors
420 path := spec.Volume.CephFS.Path
421 user := spec.Volume.CephFS.User
422 secretFile := spec.Volume.CephFS.SecretFile
423 readOnly := spec.Volume.CephFS.ReadOnly
424 return mon, path, user, secretFile, readOnly, nil
425 } else if spec.PersistentVolume != nil &&
426 spec.PersistentVolume.Spec.CephFS != nil {
427 mon := spec.PersistentVolume.Spec.CephFS.Monitors
428 path := spec.PersistentVolume.Spec.CephFS.Path
429 user := spec.PersistentVolume.Spec.CephFS.User
430 secretFile := spec.PersistentVolume.Spec.CephFS.SecretFile
431 readOnly := spec.PersistentVolume.Spec.CephFS.ReadOnly
432 return mon, path, user, secretFile, readOnly, nil
433 }
434
435 return nil, "", "", "", false, fmt.Errorf("Spec does not reference a CephFS volume type")
436 }
437
438 func getSecretNameAndNamespace(spec *volume.Spec, defaultNamespace string) (string, string, error) {
439 if spec.Volume != nil && spec.Volume.CephFS != nil {
440 localSecretRef := spec.Volume.CephFS.SecretRef
441 if localSecretRef != nil {
442 return localSecretRef.Name, defaultNamespace, nil
443 }
444 return "", "", nil
445
446 } else if spec.PersistentVolume != nil &&
447 spec.PersistentVolume.Spec.CephFS != nil {
448 secretRef := spec.PersistentVolume.Spec.CephFS.SecretRef
449 secretNs := defaultNamespace
450 if secretRef != nil {
451 if len(secretRef.Namespace) != 0 {
452 secretNs = secretRef.Namespace
453 }
454 return secretRef.Name, secretNs, nil
455 }
456 return "", "", nil
457 }
458 return "", "", fmt.Errorf("Spec does not reference an CephFS volume type")
459 }
460
View as plain text