    17  package reconciler
    19  import (
    20  	"fmt"
    21  	"os"
    22  	"path/filepath"
    23  	"reflect"
    24  	"testing"
    26  	v1 "k8s.io/api/core/v1"
    27  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    28  	"k8s.io/apimachinery/pkg/util/sets"
    29  	"k8s.io/klog/v2/ktesting"
    30  	"k8s.io/kubernetes/pkg/volume"
    31  	volumetesting "k8s.io/kubernetes/pkg/volume/testing"
    32  	"k8s.io/kubernetes/pkg/volume/util"
    33  )
    35  func TestReconstructVolumes(t *testing.T) {
    36  	tests := []struct {
    37  		name                                string
    38  		volumePaths                         []string
    39  		expectedVolumesNeedReportedInUse    []string
    40  		expectedVolumesNeedDevicePath       []string
    41  		expectedVolumesFailedReconstruction []string
    42  		verifyFunc                          func(rcInstance *reconciler, fakePlugin *volumetesting.FakeVolumePlugin) error
    43  	}{
    44  		{
    45  			name: "when two pods are using same volume and both are deleted",
    46  			volumePaths: []string{
    47  				filepath.Join("pod1", "volumes", "fake-plugin", "pvc-abcdef"),
    48  				filepath.Join("pod2", "volumes", "fake-plugin", "pvc-abcdef"),
    49  			},
    50  			expectedVolumesNeedReportedInUse:    []string{"fake-plugin/pvc-abcdef", "fake-plugin/pvc-abcdef"},
    51  			expectedVolumesNeedDevicePath:       []string{"fake-plugin/pvc-abcdef", "fake-plugin/pvc-abcdef"},
    52  			expectedVolumesFailedReconstruction: []string{},
    53  			verifyFunc: func(rcInstance *reconciler, fakePlugin *volumetesting.FakeVolumePlugin) error {
    54  				mountedPods := rcInstance.actualStateOfWorld.GetMountedVolumes()
    55  				if len(mountedPods) != 0 {
    56  					return fmt.Errorf("expected 0 certain pods in asw got %d", len(mountedPods))
    57  				}
    58  				allPods := rcInstance.actualStateOfWorld.GetAllMountedVolumes()
    59  				if len(allPods) != 2 {
    60  					return fmt.Errorf("expected 2 uncertain pods in asw got %d", len(allPods))
    61  				}
    62  				volumes := rcInstance.actualStateOfWorld.GetPossiblyMountedVolumesForPod("pod1")
    63  				if len(volumes) != 1 {
    64  					return fmt.Errorf("expected 1 uncertain volume in asw got %d", len(volumes))
    65  				}
    66  				// The volume should be marked as reconstructed in ASW
    67  				if reconstructed := rcInstance.actualStateOfWorld.IsVolumeReconstructed("fake-plugin/pvc-abcdef", "pod1"); !reconstructed {
    68  					t.Errorf("expected volume to be marked as reconstructed, got %v", reconstructed)
    69  				}
    70  				return nil
    71  			},
    72  		},
    73  		{
    74  			name: "when reconstruction fails for a volume, volumes should be cleaned up",
    75  			volumePaths: []string{
    76  				filepath.Join("pod1", "volumes", "missing-plugin", "pvc-abcdef"),
    77  			},
    78  			expectedVolumesNeedReportedInUse:    []string{},
    79  			expectedVolumesNeedDevicePath:       []string{},
    80  			expectedVolumesFailedReconstruction: []string{"pvc-abcdef"},
    81  		},
    82  	}
    83  	for _, tc := range tests {
    84  		t.Run(tc.name, func(t *testing.T) {
    85  			tmpKubeletDir, err := os.MkdirTemp("", "")
    86  			if err != nil {
    87  				t.Fatalf("can't make a temp directory for kubeletPods: %v", err)
    88  			}
    89  			defer os.RemoveAll(tmpKubeletDir)
    91  			// create kubelet pod directory
    92  			tmpKubeletPodDir := filepath.Join(tmpKubeletDir, "pods")
    93  			os.MkdirAll(tmpKubeletPodDir, 0755)
    95  			mountPaths := []string{}
    97  			// create pod and volume directories so as reconciler can find them.
    98  			for _, volumePath := range tc.volumePaths {
    99  				vp := filepath.Join(tmpKubeletPodDir, volumePath)
   100  				mountPaths = append(mountPaths, vp)
   101  				os.MkdirAll(vp, 0755)
   102  			}
   104  			rc, fakePlugin := getReconciler(tmpKubeletDir, t, mountPaths, nil /*custom kubeclient*/)
   105  			rcInstance, _ := rc.(*reconciler)
   107  			// Act
   108  			rcInstance.reconstructVolumes()
   110  			// Assert
   111  			// Convert to []UniqueVolumeName
   112  			expectedVolumes := make([]v1.UniqueVolumeName, len(tc.expectedVolumesNeedDevicePath))
   113  			for i := range tc.expectedVolumesNeedDevicePath {
   114  				expectedVolumes[i] = v1.UniqueVolumeName(tc.expectedVolumesNeedDevicePath[i])
   115  			}
   116  			if !reflect.DeepEqual(expectedVolumes, rcInstance.volumesNeedUpdateFromNodeStatus) {
   117  				t.Errorf("Expected expectedVolumesNeedDevicePath:\n%v\n got:\n%v", expectedVolumes, rcInstance.volumesNeedUpdateFromNodeStatus)
   118  			}
   120  			expectedVolumes = make([]v1.UniqueVolumeName, len(tc.expectedVolumesNeedReportedInUse))
   121  			for i := range tc.expectedVolumesNeedReportedInUse {
   122  				expectedVolumes[i] = v1.UniqueVolumeName(tc.expectedVolumesNeedReportedInUse[i])
   123  			}
   124  			if !reflect.DeepEqual(expectedVolumes, rcInstance.volumesNeedReportedInUse) {
   125  				t.Errorf("Expected volumesNeedReportedInUse:\n%v\n got:\n%v", expectedVolumes, rcInstance.volumesNeedReportedInUse)
   126  			}
   128  			volumesFailedReconstruction := sets.NewString()
   129  			for _, vol := range rcInstance.volumesFailedReconstruction {
   130  				volumesFailedReconstruction.Insert(vol.volumeSpecName)
   131  			}
   132  			if !reflect.DeepEqual(volumesFailedReconstruction.List(), tc.expectedVolumesFailedReconstruction) {
   133  				t.Errorf("Expected volumesFailedReconstruction:\n%v\n got:\n%v", tc.expectedVolumesFailedReconstruction, volumesFailedReconstruction.List())
   134  			}
   136  			if tc.verifyFunc != nil {
   137  				if err := tc.verifyFunc(rcInstance, fakePlugin); err != nil {
   138  					t.Errorf("Test %s failed: %v", tc.name, err)
   139  				}
   140  			}
   141  		})
   142  	}
   143  }
   145  func TestCleanOrphanVolumes(t *testing.T) {
   146  	type podInfo struct {
   147  		podName         string
   148  		podUID          string
   149  		outerVolumeName string
   150  		innerVolumeName string
   151  	}
   152  	defaultPodInfo := podInfo{
   153  		podName:         "pod1",
   154  		podUID:          "pod1uid",
   155  		outerVolumeName: "volume-name",
   156  		innerVolumeName: "volume-name",
   157  	}
   158  	defaultVolume := podVolume{
   159  		podName:        "pod1uid",
   160  		volumeSpecName: "volume-name",
   161  		volumePath:     "",
   162  		pluginName:     "fake-plugin",
   163  		volumeMode:     v1.PersistentVolumeFilesystem,
   164  	}
   166  	tests := []struct {
   167  		name                        string
   168  		podInfos                    []podInfo
   169  		volumesFailedReconstruction []podVolume
   170  		expectedUnmounts            int
   171  	}{
   172  		{
   173  			name:                        "volume is in DSW and is not cleaned",
   174  			podInfos:                    []podInfo{defaultPodInfo},
   175  			volumesFailedReconstruction: []podVolume{defaultVolume},
   176  			expectedUnmounts:            0,
   177  		},
   178  		{
   179  			name:                        "volume is not in DSW and is cleaned",
   180  			podInfos:                    []podInfo{},
   181  			volumesFailedReconstruction: []podVolume{defaultVolume},
   182  			expectedUnmounts:            1,
   183  		},
   184  	}
   185  	for _, tc := range tests {
   186  		t.Run(tc.name, func(t *testing.T) {
   187  			// Arrange
   188  			tmpKubeletDir, err := os.MkdirTemp("", "")
   189  			if err != nil {
   190  				t.Fatalf("can't make a temp directory for kubeletPods: %v", err)
   191  			}
   192  			defer os.RemoveAll(tmpKubeletDir)
   194  			// create kubelet pod directory
   195  			tmpKubeletPodDir := filepath.Join(tmpKubeletDir, "pods")
   196  			os.MkdirAll(tmpKubeletPodDir, 0755)
   198  			mountPaths := []string{}
   200  			rc, fakePlugin := getReconciler(tmpKubeletDir, t, mountPaths, nil /*custom kubeclient*/)
   201  			rcInstance, _ := rc.(*reconciler)
   202  			rcInstance.volumesFailedReconstruction = tc.volumesFailedReconstruction
   203  			logger, _ := ktesting.NewTestContext(t)
   204  			for _, tpodInfo := range tc.podInfos {
   205  				pod := getInlineFakePod(tpodInfo.podName, tpodInfo.podUID, tpodInfo.outerVolumeName, tpodInfo.innerVolumeName)
   206  				volumeSpec := &volume.Spec{Volume: &pod.Spec.Volumes[0]}
   207  				podName := util.GetUniquePodName(pod)
   208  				volumeName, err := rcInstance.desiredStateOfWorld.AddPodToVolume(
   209  					podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */, nil /* SELinuxContext */)
   210  				if err != nil {
   211  					t.Fatalf("Error adding volume %s to dsow: %v", volumeSpec.Name(), err)
   212  				}
   213  				rcInstance.actualStateOfWorld.MarkVolumeAsAttached(logger, volumeName, volumeSpec, nodeName, "")
   214  			}
   216  			// Act
   217  			rcInstance.cleanOrphanVolumes()
   219  			// Assert
   220  			if len(rcInstance.volumesFailedReconstruction) != 0 {
   221  				t.Errorf("Expected volumesFailedReconstruction to be empty, got %+v", rcInstance.volumesFailedReconstruction)
   222  			}
   223  			// Unmount runs in a go routine, wait for its finish
   224  			var lastErr error
   225  			err = retryWithExponentialBackOff(testOperationBackOffDuration, func() (bool, error) {
   226  				if err := verifyTearDownCalls(fakePlugin, tc.expectedUnmounts); err != nil {
   227  					lastErr = err
   228  					return false, nil
   229  				}
   230  				return true, nil
   231  			})
   232  			if err != nil {
   233  				t.Errorf("Error waiting for volumes to get unmounted: %s: %s", err, lastErr)
   234  			}
   235  		})
   236  	}
   237  }
   239  func verifyTearDownCalls(plugin *volumetesting.FakeVolumePlugin, expected int) error {
   240  	unmounters := plugin.GetUnmounters()
   241  	if len(unmounters) == 0 && (expected == 0) {
   242  		return nil
   243  	}
   244  	actualCallCount := 0
   245  	for _, unmounter := range unmounters {
   246  		actualCallCount = unmounter.GetTearDownCallCount()
   247  		if actualCallCount == expected {
   248  			return nil
   249  		}
   250  	}
   251  	return fmt.Errorf("expected TearDown calls %d, got %d", expected, actualCallCount)
   252  }
   254  func TestReconstructVolumesMount(t *testing.T) {
   255  	// This test checks volume reconstruction + subsequent failed mount.
   256  	// Since the volume is reconstructed, it must be marked as uncertain
   257  	// even after a final SetUp error, see https://github.com/kubernetes/kubernetes/issues/96635
   258  	// and https://github.com/kubernetes/kubernetes/pull/110670.
   260  	tests := []struct {
   261  		name            string
   262  		volumePath      string
   263  		expectMount     bool
   264  		volumeMode      v1.PersistentVolumeMode
   265  		deviceMountPath string
   266  	}{
   267  		{
   268  			name:       "reconstructed volume is mounted",
   269  			volumePath: filepath.Join("pod1uid", "volumes", "fake-plugin", "volumename"),
   271  			expectMount: true,
   272  			volumeMode:  v1.PersistentVolumeFilesystem,
   273  		},
   274  		{
   275  			name: "reconstructed volume fails to mount",
   276  			// FailOnSetupVolumeName: MountDevice succeeds, SetUp fails
   277  			volumePath:  filepath.Join("pod1uid", "volumes", "fake-plugin", volumetesting.FailOnSetupVolumeName),
   278  			expectMount: false,
   279  			volumeMode:  v1.PersistentVolumeFilesystem,
   280  		},
   281  		{
   282  			name:            "reconstructed volume device map fails",
   283  			volumePath:      filepath.Join("pod1uid", "volumeDevices", "fake-plugin", volumetesting.FailMountDeviceVolumeName),
   284  			volumeMode:      v1.PersistentVolumeBlock,
   285  			deviceMountPath: filepath.Join("plugins", "fake-plugin", "volumeDevices", "pluginDependentPath"),
   286  		},
   287  	}
   288  	for _, tc := range tests {
   289  		t.Run(tc.name, func(t *testing.T) {
   290  			tmpKubeletDir, err := os.MkdirTemp("", "")
   291  			if err != nil {
   292  				t.Fatalf("can't make a temp directory for kubeletPods: %v", err)
   293  			}
   294  			defer os.RemoveAll(tmpKubeletDir)
   296  			// create kubelet pod directory
   297  			tmpKubeletPodDir := filepath.Join(tmpKubeletDir, "pods")
   298  			os.MkdirAll(tmpKubeletPodDir, 0755)
   300  			// create pod and volume directories so as reconciler can find them.
   301  			vp := filepath.Join(tmpKubeletPodDir, tc.volumePath)
   302  			mountPaths := []string{vp}
   303  			os.MkdirAll(vp, 0755)
   305  			// Arrange 2 - populate DSW
   306  			outerName := filepath.Base(tc.volumePath)
   307  			pod, pv, pvc := getPodPVCAndPV(tc.volumeMode, "pod1", outerName, "pvc1")
   308  			volumeSpec := &volume.Spec{PersistentVolume: pv}
   309  			kubeClient := createtestClientWithPVPVC(pv, pvc, v1.AttachedVolume{
   310  				Name:       v1.UniqueVolumeName(fmt.Sprintf("fake-plugin/%s", outerName)),
   311  				DevicePath: "fake/path",
   312  			})
   314  			rc, fakePlugin := getReconciler(tmpKubeletDir, t, mountPaths, kubeClient /*custom kubeclient*/)
   315  			rcInstance, _ := rc.(*reconciler)
   317  			// Act 1 - reconstruction
   318  			rcInstance.reconstructVolumes()
   320  			// Assert 1 - the volume is Uncertain
   321  			mountedPods := rcInstance.actualStateOfWorld.GetMountedVolumes()
   322  			if len(mountedPods) != 0 {
   323  				t.Errorf("expected 0 mounted volumes, got %+v", mountedPods)
   324  			}
   325  			allPods := rcInstance.actualStateOfWorld.GetAllMountedVolumes()
   326  			if len(allPods) != 1 {
   327  				t.Errorf("expected 1 uncertain volume in asw, got %+v", allPods)
   328  			}
   330  			podName := util.GetUniquePodName(pod)
   331  			volumeName, err := rcInstance.desiredStateOfWorld.AddPodToVolume(
   332  				podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */, nil /* SELinuxContext */)
   333  			if err != nil {
   334  				t.Fatalf("Error adding volume %s to dsow: %v", volumeSpec.Name(), err)
   335  			}
   336  			logger, _ := ktesting.NewTestContext(t)
   337  			rcInstance.actualStateOfWorld.MarkVolumeAsAttached(logger, volumeName, volumeSpec, nodeName, "")
   339  			rcInstance.populatorHasAddedPods = func() bool {
   340  				// Mark DSW populated to allow unmounting of volumes.
   341  				return true
   342  			}
   343  			// Mark devices paths as reconciled to allow unmounting of volumes.
   344  			rcInstance.volumesNeedUpdateFromNodeStatus = nil
   346  			// Act 2 - reconcile once
   347  			rcInstance.reconcileNew()
   349  			// Assert 2
   350  			// MountDevice was attempted
   351  			var lastErr error
   352  			err = retryWithExponentialBackOff(testOperationBackOffDuration, func() (bool, error) {
   353  				if tc.volumeMode == v1.PersistentVolumeFilesystem {
   354  					if err := volumetesting.VerifyMountDeviceCallCount(1, fakePlugin); err != nil {
   355  						lastErr = err
   356  						return false, nil
   357  					}
   358  					return true, nil
   359  				} else {
   360  					return true, nil
   361  				}
   362  			})
   363  			if err != nil {
   364  				t.Errorf("Error waiting for volumes to get mounted: %s: %s", err, lastErr)
   365  			}
   367  			if tc.expectMount {
   368  				// The volume should be fully mounted
   369  				waitForMount(t, fakePlugin, volumeName, rcInstance.actualStateOfWorld)
   370  				// SetUp was called and succeeded
   371  				if err := volumetesting.VerifySetUpCallCount(1, fakePlugin); err != nil {
   372  					t.Errorf("Expected SetUp() to be called, got %s", err)
   373  				}
   374  			} else {
   375  				// The test does not expect any change in ASW, yet it needs to wait for volume operations to finish
   376  				err = retryWithExponentialBackOff(testOperationBackOffDuration, func() (bool, error) {
   377  					return !rcInstance.operationExecutor.IsOperationPending(volumeName, "pod1uid", nodeName), nil
   378  				})
   379  				if err != nil {
   380  					t.Errorf("Error waiting for operation to get finished: %s", err)
   381  				}
   382  				// The volume is uncertain
   383  				mountedPods := rcInstance.actualStateOfWorld.GetMountedVolumes()
   384  				if len(mountedPods) != 0 {
   385  					t.Errorf("expected 0 mounted volumes after reconcile, got %+v", mountedPods)
   386  				}
   387  				allPods := rcInstance.actualStateOfWorld.GetAllMountedVolumes()
   388  				if len(allPods) != 1 {
   389  					t.Errorf("expected 1 mounted or uncertain volumes after reconcile, got %+v", allPods)
   390  				}
   391  				if tc.deviceMountPath != "" {
   392  					expectedDeviceMountPath := filepath.Join(tmpKubeletDir, tc.deviceMountPath)
   393  					deviceMountPath := allPods[0].DeviceMountPath
   394  					if expectedDeviceMountPath != deviceMountPath {
   395  						t.Errorf("expected deviceMountPath to be %s, got %s", expectedDeviceMountPath, deviceMountPath)
   396  					}
   397  				}
   399  			}
   401  			// Unmount was *not* attempted in any case
   402  			verifyTearDownCalls(fakePlugin, 0)
   403  		})
   404  	}
   405  }
   407  func getPodPVCAndPV(volumeMode v1.PersistentVolumeMode, podName, pvName, pvcName string) (*v1.Pod, *v1.PersistentVolume, *v1.PersistentVolumeClaim) {
   408  	pv := &v1.PersistentVolume{
   409  		ObjectMeta: metav1.ObjectMeta{
   410  			Name: pvName,
   411  			UID:  "pvuid",
   412  		},
   413  		Spec: v1.PersistentVolumeSpec{
   414  			ClaimRef:   &v1.ObjectReference{Name: pvcName},
   415  			VolumeMode: &volumeMode,
   416  		},
   417  	}
   418  	pvc := &v1.PersistentVolumeClaim{
   419  		ObjectMeta: metav1.ObjectMeta{
   420  			Name: pvcName,
   421  			UID:  "pvcuid",
   422  		},
   423  		Spec: v1.PersistentVolumeClaimSpec{
   424  			VolumeName: pvName,
   425  			VolumeMode: &volumeMode,
   426  		},
   427  	}
   428  	pod := &v1.Pod{
   429  		ObjectMeta: metav1.ObjectMeta{
   430  			Name: podName,
   431  			UID:  "pod1uid",
   432  		},
   433  		Spec: v1.PodSpec{
   434  			Volumes: []v1.Volume{
   435  				{
   436  					Name: "volume-name",
   437  					VolumeSource: v1.VolumeSource{
   438  						PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
   439  							ClaimName: pvc.Name,
   440  						},
   441  					},
   442  				},
   443  			},
   444  		},
   445  	}
   446  	return pod, pv, pvc
   447  }

