package reconciler

import (
	"crypto/md5"
	"fmt"
	"path/filepath"
	"testing"
	"time"

	csitrans "k8s.io/csi-translation-lib"
	"k8s.io/kubernetes/pkg/volume/csimigration"

	"github.com/stretchr/testify/assert"
	"k8s.io/mount-utils"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	k8stypes "k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/kubernetes/fake"
	core "k8s.io/client-go/testing"
	"k8s.io/client-go/tools/record"
	"k8s.io/klog/v2"
	"k8s.io/kubernetes/pkg/kubelet/volumemanager/cache"
	"k8s.io/kubernetes/pkg/volume"
	volumetesting "k8s.io/kubernetes/pkg/volume/testing"
	"k8s.io/kubernetes/pkg/volume/util"
	"k8s.io/kubernetes/pkg/volume/util/hostutil"
	"k8s.io/kubernetes/pkg/volume/util/operationexecutor"
	"k8s.io/kubernetes/pkg/volume/util/types"
)

const (
	// reconcilerLoopSleepDuration is the amount of time the reconciler loop
	// waits between successive executions.
	reconcilerLoopSleepDuration = 1 * time.Nanosecond

	// waitForAttachTimeout is the maximum amount of time an
	// operationexecutor.Mount call will wait for a volume to be attached.
	waitForAttachTimeout         = 1 * time.Second
	nodeName                     = k8stypes.NodeName("mynodename")
	kubeletPodsDir               = "fake-dir"
	testOperationBackOffDuration = 100 * time.Millisecond
	reconcilerSyncWaitDuration   = 10 * time.Second
)

// hasAddedPods returns true, pretending the desired-state-of-world populator
// has already added all pods, so the reconciler proceeds to its sync path.
func hasAddedPods() bool { return true }
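
// Calls Run()
// Verifies there are no calls to attach, detach, mount, unmount, etc.,
// since the desired state of world is empty.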
func Test_Run_Positive_DoNothing(t *testing.T) {
	// Arrange
	volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgr(t)
	seLinuxTranslator := util.NewFakeSELinuxLabelTranslator()
	dsw := cache.NewDesiredStateOfWorld(volumePluginMgr, seLinuxTranslator)
	asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
	kubeClient := createTestClient()
	fakeRecorder := &record.FakeRecorder{}
	fakeHandler := volumetesting.NewBlockVolumePathHandler()
	oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
		kubeClient,
		volumePluginMgr,
		fakeRecorder,
		fakeHandler,
	))
	reconciler := NewReconciler(
		kubeClient,
		false, /* controllerAttachDetachEnabled */
		reconcilerLoopSleepDuration,
		waitForAttachTimeout,
		nodeName,
		dsw,
		asw,
		hasAddedPods,
		oex,
		mount.NewFakeMounter(nil),
		hostutil.NewFakeHostUtil(nil),
		volumePluginMgr,
		kubeletPodsDir)

	// Act
	runReconciler(reconciler)

	// Assert
	assert.NoError(t, volumetesting.VerifyZeroAttachCalls(fakePlugin))
	assert.NoError(t, volumetesting.VerifyZeroWaitForAttachCallCount(fakePlugin))
	assert.NoError(t, volumetesting.VerifyZeroMountDeviceCallCount(fakePlugin))
	assert.NoError(t, volumetesting.VerifyZeroSetUpCallCount(fakePlugin))
	assert.NoError(t, volumetesting.VerifyZeroTearDownCallCount(fakePlugin))
	assert.NoError(t, volumetesting.VerifyZeroDetachCallCount(fakePlugin))
}
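
// Populates desiredStateOfWorld cache with one volume/pod.
// Calls Run()
// Verifies there are attach/mount calls and no detach/unmount calls.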
func Test_Run_Positive_VolumeAttachAndMount(t *testing.T) {
	// Arrange
	volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgr(t)
	seLinuxTranslator := util.NewFakeSELinuxLabelTranslator()
	dsw := cache.NewDesiredStateOfWorld(volumePluginMgr, seLinuxTranslator)
	asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
	kubeClient := createTestClient()
	fakeRecorder := &record.FakeRecorder{}
	fakeHandler := volumetesting.NewBlockVolumePathHandler()
	oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
		kubeClient,
		volumePluginMgr,
		fakeRecorder,
		fakeHandler))
	reconciler := NewReconciler(
		kubeClient,
		false, /* controllerAttachDetachEnabled */
		reconcilerLoopSleepDuration,
		waitForAttachTimeout,
		nodeName,
		dsw,
		asw,
		hasAddedPods,
		oex,
		mount.NewFakeMounter(nil),
		hostutil.NewFakeHostUtil(nil),
		volumePluginMgr,
		kubeletPodsDir)
	pod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name: "pod1",
			UID:  "pod1uid",
		},
		Spec: v1.PodSpec{
			Volumes: []v1.Volume{
				{
					Name: "volume-name",
					VolumeSource: v1.VolumeSource{
						GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{
							PDName: "fake-device1",
						},
					},
				},
			},
		},
	}

	volumeSpec := &volume.Spec{Volume: &pod.Spec.Volumes[0]}
	podName := util.GetUniquePodName(pod)
	generatedVolumeName, err := dsw.AddPodToVolume(
		podName, pod, volumeSpec, volumeSpec.Name(), "", nil)
	if err != nil {
		t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
	}

	// Act
	runReconciler(reconciler)
	waitForMount(t, fakePlugin, generatedVolumeName, asw)

	// Assert
	assert.NoError(t, volumetesting.VerifyAttachCallCount(1, fakePlugin))
	assert.NoError(t, volumetesting.VerifyWaitForAttachCallCount(1, fakePlugin))
	assert.NoError(t, volumetesting.VerifyMountDeviceCallCount(1, fakePlugin))
	assert.NoError(t, volumetesting.VerifySetUpCallCount(1, fakePlugin))
	assert.NoError(t, volumetesting.VerifyZeroTearDownCallCount(fakePlugin))
	assert.NoError(t, volumetesting.VerifyZeroDetachCallCount(fakePlugin))
}
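
// Populates desiredStateOfWorld cache with one volume/pod, with the in-tree
// volume spec translated to its CSI counterpart.
// Calls Run()
// Verifies the migrated volume is mounted: one wait-for-attach, mount-device,
// and set-up call each, and no unmount/detach calls.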
func Test_Run_Positive_VolumeAttachAndMountMigrationEnabled(t *testing.T) {
	// Arrange
	intreeToCSITranslator := csitrans.New()
	node := &v1.Node{
		ObjectMeta: metav1.ObjectMeta{
			Name: string(nodeName),
		},
		Spec: v1.NodeSpec{},
		Status: v1.NodeStatus{
			VolumesAttached: []v1.AttachedVolume{
				{
					Name:       v1.UniqueVolumeName(fmt.Sprintf("fake-plugin/%s", "pd.csi.storage.gke.io-fake-device1")),
					DevicePath: "fake/path",
				},
			},
		},
	}
	volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgrWithNode(t, node)
	seLinuxTranslator := util.NewFakeSELinuxLabelTranslator()
	dsw := cache.NewDesiredStateOfWorld(volumePluginMgr, seLinuxTranslator)
	asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
	kubeClient := createTestClient(v1.AttachedVolume{
		Name:       v1.UniqueVolumeName(fmt.Sprintf("fake-plugin/%s", "pd.csi.storage.gke.io-fake-device1")),
		DevicePath: "fake/path",
	})

	fakeRecorder := &record.FakeRecorder{}
	fakeHandler := volumetesting.NewBlockVolumePathHandler()
	oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
		kubeClient,
		volumePluginMgr,
		fakeRecorder,
		fakeHandler))
	reconciler := NewReconciler(
		kubeClient,
		true, /* controllerAttachDetachEnabled */
		reconcilerLoopSleepDuration,
		waitForAttachTimeout,
		nodeName,
		dsw,
		asw,
		hasAddedPods,
		oex,
		mount.NewFakeMounter(nil),
		hostutil.NewFakeHostUtil(nil),
		volumePluginMgr,
		kubeletPodsDir)
	pod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name: "pod1",
			UID:  "pod1uid",
		},
		Spec: v1.PodSpec{
			Volumes: []v1.Volume{
				{
					Name: "volume-name",
					VolumeSource: v1.VolumeSource{
						GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{
							PDName: "fake-device1",
						},
					},
				},
			},
		},
	}

	volumeSpec := &volume.Spec{Volume: &pod.Spec.Volumes[0]}
	migratedSpec, err := csimigration.TranslateInTreeSpecToCSI(volumeSpec, pod.Namespace, intreeToCSITranslator)
	if err != nil {
		t.Fatalf("unexpected error while translating spec %v: %v", volumeSpec, err)
	}

	podName := util.GetUniquePodName(pod)
	generatedVolumeName, err := dsw.AddPodToVolume(
		podName,
		pod,
		migratedSpec,
		migratedSpec.Name(),
		"",
		nil,
	)
	if err != nil {
		t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
	}
	dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{generatedVolumeName})

	// Act
	runReconciler(reconciler)
	waitForMount(t, fakePlugin, generatedVolumeName, asw)

	// Assert
	assert.NoError(t, volumetesting.VerifyWaitForAttachCallCount(1, fakePlugin))
	assert.NoError(t, volumetesting.VerifyMountDeviceCallCount(1, fakePlugin))
	assert.NoError(t, volumetesting.VerifySetUpCallCount(1, fakePlugin))
	assert.NoError(t, volumetesting.VerifyZeroTearDownCallCount(fakePlugin))
	assert.NoError(t, volumetesting.VerifyZeroDetachCallCount(fakePlugin))
}
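
// Populates desiredStateOfWorld cache with one volume/pod.
// Enables controllerAttachDetachEnabled.
// Calls Run()
// Verifies there is one mount call and no unmount calls.
// Verifies there are no attach/detach calls.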
func Test_Run_Positive_VolumeMountControllerAttachEnabled(t *testing.T) {
	// Arrange
	node := &v1.Node{
		ObjectMeta: metav1.ObjectMeta{
			Name: string(nodeName),
		},
		Status: v1.NodeStatus{
			VolumesAttached: []v1.AttachedVolume{
				{
					Name:       "fake-plugin/fake-device1",
					DevicePath: "fake/path",
				},
			},
		},
	}
	volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgrWithNode(t, node)
	seLinuxTranslator := util.NewFakeSELinuxLabelTranslator()
	dsw := cache.NewDesiredStateOfWorld(volumePluginMgr, seLinuxTranslator)
	asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
	kubeClient := createTestClient()
	fakeRecorder := &record.FakeRecorder{}
	fakeHandler := volumetesting.NewBlockVolumePathHandler()
	oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
		kubeClient,
		volumePluginMgr,
		fakeRecorder,
		fakeHandler))
	reconciler := NewReconciler(
		kubeClient,
		true, /* controllerAttachDetachEnabled */
		reconcilerLoopSleepDuration,
		waitForAttachTimeout,
		nodeName,
		dsw,
		asw,
		hasAddedPods,
		oex,
		mount.NewFakeMounter(nil),
		hostutil.NewFakeHostUtil(nil),
		volumePluginMgr,
		kubeletPodsDir)
	pod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name: "pod1",
			UID:  "pod1uid",
		},
		Spec: v1.PodSpec{
			Volumes: []v1.Volume{
				{
					Name: "volume-name",
					VolumeSource: v1.VolumeSource{
						GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{
							PDName: "fake-device1",
						},
					},
				},
			},
		},
	}

	volumeSpec := &volume.Spec{Volume: &pod.Spec.Volumes[0]}
	podName := util.GetUniquePodName(pod)
	generatedVolumeName, err := dsw.AddPodToVolume(
		podName, pod, volumeSpec, volumeSpec.Name(), "", nil)
	if err != nil {
		t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
	}
	dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{generatedVolumeName})

	// Act
	runReconciler(reconciler)
	waitForMount(t, fakePlugin, generatedVolumeName, asw)

	// Assert
	assert.NoError(t, volumetesting.VerifyZeroAttachCalls(fakePlugin))
	assert.NoError(t, volumetesting.VerifyWaitForAttachCallCount(1, fakePlugin))
	assert.NoError(t, volumetesting.VerifyMountDeviceCallCount(1, fakePlugin))
	assert.NoError(t, volumetesting.VerifySetUpCallCount(1, fakePlugin))
	assert.NoError(t, volumetesting.VerifyZeroTearDownCallCount(fakePlugin))
	assert.NoError(t, volumetesting.VerifyZeroDetachCallCount(fakePlugin))
}
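
// Populates desiredStateOfWorld cache with one volume/pod.
// Enables controllerAttachDetachEnabled.
// Calls Run()
// The node never reports the volume as attached, so the
// VerifyControllerAttachedVolume step cannot succeed.
// Verifies the operation remains safe to retry and that no wait-for-attach
// or mount-device calls are made.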
func Test_Run_Negative_VolumeMountControllerAttachEnabled(t *testing.T) {
	// Arrange
	volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgr(t)
	seLinuxTranslator := util.NewFakeSELinuxLabelTranslator()
	dsw := cache.NewDesiredStateOfWorld(volumePluginMgr, seLinuxTranslator)
	asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
	kubeClient := createTestClient()
	fakeRecorder := &record.FakeRecorder{}
	fakeHandler := volumetesting.NewBlockVolumePathHandler()
	oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
		kubeClient,
		volumePluginMgr,
		fakeRecorder,
		fakeHandler))
	reconciler := NewReconciler(
		kubeClient,
		true, /* controllerAttachDetachEnabled */
		reconcilerLoopSleepDuration,
		waitForAttachTimeout,
		nodeName,
		dsw,
		asw,
		hasAddedPods,
		oex,
		mount.NewFakeMounter(nil),
		hostutil.NewFakeHostUtil(nil),
		volumePluginMgr,
		kubeletPodsDir)
	pod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name: "pod1",
			UID:  "pod1uid",
		},
		Spec: v1.PodSpec{
			Volumes: []v1.Volume{
				{
					Name: "volume-name",
					VolumeSource: v1.VolumeSource{
						GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{
							PDName: "fake-device1",
						},
					},
				},
			},
		},
	}

	volumeSpec := &volume.Spec{Volume: &pod.Spec.Volumes[0]}
	podName := util.GetUniquePodName(pod)
	generatedVolumeName, err := dsw.AddPodToVolume(
		podName, pod, volumeSpec, volumeSpec.Name(), "", nil)
	if err != nil {
		t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
	}

	// Act
	runReconciler(reconciler)
	time.Sleep(reconcilerSyncWaitDuration)

	ok := oex.IsOperationSafeToRetry(generatedVolumeName, podName, nodeName, operationexecutor.VerifyControllerAttachedVolumeOpName)
	if !ok {
		t.Errorf("operation on volume %s is not safe to retry", generatedVolumeName)
	}

	// Assert
	assert.NoError(t, volumetesting.VerifyZeroAttachCalls(fakePlugin))
	assert.NoError(t, volumetesting.VerifyWaitForAttachCallCount(0, fakePlugin))
	assert.NoError(t, volumetesting.VerifyMountDeviceCallCount(0, fakePlugin))
}
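
// Populates desiredStateOfWorld cache with one volume/pod.
// Calls Run()
// Verifies there is one attach/mount call each and no detaches.
// Deletes the volume/pod from the desired state of world.
// Verifies one tear-down and one detach call are made.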
func Test_Run_Positive_VolumeAttachMountUnmountDetach(t *testing.T) {
	// Arrange
	volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgr(t)
	seLinuxTranslator := util.NewFakeSELinuxLabelTranslator()
	dsw := cache.NewDesiredStateOfWorld(volumePluginMgr, seLinuxTranslator)
	asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
	kubeClient := createTestClient()
	fakeRecorder := &record.FakeRecorder{}
	fakeHandler := volumetesting.NewBlockVolumePathHandler()
	oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
		kubeClient,
		volumePluginMgr,
		fakeRecorder,
		fakeHandler))
	reconciler := NewReconciler(
		kubeClient,
		false, /* controllerAttachDetachEnabled */
		reconcilerLoopSleepDuration,
		waitForAttachTimeout,
		nodeName,
		dsw,
		asw,
		hasAddedPods,
		oex,
		mount.NewFakeMounter(nil),
		hostutil.NewFakeHostUtil(nil),
		volumePluginMgr,
		kubeletPodsDir)
	pod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name: "pod1",
			UID:  "pod1uid",
		},
		Spec: v1.PodSpec{
			Volumes: []v1.Volume{
				{
					Name: "volume-name",
					VolumeSource: v1.VolumeSource{
						GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{
							PDName: "fake-device1",
						},
					},
				},
			},
		},
	}

	volumeSpec := &volume.Spec{Volume: &pod.Spec.Volumes[0]}
	podName := util.GetUniquePodName(pod)
	generatedVolumeName, err := dsw.AddPodToVolume(
		podName, pod, volumeSpec, volumeSpec.Name(), "", nil)
	if err != nil {
		t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
	}

	// Act
	runReconciler(reconciler)
	waitForMount(t, fakePlugin, generatedVolumeName, asw)

	// Assert
	assert.NoError(t, volumetesting.VerifyAttachCallCount(1, fakePlugin))
	assert.NoError(t, volumetesting.VerifyWaitForAttachCallCount(1, fakePlugin))
	assert.NoError(t, volumetesting.VerifyMountDeviceCallCount(1, fakePlugin))
	assert.NoError(t, volumetesting.VerifySetUpCallCount(1, fakePlugin))
	assert.NoError(t, volumetesting.VerifyZeroTearDownCallCount(fakePlugin))
	assert.NoError(t, volumetesting.VerifyZeroDetachCallCount(fakePlugin))

	// Act: delete the pod from the desired state of world.
	dsw.DeletePodFromVolume(podName, generatedVolumeName)
	waitForDetach(t, generatedVolumeName, asw)

	// Assert
	assert.NoError(t, volumetesting.VerifyTearDownCallCount(1, fakePlugin))
	assert.NoError(t, volumetesting.VerifyDetachCallCount(1, fakePlugin))
}
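
// Populates desiredStateOfWorld cache with one volume/pod.
// Enables controllerAttachDetachEnabled.
// Calls Run()
// Verifies one mount call is made and no unmounts.
// Deletes the volume/pod from the desired state of world.
// Verifies one unmount call is made.
// Verifies there are no attach/detach calls made.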
func Test_Run_Positive_VolumeUnmountControllerAttachEnabled(t *testing.T) {
	// Arrange
	node := &v1.Node{
		ObjectMeta: metav1.ObjectMeta{
			Name: string(nodeName),
		},
		Status: v1.NodeStatus{
			VolumesAttached: []v1.AttachedVolume{
				{
					Name:       "fake-plugin/fake-device1",
					DevicePath: "fake/path",
				},
			},
		},
	}
	volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgrWithNode(t, node)
	seLinuxTranslator := util.NewFakeSELinuxLabelTranslator()
	dsw := cache.NewDesiredStateOfWorld(volumePluginMgr, seLinuxTranslator)
	asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
	kubeClient := createTestClient()
	fakeRecorder := &record.FakeRecorder{}
	fakeHandler := volumetesting.NewBlockVolumePathHandler()
	oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
		kubeClient,
		volumePluginMgr,
		fakeRecorder,
		fakeHandler))
	reconciler := NewReconciler(
		kubeClient,
		true, /* controllerAttachDetachEnabled */
		reconcilerLoopSleepDuration,
		waitForAttachTimeout,
		nodeName,
		dsw,
		asw,
		hasAddedPods,
		oex,
		mount.NewFakeMounter(nil),
		hostutil.NewFakeHostUtil(nil),
		volumePluginMgr,
		kubeletPodsDir)
	pod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name: "pod1",
			UID:  "pod1uid",
		},
		Spec: v1.PodSpec{
			Volumes: []v1.Volume{
				{
					Name: "volume-name",
					VolumeSource: v1.VolumeSource{
						GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{
							PDName: "fake-device1",
						},
					},
				},
			},
		},
	}

	volumeSpec := &volume.Spec{Volume: &pod.Spec.Volumes[0]}
	podName := util.GetUniquePodName(pod)
	generatedVolumeName, err := dsw.AddPodToVolume(
		podName, pod, volumeSpec, volumeSpec.Name(), "", nil)
	if err != nil {
		t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
	}

	// Act
	runReconciler(reconciler)

	dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{generatedVolumeName})
	waitForMount(t, fakePlugin, generatedVolumeName, asw)

	// Assert
	assert.NoError(t, volumetesting.VerifyZeroAttachCalls(fakePlugin))
	assert.NoError(t, volumetesting.VerifyWaitForAttachCallCount(1, fakePlugin))
	assert.NoError(t, volumetesting.VerifyMountDeviceCallCount(1, fakePlugin))
	assert.NoError(t, volumetesting.VerifySetUpCallCount(1, fakePlugin))
	assert.NoError(t, volumetesting.VerifyZeroTearDownCallCount(fakePlugin))
	assert.NoError(t, volumetesting.VerifyZeroDetachCallCount(fakePlugin))

	// Act: delete the pod from the desired state of world.
	dsw.DeletePodFromVolume(podName, generatedVolumeName)
	waitForDetach(t, generatedVolumeName, asw)

	// Assert
	assert.NoError(t, volumetesting.VerifyTearDownCallCount(1, fakePlugin))
	assert.NoError(t, volumetesting.VerifyZeroDetachCallCount(fakePlugin))
}
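
// Populates desiredStateOfWorld cache with one volume/pod (block-mode PV/PVC).
// Calls Run()
// Verifies there is one attach call, one wait-for-attach call, and one
// map-pod-device call, and no detach or tear-down-device calls.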
func Test_Run_Positive_VolumeAttachAndMap(t *testing.T) {
	pod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "pod1",
			UID:       "pod1uid",
			Namespace: "ns",
		},
		Spec: v1.PodSpec{},
	}

	mode := v1.PersistentVolumeBlock
	gcepv := &v1.PersistentVolume{
		ObjectMeta: metav1.ObjectMeta{UID: "001", Name: "volume-name"},
		Spec: v1.PersistentVolumeSpec{
			Capacity:               v1.ResourceList{v1.ResourceName(v1.ResourceStorage): resource.MustParse("10G")},
			PersistentVolumeSource: v1.PersistentVolumeSource{GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{PDName: "fake-device1"}},
			AccessModes: []v1.PersistentVolumeAccessMode{
				v1.ReadWriteOnce,
				v1.ReadOnlyMany,
			},
			VolumeMode: &mode,
			ClaimRef:   &v1.ObjectReference{Namespace: "ns", Name: "pvc-volume-name"},
		},
	}

	gcepvc := &v1.PersistentVolumeClaim{
		ObjectMeta: metav1.ObjectMeta{UID: "pvc-001", Name: "pvc-volume-name", Namespace: "ns"},
		Spec: v1.PersistentVolumeClaimSpec{
			VolumeName: "volume-name",
			VolumeMode: &mode,
		},
		Status: v1.PersistentVolumeClaimStatus{
			Phase:    v1.ClaimBound,
			Capacity: gcepv.Spec.Capacity,
		},
	}

	// Arrange
	volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgr(t)
	seLinuxTranslator := util.NewFakeSELinuxLabelTranslator()
	dsw := cache.NewDesiredStateOfWorld(volumePluginMgr, seLinuxTranslator)
	asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
	kubeClient := createtestClientWithPVPVC(gcepv, gcepvc)
	fakeRecorder := &record.FakeRecorder{}
	fakeHandler := volumetesting.NewBlockVolumePathHandler()
	oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
		kubeClient,
		volumePluginMgr,
		fakeRecorder,
		fakeHandler))
	reconciler := NewReconciler(
		kubeClient,
		false, /* controllerAttachDetachEnabled */
		reconcilerLoopSleepDuration,
		waitForAttachTimeout,
		nodeName,
		dsw,
		asw,
		hasAddedPods,
		oex,
		mount.NewFakeMounter(nil),
		hostutil.NewFakeHostUtil(nil),
		volumePluginMgr,
		kubeletPodsDir)

	volumeSpec := &volume.Spec{
		PersistentVolume: gcepv,
	}
	podName := util.GetUniquePodName(pod)
	generatedVolumeName, err := dsw.AddPodToVolume(
		podName, pod, volumeSpec, volumeSpec.Name(), "", nil)
	if err != nil {
		t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
	}

	// Act
	runReconciler(reconciler)
	waitForMount(t, fakePlugin, generatedVolumeName, asw)

	// Assert
	assert.NoError(t, volumetesting.VerifyAttachCallCount(1, fakePlugin))
	assert.NoError(t, volumetesting.VerifyWaitForAttachCallCount(1, fakePlugin))
	assert.NoError(t, volumetesting.VerifyGetMapPodDeviceCallCount(1, fakePlugin))
	assert.NoError(t, volumetesting.VerifyZeroTearDownDeviceCallCount(fakePlugin))
	assert.NoError(t, volumetesting.VerifyZeroDetachCallCount(fakePlugin))
}
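
// Populates desiredStateOfWorld cache with one volume/pod (block-mode PV/PVC).
// Enables controllerAttachDetachEnabled.
// Calls Run()
// Verifies there is one wait-for-attach call and one map-pod-device call,
// and no attach, detach, or tear-down-device calls.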
func Test_Run_Positive_BlockVolumeMapControllerAttachEnabled(t *testing.T) {
	pod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "pod1",
			UID:       "pod1uid",
			Namespace: "ns",
		},
		Spec: v1.PodSpec{},
	}

	mode := v1.PersistentVolumeBlock
	gcepv := &v1.PersistentVolume{
		ObjectMeta: metav1.ObjectMeta{UID: "001", Name: "volume-name"},
		Spec: v1.PersistentVolumeSpec{
			Capacity:               v1.ResourceList{v1.ResourceName(v1.ResourceStorage): resource.MustParse("10G")},
			PersistentVolumeSource: v1.PersistentVolumeSource{GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{PDName: "fake-device1"}},
			AccessModes: []v1.PersistentVolumeAccessMode{
				v1.ReadWriteOnce,
				v1.ReadOnlyMany,
			},
			VolumeMode: &mode,
			ClaimRef:   &v1.ObjectReference{Namespace: "ns", Name: "pvc-volume-name"},
		},
	}
	gcepvc := &v1.PersistentVolumeClaim{
		ObjectMeta: metav1.ObjectMeta{UID: "pvc-001", Name: "pvc-volume-name", Namespace: "ns"},
		Spec: v1.PersistentVolumeClaimSpec{
			VolumeName: "volume-name",
			VolumeMode: &mode,
		},
		Status: v1.PersistentVolumeClaimStatus{
			Phase:    v1.ClaimBound,
			Capacity: gcepv.Spec.Capacity,
		},
	}

	volumeSpec := &volume.Spec{
		PersistentVolume: gcepv,
	}
	node := &v1.Node{
		ObjectMeta: metav1.ObjectMeta{
			Name: string(nodeName),
		},
		Status: v1.NodeStatus{
			VolumesAttached: []v1.AttachedVolume{
				{
					Name:       "fake-plugin/fake-device1",
					DevicePath: "fake/path",
				},
			},
		},
	}

	// Arrange
	volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgrWithNode(t, node)
	seLinuxTranslator := util.NewFakeSELinuxLabelTranslator()
	dsw := cache.NewDesiredStateOfWorld(volumePluginMgr, seLinuxTranslator)
	asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
	kubeClient := createtestClientWithPVPVC(gcepv, gcepvc, v1.AttachedVolume{
		Name:       "fake-plugin/fake-device1",
		DevicePath: "/fake/path",
	})
	fakeRecorder := &record.FakeRecorder{}
	fakeHandler := volumetesting.NewBlockVolumePathHandler()
	oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
		kubeClient,
		volumePluginMgr,
		fakeRecorder,
		fakeHandler))
	reconciler := NewReconciler(
		kubeClient,
		true, /* controllerAttachDetachEnabled */
		reconcilerLoopSleepDuration,
		waitForAttachTimeout,
		nodeName,
		dsw,
		asw,
		hasAddedPods,
		oex,
		mount.NewFakeMounter(nil),
		hostutil.NewFakeHostUtil(nil),
		volumePluginMgr,
		kubeletPodsDir)

	podName := util.GetUniquePodName(pod)
	generatedVolumeName, err := dsw.AddPodToVolume(
		podName, pod, volumeSpec, volumeSpec.Name(), "", nil)
	if err != nil {
		t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
	}
	dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{generatedVolumeName})

	// Act
	runReconciler(reconciler)
	waitForMount(t, fakePlugin, generatedVolumeName, asw)

	// Assert
	assert.NoError(t, volumetesting.VerifyZeroAttachCalls(fakePlugin))
	assert.NoError(t, volumetesting.VerifyWaitForAttachCallCount(1, fakePlugin))
	assert.NoError(t, volumetesting.VerifyGetMapPodDeviceCallCount(1, fakePlugin))
	assert.NoError(t, volumetesting.VerifyZeroTearDownDeviceCallCount(fakePlugin))
	assert.NoError(t, volumetesting.VerifyZeroDetachCallCount(fakePlugin))
}
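
// Populates desiredStateOfWorld cache with one volume/pod (block-mode PV/PVC).
// Calls Run()
// Verifies there is one attach, wait-for-attach, and map-pod-device call each.
// Deletes the volume/pod from the desired state of world.
// Verifies one tear-down-device and one detach call are made.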
func Test_Run_Positive_BlockVolumeAttachMapUnmapDetach(t *testing.T) {
	pod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "pod1",
			UID:       "pod1uid",
			Namespace: "ns",
		},
		Spec: v1.PodSpec{},
	}

	mode := v1.PersistentVolumeBlock
	gcepv := &v1.PersistentVolume{
		ObjectMeta: metav1.ObjectMeta{UID: "001", Name: "volume-name"},
		Spec: v1.PersistentVolumeSpec{
			Capacity:               v1.ResourceList{v1.ResourceName(v1.ResourceStorage): resource.MustParse("10G")},
			PersistentVolumeSource: v1.PersistentVolumeSource{GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{PDName: "fake-device1"}},
			AccessModes: []v1.PersistentVolumeAccessMode{
				v1.ReadWriteOnce,
				v1.ReadOnlyMany,
			},
			VolumeMode: &mode,
			ClaimRef:   &v1.ObjectReference{Namespace: "ns", Name: "pvc-volume-name"},
		},
	}
	gcepvc := &v1.PersistentVolumeClaim{
		ObjectMeta: metav1.ObjectMeta{UID: "pvc-001", Name: "pvc-volume-name", Namespace: "ns"},
		Spec: v1.PersistentVolumeClaimSpec{
			VolumeName: "volume-name",
			VolumeMode: &mode,
		},
		Status: v1.PersistentVolumeClaimStatus{
			Phase:    v1.ClaimBound,
			Capacity: gcepv.Spec.Capacity,
		},
	}

	volumeSpec := &volume.Spec{
		PersistentVolume: gcepv,
	}

	// Arrange
	volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgr(t)
	seLinuxTranslator := util.NewFakeSELinuxLabelTranslator()
	dsw := cache.NewDesiredStateOfWorld(volumePluginMgr, seLinuxTranslator)
	asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
	kubeClient := createtestClientWithPVPVC(gcepv, gcepvc)
	fakeRecorder := &record.FakeRecorder{}
	fakeHandler := volumetesting.NewBlockVolumePathHandler()
	oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
		kubeClient,
		volumePluginMgr,
		fakeRecorder,
		fakeHandler))
	reconciler := NewReconciler(
		kubeClient,
		false, /* controllerAttachDetachEnabled */
		reconcilerLoopSleepDuration,
		waitForAttachTimeout,
		nodeName,
		dsw,
		asw,
		hasAddedPods,
		oex,
		mount.NewFakeMounter(nil),
		hostutil.NewFakeHostUtil(nil),
		volumePluginMgr,
		kubeletPodsDir)

	podName := util.GetUniquePodName(pod)
	generatedVolumeName, err := dsw.AddPodToVolume(
		podName, pod, volumeSpec, volumeSpec.Name(), "", nil)
	if err != nil {
		t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
	}

	// Act
	runReconciler(reconciler)
	waitForMount(t, fakePlugin, generatedVolumeName, asw)

	// Assert
	assert.NoError(t, volumetesting.VerifyAttachCallCount(1, fakePlugin))
	assert.NoError(t, volumetesting.VerifyWaitForAttachCallCount(1, fakePlugin))
	assert.NoError(t, volumetesting.VerifyGetMapPodDeviceCallCount(1, fakePlugin))
	assert.NoError(t, volumetesting.VerifyZeroTearDownDeviceCallCount(fakePlugin))
	assert.NoError(t, volumetesting.VerifyZeroDetachCallCount(fakePlugin))

	// Act: delete the pod from the desired state of world.
	dsw.DeletePodFromVolume(podName, generatedVolumeName)
	waitForDetach(t, generatedVolumeName, asw)

	// Assert
	assert.NoError(t, volumetesting.VerifyTearDownDeviceCallCount(1, fakePlugin))
	assert.NoError(t, volumetesting.VerifyDetachCallCount(1, fakePlugin))
}
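
// Populates desiredStateOfWorld cache with one volume/pod (block-mode PV/PVC).
// Enables controllerAttachDetachEnabled.
// Calls Run()
// Verifies one map-pod-device call is made and no attach calls.
// Deletes the volume/pod from the desired state of world.
// Verifies one tear-down-device call is made and no detach calls.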
func Test_Run_Positive_VolumeUnmapControllerAttachEnabled(t *testing.T) {
	pod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "pod1",
			UID:       "pod1uid",
			Namespace: "ns",
		},
		Spec: v1.PodSpec{},
	}

	mode := v1.PersistentVolumeBlock
	gcepv := &v1.PersistentVolume{
		ObjectMeta: metav1.ObjectMeta{UID: "001", Name: "volume-name"},
		Spec: v1.PersistentVolumeSpec{
			Capacity:               v1.ResourceList{v1.ResourceName(v1.ResourceStorage): resource.MustParse("10G")},
			PersistentVolumeSource: v1.PersistentVolumeSource{GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{PDName: "fake-device1"}},
			AccessModes: []v1.PersistentVolumeAccessMode{
				v1.ReadWriteOnce,
				v1.ReadOnlyMany,
			},
			VolumeMode: &mode,
			ClaimRef:   &v1.ObjectReference{Namespace: "ns", Name: "pvc-volume-name"},
		},
	}
	gcepvc := &v1.PersistentVolumeClaim{
		ObjectMeta: metav1.ObjectMeta{UID: "pvc-001", Name: "pvc-volume-name", Namespace: "ns"},
		Spec: v1.PersistentVolumeClaimSpec{
			VolumeName: "volume-name",
			VolumeMode: &mode,
		},
		Status: v1.PersistentVolumeClaimStatus{
			Phase:    v1.ClaimBound,
			Capacity: gcepv.Spec.Capacity,
		},
	}

	volumeSpec := &volume.Spec{
		PersistentVolume: gcepv,
	}

	node := &v1.Node{
		ObjectMeta: metav1.ObjectMeta{
			Name: string(nodeName),
		},
		Status: v1.NodeStatus{
			VolumesAttached: []v1.AttachedVolume{
				{
					Name:       "fake-plugin/fake-device1",
					DevicePath: "/fake/path",
				},
			},
		},
	}

	// Arrange
	volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgrWithNode(t, node)
	seLinuxTranslator := util.NewFakeSELinuxLabelTranslator()
	dsw := cache.NewDesiredStateOfWorld(volumePluginMgr, seLinuxTranslator)
	asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
	kubeClient := createtestClientWithPVPVC(gcepv, gcepvc, v1.AttachedVolume{
		Name:       "fake-plugin/fake-device1",
		DevicePath: "/fake/path",
	})
	fakeRecorder := &record.FakeRecorder{}
	fakeHandler := volumetesting.NewBlockVolumePathHandler()
	oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
		kubeClient,
		volumePluginMgr,
		fakeRecorder,
		fakeHandler))
	reconciler := NewReconciler(
		kubeClient,
		true, /* controllerAttachDetachEnabled */
		reconcilerLoopSleepDuration,
		waitForAttachTimeout,
		nodeName,
		dsw,
		asw,
		hasAddedPods,
		oex,
		mount.NewFakeMounter(nil),
		hostutil.NewFakeHostUtil(nil),
		volumePluginMgr,
		kubeletPodsDir)

	podName := util.GetUniquePodName(pod)
	generatedVolumeName, err := dsw.AddPodToVolume(
		podName, pod, volumeSpec, volumeSpec.Name(), "", nil)
	if err != nil {
		t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
	}

	// Act
	runReconciler(reconciler)

	dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{generatedVolumeName})
	waitForMount(t, fakePlugin, generatedVolumeName, asw)

	// Assert
	assert.NoError(t, volumetesting.VerifyZeroAttachCalls(fakePlugin))
	assert.NoError(t, volumetesting.VerifyWaitForAttachCallCount(1, fakePlugin))
	assert.NoError(t, volumetesting.VerifyGetMapPodDeviceCallCount(1, fakePlugin))
	assert.NoError(t, volumetesting.VerifyZeroTearDownDeviceCallCount(fakePlugin))
	assert.NoError(t, volumetesting.VerifyZeroDetachCallCount(fakePlugin))

	// Act: delete the pod from the desired state of world.
	dsw.DeletePodFromVolume(podName, generatedVolumeName)
	waitForDetach(t, generatedVolumeName, asw)

	// Assert
	assert.NoError(t, volumetesting.VerifyTearDownDeviceCallCount(1, fakePlugin))
	assert.NoError(t, volumetesting.VerifyZeroDetachCallCount(fakePlugin))
}
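
// Verifies that MountVolume fails with a FindMapperPluginBySpec error when no
// block volume plugin (or no plugin at all) is registered for the spec.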
func Test_GenerateMapVolumeFunc_Plugin_Not_Found(t *testing.T) {
	testCases := map[string]struct {
		volumePlugins  []volume.VolumePlugin
		expectErr      bool
		expectedErrMsg string
	}{
		"volumePlugin is nil": {
			volumePlugins:  []volume.VolumePlugin{},
			expectErr:      true,
			expectedErrMsg: "MapVolume.FindMapperPluginBySpec failed",
		},
		"blockVolumePlugin is nil": {
			volumePlugins:  volumetesting.NewFakeFileVolumePlugin(),
			expectErr:      true,
			expectedErrMsg: "MapVolume.FindMapperPluginBySpec failed to find BlockVolumeMapper plugin. Volume plugin is nil.",
		},
	}

	for name, tc := range testCases {
		t.Run(name, func(t *testing.T) {
			volumePluginMgr := &volume.VolumePluginMgr{}
			volumePluginMgr.InitPlugins(tc.volumePlugins, nil, nil)
			asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
			oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
				nil,
				volumePluginMgr,
				nil,
				nil))

			pod := &v1.Pod{
				ObjectMeta: metav1.ObjectMeta{
					Name: "pod1",
					UID:  "pod1uid",
				},
				Spec: v1.PodSpec{},
			}
			volumeMode := v1.PersistentVolumeBlock
			tmpSpec := &volume.Spec{PersistentVolume: &v1.PersistentVolume{Spec: v1.PersistentVolumeSpec{VolumeMode: &volumeMode}}}
			volumeToMount := operationexecutor.VolumeToMount{
				Pod:        pod,
				VolumeSpec: tmpSpec}
			err := oex.MountVolume(waitForAttachTimeout, volumeToMount, asw, false)
			if assert.Error(t, err) {
				assert.Contains(t, err.Error(), tc.expectedErrMsg)
			}
		})
	}
}
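
// Verifies that UnmountVolume fails with a FindMapperPluginByName error when
// no block volume plugin is registered for the volume's plugin name.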
func Test_GenerateUnmapVolumeFunc_Plugin_Not_Found(t *testing.T) {
	testCases := map[string]struct {
		volumePlugins  []volume.VolumePlugin
		expectErr      bool
		expectedErrMsg string
	}{
		"volumePlugin is nil": {
			volumePlugins:  []volume.VolumePlugin{},
			expectErr:      true,
			expectedErrMsg: "UnmapVolume.FindMapperPluginByName failed",
		},
		"blockVolumePlugin is nil": {
			volumePlugins:  volumetesting.NewFakeFileVolumePlugin(),
			expectErr:      true,
			expectedErrMsg: "UnmapVolume.FindMapperPluginByName failed to find BlockVolumeMapper plugin. Volume plugin is nil.",
		},
	}

	for name, tc := range testCases {
		t.Run(name, func(t *testing.T) {
			volumePluginMgr := &volume.VolumePluginMgr{}
			volumePluginMgr.InitPlugins(tc.volumePlugins, nil, nil)
			asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
			oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
				nil,
				volumePluginMgr,
				nil,
				nil))
			volumeMode := v1.PersistentVolumeBlock
			tmpSpec := &volume.Spec{PersistentVolume: &v1.PersistentVolume{Spec: v1.PersistentVolumeSpec{VolumeMode: &volumeMode}}}
			volumeToUnmount := operationexecutor.MountedVolume{
				PluginName: "fake-file-plugin",
				VolumeSpec: tmpSpec}
			err := oex.UnmountVolume(volumeToUnmount, asw, "")
			if assert.Error(t, err) {
				assert.Contains(t, err.Error(), tc.expectedErrMsg)
			}
		})
	}
}
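
// Verifies that UnmountDevice fails with a FindMapperPluginByName error when
// no block volume plugin is registered for the device's plugin name.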
func Test_GenerateUnmapDeviceFunc_Plugin_Not_Found(t *testing.T) {
	testCases := map[string]struct {
		volumePlugins  []volume.VolumePlugin
		expectErr      bool
		expectedErrMsg string
	}{
		"volumePlugin is nil": {
			volumePlugins:  []volume.VolumePlugin{},
			expectErr:      true,
			expectedErrMsg: "UnmapDevice.FindMapperPluginByName failed",
		},
		"blockVolumePlugin is nil": {
			volumePlugins:  volumetesting.NewFakeFileVolumePlugin(),
			expectErr:      true,
			expectedErrMsg: "UnmapDevice.FindMapperPluginByName failed to find BlockVolumeMapper plugin. Volume plugin is nil.",
		},
	}

	for name, tc := range testCases {
		t.Run(name, func(t *testing.T) {
			volumePluginMgr := &volume.VolumePluginMgr{}
			volumePluginMgr.InitPlugins(tc.volumePlugins, nil, nil)
			asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
			oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
				nil,
				volumePluginMgr,
				nil,
				nil))
			// A nil HostUtils is sufficient here: the plugin lookup fails first.
			var hostUtil hostutil.HostUtils
			volumeMode := v1.PersistentVolumeBlock
			tmpSpec := &volume.Spec{PersistentVolume: &v1.PersistentVolume{Spec: v1.PersistentVolumeSpec{VolumeMode: &volumeMode}}}
			deviceToDetach := operationexecutor.AttachedVolume{VolumeSpec: tmpSpec, PluginName: "fake-file-plugin"}
			err := oex.UnmountDevice(deviceToDetach, asw, hostUtil)
			if assert.Error(t, err) {
				assert.Contains(t, err.Error(), tc.expectedErrMsg)
			}
		})
	}
}
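
// Populates desiredStateOfWorld cache with one volume/pod.
// Enables controllerAttachDetachEnabled.
// Calls Run(), waits for the volume to mount, then grows the PV and verifies
// that the volume is either marked as requiring a filesystem resize or, in
// the failure cases, left alone.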
func Test_Run_Positive_VolumeFSResizeControllerAttachEnabled(t *testing.T) {
	blockMode := v1.PersistentVolumeBlock
	fsMode := v1.PersistentVolumeFilesystem

	var tests = []struct {
		name            string
		volumeMode      *v1.PersistentVolumeMode
		expansionFailed bool
		uncertainTest   bool
		pvName          string
		pvcSize         resource.Quantity
		pvcStatusSize   resource.Quantity
		oldPVSize       resource.Quantity
		newPVSize       resource.Quantity
	}{
		{
			name:          "expand-fs-volume",
			volumeMode:    &fsMode,
			pvName:        "pv",
			pvcSize:       resource.MustParse("10G"),
			pvcStatusSize: resource.MustParse("10G"),
			newPVSize:     resource.MustParse("15G"),
			oldPVSize:     resource.MustParse("10G"),
		},
		{
			name:          "expand-raw-block",
			volumeMode:    &blockMode,
			pvName:        "pv",
			pvcSize:       resource.MustParse("10G"),
			pvcStatusSize: resource.MustParse("10G"),
			newPVSize:     resource.MustParse("15G"),
			oldPVSize:     resource.MustParse("10G"),
		},
		{
			name:            "expand-fs-volume with in-use error",
			volumeMode:      &fsMode,
			expansionFailed: true,
			pvName:          volumetesting.FailWithInUseVolumeName,
			pvcSize:         resource.MustParse("10G"),
			pvcStatusSize:   resource.MustParse("10G"),
			newPVSize:       resource.MustParse("15G"),
			oldPVSize:       resource.MustParse("13G"),
		},
		{
			name:            "expand-fs-volume with unsupported error",
			volumeMode:      &fsMode,
			expansionFailed: false,
			pvName:          volumetesting.FailWithUnSupportedVolumeName,
			pvcSize:         resource.MustParse("10G"),
			pvcStatusSize:   resource.MustParse("10G"),
			newPVSize:       resource.MustParse("15G"),
			oldPVSize:       resource.MustParse("13G"),
		},
	}

	for _, tc := range tests {
		t.Run(tc.name, func(t *testing.T) {
			pv := getTestPV(tc.pvName, tc.volumeMode, tc.oldPVSize)
			pvc := getTestPVC("pv", tc.volumeMode, tc.pvcSize, tc.pvcStatusSize)
			pod := getTestPod(pvc.Name)

			// Deep copy the PV up front so its size can be changed later
			// without racing with the running reconciler.
			pvWithSize := pv.DeepCopy()
			node := &v1.Node{
				ObjectMeta: metav1.ObjectMeta{
					Name: string(nodeName),
				},
				Spec: v1.NodeSpec{},
				Status: v1.NodeStatus{
					VolumesAttached: []v1.AttachedVolume{
						{
							Name:       v1.UniqueVolumeName(fmt.Sprintf("fake-plugin/%s", tc.pvName)),
							DevicePath: "fake/path",
						},
					},
				},
			}
			volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgrWithNode(t, node)
			seLinuxTranslator := util.NewFakeSELinuxLabelTranslator()
			dsw := cache.NewDesiredStateOfWorld(volumePluginMgr, seLinuxTranslator)
			asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
			kubeClient := createtestClientWithPVPVC(pv, pvc, v1.AttachedVolume{
				Name:       v1.UniqueVolumeName(fmt.Sprintf("fake-plugin/%s", tc.pvName)),
				DevicePath: "fake/path",
			})
			fakeRecorder := &record.FakeRecorder{}
			fakeHandler := volumetesting.NewBlockVolumePathHandler()
			oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
				kubeClient,
				volumePluginMgr,
				fakeRecorder,
				fakeHandler))

			reconciler := NewReconciler(
				kubeClient,
				true, /* controllerAttachDetachEnabled */
				reconcilerLoopSleepDuration,
				waitForAttachTimeout,
				nodeName,
				dsw,
				asw,
				hasAddedPods,
				oex,
				mount.NewFakeMounter(nil),
				hostutil.NewFakeHostUtil(nil),
				volumePluginMgr,
				kubeletPodsDir)

			volumeSpec := &volume.Spec{PersistentVolume: pv}
			podName := util.GetUniquePodName(pod)
			volumeName, err := dsw.AddPodToVolume(
				podName, pod, volumeSpec, volumeSpec.Name(), "", nil)
			if err != nil {
				t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
			}
			dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{volumeName})

			// Start the reconciler to fill ASW, then stop it before resizing.
			stopChan, stoppedChan := make(chan struct{}), make(chan struct{})
			go func() {
				defer close(stoppedChan)
				reconciler.Run(stopChan)
			}()
			waitForMount(t, fakePlugin, volumeName, asw)
			close(stopChan)
			<-stoppedChan

			// Simulate what the desired-state-of-world populator does:
			// re-add the pod with the resized PV and record the new size.
			pvWithSize.Spec.Capacity[v1.ResourceStorage] = tc.newPVSize
			volumeSpec = &volume.Spec{PersistentVolume: pvWithSize}
			dsw.AddPodToVolume(podName, pod, volumeSpec, volumeSpec.Name(), "", nil)

			t.Logf("Changing size of the volume to %s", tc.newPVSize.String())
			newSize := tc.newPVSize.DeepCopy()
			dsw.UpdatePersistentVolumeSize(volumeName, &newSize)

			_, _, podExistErr := asw.PodExistsInVolume(podName, volumeName, newSize, "")
			if tc.expansionFailed {
				if cache.IsFSResizeRequiredError(podExistErr) {
					t.Fatalf("volume %s should not throw fsResizeRequired error: %v", volumeName, podExistErr)
				}
			} else {
				if !cache.IsFSResizeRequiredError(podExistErr) {
					t.Fatalf("volume should be marked as fsResizeRequired, but received unexpected error: %v", podExistErr)
				}
				go reconciler.Run(wait.NeverStop)

				waitErr := retryWithExponentialBackOff(testOperationBackOffDuration, func() (done bool, err error) {
					mounted, _, err := asw.PodExistsInVolume(podName, volumeName, newSize, "")
					return mounted && err == nil, nil
				})
				if waitErr != nil {
					t.Fatalf("volume resize should have succeeded: %v", waitErr)
				}
			}
		})
	}
}
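
// getTestPVC returns a PVC bound to pvName with the given volume mode,
// requested (spec) size, and actual (status) size.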
func getTestPVC(pvName string, volumeMode *v1.PersistentVolumeMode, specSize, statusSize resource.Quantity) *v1.PersistentVolumeClaim {
	pvc := &v1.PersistentVolumeClaim{
		ObjectMeta: metav1.ObjectMeta{
			Name: "pvc",
			UID:  "pvcuid",
		},
		Spec: v1.PersistentVolumeClaimSpec{
			Resources: v1.VolumeResourceRequirements{
				Requests: v1.ResourceList{
					v1.ResourceStorage: specSize,
				},
			},
			VolumeName: pvName,
			VolumeMode: volumeMode,
		},
		Status: v1.PersistentVolumeClaimStatus{
			Capacity: v1.ResourceList{
				v1.ResourceStorage: statusSize,
			},
		},
	}
	return pvc
}
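
// getTestPV returns a PV with the given volume mode and capacity, holding a
// claimRef to the "pvc" claim built by getTestPVC.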
func getTestPV(pvName string, volumeMode *v1.PersistentVolumeMode, pvSize resource.Quantity) *v1.PersistentVolume {
	pv := &v1.PersistentVolume{
		ObjectMeta: metav1.ObjectMeta{
			Name: pvName,
			UID:  "pvuid",
		},
		Spec: v1.PersistentVolumeSpec{
			ClaimRef:   &v1.ObjectReference{Name: "pvc"},
			VolumeMode: volumeMode,
			Capacity: v1.ResourceList{
				v1.ResourceStorage: pvSize,
			},
		},
	}
	return pv
}
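
// getTestPod returns a pod with a single volume referencing the given claim.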
func getTestPod(claimName string) *v1.Pod {
	pod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name: "pod1",
			UID:  "pod1uid",
		},
		Spec: v1.PodSpec{
			Volumes: []v1.Volume{
				{
					Name: "volume-name",
					VolumeSource: v1.VolumeSource{
						PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
							ClaimName: claimName,
						},
					},
				},
			},
		},
	}
	return pod
}
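
// Test_UncertainDeviceGlobalMounts verifies that MountDevice operations that
// time out or fail leave the device in the expected state (uncertain,
// not-mounted, or globally mounted) and that cleanup then makes the matching
// number of unmount-device / tear-down-device calls.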
func Test_UncertainDeviceGlobalMounts(t *testing.T) {
	var tests = []struct {
		name                   string
		deviceState            operationexecutor.DeviceMountState
		unmountDeviceCallCount int
		volumeName             string
		supportRemount         bool
	}{
		{
			name:                   "timed out operations should result in device marked as uncertain",
			deviceState:            operationexecutor.DeviceMountUncertain,
			unmountDeviceCallCount: 1,
			volumeName:             volumetesting.TimeoutOnMountDeviceVolumeName,
		},
		{
			name:                   "failed operation should result in not-mounted device",
			deviceState:            operationexecutor.DeviceNotMounted,
			unmountDeviceCallCount: 0,
			volumeName:             volumetesting.FailMountDeviceVolumeName,
		},
		{
			name:                   "timeout followed by failed operation should result in non-mounted device",
			deviceState:            operationexecutor.DeviceNotMounted,
			unmountDeviceCallCount: 0,
			volumeName:             volumetesting.TimeoutAndFailOnMountDeviceVolumeName,
		},
		{
			name:                   "success followed by timeout operation should result in mounted device",
			deviceState:            operationexecutor.DeviceGloballyMounted,
			unmountDeviceCallCount: 1,
			volumeName:             volumetesting.SuccessAndTimeoutDeviceName,
			supportRemount:         true,
		},
		{
			name:                   "success followed by failed operation should result in mounted device",
			deviceState:            operationexecutor.DeviceGloballyMounted,
			unmountDeviceCallCount: 1,
			volumeName:             volumetesting.SuccessAndFailOnMountDeviceName,
			supportRemount:         true,
		},
	}

	modes := []v1.PersistentVolumeMode{v1.PersistentVolumeBlock, v1.PersistentVolumeFilesystem}

	for modeIndex := range modes {
		for tcIndex := range tests {
			mode := modes[modeIndex]
			tc := tests[tcIndex]
			testName := fmt.Sprintf("%s [%s]", tc.name, mode)
			uniqueTestString := fmt.Sprintf("global-mount-%s", testName)
			uniquePodDir := fmt.Sprintf("%s-%x", kubeletPodsDir, md5.Sum([]byte(uniqueTestString)))
			t.Run(testName, func(t *testing.T) {
				t.Parallel()
				pv := &v1.PersistentVolume{
					ObjectMeta: metav1.ObjectMeta{
						Name: tc.volumeName,
						UID:  "pvuid",
					},
					Spec: v1.PersistentVolumeSpec{
						ClaimRef:   &v1.ObjectReference{Name: "pvc"},
						VolumeMode: &mode,
					},
				}
				pvc := &v1.PersistentVolumeClaim{
					ObjectMeta: metav1.ObjectMeta{
						Name: "pvc",
						UID:  "pvcuid",
					},
					Spec: v1.PersistentVolumeClaimSpec{
						VolumeName: tc.volumeName,
						VolumeMode: &mode,
					},
				}
				pod := &v1.Pod{
					ObjectMeta: metav1.ObjectMeta{
						Name: "pod1",
						UID:  "pod1uid",
					},
					Spec: v1.PodSpec{
						Volumes: []v1.Volume{
							{
								Name: "volume-name",
								VolumeSource: v1.VolumeSource{
									PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
										ClaimName: pvc.Name,
									},
								},
							},
						},
					},
				}

				node := &v1.Node{
					ObjectMeta: metav1.ObjectMeta{
						Name: string(nodeName),
					},
					Spec: v1.NodeSpec{},
					Status: v1.NodeStatus{
						VolumesAttached: []v1.AttachedVolume{
							{
								Name:       v1.UniqueVolumeName(fmt.Sprintf("fake-plugin/%s", tc.volumeName)),
								DevicePath: "fake/path",
							},
						},
					},
				}
				volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgrWithNode(t, node)
				fakePlugin.SupportsRemount = tc.supportRemount
				seLinuxTranslator := util.NewFakeSELinuxLabelTranslator()

				dsw := cache.NewDesiredStateOfWorld(volumePluginMgr, seLinuxTranslator)
				asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
				kubeClient := createtestClientWithPVPVC(pv, pvc, v1.AttachedVolume{
					Name:       v1.UniqueVolumeName(fmt.Sprintf("fake-plugin/%s", tc.volumeName)),
					DevicePath: "fake/path",
				})
				fakeRecorder := &record.FakeRecorder{}
				fakeHandler := volumetesting.NewBlockVolumePathHandler()
				oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
					kubeClient,
					volumePluginMgr,
					fakeRecorder,
					fakeHandler))

				reconciler := NewReconciler(
					kubeClient,
					true, /* controllerAttachDetachEnabled */
					reconcilerLoopSleepDuration,
					waitForAttachTimeout,
					nodeName,
					dsw,
					asw,
					hasAddedPods,
					oex,
					&mount.FakeMounter{},
					hostutil.NewFakeHostUtil(nil),
					volumePluginMgr,
					uniquePodDir)
				volumeSpec := &volume.Spec{PersistentVolume: pv}
				podName := util.GetUniquePodName(pod)
				volumeName, err := dsw.AddPodToVolume(
					podName, pod, volumeSpec, volumeSpec.Name(), "", nil)
				if err != nil {
					t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
				}
				dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{volumeName})

				// Start the reconciler to fill ASW.
				stopChan, stoppedChan := make(chan struct{}), make(chan struct{})
				go func() {
					reconciler.Run(stopChan)
					close(stoppedChan)
				}()
				waitForVolumeToExistInASW(t, volumeName, asw)
				if tc.volumeName == volumetesting.TimeoutAndFailOnMountDeviceVolumeName {
					// Wait up to 10s for the reconciler to catch up.
					time.Sleep(reconcilerSyncWaitDuration)
				}

				if tc.volumeName == volumetesting.SuccessAndFailOnMountDeviceName ||
					tc.volumeName == volumetesting.SuccessAndTimeoutDeviceName {
					// Wait for the initial mount and then break it via remount.
					waitForMount(t, fakePlugin, volumeName, asw)
					asw.MarkRemountRequired(podName)
					time.Sleep(reconcilerSyncWaitDuration)
				}

				if tc.deviceState == operationexecutor.DeviceMountUncertain {
					waitForUncertainGlobalMount(t, volumeName, asw)
				}

				if tc.deviceState == operationexecutor.DeviceGloballyMounted {
					waitForMount(t, fakePlugin, volumeName, asw)
				}

				dsw.DeletePodFromVolume(podName, volumeName)
				waitForDetach(t, volumeName, asw)
				if mode == v1.PersistentVolumeFilesystem {
					err = volumetesting.VerifyUnmountDeviceCallCount(tc.unmountDeviceCallCount, fakePlugin)
				} else {
					if tc.unmountDeviceCallCount == 0 {
						err = volumetesting.VerifyZeroTearDownDeviceCallCount(fakePlugin)
					} else {
						err = volumetesting.VerifyTearDownDeviceCallCount(tc.unmountDeviceCallCount, fakePlugin)
					}
				}
				if err != nil {
					t.Errorf("Error verifying UnmountDeviceCallCount: %v", err)
				}
			})
		}
	}
}
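
// Test_UncertainVolumeMountState verifies that SetUp operations that time out
// or fail leave the volume mount in the expected state (uncertain,
// not-mounted, or mounted) and that cleanup then makes the matching number of
// unmount and unmount-device (or unmap and tear-down-device) calls.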
1657 func Test_UncertainVolumeMountState(t *testing.T) {
1658 var tests = []struct {
1659 name string
1660 volumeState operationexecutor.VolumeMountState
1661 unmountDeviceCallCount int
1662 unmountVolumeCount int
1663 volumeName string
1664 supportRemount bool
1665 pvcStatusSize resource.Quantity
1666 pvSize resource.Quantity
1667 }{
1668 {
1669 name: "timed out operations should result in volume marked as uncertain",
1670 volumeState: operationexecutor.VolumeMountUncertain,
1671 unmountDeviceCallCount: 1,
1672 unmountVolumeCount: 1,
1673 volumeName: volumetesting.TimeoutOnSetupVolumeName,
1674 },
1675 {
1676 name: "failed operation should result in not-mounted volume",
1677 volumeState: operationexecutor.VolumeNotMounted,
1678 unmountDeviceCallCount: 1,
1679 unmountVolumeCount: 0,
1680 volumeName: volumetesting.FailOnSetupVolumeName,
1681 },
1682 {
1683 name: "timeout followed by failed operation should result in non-mounted volume",
1684 volumeState: operationexecutor.VolumeNotMounted,
1685 unmountDeviceCallCount: 1,
1686 unmountVolumeCount: 0,
1687 volumeName: volumetesting.TimeoutAndFailOnSetupVolumeName,
1688 },
1689 {
1690 name: "success followed by timeout operation should result in mounted volume",
1691 volumeState: operationexecutor.VolumeMounted,
1692 unmountDeviceCallCount: 1,
1693 unmountVolumeCount: 1,
1694 volumeName: volumetesting.SuccessAndTimeoutSetupVolumeName,
1695 supportRemount: true,
1696 },
1697 {
1698 name: "success followed by failed operation should result in mounted volume",
1699 volumeState: operationexecutor.VolumeMounted,
1700 unmountDeviceCallCount: 1,
1701 unmountVolumeCount: 1,
1702 volumeName: volumetesting.SuccessAndFailOnSetupVolumeName,
1703 supportRemount: true,
1704 },
1705 {
1706 name: "mount success but fail to expand filesystem",
1707 volumeState: operationexecutor.VolumeMountUncertain,
1708 unmountDeviceCallCount: 1,
1709 unmountVolumeCount: 1,
1710 volumeName: volumetesting.FailVolumeExpansion,
1711 supportRemount: true,
1712 pvSize: resource.MustParse("10G"),
1713 pvcStatusSize: resource.MustParse("2G"),
1714 },
1715 }
1716 modes := []v1.PersistentVolumeMode{v1.PersistentVolumeBlock, v1.PersistentVolumeFilesystem}
1717
1718 for modeIndex := range modes {
1719 for tcIndex := range tests {
1720 mode := modes[modeIndex]
1721 tc := tests[tcIndex]
1722 testName := fmt.Sprintf("%s [%s]", tc.name, mode)
1723 uniqueTestString := fmt.Sprintf("local-mount-%s", testName)
1724 uniquePodDir := fmt.Sprintf("%s-%x", kubeletPodsDir, md5.Sum([]byte(uniqueTestString)))
1725 t.Run(testName, func(t *testing.T) {
1726 t.Parallel()
1727 pv := &v1.PersistentVolume{
1728 ObjectMeta: metav1.ObjectMeta{
1729 Name: tc.volumeName,
1730 UID: "pvuid",
1731 },
1732 Spec: v1.PersistentVolumeSpec{
1733 ClaimRef: &v1.ObjectReference{Name: "pvc"},
1734 VolumeMode: &mode,
1735 },
1736 }
1737 if tc.pvSize.CmpInt64(0) > 0 {
1738 pv.Spec.Capacity = v1.ResourceList{
1739 v1.ResourceStorage: tc.pvSize,
1740 }
1741 }
1742 pvc := &v1.PersistentVolumeClaim{
1743 ObjectMeta: metav1.ObjectMeta{
1744 Name: "pvc",
1745 UID: "pvcuid",
1746 },
1747 Spec: v1.PersistentVolumeClaimSpec{
1748 VolumeName: tc.volumeName,
1749 VolumeMode: &mode,
1750 },
1751 }
1752 if tc.pvcStatusSize.CmpInt64(0) > 0 {
1753 pvc.Status = v1.PersistentVolumeClaimStatus{
1754 Capacity: v1.ResourceList{
1755 v1.ResourceStorage: tc.pvcStatusSize,
1756 },
1757 }
1758 }
1759 pod := &v1.Pod{
1760 ObjectMeta: metav1.ObjectMeta{
1761 Name: "pod1",
1762 UID: "pod1uid",
1763 },
1764 Spec: v1.PodSpec{
1765 Volumes: []v1.Volume{
1766 {
1767 Name: "volume-name",
1768 VolumeSource: v1.VolumeSource{
1769 PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
1770 ClaimName: pvc.Name,
1771 },
1772 },
1773 },
1774 },
1775 },
1776 }
1777
1778 node := &v1.Node{
1779 ObjectMeta: metav1.ObjectMeta{
1780 Name: string(nodeName),
1781 },
1782 Status: v1.NodeStatus{
1783 VolumesAttached: []v1.AttachedVolume{
1784 {
1785 Name: v1.UniqueVolumeName(fmt.Sprintf("fake-plugin/%s", tc.volumeName)),
1786 DevicePath: "fake/path",
1787 },
1788 },
1789 },
1790 }
1791
1792 volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgrWithNode(t, node)
1793 fakePlugin.SupportsRemount = tc.supportRemount
1794 seLinuxTranslator := util.NewFakeSELinuxLabelTranslator()
1795 dsw := cache.NewDesiredStateOfWorld(volumePluginMgr, seLinuxTranslator)
1796 asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
1797 kubeClient := createtestClientWithPVPVC(pv, pvc, v1.AttachedVolume{
1798 Name: v1.UniqueVolumeName(fmt.Sprintf("fake-plugin/%s", tc.volumeName)),
1799 DevicePath: "fake/path",
1800 })
1801 fakeRecorder := &record.FakeRecorder{}
1802 fakeHandler := volumetesting.NewBlockVolumePathHandler()
1803 oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
1804 kubeClient,
1805 volumePluginMgr,
1806 fakeRecorder,
1807 fakeHandler))
1808
1809 reconciler := NewReconciler(
1810 kubeClient,
1811 true,
1812 reconcilerLoopSleepDuration,
1813 waitForAttachTimeout,
1814 nodeName,
1815 dsw,
1816 asw,
1817 hasAddedPods,
1818 oex,
1819 &mount.FakeMounter{},
1820 hostutil.NewFakeHostUtil(nil),
1821 volumePluginMgr,
1822 uniquePodDir)
1823 volumeSpec := &volume.Spec{PersistentVolume: pv}
1824 podName := util.GetUniquePodName(pod)
1825 volumeName, err := dsw.AddPodToVolume(
1826 podName, pod, volumeSpec, volumeSpec.Name(), "" , nil )
1827
1828 if err != nil {
1829 t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
1830 }
1831 dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{volumeName})
1832
1833
1834 stopChan, stoppedChan := make(chan struct{}), make(chan struct{})
1835 go func() {
1836 reconciler.Run(stopChan)
1837 close(stoppedChan)
1838 }()
1839 waitForVolumeToExistInASW(t, volumeName, asw)
1840
1841
1842 waitForGlobalMount(t, volumeName, asw)
1843 if tc.volumeName == volumetesting.TimeoutAndFailOnSetupVolumeName {
1844
1845 time.Sleep(reconcilerSyncWaitDuration)
1846 }
1847
1848 if tc.volumeName == volumetesting.SuccessAndFailOnSetupVolumeName ||
1849 tc.volumeName == volumetesting.SuccessAndTimeoutSetupVolumeName {
1850
1851 waitForMount(t, fakePlugin, volumeName, asw)
1852 asw.MarkRemountRequired(podName)
1853 time.Sleep(reconcilerSyncWaitDuration)
1854 }
1855
1856 if tc.volumeState == operationexecutor.VolumeMountUncertain {
1857 waitForUncertainPodMount(t, volumeName, podName, asw)
1858 }
1859
1860 if tc.volumeState == operationexecutor.VolumeMounted {
1861 waitForMount(t, fakePlugin, volumeName, asw)
1862 }
1863
1864 dsw.DeletePodFromVolume(podName, volumeName)
1865 waitForDetach(t, volumeName, asw)

				// Verify the expected unmount call counts for the volume mode.
				if mode == v1.PersistentVolumeFilesystem {
					if err := volumetesting.VerifyUnmountDeviceCallCount(tc.unmountDeviceCallCount, fakePlugin); err != nil {
						t.Errorf("Error verifying UnmountDeviceCallCount: %v", err)
					}
					if err := volumetesting.VerifyTearDownCallCount(tc.unmountVolumeCount, fakePlugin); err != nil {
						t.Errorf("Error verifying TearDownCallCount: %v", err)
					}
				} else {
					if tc.unmountVolumeCount == 0 {
						if err := volumetesting.VerifyZeroUnmapPodDeviceCallCount(fakePlugin); err != nil {
							t.Errorf("Error verifying ZeroUnmapPodDeviceCallCount: %v", err)
						}
					} else {
						if err := volumetesting.VerifyUnmapPodDeviceCallCount(tc.unmountVolumeCount, fakePlugin); err != nil {
							t.Errorf("Error verifying UnmapPodDeviceCallCount: %v", err)
						}
					}
					if tc.unmountDeviceCallCount == 0 {
						if err := volumetesting.VerifyZeroTearDownDeviceCallCount(fakePlugin); err != nil {
							t.Errorf("Error verifying ZeroTearDownDeviceCallCount: %v", err)
						}
					} else {
						if err := volumetesting.VerifyTearDownDeviceCallCount(tc.unmountDeviceCallCount, fakePlugin); err != nil {
							t.Errorf("Error verifying TearDownDeviceCallCount: %v", err)
						}
					}
				}
			})
		}
	}
}

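// waitForUncertainGlobalMount polls the actual state of world until the given
// volume's device mount is recorded as uncertain, failing the test on timeout.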
func waitForUncertainGlobalMount(t *testing.T, volumeName v1.UniqueVolumeName, asw cache.ActualStateOfWorld) {
	// Check whether the volume is device-mounted in uncertain state.
	err := retryWithExponentialBackOff(
		testOperationBackOffDuration,
		func() (bool, error) {
			unmountedVolumes := asw.GetUnmountedVolumes()
			for _, v := range unmountedVolumes {
				if v.VolumeName == volumeName && v.DeviceMountState == operationexecutor.DeviceMountUncertain {
					return true, nil
				}
			}
			return false, nil
		},
	)

	if err != nil {
		t.Fatalf("expected volume %s to be mounted in uncertain state globally", volumeName)
	}
}

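// waitForGlobalMount polls the actual state of world until the given volume
// is reported as globally (device) mounted, failing the test on timeout.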
func waitForGlobalMount(t *testing.T, volumeName v1.UniqueVolumeName, asw cache.ActualStateOfWorld) {
	// Check whether the volume is globally (device) mounted.
	err := retryWithExponentialBackOff(
		testOperationBackOffDuration,
		func() (bool, error) {
			mountedVolumes := asw.GetGloballyMountedVolumes()
			for _, v := range mountedVolumes {
				if v.VolumeName == volumeName {
					return true, nil
				}
			}
			return false, nil
		},
	)

	if err != nil {
		t.Fatalf("expected volume %s to be mounted globally", volumeName)
	}
}

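// waitForUncertainPodMount polls the actual state of world until the given
// volume's mount for the pod is recorded as uncertain, failing the test on
// timeout.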
func waitForUncertainPodMount(t *testing.T, volumeName v1.UniqueVolumeName, podName types.UniquePodName, asw cache.ActualStateOfWorld) {
	// A pod mount is uncertain when PodExistsInVolume does not report the
	// volume as mounted, yet the volume still appears in GetAllMountedVolumes.
	err := retryWithExponentialBackOff(
		testOperationBackOffDuration,
		func() (bool, error) {
			mounted, _, err := asw.PodExistsInVolume(podName, volumeName, resource.Quantity{}, "")
			if mounted || err != nil {
				return false, nil
			}
			allMountedVolumes := asw.GetAllMountedVolumes()
			for _, v := range allMountedVolumes {
				if v.VolumeName == volumeName {
					return true, nil
				}
			}
			return false, nil
		},
	)

	if err != nil {
		t.Fatalf("expected volume %s to be mounted in uncertain state for pod %s", volumeName, podName)
	}
}

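// waitForMount polls the actual state of world until the given volume is
// reported as mounted to a pod, failing the test on timeout.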
func waitForMount(
	t *testing.T,
	fakePlugin *volumetesting.FakeVolumePlugin,
	volumeName v1.UniqueVolumeName,
	asw cache.ActualStateOfWorld) {
	err := retryWithExponentialBackOff(
		testOperationBackOffDuration,
		func() (bool, error) {
			mountedVolumes := asw.GetMountedVolumes()
			for _, mountedVolume := range mountedVolumes {
				if mountedVolume.VolumeName == volumeName {
					return true, nil
				}
			}

			return false, nil
		},
	)

	if err != nil {
		t.Fatalf("Timed out waiting for volume %q to be mounted.", volumeName)
	}
}

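// waitForVolumeToExistInASW polls until the given volume appears in the
// actual state of world, failing the test on timeout.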
func waitForVolumeToExistInASW(t *testing.T, volumeName v1.UniqueVolumeName, asw cache.ActualStateOfWorld) {
	err := retryWithExponentialBackOff(
		testOperationBackOffDuration,
		func() (bool, error) {
			if asw.VolumeExists(volumeName) {
				return true, nil
			}
			return false, nil
		},
	)
	if err != nil {
		t.Fatalf("Timed out waiting for volume %q to exist in the actual state of world.", volumeName)
	}
}

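// waitForDetach polls until the given volume has been removed from the actual
// state of world, failing the test on timeout.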
func waitForDetach(
	t *testing.T,
	volumeName v1.UniqueVolumeName,
	asw cache.ActualStateOfWorld) {
	err := retryWithExponentialBackOff(
		testOperationBackOffDuration,
		func() (bool, error) {
			if asw.VolumeExists(volumeName) {
				return false, nil
			}

			return true, nil
		},
	)

	if err != nil {
		t.Fatalf("Timed out waiting for volume %q to be detached.", volumeName)
	}
}

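// retryWithExponentialBackOff retries fn up to 6 times with exponential
// backoff. With the 100ms testOperationBackOffDuration and factor 3, the
// waits between attempts are roughly 0.1s, 0.3s, 0.9s, 2.7s, and 8.1s, i.e.
// about 12 seconds in total before giving up.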
func retryWithExponentialBackOff(initialDuration time.Duration, fn wait.ConditionFunc) error {
	backoff := wait.Backoff{
		Duration: initialDuration,
		Factor:   3,
		Jitter:   0,
		Steps:    6,
	}
	return wait.ExponentialBackoff(backoff, fn)
}

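// createTestClient returns a fake clientset whose "get nodes" reactor reports
// the given volumes as attached (defaulting to fake-plugin/fake-device1);
// every other API action returns an error.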
func createTestClient(attachedVolumes ...v1.AttachedVolume) *fake.Clientset {
	fakeClient := &fake.Clientset{}
	if len(attachedVolumes) == 0 {
		attachedVolumes = append(attachedVolumes, v1.AttachedVolume{
			Name:       "fake-plugin/fake-device1",
			DevicePath: "fake/path",
		})
	}
	fakeClient.AddReactor("get", "nodes",
		func(action core.Action) (bool, runtime.Object, error) {
			return true, &v1.Node{
				ObjectMeta: metav1.ObjectMeta{Name: string(nodeName)},
				Status: v1.NodeStatus{
					VolumesAttached: attachedVolumes,
				},
			}, nil
		},
	)

	fakeClient.AddReactor("*", "*", func(action core.Action) (bool, runtime.Object, error) {
		return true, nil, fmt.Errorf("no reaction implemented for %s", action)
	})
	return fakeClient
}

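// runReconciler starts the reconciler loop in a background goroutine that is
// never stopped.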
func runReconciler(reconciler Reconciler) {
	go reconciler.Run(wait.NeverStop)
}

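// createtestClientWithPVPVC returns a fake clientset that serves the given PV
// and PVC on get, accepts PVC status patches, and reports the given volumes
// as attached to the node; every other API action returns an error.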
func createtestClientWithPVPVC(pv *v1.PersistentVolume, pvc *v1.PersistentVolumeClaim, attachedVolumes ...v1.AttachedVolume) *fake.Clientset {
	fakeClient := &fake.Clientset{}
	if len(attachedVolumes) == 0 {
		attachedVolumes = append(attachedVolumes, v1.AttachedVolume{
			Name:       "fake-plugin/pv",
			DevicePath: "fake/path",
		})
	}
	fakeClient.AddReactor("get", "nodes",
		func(action core.Action) (bool, runtime.Object, error) {
			return true, &v1.Node{
				ObjectMeta: metav1.ObjectMeta{Name: string(nodeName)},
				Status: v1.NodeStatus{
					VolumesAttached: attachedVolumes,
				},
			}, nil
		})
	fakeClient.AddReactor("get", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) {
		return true, pvc, nil
	})
	fakeClient.AddReactor("get", "persistentvolumes", func(action core.Action) (bool, runtime.Object, error) {
		return true, pv, nil
	})
	fakeClient.AddReactor("patch", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) {
		if action.GetSubresource() == "status" {
			return true, pvc, nil
		}
		return true, nil, fmt.Errorf("no reaction implemented for %s", action)
	})
	fakeClient.AddReactor("*", "*", func(action core.Action) (bool, runtime.Object, error) {
		return true, nil, fmt.Errorf("no reaction implemented for %s", action)
	})
	return fakeClient
}

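// Test_Run_Positive_VolumeMountControllerAttachEnabledRace mounts a volume,
// stops the reconciler, and then races an unmount against a re-mount: the
// UnmountDeviceHook re-adds the pod to the desired state of world while the
// device is being unmounted, and the WaitForAttachHook asserts that the
// devicePath from node.Status is not lost in the process.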
func Test_Run_Positive_VolumeMountControllerAttachEnabledRace(t *testing.T) {
	// The node already reports the volume as attached at /fake/path.
	node := &v1.Node{
		ObjectMeta: metav1.ObjectMeta{
			Name: string(nodeName),
		},
		Status: v1.NodeStatus{
			VolumesAttached: []v1.AttachedVolume{
				{
					Name:       "fake-plugin/fake-device1",
					DevicePath: "/fake/path",
				},
			},
		},
	}
	volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgrWithNode(t, node)
	seLinuxTranslator := util.NewFakeSELinuxLabelTranslator()

	dsw := cache.NewDesiredStateOfWorld(volumePluginMgr, seLinuxTranslator)
	asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
	kubeClient := createTestClient()
	fakeRecorder := &record.FakeRecorder{}
	fakeHandler := volumetesting.NewBlockVolumePathHandler()
	oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
		kubeClient,
		volumePluginMgr,
		fakeRecorder,
		fakeHandler))
	reconciler := NewReconciler(
		kubeClient,
		true,
		reconcilerLoopSleepDuration,
		waitForAttachTimeout,
		nodeName,
		dsw,
		asw,
		hasAddedPods,
		oex,
		mount.NewFakeMounter(nil),
		hostutil.NewFakeHostUtil(nil),
		volumePluginMgr,
		kubeletPodsDir)
	pod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name: "pod1",
			UID:  "pod1uid",
		},
		Spec: v1.PodSpec{
			Volumes: []v1.Volume{
				{
					Name: "volume-name",
					VolumeSource: v1.VolumeSource{
						GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{
							PDName: "fake-device1",
						},
					},
				},
			},
		},
	}

	// Some steps below run out of order inside plugin callbacks; the volume
	// spec is copied up front so the callback does not race the reconciler
	// goroutine on the original spec.
	volumeSpec := &volume.Spec{Volume: &pod.Spec.Volumes[0]}
	volumeSpecCopy := &volume.Spec{Volume: &pod.Spec.Volumes[0]}
	podName := util.GetUniquePodName(pod)
	generatedVolumeName, err := dsw.AddPodToVolume(
		podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */, nil /* seLinuxContainerContexts */)
	if err != nil {
		t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
	}
	dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{generatedVolumeName})

	// Start the reconciler and wait until the volume is mounted.
	stopChan, stoppedChan := make(chan struct{}), make(chan struct{})
	go func() {
		reconciler.Run(stopChan)
		close(stoppedChan)
	}()
	waitForMount(t, fakePlugin, generatedVolumeName, asw)

	// Stop the reconciler before installing the racing hooks.
	close(stopChan)
	<-stoppedChan

	finished := make(chan interface{})
	fakePlugin.Lock()
	fakePlugin.UnmountDeviceHook = func(mountPath string) error {
		// While the volume is being unmounted, add the pod back to the
		// desired state of world so the reconciler re-mounts it.
		klog.InfoS("UnmountDevice called")
		generatedVolumeNameCopy, errCopy := dsw.AddPodToVolume(
			podName, pod, volumeSpecCopy, volumeSpec.Name(), "" /* volumeGidValue */, nil /* seLinuxContainerContexts */)
		if errCopy != nil {
			return errCopy
		}
		dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{generatedVolumeNameCopy})
		return nil
	}

	fakePlugin.WaitForAttachHook = func(spec *volume.Spec, devicePath string, pod *v1.Pod, timeout time.Duration) (string, error) {
		// The re-mount must see the devicePath from node.Status; an empty
		// devicePath means it was lost during the racing UnmountDevice.
		if devicePath == "" {
			klog.ErrorS(nil, "Expected WaitForAttach called with devicePath from Node.Status")
			close(finished)
			return "", fmt.Errorf("expected devicePath from node.Status")
		}
		close(finished)
		return devicePath, nil
	}
	fakePlugin.Unlock()

	// Start the reconciler again.
	go reconciler.Run(wait.NeverStop)

	// Delete the pod from the desired state of world; the resulting
	// UnmountDevice triggers the hook that re-adds the pod.
	dsw.DeletePodFromVolume(podName, generatedVolumeName)

	<-finished
	waitForMount(t, fakePlugin, generatedVolumeName, asw)
}

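// getFakeNode returns a node whose status reports fake-plugin/fake-device1 as
// attached at /fake/path.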
func getFakeNode() *v1.Node {
	return &v1.Node{
		ObjectMeta: metav1.ObjectMeta{
			Name: string(nodeName),
		},
		Status: v1.NodeStatus{
			VolumesAttached: []v1.AttachedVolume{
				{
					Name:       "fake-plugin/fake-device1",
					DevicePath: "/fake/path",
				},
			},
		},
	}
}

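// getInlineFakePod returns a pod carrying a single inline GCE PD volume, with
// outerName as the pod-level volume name and innerName as the PD name.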
func getInlineFakePod(podName, podUUID, outerName, innerName string) *v1.Pod {
	pod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name: podName,
			UID:  k8stypes.UID(podUUID),
		},
		Spec: v1.PodSpec{
			Volumes: []v1.Volume{
				{
					Name: outerName,
					VolumeSource: v1.VolumeSource{
						GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{
							PDName: innerName,
						},
					},
				},
			},
		},
	}
	return pod
}

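// getReconciler builds a reconciler rooted at kubeletDir. The fake mounter is
// pre-populated with one mount point per entry in volumePaths, and a default
// fake clientset is created when kubeClient is nil.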
func getReconciler(kubeletDir string, t *testing.T, volumePaths []string, kubeClient *fake.Clientset) (Reconciler, *volumetesting.FakeVolumePlugin) {
	node := getFakeNode()
	volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgrWithNodeAndRoot(t, node, kubeletDir)
	tmpKubeletPodDir := filepath.Join(kubeletDir, "pods")
	seLinuxTranslator := util.NewFakeSELinuxLabelTranslator()

	dsw := cache.NewDesiredStateOfWorld(volumePluginMgr, seLinuxTranslator)
	asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
	if kubeClient == nil {
		kubeClient = createTestClient()
	}

	fakeRecorder := &record.FakeRecorder{}
	fakeHandler := volumetesting.NewBlockVolumePathHandler()
	oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
		kubeClient,
		volumePluginMgr,
		fakeRecorder,
		fakeHandler))
	mountPoints := []mount.MountPoint{}
	for _, volumePath := range volumePaths {
		mountPoints = append(mountPoints, mount.MountPoint{Path: volumePath})
	}
	rc := NewReconciler(
		kubeClient,
		true,
		reconcilerLoopSleepDuration,
		waitForAttachTimeout,
		nodeName,
		dsw,
		asw,
		hasAddedPods,
		oex,
		mount.NewFakeMounter(mountPoints),
		hostutil.NewFakeHostUtil(nil),
		volumePluginMgr,
		tmpKubeletPodDir)
	return rc, fakePlugin
}

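// TestReconcileWithUpdateReconstructedFromAPIServer seeds the actual state of
// world with two volumes reconstructed as uncertain after a simulated kubelet
// restart, and verifies that a single reconcile pass updates their devicePath
// and attachability from the node status reported by the API server.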
func TestReconcileWithUpdateReconstructedFromAPIServer(t *testing.T) {
	// Only fake-device1 is reported as attached in node.Status; fake-device2
	// is deliberately missing.
	node := &v1.Node{
		ObjectMeta: metav1.ObjectMeta{
			Name: string(nodeName),
		},
		Status: v1.NodeStatus{
			VolumesAttached: []v1.AttachedVolume{
				{
					Name:       "fake-plugin/fake-device1",
					DevicePath: "fake/path",
				},
			},
		},
	}
	volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgrWithNode(t, node)
	seLinuxTranslator := util.NewFakeSELinuxLabelTranslator()
	dsw := cache.NewDesiredStateOfWorld(volumePluginMgr, seLinuxTranslator)
	asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
	kubeClient := createTestClient()
	fakeRecorder := &record.FakeRecorder{}
	fakeHandler := volumetesting.NewBlockVolumePathHandler()
	oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
		kubeClient,
		volumePluginMgr,
		fakeRecorder,
		fakeHandler))
	rc := NewReconciler(
		kubeClient,
		true,
		reconcilerLoopSleepDuration,
		waitForAttachTimeout,
		nodeName,
		dsw,
		asw,
		hasAddedPods,
		oex,
		mount.NewFakeMounter(nil),
		hostutil.NewFakeHostUtil(nil),
		volumePluginMgr,
		kubeletPodsDir)
	reconciler := rc.(*reconciler)

	// The pod mounts two volumes, but only fake-device1 appears in the node's
	// status above.
	pod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name: "pod1",
			UID:  "pod1uid",
		},
		Spec: v1.PodSpec{
			Volumes: []v1.Volume{
				{
					Name: "volume-name",
					VolumeSource: v1.VolumeSource{
						GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{
							PDName: "fake-device1",
						},
					},
				},
				{
					Name: "volume-name2",
					VolumeSource: v1.VolumeSource{
						GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{
							PDName: "fake-device2",
						},
					},
				},
			},
		},
	}

	volumeSpec1 := &volume.Spec{Volume: &pod.Spec.Volumes[0]}
	volumeName1 := util.GetUniqueVolumeName(fakePlugin.GetPluginName(), "fake-device1")
	volumeSpec2 := &volume.Spec{Volume: &pod.Spec.Volumes[1]}
	volumeName2 := util.GetUniqueVolumeName(fakePlugin.GetPluginName(), "fake-device2")

	// Simulate both volumes being reconstructed as uncertain after a kubelet
	// restart.
	assert.NoError(t, asw.AddAttachUncertainReconstructedVolume(volumeName1, volumeSpec1, nodeName, ""))
	assert.NoError(t, asw.MarkDeviceAsUncertain(volumeName1, "/dev/badly/reconstructed", "/var/lib/kubelet/plugins/global1", ""))
	assert.NoError(t, asw.AddAttachUncertainReconstructedVolume(volumeName2, volumeSpec2, nodeName, ""))
	assert.NoError(t, asw.MarkDeviceAsUncertain(volumeName2, "/dev/reconstructed", "/var/lib/kubelet/plugins/global2", ""))

	assert.False(t, reconciler.StatesHasBeenSynced())

	reconciler.volumesNeedUpdateFromNodeStatus = append(reconciler.volumesNeedUpdateFromNodeStatus, volumeName1, volumeName2)

	// A single reconcile pass should update both volumes from node.Status.
	reconciler.reconcileNew()

	// After the pass, the states are synced and the update queue is empty.
	assert.True(t, reconciler.StatesHasBeenSynced())
	assert.Empty(t, reconciler.volumesNeedUpdateFromNodeStatus)

	attachedVolumes := asw.GetAttachedVolumes()
	assert.Lenf(t, attachedVolumes, 2, "two volumes in ASW expected")
	for _, vol := range attachedVolumes {
		if vol.VolumeName == volumeName1 {
			// Attachability and devicePath were updated from node.Status.
			assert.True(t, vol.PluginIsAttachable)
			assert.Equal(t, "fake/path", vol.DevicePath)
		}
		if vol.VolumeName == volumeName2 {
			// This volume is absent from node.Status, so it is treated as
			// non-attachable and keeps the reconstructed devicePath.
			assert.False(t, vol.PluginIsAttachable)
			assert.Equal(t, "/dev/reconstructed", vol.DevicePath)
		}
	}
}