1
16
17
18
19
20
21 package operationexecutor
22
23 import (
24 "errors"
25 "fmt"
26 "time"
27
28 "github.com/go-logr/logr"
29
30 "k8s.io/klog/v2"
31
32 v1 "k8s.io/api/core/v1"
33 "k8s.io/apimachinery/pkg/api/resource"
34 "k8s.io/apimachinery/pkg/types"
35 "k8s.io/kubernetes/pkg/volume"
36 "k8s.io/kubernetes/pkg/volume/util"
37 "k8s.io/kubernetes/pkg/volume/util/hostutil"
38 "k8s.io/kubernetes/pkg/volume/util/nestedpendingoperations"
39 volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
40 )
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
// OperationExecutor is the set of volume operations (attach, detach, mount,
// unmount, verify, expand, reconstruct) exposed by this package.
//
// The implementation in this file (operationExecutor) builds each operation
// with an OperationGenerator and, for every method except
// ReconstructVolumeOperation, submits it to a
// nestedpendingoperations.NestedPendingOperations instance via Run. The
// returned error is therefore either a generation error or whatever Run
// returns for the submission.
type OperationExecutor interface {
	// AttachVolume generates an attach operation for volumeToAttach and
	// submits it. When util.IsMultiAttachAllowed reports true for the volume
	// spec the operation is keyed by volume name and node name; otherwise it
	// is keyed by volume name only.
	AttachVolume(logger klog.Logger, volumeToAttach VolumeToAttach, actualStateOfWorld ActualStateOfWorldAttacherUpdater) error

	// VerifyVolumesAreAttachedPerNode generates a single "volumes are
	// attached" operation covering AttachedVolumes on nodeName and submits it
	// with empty volume, pod and node keys.
	VerifyVolumesAreAttachedPerNode(AttachedVolumes []AttachedVolume, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) error

	// VerifyVolumesAreAttached verifies the given per-node volumes. Volumes
	// whose plugin supports bulk verification are grouped per plugin and
	// verified with one operation per plugin; the rest are verified per node
	// through VerifyVolumesAreAttachedPerNode. Failures are logged, not
	// returned.
	VerifyVolumesAreAttached(volumesToVerify map[types.NodeName][]AttachedVolume, actualStateOfWorld ActualStateOfWorldAttacherUpdater)

	// DetachVolume generates a detach operation for volumeToDetach (passing
	// verifySafeToDetach through to the generator) and submits it, keyed the
	// same way as AttachVolume.
	DetachVolume(logger klog.Logger, volumeToDetach AttachedVolume, verifySafeToDetach bool, actualStateOfWorld ActualStateOfWorldAttacherUpdater) error

	// MountVolume generates a mount operation (filesystem volume mode) or a
	// map operation (otherwise, as decided by util.CheckVolumeModeFilesystem)
	// and submits it. The operation is keyed by volume name alone unless the
	// plugin is neither attachable nor device-mountable, in which case the
	// pod's unique name is added to the key.
	MountVolume(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater, isRemount bool) error

	// UnmountVolume generates an unmount operation (filesystem volume mode)
	// or an unmap operation (otherwise) and submits it keyed by volume name
	// and the pod UID.
	UnmountVolume(volumeToUnmount MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, podsDir string) error

	// UnmountDevice generates an unmount-device operation (filesystem volume
	// mode) or an unmap-device operation (otherwise) and submits it keyed by
	// volume name only.
	UnmountDevice(deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, hostutil hostutil.HostUtils) error

	// VerifyControllerAttachedVolume generates a "verify controller attached
	// volume" operation for volumeToMount on nodeName and submits it keyed by
	// volume name only.
	VerifyControllerAttachedVolume(logger klog.Logger, volumeToMount VolumeToMount, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) error

	// IsOperationPending reports the pending-operations tracker's answer for
	// the given volume, pod and node key.
	IsOperationPending(volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName, nodeName types.NodeName) bool

	// IsOperationSafeToRetry reports the pending-operations tracker's answer
	// for whether operationName may be retried for the given key.
	IsOperationSafeToRetry(volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName, nodeName types.NodeName, operationName string) bool

	// ExpandInUseVolume generates an in-use expansion operation for
	// volumeToMount and submits it keyed by volume name only.
	ExpandInUseVolume(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater, currentSize resource.Quantity) error

	// ReconstructVolumeOperation rebuilds a volume spec synchronously: for
	// v1.PersistentVolumeFilesystem it calls plugin.ConstructVolumeSpec, for
	// any other mode it calls mapperPlugin.ConstructBlockVolumeSpec.
	ReconstructVolumeOperation(volumeMode v1.PersistentVolumeMode, plugin volume.VolumePlugin, mapperPlugin volume.BlockVolumePlugin, uid types.UID, podName volumetypes.UniquePodName, volumeSpecName string, volumePath string, pluginName string) (volume.ReconstructedVolume, error)
}
153
154
155 func NewOperationExecutor(
156 operationGenerator OperationGenerator) OperationExecutor {
157
158 return &operationExecutor{
159 pendingOperations: nestedpendingoperations.NewNestedPendingOperations(
160 true ),
161 operationGenerator: operationGenerator,
162 }
163 }
164
165
// MarkVolumeOpts bundles the arguments accepted by the
// ActualStateOfWorldMounterUpdater methods MarkVolumeAsMounted,
// MarkVolumeMountAsUncertain and
// CheckAndMarkVolumeAsUncertainViaReconstruction.
//
// NOTE(review): nothing in this file populates or reads these fields; their
// exact meaning should be confirmed against the implementers of that
// interface. VolumeGidVolume presumably corresponds to the VolumeGidValue
// field found on VolumeToMount and MountedVolume.
type MarkVolumeOpts struct {
	PodName             volumetypes.UniquePodName
	PodUID              types.UID
	VolumeName          v1.UniqueVolumeName
	Mounter             volume.Mounter
	BlockVolumeMapper   volume.BlockVolumeMapper
	OuterVolumeSpecName string
	VolumeGidVolume     string
	VolumeSpec          *volume.Spec
	VolumeMountState    VolumeMountState
	SELinuxMountContext string
}
178
179
180
// ActualStateOfWorldMounterUpdater is the actual-state-of-world handle that
// this package passes to the OperationGenerator when building mount,
// unmount, device-unmount and in-use expansion operations.
//
// NOTE(review): no implementation of this interface is visible in this file.
// The per-method notes below are inferred from method names and signatures
// only and must be confirmed against the implementer.
type ActualStateOfWorldMounterUpdater interface {
	// MarkVolumeAsMounted presumably records the volume/pod described by
	// markVolumeOpts as mounted.
	MarkVolumeAsMounted(markVolumeOpts MarkVolumeOpts) error

	// MarkVolumeAsUnmounted presumably records volumeName as no longer
	// mounted for podName.
	MarkVolumeAsUnmounted(podName volumetypes.UniquePodName, volumeName v1.UniqueVolumeName) error

	// MarkVolumeMountAsUncertain presumably records that the mount state of
	// the volume/pod described by markVolumeOpts is not known for sure.
	MarkVolumeMountAsUncertain(markVolumeOpts MarkVolumeOpts) error

	// MarkDeviceAsMounted presumably records that the device backing
	// volumeName is mounted at deviceMountPath.
	MarkDeviceAsMounted(volumeName v1.UniqueVolumeName, devicePath, deviceMountPath, seLinuxMountContext string) error

	// MarkDeviceAsUncertain presumably records that the device mount state
	// of volumeName is not known for sure.
	MarkDeviceAsUncertain(volumeName v1.UniqueVolumeName, devicePath, deviceMountPath, seLinuxMountContext string) error

	// MarkDeviceAsUnmounted presumably records that the device backing
	// volumeName is no longer mounted.
	MarkDeviceAsUnmounted(volumeName v1.UniqueVolumeName) error

	// MarkVolumeAsResized presumably records claimSize as the new size of
	// volumeName; the meaning of the boolean result is not visible here.
	MarkVolumeAsResized(volumeName v1.UniqueVolumeName, claimSize *resource.Quantity) bool

	// GetDeviceMountState returns the DeviceMountState tracked for
	// volumeName.
	GetDeviceMountState(volumeName v1.UniqueVolumeName) DeviceMountState

	// GetVolumeMountState returns the VolumeMountState tracked for the given
	// volume and pod.
	GetVolumeMountState(volumName v1.UniqueVolumeName, podName volumetypes.UniquePodName) VolumeMountState

	// IsVolumeMountedElsewhere presumably reports whether volumeName is
	// mounted for some pod other than podName.
	IsVolumeMountedElsewhere(volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName) bool

	// MarkForInUseExpansionError presumably flags volumeName as having
	// failed an in-use expansion.
	MarkForInUseExpansionError(volumeName v1.UniqueVolumeName)

	// CheckAndMarkVolumeAsUncertainViaReconstruction presumably marks the
	// volume described by opts as uncertain when it is not already tracked;
	// the meaning of the boolean result is not visible here.
	CheckAndMarkVolumeAsUncertainViaReconstruction(opts MarkVolumeOpts) (bool, error)

	// CheckAndMarkDeviceUncertainViaReconstruction is presumably the
	// device-level counterpart of
	// CheckAndMarkVolumeAsUncertainViaReconstruction.
	CheckAndMarkDeviceUncertainViaReconstruction(volumeName v1.UniqueVolumeName, deviceMountPath string) bool

	// IsVolumeReconstructed presumably reports whether the volume/pod entry
	// was added through reconstruction.
	IsVolumeReconstructed(volumeName v1.UniqueVolumeName, podName volumetypes.UniquePodName) bool

	// IsVolumeDeviceReconstructed presumably reports whether the device
	// entry for volumeName was added through reconstruction.
	IsVolumeDeviceReconstructed(volumeName v1.UniqueVolumeName) bool
}
237
238
239
// ActualStateOfWorldAttacherUpdater is the actual-state-of-world handle that
// this package passes to the OperationGenerator when building attach, detach
// and attachment-verification operations.
//
// NOTE(review): no implementation of this interface is visible in this file.
// The per-method notes below are inferred from method names and signatures
// only and must be confirmed against the implementer.
type ActualStateOfWorldAttacherUpdater interface {
	// MarkVolumeAsAttached presumably records volumeName as attached to
	// nodeName at devicePath.
	MarkVolumeAsAttached(logger klog.Logger, volumeName v1.UniqueVolumeName, volumeSpec *volume.Spec, nodeName types.NodeName, devicePath string) error

	// MarkVolumeAsUncertain presumably records that the attachment of
	// volumeName to nodeName is not known for sure.
	MarkVolumeAsUncertain(logger klog.Logger, volumeName v1.UniqueVolumeName, volumeSpec *volume.Spec, nodeName types.NodeName) error

	// MarkVolumeAsDetached presumably records volumeName as detached from
	// nodeName.
	MarkVolumeAsDetached(volumeName v1.UniqueVolumeName, nodeName types.NodeName)

	// RemoveVolumeFromReportAsAttached presumably removes volumeName from
	// the set of volumes reported as attached to nodeName.
	RemoveVolumeFromReportAsAttached(volumeName v1.UniqueVolumeName, nodeName types.NodeName) error

	// AddVolumeToReportAsAttached presumably adds volumeName to the set of
	// volumes reported as attached to nodeName.
	AddVolumeToReportAsAttached(logger klog.Logger, volumeName v1.UniqueVolumeName, nodeName types.NodeName)

	// InitializeClaimSize presumably stores claimSize for volumeName when no
	// size has been recorded yet.
	InitializeClaimSize(logger klog.Logger, volumeName v1.UniqueVolumeName, claimSize *resource.Quantity)

	// GetClaimSize returns the claim size tracked for volumeName.
	GetClaimSize(volumeName v1.UniqueVolumeName) *resource.Quantity
}
272
273
// VolumeLogger produces log and error messages that identify a volume. In
// this file it is implemented by *VolumeToAttach, *VolumeToMount,
// *AttachedVolume and *MountedVolume.
//
// A "detailed" message has the form
//
//	<prefixMsg> for volume "<name>" <details> <suffixMsg>
//
// while a "simple" message omits the <details> part, which carries the
// unique volume name plus the node or pod the volume belongs to.
type VolumeLogger interface {
	// GenerateMsgDetailed returns the detailed message built from prefixMsg
	// and suffixMsg.
	GenerateMsgDetailed(prefixMsg, suffixMsg string) (detailedMsg string)

	// GenerateErrorDetailed returns an error whose text is the detailed
	// message, using ": <err>" as the suffix when err is non-nil and an
	// empty suffix otherwise.
	GenerateErrorDetailed(prefixMsg string, err error) (detailedErr error)

	// GenerateMsg returns both the simple and the detailed message built
	// from prefixMsg and suffixMsg.
	GenerateMsg(prefixMsg, suffixMsg string) (simpleMsg, detailedMsg string)

	// GenerateError returns the simple and the detailed message as errors,
	// using ": <err>" as the suffix when err is non-nil.
	GenerateError(prefixMsg string, err error) (simpleErr, detailedErr error)
}
293
294
// errSuffix renders err as a message suffix: ": <err>" for a non-nil error
// and the empty string for nil.
func errSuffix(err error) string {
	if err == nil {
		return ""
	}
	return fmt.Sprintf(": %v", err)
}
302
303
// generateVolumeMsgDetailed assembles the detailed message
// `<prefixMsg> for volume "<volumeName>" <details> <suffixMsg>`; the volume
// name is rendered quoted (%q).
func generateVolumeMsgDetailed(prefixMsg, suffixMsg, volumeName, details string) (detailedMsg string) {
	const layout = "%v for volume %q %v %v"
	detailedMsg = fmt.Sprintf(layout, prefixMsg, volumeName, details, suffixMsg)
	return detailedMsg
}
307
308
309 func generateVolumeMsg(prefixMsg, suffixMsg, volumeName, details string) (simpleMsg, detailedMsg string) {
310 simpleMsg = fmt.Sprintf("%v for volume %q %v", prefixMsg, volumeName, suffixMsg)
311 return simpleMsg, generateVolumeMsgDetailed(prefixMsg, suffixMsg, volumeName, details)
312 }
313
314
// VolumeToAttach describes a volume handed to
// OperationExecutor.AttachVolume. It implements VolumeLogger, fmt.Stringer
// and logr.Marshaler.
type VolumeToAttach struct {
	// MultiAttachErrorReported is not read in this file.
	// NOTE(review): presumably set once a multi-attach error has been
	// reported for this volume; confirm against callers.
	MultiAttachErrorReported bool

	// VolumeName is the unique volume name; AttachVolume uses it as the
	// pending-operation key.
	VolumeName v1.UniqueVolumeName

	// VolumeSpec is the volume spec. It may be nil: the message helpers print
	// "nil" in place of the spec name in that case.
	VolumeSpec *volume.Spec

	// NodeName is the node involved in the attach. AttachVolume adds it to
	// the pending-operation key when multi-attach is allowed for VolumeSpec.
	NodeName types.NodeName

	// ScheduledPods is not read in this file.
	// NOTE(review): presumably the pods scheduled to NodeName that reference
	// this volume; confirm against callers.
	ScheduledPods []*v1.Pod
}
338
339
340 func (volume *VolumeToAttach) GenerateMsgDetailed(prefixMsg, suffixMsg string) (detailedMsg string) {
341 detailedStr := fmt.Sprintf("(UniqueName: %q) from node %q", volume.VolumeName, volume.NodeName)
342 volumeSpecName := "nil"
343 if volume.VolumeSpec != nil {
344 volumeSpecName = volume.VolumeSpec.Name()
345 }
346 return generateVolumeMsgDetailed(prefixMsg, suffixMsg, volumeSpecName, detailedStr)
347 }
348
349
350 func (volume *VolumeToAttach) GenerateMsg(prefixMsg, suffixMsg string) (simpleMsg, detailedMsg string) {
351 detailedStr := fmt.Sprintf("(UniqueName: %q) from node %q", volume.VolumeName, volume.NodeName)
352 volumeSpecName := "nil"
353 if volume.VolumeSpec != nil {
354 volumeSpecName = volume.VolumeSpec.Name()
355 }
356 return generateVolumeMsg(prefixMsg, suffixMsg, volumeSpecName, detailedStr)
357 }
358
359
360 func (volume *VolumeToAttach) GenerateErrorDetailed(prefixMsg string, err error) (detailedErr error) {
361 return fmt.Errorf(volume.GenerateMsgDetailed(prefixMsg, errSuffix(err)))
362 }
363
364
365 func (volume *VolumeToAttach) GenerateError(prefixMsg string, err error) (simpleErr, detailedErr error) {
366 simpleMsg, detailedMsg := volume.GenerateMsg(prefixMsg, errSuffix(err))
367 return fmt.Errorf(simpleMsg), fmt.Errorf(detailedMsg)
368 }
369
370
371 func (volume *VolumeToAttach) String() string {
372 volumeSpecName := "nil"
373 if volume.VolumeSpec != nil {
374 volumeSpecName = volume.VolumeSpec.Name()
375 }
376 return fmt.Sprintf("%s (UniqueName: %s) from node %s", volumeSpecName, volume.VolumeName, volume.NodeName)
377 }
378
379
380 func (volume *VolumeToAttach) MarshalLog() interface{} {
381 volumeSpecName := "nil"
382 if volume.VolumeSpec != nil {
383 volumeSpecName = volume.VolumeSpec.Name()
384 }
385 return struct {
386 VolumeName, UniqueName, NodeName string
387 }{
388 VolumeName: volumeSpecName,
389 UniqueName: string(volume.VolumeName),
390 NodeName: string(volume.NodeName),
391 }
392 }
393
394 var _ fmt.Stringer = &VolumeToAttach{}
395 var _ logr.Marshaler = &VolumeToAttach{}
396
397
398
// VolumeToMount describes a volume handed to OperationExecutor.MountVolume,
// ExpandInUseVolume and VerifyControllerAttachedVolume. It implements
// VolumeLogger.
//
// NOTE(review): fields documented as "not read in this file" are consumed
// elsewhere; their notes are inferred from names and types only.
type VolumeToMount struct {
	// VolumeName is the unique volume name; it is used as the
	// pending-operation key.
	VolumeName v1.UniqueVolumeName

	// PodName is not read in this file; presumably the unique name of the pod
	// the volume is mounted for.
	PodName volumetypes.UniquePodName

	// VolumeSpec is the volume spec. MountVolume passes it to
	// util.CheckVolumeModeFilesystem to choose between mount and map. The
	// message helpers print "nil" for the spec name when it is nil.
	VolumeSpec *volume.Spec

	// OuterVolumeSpecName is not read in this file; presumably the volume
	// name as referenced directly in the pod.
	OuterVolumeSpecName string

	// Pod is the pod the volume is mounted for. The message helpers
	// dereference it unconditionally (Pod.Name, Pod.UID), so it must be
	// non-nil whenever they are called.
	Pod *v1.Pod

	// PluginIsAttachable and PluginIsDeviceMountable (below) decide the
	// pending-operation key in MountVolume: only when both are false is the
	// pod's unique name added to the key.
	PluginIsAttachable bool

	// PluginIsDeviceMountable: see PluginIsAttachable.
	PluginIsDeviceMountable bool

	// VolumeGidValue is not read in this file; presumably a GID annotation
	// value for the volume.
	VolumeGidValue string

	// DevicePath is not read in this file; presumably the path of the
	// attached device on the node.
	DevicePath string

	// ReportedInUse is not read in this file; presumably whether the volume
	// has been reported as in use in the node status.
	ReportedInUse bool

	// DesiredSizeLimit is not read in this file; presumably an upper bound on
	// the volume size, nil when unset.
	DesiredSizeLimit *resource.Quantity

	// MountRequestTime is not read in this file; presumably when the mount
	// was requested.
	MountRequestTime time.Time

	// DesiredPersistentVolumeSize is not read in this file; presumably the
	// desired size of the backing persistent volume.
	DesiredPersistentVolumeSize resource.Quantity

	// SELinuxLabel is not read in this file; presumably the SELinux label to
	// apply when mounting.
	SELinuxLabel string
}
457
458
// DeviceMountState is the string-valued state returned by
// ActualStateOfWorldMounterUpdater.GetDeviceMountState.
type DeviceMountState string

