1
16
17 package operationexecutor
18
19 import (
20 "context"
21 goerrors "errors"
22 "fmt"
23 "os"
24 "path/filepath"
25 "strings"
26 "time"
27
28 "k8s.io/apimachinery/pkg/api/resource"
29
30 v1 "k8s.io/api/core/v1"
31 "k8s.io/apimachinery/pkg/api/errors"
32 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
33 "k8s.io/apimachinery/pkg/types"
34 utilfeature "k8s.io/apiserver/pkg/util/feature"
35 clientset "k8s.io/client-go/kubernetes"
36 "k8s.io/client-go/tools/record"
37 volerr "k8s.io/cloud-provider/volume/errors"
38 storagehelpers "k8s.io/component-helpers/storage/volume"
39 csitrans "k8s.io/csi-translation-lib"
40 "k8s.io/klog/v2"
41 v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
42 "k8s.io/kubernetes/pkg/features"
43 kevents "k8s.io/kubernetes/pkg/kubelet/events"
44 "k8s.io/kubernetes/pkg/volume"
45 "k8s.io/kubernetes/pkg/volume/util"
46 "k8s.io/kubernetes/pkg/volume/util/hostutil"
47 volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
48 "k8s.io/kubernetes/pkg/volume/util/volumepathhandler"
49 )
50
51 const (
52 unknownVolumePlugin string = "UnknownVolumePlugin"
53 unknownAttachableVolumePlugin string = "UnknownAttachableVolumePlugin"
54 DetachOperationName string = "volume_detach"
55 VerifyControllerAttachedVolumeOpName string = "verify_controller_attached_volume"
56 )
57
58
59
60 type InTreeToCSITranslator interface {
61 IsPVMigratable(pv *v1.PersistentVolume) bool
62 IsInlineMigratable(vol *v1.Volume) bool
63 IsMigratableIntreePluginByName(inTreePluginName string) bool
64 GetInTreePluginNameFromSpec(pv *v1.PersistentVolume, vol *v1.Volume) (string, error)
65 GetCSINameFromInTreeName(pluginName string) (string, error)
66 TranslateInTreePVToCSI(pv *v1.PersistentVolume) (*v1.PersistentVolume, error)
67 TranslateInTreeInlineVolumeToCSI(volume *v1.Volume, podNamespace string) (*v1.PersistentVolume, error)
68 }
69
70 var _ OperationGenerator = &operationGenerator{}
71
72 type operationGenerator struct {
73
74
75 kubeClient clientset.Interface
76
77
78
79 volumePluginMgr *volume.VolumePluginMgr
80
81
82 recorder record.EventRecorder
83
84
85 blkUtil volumepathhandler.BlockVolumePathHandler
86
87 translator InTreeToCSITranslator
88 }
89
90 type inTreeResizeResponse struct {
91 pvc *v1.PersistentVolumeClaim
92 pv *v1.PersistentVolume
93
94 err error
95
96
97 resizeCalled bool
98 }
99
100
101 func NewOperationGenerator(kubeClient clientset.Interface,
102 volumePluginMgr *volume.VolumePluginMgr,
103 recorder record.EventRecorder,
104 blkUtil volumepathhandler.BlockVolumePathHandler) OperationGenerator {
105
106 return &operationGenerator{
107 kubeClient: kubeClient,
108 volumePluginMgr: volumePluginMgr,
109 recorder: recorder,
110 blkUtil: blkUtil,
111 translator: csitrans.New(),
112 }
113 }
114
115
116 type OperationGenerator interface {
117
118 GenerateMountVolumeFunc(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorldMounterUpdater ActualStateOfWorldMounterUpdater, isRemount bool) volumetypes.GeneratedOperations
119
120
121 GenerateUnmountVolumeFunc(volumeToUnmount MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, podsDir string) (volumetypes.GeneratedOperations, error)
122
123
124 GenerateAttachVolumeFunc(logger klog.Logger, volumeToAttach VolumeToAttach, actualStateOfWorld ActualStateOfWorldAttacherUpdater) volumetypes.GeneratedOperations
125
126
127 GenerateDetachVolumeFunc(logger klog.Logger, volumeToDetach AttachedVolume, verifySafeToDetach bool, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error)
128
129
130 GenerateVolumesAreAttachedFunc(attachedVolumes []AttachedVolume, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error)
131
132
133 GenerateUnmountDeviceFunc(deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, mounter hostutil.HostUtils) (volumetypes.GeneratedOperations, error)
134
135
136 GenerateVerifyControllerAttachedVolumeFunc(logger klog.Logger, volumeToMount VolumeToMount, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error)
137
138
139 GenerateMapVolumeFunc(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorldMounterUpdater ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error)
140
141
142 GenerateUnmapVolumeFunc(volumeToUnmount MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error)
143
144
145 GenerateUnmapDeviceFunc(deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, mounter hostutil.HostUtils) (volumetypes.GeneratedOperations, error)
146
147
148 GetVolumePluginMgr() *volume.VolumePluginMgr
149
150
151 GetCSITranslator() InTreeToCSITranslator
152
153 GenerateBulkVolumeVerifyFunc(
154 map[types.NodeName][]*volume.Spec,
155 string,
156 map[*volume.Spec]v1.UniqueVolumeName, ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error)
157
158 GenerateExpandVolumeFunc(*v1.PersistentVolumeClaim, *v1.PersistentVolume) (volumetypes.GeneratedOperations, error)
159
160 GenerateExpandAndRecoverVolumeFunc(*v1.PersistentVolumeClaim, *v1.PersistentVolume, string) (volumetypes.GeneratedOperations, error)
161
162
163
164
165 GenerateExpandInUseVolumeFunc(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater, currentSize resource.Quantity) (volumetypes.GeneratedOperations, error)
166 }
167
168 type inTreeResizeOpts struct {
169 resizerName string
170 pvc *v1.PersistentVolumeClaim
171 pv *v1.PersistentVolume
172 volumeSpec *volume.Spec
173 volumePlugin volume.ExpandableVolumePlugin
174 }
175
176 type nodeResizeOperationOpts struct {
177 vmt VolumeToMount
178 pvc *v1.PersistentVolumeClaim
179 pv *v1.PersistentVolume
180 pluginResizeOpts volume.NodeResizeOptions
181 volumePlugin volume.NodeExpandableVolumePlugin
182 actualStateOfWorld ActualStateOfWorldMounterUpdater
183 }
184
185 func (og *operationGenerator) GenerateVolumesAreAttachedFunc(
186 attachedVolumes []AttachedVolume,
187 nodeName types.NodeName,
188 actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) {
189
190
191 volumesPerPlugin := make(map[string][]*volume.Spec)
192
193
194 volumeSpecMap := make(map[*volume.Spec]v1.UniqueVolumeName)
195
196
197 for _, volumeAttached := range attachedVolumes {
198 if volumeAttached.VolumeSpec == nil {
199 klog.Errorf("VerifyVolumesAreAttached.GenerateVolumesAreAttachedFunc: nil spec for volume %s", volumeAttached.VolumeName)
200 continue
201 }
202 volumePlugin, err :=
203 og.volumePluginMgr.FindPluginBySpec(volumeAttached.VolumeSpec)
204 if err != nil || volumePlugin == nil {
205 klog.Errorf(volumeAttached.GenerateErrorDetailed("VolumesAreAttached.FindPluginBySpec failed", err).Error())
206 continue
207 }
208 volumeSpecList, pluginExists := volumesPerPlugin[volumePlugin.GetPluginName()]
209 if !pluginExists {
210 volumeSpecList = []*volume.Spec{}
211 }
212 volumeSpecList = append(volumeSpecList, volumeAttached.VolumeSpec)
213 volumesPerPlugin[volumePlugin.GetPluginName()] = volumeSpecList
214
215 volumeSpecMap[volumeAttached.VolumeSpec] = volumeAttached.VolumeName
216 }
217
218 volumesAreAttachedFunc := func() volumetypes.OperationContext {
219
220
221
222 for pluginName, volumesSpecs := range volumesPerPlugin {
223 attachableVolumePlugin, err :=
224 og.volumePluginMgr.FindAttachablePluginByName(pluginName)
225 if err != nil || attachableVolumePlugin == nil {
226 klog.Errorf(
227 "VolumeAreAttached.FindAttachablePluginBySpec failed for plugin %q with: %v",
228 pluginName,
229 err)
230 continue
231 }
232
233 volumeAttacher, newAttacherErr := attachableVolumePlugin.NewAttacher()
234 if newAttacherErr != nil {
235 klog.Errorf(
236 "VolumesAreAttached.NewAttacher failed for getting plugin %q with: %v",
237 pluginName,
238 newAttacherErr)
239 continue
240 }
241
242 attached, areAttachedErr := volumeAttacher.VolumesAreAttached(volumesSpecs, nodeName)
243 if areAttachedErr != nil {
244 klog.Errorf(
245 "VolumesAreAttached failed for checking on node %q with: %v",
246 nodeName,
247 areAttachedErr)
248 continue
249 }
250
251 for spec, check := range attached {
252 if !check {
253 actualStateOfWorld.MarkVolumeAsDetached(volumeSpecMap[spec], nodeName)
254 klog.V(1).Infof("VerifyVolumesAreAttached determined volume %q (spec.Name: %q) is no longer attached to node %q, therefore it was marked as detached.",
255 volumeSpecMap[spec], spec.Name(), nodeName)
256 }
257 }
258 }
259
260
261 return volumetypes.NewOperationContext(nil, nil, false)
262 }
263
264 return volumetypes.GeneratedOperations{
265 OperationName: "verify_volumes_are_attached_per_node",
266 OperationFunc: volumesAreAttachedFunc,
267 CompleteFunc: util.OperationCompleteHook(util.GetFullQualifiedPluginNameForVolume("<n/a>", nil), "verify_volumes_are_attached_per_node"),
268 EventRecorderFunc: nil,
269 }, nil
270 }
271
272 func (og *operationGenerator) GenerateBulkVolumeVerifyFunc(
273 pluginNodeVolumes map[types.NodeName][]*volume.Spec,
274 pluginName string,
275 volumeSpecMap map[*volume.Spec]v1.UniqueVolumeName,
276 actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) {
277
278
279
280
281
282 bulkVolumeVerifyFunc := func() volumetypes.OperationContext {
283 attachableVolumePlugin, err :=
284 og.volumePluginMgr.FindAttachablePluginByName(pluginName)
285 if err != nil || attachableVolumePlugin == nil {
286 klog.Errorf(
287 "BulkVerifyVolume.FindAttachablePluginBySpec failed for plugin %q with: %v",
288 pluginName,
289 err)
290 return volumetypes.NewOperationContext(nil, nil, false)
291 }
292
293 volumeAttacher, newAttacherErr := attachableVolumePlugin.NewAttacher()
294
295 if newAttacherErr != nil {
296 klog.Errorf(
297 "BulkVerifyVolume.NewAttacher failed for getting plugin %q with: %v",
298 attachableVolumePlugin,
299 newAttacherErr)
300 return volumetypes.NewOperationContext(nil, nil, false)
301 }
302 bulkVolumeVerifier, ok := volumeAttacher.(volume.BulkVolumeVerifier)
303
304 if !ok {
305 klog.Errorf("BulkVerifyVolume failed to type assert attacher %q", bulkVolumeVerifier)
306 return volumetypes.NewOperationContext(nil, nil, false)
307 }
308
309 attached, bulkAttachErr := bulkVolumeVerifier.BulkVerifyVolumes(pluginNodeVolumes)
310 if bulkAttachErr != nil {
311 klog.Errorf("BulkVerifyVolume.BulkVerifyVolumes Error checking volumes are attached with %v", bulkAttachErr)
312 return volumetypes.NewOperationContext(nil, nil, false)
313 }
314
315 for nodeName, volumeSpecs := range pluginNodeVolumes {
316 for _, volumeSpec := range volumeSpecs {
317 nodeVolumeSpecs, nodeChecked := attached[nodeName]
318
319 if !nodeChecked {
320 klog.V(2).Infof("VerifyVolumesAreAttached.BulkVerifyVolumes failed for node %q and leaving volume %q as attached",
321 nodeName,
322 volumeSpec.Name())
323 continue
324 }
325
326 check := nodeVolumeSpecs[volumeSpec]
327
328 if !check {
329 klog.V(2).Infof("VerifyVolumesAreAttached.BulkVerifyVolumes failed for node %q and volume %q",
330 nodeName,
331 volumeSpec.Name())
332 actualStateOfWorld.MarkVolumeAsDetached(volumeSpecMap[volumeSpec], nodeName)
333 }
334 }
335 }
336
337
338 return volumetypes.NewOperationContext(nil, nil, false)
339 }
340
341 return volumetypes.GeneratedOperations{
342 OperationName: "verify_volumes_are_attached",
343 OperationFunc: bulkVolumeVerifyFunc,
344 CompleteFunc: util.OperationCompleteHook(util.GetFullQualifiedPluginNameForVolume(pluginName, nil), "verify_volumes_are_attached"),
345 EventRecorderFunc: nil,
346 }, nil
347
348 }
349
350 func (og *operationGenerator) GenerateAttachVolumeFunc(
351 logger klog.Logger,
352 volumeToAttach VolumeToAttach,
353 actualStateOfWorld ActualStateOfWorldAttacherUpdater) volumetypes.GeneratedOperations {
354
355 attachVolumeFunc := func() volumetypes.OperationContext {
356 attachableVolumePlugin, err :=
357 og.volumePluginMgr.FindAttachablePluginBySpec(volumeToAttach.VolumeSpec)
358
359 migrated := getMigratedStatusBySpec(volumeToAttach.VolumeSpec)
360
361 if err != nil || attachableVolumePlugin == nil {
362 eventErr, detailedErr := volumeToAttach.GenerateError("AttachVolume.FindAttachablePluginBySpec failed", err)
363 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
364 }
365
366 volumeAttacher, newAttacherErr := attachableVolumePlugin.NewAttacher()
367 if newAttacherErr != nil {
368 eventErr, detailedErr := volumeToAttach.GenerateError("AttachVolume.NewAttacher failed", newAttacherErr)
369 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
370 }
371
372
373 devicePath, attachErr := volumeAttacher.Attach(
374 volumeToAttach.VolumeSpec, volumeToAttach.NodeName)
375
376 if attachErr != nil {
377 uncertainNode := volumeToAttach.NodeName
378 if derr, ok := attachErr.(*volerr.DanglingAttachError); ok {
379 uncertainNode = derr.CurrentNode
380 }
381 addErr := actualStateOfWorld.MarkVolumeAsUncertain(
382 logger,
383 volumeToAttach.VolumeName,
384 volumeToAttach.VolumeSpec,
385 uncertainNode)
386 if addErr != nil {
387 klog.Errorf("AttachVolume.MarkVolumeAsUncertain fail to add the volume %q to actual state with %s", volumeToAttach.VolumeName, addErr)
388 }
389
390
391 eventErr, detailedErr := volumeToAttach.GenerateError("AttachVolume.Attach failed", attachErr)
392 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
393 }
394
395
396 simpleMsg, _ := volumeToAttach.GenerateMsg("AttachVolume.Attach succeeded", "")
397 for _, pod := range volumeToAttach.ScheduledPods {
398 og.recorder.Eventf(pod, v1.EventTypeNormal, kevents.SuccessfulAttachVolume, simpleMsg)
399 }
400 klog.Infof(volumeToAttach.GenerateMsgDetailed("AttachVolume.Attach succeeded", ""))
401
402
403 addVolumeNodeErr := actualStateOfWorld.MarkVolumeAsAttached(
404 logger, v1.UniqueVolumeName(""), volumeToAttach.VolumeSpec, volumeToAttach.NodeName, devicePath)
405 if addVolumeNodeErr != nil {
406
407 eventErr, detailedErr := volumeToAttach.GenerateError("AttachVolume.MarkVolumeAsAttached failed", addVolumeNodeErr)
408 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
409 }
410
411 return volumetypes.NewOperationContext(nil, nil, migrated)
412 }
413
414 eventRecorderFunc := func(err *error) {
415 if *err != nil {
416 for _, pod := range volumeToAttach.ScheduledPods {
417 og.recorder.Eventf(pod, v1.EventTypeWarning, kevents.FailedAttachVolume, (*err).Error())
418 }
419 }
420 }
421
422 attachableVolumePluginName := unknownAttachableVolumePlugin
423
424
425 attachableVolumePlugin, err :=
426 og.volumePluginMgr.FindAttachablePluginBySpec(volumeToAttach.VolumeSpec)
427
428
429
430
431 if err == nil && attachableVolumePlugin != nil {
432 attachableVolumePluginName = attachableVolumePlugin.GetPluginName()
433 }
434
435 return volumetypes.GeneratedOperations{
436 OperationName: "volume_attach",
437 OperationFunc: attachVolumeFunc,
438 EventRecorderFunc: eventRecorderFunc,
439 CompleteFunc: util.OperationCompleteHook(util.GetFullQualifiedPluginNameForVolume(attachableVolumePluginName, volumeToAttach.VolumeSpec), "volume_attach"),
440 }
441 }
442
443 func (og *operationGenerator) GetVolumePluginMgr() *volume.VolumePluginMgr {
444 return og.volumePluginMgr
445 }
446
447 func (og *operationGenerator) GetCSITranslator() InTreeToCSITranslator {
448 return og.translator
449 }
450
451 func (og *operationGenerator) GenerateDetachVolumeFunc(
452 logger klog.Logger,
453 volumeToDetach AttachedVolume,
454 verifySafeToDetach bool,
455 actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) {
456 var volumeName string
457 var attachableVolumePlugin volume.AttachableVolumePlugin
458 var pluginName string
459 var err error
460
461 if volumeToDetach.VolumeSpec != nil {
462 attachableVolumePlugin, err = findDetachablePluginBySpec(volumeToDetach.VolumeSpec, og.volumePluginMgr)
463 if err != nil || attachableVolumePlugin == nil {
464 return volumetypes.GeneratedOperations{}, volumeToDetach.GenerateErrorDetailed("DetachVolume.findDetachablePluginBySpec failed", err)
465 }
466
467 volumeName, err =
468 attachableVolumePlugin.GetVolumeName(volumeToDetach.VolumeSpec)
469 if err != nil {
470 return volumetypes.GeneratedOperations{}, volumeToDetach.GenerateErrorDetailed("DetachVolume.GetVolumeName failed", err)
471 }
472 } else {
473
474
475
476 pluginName, volumeName, err = util.SplitUniqueName(volumeToDetach.VolumeName)
477 if err != nil {
478 return volumetypes.GeneratedOperations{}, volumeToDetach.GenerateErrorDetailed("DetachVolume.SplitUniqueName failed", err)
479 }
480
481 attachableVolumePlugin, err = og.volumePluginMgr.FindAttachablePluginByName(pluginName)
482 if err != nil || attachableVolumePlugin == nil {
483 return volumetypes.GeneratedOperations{}, volumeToDetach.GenerateErrorDetailed("DetachVolume.FindAttachablePluginByName failed", err)
484 }
485
486 }
487
488 if pluginName == "" {
489 pluginName = attachableVolumePlugin.GetPluginName()
490 }
491
492 volumeDetacher, err := attachableVolumePlugin.NewDetacher()
493 if err != nil {
494 return volumetypes.GeneratedOperations{}, volumeToDetach.GenerateErrorDetailed("DetachVolume.NewDetacher failed", err)
495 }
496
497 detachVolumeFunc := func() volumetypes.OperationContext {
498 var err error
499 if verifySafeToDetach {
500 err = og.verifyVolumeIsSafeToDetach(volumeToDetach)
501 }
502 if err == nil {
503 err = volumeDetacher.Detach(volumeName, volumeToDetach.NodeName)
504 }
505
506 migrated := getMigratedStatusBySpec(volumeToDetach.VolumeSpec)
507
508 if err != nil {
509
510
511 uncertainError := actualStateOfWorld.MarkVolumeAsUncertain(
512 logger, volumeToDetach.VolumeName, volumeToDetach.VolumeSpec, volumeToDetach.NodeName)
513 if uncertainError != nil {
514 klog.Errorf("DetachVolume.MarkVolumeAsUncertain failed to add the volume %q to actual state after detach error: %s", volumeToDetach.VolumeName, uncertainError)
515 }
516 eventErr, detailedErr := volumeToDetach.GenerateError("DetachVolume.Detach failed", err)
517 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
518 }
519
520 klog.Infof(volumeToDetach.GenerateMsgDetailed("DetachVolume.Detach succeeded", ""))
521
522
523 actualStateOfWorld.MarkVolumeAsDetached(
524 volumeToDetach.VolumeName, volumeToDetach.NodeName)
525
526 return volumetypes.NewOperationContext(nil, nil, migrated)
527 }
528
529 return volumetypes.GeneratedOperations{
530 OperationName: DetachOperationName,
531 OperationFunc: detachVolumeFunc,
532 CompleteFunc: util.OperationCompleteHook(util.GetFullQualifiedPluginNameForVolume(pluginName, volumeToDetach.VolumeSpec), DetachOperationName),
533 EventRecorderFunc: nil,
534 }, nil
535 }
536
537 func (og *operationGenerator) GenerateMountVolumeFunc(
538 waitForAttachTimeout time.Duration,
539 volumeToMount VolumeToMount,
540 actualStateOfWorld ActualStateOfWorldMounterUpdater,
541 isRemount bool) volumetypes.GeneratedOperations {
542
543 volumePluginName := unknownVolumePlugin
544 volumePlugin, err :=
545 og.volumePluginMgr.FindPluginBySpec(volumeToMount.VolumeSpec)
546 if err == nil && volumePlugin != nil {
547 volumePluginName = volumePlugin.GetPluginName()
548 }
549
550 mountVolumeFunc := func() volumetypes.OperationContext {
551
552 volumePlugin, err := og.volumePluginMgr.FindPluginBySpec(volumeToMount.VolumeSpec)
553
554 migrated := getMigratedStatusBySpec(volumeToMount.VolumeSpec)
555
556 if err != nil || volumePlugin == nil {
557 eventErr, detailedErr := volumeToMount.GenerateError("MountVolume.FindPluginBySpec failed", err)
558 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
559 }
560
561 affinityErr := checkNodeAffinity(og, volumeToMount)
562 if affinityErr != nil {
563 eventErr, detailedErr := volumeToMount.GenerateError("MountVolume.NodeAffinity check failed", affinityErr)
564 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
565 }
566
567 volumeMounter, newMounterErr := volumePlugin.NewMounter(
568 volumeToMount.VolumeSpec,
569 volumeToMount.Pod,
570 volume.VolumeOptions{})
571 if newMounterErr != nil {
572 eventErr, detailedErr := volumeToMount.GenerateError("MountVolume.NewMounter initialization failed", newMounterErr)
573 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
574 }
575
576 mountCheckError := checkMountOptionSupport(og, volumeToMount, volumePlugin)
577 if mountCheckError != nil {
578 eventErr, detailedErr := volumeToMount.GenerateError("MountVolume.MountOptionSupport check failed", mountCheckError)
579 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
580 }
581
582
583 if actualStateOfWorld.IsVolumeMountedElsewhere(volumeToMount.VolumeName, volumeToMount.PodName) &&
584
585 len(volumeToMount.VolumeSpec.PersistentVolume.Spec.AccessModes) == 1 &&
586 v1helper.ContainsAccessMode(volumeToMount.VolumeSpec.PersistentVolume.Spec.AccessModes, v1.ReadWriteOncePod) {
587
588 err = goerrors.New("volume uses the ReadWriteOncePod access mode and is already in use by another pod")
589 eventErr, detailedErr := volumeToMount.GenerateError("MountVolume.SetUp failed", err)
590 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
591 }
592
593
594 attachableVolumePlugin, _ :=
595 og.volumePluginMgr.FindAttachablePluginBySpec(volumeToMount.VolumeSpec)
596 var volumeAttacher volume.Attacher
597 if attachableVolumePlugin != nil {
598 volumeAttacher, _ = attachableVolumePlugin.NewAttacher()
599 }
600
601
602 deviceMountableVolumePlugin, _ := og.volumePluginMgr.FindDeviceMountablePluginBySpec(volumeToMount.VolumeSpec)
603 var volumeDeviceMounter volume.DeviceMounter
604 if deviceMountableVolumePlugin != nil {
605 volumeDeviceMounter, _ = deviceMountableVolumePlugin.NewDeviceMounter()
606 }
607
608 var fsGroup *int64
609 var fsGroupChangePolicy *v1.PodFSGroupChangePolicy
610 if podSc := volumeToMount.Pod.Spec.SecurityContext; podSc != nil {
611 if podSc.FSGroup != nil {
612 fsGroup = podSc.FSGroup
613 }
614 if podSc.FSGroupChangePolicy != nil {
615 fsGroupChangePolicy = podSc.FSGroupChangePolicy
616 }
617 }
618
619 devicePath := volumeToMount.DevicePath
620 if volumeAttacher != nil {
621
622 klog.InfoS(volumeToMount.GenerateMsgDetailed("MountVolume.WaitForAttach entering", fmt.Sprintf("DevicePath %q", volumeToMount.DevicePath)), "pod", klog.KObj(volumeToMount.Pod))
623
624 devicePath, err = volumeAttacher.WaitForAttach(
625 volumeToMount.VolumeSpec, devicePath, volumeToMount.Pod, waitForAttachTimeout)
626 if err != nil {
627
628 eventErr, detailedErr := volumeToMount.GenerateError("MountVolume.WaitForAttach failed", err)
629 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
630 }
631
632 klog.InfoS(volumeToMount.GenerateMsgDetailed("MountVolume.WaitForAttach succeeded", fmt.Sprintf("DevicePath %q", devicePath)), "pod", klog.KObj(volumeToMount.Pod))
633 }
634
635 var resizeError error
636 resizeOptions := volume.NodeResizeOptions{
637 DevicePath: devicePath,
638 }
639
640 if volumeDeviceMounter != nil && actualStateOfWorld.GetDeviceMountState(volumeToMount.VolumeName) != DeviceGloballyMounted {
641 deviceMountPath, err :=
642 volumeDeviceMounter.GetDeviceMountPath(volumeToMount.VolumeSpec)
643 if err != nil {
644
645 eventErr, detailedErr := volumeToMount.GenerateError("MountVolume.GetDeviceMountPath failed", err)
646 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
647 }
648
649
650 err = volumeDeviceMounter.MountDevice(
651 volumeToMount.VolumeSpec,
652 devicePath,
653 deviceMountPath,
654 volume.DeviceMounterArgs{FsGroup: fsGroup, SELinuxLabel: volumeToMount.SELinuxLabel},
655 )
656 if err != nil {
657 og.checkForFailedMount(volumeToMount, err)
658 og.markDeviceErrorState(volumeToMount, devicePath, deviceMountPath, err, actualStateOfWorld)
659
660 eventErr, detailedErr := volumeToMount.GenerateError("MountVolume.MountDevice failed", err)
661 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
662 }
663
664 klog.InfoS(volumeToMount.GenerateMsgDetailed("MountVolume.MountDevice succeeded", fmt.Sprintf("device mount path %q", deviceMountPath)), "pod", klog.KObj(volumeToMount.Pod))
665
666
667 markDeviceMountedErr := actualStateOfWorld.MarkDeviceAsMounted(
668 volumeToMount.VolumeName, devicePath, deviceMountPath, volumeToMount.SELinuxLabel)
669 if markDeviceMountedErr != nil {
670
671 eventErr, detailedErr := volumeToMount.GenerateError("MountVolume.MarkDeviceAsMounted failed", markDeviceMountedErr)
672 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
673 }
674
675 resizeOptions.DeviceStagePath = deviceMountPath
676 }
677
678 if volumeDeviceMounter != nil && resizeOptions.DeviceStagePath == "" {
679 deviceStagePath, err := volumeDeviceMounter.GetDeviceMountPath(volumeToMount.VolumeSpec)
680 if err != nil {
681
682 eventErr, detailedErr := volumeToMount.GenerateError("MountVolume.GetDeviceMountPath failed for expansion", err)
683 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
684 }
685 resizeOptions.DeviceStagePath = deviceStagePath
686 }
687
688
689 mountErr := volumeMounter.SetUp(volume.MounterArgs{
690 FsUser: util.FsUserFrom(volumeToMount.Pod),
691 FsGroup: fsGroup,
692 DesiredSize: volumeToMount.DesiredSizeLimit,
693 FSGroupChangePolicy: fsGroupChangePolicy,
694 SELinuxLabel: volumeToMount.SELinuxLabel,
695 })
696
697 markOpts := MarkVolumeOpts{
698 PodName: volumeToMount.PodName,
699 PodUID: volumeToMount.Pod.UID,
700 VolumeName: volumeToMount.VolumeName,
701 Mounter: volumeMounter,
702 OuterVolumeSpecName: volumeToMount.OuterVolumeSpecName,
703 VolumeGidVolume: volumeToMount.VolumeGidValue,
704 VolumeSpec: volumeToMount.VolumeSpec,
705 VolumeMountState: VolumeMounted,
706 SELinuxMountContext: volumeToMount.SELinuxLabel,
707 }
708 if mountErr != nil {
709 og.checkForFailedMount(volumeToMount, mountErr)
710 og.markVolumeErrorState(volumeToMount, markOpts, mountErr, actualStateOfWorld)
711
712 eventErr, detailedErr := volumeToMount.GenerateError("MountVolume.SetUp failed", mountErr)
713 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
714 }
715
716 detailedMsg := volumeToMount.GenerateMsgDetailed("MountVolume.SetUp succeeded", "")
717 verbosity := klog.Level(1)
718 if isRemount {
719 verbosity = klog.Level(4)
720 }
721 klog.V(verbosity).InfoS(detailedMsg, "pod", klog.KObj(volumeToMount.Pod))
722 resizeOptions.DeviceMountPath = volumeMounter.GetPath()
723
724 _, resizeError = og.expandVolumeDuringMount(volumeToMount, actualStateOfWorld, resizeOptions)
725 if resizeError != nil {
726 klog.Errorf("MountVolume.NodeExpandVolume failed with %v", resizeError)
727 eventErr, detailedErr := volumeToMount.GenerateError("MountVolume.Setup failed while expanding volume", resizeError)
728
729
730
731
732
733 if err := actualStateOfWorld.MarkVolumeMountAsUncertain(markOpts); err != nil {
734 klog.Errorf(volumeToMount.GenerateErrorDetailed("MountVolume.MarkVolumeMountAsUncertain failed", err).Error())
735 }
736 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
737 }
738
739
740
741 mountRequestTime := volumeToMount.MountRequestTime
742 totalTimeTaken := time.Since(mountRequestTime).Seconds()
743 util.RecordOperationLatencyMetric(util.GetFullQualifiedPluginNameForVolume(volumePluginName, volumeToMount.VolumeSpec), "overall_volume_mount", totalTimeTaken)
744
745 markVolMountedErr := actualStateOfWorld.MarkVolumeAsMounted(markOpts)
746 if markVolMountedErr != nil {
747
748 eventErr, detailedErr := volumeToMount.GenerateError("MountVolume.MarkVolumeAsMounted failed", markVolMountedErr)
749 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
750 }
751 return volumetypes.NewOperationContext(nil, nil, migrated)
752 }
753
754 eventRecorderFunc := func(err *error) {
755 if *err != nil {
756 og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeWarning, kevents.FailedMountVolume, (*err).Error())
757 }
758 }
759
760 return volumetypes.GeneratedOperations{
761 OperationName: "volume_mount",
762 OperationFunc: mountVolumeFunc,
763 EventRecorderFunc: eventRecorderFunc,
764 CompleteFunc: util.OperationCompleteHook(util.GetFullQualifiedPluginNameForVolume(volumePluginName, volumeToMount.VolumeSpec), "volume_mount"),
765 }
766 }
767
768 func (og *operationGenerator) checkForFailedMount(volumeToMount VolumeToMount, mountError error) {
769 pv := volumeToMount.VolumeSpec.PersistentVolume
770 if pv == nil {
771 return
772 }
773
774 if volumetypes.IsFilesystemMismatchError(mountError) {
775 simpleMsg, _ := volumeToMount.GenerateMsg("MountVolume failed", mountError.Error())
776 og.recorder.Eventf(pv, v1.EventTypeWarning, kevents.FailedMountOnFilesystemMismatch, simpleMsg)
777 }
778 }
779
780 func (og *operationGenerator) markDeviceErrorState(volumeToMount VolumeToMount, devicePath, deviceMountPath string, mountError error, actualStateOfWorld ActualStateOfWorldMounterUpdater) {
781 if volumetypes.IsOperationFinishedError(mountError) &&
782 actualStateOfWorld.GetDeviceMountState(volumeToMount.VolumeName) == DeviceMountUncertain {
783
784 if actualStateOfWorld.IsVolumeDeviceReconstructed(volumeToMount.VolumeName) {
785 klog.V(2).InfoS("MountVolume.markDeviceErrorState leaving volume uncertain", "volumeName", volumeToMount.VolumeName)
786 return
787 }
788
789
790 markDeviceUnmountError := actualStateOfWorld.MarkDeviceAsUnmounted(volumeToMount.VolumeName)
791 if markDeviceUnmountError != nil {
792 klog.Errorf(volumeToMount.GenerateErrorDetailed("MountDevice.MarkDeviceAsUnmounted failed", markDeviceUnmountError).Error())
793 }
794 return
795 }
796
797 if volumetypes.IsUncertainProgressError(mountError) &&
798 actualStateOfWorld.GetDeviceMountState(volumeToMount.VolumeName) == DeviceNotMounted {
799
800
801 markDeviceUncertainError := actualStateOfWorld.MarkDeviceAsUncertain(volumeToMount.VolumeName, devicePath, deviceMountPath, volumeToMount.SELinuxLabel)
802 if markDeviceUncertainError != nil {
803 klog.Errorf(volumeToMount.GenerateErrorDetailed("MountDevice.MarkDeviceAsUncertain failed", markDeviceUncertainError).Error())
804 }
805 }
806
807 }
808
809 func (og *operationGenerator) markVolumeErrorState(volumeToMount VolumeToMount, markOpts MarkVolumeOpts, mountError error, actualStateOfWorld ActualStateOfWorldMounterUpdater) {
810 if volumetypes.IsOperationFinishedError(mountError) &&
811 actualStateOfWorld.GetVolumeMountState(volumeToMount.VolumeName, markOpts.PodName) == VolumeMountUncertain {
812
813
814 if actualStateOfWorld.IsVolumeReconstructed(volumeToMount.VolumeName, volumeToMount.PodName) {
815 klog.V(3).InfoS("MountVolume.markVolumeErrorState leaving volume uncertain", "volumeName", volumeToMount.VolumeName)
816 return
817 }
818
819 t := actualStateOfWorld.MarkVolumeAsUnmounted(volumeToMount.PodName, volumeToMount.VolumeName)
820 if t != nil {
821 klog.Errorf(volumeToMount.GenerateErrorDetailed("MountVolume.MarkVolumeAsUnmounted failed", t).Error())
822 }
823 return
824
825 }
826
827 if volumetypes.IsUncertainProgressError(mountError) &&
828 actualStateOfWorld.GetVolumeMountState(volumeToMount.VolumeName, markOpts.PodName) == VolumeNotMounted {
829 t := actualStateOfWorld.MarkVolumeMountAsUncertain(markOpts)
830 if t != nil {
831 klog.Errorf(volumeToMount.GenerateErrorDetailed("MountVolume.MarkVolumeMountAsUncertain failed", t).Error())
832 }
833 }
834 }
835
836 func (og *operationGenerator) GenerateUnmountVolumeFunc(
837 volumeToUnmount MountedVolume,
838 actualStateOfWorld ActualStateOfWorldMounterUpdater,
839 podsDir string) (volumetypes.GeneratedOperations, error) {
840
841 volumePlugin, err := og.volumePluginMgr.FindPluginByName(volumeToUnmount.PluginName)
842 if err != nil || volumePlugin == nil {
843 return volumetypes.GeneratedOperations{}, volumeToUnmount.GenerateErrorDetailed("UnmountVolume.FindPluginByName failed", err)
844 }
845 volumeUnmounter, newUnmounterErr := volumePlugin.NewUnmounter(
846 volumeToUnmount.InnerVolumeSpecName, volumeToUnmount.PodUID)
847 if newUnmounterErr != nil {
848 return volumetypes.GeneratedOperations{}, volumeToUnmount.GenerateErrorDetailed("UnmountVolume.NewUnmounter failed", newUnmounterErr)
849 }
850
851 unmountVolumeFunc := func() volumetypes.OperationContext {
852 subpather := og.volumePluginMgr.Host.GetSubpather()
853
854 migrated := getMigratedStatusBySpec(volumeToUnmount.VolumeSpec)
855
856
857 podDir := filepath.Join(podsDir, string(volumeToUnmount.PodUID))
858 if err := subpather.CleanSubPaths(podDir, volumeToUnmount.InnerVolumeSpecName); err != nil {
859 eventErr, detailedErr := volumeToUnmount.GenerateError("error cleaning subPath mounts", err)
860 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
861 }
862
863
864 unmountErr := volumeUnmounter.TearDown()
865 if unmountErr != nil {
866
867 opts := MarkVolumeOpts{
868 PodName: volumeToUnmount.PodName,
869 PodUID: volumeToUnmount.PodUID,
870 VolumeName: volumeToUnmount.VolumeName,
871 OuterVolumeSpecName: volumeToUnmount.OuterVolumeSpecName,
872 VolumeGidVolume: volumeToUnmount.VolumeGidValue,
873 VolumeSpec: volumeToUnmount.VolumeSpec,
874 VolumeMountState: VolumeMountUncertain,
875 }
876 markMountUncertainErr := actualStateOfWorld.MarkVolumeMountAsUncertain(opts)
877 if markMountUncertainErr != nil {
878
879 klog.Errorf(volumeToUnmount.GenerateErrorDetailed("UnmountVolume.MarkVolumeMountAsUncertain failed", markMountUncertainErr).Error())
880 }
881
882
883 eventErr, detailedErr := volumeToUnmount.GenerateError("UnmountVolume.TearDown failed", unmountErr)
884 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
885 }
886
887 klog.Infof(
888 "UnmountVolume.TearDown succeeded for volume %q (OuterVolumeSpecName: %q) pod %q (UID: %q). InnerVolumeSpecName %q. PluginName %q, VolumeGidValue %q",
889 volumeToUnmount.VolumeName,
890 volumeToUnmount.OuterVolumeSpecName,
891 volumeToUnmount.PodName,
892 volumeToUnmount.PodUID,
893 volumeToUnmount.InnerVolumeSpecName,
894 volumeToUnmount.PluginName,
895 volumeToUnmount.VolumeGidValue)
896
897
898 markVolMountedErr := actualStateOfWorld.MarkVolumeAsUnmounted(
899 volumeToUnmount.PodName, volumeToUnmount.VolumeName)
900 if markVolMountedErr != nil {
901
902 klog.Errorf(volumeToUnmount.GenerateErrorDetailed("UnmountVolume.MarkVolumeAsUnmounted failed", markVolMountedErr).Error())
903 }
904
905 return volumetypes.NewOperationContext(nil, nil, migrated)
906 }
907
908 return volumetypes.GeneratedOperations{
909 OperationName: "volume_unmount",
910 OperationFunc: unmountVolumeFunc,
911 CompleteFunc: util.OperationCompleteHook(util.GetFullQualifiedPluginNameForVolume(volumePlugin.GetPluginName(), volumeToUnmount.VolumeSpec), "volume_unmount"),
912 EventRecorderFunc: nil,
913 }, nil
914 }
915
916 func (og *operationGenerator) GenerateUnmountDeviceFunc(
917 deviceToDetach AttachedVolume,
918 actualStateOfWorld ActualStateOfWorldMounterUpdater,
919 hostutil hostutil.HostUtils) (volumetypes.GeneratedOperations, error) {
920
921 deviceMountableVolumePlugin, err :=
922 og.volumePluginMgr.FindDeviceMountablePluginByName(deviceToDetach.PluginName)
923 if err != nil || deviceMountableVolumePlugin == nil {
924 return volumetypes.GeneratedOperations{}, deviceToDetach.GenerateErrorDetailed("UnmountDevice.FindDeviceMountablePluginByName failed", err)
925 }
926
927 volumeDeviceUnmounter, err := deviceMountableVolumePlugin.NewDeviceUnmounter()
928 if err != nil {
929 return volumetypes.GeneratedOperations{}, deviceToDetach.GenerateErrorDetailed("UnmountDevice.NewDeviceUnmounter failed", err)
930 }
931
932 volumeDeviceMounter, err := deviceMountableVolumePlugin.NewDeviceMounter()
933 if err != nil {
934 return volumetypes.GeneratedOperations{}, deviceToDetach.GenerateErrorDetailed("UnmountDevice.NewDeviceMounter failed", err)
935 }
936
937 unmountDeviceFunc := func() volumetypes.OperationContext {
938
939 migrated := getMigratedStatusBySpec(deviceToDetach.VolumeSpec)
940
941
942 deviceMountPath, err :=
943 volumeDeviceMounter.GetDeviceMountPath(deviceToDetach.VolumeSpec)
944 if err != nil {
945
946 if !strings.Contains(err.Error(), "does not exist") {
947 eventErr, detailedErr := deviceToDetach.GenerateError("GetDeviceMountPath failed", err)
948 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
949 }
950
951
952 deviceMountPath = deviceToDetach.DeviceMountPath
953 klog.Warningf(deviceToDetach.GenerateMsgDetailed(fmt.Sprintf(
954 "GetDeviceMountPath failed, but unmount operation will proceed using deviceMountPath=%s: %v", deviceMountPath, err), ""))
955 }
956 refs, err := deviceMountableVolumePlugin.GetDeviceMountRefs(deviceMountPath)
957
958 if err != nil || util.HasMountRefs(deviceMountPath, refs) {
959 if err == nil {
960 err = fmt.Errorf("the device mount path %q is still mounted by other references %v", deviceMountPath, refs)
961 }
962 eventErr, detailedErr := deviceToDetach.GenerateError("GetDeviceMountRefs check failed", err)
963 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
964 }
965
966 unmountDeviceErr := volumeDeviceUnmounter.UnmountDevice(deviceMountPath)
967 if unmountDeviceErr != nil {
968
969 markDeviceUncertainErr := actualStateOfWorld.MarkDeviceAsUncertain(deviceToDetach.VolumeName, deviceToDetach.DevicePath, deviceMountPath, deviceToDetach.SELinuxMountContext)
970 if markDeviceUncertainErr != nil {
971
972 klog.Errorf(deviceToDetach.GenerateErrorDetailed("UnmountDevice.MarkDeviceAsUncertain failed", markDeviceUncertainErr).Error())
973 }
974
975
976 eventErr, detailedErr := deviceToDetach.GenerateError("UnmountDevice failed", unmountDeviceErr)
977 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
978 }
979
980
981
982
983 deviceOpened, deviceOpenedErr := isDeviceOpened(deviceToDetach, hostutil)
984 if deviceOpenedErr != nil {
985 return volumetypes.NewOperationContext(nil, deviceOpenedErr, migrated)
986 }
987
988 if deviceOpened {
989
990 markDeviceUncertainErr := actualStateOfWorld.MarkDeviceAsUncertain(deviceToDetach.VolumeName, deviceToDetach.DevicePath, deviceMountPath, deviceToDetach.SELinuxMountContext)
991 if markDeviceUncertainErr != nil {
992
993 klog.Errorf(deviceToDetach.GenerateErrorDetailed("UnmountDevice.MarkDeviceAsUncertain failed", markDeviceUncertainErr).Error())
994 }
995 eventErr, detailedErr := deviceToDetach.GenerateError(
996 "UnmountDevice failed",
997 goerrors.New("the device is in use when it was no longer expected to be in use"))
998 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
999 }
1000
1001 klog.Info(deviceToDetach.GenerateMsgDetailed("UnmountDevice succeeded", ""))
1002
1003
1004 markDeviceUnmountedErr := actualStateOfWorld.MarkDeviceAsUnmounted(
1005 deviceToDetach.VolumeName)
1006 if markDeviceUnmountedErr != nil {
1007
1008 eventErr, detailedErr := deviceToDetach.GenerateError("MarkDeviceAsUnmounted failed", markDeviceUnmountedErr)
1009 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
1010 }
1011
1012 return volumetypes.NewOperationContext(nil, nil, migrated)
1013 }
1014
1015 return volumetypes.GeneratedOperations{
1016 OperationName: "unmount_device",
1017 OperationFunc: unmountDeviceFunc,
1018 CompleteFunc: util.OperationCompleteHook(util.GetFullQualifiedPluginNameForVolume(deviceMountableVolumePlugin.GetPluginName(), deviceToDetach.VolumeSpec), "unmount_device"),
1019 EventRecorderFunc: nil,
1020 }, nil
1021 }
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032 func (og *operationGenerator) GenerateMapVolumeFunc(
1033 waitForAttachTimeout time.Duration,
1034 volumeToMount VolumeToMount,
1035 actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) {
1036
1037
1038 blockVolumePlugin, err :=
1039 og.volumePluginMgr.FindMapperPluginBySpec(volumeToMount.VolumeSpec)
1040 if err != nil {
1041 return volumetypes.GeneratedOperations{}, volumeToMount.GenerateErrorDetailed("MapVolume.FindMapperPluginBySpec failed", err)
1042 }
1043
1044 if blockVolumePlugin == nil {
1045 return volumetypes.GeneratedOperations{}, volumeToMount.GenerateErrorDetailed("MapVolume.FindMapperPluginBySpec failed to find BlockVolumeMapper plugin. Volume plugin is nil.", nil)
1046 }
1047
1048 affinityErr := checkNodeAffinity(og, volumeToMount)
1049 if affinityErr != nil {
1050 eventErr, detailedErr := volumeToMount.GenerateError("MapVolume.NodeAffinity check failed", affinityErr)
1051 og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeWarning, kevents.FailedMountVolume, eventErr.Error())
1052 return volumetypes.GeneratedOperations{}, detailedErr
1053 }
1054 blockVolumeMapper, newMapperErr := blockVolumePlugin.NewBlockVolumeMapper(
1055 volumeToMount.VolumeSpec,
1056 volumeToMount.Pod,
1057 volume.VolumeOptions{})
1058 if newMapperErr != nil {
1059 eventErr, detailedErr := volumeToMount.GenerateError("MapVolume.NewBlockVolumeMapper initialization failed", newMapperErr)
1060 og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeWarning, kevents.FailedMapVolume, eventErr.Error())
1061 return volumetypes.GeneratedOperations{}, detailedErr
1062 }
1063
1064
1065 attachableVolumePlugin, _ :=
1066 og.volumePluginMgr.FindAttachablePluginBySpec(volumeToMount.VolumeSpec)
1067 var volumeAttacher volume.Attacher
1068 if attachableVolumePlugin != nil {
1069 volumeAttacher, _ = attachableVolumePlugin.NewAttacher()
1070 }
1071
1072 mapVolumeFunc := func() (operationContext volumetypes.OperationContext) {
1073 var devicePath string
1074 var stagingPath string
1075
1076 migrated := getMigratedStatusBySpec(volumeToMount.VolumeSpec)
1077
1078
1079 if actualStateOfWorld.IsVolumeMountedElsewhere(volumeToMount.VolumeName, volumeToMount.PodName) &&
1080
1081 len(volumeToMount.VolumeSpec.PersistentVolume.Spec.AccessModes) == 1 &&
1082 v1helper.ContainsAccessMode(volumeToMount.VolumeSpec.PersistentVolume.Spec.AccessModes, v1.ReadWriteOncePod) {
1083
1084 err = goerrors.New("volume uses the ReadWriteOncePod access mode and is already in use by another pod")
1085 eventErr, detailedErr := volumeToMount.GenerateError("MapVolume.SetUpDevice failed", err)
1086 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
1087 }
1088
1089
1090 globalMapPath, err :=
1091 blockVolumeMapper.GetGlobalMapPath(volumeToMount.VolumeSpec)
1092 if err != nil {
1093
1094 eventErr, detailedErr := volumeToMount.GenerateError("MapVolume.GetGlobalMapPath failed", err)
1095 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
1096 }
1097 if volumeAttacher != nil {
1098
1099 klog.InfoS(volumeToMount.GenerateMsgDetailed("MapVolume.WaitForAttach entering", fmt.Sprintf("DevicePath %q", volumeToMount.DevicePath)), "pod", klog.KObj(volumeToMount.Pod))
1100
1101 devicePath, err = volumeAttacher.WaitForAttach(
1102 volumeToMount.VolumeSpec, volumeToMount.DevicePath, volumeToMount.Pod, waitForAttachTimeout)
1103 if err != nil {
1104
1105 eventErr, detailedErr := volumeToMount.GenerateError("MapVolume.WaitForAttach failed", err)
1106 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
1107 }
1108
1109 klog.InfoS(volumeToMount.GenerateMsgDetailed("MapVolume.WaitForAttach succeeded", fmt.Sprintf("DevicePath %q", devicePath)), "pod", klog.KObj(volumeToMount.Pod))
1110
1111 }
1112
1113 if customBlockVolumeMapper, ok := blockVolumeMapper.(volume.CustomBlockVolumeMapper); ok && actualStateOfWorld.GetDeviceMountState(volumeToMount.VolumeName) != DeviceGloballyMounted {
1114 var mapErr error
1115 stagingPath, mapErr = customBlockVolumeMapper.SetUpDevice()
1116 if mapErr != nil {
1117 og.markDeviceErrorState(volumeToMount, devicePath, globalMapPath, mapErr, actualStateOfWorld)
1118
1119 eventErr, detailedErr := volumeToMount.GenerateError("MapVolume.SetUpDevice failed", mapErr)
1120 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
1121 }
1122 }
1123
1124
1125 markedDevicePath := devicePath
1126 markDeviceMappedErr := actualStateOfWorld.MarkDeviceAsMounted(
1127 volumeToMount.VolumeName, markedDevicePath, globalMapPath, "")
1128 if markDeviceMappedErr != nil {
1129
1130 eventErr, detailedErr := volumeToMount.GenerateError("MapVolume.MarkDeviceAsMounted failed", markDeviceMappedErr)
1131 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
1132 }
1133
1134 markVolumeOpts := MarkVolumeOpts{
1135 PodName: volumeToMount.PodName,
1136 PodUID: volumeToMount.Pod.UID,
1137 VolumeName: volumeToMount.VolumeName,
1138 BlockVolumeMapper: blockVolumeMapper,
1139 OuterVolumeSpecName: volumeToMount.OuterVolumeSpecName,
1140 VolumeGidVolume: volumeToMount.VolumeGidValue,
1141 VolumeSpec: volumeToMount.VolumeSpec,
1142 VolumeMountState: VolumeMounted,
1143 }
1144
1145
1146 if customBlockVolumeMapper, ok := blockVolumeMapper.(volume.CustomBlockVolumeMapper); ok {
1147
1148 pluginDevicePath, mapErr := customBlockVolumeMapper.MapPodDevice()
1149 if mapErr != nil {
1150
1151 og.markVolumeErrorState(volumeToMount, markVolumeOpts, mapErr, actualStateOfWorld)
1152 eventErr, detailedErr := volumeToMount.GenerateError("MapVolume.MapPodDevice failed", mapErr)
1153 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
1154 }
1155
1156
1157
1158 defer func() {
1159 if operationContext.EventErr != nil {
1160 errText := operationContext.EventErr.Error()
1161 og.markVolumeErrorState(volumeToMount, markVolumeOpts, volumetypes.NewUncertainProgressError(errText), actualStateOfWorld)
1162 }
1163 }()
1164
1165
1166
1167 if len(pluginDevicePath) != 0 {
1168 devicePath = pluginDevicePath
1169 }
1170 if len(devicePath) == 0 {
1171 eventErr, detailedErr := volumeToMount.GenerateError("MapVolume failed", goerrors.New("device path of the volume is empty"))
1172 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
1173 }
1174 }
1175
1176
1177
1178
1179
1180 kvh, ok := og.GetVolumePluginMgr().Host.(volume.KubeletVolumeHost)
1181 if !ok {
1182 eventErr, detailedErr := volumeToMount.GenerateError("MapVolume type assertion error", fmt.Errorf("volume host does not implement KubeletVolumeHost interface"))
1183 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
1184 }
1185 hu := kvh.GetHostUtil()
1186 devicePath, err = hu.EvalHostSymlinks(devicePath)
1187 if err != nil {
1188 eventErr, detailedErr := volumeToMount.GenerateError("MapVolume.EvalHostSymlinks failed", err)
1189 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
1190 }
1191
1192
1193
1194 if markedDevicePath != devicePath {
1195 markDeviceMappedErr := actualStateOfWorld.MarkDeviceAsMounted(
1196 volumeToMount.VolumeName, devicePath, globalMapPath, "")
1197 if markDeviceMappedErr != nil {
1198
1199 eventErr, detailedErr := volumeToMount.GenerateError("MapVolume.MarkDeviceAsMounted failed", markDeviceMappedErr)
1200 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
1201 }
1202 }
1203
1204
1205 volumeMapPath, volName := blockVolumeMapper.GetPodDeviceMapPath()
1206 mapErr := util.MapBlockVolume(og.blkUtil, devicePath, globalMapPath, volumeMapPath, volName, volumeToMount.Pod.UID)
1207 if mapErr != nil {
1208
1209 eventErr, detailedErr := volumeToMount.GenerateError("MapVolume.MapBlockVolume failed", mapErr)
1210 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
1211 }
1212
1213
1214 simpleMsg, detailedMsg := volumeToMount.GenerateMsg("MapVolume.MapPodDevice succeeded", fmt.Sprintf("globalMapPath %q", globalMapPath))
1215 verbosity := klog.Level(4)
1216 og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeNormal, kevents.SuccessfulMountVolume, simpleMsg)
1217 klog.V(verbosity).InfoS(detailedMsg, "pod", klog.KObj(volumeToMount.Pod))
1218
1219
1220 simpleMsg, detailedMsg = volumeToMount.GenerateMsg("MapVolume.MapPodDevice succeeded", fmt.Sprintf("volumeMapPath %q", volumeMapPath))
1221 verbosity = klog.Level(1)
1222 og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeNormal, kevents.SuccessfulMountVolume, simpleMsg)
1223 klog.V(verbosity).InfoS(detailedMsg, "pod", klog.KObj(volumeToMount.Pod))
1224
1225 resizeOptions := volume.NodeResizeOptions{
1226 DevicePath: devicePath,
1227 DeviceStagePath: stagingPath,
1228 }
1229 _, resizeError := og.expandVolumeDuringMount(volumeToMount, actualStateOfWorld, resizeOptions)
1230 if resizeError != nil {
1231 klog.Errorf("MapVolume.NodeExpandVolume failed with %v", resizeError)
1232 eventErr, detailedErr := volumeToMount.GenerateError("MapVolume.MarkVolumeAsMounted failed while expanding volume", resizeError)
1233
1234
1235
1236
1237
1238 if err := actualStateOfWorld.MarkVolumeMountAsUncertain(markVolumeOpts); err != nil {
1239 klog.Errorf(volumeToMount.GenerateErrorDetailed("MountVolume.MarkVolumeMountAsUncertain failed", err).Error())
1240 }
1241 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
1242 }
1243
1244 markVolMountedErr := actualStateOfWorld.MarkVolumeAsMounted(markVolumeOpts)
1245 if markVolMountedErr != nil {
1246
1247 eventErr, detailedErr := volumeToMount.GenerateError("MapVolume.MarkVolumeAsMounted failed", markVolMountedErr)
1248 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
1249 }
1250
1251 return volumetypes.NewOperationContext(nil, nil, migrated)
1252 }
1253
1254 eventRecorderFunc := func(err *error) {
1255 if *err != nil {
1256 og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeWarning, kevents.FailedMapVolume, (*err).Error())
1257 }
1258 }
1259
1260 return volumetypes.GeneratedOperations{
1261 OperationName: "map_volume",
1262 OperationFunc: mapVolumeFunc,
1263 EventRecorderFunc: eventRecorderFunc,
1264 CompleteFunc: util.OperationCompleteHook(util.GetFullQualifiedPluginNameForVolume(blockVolumePlugin.GetPluginName(), volumeToMount.VolumeSpec), "map_volume"),
1265 }, nil
1266 }
1267
1268
1269
1270
1271
1272 func (og *operationGenerator) GenerateUnmapVolumeFunc(
1273 volumeToUnmount MountedVolume,
1274 actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) {
1275
1276
1277 blockVolumePlugin, err :=
1278 og.volumePluginMgr.FindMapperPluginByName(volumeToUnmount.PluginName)
1279 if err != nil {
1280 return volumetypes.GeneratedOperations{}, volumeToUnmount.GenerateErrorDetailed("UnmapVolume.FindMapperPluginByName failed", err)
1281 }
1282 if blockVolumePlugin == nil {
1283 return volumetypes.GeneratedOperations{}, volumeToUnmount.GenerateErrorDetailed("UnmapVolume.FindMapperPluginByName failed to find BlockVolumeMapper plugin. Volume plugin is nil.", nil)
1284 }
1285 blockVolumeUnmapper, newUnmapperErr := blockVolumePlugin.NewBlockVolumeUnmapper(
1286 volumeToUnmount.InnerVolumeSpecName, volumeToUnmount.PodUID)
1287 if newUnmapperErr != nil {
1288 return volumetypes.GeneratedOperations{}, volumeToUnmount.GenerateErrorDetailed("UnmapVolume.NewUnmapper failed", newUnmapperErr)
1289 }
1290
1291 unmapVolumeFunc := func() volumetypes.OperationContext {
1292
1293 migrated := getMigratedStatusBySpec(volumeToUnmount.VolumeSpec)
1294
1295
1296 podDeviceUnmapPath, volName := blockVolumeUnmapper.GetPodDeviceMapPath()
1297
1298 globalUnmapPath, err := blockVolumeUnmapper.GetGlobalMapPath(volumeToUnmount.VolumeSpec)
1299 if err != nil {
1300
1301 eventErr, detailedErr := volumeToUnmount.GenerateError("UnmapVolume.GetGlobalMapPath failed", err)
1302 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
1303 }
1304
1305
1306
1307
1308 markVolumeOpts := MarkVolumeOpts{
1309 PodName: volumeToUnmount.PodName,
1310 PodUID: volumeToUnmount.PodUID,
1311 VolumeName: volumeToUnmount.VolumeName,
1312 OuterVolumeSpecName: volumeToUnmount.OuterVolumeSpecName,
1313 VolumeGidVolume: volumeToUnmount.VolumeGidValue,
1314 VolumeSpec: volumeToUnmount.VolumeSpec,
1315 VolumeMountState: VolumeMountUncertain,
1316 }
1317 markVolumeUncertainErr := actualStateOfWorld.MarkVolumeMountAsUncertain(markVolumeOpts)
1318 if markVolumeUncertainErr != nil {
1319
1320 eventErr, detailedErr := volumeToUnmount.GenerateError("UnmapVolume.MarkDeviceAsUncertain failed", markVolumeUncertainErr)
1321 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
1322 }
1323
1324
1325 unmapErr := util.UnmapBlockVolume(og.blkUtil, globalUnmapPath, podDeviceUnmapPath, volName, volumeToUnmount.PodUID)
1326 if unmapErr != nil {
1327
1328 eventErr, detailedErr := volumeToUnmount.GenerateError("UnmapVolume.UnmapBlockVolume failed", unmapErr)
1329 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
1330 }
1331
1332
1333 if customBlockVolumeUnmapper, ok := blockVolumeUnmapper.(volume.CustomBlockVolumeUnmapper); ok {
1334
1335 unmapErr = customBlockVolumeUnmapper.UnmapPodDevice()
1336 if unmapErr != nil {
1337
1338 eventErr, detailedErr := volumeToUnmount.GenerateError("UnmapVolume.UnmapPodDevice failed", unmapErr)
1339 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
1340 }
1341 }
1342
1343 klog.Infof(
1344 "UnmapVolume succeeded for volume %q (OuterVolumeSpecName: %q) pod %q (UID: %q). InnerVolumeSpecName %q. PluginName %q, VolumeGidValue %q",
1345 volumeToUnmount.VolumeName,
1346 volumeToUnmount.OuterVolumeSpecName,
1347 volumeToUnmount.PodName,
1348 volumeToUnmount.PodUID,
1349 volumeToUnmount.InnerVolumeSpecName,
1350 volumeToUnmount.PluginName,
1351 volumeToUnmount.VolumeGidValue)
1352
1353
1354 markVolUnmountedErr := actualStateOfWorld.MarkVolumeAsUnmounted(
1355 volumeToUnmount.PodName, volumeToUnmount.VolumeName)
1356 if markVolUnmountedErr != nil {
1357
1358 klog.Errorf(volumeToUnmount.GenerateErrorDetailed("UnmapVolume.MarkVolumeAsUnmounted failed", markVolUnmountedErr).Error())
1359 }
1360
1361 return volumetypes.NewOperationContext(nil, nil, migrated)
1362 }
1363
1364 return volumetypes.GeneratedOperations{
1365 OperationName: "unmap_volume",
1366 OperationFunc: unmapVolumeFunc,
1367 CompleteFunc: util.OperationCompleteHook(util.GetFullQualifiedPluginNameForVolume(blockVolumePlugin.GetPluginName(), volumeToUnmount.VolumeSpec), "unmap_volume"),
1368 EventRecorderFunc: nil,
1369 }, nil
1370 }
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382 func (og *operationGenerator) GenerateUnmapDeviceFunc(
1383 deviceToDetach AttachedVolume,
1384 actualStateOfWorld ActualStateOfWorldMounterUpdater,
1385 hostutil hostutil.HostUtils) (volumetypes.GeneratedOperations, error) {
1386
1387 blockVolumePlugin, err :=
1388 og.volumePluginMgr.FindMapperPluginByName(deviceToDetach.PluginName)
1389 if err != nil {
1390 return volumetypes.GeneratedOperations{}, deviceToDetach.GenerateErrorDetailed("UnmapDevice.FindMapperPluginByName failed", err)
1391 }
1392
1393 if blockVolumePlugin == nil {
1394 return volumetypes.GeneratedOperations{}, deviceToDetach.GenerateErrorDetailed("UnmapDevice.FindMapperPluginByName failed to find BlockVolumeMapper plugin. Volume plugin is nil.", nil)
1395 }
1396
1397 blockVolumeUnmapper, newUnmapperErr := blockVolumePlugin.NewBlockVolumeUnmapper(
1398 deviceToDetach.VolumeSpec.Name(),
1399 "" )
1400 if newUnmapperErr != nil {
1401 return volumetypes.GeneratedOperations{}, deviceToDetach.GenerateErrorDetailed("UnmapDevice.NewUnmapper failed", newUnmapperErr)
1402 }
1403
1404 unmapDeviceFunc := func() volumetypes.OperationContext {
1405 migrated := getMigratedStatusBySpec(deviceToDetach.VolumeSpec)
1406
1407
1408 globalMapPath := deviceToDetach.DeviceMountPath
1409 refs, err := og.blkUtil.GetDeviceBindMountRefs(deviceToDetach.DevicePath, globalMapPath)
1410 if err != nil {
1411 if os.IsNotExist(err) {
1412
1413 refs = nil
1414 } else {
1415 eventErr, detailedErr := deviceToDetach.GenerateError("UnmapDevice.GetDeviceBindMountRefs check failed", err)
1416 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
1417 }
1418 }
1419 if len(refs) > 0 {
1420 err = fmt.Errorf("the device %q is still referenced from other Pods %v", globalMapPath, refs)
1421 eventErr, detailedErr := deviceToDetach.GenerateError("UnmapDevice failed", err)
1422 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
1423 }
1424
1425
1426
1427
1428 markDeviceUncertainErr := actualStateOfWorld.MarkDeviceAsUncertain(
1429 deviceToDetach.VolumeName, deviceToDetach.DevicePath, globalMapPath, "" )
1430 if markDeviceUncertainErr != nil {
1431
1432 eventErr, detailedErr := deviceToDetach.GenerateError("UnmapDevice.MarkDeviceAsUncertain failed", markDeviceUncertainErr)
1433 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
1434 }
1435
1436
1437 if customBlockVolumeUnmapper, ok := blockVolumeUnmapper.(volume.CustomBlockVolumeUnmapper); ok {
1438
1439 unmapErr := customBlockVolumeUnmapper.TearDownDevice(globalMapPath, deviceToDetach.DevicePath)
1440 if unmapErr != nil {
1441
1442 eventErr, detailedErr := deviceToDetach.GenerateError("UnmapDevice.TearDownDevice failed", unmapErr)
1443 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
1444 }
1445 }
1446
1447
1448
1449 removeMapPathErr := og.blkUtil.RemoveMapPath(globalMapPath)
1450 if removeMapPathErr != nil {
1451
1452 eventErr, detailedErr := deviceToDetach.GenerateError("UnmapDevice.RemoveMapPath failed", removeMapPathErr)
1453 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
1454 }
1455
1456
1457
1458
1459
1460 deviceOpened, deviceOpenedErr := isDeviceOpened(deviceToDetach, hostutil)
1461 if deviceOpenedErr != nil {
1462 return volumetypes.NewOperationContext(nil, deviceOpenedErr, migrated)
1463 }
1464
1465 if deviceOpened {
1466 eventErr, detailedErr := deviceToDetach.GenerateError(
1467 "UnmapDevice failed",
1468 fmt.Errorf("the device is in use when it was no longer expected to be in use"))
1469 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
1470 }
1471
1472 klog.Infof(deviceToDetach.GenerateMsgDetailed("UnmapDevice succeeded", ""))
1473
1474
1475 markDeviceUnmountedErr := actualStateOfWorld.MarkDeviceAsUnmounted(
1476 deviceToDetach.VolumeName)
1477 if markDeviceUnmountedErr != nil {
1478
1479 eventErr, detailedErr := deviceToDetach.GenerateError("MarkDeviceAsUnmounted failed", markDeviceUnmountedErr)
1480 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
1481 }
1482
1483 return volumetypes.NewOperationContext(nil, nil, migrated)
1484 }
1485
1486 return volumetypes.GeneratedOperations{
1487 OperationName: "unmap_device",
1488 OperationFunc: unmapDeviceFunc,
1489 CompleteFunc: util.OperationCompleteHook(util.GetFullQualifiedPluginNameForVolume(blockVolumePlugin.GetPluginName(), deviceToDetach.VolumeSpec), "unmap_device"),
1490 EventRecorderFunc: nil,
1491 }, nil
1492 }
1493
1494 func (og *operationGenerator) GenerateVerifyControllerAttachedVolumeFunc(
1495 logger klog.Logger,
1496 volumeToMount VolumeToMount,
1497 nodeName types.NodeName,
1498 actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) {
1499 volumePlugin, err :=
1500 og.volumePluginMgr.FindPluginBySpec(volumeToMount.VolumeSpec)
1501 if err != nil || volumePlugin == nil {
1502 return volumetypes.GeneratedOperations{}, volumeToMount.GenerateErrorDetailed("VerifyControllerAttachedVolume.FindPluginBySpec failed", err)
1503 }
1504
1505
1506
1507
1508
1509 if volumeToMount.PluginIsAttachable {
1510 cachedAttachedVolumes, _ := og.volumePluginMgr.Host.GetAttachedVolumesFromNodeStatus()
1511 if cachedAttachedVolumes != nil {
1512 _, volumeFound := cachedAttachedVolumes[volumeToMount.VolumeName]
1513 if !volumeFound {
1514 return volumetypes.GeneratedOperations{}, NewMountPreConditionFailedError(fmt.Sprintf("volume %s is not yet in node's status", volumeToMount.VolumeName))
1515 }
1516 }
1517 }
1518
1519 verifyControllerAttachedVolumeFunc := func() volumetypes.OperationContext {
1520 migrated := getMigratedStatusBySpec(volumeToMount.VolumeSpec)
1521 claimSize := actualStateOfWorld.GetClaimSize(volumeToMount.VolumeName)
1522
1523
1524 if volumeToMount.VolumeSpec.PersistentVolume != nil && claimSize == nil && !volumeToMount.VolumeSpec.InlineVolumeSpecForCSIMigration {
1525 pv := volumeToMount.VolumeSpec.PersistentVolume
1526 pvc, err := og.kubeClient.CoreV1().PersistentVolumeClaims(pv.Spec.ClaimRef.Namespace).Get(context.TODO(), pv.Spec.ClaimRef.Name, metav1.GetOptions{})
1527 if err != nil {
1528 eventErr, detailedErr := volumeToMount.GenerateError("VerifyControllerAttachedVolume fetching pvc failed", err)
1529 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
1530 }
1531 pvcStatusSize := pvc.Status.Capacity.Storage()
1532 if pvcStatusSize != nil {
1533 claimSize = pvcStatusSize
1534 }
1535 }
1536
1537 if !volumeToMount.PluginIsAttachable {
1538
1539
1540
1541
1542 addVolumeNodeErr := actualStateOfWorld.MarkVolumeAsAttached(
1543 logger, volumeToMount.VolumeName, volumeToMount.VolumeSpec, nodeName, "" )
1544 if addVolumeNodeErr != nil {
1545
1546 eventErr, detailedErr := volumeToMount.GenerateError("VerifyControllerAttachedVolume.MarkVolumeAsAttachedByUniqueVolumeName failed", addVolumeNodeErr)
1547 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
1548 }
1549 actualStateOfWorld.InitializeClaimSize(logger, volumeToMount.VolumeName, claimSize)
1550 return volumetypes.NewOperationContext(nil, nil, migrated)
1551 }
1552
1553 if !volumeToMount.ReportedInUse {
1554
1555
1556
1557
1558
1559
1560 eventErr, detailedErr := volumeToMount.GenerateError("Volume has not been added to the list of VolumesInUse in the node's volume status", nil)
1561 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
1562 }
1563
1564
1565 node, fetchErr := og.kubeClient.CoreV1().Nodes().Get(context.TODO(), string(nodeName), metav1.GetOptions{})
1566 if fetchErr != nil {
1567
1568 eventErr, detailedErr := volumeToMount.GenerateError("VerifyControllerAttachedVolume failed fetching node from API server", fetchErr)
1569 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
1570 }
1571
1572 for _, attachedVolume := range node.Status.VolumesAttached {
1573 if attachedVolume.Name == volumeToMount.VolumeName {
1574 addVolumeNodeErr := actualStateOfWorld.MarkVolumeAsAttached(
1575 logger, v1.UniqueVolumeName(""), volumeToMount.VolumeSpec, nodeName, attachedVolume.DevicePath)
1576 klog.InfoS(volumeToMount.GenerateMsgDetailed("Controller attach succeeded", fmt.Sprintf("device path: %q", attachedVolume.DevicePath)), "pod", klog.KObj(volumeToMount.Pod))
1577 if addVolumeNodeErr != nil {
1578
1579 eventErr, detailedErr := volumeToMount.GenerateError("VerifyControllerAttachedVolume.MarkVolumeAsAttached failed", addVolumeNodeErr)
1580 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
1581 }
1582 actualStateOfWorld.InitializeClaimSize(logger, volumeToMount.VolumeName, claimSize)
1583 return volumetypes.NewOperationContext(nil, nil, migrated)
1584 }
1585 }
1586
1587
1588 eventErr, detailedErr := volumeToMount.GenerateError("Volume not attached according to node status", nil)
1589 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
1590 }
1591
1592 return volumetypes.GeneratedOperations{
1593 OperationName: VerifyControllerAttachedVolumeOpName,
1594 OperationFunc: verifyControllerAttachedVolumeFunc,
1595 CompleteFunc: util.OperationCompleteHook(util.GetFullQualifiedPluginNameForVolume(volumePlugin.GetPluginName(), volumeToMount.VolumeSpec), "verify_controller_attached_volume"),
1596 EventRecorderFunc: nil,
1597 }, nil
1598
1599 }
1600
1601 func (og *operationGenerator) verifyVolumeIsSafeToDetach(
1602 volumeToDetach AttachedVolume) error {
1603
1604 node, fetchErr := og.kubeClient.CoreV1().Nodes().Get(context.TODO(), string(volumeToDetach.NodeName), metav1.GetOptions{})
1605 if fetchErr != nil {
1606 if errors.IsNotFound(fetchErr) {
1607 klog.Warningf(volumeToDetach.GenerateMsgDetailed("Node not found on API server. DetachVolume will skip safe to detach check", ""))
1608 return nil
1609 }
1610
1611
1612 return volumeToDetach.GenerateErrorDetailed("DetachVolume failed fetching node from API server", fetchErr)
1613 }
1614
1615 for _, inUseVolume := range node.Status.VolumesInUse {
1616 if inUseVolume == volumeToDetach.VolumeName {
1617 return volumeToDetach.GenerateErrorDetailed(
1618 "DetachVolume failed",
1619 fmt.Errorf("volume is still in use by node, according to Node status"))
1620 }
1621 }
1622
1623
1624 klog.Infof(volumeToDetach.GenerateMsgDetailed("Verified volume is safe to detach", ""))
1625 return nil
1626 }
1627
1628 func (og *operationGenerator) GenerateExpandVolumeFunc(
1629 pvc *v1.PersistentVolumeClaim,
1630 pv *v1.PersistentVolume) (volumetypes.GeneratedOperations, error) {
1631
1632 volumeSpec := volume.NewSpecFromPersistentVolume(pv, false)
1633
1634 volumePlugin, err := og.volumePluginMgr.FindExpandablePluginBySpec(volumeSpec)
1635 if err != nil {
1636 return volumetypes.GeneratedOperations{}, fmt.Errorf("error finding plugin for expanding volume: %q with error %v", util.GetPersistentVolumeClaimQualifiedName(pvc), err)
1637 }
1638
1639 if volumePlugin == nil {
1640 return volumetypes.GeneratedOperations{}, fmt.Errorf("can not find plugin for expanding volume: %q", util.GetPersistentVolumeClaimQualifiedName(pvc))
1641 }
1642
1643 expandVolumeFunc := func() volumetypes.OperationContext {
1644 migrated := false
1645
1646 newSize := pvc.Spec.Resources.Requests[v1.ResourceStorage]
1647 statusSize := pvc.Status.Capacity[v1.ResourceStorage]
1648 pvSize := pv.Spec.Capacity[v1.ResourceStorage]
1649 if pvSize.Cmp(newSize) < 0 {
1650 updatedSize, expandErr := volumePlugin.ExpandVolumeDevice(
1651 volumeSpec,
1652 newSize,
1653 statusSize)
1654 if expandErr != nil {
1655 detailedErr := fmt.Errorf("error expanding volume %q of plugin %q: %v", util.GetPersistentVolumeClaimQualifiedName(pvc), volumePlugin.GetPluginName(), expandErr)
1656 return volumetypes.NewOperationContext(detailedErr, detailedErr, migrated)
1657 }
1658
1659 klog.Infof("ExpandVolume succeeded for volume %s", util.GetPersistentVolumeClaimQualifiedName(pvc))
1660
1661 newSize = updatedSize
1662
1663
1664
1665 _, updateErr := util.UpdatePVSize(pv, newSize, og.kubeClient)
1666 if updateErr != nil {
1667 detailedErr := fmt.Errorf("error updating PV spec capacity for volume %q with : %v", util.GetPersistentVolumeClaimQualifiedName(pvc), updateErr)
1668 return volumetypes.NewOperationContext(detailedErr, detailedErr, migrated)
1669 }
1670
1671 klog.Infof("ExpandVolume.UpdatePV succeeded for volume %s", util.GetPersistentVolumeClaimQualifiedName(pvc))
1672 }
1673
1674 fsVolume, _ := util.CheckVolumeModeFilesystem(volumeSpec)
1675
1676
1677
1678 if !volumePlugin.RequiresFSResize() || !fsVolume {
1679 klog.V(4).Infof("Controller resizing done for PVC %s", util.GetPersistentVolumeClaimQualifiedName(pvc))
1680 _, err := util.MarkResizeFinished(pvc, newSize, og.kubeClient)
1681 if err != nil {
1682 detailedErr := fmt.Errorf("error marking pvc %s as resized : %v", util.GetPersistentVolumeClaimQualifiedName(pvc), err)
1683 return volumetypes.NewOperationContext(detailedErr, detailedErr, migrated)
1684 }
1685 successMsg := fmt.Sprintf("ExpandVolume succeeded for volume %s", util.GetPersistentVolumeClaimQualifiedName(pvc))
1686 og.recorder.Eventf(pvc, v1.EventTypeNormal, kevents.VolumeResizeSuccess, successMsg)
1687 } else {
1688 _, err := util.MarkForFSResize(pvc, og.kubeClient)
1689 if err != nil {
1690 detailedErr := fmt.Errorf("error updating pvc %s condition for fs resize : %v", util.GetPersistentVolumeClaimQualifiedName(pvc), err)
1691 klog.Warning(detailedErr)
1692 return volumetypes.NewOperationContext(nil, nil, migrated)
1693 }
1694 oldCapacity := pvc.Status.Capacity[v1.ResourceStorage]
1695 err = util.AddAnnPreResizeCapacity(pv, oldCapacity, og.kubeClient)
1696 if err != nil {
1697 detailedErr := fmt.Errorf("error updating pv %s annotation (%s) with pre-resize capacity %s: %v", pv.ObjectMeta.Name, util.AnnPreResizeCapacity, oldCapacity.String(), err)
1698 klog.Warning(detailedErr)
1699 return volumetypes.NewOperationContext(nil, nil, migrated)
1700 }
1701
1702 }
1703 return volumetypes.NewOperationContext(nil, nil, migrated)
1704 }
1705
1706 eventRecorderFunc := func(err *error) {
1707 if *err != nil {
1708 og.recorder.Eventf(pvc, v1.EventTypeWarning, kevents.VolumeResizeFailed, (*err).Error())
1709 }
1710 }
1711
1712 return volumetypes.GeneratedOperations{
1713 OperationName: "expand_volume",
1714 OperationFunc: expandVolumeFunc,
1715 EventRecorderFunc: eventRecorderFunc,
1716 CompleteFunc: util.OperationCompleteHook(util.GetFullQualifiedPluginNameForVolume(volumePlugin.GetPluginName(), volumeSpec), "expand_volume"),
1717 }, nil
1718 }
1719
1720 func (og *operationGenerator) GenerateExpandAndRecoverVolumeFunc(
1721 pvc *v1.PersistentVolumeClaim,
1722 pv *v1.PersistentVolume, resizerName string) (volumetypes.GeneratedOperations, error) {
1723
1724 volumeSpec := volume.NewSpecFromPersistentVolume(pv, false)
1725
1726 volumePlugin, err := og.volumePluginMgr.FindExpandablePluginBySpec(volumeSpec)
1727 if err != nil {
1728 return volumetypes.GeneratedOperations{}, fmt.Errorf("error finding plugin for expanding volume: %q with error %v", util.GetPersistentVolumeClaimQualifiedName(pvc), err)
1729 }
1730
1731 if volumePlugin == nil {
1732 return volumetypes.GeneratedOperations{}, fmt.Errorf("can not find plugin for expanding volume: %q", util.GetPersistentVolumeClaimQualifiedName(pvc))
1733 }
1734
1735 expandVolumeFunc := func() volumetypes.OperationContext {
1736 resizeOpts := inTreeResizeOpts{
1737 pvc: pvc,
1738 pv: pv,
1739 resizerName: resizerName,
1740 volumePlugin: volumePlugin,
1741 volumeSpec: volumeSpec,
1742 }
1743 migrated := false
1744 resp := og.expandAndRecoverFunction(resizeOpts)
1745 if resp.err != nil {
1746 return volumetypes.NewOperationContext(resp.err, resp.err, migrated)
1747 }
1748 return volumetypes.NewOperationContext(nil, nil, migrated)
1749 }
1750
1751 eventRecorderFunc := func(err *error) {
1752 if *err != nil {
1753 og.recorder.Eventf(pvc, v1.EventTypeWarning, kevents.VolumeResizeFailed, (*err).Error())
1754 }
1755 }
1756
1757 return volumetypes.GeneratedOperations{
1758 OperationName: "expand_volume",
1759 OperationFunc: expandVolumeFunc,
1760 EventRecorderFunc: eventRecorderFunc,
1761 CompleteFunc: util.OperationCompleteHook(util.GetFullQualifiedPluginNameForVolume(volumePlugin.GetPluginName(), volumeSpec), "expand_volume"),
1762 }, nil
1763 }
1764
1765 func (og *operationGenerator) expandAndRecoverFunction(resizeOpts inTreeResizeOpts) inTreeResizeResponse {
1766 pvc := resizeOpts.pvc
1767 pv := resizeOpts.pv
1768 resizerName := resizeOpts.resizerName
1769 volumePlugin := resizeOpts.volumePlugin
1770 volumeSpec := resizeOpts.volumeSpec
1771
1772 pvcSpecSize := pvc.Spec.Resources.Requests[v1.ResourceStorage]
1773 pvcStatusSize := pvc.Status.Capacity[v1.ResourceStorage]
1774 pvSize := pv.Spec.Capacity[v1.ResourceStorage]
1775
1776 resizeResponse := inTreeResizeResponse{
1777 pvc: pvc,
1778 pv: pv,
1779 resizeCalled: false,
1780 }
1781
1782
1783 newSize := pvcSpecSize
1784
1785 var resizeStatus v1.ClaimResourceStatus
1786 if status, ok := pvc.Status.AllocatedResourceStatuses[v1.ResourceStorage]; ok {
1787 resizeStatus = status
1788 }
1789
1790 var allocatedSize *resource.Quantity
1791 t, ok := pvc.Status.AllocatedResources[v1.ResourceStorage]
1792 if ok {
1793 allocatedSize = &t
1794 }
1795 var err error
1796
1797 if pvSize.Cmp(pvcSpecSize) < 0 {
1798
1799
1800 switch resizeStatus {
1801 case v1.PersistentVolumeClaimControllerResizeInProgress,
1802 v1.PersistentVolumeClaimNodeResizePending,
1803 v1.PersistentVolumeClaimNodeResizeInProgress,
1804 v1.PersistentVolumeClaimNodeResizeFailed:
1805 if allocatedSize != nil {
1806 newSize = *allocatedSize
1807 }
1808 default:
1809 newSize = pvcSpecSize
1810 }
1811 } else {
1812
1813
1814
1815
1816
1817
1818
1819
1820
1821
1822 switch resizeStatus {
1823 case v1.PersistentVolumeClaimNodeResizeInProgress,
1824 v1.PersistentVolumeClaimNodeResizePending:
1825
1826
1827 return resizeResponse
1828 case v1.PersistentVolumeClaimNodeResizeFailed:
1829
1830 pvc, err = og.markForPendingNodeExpansion(pvc, pv)
1831 resizeResponse.pvc = pvc
1832 resizeResponse.err = err
1833 return resizeResponse
1834 case v1.PersistentVolumeClaimControllerResizeInProgress,
1835 v1.PersistentVolumeClaimControllerResizeFailed:
1836
1837
1838 if allocatedSize != nil {
1839 newSize = *allocatedSize
1840 }
1841 default:
1842
1843
1844
1845 if resizeStatus == "" && allocatedSize != nil {
1846 newSize = *allocatedSize
1847 } else {
1848 newSize = pvcSpecSize
1849 }
1850 }
1851 }
1852
1853 pvc, err = util.MarkControllerReisizeInProgress(pvc, resizerName, newSize, og.kubeClient)
1854 if err != nil {
1855 msg := fmt.Errorf("error updating pvc %s with resize in progress: %v", util.GetPersistentVolumeClaimQualifiedName(pvc), err)
1856 resizeResponse.err = msg
1857 resizeResponse.pvc = pvc
1858 return resizeResponse
1859 }
1860
1861 updatedSize, err := volumePlugin.ExpandVolumeDevice(volumeSpec, newSize, pvcStatusSize)
1862 resizeResponse.resizeCalled = true
1863
1864 if err != nil {
1865 msg := fmt.Errorf("error expanding pvc %s: %v", util.GetPersistentVolumeClaimQualifiedName(pvc), err)
1866 resizeResponse.err = msg
1867 resizeResponse.pvc = pvc
1868 return resizeResponse
1869 }
1870
1871
1872 var updateErr error
1873 pv, updateErr = util.UpdatePVSize(pv, updatedSize, og.kubeClient)
1874
1875 if updateErr != nil {
1876 msg := fmt.Errorf("error updating pv for pvc %s: %v", util.GetPersistentVolumeClaimQualifiedName(pvc), updateErr)
1877 resizeResponse.err = msg
1878 return resizeResponse
1879 }
1880 resizeResponse.pv = pv
1881
1882 fsVolume, _ := util.CheckVolumeModeFilesystem(volumeSpec)
1883
1884 if !volumePlugin.RequiresFSResize() || !fsVolume {
1885 pvc, err = util.MarkResizeFinished(pvc, updatedSize, og.kubeClient)
1886 if err != nil {
1887 msg := fmt.Errorf("error marking pvc %s as resized: %v", util.GetPersistentVolumeClaimQualifiedName(pvc), err)
1888 resizeResponse.err = msg
1889 return resizeResponse
1890 }
1891 resizeResponse.pvc = pvc
1892 successMsg := fmt.Sprintf("ExpandVolume succeeded for volume %s", util.GetPersistentVolumeClaimQualifiedName(pvc))
1893 og.recorder.Eventf(pvc, v1.EventTypeNormal, kevents.VolumeResizeSuccess, successMsg)
1894 } else {
1895 pvc, err = og.markForPendingNodeExpansion(pvc, pv)
1896 resizeResponse.pvc = pvc
1897 if err != nil {
1898 msg := fmt.Errorf("error marking pvc %s for node expansion: %v", util.GetPersistentVolumeClaimQualifiedName(pvc), err)
1899 resizeResponse.err = msg
1900 return resizeResponse
1901 }
1902 }
1903 return resizeResponse
1904 }
1905
1906 func (og *operationGenerator) markForPendingNodeExpansion(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume) (*v1.PersistentVolumeClaim, error) {
1907 var err error
1908 pvc, err = util.MarkForFSResize(pvc, og.kubeClient)
1909 if err != nil {
1910 msg := fmt.Errorf("error marking pvc %s for node expansion: %v", util.GetPersistentVolumeClaimQualifiedName(pvc), err)
1911 return pvc, msg
1912 }
1913
1914
1915 oldCapacity := pvc.Status.Capacity[v1.ResourceStorage]
1916 err = util.AddAnnPreResizeCapacity(pv, oldCapacity, og.kubeClient)
1917 if err != nil {
1918 detailedErr := fmt.Errorf("error updating pv %s annotation (%s) with pre-resize capacity %s: %v", pv.ObjectMeta.Name, util.AnnPreResizeCapacity, oldCapacity.String(), err)
1919 klog.Warning(detailedErr)
1920 return pvc, detailedErr
1921 }
1922 return pvc, nil
1923 }
1924
1925 func (og *operationGenerator) GenerateExpandInUseVolumeFunc(
1926 volumeToMount VolumeToMount,
1927 actualStateOfWorld ActualStateOfWorldMounterUpdater, currentSize resource.Quantity) (volumetypes.GeneratedOperations, error) {
1928
1929 volumePlugin, err :=
1930 og.volumePluginMgr.FindPluginBySpec(volumeToMount.VolumeSpec)
1931 if err != nil || volumePlugin == nil {
1932 return volumetypes.GeneratedOperations{}, volumeToMount.GenerateErrorDetailed("NodeExpandVolume.FindPluginBySpec failed", err)
1933 }
1934
1935 fsResizeFunc := func() volumetypes.OperationContext {
1936 var resizeDone bool
1937 var eventErr, detailedErr error
1938 migrated := false
1939
1940 if currentSize.IsZero() || volumeToMount.DesiredPersistentVolumeSize.IsZero() {
1941 err := fmt.Errorf("current or new size of the volume is not set")
1942 eventErr, detailedErr = volumeToMount.GenerateError("NodeExpandvolume.expansion failed", err)
1943 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
1944 }
1945
1946 resizeOptions := volume.NodeResizeOptions{
1947 VolumeSpec: volumeToMount.VolumeSpec,
1948 DevicePath: volumeToMount.DevicePath,
1949 OldSize: currentSize,
1950 NewSize: volumeToMount.DesiredPersistentVolumeSize,
1951 }
1952 fsVolume, err := util.CheckVolumeModeFilesystem(volumeToMount.VolumeSpec)
1953 if err != nil {
1954 eventErr, detailedErr = volumeToMount.GenerateError("NodeExpandvolume.CheckVolumeModeFilesystem failed", err)
1955 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
1956 }
1957
1958 if fsVolume {
1959 volumeMounter, newMounterErr := volumePlugin.NewMounter(
1960 volumeToMount.VolumeSpec,
1961 volumeToMount.Pod,
1962 volume.VolumeOptions{})
1963 if newMounterErr != nil {
1964 eventErr, detailedErr = volumeToMount.GenerateError("NodeExpandVolume.NewMounter initialization failed", newMounterErr)
1965 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
1966 }
1967
1968 resizeOptions.DeviceMountPath = volumeMounter.GetPath()
1969
1970 deviceMountableVolumePlugin, _ := og.volumePluginMgr.FindDeviceMountablePluginBySpec(volumeToMount.VolumeSpec)
1971 var volumeDeviceMounter volume.DeviceMounter
1972 if deviceMountableVolumePlugin != nil {
1973 volumeDeviceMounter, _ = deviceMountableVolumePlugin.NewDeviceMounter()
1974 }
1975
1976 if volumeDeviceMounter != nil {
1977 deviceStagePath, err := volumeDeviceMounter.GetDeviceMountPath(volumeToMount.VolumeSpec)
1978 if err != nil {
1979 eventErr, detailedErr = volumeToMount.GenerateError("NodeExpandVolume.GetDeviceMountPath failed", err)
1980 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
1981 }
1982 resizeOptions.DeviceStagePath = deviceStagePath
1983 }
1984 } else {
1985
1986 blockVolumePlugin, err :=
1987 og.volumePluginMgr.FindMapperPluginBySpec(volumeToMount.VolumeSpec)
1988 if err != nil {
1989 eventErr, detailedErr = volumeToMount.GenerateError("MapVolume.FindMapperPluginBySpec failed", err)
1990 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
1991 }
1992
1993 if blockVolumePlugin == nil {
1994 eventErr, detailedErr = volumeToMount.GenerateError("MapVolume.FindMapperPluginBySpec failed to find BlockVolumeMapper plugin. Volume plugin is nil.", nil)
1995 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
1996 }
1997
1998 blockVolumeMapper, newMapperErr := blockVolumePlugin.NewBlockVolumeMapper(
1999 volumeToMount.VolumeSpec,
2000 volumeToMount.Pod,
2001 volume.VolumeOptions{})
2002 if newMapperErr != nil {
2003 eventErr, detailedErr = volumeToMount.GenerateError("MapVolume.NewBlockVolumeMapper initialization failed", newMapperErr)
2004 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
2005 }
2006
2007
2008 if customBlockVolumeMapper, ok := blockVolumeMapper.(volume.CustomBlockVolumeMapper); ok {
2009 resizeOptions.DeviceStagePath = customBlockVolumeMapper.GetStagingPath()
2010 }
2011 }
2012
2013
2014 resizeDone, eventErr, detailedErr = og.doOnlineExpansion(volumeToMount, actualStateOfWorld, resizeOptions)
2015 if eventErr != nil || detailedErr != nil {
2016 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
2017 }
2018 if resizeDone {
2019 return volumetypes.NewOperationContext(nil, nil, migrated)
2020 }
2021
2022 err = fmt.Errorf("volume resizing failed for unknown reason")
2023 eventErr, detailedErr = volumeToMount.GenerateError("NodeExpandVolume.NodeExpandVolume failed to resize volume", err)
2024 return volumetypes.NewOperationContext(eventErr, detailedErr, migrated)
2025 }
2026
2027 eventRecorderFunc := func(err *error) {
2028 if *err != nil {
2029 og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeWarning, kevents.VolumeResizeFailed, (*err).Error())
2030 }
2031 }
2032
2033 return volumetypes.GeneratedOperations{
2034 OperationName: "volume_fs_resize",
2035 OperationFunc: fsResizeFunc,
2036 EventRecorderFunc: eventRecorderFunc,
2037 CompleteFunc: util.OperationCompleteHook(util.GetFullQualifiedPluginNameForVolume(volumePlugin.GetPluginName(), volumeToMount.VolumeSpec), "volume_fs_resize"),
2038 }, nil
2039 }
2040
2041 func (og *operationGenerator) doOnlineExpansion(volumeToMount VolumeToMount,
2042 actualStateOfWorld ActualStateOfWorldMounterUpdater,
2043 resizeOptions volume.NodeResizeOptions) (bool, error, error) {
2044
2045 resizeDone, err := og.nodeExpandVolume(volumeToMount, actualStateOfWorld, resizeOptions)
2046 if err != nil {
2047 e1, e2 := volumeToMount.GenerateError("NodeExpandVolume.NodeExpandVolume failed", err)
2048 klog.Errorf(e2.Error())
2049 return false, e1, e2
2050 }
2051 if resizeDone {
2052 markingDone := actualStateOfWorld.MarkVolumeAsResized(volumeToMount.VolumeName, &resizeOptions.NewSize)
2053 if !markingDone {
2054
2055 genericFailureError := fmt.Errorf("unable to mark volume as resized")
2056 e1, e2 := volumeToMount.GenerateError("NodeExpandVolume.MarkVolumeAsResized failed", genericFailureError)
2057 return false, e1, e2
2058 }
2059 return true, nil, nil
2060 }
2061 return false, nil, nil
2062 }
2063
2064 func (og *operationGenerator) expandVolumeDuringMount(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater, rsOpts volume.NodeResizeOptions) (bool, error) {
2065 supportsExpansion, expandablePlugin := og.checkIfSupportsNodeExpansion(volumeToMount)
2066 if supportsExpansion {
2067 pv := volumeToMount.VolumeSpec.PersistentVolume
2068 pvc, err := og.kubeClient.CoreV1().PersistentVolumeClaims(pv.Spec.ClaimRef.Namespace).Get(context.TODO(), pv.Spec.ClaimRef.Name, metav1.GetOptions{})
2069 if err != nil {
2070
2071 return false, fmt.Errorf("mountVolume.NodeExpandVolume get PVC failed : %v", err)
2072 }
2073
2074 pvcStatusCap := pvc.Status.Capacity[v1.ResourceStorage]
2075 pvSpecCap := pv.Spec.Capacity[v1.ResourceStorage]
2076 if pvcStatusCap.Cmp(pvSpecCap) < 0 {
2077 if volumeToMount.VolumeSpec.ReadOnly {
2078 simpleMsg, detailedMsg := volumeToMount.GenerateMsg("MountVolume.NodeExpandVolume failed", "requested read-only file system")
2079 klog.Warningf(detailedMsg)
2080 og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeWarning, kevents.FileSystemResizeFailed, simpleMsg)
2081 og.recorder.Eventf(pvc, v1.EventTypeWarning, kevents.FileSystemResizeFailed, simpleMsg)
2082 return true, nil
2083 }
2084
2085 rsOpts.NewSize = pvSpecCap
2086 rsOpts.OldSize = pvcStatusCap
2087 resizeOp := nodeResizeOperationOpts{
2088 vmt: volumeToMount,
2089 pvc: pvc,
2090 pv: pv,
2091 pluginResizeOpts: rsOpts,
2092 volumePlugin: expandablePlugin,
2093 actualStateOfWorld: actualStateOfWorld,
2094 }
2095 if og.checkForRecoveryFromExpansion(pvc, volumeToMount) {
2096 nodeExpander := newNodeExpander(resizeOp, og.kubeClient, og.recorder)
2097 resizeFinished, err, _ := nodeExpander.expandOnPlugin()
2098 return resizeFinished, err
2099 } else {
2100 return og.legacyCallNodeExpandOnPlugin(resizeOp)
2101 }
2102 }
2103 }
2104 return true, nil
2105 }
2106
2107 func (og *operationGenerator) checkIfSupportsNodeExpansion(volumeToMount VolumeToMount) (bool, volume.NodeExpandableVolumePlugin) {
2108 if volumeToMount.VolumeSpec != nil &&
2109 volumeToMount.VolumeSpec.InlineVolumeSpecForCSIMigration {
2110 klog.V(4).Infof("This volume %s is a migrated inline volume and is not resizable", volumeToMount.VolumeName)
2111 return false, nil
2112 }
2113
2114
2115 expandableVolumePlugin, _ :=
2116 og.volumePluginMgr.FindNodeExpandablePluginBySpec(volumeToMount.VolumeSpec)
2117 if expandableVolumePlugin != nil &&
2118 expandableVolumePlugin.RequiresFSResize() &&
2119 volumeToMount.VolumeSpec.PersistentVolume != nil {
2120 return true, expandableVolumePlugin
2121 }
2122 return false, nil
2123 }
2124
2125 func (og *operationGenerator) nodeExpandVolume(
2126 volumeToMount VolumeToMount,
2127 actualStateOfWorld ActualStateOfWorldMounterUpdater,
2128 rsOpts volume.NodeResizeOptions) (bool, error) {
2129
2130 supportsExpansion, expandableVolumePlugin := og.checkIfSupportsNodeExpansion(volumeToMount)
2131
2132 if supportsExpansion {
2133
2134 if rsOpts.NewSize.Cmp(rsOpts.OldSize) > 0 {
2135 pv := volumeToMount.VolumeSpec.PersistentVolume
2136 pvc, err := og.kubeClient.CoreV1().PersistentVolumeClaims(pv.Spec.ClaimRef.Namespace).Get(context.TODO(), pv.Spec.ClaimRef.Name, metav1.GetOptions{})
2137 if err != nil {
2138
2139 return false, fmt.Errorf("mountVolume.NodeExpandVolume get PVC failed : %v", err)
2140 }
2141
2142 if volumeToMount.VolumeSpec.ReadOnly {
2143 simpleMsg, detailedMsg := volumeToMount.GenerateMsg("MountVolume.NodeExpandVolume failed", "requested read-only file system")
2144 klog.Warningf(detailedMsg)
2145 og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeWarning, kevents.FileSystemResizeFailed, simpleMsg)
2146 og.recorder.Eventf(pvc, v1.EventTypeWarning, kevents.FileSystemResizeFailed, simpleMsg)
2147 return true, nil
2148 }
2149 resizeOp := nodeResizeOperationOpts{
2150 vmt: volumeToMount,
2151 pvc: pvc,
2152 pv: pv,
2153 pluginResizeOpts: rsOpts,
2154 volumePlugin: expandableVolumePlugin,
2155 actualStateOfWorld: actualStateOfWorld,
2156 }
2157
2158 if og.checkForRecoveryFromExpansion(pvc, volumeToMount) {
2159 nodeExpander := newNodeExpander(resizeOp, og.kubeClient, og.recorder)
2160 resizeFinished, err, _ := nodeExpander.expandOnPlugin()
2161 return resizeFinished, err
2162 } else {
2163 return og.legacyCallNodeExpandOnPlugin(resizeOp)
2164 }
2165 }
2166 }
2167 return true, nil
2168 }
2169
2170 func (og *operationGenerator) checkForRecoveryFromExpansion(pvc *v1.PersistentVolumeClaim, volumeToMount VolumeToMount) bool {
2171 resizeStatus := pvc.Status.AllocatedResourceStatuses[v1.ResourceStorage]
2172 allocatedResource := pvc.Status.AllocatedResources
2173 featureGateStatus := utilfeature.DefaultFeatureGate.Enabled(features.RecoverVolumeExpansionFailure)
2174
2175 if !featureGateStatus {
2176 return false
2177 }
2178
2179
2180
2181
2182 if resizeStatus == "" && allocatedResource == nil {
2183 _, detailedMsg := volumeToMount.GenerateMsg("MountVolume.NodeExpandVolume running with", "older external resize controller")
2184 klog.Warningf(detailedMsg)
2185 return false
2186 }
2187 return true
2188 }
2189
2190
2191
2192
2193 func (og *operationGenerator) legacyCallNodeExpandOnPlugin(resizeOp nodeResizeOperationOpts) (bool, error) {
2194 pvc := resizeOp.pvc
2195 volumeToMount := resizeOp.vmt
2196 rsOpts := resizeOp.pluginResizeOpts
2197 actualStateOfWorld := resizeOp.actualStateOfWorld
2198 expandableVolumePlugin := resizeOp.volumePlugin
2199
2200 pvcStatusCap := pvc.Status.Capacity[v1.ResourceStorage]
2201
2202 nodeName := volumeToMount.Pod.Spec.NodeName
2203
2204 var err error
2205
2206
2207 klog.V(4).InfoS(volumeToMount.GenerateMsgDetailed("MountVolume.NodeExpandVolume entering", fmt.Sprintf("DevicePath %q", volumeToMount.DevicePath)), "pod", klog.KObj(volumeToMount.Pod))
2208
2209 rsOpts.VolumeSpec = volumeToMount.VolumeSpec
2210
2211 _, resizeErr := expandableVolumePlugin.NodeExpand(rsOpts)
2212 if resizeErr != nil {
2213
2214
2215
2216 if volumetypes.IsOperationNotSupportedError(resizeErr) {
2217 klog.V(4).InfoS(volumeToMount.GenerateMsgDetailed("MountVolume.NodeExpandVolume failed", "NodeExpandVolume not supported"), "pod", klog.KObj(volumeToMount.Pod))
2218 return true, nil
2219 }
2220
2221
2222
2223
2224 if volumetypes.IsFailedPreconditionError(resizeErr) {
2225 actualStateOfWorld.MarkForInUseExpansionError(volumeToMount.VolumeName)
2226 klog.Errorf(volumeToMount.GenerateErrorDetailed("MountVolume.NodeExapndVolume failed", resizeErr).Error())
2227 return true, nil
2228 }
2229 return false, resizeErr
2230 }
2231
2232 simpleMsg, detailedMsg := volumeToMount.GenerateMsg("MountVolume.NodeExpandVolume succeeded", nodeName)
2233 og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeNormal, kevents.FileSystemResizeSuccess, simpleMsg)
2234 og.recorder.Eventf(pvc, v1.EventTypeNormal, kevents.FileSystemResizeSuccess, simpleMsg)
2235 klog.InfoS(detailedMsg, "pod", klog.KObj(volumeToMount.Pod))
2236
2237
2238 if pvcStatusCap.Cmp(rsOpts.NewSize) >= 0 {
2239 return true, nil
2240 }
2241
2242
2243 _, err = util.MarkFSResizeFinished(pvc, rsOpts.NewSize, og.kubeClient)
2244 if err != nil {
2245
2246 return false, fmt.Errorf("mountVolume.NodeExpandVolume update PVC status failed : %v", err)
2247 }
2248 return true, nil
2249 }
2250
2251 func checkMountOptionSupport(og *operationGenerator, volumeToMount VolumeToMount, plugin volume.VolumePlugin) error {
2252 mountOptions := util.MountOptionFromSpec(volumeToMount.VolumeSpec)
2253
2254 if len(mountOptions) > 0 && !plugin.SupportsMountOption() {
2255 return fmt.Errorf("mount options are not supported for this volume type")
2256 }
2257 return nil
2258 }
2259
2260
2261
2262 func checkNodeAffinity(og *operationGenerator, volumeToMount VolumeToMount) error {
2263 pv := volumeToMount.VolumeSpec.PersistentVolume
2264 if pv != nil {
2265 nodeLabels, err := og.volumePluginMgr.Host.GetNodeLabels()
2266 if err != nil {
2267 return err
2268 }
2269 err = storagehelpers.CheckNodeAffinity(pv, nodeLabels)
2270 if err != nil {
2271 return err
2272 }
2273 }
2274 return nil
2275 }
2276
2277
2278 func isDeviceOpened(deviceToDetach AttachedVolume, hostUtil hostutil.HostUtils) (bool, error) {
2279 isDevicePath, devicePathErr := hostUtil.PathIsDevice(deviceToDetach.DevicePath)
2280 var deviceOpened bool
2281 var deviceOpenedErr error
2282 if !isDevicePath && devicePathErr == nil ||
2283 (devicePathErr != nil && strings.Contains(devicePathErr.Error(), "does not exist")) {
2284
2285
2286 klog.V(3).Infof("The path isn't device path or doesn't exist. Skip checking device path: %s", deviceToDetach.DevicePath)
2287 deviceOpened = false
2288 } else if devicePathErr != nil {
2289 return false, deviceToDetach.GenerateErrorDetailed("PathIsDevice failed", devicePathErr)
2290 } else {
2291 deviceOpened, deviceOpenedErr = hostUtil.DeviceOpened(deviceToDetach.DevicePath)
2292 if deviceOpenedErr != nil {
2293 return false, deviceToDetach.GenerateErrorDetailed("DeviceOpened failed", deviceOpenedErr)
2294 }
2295 }
2296 return deviceOpened, nil
2297 }
2298
2299
2300
2301
2302
2303 func findDetachablePluginBySpec(spec *volume.Spec, pm *volume.VolumePluginMgr) (volume.AttachableVolumePlugin, error) {
2304 volumePlugin, err := pm.FindPluginBySpec(spec)
2305 if err != nil {
2306 return nil, err
2307 }
2308 if attachableVolumePlugin, ok := volumePlugin.(volume.AttachableVolumePlugin); ok {
2309 if attachableVolumePlugin.GetPluginName() == "kubernetes.io/csi" {
2310 return attachableVolumePlugin, nil
2311 }
2312 if canAttach, err := attachableVolumePlugin.CanAttach(spec); err != nil {
2313 return nil, err
2314 } else if canAttach {
2315 return attachableVolumePlugin, nil
2316 }
2317 }
2318 return nil, nil
2319 }
2320
2321 func getMigratedStatusBySpec(spec *volume.Spec) bool {
2322 migrated := false
2323 if spec != nil {
2324 migrated = spec.Migrated
2325 }
2326 return migrated
2327 }
2328
View as plain text