1
16
17 package csi
18
19 import (
20 "context"
21 "errors"
22 "fmt"
23 "os"
24 "path/filepath"
25 "strings"
26 "time"
27
28 "k8s.io/klog/v2"
29
30 authenticationv1 "k8s.io/api/authentication/v1"
31 api "k8s.io/api/core/v1"
32 storage "k8s.io/api/storage/v1"
33 apierrors "k8s.io/apimachinery/pkg/api/errors"
34 meta "k8s.io/apimachinery/pkg/apis/meta/v1"
35 "k8s.io/apimachinery/pkg/types"
36 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
37 utilversion "k8s.io/apimachinery/pkg/util/version"
38 "k8s.io/apimachinery/pkg/util/wait"
39 utilfeature "k8s.io/apiserver/pkg/util/feature"
40 clientset "k8s.io/client-go/kubernetes"
41 storagelisters "k8s.io/client-go/listers/storage/v1"
42 csitranslationplugins "k8s.io/csi-translation-lib/plugins"
43 "k8s.io/kubernetes/pkg/features"
44 "k8s.io/kubernetes/pkg/kubelet/util"
45 "k8s.io/kubernetes/pkg/volume"
46 "k8s.io/kubernetes/pkg/volume/csi/nodeinfomanager"
47 volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
48 )
49
50 const (
51
52 CSIPluginName = "kubernetes.io/csi"
53
54 csiTimeout = 2 * time.Minute
55 volNameSep = "^"
56 volDataFileName = "vol_data.json"
57 fsTypeBlockName = "block"
58
59
60
61 CsiResyncPeriod = time.Minute
62 )
63
64 type csiPlugin struct {
65 host volume.VolumeHost
66 csiDriverLister storagelisters.CSIDriverLister
67 serviceAccountTokenGetter func(namespace, name string, tr *authenticationv1.TokenRequest) (*authenticationv1.TokenRequest, error)
68 volumeAttachmentLister storagelisters.VolumeAttachmentLister
69 }
70
71
72 func ProbeVolumePlugins() []volume.VolumePlugin {
73 p := &csiPlugin{
74 host: nil,
75 }
76 return []volume.VolumePlugin{p}
77 }
78
79
80 var _ volume.VolumePlugin = &csiPlugin{}
81
82
83 type RegistrationHandler struct {
84 }
85
86
87
88
89 var csiDrivers = &DriversStore{}
90
91 var nim nodeinfomanager.Interface
92
93
94
95 var PluginHandler = &RegistrationHandler{}
96
97
98
99 func (h *RegistrationHandler) ValidatePlugin(pluginName string, endpoint string, versions []string) error {
100 klog.Infof(log("Trying to validate a new CSI Driver with name: %s endpoint: %s versions: %s",
101 pluginName, endpoint, strings.Join(versions, ",")))
102
103 _, err := h.validateVersions("ValidatePlugin", pluginName, endpoint, versions)
104 if err != nil {
105 return fmt.Errorf("validation failed for CSI Driver %s at endpoint %s: %v", pluginName, endpoint, err)
106 }
107
108 return err
109 }
110
111
112 func (h *RegistrationHandler) RegisterPlugin(pluginName string, endpoint string, versions []string, pluginClientTimeout *time.Duration) error {
113 klog.Infof(log("Register new plugin with name: %s at endpoint: %s", pluginName, endpoint))
114
115 highestSupportedVersion, err := h.validateVersions("RegisterPlugin", pluginName, endpoint, versions)
116 if err != nil {
117 return err
118 }
119
120
121
122 csiDrivers.Set(pluginName, Driver{
123 endpoint: endpoint,
124 highestSupportedVersion: highestSupportedVersion,
125 })
126
127
128 csi, err := newCsiDriverClient(csiDriverName(pluginName))
129 if err != nil {
130 return err
131 }
132
133 var timeout time.Duration
134 if pluginClientTimeout == nil {
135 timeout = csiTimeout
136 } else {
137 timeout = *pluginClientTimeout
138 }
139
140 ctx, cancel := context.WithTimeout(context.Background(), timeout)
141 defer cancel()
142
143 driverNodeID, maxVolumePerNode, accessibleTopology, err := csi.NodeGetInfo(ctx)
144 if err != nil {
145 if unregErr := unregisterDriver(pluginName); unregErr != nil {
146 klog.Error(log("registrationHandler.RegisterPlugin failed to unregister plugin due to previous error: %v", unregErr))
147 }
148 return err
149 }
150
151 err = nim.InstallCSIDriver(pluginName, driverNodeID, maxVolumePerNode, accessibleTopology)
152 if err != nil {
153 if unregErr := unregisterDriver(pluginName); unregErr != nil {
154 klog.Error(log("registrationHandler.RegisterPlugin failed to unregister plugin due to previous error: %v", unregErr))
155 }
156 return err
157 }
158
159 return nil
160 }
161
162 func (h *RegistrationHandler) validateVersions(callerName, pluginName string, endpoint string, versions []string) (*utilversion.Version, error) {
163 if len(versions) == 0 {
164 return nil, errors.New(log("%s for CSI driver %q failed. Plugin returned an empty list for supported versions", callerName, pluginName))
165 }
166
167
168
169
170
171
172
173 newDriverHighestVersion, err := utilversion.HighestSupportedVersion(versions)
174 if err != nil {
175 return nil, errors.New(log("%s for CSI driver %q failed. None of the versions specified %q are supported. err=%v", callerName, pluginName, versions, err))
176 }
177
178 existingDriver, driverExists := csiDrivers.Get(pluginName)
179 if driverExists {
180 if !existingDriver.highestSupportedVersion.LessThan(newDriverHighestVersion) {
181 return nil, errors.New(log("%s for CSI driver %q failed. Another driver with the same name is already registered with a higher supported version: %q", callerName, pluginName, existingDriver.highestSupportedVersion))
182 }
183 }
184
185 return newDriverHighestVersion, nil
186 }
187
188
189
190 func (h *RegistrationHandler) DeRegisterPlugin(pluginName string) {
191 klog.Info(log("registrationHandler.DeRegisterPlugin request for plugin %s", pluginName))
192 if err := unregisterDriver(pluginName); err != nil {
193 klog.Error(log("registrationHandler.DeRegisterPlugin failed: %v", err))
194 }
195 }
196
197 func (p *csiPlugin) Init(host volume.VolumeHost) error {
198 p.host = host
199
200 csiClient := host.GetKubeClient()
201 if csiClient == nil {
202 klog.Warning(log("kubeclient not set, assuming standalone kubelet"))
203 } else {
204
205 adcHost, ok := host.(volume.AttachDetachVolumeHost)
206 if ok {
207 p.csiDriverLister = adcHost.CSIDriverLister()
208 if p.csiDriverLister == nil {
209 klog.Error(log("CSIDriverLister not found on AttachDetachVolumeHost"))
210 }
211 p.volumeAttachmentLister = adcHost.VolumeAttachmentLister()
212 if p.volumeAttachmentLister == nil {
213 klog.Error(log("VolumeAttachmentLister not found on AttachDetachVolumeHost"))
214 }
215 }
216 kletHost, ok := host.(volume.KubeletVolumeHost)
217 if ok {
218 p.csiDriverLister = kletHost.CSIDriverLister()
219 if p.csiDriverLister == nil {
220 klog.Error(log("CSIDriverLister not found on KubeletVolumeHost"))
221 }
222 p.serviceAccountTokenGetter = host.GetServiceAccountTokenFunc()
223 if p.serviceAccountTokenGetter == nil {
224 klog.Error(log("ServiceAccountTokenGetter not found on KubeletVolumeHost"))
225 }
226
227 p.volumeAttachmentLister = nil
228 }
229 }
230
231 var migratedPlugins = map[string](func() bool){
232 csitranslationplugins.GCEPDInTreePluginName: func() bool {
233 return true
234 },
235 csitranslationplugins.AWSEBSInTreePluginName: func() bool {
236 return true
237 },
238 csitranslationplugins.CinderInTreePluginName: func() bool {
239 return true
240 },
241 csitranslationplugins.AzureDiskInTreePluginName: func() bool {
242 return true
243 },
244 csitranslationplugins.AzureFileInTreePluginName: func() bool {
245 return true
246 },
247 csitranslationplugins.VSphereInTreePluginName: func() bool {
248 return true
249 },
250 csitranslationplugins.PortworxVolumePluginName: func() bool {
251 return utilfeature.DefaultFeatureGate.Enabled(features.CSIMigrationPortworx)
252 },
253 csitranslationplugins.RBDVolumePluginName: func() bool {
254 return utilfeature.DefaultFeatureGate.Enabled(features.CSIMigrationRBD)
255 },
256 }
257
258
259 nim = nodeinfomanager.NewNodeInfoManager(host.GetNodeName(), host, migratedPlugins)
260
261
262
263 if err := initializeCSINode(host); err != nil {
264 return errors.New(log("failed to initialize CSINode: %v", err))
265 }
266
267 return nil
268 }
269
270 func initializeCSINode(host volume.VolumeHost) error {
271 kvh, ok := host.(volume.KubeletVolumeHost)
272 if !ok {
273 klog.V(4).Info("Cast from VolumeHost to KubeletVolumeHost failed. Skipping CSINode initialization, not running on kubelet")
274 return nil
275 }
276 kubeClient := host.GetKubeClient()
277 if kubeClient == nil {
278
279 klog.Warning("Skipping CSINode initialization, kubelet running in standalone mode")
280 return nil
281 }
282
283 kvh.SetKubeletError(errors.New("CSINode is not yet initialized"))
284
285 go func() {
286 defer utilruntime.HandleCrash()
287
288
289 nodeName := host.GetNodeName()
290 err := waitForAPIServerForever(kubeClient, nodeName)
291 if err != nil {
292 klog.Fatalf("Failed to initialize CSINode while waiting for API server to report ok: %v", err)
293 }
294
295
296
297 initBackoff := wait.Backoff{
298 Steps: 6,
299 Duration: 15 * time.Millisecond,
300 Factor: 6.0,
301 Jitter: 0.1,
302 }
303 err = wait.ExponentialBackoff(initBackoff, func() (bool, error) {
304 klog.V(4).Infof("Initializing migrated drivers on CSINode")
305 err := nim.InitializeCSINodeWithAnnotation()
306 if err != nil {
307 kvh.SetKubeletError(fmt.Errorf("failed to initialize CSINode: %v", err))
308 klog.Errorf("Failed to initialize CSINode: %v", err)
309 return false, nil
310 }
311
312
313 kvh.SetKubeletError(nil)
314 return true, nil
315 })
316 if err != nil {
317
318
319
320
321
322 klog.Fatalf("Failed to initialize CSINode after retrying: %v", err)
323 }
324 }()
325 return nil
326 }
327
328 func (p *csiPlugin) GetPluginName() string {
329 return CSIPluginName
330 }
331
332
333
334 func (p *csiPlugin) GetVolumeName(spec *volume.Spec) (string, error) {
335 csi, err := getPVSourceFromSpec(spec)
336 if err != nil {
337 return "", errors.New(log("plugin.GetVolumeName failed to extract volume source from spec: %v", err))
338 }
339
340
341 return fmt.Sprintf("%s%s%s", csi.Driver, volNameSep, csi.VolumeHandle), nil
342 }
343
344 func (p *csiPlugin) CanSupport(spec *volume.Spec) bool {
345
346
347 if spec == nil {
348 return false
349 }
350 return (spec.PersistentVolume != nil && spec.PersistentVolume.Spec.CSI != nil) ||
351 (spec.Volume != nil && spec.Volume.CSI != nil)
352 }
353
354 func (p *csiPlugin) RequiresRemount(spec *volume.Spec) bool {
355 if p.csiDriverLister == nil {
356 return false
357 }
358 driverName, err := GetCSIDriverName(spec)
359 if err != nil {
360 klog.V(5).Info(log("Failed to mark %q as republish required, err: %v", spec.Name(), err))
361 return false
362 }
363 csiDriver, err := p.getCSIDriver(driverName)
364 if err != nil {
365 klog.V(5).Info(log("Failed to mark %q as republish required, err: %v", spec.Name(), err))
366 return false
367 }
368 return *csiDriver.Spec.RequiresRepublish
369 }
370
371 func (p *csiPlugin) NewMounter(
372 spec *volume.Spec,
373 pod *api.Pod,
374 _ volume.VolumeOptions) (volume.Mounter, error) {
375
376 volSrc, pvSrc, err := getSourceFromSpec(spec)
377 if err != nil {
378 return nil, err
379 }
380
381 var (
382 driverName string
383 volumeHandle string
384 readOnly bool
385 )
386
387 switch {
388 case volSrc != nil:
389 volumeHandle = makeVolumeHandle(string(pod.UID), spec.Name())
390 driverName = volSrc.Driver
391 if volSrc.ReadOnly != nil {
392 readOnly = *volSrc.ReadOnly
393 }
394 case pvSrc != nil:
395 driverName = pvSrc.Driver
396 volumeHandle = pvSrc.VolumeHandle
397 readOnly = spec.ReadOnly
398 default:
399 return nil, errors.New(log("volume source not found in volume.Spec"))
400 }
401
402 volumeLifecycleMode, err := p.getVolumeLifecycleMode(spec)
403 if err != nil {
404 return nil, err
405 }
406
407 k8s := p.host.GetKubeClient()
408 if k8s == nil {
409 return nil, errors.New(log("failed to get a kubernetes client"))
410 }
411
412 kvh, ok := p.host.(volume.KubeletVolumeHost)
413 if !ok {
414 return nil, errors.New(log("cast from VolumeHost to KubeletVolumeHost failed"))
415 }
416
417 mounter := &csiMountMgr{
418 plugin: p,
419 k8s: k8s,
420 spec: spec,
421 pod: pod,
422 podUID: pod.UID,
423 driverName: csiDriverName(driverName),
424 volumeLifecycleMode: volumeLifecycleMode,
425 volumeID: volumeHandle,
426 specVolumeID: spec.Name(),
427 readOnly: readOnly,
428 kubeVolHost: kvh,
429 }
430 mounter.csiClientGetter.driverName = csiDriverName(driverName)
431
432 dir := mounter.GetPath()
433 mounter.MetricsProvider = NewMetricsCsi(volumeHandle, dir, csiDriverName(driverName))
434 klog.V(4).Info(log("mounter created successfully"))
435 return mounter, nil
436 }
437
438 func (p *csiPlugin) NewUnmounter(specName string, podUID types.UID) (volume.Unmounter, error) {
439 klog.V(4).Infof(log("setting up unmounter for [name=%v, podUID=%v]", specName, podUID))
440
441 kvh, ok := p.host.(volume.KubeletVolumeHost)
442 if !ok {
443 return nil, errors.New(log("cast from VolumeHost to KubeletVolumeHost failed"))
444 }
445
446 unmounter := &csiMountMgr{
447 plugin: p,
448 podUID: podUID,
449 specVolumeID: specName,
450 kubeVolHost: kvh,
451 }
452
453
454 dir := unmounter.GetPath()
455 dataDir := filepath.Dir(dir)
456 data, err := loadVolumeData(dataDir, volDataFileName)
457 if err != nil {
458 return nil, errors.New(log("unmounter failed to load volume data file [%s]: %v", dir, err))
459 }
460 unmounter.driverName = csiDriverName(data[volDataKey.driverName])
461 unmounter.volumeID = data[volDataKey.volHandle]
462 unmounter.csiClientGetter.driverName = unmounter.driverName
463
464 return unmounter, nil
465 }
466
467 func (p *csiPlugin) ConstructVolumeSpec(volumeName, mountPath string) (volume.ReconstructedVolume, error) {
468 klog.V(4).Info(log("plugin.ConstructVolumeSpec [pv.Name=%v, path=%v]", volumeName, mountPath))
469
470 volData, err := loadVolumeData(mountPath, volDataFileName)
471 if err != nil {
472 return volume.ReconstructedVolume{}, errors.New(log("plugin.ConstructVolumeSpec failed loading volume data using [%s]: %v", mountPath, err))
473 }
474 klog.V(4).Info(log("plugin.ConstructVolumeSpec extracted [%#v]", volData))
475
476 var ret volume.ReconstructedVolume
477 if utilfeature.DefaultFeatureGate.Enabled(features.SELinuxMountReadWriteOncePod) {
478 ret.SELinuxMountContext = volData[volDataKey.seLinuxMountContext]
479 }
480
481
482
483
484 if storage.VolumeLifecycleMode(volData[volDataKey.volumeLifecycleMode]) == storage.VolumeLifecycleEphemeral {
485 ret.Spec = p.constructVolSourceSpec(volData[volDataKey.specVolID], volData[volDataKey.driverName])
486 return ret, nil
487 }
488
489 ret.Spec = p.constructPVSourceSpec(volData[volDataKey.specVolID], volData[volDataKey.driverName], volData[volDataKey.volHandle])
490 return ret, nil
491 }
492
493
494 func (p *csiPlugin) constructVolSourceSpec(volSpecName, driverName string) *volume.Spec {
495 vol := &api.Volume{
496 Name: volSpecName,
497 VolumeSource: api.VolumeSource{
498 CSI: &api.CSIVolumeSource{
499 Driver: driverName,
500 },
501 },
502 }
503 return volume.NewSpecFromVolume(vol)
504 }
505
506
507 func (p *csiPlugin) constructPVSourceSpec(volSpecName, driverName, volumeHandle string) *volume.Spec {
508 fsMode := api.PersistentVolumeFilesystem
509 pv := &api.PersistentVolume{
510 ObjectMeta: meta.ObjectMeta{
511 Name: volSpecName,
512 },
513 Spec: api.PersistentVolumeSpec{
514 PersistentVolumeSource: api.PersistentVolumeSource{
515 CSI: &api.CSIPersistentVolumeSource{
516 Driver: driverName,
517 VolumeHandle: volumeHandle,
518 },
519 },
520 VolumeMode: &fsMode,
521 },
522 }
523 return volume.NewSpecFromPersistentVolume(pv, false)
524 }
525
526 func (p *csiPlugin) SupportsMountOption() bool {
527
528
529
530
531
532 return true
533 }
534
535 func (p *csiPlugin) SupportsBulkVolumeVerification() bool {
536 return false
537 }
538
539 func (p *csiPlugin) SupportsSELinuxContextMount(spec *volume.Spec) (bool, error) {
540 if utilfeature.DefaultFeatureGate.Enabled(features.SELinuxMountReadWriteOncePod) {
541 driver, err := GetCSIDriverName(spec)
542 if err != nil {
543 return false, err
544 }
545 csiDriver, err := p.getCSIDriver(driver)
546 if err != nil {
547 if apierrors.IsNotFound(err) {
548 return false, nil
549 }
550 return false, err
551 }
552 if csiDriver.Spec.SELinuxMount != nil {
553 return *csiDriver.Spec.SELinuxMount, nil
554 }
555 return false, nil
556 }
557 return false, nil
558 }
559
560
561 var _ volume.AttachableVolumePlugin = &csiPlugin{}
562
563 var _ volume.DeviceMountableVolumePlugin = &csiPlugin{}
564
565 func (p *csiPlugin) NewAttacher() (volume.Attacher, error) {
566 return p.newAttacherDetacher()
567 }
568
569 func (p *csiPlugin) NewDeviceMounter() (volume.DeviceMounter, error) {
570 return p.NewAttacher()
571 }
572
573 func (p *csiPlugin) NewDetacher() (volume.Detacher, error) {
574 return p.newAttacherDetacher()
575 }
576
577 func (p *csiPlugin) CanAttach(spec *volume.Spec) (bool, error) {
578 volumeLifecycleMode, err := p.getVolumeLifecycleMode(spec)
579 if err != nil {
580 return false, err
581 }
582
583 if volumeLifecycleMode == storage.VolumeLifecycleEphemeral {
584 klog.V(5).Info(log("plugin.CanAttach = false, ephemeral mode detected for spec %v", spec.Name()))
585 return false, nil
586 }
587
588 pvSrc, err := getCSISourceFromSpec(spec)
589 if err != nil {
590 return false, err
591 }
592
593 driverName := pvSrc.Driver
594
595 skipAttach, err := p.skipAttach(driverName)
596 if err != nil {
597 return false, err
598 }
599
600 return !skipAttach, nil
601 }
602
603
604 func (p *csiPlugin) CanDeviceMount(spec *volume.Spec) (bool, error) {
605 volumeLifecycleMode, err := p.getVolumeLifecycleMode(spec)
606 if err != nil {
607 return false, err
608 }
609
610 if volumeLifecycleMode == storage.VolumeLifecycleEphemeral {
611 klog.V(5).Info(log("plugin.CanDeviceMount skipped ephemeral mode detected for spec %v", spec.Name()))
612 return false, nil
613 }
614
615
616 return true, nil
617 }
618
619 func (p *csiPlugin) NewDeviceUnmounter() (volume.DeviceUnmounter, error) {
620 return p.NewDetacher()
621 }
622
623 func (p *csiPlugin) GetDeviceMountRefs(deviceMountPath string) ([]string, error) {
624 m := p.host.GetMounter(p.GetPluginName())
625 return m.GetMountRefs(deviceMountPath)
626 }
627
628
629 var _ volume.BlockVolumePlugin = &csiPlugin{}
630
631 func (p *csiPlugin) NewBlockVolumeMapper(spec *volume.Spec, podRef *api.Pod, opts volume.VolumeOptions) (volume.BlockVolumeMapper, error) {
632 pvSource, err := getCSISourceFromSpec(spec)
633 if err != nil {
634 return nil, err
635 }
636 readOnly, err := getReadOnlyFromSpec(spec)
637 if err != nil {
638 return nil, err
639 }
640
641 klog.V(4).Info(log("setting up block mapper for [volume=%v,driver=%v]", pvSource.VolumeHandle, pvSource.Driver))
642
643 k8s := p.host.GetKubeClient()
644 if k8s == nil {
645 return nil, errors.New(log("failed to get a kubernetes client"))
646 }
647
648 mapper := &csiBlockMapper{
649 k8s: k8s,
650 plugin: p,
651 volumeID: pvSource.VolumeHandle,
652 driverName: csiDriverName(pvSource.Driver),
653 readOnly: readOnly,
654 spec: spec,
655 specName: spec.Name(),
656 pod: podRef,
657 podUID: podRef.UID,
658 }
659 mapper.csiClientGetter.driverName = csiDriverName(pvSource.Driver)
660
661
662 dataDir := getVolumeDeviceDataDir(spec.Name(), p.host)
663
664 if err := os.MkdirAll(dataDir, 0750); err != nil {
665 return nil, errors.New(log("failed to create data dir %s: %v", dataDir, err))
666 }
667 klog.V(4).Info(log("created path successfully [%s]", dataDir))
668
669 blockPath, err := mapper.GetGlobalMapPath(spec)
670 if err != nil {
671 return nil, errors.New(log("failed to get device path: %v", err))
672 }
673
674 mapper.MetricsProvider = NewMetricsCsi(pvSource.VolumeHandle, blockPath+"/"+string(podRef.UID), csiDriverName(pvSource.Driver))
675
676
677 node := string(p.host.GetNodeName())
678 attachID := getAttachmentName(pvSource.VolumeHandle, pvSource.Driver, node)
679 volData := map[string]string{
680 volDataKey.specVolID: spec.Name(),
681 volDataKey.volHandle: pvSource.VolumeHandle,
682 volDataKey.driverName: pvSource.Driver,
683 volDataKey.nodeName: node,
684 volDataKey.attachmentID: attachID,
685 }
686
687 err = saveVolumeData(dataDir, volDataFileName, volData)
688 defer func() {
689
690
691 if err != nil && volumetypes.IsOperationFinishedError(err) {
692
693 if err = removeMountDir(p, dataDir); err != nil {
694 klog.Error(log("attacher.MountDevice failed to remove mount dir after error [%s]: %v", dataDir, err))
695 }
696 }
697 }()
698 if err != nil {
699 errorMsg := log("csi.NewBlockVolumeMapper failed to save volume info data: %v", err)
700 klog.Error(errorMsg)
701 return nil, errors.New(errorMsg)
702 }
703
704 return mapper, nil
705 }
706
707 func (p *csiPlugin) NewBlockVolumeUnmapper(volName string, podUID types.UID) (volume.BlockVolumeUnmapper, error) {
708 klog.V(4).Infof(log("setting up block unmapper for [Spec=%v, podUID=%v]", volName, podUID))
709 unmapper := &csiBlockMapper{
710 plugin: p,
711 podUID: podUID,
712 specName: volName,
713 }
714
715
716 dataDir := getVolumeDeviceDataDir(unmapper.specName, p.host)
717 data, err := loadVolumeData(dataDir, volDataFileName)
718 if err != nil {
719 return nil, errors.New(log("unmapper failed to load volume data file [%s]: %v", dataDir, err))
720 }
721 unmapper.driverName = csiDriverName(data[volDataKey.driverName])
722 unmapper.volumeID = data[volDataKey.volHandle]
723 unmapper.csiClientGetter.driverName = unmapper.driverName
724
725 return unmapper, nil
726 }
727
728 func (p *csiPlugin) ConstructBlockVolumeSpec(podUID types.UID, specVolName, mapPath string) (*volume.Spec, error) {
729 klog.V(4).Infof("plugin.ConstructBlockVolumeSpec [podUID=%s, specVolName=%s, path=%s]", string(podUID), specVolName, mapPath)
730
731 dataDir := getVolumeDeviceDataDir(specVolName, p.host)
732 volData, err := loadVolumeData(dataDir, volDataFileName)
733 if err != nil {
734 return nil, errors.New(log("plugin.ConstructBlockVolumeSpec failed loading volume data using [%s]: %v", mapPath, err))
735 }
736
737 klog.V(4).Info(log("plugin.ConstructBlockVolumeSpec extracted [%#v]", volData))
738
739 blockMode := api.PersistentVolumeBlock
740 pv := &api.PersistentVolume{
741 ObjectMeta: meta.ObjectMeta{
742 Name: volData[volDataKey.specVolID],
743 },
744 Spec: api.PersistentVolumeSpec{
745 PersistentVolumeSource: api.PersistentVolumeSource{
746 CSI: &api.CSIPersistentVolumeSource{
747 Driver: volData[volDataKey.driverName],
748 VolumeHandle: volData[volDataKey.volHandle],
749 },
750 },
751 VolumeMode: &blockMode,
752 },
753 }
754
755 return volume.NewSpecFromPersistentVolume(pv, false), nil
756 }
757
758
759
760 func (p *csiPlugin) skipAttach(driver string) (bool, error) {
761 csiDriver, err := p.getCSIDriver(driver)
762 if err != nil {
763 if apierrors.IsNotFound(err) {
764
765 return false, nil
766 }
767 return false, err
768 }
769 if csiDriver.Spec.AttachRequired != nil && *csiDriver.Spec.AttachRequired == false {
770 return true, nil
771 }
772 return false, nil
773 }
774
775 func (p *csiPlugin) getCSIDriver(driver string) (*storage.CSIDriver, error) {
776 kletHost, ok := p.host.(volume.KubeletVolumeHost)
777 if ok {
778 if err := kletHost.WaitForCacheSync(); err != nil {
779 return nil, err
780 }
781 }
782
783 if p.csiDriverLister == nil {
784 return nil, errors.New("CSIDriver lister does not exist")
785 }
786 csiDriver, err := p.csiDriverLister.Get(driver)
787 return csiDriver, err
788 }
789
790
791
792
793
794 func (p *csiPlugin) getVolumeLifecycleMode(spec *volume.Spec) (storage.VolumeLifecycleMode, error) {
795
796
797 volSrc, _, err := getSourceFromSpec(spec)
798 if err != nil {
799 return "", err
800 }
801
802 if volSrc != nil {
803 return storage.VolumeLifecycleEphemeral, nil
804 }
805 return storage.VolumeLifecyclePersistent, nil
806 }
807
808 func (p *csiPlugin) getPublishContext(client clientset.Interface, handle, driver, nodeName string) (map[string]string, error) {
809 skip, err := p.skipAttach(driver)
810 if err != nil {
811 return nil, err
812 }
813 if skip {
814 return nil, nil
815 }
816
817 attachID := getAttachmentName(handle, driver, nodeName)
818
819
820 attachment, err := client.StorageV1().VolumeAttachments().Get(context.TODO(), attachID, meta.GetOptions{})
821 if err != nil {
822 return nil, err
823 }
824
825 if attachment == nil {
826 err = errors.New("no existing VolumeAttachment found")
827 return nil, err
828 }
829 return attachment.Status.AttachmentMetadata, nil
830 }
831
832 func (p *csiPlugin) newAttacherDetacher() (*csiAttacher, error) {
833 k8s := p.host.GetKubeClient()
834 if k8s == nil {
835 return nil, errors.New(log("unable to get kubernetes client from host"))
836 }
837
838 return &csiAttacher{
839 plugin: p,
840 k8s: k8s,
841 watchTimeout: csiTimeout,
842 }, nil
843 }
844
845
846 func (p *csiPlugin) podInfoEnabled(driverName string) (bool, error) {
847 csiDriver, err := p.getCSIDriver(driverName)
848 if err != nil {
849 if apierrors.IsNotFound(err) {
850 klog.V(4).Infof(log("CSIDriver %q not found, not adding pod information", driverName))
851 return false, nil
852 }
853 return false, err
854 }
855
856
857 if csiDriver.Spec.PodInfoOnMount == nil || *csiDriver.Spec.PodInfoOnMount == false {
858 klog.V(4).Infof(log("CSIDriver %q does not require pod information", driverName))
859 return false, nil
860 }
861 return true, nil
862 }
863
864 func unregisterDriver(driverName string) error {
865 csiDrivers.Delete(driverName)
866
867 if err := nim.UninstallCSIDriver(driverName); err != nil {
868 return errors.New(log("Error uninstalling CSI driver: %v", err))
869 }
870
871 return nil
872 }
873
874
875
876 func waitForAPIServerForever(client clientset.Interface, nodeName types.NodeName) error {
877 var lastErr error
878
879
880 opts := meta.GetOptions{}
881 util.FromApiserverCache(&opts)
882 err := wait.PollImmediateInfinite(time.Second, func() (bool, error) {
883
884
885
886
887 _, lastErr = client.StorageV1().CSINodes().Get(context.TODO(), string(nodeName), opts)
888 if lastErr == nil || apierrors.IsNotFound(lastErr) {
889
890 return true, nil
891 }
892 klog.V(2).Infof("Failed to contact API server when waiting for CSINode publishing: %s", lastErr)
893 return false, nil
894 })
895 if err != nil {
896
897 return fmt.Errorf("%v: %v", err, lastErr)
898 }
899
900 return nil
901 }
902
View as plain text