// The DeviceMountState values. Each constant's value is its own name.
const (
	// DeviceGloballyMounted names the "device is globally mounted" state.
	DeviceGloballyMounted = DeviceMountState("DeviceGloballyMounted")

	// DeviceMountUncertain names the "device mount state is uncertain" state.
	DeviceMountUncertain = DeviceMountState("DeviceMountUncertain")

	// DeviceNotMounted names the "device is not mounted" state.
	DeviceNotMounted = DeviceMountState("DeviceNotMounted")
)
472
473
// VolumeMountState is the string-valued state returned by
// ActualStateOfWorldMounterUpdater.GetVolumeMountState and carried in
// MarkVolumeOpts.
type VolumeMountState string

// The VolumeMountState values. Each constant's value is its own name.
const (
	// VolumeMounted names the "volume is mounted" state.
	VolumeMounted = VolumeMountState("VolumeMounted")

	// VolumeMountUncertain names the "volume mount state is uncertain" state.
	VolumeMountUncertain = VolumeMountState("VolumeMountUncertain")

	// VolumeNotMounted names the "volume is not mounted" state.
	VolumeNotMounted = VolumeMountState("VolumeNotMounted")
)
486
// MountPreConditionFailed is an error type that carries a plain message. Use
// IsMountFailedPreconditionError to detect it anywhere in an error chain.
type MountPreConditionFailed struct {
	msg string
}

