package reconciler

import (
	"fmt"
	"os"
	"path/filepath"
	"reflect"
	"testing"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/klog/v2/ktesting"
	"k8s.io/kubernetes/pkg/volume"
	volumetesting "k8s.io/kubernetes/pkg/volume/testing"
	"k8s.io/kubernetes/pkg/volume/util"
)
34
35 func TestReconstructVolumes(t *testing.T) {
36 tests := []struct {
37 name string
38 volumePaths []string
39 expectedVolumesNeedReportedInUse []string
40 expectedVolumesNeedDevicePath []string
41 expectedVolumesFailedReconstruction []string
42 verifyFunc func(rcInstance *reconciler, fakePlugin *volumetesting.FakeVolumePlugin) error
43 }{
44 {
45 name: "when two pods are using same volume and both are deleted",
46 volumePaths: []string{
47 filepath.Join("pod1", "volumes", "fake-plugin", "pvc-abcdef"),
48 filepath.Join("pod2", "volumes", "fake-plugin", "pvc-abcdef"),
49 },
50 expectedVolumesNeedReportedInUse: []string{"fake-plugin/pvc-abcdef", "fake-plugin/pvc-abcdef"},
51 expectedVolumesNeedDevicePath: []string{"fake-plugin/pvc-abcdef", "fake-plugin/pvc-abcdef"},
52 expectedVolumesFailedReconstruction: []string{},
53 verifyFunc: func(rcInstance *reconciler, fakePlugin *volumetesting.FakeVolumePlugin) error {
54 mountedPods := rcInstance.actualStateOfWorld.GetMountedVolumes()
55 if len(mountedPods) != 0 {
56 return fmt.Errorf("expected 0 certain pods in asw got %d", len(mountedPods))
57 }
58 allPods := rcInstance.actualStateOfWorld.GetAllMountedVolumes()
59 if len(allPods) != 2 {
60 return fmt.Errorf("expected 2 uncertain pods in asw got %d", len(allPods))
61 }
62 volumes := rcInstance.actualStateOfWorld.GetPossiblyMountedVolumesForPod("pod1")
63 if len(volumes) != 1 {
64 return fmt.Errorf("expected 1 uncertain volume in asw got %d", len(volumes))
65 }
66
67 if reconstructed := rcInstance.actualStateOfWorld.IsVolumeReconstructed("fake-plugin/pvc-abcdef", "pod1"); !reconstructed {
68 t.Errorf("expected volume to be marked as reconstructed, got %v", reconstructed)
69 }
70 return nil
71 },
72 },
73 {
74 name: "when reconstruction fails for a volume, volumes should be cleaned up",
75 volumePaths: []string{
76 filepath.Join("pod1", "volumes", "missing-plugin", "pvc-abcdef"),
77 },
78 expectedVolumesNeedReportedInUse: []string{},
79 expectedVolumesNeedDevicePath: []string{},
80 expectedVolumesFailedReconstruction: []string{"pvc-abcdef"},
81 },
82 }
83 for _, tc := range tests {
84 t.Run(tc.name, func(t *testing.T) {
85 tmpKubeletDir, err := os.MkdirTemp("", "")
86 if err != nil {
87 t.Fatalf("can't make a temp directory for kubeletPods: %v", err)
88 }
89 defer os.RemoveAll(tmpKubeletDir)
90
91
92 tmpKubeletPodDir := filepath.Join(tmpKubeletDir, "pods")
93 os.MkdirAll(tmpKubeletPodDir, 0755)
94
95 mountPaths := []string{}
96
97
98 for _, volumePath := range tc.volumePaths {
99 vp := filepath.Join(tmpKubeletPodDir, volumePath)
100 mountPaths = append(mountPaths, vp)
101 os.MkdirAll(vp, 0755)
102 }
103
104 rc, fakePlugin := getReconciler(tmpKubeletDir, t, mountPaths, nil )
105 rcInstance, _ := rc.(*reconciler)
106
107
108 rcInstance.reconstructVolumes()
109
110
111
112 expectedVolumes := make([]v1.UniqueVolumeName, len(tc.expectedVolumesNeedDevicePath))
113 for i := range tc.expectedVolumesNeedDevicePath {
114 expectedVolumes[i] = v1.UniqueVolumeName(tc.expectedVolumesNeedDevicePath[i])
115 }
116 if !reflect.DeepEqual(expectedVolumes, rcInstance.volumesNeedUpdateFromNodeStatus) {
117 t.Errorf("Expected expectedVolumesNeedDevicePath:\n%v\n got:\n%v", expectedVolumes, rcInstance.volumesNeedUpdateFromNodeStatus)
118 }
119
120 expectedVolumes = make([]v1.UniqueVolumeName, len(tc.expectedVolumesNeedReportedInUse))
121 for i := range tc.expectedVolumesNeedReportedInUse {
122 expectedVolumes[i] = v1.UniqueVolumeName(tc.expectedVolumesNeedReportedInUse[i])
123 }
124 if !reflect.DeepEqual(expectedVolumes, rcInstance.volumesNeedReportedInUse) {
125 t.Errorf("Expected volumesNeedReportedInUse:\n%v\n got:\n%v", expectedVolumes, rcInstance.volumesNeedReportedInUse)
126 }
127
128 volumesFailedReconstruction := sets.NewString()
129 for _, vol := range rcInstance.volumesFailedReconstruction {
130 volumesFailedReconstruction.Insert(vol.volumeSpecName)
131 }
132 if !reflect.DeepEqual(volumesFailedReconstruction.List(), tc.expectedVolumesFailedReconstruction) {
133 t.Errorf("Expected volumesFailedReconstruction:\n%v\n got:\n%v", tc.expectedVolumesFailedReconstruction, volumesFailedReconstruction.List())
134 }
135
136 if tc.verifyFunc != nil {
137 if err := tc.verifyFunc(rcInstance, fakePlugin); err != nil {
138 t.Errorf("Test %s failed: %v", tc.name, err)
139 }
140 }
141 })
142 }
143 }
144
145 func TestCleanOrphanVolumes(t *testing.T) {
146 type podInfo struct {
147 podName string
148 podUID string
149 outerVolumeName string
150 innerVolumeName string
151 }
152 defaultPodInfo := podInfo{
153 podName: "pod1",
154 podUID: "pod1uid",
155 outerVolumeName: "volume-name",
156 innerVolumeName: "volume-name",
157 }
158 defaultVolume := podVolume{
159 podName: "pod1uid",
160 volumeSpecName: "volume-name",
161 volumePath: "",
162 pluginName: "fake-plugin",
163 volumeMode: v1.PersistentVolumeFilesystem,
164 }
165
166 tests := []struct {
167 name string
168 podInfos []podInfo
169 volumesFailedReconstruction []podVolume
170 expectedUnmounts int
171 }{
172 {
173 name: "volume is in DSW and is not cleaned",
174 podInfos: []podInfo{defaultPodInfo},
175 volumesFailedReconstruction: []podVolume{defaultVolume},
176 expectedUnmounts: 0,
177 },
178 {
179 name: "volume is not in DSW and is cleaned",
180 podInfos: []podInfo{},
181 volumesFailedReconstruction: []podVolume{defaultVolume},
182 expectedUnmounts: 1,
183 },
184 }
185 for _, tc := range tests {
186 t.Run(tc.name, func(t *testing.T) {
187
188 tmpKubeletDir, err := os.MkdirTemp("", "")
189 if err != nil {
190 t.Fatalf("can't make a temp directory for kubeletPods: %v", err)
191 }
192 defer os.RemoveAll(tmpKubeletDir)
193
194
195 tmpKubeletPodDir := filepath.Join(tmpKubeletDir, "pods")
196 os.MkdirAll(tmpKubeletPodDir, 0755)
197
198 mountPaths := []string{}
199
200 rc, fakePlugin := getReconciler(tmpKubeletDir, t, mountPaths, nil )
201 rcInstance, _ := rc.(*reconciler)
202 rcInstance.volumesFailedReconstruction = tc.volumesFailedReconstruction
203 logger, _ := ktesting.NewTestContext(t)
204 for _, tpodInfo := range tc.podInfos {
205 pod := getInlineFakePod(tpodInfo.podName, tpodInfo.podUID, tpodInfo.outerVolumeName, tpodInfo.innerVolumeName)
206 volumeSpec := &volume.Spec{Volume: &pod.Spec.Volumes[0]}
207 podName := util.GetUniquePodName(pod)
208 volumeName, err := rcInstance.desiredStateOfWorld.AddPodToVolume(
209 podName, pod, volumeSpec, volumeSpec.Name(), "" , nil )
210 if err != nil {
211 t.Fatalf("Error adding volume %s to dsow: %v", volumeSpec.Name(), err)
212 }
213 rcInstance.actualStateOfWorld.MarkVolumeAsAttached(logger, volumeName, volumeSpec, nodeName, "")
214 }
215
216
217 rcInstance.cleanOrphanVolumes()
218
219
220 if len(rcInstance.volumesFailedReconstruction) != 0 {
221 t.Errorf("Expected volumesFailedReconstruction to be empty, got %+v", rcInstance.volumesFailedReconstruction)
222 }
223
224 var lastErr error
225 err = retryWithExponentialBackOff(testOperationBackOffDuration, func() (bool, error) {
226 if err := verifyTearDownCalls(fakePlugin, tc.expectedUnmounts); err != nil {
227 lastErr = err
228 return false, nil
229 }
230 return true, nil
231 })
232 if err != nil {
233 t.Errorf("Error waiting for volumes to get unmounted: %s: %s", err, lastErr)
234 }
235 })
236 }
237 }
238
239 func verifyTearDownCalls(plugin *volumetesting.FakeVolumePlugin, expected int) error {
240 unmounters := plugin.GetUnmounters()
241 if len(unmounters) == 0 && (expected == 0) {
242 return nil
243 }
244 actualCallCount := 0
245 for _, unmounter := range unmounters {
246 actualCallCount = unmounter.GetTearDownCallCount()
247 if actualCallCount == expected {
248 return nil
249 }
250 }
251 return fmt.Errorf("expected TearDown calls %d, got %d", expected, actualCallCount)
252 }
253
254 func TestReconstructVolumesMount(t *testing.T) {
255
256
257
258
259
260 tests := []struct {
261 name string
262 volumePath string
263 expectMount bool
264 volumeMode v1.PersistentVolumeMode
265 deviceMountPath string
266 }{
267 {
268 name: "reconstructed volume is mounted",
269 volumePath: filepath.Join("pod1uid", "volumes", "fake-plugin", "volumename"),
270
271 expectMount: true,
272 volumeMode: v1.PersistentVolumeFilesystem,
273 },
274 {
275 name: "reconstructed volume fails to mount",
276
277 volumePath: filepath.Join("pod1uid", "volumes", "fake-plugin", volumetesting.FailOnSetupVolumeName),
278 expectMount: false,
279 volumeMode: v1.PersistentVolumeFilesystem,
280 },
281 {
282 name: "reconstructed volume device map fails",
283 volumePath: filepath.Join("pod1uid", "volumeDevices", "fake-plugin", volumetesting.FailMountDeviceVolumeName),
284 volumeMode: v1.PersistentVolumeBlock,
285 deviceMountPath: filepath.Join("plugins", "fake-plugin", "volumeDevices", "pluginDependentPath"),
286 },
287 }
288 for _, tc := range tests {
289 t.Run(tc.name, func(t *testing.T) {
290 tmpKubeletDir, err := os.MkdirTemp("", "")
291 if err != nil {
292 t.Fatalf("can't make a temp directory for kubeletPods: %v", err)
293 }
294 defer os.RemoveAll(tmpKubeletDir)
295
296
297 tmpKubeletPodDir := filepath.Join(tmpKubeletDir, "pods")
298 os.MkdirAll(tmpKubeletPodDir, 0755)
299
300
301 vp := filepath.Join(tmpKubeletPodDir, tc.volumePath)
302 mountPaths := []string{vp}
303 os.MkdirAll(vp, 0755)
304
305
306 outerName := filepath.Base(tc.volumePath)
307 pod, pv, pvc := getPodPVCAndPV(tc.volumeMode, "pod1", outerName, "pvc1")
308 volumeSpec := &volume.Spec{PersistentVolume: pv}
309 kubeClient := createtestClientWithPVPVC(pv, pvc, v1.AttachedVolume{
310 Name: v1.UniqueVolumeName(fmt.Sprintf("fake-plugin/%s", outerName)),
311 DevicePath: "fake/path",
312 })
313
314 rc, fakePlugin := getReconciler(tmpKubeletDir, t, mountPaths, kubeClient )
315 rcInstance, _ := rc.(*reconciler)
316
317
318 rcInstance.reconstructVolumes()
319
320
321 mountedPods := rcInstance.actualStateOfWorld.GetMountedVolumes()
322 if len(mountedPods) != 0 {
323 t.Errorf("expected 0 mounted volumes, got %+v", mountedPods)
324 }
325 allPods := rcInstance.actualStateOfWorld.GetAllMountedVolumes()
326 if len(allPods) != 1 {
327 t.Errorf("expected 1 uncertain volume in asw, got %+v", allPods)
328 }
329
330 podName := util.GetUniquePodName(pod)
331 volumeName, err := rcInstance.desiredStateOfWorld.AddPodToVolume(
332 podName, pod, volumeSpec, volumeSpec.Name(), "" , nil )
333 if err != nil {
334 t.Fatalf("Error adding volume %s to dsow: %v", volumeSpec.Name(), err)
335 }
336 logger, _ := ktesting.NewTestContext(t)
337 rcInstance.actualStateOfWorld.MarkVolumeAsAttached(logger, volumeName, volumeSpec, nodeName, "")
338
339 rcInstance.populatorHasAddedPods = func() bool {
340
341 return true
342 }
343
344 rcInstance.volumesNeedUpdateFromNodeStatus = nil
345
346
347 rcInstance.reconcileNew()
348
349
350
351 var lastErr error
352 err = retryWithExponentialBackOff(testOperationBackOffDuration, func() (bool, error) {
353 if tc.volumeMode == v1.PersistentVolumeFilesystem {
354 if err := volumetesting.VerifyMountDeviceCallCount(1, fakePlugin); err != nil {
355 lastErr = err
356 return false, nil
357 }
358 return true, nil
359 } else {
360 return true, nil
361 }
362 })
363 if err != nil {
364 t.Errorf("Error waiting for volumes to get mounted: %s: %s", err, lastErr)
365 }
366
367 if tc.expectMount {
368
369 waitForMount(t, fakePlugin, volumeName, rcInstance.actualStateOfWorld)
370
371 if err := volumetesting.VerifySetUpCallCount(1, fakePlugin); err != nil {
372 t.Errorf("Expected SetUp() to be called, got %s", err)
373 }
374 } else {
375
376 err = retryWithExponentialBackOff(testOperationBackOffDuration, func() (bool, error) {
377 return !rcInstance.operationExecutor.IsOperationPending(volumeName, "pod1uid", nodeName), nil
378 })
379 if err != nil {
380 t.Errorf("Error waiting for operation to get finished: %s", err)
381 }
382
383 mountedPods := rcInstance.actualStateOfWorld.GetMountedVolumes()
384 if len(mountedPods) != 0 {
385 t.Errorf("expected 0 mounted volumes after reconcile, got %+v", mountedPods)
386 }
387 allPods := rcInstance.actualStateOfWorld.GetAllMountedVolumes()
388 if len(allPods) != 1 {
389 t.Errorf("expected 1 mounted or uncertain volumes after reconcile, got %+v", allPods)
390 }
391 if tc.deviceMountPath != "" {
392 expectedDeviceMountPath := filepath.Join(tmpKubeletDir, tc.deviceMountPath)
393 deviceMountPath := allPods[0].DeviceMountPath
394 if expectedDeviceMountPath != deviceMountPath {
395 t.Errorf("expected deviceMountPath to be %s, got %s", expectedDeviceMountPath, deviceMountPath)
396 }
397 }
398
399 }
400
401
402 verifyTearDownCalls(fakePlugin, 0)
403 })
404 }
405 }
406
407 func getPodPVCAndPV(volumeMode v1.PersistentVolumeMode, podName, pvName, pvcName string) (*v1.Pod, *v1.PersistentVolume, *v1.PersistentVolumeClaim) {
408 pv := &v1.PersistentVolume{
409 ObjectMeta: metav1.ObjectMeta{
410 Name: pvName,
411 UID: "pvuid",
412 },
413 Spec: v1.PersistentVolumeSpec{
414 ClaimRef: &v1.ObjectReference{Name: pvcName},
415 VolumeMode: &volumeMode,
416 },
417 }
418 pvc := &v1.PersistentVolumeClaim{
419 ObjectMeta: metav1.ObjectMeta{
420 Name: pvcName,
421 UID: "pvcuid",
422 },
423 Spec: v1.PersistentVolumeClaimSpec{
424 VolumeName: pvName,
425 VolumeMode: &volumeMode,
426 },
427 }
428 pod := &v1.Pod{
429 ObjectMeta: metav1.ObjectMeta{
430 Name: podName,
431 UID: "pod1uid",
432 },
433 Spec: v1.PodSpec{
434 Volumes: []v1.Volume{
435 {
436 Name: "volume-name",
437 VolumeSource: v1.VolumeSource{
438 PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
439 ClaimName: pvc.Name,
440 },
441 },
442 },
443 },
444 },
445 }
446 return pod, pv, pvc
447 }
448