1
16
17 package volumemanager
18
19 import (
20 "context"
21 "errors"
22 "fmt"
23 "sort"
24 "strconv"
25 "strings"
26 "time"
27
28 utilfeature "k8s.io/apiserver/pkg/util/feature"
29 "k8s.io/klog/v2"
30 "k8s.io/mount-utils"
31
32 v1 "k8s.io/api/core/v1"
33 k8stypes "k8s.io/apimachinery/pkg/types"
34 "k8s.io/apimachinery/pkg/util/runtime"
35 "k8s.io/apimachinery/pkg/util/sets"
36 "k8s.io/apimachinery/pkg/util/wait"
37 clientset "k8s.io/client-go/kubernetes"
38 "k8s.io/client-go/tools/record"
39 csitrans "k8s.io/csi-translation-lib"
40 "k8s.io/kubernetes/pkg/kubelet/config"
41 "k8s.io/kubernetes/pkg/kubelet/container"
42 "k8s.io/kubernetes/pkg/kubelet/volumemanager/cache"
43 "k8s.io/kubernetes/pkg/kubelet/volumemanager/metrics"
44 "k8s.io/kubernetes/pkg/kubelet/volumemanager/populator"
45 "k8s.io/kubernetes/pkg/kubelet/volumemanager/reconciler"
46 "k8s.io/kubernetes/pkg/volume"
47 "k8s.io/kubernetes/pkg/volume/csimigration"
48 "k8s.io/kubernetes/pkg/volume/util"
49 "k8s.io/kubernetes/pkg/volume/util/hostutil"
50 "k8s.io/kubernetes/pkg/volume/util/operationexecutor"
51 "k8s.io/kubernetes/pkg/volume/util/types"
52 "k8s.io/kubernetes/pkg/volume/util/volumepathhandler"
53 )
54
const (
	// reconcilerLoopSleepPeriod is the loop sleep period handed to
	// reconciler.NewReconciler when the volume manager is constructed.
	reconcilerLoopSleepPeriod time.Duration = 100 * time.Millisecond

	// desiredStateOfWorldPopulatorLoopSleepPeriod is the loop sleep period
	// handed to populator.NewDesiredStateOfWorldPopulator when the volume
	// manager is constructed.
	desiredStateOfWorldPopulatorLoopSleepPeriod time.Duration = 100 * time.Millisecond

	// podAttachAndMountTimeout is the overall timeout used by the polling
	// loops in WaitForAttachAndMount and WaitForUnmount.
	podAttachAndMountTimeout time.Duration = 2*time.Minute + 3*time.Second

	// podAttachAndMountRetryInterval is the interval between two consecutive
	// checks in the polling loops of WaitForAttachAndMount and
	// WaitForUnmount.
	podAttachAndMountRetryInterval time.Duration = 300 * time.Millisecond

	// waitForAttachTimeout is the attach timeout handed to
	// reconciler.NewReconciler when the volume manager is constructed.
	waitForAttachTimeout time.Duration = 10 * time.Minute
)
87
88
89
90
91 type VolumeManager interface {
92
93 Run(sourcesReady config.SourcesReady, stopCh <-chan struct{})
94
95
96
97
98
99
100 WaitForAttachAndMount(ctx context.Context, pod *v1.Pod) error
101
102
103
104
105
106
107 WaitForUnmount(ctx context.Context, pod *v1.Pod) error
108
109
110
111
112
113
114 GetMountedVolumesForPod(podName types.UniquePodName) container.VolumeMap
115
116
117
118
119
120
121
122 GetPossiblyMountedVolumesForPod(podName types.UniquePodName) container.VolumeMap
123
124
125
126
127 GetExtraSupplementalGroupsForPod(pod *v1.Pod) []int64
128
129
130
131
132
133
134
135
136 GetVolumesInUse() []v1.UniqueVolumeName
137
138
139
140
141 ReconcilerStatesHasBeenSynced() bool
142
143
144
145 VolumeIsAttached(volumeName v1.UniqueVolumeName) bool
146
147
148
149 MarkVolumesAsReportedInUse(volumesReportedAsInUse []v1.UniqueVolumeName)
150 }
151
152
153 type PodStateProvider interface {
154 ShouldPodContainersBeTerminating(k8stypes.UID) bool
155 ShouldPodRuntimeBeRemoved(k8stypes.UID) bool
156 }
157
158
159
160 type PodManager interface {
161 GetPodByUID(k8stypes.UID) (*v1.Pod, bool)
162 GetPods() []*v1.Pod
163 }
164
165
166
167
168
169
170
171
172
173 func NewVolumeManager(
174 controllerAttachDetachEnabled bool,
175 nodeName k8stypes.NodeName,
176 podManager PodManager,
177 podStateProvider PodStateProvider,
178 kubeClient clientset.Interface,
179 volumePluginMgr *volume.VolumePluginMgr,
180 kubeContainerRuntime container.Runtime,
181 mounter mount.Interface,
182 hostutil hostutil.HostUtils,
183 kubeletPodsDir string,
184 recorder record.EventRecorder,
185 keepTerminatedPodVolumes bool,
186 blockVolumePathHandler volumepathhandler.BlockVolumePathHandler) VolumeManager {
187
188 seLinuxTranslator := util.NewSELinuxLabelTranslator()
189 vm := &volumeManager{
190 kubeClient: kubeClient,
191 volumePluginMgr: volumePluginMgr,
192 desiredStateOfWorld: cache.NewDesiredStateOfWorld(volumePluginMgr, seLinuxTranslator),
193 actualStateOfWorld: cache.NewActualStateOfWorld(nodeName, volumePluginMgr),
194 operationExecutor: operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
195 kubeClient,
196 volumePluginMgr,
197 recorder,
198 blockVolumePathHandler)),
199 }
200
201 intreeToCSITranslator := csitrans.New()
202 csiMigratedPluginManager := csimigration.NewPluginManager(intreeToCSITranslator, utilfeature.DefaultFeatureGate)
203
204 vm.intreeToCSITranslator = intreeToCSITranslator
205 vm.csiMigratedPluginManager = csiMigratedPluginManager
206 vm.desiredStateOfWorldPopulator = populator.NewDesiredStateOfWorldPopulator(
207 kubeClient,
208 desiredStateOfWorldPopulatorLoopSleepPeriod,
209 podManager,
210 podStateProvider,
211 vm.desiredStateOfWorld,
212 vm.actualStateOfWorld,
213 kubeContainerRuntime,
214 keepTerminatedPodVolumes,
215 csiMigratedPluginManager,
216 intreeToCSITranslator,
217 volumePluginMgr)
218 vm.reconciler = reconciler.NewReconciler(
219 kubeClient,
220 controllerAttachDetachEnabled,
221 reconcilerLoopSleepPeriod,
222 waitForAttachTimeout,
223 nodeName,
224 vm.desiredStateOfWorld,
225 vm.actualStateOfWorld,
226 vm.desiredStateOfWorldPopulator.HasAddedPods,
227 vm.operationExecutor,
228 mounter,
229 hostutil,
230 volumePluginMgr,
231 kubeletPodsDir)
232
233 return vm
234 }
235
236
237 type volumeManager struct {
238
239
240 kubeClient clientset.Interface
241
242
243
244 volumePluginMgr *volume.VolumePluginMgr
245
246
247
248
249
250
251 desiredStateOfWorld cache.DesiredStateOfWorld
252
253
254
255
256
257
258 actualStateOfWorld cache.ActualStateOfWorld
259
260
261
262 operationExecutor operationexecutor.OperationExecutor
263
264
265
266
267 reconciler reconciler.Reconciler
268
269
270
271 desiredStateOfWorldPopulator populator.DesiredStateOfWorldPopulator
272
273
274 csiMigratedPluginManager csimigration.PluginManager
275
276
277 intreeToCSITranslator csimigration.InTreeToCSITranslator
278 }
279
280 func (vm *volumeManager) Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) {
281 defer runtime.HandleCrash()
282
283 if vm.kubeClient != nil {
284
285 go vm.volumePluginMgr.Run(stopCh)
286 }
287
288 go vm.desiredStateOfWorldPopulator.Run(sourcesReady, stopCh)
289 klog.V(2).InfoS("The desired_state_of_world populator starts")
290
291 klog.InfoS("Starting Kubelet Volume Manager")
292 go vm.reconciler.Run(stopCh)
293
294 metrics.Register(vm.actualStateOfWorld, vm.desiredStateOfWorld, vm.volumePluginMgr)
295
296 <-stopCh
297 klog.InfoS("Shutting down Kubelet Volume Manager")
298 }
299
300 func (vm *volumeManager) GetMountedVolumesForPod(podName types.UniquePodName) container.VolumeMap {
301 podVolumes := make(container.VolumeMap)
302 for _, mountedVolume := range vm.actualStateOfWorld.GetMountedVolumesForPod(podName) {
303 podVolumes[mountedVolume.OuterVolumeSpecName] = container.VolumeInfo{
304 Mounter: mountedVolume.Mounter,
305 BlockVolumeMapper: mountedVolume.BlockVolumeMapper,
306 ReadOnly: mountedVolume.VolumeSpec.ReadOnly,
307 InnerVolumeSpecName: mountedVolume.InnerVolumeSpecName,
308 }
309 }
310 return podVolumes
311 }
312
313 func (vm *volumeManager) GetPossiblyMountedVolumesForPod(podName types.UniquePodName) container.VolumeMap {
314 podVolumes := make(container.VolumeMap)
315 for _, mountedVolume := range vm.actualStateOfWorld.GetPossiblyMountedVolumesForPod(podName) {
316 podVolumes[mountedVolume.OuterVolumeSpecName] = container.VolumeInfo{
317 Mounter: mountedVolume.Mounter,
318 BlockVolumeMapper: mountedVolume.BlockVolumeMapper,
319 ReadOnly: mountedVolume.VolumeSpec.ReadOnly,
320 InnerVolumeSpecName: mountedVolume.InnerVolumeSpecName,
321 }
322 }
323 return podVolumes
324 }
325
326 func (vm *volumeManager) GetExtraSupplementalGroupsForPod(pod *v1.Pod) []int64 {
327 podName := util.GetUniquePodName(pod)
328 supplementalGroups := sets.NewString()
329
330 for _, mountedVolume := range vm.actualStateOfWorld.GetMountedVolumesForPod(podName) {
331 if mountedVolume.VolumeGidValue != "" {
332 supplementalGroups.Insert(mountedVolume.VolumeGidValue)
333 }
334 }
335
336 result := make([]int64, 0, supplementalGroups.Len())
337 for _, group := range supplementalGroups.List() {
338 iGroup, extra := getExtraSupplementalGid(group, pod)
339 if !extra {
340 continue
341 }
342
343 result = append(result, int64(iGroup))
344 }
345
346 return result
347 }
348
349 func (vm *volumeManager) GetVolumesInUse() []v1.UniqueVolumeName {
350
351
352
353 desiredVolumes := vm.desiredStateOfWorld.GetVolumesToMount()
354 allAttachedVolumes := vm.actualStateOfWorld.GetAttachedVolumes()
355 volumesToReportInUse := make([]v1.UniqueVolumeName, 0, len(desiredVolumes)+len(allAttachedVolumes))
356 desiredVolumesMap := make(map[v1.UniqueVolumeName]bool, len(desiredVolumes)+len(allAttachedVolumes))
357
358 for _, volume := range desiredVolumes {
359 if volume.PluginIsAttachable {
360 if _, exists := desiredVolumesMap[volume.VolumeName]; !exists {
361 desiredVolumesMap[volume.VolumeName] = true
362 volumesToReportInUse = append(volumesToReportInUse, volume.VolumeName)
363 }
364 }
365 }
366
367 for _, volume := range allAttachedVolumes {
368 if volume.PluginIsAttachable {
369 if _, exists := desiredVolumesMap[volume.VolumeName]; !exists {
370 volumesToReportInUse = append(volumesToReportInUse, volume.VolumeName)
371 }
372 }
373 }
374
375 sort.Slice(volumesToReportInUse, func(i, j int) bool {
376 return string(volumesToReportInUse[i]) < string(volumesToReportInUse[j])
377 })
378 return volumesToReportInUse
379 }
380
381 func (vm *volumeManager) ReconcilerStatesHasBeenSynced() bool {
382 return vm.reconciler.StatesHasBeenSynced()
383 }
384
385 func (vm *volumeManager) VolumeIsAttached(
386 volumeName v1.UniqueVolumeName) bool {
387 return vm.actualStateOfWorld.VolumeExists(volumeName)
388 }
389
390 func (vm *volumeManager) MarkVolumesAsReportedInUse(
391 volumesReportedAsInUse []v1.UniqueVolumeName) {
392 vm.desiredStateOfWorld.MarkVolumesReportedInUse(volumesReportedAsInUse)
393 }
394
395 func (vm *volumeManager) WaitForAttachAndMount(ctx context.Context, pod *v1.Pod) error {
396 if pod == nil {
397 return nil
398 }
399
400 expectedVolumes := getExpectedVolumes(pod)
401 if len(expectedVolumes) == 0 {
402
403 return nil
404 }
405
406 klog.V(3).InfoS("Waiting for volumes to attach and mount for pod", "pod", klog.KObj(pod))
407 uniquePodName := util.GetUniquePodName(pod)
408
409
410
411
412 vm.desiredStateOfWorldPopulator.ReprocessPod(uniquePodName)
413
414 err := wait.PollUntilContextTimeout(
415 ctx,
416 podAttachAndMountRetryInterval,
417 podAttachAndMountTimeout,
418 true,
419 vm.verifyVolumesMountedFunc(uniquePodName, expectedVolumes))
420
421 if err != nil {
422 unmountedVolumes :=
423 vm.getUnmountedVolumes(uniquePodName, expectedVolumes)
424
425 unattachedVolumes :=
426 vm.getUnattachedVolumes(uniquePodName)
427 volumesNotInDSW :=
428 vm.getVolumesNotInDSW(uniquePodName, expectedVolumes)
429
430 if len(unmountedVolumes) == 0 {
431 return nil
432 }
433
434 return fmt.Errorf(
435 "unmounted volumes=%v, unattached volumes=%v, failed to process volumes=%v: %w",
436 unmountedVolumes,
437 unattachedVolumes,
438 volumesNotInDSW,
439 err)
440 }
441
442 klog.V(3).InfoS("All volumes are attached and mounted for pod", "pod", klog.KObj(pod))
443 return nil
444 }
445
446 func (vm *volumeManager) WaitForUnmount(ctx context.Context, pod *v1.Pod) error {
447 if pod == nil {
448 return nil
449 }
450
451 klog.V(3).InfoS("Waiting for volumes to unmount for pod", "pod", klog.KObj(pod))
452 uniquePodName := util.GetUniquePodName(pod)
453
454 vm.desiredStateOfWorldPopulator.ReprocessPod(uniquePodName)
455
456 err := wait.PollUntilContextTimeout(
457 ctx,
458 podAttachAndMountRetryInterval,
459 podAttachAndMountTimeout,
460 true,
461 vm.verifyVolumesUnmountedFunc(uniquePodName))
462
463 if err != nil {
464 var mountedVolumes []string
465 for _, v := range vm.actualStateOfWorld.GetMountedVolumesForPod(uniquePodName) {
466 mountedVolumes = append(mountedVolumes, v.OuterVolumeSpecName)
467 }
468 sort.Strings(mountedVolumes)
469
470 if len(mountedVolumes) == 0 {
471 return nil
472 }
473
474 return fmt.Errorf(
475 "mounted volumes=%v: %w",
476 mountedVolumes,
477 err)
478 }
479
480 klog.V(3).InfoS("All volumes are unmounted for pod", "pod", klog.KObj(pod))
481 return nil
482 }
483
484 func (vm *volumeManager) getVolumesNotInDSW(uniquePodName types.UniquePodName, expectedVolumes []string) []string {
485 volumesNotInDSW := sets.NewString(expectedVolumes...)
486
487 for _, volumeToMount := range vm.desiredStateOfWorld.GetVolumesToMount() {
488 if volumeToMount.PodName == uniquePodName {
489 volumesNotInDSW.Delete(volumeToMount.OuterVolumeSpecName)
490 }
491 }
492
493 return volumesNotInDSW.List()
494 }
495
496
497
498 func (vm *volumeManager) getUnattachedVolumes(uniquePodName types.UniquePodName) []string {
499 unattachedVolumes := []string{}
500 for _, volumeToMount := range vm.desiredStateOfWorld.GetVolumesToMount() {
501 if volumeToMount.PodName == uniquePodName &&
502 volumeToMount.PluginIsAttachable &&
503 !vm.actualStateOfWorld.VolumeExists(volumeToMount.VolumeName) {
504 unattachedVolumes = append(unattachedVolumes, volumeToMount.OuterVolumeSpecName)
505 }
506 }
507 sort.Strings(unattachedVolumes)
508
509 return unattachedVolumes
510 }
511
512
513
514 func (vm *volumeManager) verifyVolumesMountedFunc(podName types.UniquePodName, expectedVolumes []string) wait.ConditionWithContextFunc {
515 return func(_ context.Context) (done bool, err error) {
516 if errs := vm.desiredStateOfWorld.PopPodErrors(podName); len(errs) > 0 {
517 return true, errors.New(strings.Join(errs, "; "))
518 }
519 return len(vm.getUnmountedVolumes(podName, expectedVolumes)) == 0, nil
520 }
521 }
522
523
524
525 func (vm *volumeManager) verifyVolumesUnmountedFunc(podName types.UniquePodName) wait.ConditionWithContextFunc {
526 return func(_ context.Context) (done bool, err error) {
527 if errs := vm.desiredStateOfWorld.PopPodErrors(podName); len(errs) > 0 {
528 return true, errors.New(strings.Join(errs, "; "))
529 }
530 return len(vm.actualStateOfWorld.GetMountedVolumesForPod(podName)) == 0, nil
531 }
532 }
533
534
535
536
537
538 func (vm *volumeManager) getUnmountedVolumes(podName types.UniquePodName, expectedVolumes []string) []string {
539 mountedVolumes := sets.NewString()
540 for _, mountedVolume := range vm.actualStateOfWorld.GetMountedVolumesForPod(podName) {
541 mountedVolumes.Insert(mountedVolume.OuterVolumeSpecName)
542 }
543 return filterUnmountedVolumes(mountedVolumes, expectedVolumes)
544 }
545
546
547
548 func filterUnmountedVolumes(mountedVolumes sets.String, expectedVolumes []string) []string {
549 unmountedVolumes := []string{}
550 for _, expectedVolume := range expectedVolumes {
551 if !mountedVolumes.Has(expectedVolume) {
552 unmountedVolumes = append(unmountedVolumes, expectedVolume)
553 }
554 }
555 sort.Strings(unmountedVolumes)
556
557 return unmountedVolumes
558 }
559
560
561
562 func getExpectedVolumes(pod *v1.Pod) []string {
563 mounts, devices, _ := util.GetPodVolumeNames(pod)
564 return mounts.Union(devices).UnsortedList()
565 }
566
567
568
569
570 func getExtraSupplementalGid(volumeGidValue string, pod *v1.Pod) (int64, bool) {
571 if volumeGidValue == "" {
572 return 0, false
573 }
574
575 gid, err := strconv.ParseInt(volumeGidValue, 10, 64)
576 if err != nil {
577 return 0, false
578 }
579
580 if pod.Spec.SecurityContext != nil {
581 for _, existingGid := range pod.Spec.SecurityContext.SupplementalGroups {
582 if gid == int64(existingGid) {
583 return 0, false
584 }
585 }
586 }
587
588 return gid, true
589 }
590
View as plain text