// Error returns the message the error was created with.
func (err *MountPreConditionFailed) Error() string { return err.msg }

// NewMountPreConditionFailedError returns a *MountPreConditionFailed whose
// Error() is msg.
func NewMountPreConditionFailedError(msg string) *MountPreConditionFailed {
	failure := new(MountPreConditionFailed)
	failure.msg = msg
	return failure
}

// IsMountFailedPreconditionError reports whether err, or any error it wraps,
// is a *MountPreConditionFailed (as determined by errors.As).
func IsMountFailedPreconditionError(err error) bool {
	target := (*MountPreConditionFailed)(nil)
	return errors.As(err, &target)
}
503
504
505 func (volume *VolumeToMount) GenerateMsgDetailed(prefixMsg, suffixMsg string) (detailedMsg string) {
506 detailedStr := fmt.Sprintf("(UniqueName: %q) pod %q (UID: %q)", volume.VolumeName, volume.Pod.Name, volume.Pod.UID)
507 volumeSpecName := "nil"
508 if volume.VolumeSpec != nil {
509 volumeSpecName = volume.VolumeSpec.Name()
510 }
511 return generateVolumeMsgDetailed(prefixMsg, suffixMsg, volumeSpecName, detailedStr)
512 }
513
514
515 func (volume *VolumeToMount) GenerateMsg(prefixMsg, suffixMsg string) (simpleMsg, detailedMsg string) {
516 detailedStr := fmt.Sprintf("(UniqueName: %q) pod %q (UID: %q)", volume.VolumeName, volume.Pod.Name, volume.Pod.UID)
517 volumeSpecName := "nil"
518 if volume.VolumeSpec != nil {
519 volumeSpecName = volume.VolumeSpec.Name()
520 }
521 return generateVolumeMsg(prefixMsg, suffixMsg, volumeSpecName, detailedStr)
522 }
523
524
525 func (volume *VolumeToMount) GenerateErrorDetailed(prefixMsg string, err error) (detailedErr error) {
526 return fmt.Errorf(volume.GenerateMsgDetailed(prefixMsg, errSuffix(err)))
527 }
528
529
530 func (volume *VolumeToMount) GenerateError(prefixMsg string, err error) (simpleErr, detailedErr error) {
531 simpleMsg, detailedMsg := volume.GenerateMsg(prefixMsg, errSuffix(err))
532 return fmt.Errorf(simpleMsg), fmt.Errorf(detailedMsg)
533 }
534
535
// AttachedVolume describes a volume handed to OperationExecutor.DetachVolume,
// UnmountDevice and the VerifyVolumesAreAttached* methods. It implements
// VolumeLogger, fmt.Stringer and logr.Marshaler.
//
// NOTE(review): fields documented as "not read in this file" are consumed
// elsewhere; their notes are inferred from names and types only.
type AttachedVolume struct {
	// VolumeName is the unique volume name; it is used as the
	// pending-operation key.
	VolumeName v1.UniqueVolumeName

	// VolumeSpec is the volume spec. VerifyVolumesAreAttached skips (and
	// logs) volumes whose spec is nil; the message helpers print "nil" for
	// the spec name in that case.
	VolumeSpec *volume.Spec

	// NodeName is the node the volume is attached to. DetachVolume adds it to
	// the pending-operation key when multi-attach is allowed for VolumeSpec.
	NodeName types.NodeName

	// PluginIsAttachable is not read in this file; presumably whether the
	// volume's plugin implements the attacher interface.
	PluginIsAttachable bool

	// DevicePath is not read in this file; presumably the path of the
	// attached device on the node.
	DevicePath string

	// DeviceMountPath is not read in this file; presumably where the device
	// is mounted on the node.
	DeviceMountPath string

	// PluginName is not read in this file; presumably the name of the
	// volume's plugin.
	PluginName string

	// SELinuxMountContext is not read in this file; presumably the SELinux
	// context the volume was mounted with.
	SELinuxMountContext string
}
565
566
567 func (volume *AttachedVolume) GenerateMsgDetailed(prefixMsg, suffixMsg string) (detailedMsg string) {
568 detailedStr := fmt.Sprintf("(UniqueName: %q) on node %q", volume.VolumeName, volume.NodeName)
569 volumeSpecName := "nil"
570 if volume.VolumeSpec != nil {
571 volumeSpecName = volume.VolumeSpec.Name()
572 }
573 return generateVolumeMsgDetailed(prefixMsg, suffixMsg, volumeSpecName, detailedStr)
574 }
575
576
577 func (volume *AttachedVolume) GenerateMsg(prefixMsg, suffixMsg string) (simpleMsg, detailedMsg string) {
578 detailedStr := fmt.Sprintf("(UniqueName: %q) on node %q", volume.VolumeName, volume.NodeName)
579 volumeSpecName := "nil"
580 if volume.VolumeSpec != nil {
581 volumeSpecName = volume.VolumeSpec.Name()
582 }
583 return generateVolumeMsg(prefixMsg, suffixMsg, volumeSpecName, detailedStr)
584 }
585
586
587 func (volume *AttachedVolume) GenerateErrorDetailed(prefixMsg string, err error) (detailedErr error) {
588 return fmt.Errorf(volume.GenerateMsgDetailed(prefixMsg, errSuffix(err)))
589 }
590
591
592 func (volume *AttachedVolume) GenerateError(prefixMsg string, err error) (simpleErr, detailedErr error) {
593 simpleMsg, detailedMsg := volume.GenerateMsg(prefixMsg, errSuffix(err))
594 return fmt.Errorf(simpleMsg), fmt.Errorf(detailedMsg)
595 }
596
597
598 func (volume *AttachedVolume) String() string {
599 volumeSpecName := "nil"
600 if volume.VolumeSpec != nil {
601 volumeSpecName = volume.VolumeSpec.Name()
602 }
603 return fmt.Sprintf("%s (UniqueName: %s) from node %s", volumeSpecName, volume.VolumeName, volume.NodeName)
604 }
605
606
607 func (volume *AttachedVolume) MarshalLog() interface{} {
608 volumeSpecName := "nil"
609 if volume.VolumeSpec != nil {
610 volumeSpecName = volume.VolumeSpec.Name()
611 }
612 return struct {
613 VolumeName, UniqueName, NodeName string
614 }{
615 VolumeName: volumeSpecName,
616 UniqueName: string(volume.VolumeName),
617 NodeName: string(volume.NodeName),
618 }
619 }
620
621 var _ fmt.Stringer = &AttachedVolume{}
622 var _ logr.Marshaler = &AttachedVolume{}
623
624
// MountedVolume describes a volume handed to
// OperationExecutor.UnmountVolume. It implements VolumeLogger.
//
// NOTE(review): fields documented as "not read in this file" are consumed
// elsewhere; their notes are inferred from names and types only.
type MountedVolume struct {
	// PodName is the unique pod name; it appears in the detailed log
	// messages.
	PodName volumetypes.UniquePodName

	// VolumeName is the unique volume name; UnmountVolume uses it as the
	// pending-operation key.
	VolumeName v1.UniqueVolumeName

	// InnerVolumeSpecName is not read in this file; presumably the name of
	// the underlying volume spec (for a PVC-backed volume, the PV's name).
	InnerVolumeSpecName string

	// OuterVolumeSpecName is the volume name printed by the message helpers
	// (`for volume "<OuterVolumeSpecName>"`); presumably the name as
	// referenced directly in the pod.
	OuterVolumeSpecName string

	// PluginName is not read in this file; presumably the name of the
	// volume's plugin.
	PluginName string

	// PodUID is the pod's UID. UnmountVolume converts it to a UniquePodName
	// for the pending-operation key, and it appears in the detailed log
	// messages.
	PodUID types.UID

	// Mounter is not read in this file; presumably the mounter used to mount
	// a filesystem-mode volume.
	Mounter volume.Mounter

	// BlockVolumeMapper is not read in this file; presumably the mapper used
	// to map a block-mode volume.
	BlockVolumeMapper volume.BlockVolumeMapper

	// VolumeGidValue is not read in this file; presumably a GID annotation
	// value for the volume.
	VolumeGidValue string

	// VolumeSpec is the volume spec. UnmountVolume passes it to
	// util.CheckVolumeModeFilesystem to choose between unmount and unmap.
	VolumeSpec *volume.Spec

	// DeviceMountPath is not read in this file; presumably where the device
	// is mounted on the node.
	DeviceMountPath string

	// SELinuxMountContext is not read in this file; presumably the SELinux
	// context the volume was mounted with.
	SELinuxMountContext string
}
745
746
747 func (volume *MountedVolume) GenerateMsgDetailed(prefixMsg, suffixMsg string) (detailedMsg string) {
748 detailedStr := fmt.Sprintf("(UniqueName: %q) pod %q (UID: %q)", volume.VolumeName, volume.PodName, volume.PodUID)
749 return generateVolumeMsgDetailed(prefixMsg, suffixMsg, volume.OuterVolumeSpecName, detailedStr)
750 }
751
752
753 func (volume *MountedVolume) GenerateMsg(prefixMsg, suffixMsg string) (simpleMsg, detailedMsg string) {
754 detailedStr := fmt.Sprintf("(UniqueName: %q) pod %q (UID: %q)", volume.VolumeName, volume.PodName, volume.PodUID)
755 return generateVolumeMsg(prefixMsg, suffixMsg, volume.OuterVolumeSpecName, detailedStr)
756 }
757
758
759 func (volume *MountedVolume) GenerateErrorDetailed(prefixMsg string, err error) (detailedErr error) {
760 return fmt.Errorf(volume.GenerateMsgDetailed(prefixMsg, errSuffix(err)))
761 }
762
763
764 func (volume *MountedVolume) GenerateError(prefixMsg string, err error) (simpleErr, detailedErr error) {
765 simpleMsg, detailedMsg := volume.GenerateMsg(prefixMsg, errSuffix(err))
766 return fmt.Errorf(simpleMsg), fmt.Errorf(detailedMsg)
767 }
768
// operationExecutor is the OperationExecutor implementation returned by
// NewOperationExecutor.
type operationExecutor struct {
	// pendingOperations receives every generated operation through Run and
	// answers the IsOperationPending / IsOperationSafeToRetry queries.
	pendingOperations nestedpendingoperations.NestedPendingOperations

	// operationGenerator builds the operations that are submitted to
	// pendingOperations and exposes the volume plugin manager used by
	// VerifyVolumesAreAttached.
	operationGenerator OperationGenerator
}
778
779 func (oe *operationExecutor) IsOperationPending(
780 volumeName v1.UniqueVolumeName,
781 podName volumetypes.UniquePodName,
782 nodeName types.NodeName) bool {
783 return oe.pendingOperations.IsOperationPending(volumeName, podName, nodeName)
784 }
785
786 func (oe *operationExecutor) IsOperationSafeToRetry(
787 volumeName v1.UniqueVolumeName,
788 podName volumetypes.UniquePodName,
789 nodeName types.NodeName,
790 operationName string) bool {
791 return oe.pendingOperations.IsOperationSafeToRetry(volumeName, podName, nodeName, operationName)
792 }
793
794 func (oe *operationExecutor) AttachVolume(
795 logger klog.Logger,
796 volumeToAttach VolumeToAttach,
797 actualStateOfWorld ActualStateOfWorldAttacherUpdater) error {
798 generatedOperations :=
799 oe.operationGenerator.GenerateAttachVolumeFunc(logger, volumeToAttach, actualStateOfWorld)
800
801 if util.IsMultiAttachAllowed(volumeToAttach.VolumeSpec) {
802 return oe.pendingOperations.Run(
803 volumeToAttach.VolumeName, "" , volumeToAttach.NodeName, generatedOperations)
804 }
805
806 return oe.pendingOperations.Run(
807 volumeToAttach.VolumeName, "" , "" , generatedOperations)
808 }
809
810 func (oe *operationExecutor) DetachVolume(
811 logger klog.Logger,
812 volumeToDetach AttachedVolume,
813 verifySafeToDetach bool,
814 actualStateOfWorld ActualStateOfWorldAttacherUpdater) error {
815 generatedOperations, err :=
816 oe.operationGenerator.GenerateDetachVolumeFunc(logger, volumeToDetach, verifySafeToDetach, actualStateOfWorld)
817 if err != nil {
818 return err
819 }
820
821 if util.IsMultiAttachAllowed(volumeToDetach.VolumeSpec) {
822 return oe.pendingOperations.Run(
823 volumeToDetach.VolumeName, "" , volumeToDetach.NodeName, generatedOperations)
824 }
825 return oe.pendingOperations.Run(
826 volumeToDetach.VolumeName, "" , "" , generatedOperations)
827
828 }
829
830 func (oe *operationExecutor) VerifyVolumesAreAttached(
831 attachedVolumes map[types.NodeName][]AttachedVolume,
832 actualStateOfWorld ActualStateOfWorldAttacherUpdater) {
833
834
835 bulkVerifyPluginsByNode := make(map[string]map[types.NodeName][]*volume.Spec)
836 volumeSpecMapByPlugin := make(map[string]map[*volume.Spec]v1.UniqueVolumeName)
837
838 for node, nodeAttachedVolumes := range attachedVolumes {
839 needIndividualVerifyVolumes := []AttachedVolume{}
840 for _, volumeAttached := range nodeAttachedVolumes {
841 if volumeAttached.VolumeSpec == nil {
842 klog.Errorf("VerifyVolumesAreAttached: nil spec for volume %s", volumeAttached.VolumeName)
843 continue
844 }
845
846 volumePlugin, err :=
847 oe.operationGenerator.GetVolumePluginMgr().FindPluginBySpec(volumeAttached.VolumeSpec)
848 if err != nil {
849 klog.Errorf(
850 "VolumesAreAttached.FindPluginBySpec failed for volume %q (spec.Name: %q) on node %q with error: %v",
851 volumeAttached.VolumeName,
852 volumeAttached.VolumeSpec.Name(),
853 volumeAttached.NodeName,
854 err)
855 continue
856 }
857 if volumePlugin == nil {
858
859 klog.Errorf(
860 "Failed to find volume plugin for volume %q (spec.Name: %q) on node %q",
861 volumeAttached.VolumeName,
862 volumeAttached.VolumeSpec.Name(),
863 volumeAttached.NodeName)
864 continue
865 }
866
867 pluginName := volumePlugin.GetPluginName()
868
869 if volumePlugin.SupportsBulkVolumeVerification() {
870 pluginNodes, pluginNodesExist := bulkVerifyPluginsByNode[pluginName]
871
872 if !pluginNodesExist {
873 pluginNodes = make(map[types.NodeName][]*volume.Spec)
874 }
875
876 volumeSpecList, nodeExists := pluginNodes[node]
877 if !nodeExists {
878 volumeSpecList = []*volume.Spec{}
879 }
880 volumeSpecList = append(volumeSpecList, volumeAttached.VolumeSpec)
881 pluginNodes[node] = volumeSpecList
882
883 bulkVerifyPluginsByNode[pluginName] = pluginNodes
884 volumeSpecMap, mapExists := volumeSpecMapByPlugin[pluginName]
885
886 if !mapExists {
887 volumeSpecMap = make(map[*volume.Spec]v1.UniqueVolumeName)
888 }
889 volumeSpecMap[volumeAttached.VolumeSpec] = volumeAttached.VolumeName
890 volumeSpecMapByPlugin[pluginName] = volumeSpecMap
891 continue
892 }
893
894 needIndividualVerifyVolumes = append(needIndividualVerifyVolumes, volumeAttached)
895 }
896 nodeError := oe.VerifyVolumesAreAttachedPerNode(needIndividualVerifyVolumes, node, actualStateOfWorld)
897 if nodeError != nil {
898 klog.Errorf("VerifyVolumesAreAttached failed for volumes %v, node %q with error %v", needIndividualVerifyVolumes, node, nodeError)
899 }
900 }
901
902 for pluginName, pluginNodeVolumes := range bulkVerifyPluginsByNode {
903 generatedOperations, err := oe.operationGenerator.GenerateBulkVolumeVerifyFunc(
904 pluginNodeVolumes,
905 pluginName,
906 volumeSpecMapByPlugin[pluginName],
907 actualStateOfWorld)
908 if err != nil {
909 klog.Errorf("BulkVerifyVolumes.GenerateBulkVolumeVerifyFunc error bulk verifying volumes for plugin %q with %v", pluginName, err)
910 }
911
912
913 uniquePluginName := v1.UniqueVolumeName(pluginName)
914 err = oe.pendingOperations.Run(uniquePluginName, "" , "" , generatedOperations)
915 if err != nil {
916 klog.Errorf("BulkVerifyVolumes.Run Error bulk volume verification for plugin %q with %v", pluginName, err)
917 }
918 }
919 }
920
921 func (oe *operationExecutor) VerifyVolumesAreAttachedPerNode(
922 attachedVolumes []AttachedVolume,
923 nodeName types.NodeName,
924 actualStateOfWorld ActualStateOfWorldAttacherUpdater) error {
925 generatedOperations, err :=
926 oe.operationGenerator.GenerateVolumesAreAttachedFunc(attachedVolumes, nodeName, actualStateOfWorld)
927 if err != nil {
928 return err
929 }
930
931
932 return oe.pendingOperations.Run("" , "" , "" , generatedOperations)
933 }
934
935 func (oe *operationExecutor) MountVolume(
936 waitForAttachTimeout time.Duration,
937 volumeToMount VolumeToMount,
938 actualStateOfWorld ActualStateOfWorldMounterUpdater,
939 isRemount bool) error {
940 fsVolume, err := util.CheckVolumeModeFilesystem(volumeToMount.VolumeSpec)
941 if err != nil {
942 return err
943 }
944 var generatedOperations volumetypes.GeneratedOperations
945 if fsVolume {
946
947
948 generatedOperations = oe.operationGenerator.GenerateMountVolumeFunc(
949 waitForAttachTimeout, volumeToMount, actualStateOfWorld, isRemount)
950
951 } else {
952
953
954 generatedOperations, err = oe.operationGenerator.GenerateMapVolumeFunc(
955 waitForAttachTimeout, volumeToMount, actualStateOfWorld)
956 }
957 if err != nil {
958 return err
959 }
960
961
962 podName := nestedpendingoperations.EmptyUniquePodName
963
964
965 if !volumeToMount.PluginIsAttachable && !volumeToMount.PluginIsDeviceMountable {
966
967
968 podName = util.GetUniquePodName(volumeToMount.Pod)
969 }
970
971
972 return oe.pendingOperations.Run(
973 volumeToMount.VolumeName, podName, "" , generatedOperations)
974 }
975
976 func (oe *operationExecutor) UnmountVolume(
977 volumeToUnmount MountedVolume,
978 actualStateOfWorld ActualStateOfWorldMounterUpdater,
979 podsDir string) error {
980 fsVolume, err := util.CheckVolumeModeFilesystem(volumeToUnmount.VolumeSpec)
981 if err != nil {
982 return err
983 }
984 var generatedOperations volumetypes.GeneratedOperations
985 if fsVolume {
986
987
988 generatedOperations, err = oe.operationGenerator.GenerateUnmountVolumeFunc(
989 volumeToUnmount, actualStateOfWorld, podsDir)
990 } else {
991
992
993 generatedOperations, err = oe.operationGenerator.GenerateUnmapVolumeFunc(
994 volumeToUnmount, actualStateOfWorld)
995 }
996 if err != nil {
997 return err
998 }
999
1000
1001 podName := volumetypes.UniquePodName(volumeToUnmount.PodUID)
1002
1003 return oe.pendingOperations.Run(
1004 volumeToUnmount.VolumeName, podName, "" , generatedOperations)
1005 }
1006
1007 func (oe *operationExecutor) UnmountDevice(
1008 deviceToDetach AttachedVolume,
1009 actualStateOfWorld ActualStateOfWorldMounterUpdater,
1010 hostutil hostutil.HostUtils) error {
1011 fsVolume, err := util.CheckVolumeModeFilesystem(deviceToDetach.VolumeSpec)
1012 if err != nil {
1013 return err
1014 }
1015 var generatedOperations volumetypes.GeneratedOperations
1016 if fsVolume {
1017
1018
1019 generatedOperations, err = oe.operationGenerator.GenerateUnmountDeviceFunc(
1020 deviceToDetach, actualStateOfWorld, hostutil)
1021 } else {
1022
1023
1024 generatedOperations, err = oe.operationGenerator.GenerateUnmapDeviceFunc(
1025 deviceToDetach, actualStateOfWorld, hostutil)
1026 }
1027 if err != nil {
1028 return err
1029 }
1030
1031
1032 podName := nestedpendingoperations.EmptyUniquePodName
1033
1034 return oe.pendingOperations.Run(
1035 deviceToDetach.VolumeName, podName, "" , generatedOperations)
1036 }
1037
1038 func (oe *operationExecutor) ExpandInUseVolume(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater, currentSize resource.Quantity) error {
1039 generatedOperations, err := oe.operationGenerator.GenerateExpandInUseVolumeFunc(volumeToMount, actualStateOfWorld, currentSize)
1040 if err != nil {
1041 return err
1042 }
1043 return oe.pendingOperations.Run(volumeToMount.VolumeName, "", "" , generatedOperations)
1044 }
1045
1046 func (oe *operationExecutor) VerifyControllerAttachedVolume(
1047 logger klog.Logger,
1048 volumeToMount VolumeToMount,
1049 nodeName types.NodeName,
1050 actualStateOfWorld ActualStateOfWorldAttacherUpdater) error {
1051 generatedOperations, err :=
1052 oe.operationGenerator.GenerateVerifyControllerAttachedVolumeFunc(logger, volumeToMount, nodeName, actualStateOfWorld)
1053 if err != nil {
1054 return err
1055 }
1056
1057 return oe.pendingOperations.Run(
1058 volumeToMount.VolumeName, "" , "" , generatedOperations)
1059 }
1060
1061
1062 func (oe *operationExecutor) ReconstructVolumeOperation(
1063 volumeMode v1.PersistentVolumeMode,
1064 plugin volume.VolumePlugin,
1065 mapperPlugin volume.BlockVolumePlugin,
1066 uid types.UID,
1067 podName volumetypes.UniquePodName,
1068 volumeSpecName string,
1069 volumePath string,
1070 pluginName string) (volume.ReconstructedVolume, error) {
1071
1072
1073 if volumeMode == v1.PersistentVolumeFilesystem {
1074
1075 klog.V(5).Infof("Starting operationExecutor.ReconstructVolume for file volume on pod %q", podName)
1076 reconstructed, err := plugin.ConstructVolumeSpec(volumeSpecName, volumePath)
1077 if err != nil {
1078 return volume.ReconstructedVolume{}, err
1079 }
1080 return reconstructed, nil
1081 }
1082
1083
1084
1085 klog.V(5).Infof("Starting operationExecutor.ReconstructVolume for block volume on pod %q", podName)
1086
1087
1088
1089
1090 volumeSpec, err := mapperPlugin.ConstructBlockVolumeSpec(uid, volumeSpecName, volumePath)
1091 if err != nil {
1092 return volume.ReconstructedVolume{}, err
1093 }
1094 return volume.ReconstructedVolume{
1095 Spec: volumeSpec,
1096 }, nil
1097 }
1098
View as plain text