1
16
17 package volumemanager
18
19 import (
20 "context"
21 "os"
22 "reflect"
23 "strconv"
24 "strings"
25 "testing"
26 "time"
27
28 "k8s.io/mount-utils"
29
30 v1 "k8s.io/api/core/v1"
31 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
32 kubetypes "k8s.io/apimachinery/pkg/types"
33 "k8s.io/apimachinery/pkg/util/sets"
34 "k8s.io/apimachinery/pkg/util/wait"
35 clientset "k8s.io/client-go/kubernetes"
36 "k8s.io/client-go/kubernetes/fake"
37 "k8s.io/client-go/tools/record"
38 utiltesting "k8s.io/client-go/util/testing"
39 "k8s.io/kubernetes/pkg/kubelet/config"
40 containertest "k8s.io/kubernetes/pkg/kubelet/container/testing"
41 kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
42 "k8s.io/kubernetes/pkg/volume"
43 volumetest "k8s.io/kubernetes/pkg/volume/testing"
44 "k8s.io/kubernetes/pkg/volume/util"
45 "k8s.io/kubernetes/pkg/volume/util/hostutil"
46 "k8s.io/kubernetes/pkg/volume/util/types"
47 )
48
// testHostname is the node name used both for the fake Node object built by
// createObjects and for the volume manager constructed in newTestVolumeManager.
const testHostname = "test-hostname"
52
53 func TestGetMountedVolumesForPodAndGetVolumesInUse(t *testing.T) {
54 tests := []struct {
55 name string
56 pvMode, podMode v1.PersistentVolumeMode
57 expectMount bool
58 expectError bool
59 }{
60 {
61 name: "filesystem volume",
62 pvMode: v1.PersistentVolumeFilesystem,
63 podMode: v1.PersistentVolumeFilesystem,
64 expectMount: true,
65 expectError: false,
66 },
67 {
68 name: "block volume",
69 pvMode: v1.PersistentVolumeBlock,
70 podMode: v1.PersistentVolumeBlock,
71 expectMount: true,
72 expectError: false,
73 },
74 {
75 name: "mismatched volume",
76 pvMode: v1.PersistentVolumeBlock,
77 podMode: v1.PersistentVolumeFilesystem,
78 expectMount: false,
79 expectError: true,
80 },
81 }
82
83 for _, test := range tests {
84 t.Run(test.name, func(t *testing.T) {
85 tmpDir, err := utiltesting.MkTmpdir("volumeManagerTest")
86 if err != nil {
87 t.Fatalf("can't make a temp dir: %v", err)
88 }
89 defer os.RemoveAll(tmpDir)
90 podManager := kubepod.NewBasicPodManager()
91
92 node, pod, pv, claim := createObjects(test.pvMode, test.podMode)
93 kubeClient := fake.NewSimpleClientset(node, pod, pv, claim)
94
95 manager := newTestVolumeManager(t, tmpDir, podManager, kubeClient, node)
96
97 stopCh := runVolumeManager(manager)
98 defer close(stopCh)
99
100 podManager.SetPods([]*v1.Pod{pod})
101
102
103 go simulateVolumeInUseUpdate(
104 v1.UniqueVolumeName(node.Status.VolumesAttached[0].Name),
105 stopCh,
106 manager)
107
108 err = manager.WaitForAttachAndMount(context.Background(), pod)
109 if err != nil && !test.expectError {
110 t.Errorf("Expected success: %v", err)
111 }
112 if err == nil && test.expectError {
113 t.Errorf("Expected error, got none")
114 }
115
116 expectedMounted := pod.Spec.Volumes[0].Name
117 actualMounted := manager.GetMountedVolumesForPod(types.UniquePodName(pod.ObjectMeta.UID))
118 if test.expectMount {
119 if _, ok := actualMounted[expectedMounted]; !ok || (len(actualMounted) != 1) {
120 t.Errorf("Expected %v to be mounted to pod but got %v", expectedMounted, actualMounted)
121 }
122 } else {
123 if _, ok := actualMounted[expectedMounted]; ok || (len(actualMounted) != 0) {
124 t.Errorf("Expected %v not to be mounted to pod but got %v", expectedMounted, actualMounted)
125 }
126 }
127
128 expectedInUse := []v1.UniqueVolumeName{}
129 if test.expectMount {
130 expectedInUse = []v1.UniqueVolumeName{v1.UniqueVolumeName(node.Status.VolumesAttached[0].Name)}
131 }
132 actualInUse := manager.GetVolumesInUse()
133 if !reflect.DeepEqual(expectedInUse, actualInUse) {
134 t.Errorf("Expected %v to be in use but got %v", expectedInUse, actualInUse)
135 }
136 })
137 }
138 }
139
140 func TestWaitForAttachAndMountError(t *testing.T) {
141 tmpDir, err := utiltesting.MkTmpdir("volumeManagerTest")
142 if err != nil {
143 t.Fatalf("can't make a temp dir: %v", err)
144 }
145 defer os.RemoveAll(tmpDir)
146 podManager := kubepod.NewBasicPodManager()
147
148 pod := &v1.Pod{
149 ObjectMeta: metav1.ObjectMeta{
150 Name: "abc",
151 Namespace: "nsA",
152 UID: "1234",
153 },
154 Spec: v1.PodSpec{
155 Containers: []v1.Container{
156 {
157 Name: "container1",
158 VolumeMounts: []v1.VolumeMount{
159 {
160 Name: volumetest.FailMountDeviceVolumeName,
161 MountPath: "/vol1",
162 },
163 {
164 Name: "vol2",
165 MountPath: "/vol2",
166 },
167 {
168 Name: "vol02",
169 MountPath: "/vol02",
170 },
171 {
172 Name: "vol3",
173 MountPath: "/vol3",
174 },
175 {
176 Name: "vol03",
177 MountPath: "/vol03",
178 },
179 },
180 },
181 },
182 Volumes: []v1.Volume{
183 {
184 Name: volumetest.FailMountDeviceVolumeName,
185 VolumeSource: v1.VolumeSource{
186 ConfigMap: &v1.ConfigMapVolumeSource{},
187 },
188 },
189 {
190 Name: "vol2",
191 VolumeSource: v1.VolumeSource{
192 RBD: &v1.RBDVolumeSource{},
193 },
194 },
195 {
196 Name: "vol02",
197 VolumeSource: v1.VolumeSource{
198 RBD: &v1.RBDVolumeSource{},
199 },
200 },
201 {
202 Name: "vol3",
203 VolumeSource: v1.VolumeSource{
204 AzureDisk: &v1.AzureDiskVolumeSource{},
205 },
206 },
207 {
208 Name: "vol03",
209 VolumeSource: v1.VolumeSource{
210 AzureDisk: &v1.AzureDiskVolumeSource{},
211 },
212 },
213 },
214 },
215 }
216
217 kubeClient := fake.NewSimpleClientset(pod)
218
219 manager := newTestVolumeManager(t, tmpDir, podManager, kubeClient, nil)
220
221 stopCh := runVolumeManager(manager)
222 defer close(stopCh)
223
224 podManager.SetPods([]*v1.Pod{pod})
225
226 err = manager.WaitForAttachAndMount(context.Background(), pod)
227 if err == nil {
228 t.Errorf("Expected error, got none")
229 }
230 if !strings.Contains(err.Error(),
231 "unattached volumes=[vol02 vol2], failed to process volumes=[vol03 vol3]") {
232 t.Errorf("Unexpected error info: %v", err)
233 }
234 }
235
236 func TestInitialPendingVolumesForPodAndGetVolumesInUse(t *testing.T) {
237 tmpDir, err := utiltesting.MkTmpdir("volumeManagerTest")
238 if err != nil {
239 t.Fatalf("can't make a temp dir: %v", err)
240 }
241 defer os.RemoveAll(tmpDir)
242 podManager := kubepod.NewBasicPodManager()
243
244 node, pod, pv, claim := createObjects(v1.PersistentVolumeFilesystem, v1.PersistentVolumeFilesystem)
245 claim.Status = v1.PersistentVolumeClaimStatus{
246 Phase: v1.ClaimPending,
247 }
248
249 kubeClient := fake.NewSimpleClientset(node, pod, pv, claim)
250
251 manager := newTestVolumeManager(t, tmpDir, podManager, kubeClient, node)
252
253 stopCh := runVolumeManager(manager)
254 defer close(stopCh)
255
256 podManager.SetPods([]*v1.Pod{pod})
257
258
259 go simulateVolumeInUseUpdate(
260 v1.UniqueVolumeName(node.Status.VolumesAttached[0].Name),
261 stopCh,
262 manager)
263
264
265 go delayClaimBecomesBound(kubeClient, claim.GetNamespace(), claim.ObjectMeta.Name)
266
267 err = wait.Poll(100*time.Millisecond, 1*time.Second, func() (bool, error) {
268 err = manager.WaitForAttachAndMount(context.Background(), pod)
269 if err != nil {
270
271 return false, nil
272 }
273 return true, nil
274 })
275 if err != nil {
276 t.Errorf("Expected a volume to be mounted, got: %s", err)
277 }
278
279 }
280
281 func TestGetExtraSupplementalGroupsForPod(t *testing.T) {
282 tmpDir, err := utiltesting.MkTmpdir("volumeManagerTest")
283 if err != nil {
284 t.Fatalf("can't make a temp dir: %v", err)
285 }
286 defer os.RemoveAll(tmpDir)
287 podManager := kubepod.NewBasicPodManager()
288
289 node, pod, _, claim := createObjects(v1.PersistentVolumeFilesystem, v1.PersistentVolumeFilesystem)
290
291 existingGid := pod.Spec.SecurityContext.SupplementalGroups[0]
292
293 cases := []struct {
294 gidAnnotation string
295 expected []int64
296 }{
297 {
298 gidAnnotation: "777",
299 expected: []int64{777},
300 },
301 {
302 gidAnnotation: strconv.FormatInt(int64(existingGid), 10),
303 expected: []int64{},
304 },
305 {
306 gidAnnotation: "a",
307 expected: []int64{},
308 },
309 {
310 gidAnnotation: "",
311 expected: []int64{},
312 },
313 }
314
315 for _, tc := range cases {
316 fs := v1.PersistentVolumeFilesystem
317 pv := &v1.PersistentVolume{
318 ObjectMeta: metav1.ObjectMeta{
319 Name: "pvA",
320 Annotations: map[string]string{
321 util.VolumeGidAnnotationKey: tc.gidAnnotation,
322 },
323 },
324 Spec: v1.PersistentVolumeSpec{
325 PersistentVolumeSource: v1.PersistentVolumeSource{
326 RBD: &v1.RBDPersistentVolumeSource{
327 RBDImage: "fake-device",
328 },
329 },
330 ClaimRef: &v1.ObjectReference{
331 Name: claim.ObjectMeta.Name,
332 Namespace: claim.ObjectMeta.Namespace,
333 },
334 VolumeMode: &fs,
335 },
336 }
337 kubeClient := fake.NewSimpleClientset(node, pod, pv, claim)
338
339 manager := newTestVolumeManager(t, tmpDir, podManager, kubeClient, node)
340
341 stopCh := runVolumeManager(manager)
342 defer close(stopCh)
343
344 podManager.SetPods([]*v1.Pod{pod})
345
346
347 go simulateVolumeInUseUpdate(
348 v1.UniqueVolumeName(node.Status.VolumesAttached[0].Name),
349 stopCh,
350 manager)
351
352 err = manager.WaitForAttachAndMount(context.Background(), pod)
353 if err != nil {
354 t.Errorf("Expected success: %v", err)
355 continue
356 }
357
358 actual := manager.GetExtraSupplementalGroupsForPod(pod)
359 if !reflect.DeepEqual(tc.expected, actual) {
360 t.Errorf("Expected supplemental groups %v, got %v", tc.expected, actual)
361 }
362 }
363 }
364
365 type fakePodStateProvider struct {
366 shouldRemove map[kubetypes.UID]struct{}
367 terminating map[kubetypes.UID]struct{}
368 }
369
370 func (p *fakePodStateProvider) ShouldPodRuntimeBeRemoved(uid kubetypes.UID) bool {
371 _, ok := p.shouldRemove[uid]
372 return ok
373 }
374
375 func (p *fakePodStateProvider) ShouldPodContainersBeTerminating(uid kubetypes.UID) bool {
376 _, ok := p.terminating[uid]
377 return ok
378 }
379
380 func newTestVolumeManager(t *testing.T, tmpDir string, podManager kubepod.Manager, kubeClient clientset.Interface, node *v1.Node) VolumeManager {
381 attachablePlug := &volumetest.FakeVolumePlugin{
382 PluginName: "fake",
383 Host: nil,
384 CanSupportFn: func(spec *volume.Spec) bool {
385 return (spec.PersistentVolume != nil && spec.PersistentVolume.Spec.RBD != nil) ||
386 (spec.Volume != nil && spec.Volume.RBD != nil)
387 },
388 }
389 unattachablePlug := &volumetest.FakeVolumePlugin{
390 PluginName: "unattachable-fake-plugin",
391 Host: nil,
392 NonAttachable: true,
393 CanSupportFn: func(spec *volume.Spec) bool {
394 return spec.Volume != nil && spec.Volume.ConfigMap != nil
395 },
396 }
397 fakeRecorder := &record.FakeRecorder{}
398 plugMgr := &volume.VolumePluginMgr{}
399
400 fakeVolumeHost := volumetest.NewFakeKubeletVolumeHost(t, tmpDir, kubeClient, nil)
401 fakeVolumeHost.WithNode(node)
402
403 plugMgr.InitPlugins([]volume.VolumePlugin{attachablePlug, unattachablePlug}, nil , fakeVolumeHost)
404 stateProvider := &fakePodStateProvider{}
405 fakePathHandler := volumetest.NewBlockVolumePathHandler()
406 vm := NewVolumeManager(
407 true,
408 testHostname,
409 podManager,
410 stateProvider,
411 kubeClient,
412 plugMgr,
413 &containertest.FakeRuntime{},
414 mount.NewFakeMounter(nil),
415 hostutil.NewFakeHostUtil(nil),
416 "",
417 fakeRecorder,
418 false,
419 fakePathHandler)
420
421 return vm
422 }
423
424
425
426 func createObjects(pvMode, podMode v1.PersistentVolumeMode) (*v1.Node, *v1.Pod, *v1.PersistentVolume, *v1.PersistentVolumeClaim) {
427 node := &v1.Node{
428 ObjectMeta: metav1.ObjectMeta{Name: testHostname},
429 Status: v1.NodeStatus{
430 VolumesAttached: []v1.AttachedVolume{
431 {
432 Name: "fake/fake-device",
433 DevicePath: "fake/path",
434 },
435 }},
436 }
437 pod := &v1.Pod{
438 ObjectMeta: metav1.ObjectMeta{
439 Name: "abc",
440 Namespace: "nsA",
441 UID: "1234",
442 },
443 Spec: v1.PodSpec{
444 Containers: []v1.Container{
445 {
446 Name: "container1",
447 },
448 },
449 Volumes: []v1.Volume{
450 {
451 Name: "vol1",
452 VolumeSource: v1.VolumeSource{
453 PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
454 ClaimName: "claimA",
455 },
456 },
457 },
458 },
459 SecurityContext: &v1.PodSecurityContext{
460 SupplementalGroups: []int64{555},
461 },
462 },
463 }
464 switch podMode {
465 case v1.PersistentVolumeBlock:
466 pod.Spec.Containers[0].VolumeDevices = []v1.VolumeDevice{
467 {
468 Name: "vol1",
469 DevicePath: "/dev/vol1",
470 },
471 }
472 case v1.PersistentVolumeFilesystem:
473 pod.Spec.Containers[0].VolumeMounts = []v1.VolumeMount{
474 {
475 Name: "vol1",
476 MountPath: "/mnt/vol1",
477 },
478 }
479 default:
480
481 }
482 pv := &v1.PersistentVolume{
483 ObjectMeta: metav1.ObjectMeta{
484 Name: "pvA",
485 },
486 Spec: v1.PersistentVolumeSpec{
487 PersistentVolumeSource: v1.PersistentVolumeSource{
488 RBD: &v1.RBDPersistentVolumeSource{
489 RBDImage: "fake-device",
490 },
491 },
492 ClaimRef: &v1.ObjectReference{
493 Namespace: "nsA",
494 Name: "claimA",
495 },
496 VolumeMode: &pvMode,
497 },
498 }
499 claim := &v1.PersistentVolumeClaim{
500 ObjectMeta: metav1.ObjectMeta{
501 Name: "claimA",
502 Namespace: "nsA",
503 },
504 Spec: v1.PersistentVolumeClaimSpec{
505 VolumeName: "pvA",
506 VolumeMode: &pvMode,
507 },
508 Status: v1.PersistentVolumeClaimStatus{
509 Phase: v1.ClaimBound,
510 },
511 }
512 return node, pod, pv, claim
513 }
514
515 func simulateVolumeInUseUpdate(volumeName v1.UniqueVolumeName, stopCh <-chan struct{}, volumeManager VolumeManager) {
516 ticker := time.NewTicker(100 * time.Millisecond)
517 defer ticker.Stop()
518 for {
519 select {
520 case <-ticker.C:
521 volumeManager.MarkVolumesAsReportedInUse(
522 []v1.UniqueVolumeName{volumeName})
523 case <-stopCh:
524 return
525 }
526 }
527 }
528
529 func delayClaimBecomesBound(
530 kubeClient clientset.Interface,
531 namespace, claimName string,
532 ) {
533 time.Sleep(500 * time.Millisecond)
534 volumeClaim, _ :=
535 kubeClient.CoreV1().PersistentVolumeClaims(namespace).Get(context.TODO(), claimName, metav1.GetOptions{})
536 volumeClaim.Status = v1.PersistentVolumeClaimStatus{
537 Phase: v1.ClaimBound,
538 }
539 kubeClient.CoreV1().PersistentVolumeClaims(namespace).Update(context.TODO(), volumeClaim, metav1.UpdateOptions{})
540 }
541
542 func runVolumeManager(manager VolumeManager) chan struct{} {
543 stopCh := make(chan struct{})
544
545
546 sourcesReady := config.NewSourcesReady(func(_ sets.String) bool { return true })
547 go manager.Run(sourcesReady, stopCh)
548 return stopCh
549 }
550
View as plain text