...

Source file src/k8s.io/kubernetes/pkg/kubelet/volumemanager/reconciler/reconciler_test.go

Documentation: k8s.io/kubernetes/pkg/kubelet/volumemanager/reconciler

     1  /*
     2  Copyright 2016 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package reconciler
    18  
    19  import (
    20  	"crypto/md5"
    21  	"fmt"
    22  	"path/filepath"
    23  	"testing"
    24  	"time"
    25  
    26  	csitrans "k8s.io/csi-translation-lib"
    27  	"k8s.io/kubernetes/pkg/volume/csimigration"
    28  
    29  	"github.com/stretchr/testify/assert"
    30  	"k8s.io/mount-utils"
    31  
    32  	v1 "k8s.io/api/core/v1"
    33  	"k8s.io/apimachinery/pkg/api/resource"
    34  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    35  	"k8s.io/apimachinery/pkg/runtime"
    36  	k8stypes "k8s.io/apimachinery/pkg/types"
    37  	"k8s.io/apimachinery/pkg/util/wait"
    38  	"k8s.io/client-go/kubernetes/fake"
    39  	core "k8s.io/client-go/testing"
    40  	"k8s.io/client-go/tools/record"
    41  	"k8s.io/klog/v2"
    42  	"k8s.io/kubernetes/pkg/kubelet/volumemanager/cache"
    43  	"k8s.io/kubernetes/pkg/volume"
    44  	volumetesting "k8s.io/kubernetes/pkg/volume/testing"
    45  	"k8s.io/kubernetes/pkg/volume/util"
    46  	"k8s.io/kubernetes/pkg/volume/util/hostutil"
    47  	"k8s.io/kubernetes/pkg/volume/util/operationexecutor"
    48  	"k8s.io/kubernetes/pkg/volume/util/types"
    49  )
    50  
    51  const (
    52  	// reconcilerLoopSleepDuration is the amount of time the reconciler loop
    53  	// waits between successive executions
    54  	reconcilerLoopSleepDuration = 1 * time.Nanosecond
    55  	// waitForAttachTimeout is the maximum amount of time a
    56  	// operationexecutor.Mount call will wait for a volume to be attached.
    57  	waitForAttachTimeout         = 1 * time.Second
    58  	nodeName                     = k8stypes.NodeName("mynodename")
    59  	kubeletPodsDir               = "fake-dir"
    60  	testOperationBackOffDuration = 100 * time.Millisecond
    61  	reconcilerSyncWaitDuration   = 10 * time.Second
    62  )
    63  
    64  func hasAddedPods() bool { return true }
    65  
    66  // Calls Run()
    67  // Verifies there are no calls to attach, detach, mount, unmount, etc.
    68  func Test_Run_Positive_DoNothing(t *testing.T) {
    69  	// Arrange
    70  	volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgr(t)
    71  	seLinuxTranslator := util.NewFakeSELinuxLabelTranslator()
    72  	dsw := cache.NewDesiredStateOfWorld(volumePluginMgr, seLinuxTranslator)
    73  	asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
    74  	kubeClient := createTestClient()
    75  	fakeRecorder := &record.FakeRecorder{}
    76  	fakeHandler := volumetesting.NewBlockVolumePathHandler()
    77  	oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
    78  		kubeClient,
    79  		volumePluginMgr,
    80  		fakeRecorder,
    81  		fakeHandler,
    82  	))
    83  	reconciler := NewReconciler(
    84  		kubeClient,
    85  		false, /* controllerAttachDetachEnabled */
    86  		reconcilerLoopSleepDuration,
    87  		waitForAttachTimeout,
    88  		nodeName,
    89  		dsw,
    90  		asw,
    91  		hasAddedPods,
    92  		oex,
    93  		mount.NewFakeMounter(nil),
    94  		hostutil.NewFakeHostUtil(nil),
    95  		volumePluginMgr,
    96  		kubeletPodsDir)
    97  
    98  	// Act
    99  	runReconciler(reconciler)
   100  
   101  	// Assert
   102  	assert.NoError(t, volumetesting.VerifyZeroAttachCalls(fakePlugin))
   103  	assert.NoError(t, volumetesting.VerifyZeroWaitForAttachCallCount(fakePlugin))
   104  	assert.NoError(t, volumetesting.VerifyZeroMountDeviceCallCount(fakePlugin))
   105  	assert.NoError(t, volumetesting.VerifyZeroSetUpCallCount(fakePlugin))
   106  	assert.NoError(t, volumetesting.VerifyZeroTearDownCallCount(fakePlugin))
   107  	assert.NoError(t, volumetesting.VerifyZeroDetachCallCount(fakePlugin))
   108  }
   109  
   110  // Populates desiredStateOfWorld cache with one volume/pod.
   111  // Calls Run()
   112  // Verifies there is are attach/mount/etc calls and no detach/unmount calls.
   113  func Test_Run_Positive_VolumeAttachAndMount(t *testing.T) {
   114  	// Arrange
   115  	volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgr(t)
   116  	seLinuxTranslator := util.NewFakeSELinuxLabelTranslator()
   117  	dsw := cache.NewDesiredStateOfWorld(volumePluginMgr, seLinuxTranslator)
   118  	asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
   119  	kubeClient := createTestClient()
   120  	fakeRecorder := &record.FakeRecorder{}
   121  	fakeHandler := volumetesting.NewBlockVolumePathHandler()
   122  	oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
   123  		kubeClient,
   124  		volumePluginMgr,
   125  		fakeRecorder,
   126  		fakeHandler))
   127  	reconciler := NewReconciler(
   128  		kubeClient,
   129  		false, /* controllerAttachDetachEnabled */
   130  		reconcilerLoopSleepDuration,
   131  		waitForAttachTimeout,
   132  		nodeName,
   133  		dsw,
   134  		asw,
   135  		hasAddedPods,
   136  		oex,
   137  		mount.NewFakeMounter(nil),
   138  		hostutil.NewFakeHostUtil(nil),
   139  		volumePluginMgr,
   140  		kubeletPodsDir)
   141  	pod := &v1.Pod{
   142  		ObjectMeta: metav1.ObjectMeta{
   143  			Name: "pod1",
   144  			UID:  "pod1uid",
   145  		},
   146  		Spec: v1.PodSpec{
   147  			Volumes: []v1.Volume{
   148  				{
   149  					Name: "volume-name",
   150  					VolumeSource: v1.VolumeSource{
   151  						GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{
   152  							PDName: "fake-device1",
   153  						},
   154  					},
   155  				},
   156  			},
   157  		},
   158  	}
   159  
   160  	volumeSpec := &volume.Spec{Volume: &pod.Spec.Volumes[0]}
   161  	podName := util.GetUniquePodName(pod)
   162  	generatedVolumeName, err := dsw.AddPodToVolume(
   163  		podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */, nil /* seLinuxLabel */)
   164  
   165  	// Assert
   166  	if err != nil {
   167  		t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
   168  	}
   169  
   170  	// Act
   171  	runReconciler(reconciler)
   172  	waitForMount(t, fakePlugin, generatedVolumeName, asw)
   173  	// Assert
   174  	assert.NoError(t, volumetesting.VerifyAttachCallCount(
   175  		1 /* expectedAttachCallCount */, fakePlugin))
   176  	assert.NoError(t, volumetesting.VerifyWaitForAttachCallCount(
   177  		1 /* expectedWaitForAttachCallCount */, fakePlugin))
   178  	assert.NoError(t, volumetesting.VerifyMountDeviceCallCount(
   179  		1 /* expectedMountDeviceCallCount */, fakePlugin))
   180  	assert.NoError(t, volumetesting.VerifySetUpCallCount(
   181  		1 /* expectedSetUpCallCount */, fakePlugin))
   182  	assert.NoError(t, volumetesting.VerifyZeroTearDownCallCount(fakePlugin))
   183  	assert.NoError(t, volumetesting.VerifyZeroDetachCallCount(fakePlugin))
   184  }
   185  
   186  // Populates desiredStateOfWorld cache with one volume/pod.
   187  // Calls Run()
   188  // Verifies there is are attach/mount/etc calls and no detach/unmount calls.
   189  func Test_Run_Positive_VolumeAttachAndMountMigrationEnabled(t *testing.T) {
   190  	// Arrange
   191  	intreeToCSITranslator := csitrans.New()
   192  	node := &v1.Node{
   193  		ObjectMeta: metav1.ObjectMeta{
   194  			Name: string(nodeName),
   195  		},
   196  		Spec: v1.NodeSpec{},
   197  		Status: v1.NodeStatus{
   198  			VolumesAttached: []v1.AttachedVolume{
   199  				{
   200  					Name:       v1.UniqueVolumeName(fmt.Sprintf("fake-plugin/%s", "pd.csi.storage.gke.io-fake-device1")),
   201  					DevicePath: "fake/path",
   202  				},
   203  			},
   204  		},
   205  	}
   206  	volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgrWithNode(t, node)
   207  	seLinuxTranslator := util.NewFakeSELinuxLabelTranslator()
   208  	dsw := cache.NewDesiredStateOfWorld(volumePluginMgr, seLinuxTranslator)
   209  
   210  	asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
   211  	kubeClient := createTestClient(v1.AttachedVolume{
   212  		Name:       v1.UniqueVolumeName(fmt.Sprintf("fake-plugin/%s", "pd.csi.storage.gke.io-fake-device1")),
   213  		DevicePath: "fake/path",
   214  	})
   215  
   216  	fakeRecorder := &record.FakeRecorder{}
   217  	fakeHandler := volumetesting.NewBlockVolumePathHandler()
   218  	oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
   219  		kubeClient,
   220  		volumePluginMgr,
   221  		fakeRecorder,
   222  		fakeHandler))
   223  	reconciler := NewReconciler(
   224  		kubeClient,
   225  		true, /* controllerAttachDetachEnabled */
   226  		reconcilerLoopSleepDuration,
   227  		waitForAttachTimeout,
   228  		nodeName,
   229  		dsw,
   230  		asw,
   231  		hasAddedPods,
   232  		oex,
   233  		mount.NewFakeMounter(nil),
   234  		hostutil.NewFakeHostUtil(nil),
   235  		volumePluginMgr,
   236  		kubeletPodsDir)
   237  	pod := &v1.Pod{
   238  		ObjectMeta: metav1.ObjectMeta{
   239  			Name: "pod1",
   240  			UID:  "pod1uid",
   241  		},
   242  		Spec: v1.PodSpec{
   243  			Volumes: []v1.Volume{
   244  				{
   245  					Name: "volume-name",
   246  					VolumeSource: v1.VolumeSource{
   247  						GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{
   248  							PDName: "fake-device1",
   249  						},
   250  					},
   251  				},
   252  			},
   253  		},
   254  	}
   255  
   256  	volumeSpec := &volume.Spec{Volume: &pod.Spec.Volumes[0]}
   257  	migratedSpec, err := csimigration.TranslateInTreeSpecToCSI(volumeSpec, pod.Namespace, intreeToCSITranslator)
   258  	if err != nil {
   259  		t.Fatalf("unexpected error while translating spec %v: %v", volumeSpec, err)
   260  	}
   261  
   262  	podName := util.GetUniquePodName(pod)
   263  	generatedVolumeName, err := dsw.AddPodToVolume(
   264  		podName,
   265  		pod,
   266  		migratedSpec,
   267  		migratedSpec.Name(),
   268  		"",  /* volumeGidValue */
   269  		nil, /* SELinuxContexts */
   270  	)
   271  
   272  	// Assert
   273  	if err != nil {
   274  		t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
   275  	}
   276  	dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{generatedVolumeName})
   277  
   278  	// Act
   279  	runReconciler(reconciler)
   280  	waitForMount(t, fakePlugin, generatedVolumeName, asw)
   281  	// Assert
   282  	assert.NoError(t, volumetesting.VerifyWaitForAttachCallCount(
   283  		1 /* expectedWaitForAttachCallCount */, fakePlugin))
   284  	assert.NoError(t, volumetesting.VerifyMountDeviceCallCount(
   285  		1 /* expectedMountDeviceCallCount */, fakePlugin))
   286  	assert.NoError(t, volumetesting.VerifySetUpCallCount(
   287  		1 /* expectedSetUpCallCount */, fakePlugin))
   288  	assert.NoError(t, volumetesting.VerifyZeroTearDownCallCount(fakePlugin))
   289  	assert.NoError(t, volumetesting.VerifyZeroDetachCallCount(fakePlugin))
   290  }
   291  
   292  // Populates desiredStateOfWorld cache with one volume/pod.
   293  // Enables controllerAttachDetachEnabled.
   294  // Calls Run()
   295  // Verifies there is one mount call and no unmount calls.
   296  // Verifies there are no attach/detach calls.
   297  func Test_Run_Positive_VolumeMountControllerAttachEnabled(t *testing.T) {
   298  	// Arrange
   299  	node := &v1.Node{
   300  		ObjectMeta: metav1.ObjectMeta{
   301  			Name: string(nodeName),
   302  		},
   303  		Status: v1.NodeStatus{
   304  			VolumesAttached: []v1.AttachedVolume{
   305  				{
   306  					Name:       "fake-plugin/fake-device1",
   307  					DevicePath: "fake/path",
   308  				},
   309  			},
   310  		},
   311  	}
   312  	volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgrWithNode(t, node)
   313  	seLinuxTranslator := util.NewFakeSELinuxLabelTranslator()
   314  	dsw := cache.NewDesiredStateOfWorld(volumePluginMgr, seLinuxTranslator)
   315  	asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
   316  	kubeClient := createTestClient()
   317  	fakeRecorder := &record.FakeRecorder{}
   318  	fakeHandler := volumetesting.NewBlockVolumePathHandler()
   319  	oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
   320  		kubeClient,
   321  		volumePluginMgr,
   322  		fakeRecorder,
   323  		fakeHandler))
   324  	reconciler := NewReconciler(
   325  		kubeClient,
   326  		true, /* controllerAttachDetachEnabled */
   327  		reconcilerLoopSleepDuration,
   328  		waitForAttachTimeout,
   329  		nodeName,
   330  		dsw,
   331  		asw,
   332  		hasAddedPods,
   333  		oex,
   334  		mount.NewFakeMounter(nil),
   335  		hostutil.NewFakeHostUtil(nil),
   336  		volumePluginMgr,
   337  		kubeletPodsDir)
   338  	pod := &v1.Pod{
   339  		ObjectMeta: metav1.ObjectMeta{
   340  			Name: "pod1",
   341  			UID:  "pod1uid",
   342  		},
   343  		Spec: v1.PodSpec{
   344  			Volumes: []v1.Volume{
   345  				{
   346  					Name: "volume-name",
   347  					VolumeSource: v1.VolumeSource{
   348  						GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{
   349  							PDName: "fake-device1",
   350  						},
   351  					},
   352  				},
   353  			},
   354  		},
   355  	}
   356  
   357  	volumeSpec := &volume.Spec{Volume: &pod.Spec.Volumes[0]}
   358  	podName := util.GetUniquePodName(pod)
   359  	generatedVolumeName, err := dsw.AddPodToVolume(
   360  		podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */, nil /* seLinuxLabel */)
   361  	dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{generatedVolumeName})
   362  
   363  	// Assert
   364  	if err != nil {
   365  		t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
   366  	}
   367  
   368  	// Act
   369  	runReconciler(reconciler)
   370  	waitForMount(t, fakePlugin, generatedVolumeName, asw)
   371  
   372  	// Assert
   373  	assert.NoError(t, volumetesting.VerifyZeroAttachCalls(fakePlugin))
   374  	assert.NoError(t, volumetesting.VerifyWaitForAttachCallCount(
   375  		1 /* expectedWaitForAttachCallCount */, fakePlugin))
   376  	assert.NoError(t, volumetesting.VerifyMountDeviceCallCount(
   377  		1 /* expectedMountDeviceCallCount */, fakePlugin))
   378  	assert.NoError(t, volumetesting.VerifySetUpCallCount(
   379  		1 /* expectedSetUpCallCount */, fakePlugin))
   380  	assert.NoError(t, volumetesting.VerifyZeroTearDownCallCount(fakePlugin))
   381  	assert.NoError(t, volumetesting.VerifyZeroDetachCallCount(fakePlugin))
   382  }
   383  
   384  // Populates desiredStateOfWorld cache with one volume/pod.
   385  // Enables controllerAttachDetachEnabled.
   386  // volume is not repored-in-use
   387  // Calls Run()
   388  // Verifies that there is not wait-for-mount call
   389  // Verifies that there is no exponential-backoff triggered
   390  func Test_Run_Negative_VolumeMountControllerAttachEnabled(t *testing.T) {
   391  	// Arrange
   392  	volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgr(t)
   393  	seLinuxTranslator := util.NewFakeSELinuxLabelTranslator()
   394  	dsw := cache.NewDesiredStateOfWorld(volumePluginMgr, seLinuxTranslator)
   395  	asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
   396  	kubeClient := createTestClient()
   397  	fakeRecorder := &record.FakeRecorder{}
   398  	fakeHandler := volumetesting.NewBlockVolumePathHandler()
   399  	oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
   400  		kubeClient,
   401  		volumePluginMgr,
   402  		fakeRecorder,
   403  		fakeHandler))
   404  	reconciler := NewReconciler(
   405  		kubeClient,
   406  		true, /* controllerAttachDetachEnabled */
   407  		reconcilerLoopSleepDuration,
   408  		waitForAttachTimeout,
   409  		nodeName,
   410  		dsw,
   411  		asw,
   412  		hasAddedPods,
   413  		oex,
   414  		mount.NewFakeMounter(nil),
   415  		hostutil.NewFakeHostUtil(nil),
   416  		volumePluginMgr,
   417  		kubeletPodsDir)
   418  	pod := &v1.Pod{
   419  		ObjectMeta: metav1.ObjectMeta{
   420  			Name: "pod1",
   421  			UID:  "pod1uid",
   422  		},
   423  		Spec: v1.PodSpec{
   424  			Volumes: []v1.Volume{
   425  				{
   426  					Name: "volume-name",
   427  					VolumeSource: v1.VolumeSource{
   428  						GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{
   429  							PDName: "fake-device1",
   430  						},
   431  					},
   432  				},
   433  			},
   434  		},
   435  	}
   436  
   437  	volumeSpec := &volume.Spec{Volume: &pod.Spec.Volumes[0]}
   438  	podName := util.GetUniquePodName(pod)
   439  	generatedVolumeName, err := dsw.AddPodToVolume(
   440  		podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */, nil /* seLinuxLabel */)
   441  
   442  	// Assert
   443  	if err != nil {
   444  		t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
   445  	}
   446  
   447  	// Act
   448  	runReconciler(reconciler)
   449  	time.Sleep(reconcilerSyncWaitDuration)
   450  
   451  	ok := oex.IsOperationSafeToRetry(generatedVolumeName, podName, nodeName, operationexecutor.VerifyControllerAttachedVolumeOpName)
   452  	if !ok {
   453  		t.Errorf("operation on volume %s is not safe to retry", generatedVolumeName)
   454  	}
   455  
   456  	// Assert
   457  	assert.NoError(t, volumetesting.VerifyZeroAttachCalls(fakePlugin))
   458  	assert.NoError(t, volumetesting.VerifyWaitForAttachCallCount(
   459  		0 /* expectedWaitForAttachCallCount */, fakePlugin))
   460  	assert.NoError(t, volumetesting.VerifyMountDeviceCallCount(
   461  		0 /* expectedMountDeviceCallCount */, fakePlugin))
   462  }
   463  
   464  // Populates desiredStateOfWorld cache with one volume/pod.
   465  // Calls Run()
   466  // Verifies there is one attach/mount/etc call and no detach calls.
   467  // Deletes volume/pod from desired state of world.
   468  // Verifies detach/unmount calls are issued.
   469  func Test_Run_Positive_VolumeAttachMountUnmountDetach(t *testing.T) {
   470  	// Arrange
   471  	volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgr(t)
   472  	seLinuxTranslator := util.NewFakeSELinuxLabelTranslator()
   473  	dsw := cache.NewDesiredStateOfWorld(volumePluginMgr, seLinuxTranslator)
   474  	asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
   475  	kubeClient := createTestClient()
   476  	fakeRecorder := &record.FakeRecorder{}
   477  	fakeHandler := volumetesting.NewBlockVolumePathHandler()
   478  	oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
   479  		kubeClient,
   480  		volumePluginMgr,
   481  		fakeRecorder,
   482  		fakeHandler))
   483  	reconciler := NewReconciler(
   484  		kubeClient,
   485  		false, /* controllerAttachDetachEnabled */
   486  		reconcilerLoopSleepDuration,
   487  		waitForAttachTimeout,
   488  		nodeName,
   489  		dsw,
   490  		asw,
   491  		hasAddedPods,
   492  		oex,
   493  		mount.NewFakeMounter(nil),
   494  		hostutil.NewFakeHostUtil(nil),
   495  		volumePluginMgr,
   496  		kubeletPodsDir)
   497  	pod := &v1.Pod{
   498  		ObjectMeta: metav1.ObjectMeta{
   499  			Name: "pod1",
   500  			UID:  "pod1uid",
   501  		},
   502  		Spec: v1.PodSpec{
   503  			Volumes: []v1.Volume{
   504  				{
   505  					Name: "volume-name",
   506  					VolumeSource: v1.VolumeSource{
   507  						GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{
   508  							PDName: "fake-device1",
   509  						},
   510  					},
   511  				},
   512  			},
   513  		},
   514  	}
   515  
   516  	volumeSpec := &volume.Spec{Volume: &pod.Spec.Volumes[0]}
   517  	podName := util.GetUniquePodName(pod)
   518  	generatedVolumeName, err := dsw.AddPodToVolume(
   519  		podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */, nil /* seLinuxLabel */)
   520  
   521  	// Assert
   522  	if err != nil {
   523  		t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
   524  	}
   525  
   526  	// Act
   527  	runReconciler(reconciler)
   528  	waitForMount(t, fakePlugin, generatedVolumeName, asw)
   529  	// Assert
   530  	assert.NoError(t, volumetesting.VerifyAttachCallCount(
   531  		1 /* expectedAttachCallCount */, fakePlugin))
   532  	assert.NoError(t, volumetesting.VerifyWaitForAttachCallCount(
   533  		1 /* expectedWaitForAttachCallCount */, fakePlugin))
   534  	assert.NoError(t, volumetesting.VerifyMountDeviceCallCount(
   535  		1 /* expectedMountDeviceCallCount */, fakePlugin))
   536  	assert.NoError(t, volumetesting.VerifySetUpCallCount(
   537  		1 /* expectedSetUpCallCount */, fakePlugin))
   538  	assert.NoError(t, volumetesting.VerifyZeroTearDownCallCount(fakePlugin))
   539  	assert.NoError(t, volumetesting.VerifyZeroDetachCallCount(fakePlugin))
   540  
   541  	// Act
   542  	dsw.DeletePodFromVolume(podName, generatedVolumeName)
   543  	waitForDetach(t, generatedVolumeName, asw)
   544  
   545  	// Assert
   546  	assert.NoError(t, volumetesting.VerifyTearDownCallCount(
   547  		1 /* expectedTearDownCallCount */, fakePlugin))
   548  	assert.NoError(t, volumetesting.VerifyDetachCallCount(
   549  		1 /* expectedDetachCallCount */, fakePlugin))
   550  }
   551  
   552  // Populates desiredStateOfWorld cache with one volume/pod.
   553  // Enables controllerAttachDetachEnabled.
   554  // Calls Run()
   555  // Verifies one mount call is made and no unmount calls.
   556  // Deletes volume/pod from desired state of world.
   557  // Verifies one unmount call is made.
   558  // Verifies there are no attach/detach calls made.
   559  func Test_Run_Positive_VolumeUnmountControllerAttachEnabled(t *testing.T) {
   560  	// Arrange
   561  	node := &v1.Node{
   562  		ObjectMeta: metav1.ObjectMeta{
   563  			Name: string(nodeName),
   564  		},
   565  		Status: v1.NodeStatus{
   566  			VolumesAttached: []v1.AttachedVolume{
   567  				{
   568  					Name:       "fake-plugin/fake-device1",
   569  					DevicePath: "fake/path",
   570  				},
   571  			},
   572  		},
   573  	}
   574  	volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgrWithNode(t, node)
   575  	seLinuxTranslator := util.NewFakeSELinuxLabelTranslator()
   576  	dsw := cache.NewDesiredStateOfWorld(volumePluginMgr, seLinuxTranslator)
   577  	asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
   578  	kubeClient := createTestClient()
   579  	fakeRecorder := &record.FakeRecorder{}
   580  	fakeHandler := volumetesting.NewBlockVolumePathHandler()
   581  	oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
   582  		kubeClient,
   583  		volumePluginMgr,
   584  		fakeRecorder,
   585  		fakeHandler))
   586  	reconciler := NewReconciler(
   587  		kubeClient,
   588  		true, /* controllerAttachDetachEnabled */
   589  		reconcilerLoopSleepDuration,
   590  		waitForAttachTimeout,
   591  		nodeName,
   592  		dsw,
   593  		asw,
   594  		hasAddedPods,
   595  		oex,
   596  		mount.NewFakeMounter(nil),
   597  		hostutil.NewFakeHostUtil(nil),
   598  		volumePluginMgr,
   599  		kubeletPodsDir)
   600  	pod := &v1.Pod{
   601  		ObjectMeta: metav1.ObjectMeta{
   602  			Name: "pod1",
   603  			UID:  "pod1uid",
   604  		},
   605  		Spec: v1.PodSpec{
   606  			Volumes: []v1.Volume{
   607  				{
   608  					Name: "volume-name",
   609  					VolumeSource: v1.VolumeSource{
   610  						GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{
   611  							PDName: "fake-device1",
   612  						},
   613  					},
   614  				},
   615  			},
   616  		},
   617  	}
   618  
   619  	volumeSpec := &volume.Spec{Volume: &pod.Spec.Volumes[0]}
   620  	podName := util.GetUniquePodName(pod)
   621  	generatedVolumeName, err := dsw.AddPodToVolume(
   622  		podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */, nil /* seLinuxLabel */)
   623  
   624  	// Assert
   625  	if err != nil {
   626  		t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
   627  	}
   628  
   629  	// Act
   630  	runReconciler(reconciler)
   631  
   632  	dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{generatedVolumeName})
   633  	waitForMount(t, fakePlugin, generatedVolumeName, asw)
   634  
   635  	// Assert
   636  	assert.NoError(t, volumetesting.VerifyZeroAttachCalls(fakePlugin))
   637  	assert.NoError(t, volumetesting.VerifyWaitForAttachCallCount(
   638  		1 /* expectedWaitForAttachCallCount */, fakePlugin))
   639  	assert.NoError(t, volumetesting.VerifyMountDeviceCallCount(
   640  		1 /* expectedMountDeviceCallCount */, fakePlugin))
   641  	assert.NoError(t, volumetesting.VerifySetUpCallCount(
   642  		1 /* expectedSetUpCallCount */, fakePlugin))
   643  	assert.NoError(t, volumetesting.VerifyZeroTearDownCallCount(fakePlugin))
   644  	assert.NoError(t, volumetesting.VerifyZeroDetachCallCount(fakePlugin))
   645  
   646  	// Act
   647  	dsw.DeletePodFromVolume(podName, generatedVolumeName)
   648  	waitForDetach(t, generatedVolumeName, asw)
   649  
   650  	// Assert
   651  	assert.NoError(t, volumetesting.VerifyTearDownCallCount(
   652  		1 /* expectedTearDownCallCount */, fakePlugin))
   653  	assert.NoError(t, volumetesting.VerifyZeroDetachCallCount(fakePlugin))
   654  }
   655  
   656  // Populates desiredStateOfWorld cache with one volume/pod.
   657  // Calls Run()
   658  // Verifies there are attach/get map paths/setupDevice calls and
   659  // no detach/teardownDevice calls.
   660  func Test_Run_Positive_VolumeAttachAndMap(t *testing.T) {
   661  	pod := &v1.Pod{
   662  		ObjectMeta: metav1.ObjectMeta{
   663  			Name:      "pod1",
   664  			UID:       "pod1uid",
   665  			Namespace: "ns",
   666  		},
   667  		Spec: v1.PodSpec{},
   668  	}
   669  
   670  	mode := v1.PersistentVolumeBlock
   671  	gcepv := &v1.PersistentVolume{
   672  		ObjectMeta: metav1.ObjectMeta{UID: "001", Name: "volume-name"},
   673  		Spec: v1.PersistentVolumeSpec{
   674  			Capacity:               v1.ResourceList{v1.ResourceName(v1.ResourceStorage): resource.MustParse("10G")},
   675  			PersistentVolumeSource: v1.PersistentVolumeSource{GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{PDName: "fake-device1"}},
   676  			AccessModes: []v1.PersistentVolumeAccessMode{
   677  				v1.ReadWriteOnce,
   678  				v1.ReadOnlyMany,
   679  			},
   680  			VolumeMode: &mode,
   681  			ClaimRef:   &v1.ObjectReference{Namespace: "ns", Name: "pvc-volume-name"},
   682  		},
   683  	}
   684  
   685  	gcepvc := &v1.PersistentVolumeClaim{
   686  		ObjectMeta: metav1.ObjectMeta{UID: "pvc-001", Name: "pvc-volume-name", Namespace: "ns"},
   687  		Spec: v1.PersistentVolumeClaimSpec{
   688  			VolumeName: "volume-name",
   689  			VolumeMode: &mode,
   690  		},
   691  		Status: v1.PersistentVolumeClaimStatus{
   692  			Phase:    v1.ClaimBound,
   693  			Capacity: gcepv.Spec.Capacity,
   694  		},
   695  	}
   696  
   697  	// Arrange
   698  	volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgr(t)
   699  	seLinuxTranslator := util.NewFakeSELinuxLabelTranslator()
   700  	dsw := cache.NewDesiredStateOfWorld(volumePluginMgr, seLinuxTranslator)
   701  	asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
   702  	kubeClient := createtestClientWithPVPVC(gcepv, gcepvc)
   703  	fakeRecorder := &record.FakeRecorder{}
   704  	fakeHandler := volumetesting.NewBlockVolumePathHandler()
   705  	oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
   706  		kubeClient,
   707  		volumePluginMgr,
   708  		fakeRecorder,
   709  		fakeHandler))
   710  	reconciler := NewReconciler(
   711  		kubeClient,
   712  		false, /* controllerAttachDetachEnabled */
   713  		reconcilerLoopSleepDuration,
   714  		waitForAttachTimeout,
   715  		nodeName,
   716  		dsw,
   717  		asw,
   718  		hasAddedPods,
   719  		oex,
   720  		mount.NewFakeMounter(nil),
   721  		hostutil.NewFakeHostUtil(nil),
   722  		volumePluginMgr,
   723  		kubeletPodsDir)
   724  
   725  	volumeSpec := &volume.Spec{
   726  		PersistentVolume: gcepv,
   727  	}
   728  	podName := util.GetUniquePodName(pod)
   729  	generatedVolumeName, err := dsw.AddPodToVolume(
   730  		podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */, nil /* seLinuxLabel */)
   731  
   732  	// Assert
   733  	if err != nil {
   734  		t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
   735  	}
   736  
   737  	// Act
   738  	runReconciler(reconciler)
   739  	waitForMount(t, fakePlugin, generatedVolumeName, asw)
   740  	// Assert
   741  	assert.NoError(t, volumetesting.VerifyAttachCallCount(
   742  		1 /* expectedAttachCallCount */, fakePlugin))
   743  	assert.NoError(t, volumetesting.VerifyWaitForAttachCallCount(
   744  		1 /* expectedWaitForAttachCallCount */, fakePlugin))
   745  	assert.NoError(t, volumetesting.VerifyGetMapPodDeviceCallCount(
   746  		1 /* expectedGetMapDeviceCallCount */, fakePlugin))
   747  	assert.NoError(t, volumetesting.VerifyZeroTearDownDeviceCallCount(fakePlugin))
   748  	assert.NoError(t, volumetesting.VerifyZeroDetachCallCount(fakePlugin))
   749  }
   750  
   751  // Populates desiredStateOfWorld cache with one volume/pod.
   752  // Enables controllerAttachDetachEnabled.
   753  // Calls Run()
   754  // Verifies there are two get map path calls, a setupDevice call
   755  // and no teardownDevice call.
   756  // Verifies there are no attach/detach calls.
   757  func Test_Run_Positive_BlockVolumeMapControllerAttachEnabled(t *testing.T) {
   758  	pod := &v1.Pod{
   759  		ObjectMeta: metav1.ObjectMeta{
   760  			Name:      "pod1",
   761  			UID:       "pod1uid",
   762  			Namespace: "ns",
   763  		},
   764  		Spec: v1.PodSpec{},
   765  	}
   766  
   767  	mode := v1.PersistentVolumeBlock
   768  	gcepv := &v1.PersistentVolume{
   769  		ObjectMeta: metav1.ObjectMeta{UID: "001", Name: "volume-name"},
   770  		Spec: v1.PersistentVolumeSpec{
   771  			Capacity:               v1.ResourceList{v1.ResourceName(v1.ResourceStorage): resource.MustParse("10G")},
   772  			PersistentVolumeSource: v1.PersistentVolumeSource{GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{PDName: "fake-device1"}},
   773  			AccessModes: []v1.PersistentVolumeAccessMode{
   774  				v1.ReadWriteOnce,
   775  				v1.ReadOnlyMany,
   776  			},
   777  			VolumeMode: &mode,
   778  			ClaimRef:   &v1.ObjectReference{Namespace: "ns", Name: "pvc-volume-name"},
   779  		},
   780  	}
   781  	gcepvc := &v1.PersistentVolumeClaim{
   782  		ObjectMeta: metav1.ObjectMeta{UID: "pvc-001", Name: "pvc-volume-name", Namespace: "ns"},
   783  		Spec: v1.PersistentVolumeClaimSpec{
   784  			VolumeName: "volume-name",
   785  			VolumeMode: &mode,
   786  		},
   787  		Status: v1.PersistentVolumeClaimStatus{
   788  			Phase:    v1.ClaimBound,
   789  			Capacity: gcepv.Spec.Capacity,
   790  		},
   791  	}
   792  
   793  	volumeSpec := &volume.Spec{
   794  		PersistentVolume: gcepv,
   795  	}
   796  	node := &v1.Node{
   797  		ObjectMeta: metav1.ObjectMeta{
   798  			Name: string(nodeName),
   799  		},
   800  		Status: v1.NodeStatus{
   801  			VolumesAttached: []v1.AttachedVolume{
   802  				{
   803  					Name:       "fake-plugin/fake-device1",
   804  					DevicePath: "fake/path",
   805  				},
   806  			},
   807  		},
   808  	}
   809  
   810  	// Arrange
   811  	volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgrWithNode(t, node)
   812  	seLinuxTranslator := util.NewFakeSELinuxLabelTranslator()
   813  	dsw := cache.NewDesiredStateOfWorld(volumePluginMgr, seLinuxTranslator)
   814  	asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
   815  	kubeClient := createtestClientWithPVPVC(gcepv, gcepvc, v1.AttachedVolume{
   816  		Name:       "fake-plugin/fake-device1",
   817  		DevicePath: "/fake/path",
   818  	})
   819  	fakeRecorder := &record.FakeRecorder{}
   820  	fakeHandler := volumetesting.NewBlockVolumePathHandler()
   821  	oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
   822  		kubeClient,
   823  		volumePluginMgr,
   824  		fakeRecorder,
   825  		fakeHandler))
   826  	reconciler := NewReconciler(
   827  		kubeClient,
   828  		true, /* controllerAttachDetachEnabled */
   829  		reconcilerLoopSleepDuration,
   830  		waitForAttachTimeout,
   831  		nodeName,
   832  		dsw,
   833  		asw,
   834  		hasAddedPods,
   835  		oex,
   836  		mount.NewFakeMounter(nil),
   837  		hostutil.NewFakeHostUtil(nil),
   838  		volumePluginMgr,
   839  		kubeletPodsDir)
   840  
   841  	podName := util.GetUniquePodName(pod)
   842  	generatedVolumeName, err := dsw.AddPodToVolume(
   843  		podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */, nil /* seLinuxLabel */)
   844  	dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{generatedVolumeName})
   845  
   846  	// Assert
   847  	if err != nil {
   848  		t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
   849  	}
   850  
   851  	// Act
   852  	runReconciler(reconciler)
   853  	waitForMount(t, fakePlugin, generatedVolumeName, asw)
   854  
   855  	// Assert
   856  	assert.NoError(t, volumetesting.VerifyZeroAttachCalls(fakePlugin))
   857  	assert.NoError(t, volumetesting.VerifyWaitForAttachCallCount(
   858  		1 /* expectedWaitForAttachCallCount */, fakePlugin))
   859  	assert.NoError(t, volumetesting.VerifyGetMapPodDeviceCallCount(
   860  		1 /* expectedGetMapDeviceCallCount */, fakePlugin))
   861  	assert.NoError(t, volumetesting.VerifyZeroTearDownDeviceCallCount(fakePlugin))
   862  	assert.NoError(t, volumetesting.VerifyZeroDetachCallCount(fakePlugin))
   863  }
   864  
   865  // Populates desiredStateOfWorld cache with one volume/pod.
   866  // Calls Run()
   867  // Verifies there is one attach call, two get map path calls,
   868  // setupDevice call and no detach calls.
   869  // Deletes volume/pod from desired state of world.
   870  // Verifies one detach/teardownDevice calls are issued.
   871  func Test_Run_Positive_BlockVolumeAttachMapUnmapDetach(t *testing.T) {
   872  	pod := &v1.Pod{
   873  		ObjectMeta: metav1.ObjectMeta{
   874  			Name:      "pod1",
   875  			UID:       "pod1uid",
   876  			Namespace: "ns",
   877  		},
   878  		Spec: v1.PodSpec{},
   879  	}
   880  
   881  	mode := v1.PersistentVolumeBlock
   882  	gcepv := &v1.PersistentVolume{
   883  		ObjectMeta: metav1.ObjectMeta{UID: "001", Name: "volume-name"},
   884  		Spec: v1.PersistentVolumeSpec{
   885  			Capacity:               v1.ResourceList{v1.ResourceName(v1.ResourceStorage): resource.MustParse("10G")},
   886  			PersistentVolumeSource: v1.PersistentVolumeSource{GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{PDName: "fake-device1"}},
   887  			AccessModes: []v1.PersistentVolumeAccessMode{
   888  				v1.ReadWriteOnce,
   889  				v1.ReadOnlyMany,
   890  			},
   891  			VolumeMode: &mode,
   892  			ClaimRef:   &v1.ObjectReference{Namespace: "ns", Name: "pvc-volume-name"},
   893  		},
   894  	}
   895  	gcepvc := &v1.PersistentVolumeClaim{
   896  		ObjectMeta: metav1.ObjectMeta{UID: "pvc-001", Name: "pvc-volume-name", Namespace: "ns"},
   897  		Spec: v1.PersistentVolumeClaimSpec{
   898  			VolumeName: "volume-name",
   899  			VolumeMode: &mode,
   900  		},
   901  		Status: v1.PersistentVolumeClaimStatus{
   902  			Phase:    v1.ClaimBound,
   903  			Capacity: gcepv.Spec.Capacity,
   904  		},
   905  	}
   906  
   907  	volumeSpec := &volume.Spec{
   908  		PersistentVolume: gcepv,
   909  	}
   910  
   911  	// Arrange
   912  	volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgr(t)
   913  	seLinuxTranslator := util.NewFakeSELinuxLabelTranslator()
   914  	dsw := cache.NewDesiredStateOfWorld(volumePluginMgr, seLinuxTranslator)
   915  	asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
   916  	kubeClient := createtestClientWithPVPVC(gcepv, gcepvc)
   917  	fakeRecorder := &record.FakeRecorder{}
   918  	fakeHandler := volumetesting.NewBlockVolumePathHandler()
   919  	oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
   920  		kubeClient,
   921  		volumePluginMgr,
   922  		fakeRecorder,
   923  		fakeHandler))
   924  	reconciler := NewReconciler(
   925  		kubeClient,
   926  		false, /* controllerAttachDetachEnabled */
   927  		reconcilerLoopSleepDuration,
   928  		waitForAttachTimeout,
   929  		nodeName,
   930  		dsw,
   931  		asw,
   932  		hasAddedPods,
   933  		oex,
   934  		mount.NewFakeMounter(nil),
   935  		hostutil.NewFakeHostUtil(nil),
   936  		volumePluginMgr,
   937  		kubeletPodsDir)
   938  
   939  	podName := util.GetUniquePodName(pod)
   940  	generatedVolumeName, err := dsw.AddPodToVolume(
   941  		podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */, nil /* seLinuxLabel */)
   942  
   943  	// Assert
   944  	if err != nil {
   945  		t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
   946  	}
   947  
   948  	// Act
   949  	runReconciler(reconciler)
   950  	waitForMount(t, fakePlugin, generatedVolumeName, asw)
   951  	// Assert
   952  	assert.NoError(t, volumetesting.VerifyAttachCallCount(
   953  		1 /* expectedAttachCallCount */, fakePlugin))
   954  	assert.NoError(t, volumetesting.VerifyWaitForAttachCallCount(
   955  		1 /* expectedWaitForAttachCallCount */, fakePlugin))
   956  	assert.NoError(t, volumetesting.VerifyGetMapPodDeviceCallCount(
   957  		1 /* expectedGetMapDeviceCallCount */, fakePlugin))
   958  	assert.NoError(t, volumetesting.VerifyZeroTearDownDeviceCallCount(fakePlugin))
   959  	assert.NoError(t, volumetesting.VerifyZeroDetachCallCount(fakePlugin))
   960  
   961  	// Act
   962  	dsw.DeletePodFromVolume(podName, generatedVolumeName)
   963  	waitForDetach(t, generatedVolumeName, asw)
   964  
   965  	// Assert
   966  	assert.NoError(t, volumetesting.VerifyTearDownDeviceCallCount(
   967  		1 /* expectedTearDownDeviceCallCount */, fakePlugin))
   968  	assert.NoError(t, volumetesting.VerifyDetachCallCount(
   969  		1 /* expectedDetachCallCount */, fakePlugin))
   970  }
   971  
   972  // Populates desiredStateOfWorld cache with one volume/pod.
   973  // Enables controllerAttachDetachEnabled.
   974  // Calls Run()
   975  // Verifies two map path calls are made and no teardownDevice/detach calls.
   976  // Deletes volume/pod from desired state of world.
   977  // Verifies one teardownDevice call is made.
   978  // Verifies there are no attach/detach calls made.
   979  func Test_Run_Positive_VolumeUnmapControllerAttachEnabled(t *testing.T) {
   980  	pod := &v1.Pod{
   981  		ObjectMeta: metav1.ObjectMeta{
   982  			Name:      "pod1",
   983  			UID:       "pod1uid",
   984  			Namespace: "ns",
   985  		},
   986  		Spec: v1.PodSpec{},
   987  	}
   988  
   989  	mode := v1.PersistentVolumeBlock
   990  	gcepv := &v1.PersistentVolume{
   991  		ObjectMeta: metav1.ObjectMeta{UID: "001", Name: "volume-name"},
   992  		Spec: v1.PersistentVolumeSpec{
   993  			Capacity:               v1.ResourceList{v1.ResourceName(v1.ResourceStorage): resource.MustParse("10G")},
   994  			PersistentVolumeSource: v1.PersistentVolumeSource{GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{PDName: "fake-device1"}},
   995  			AccessModes: []v1.PersistentVolumeAccessMode{
   996  				v1.ReadWriteOnce,
   997  				v1.ReadOnlyMany,
   998  			},
   999  			VolumeMode: &mode,
  1000  			ClaimRef:   &v1.ObjectReference{Namespace: "ns", Name: "pvc-volume-name"},
  1001  		},
  1002  	}
  1003  	gcepvc := &v1.PersistentVolumeClaim{
  1004  		ObjectMeta: metav1.ObjectMeta{UID: "pvc-001", Name: "pvc-volume-name", Namespace: "ns"},
  1005  		Spec: v1.PersistentVolumeClaimSpec{
  1006  			VolumeName: "volume-name",
  1007  			VolumeMode: &mode,
  1008  		},
  1009  		Status: v1.PersistentVolumeClaimStatus{
  1010  			Phase:    v1.ClaimBound,
  1011  			Capacity: gcepv.Spec.Capacity,
  1012  		},
  1013  	}
  1014  
  1015  	volumeSpec := &volume.Spec{
  1016  		PersistentVolume: gcepv,
  1017  	}
  1018  
  1019  	node := &v1.Node{
  1020  		ObjectMeta: metav1.ObjectMeta{
  1021  			Name: string(nodeName),
  1022  		},
  1023  		Status: v1.NodeStatus{
  1024  			VolumesAttached: []v1.AttachedVolume{
  1025  				{
  1026  					Name:       "fake-plugin/fake-device1",
  1027  					DevicePath: "/fake/path",
  1028  				},
  1029  			},
  1030  		},
  1031  	}
  1032  
  1033  	// Arrange
  1034  	volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgrWithNode(t, node)
  1035  	seLinuxTranslator := util.NewFakeSELinuxLabelTranslator()
  1036  	dsw := cache.NewDesiredStateOfWorld(volumePluginMgr, seLinuxTranslator)
  1037  	asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
  1038  	kubeClient := createtestClientWithPVPVC(gcepv, gcepvc, v1.AttachedVolume{
  1039  		Name:       "fake-plugin/fake-device1",
  1040  		DevicePath: "/fake/path",
  1041  	})
  1042  	fakeRecorder := &record.FakeRecorder{}
  1043  	fakeHandler := volumetesting.NewBlockVolumePathHandler()
  1044  	oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
  1045  		kubeClient,
  1046  		volumePluginMgr,
  1047  		fakeRecorder,
  1048  		fakeHandler))
  1049  	reconciler := NewReconciler(
  1050  		kubeClient,
  1051  		true, /* controllerAttachDetachEnabled */
  1052  		reconcilerLoopSleepDuration,
  1053  		waitForAttachTimeout,
  1054  		nodeName,
  1055  		dsw,
  1056  		asw,
  1057  		hasAddedPods,
  1058  		oex,
  1059  		mount.NewFakeMounter(nil),
  1060  		hostutil.NewFakeHostUtil(nil),
  1061  		volumePluginMgr,
  1062  		kubeletPodsDir)
  1063  
  1064  	podName := util.GetUniquePodName(pod)
  1065  	generatedVolumeName, err := dsw.AddPodToVolume(
  1066  		podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */, nil /* seLinuxLabel */)
  1067  
  1068  	// Assert
  1069  	if err != nil {
  1070  		t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
  1071  	}
  1072  
  1073  	// Act
  1074  	runReconciler(reconciler)
  1075  
  1076  	dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{generatedVolumeName})
  1077  	waitForMount(t, fakePlugin, generatedVolumeName, asw)
  1078  
  1079  	// Assert
  1080  	assert.NoError(t, volumetesting.VerifyZeroAttachCalls(fakePlugin))
  1081  	assert.NoError(t, volumetesting.VerifyWaitForAttachCallCount(
  1082  		1 /* expectedWaitForAttachCallCount */, fakePlugin))
  1083  	assert.NoError(t, volumetesting.VerifyGetMapPodDeviceCallCount(
  1084  		1 /* expectedGetMapDeviceCallCount */, fakePlugin))
  1085  	assert.NoError(t, volumetesting.VerifyZeroTearDownDeviceCallCount(fakePlugin))
  1086  	assert.NoError(t, volumetesting.VerifyZeroDetachCallCount(fakePlugin))
  1087  
  1088  	// Act
  1089  	dsw.DeletePodFromVolume(podName, generatedVolumeName)
  1090  	waitForDetach(t, generatedVolumeName, asw)
  1091  
  1092  	// Assert
  1093  	assert.NoError(t, volumetesting.VerifyTearDownDeviceCallCount(
  1094  		1 /* expectedTearDownDeviceCallCount */, fakePlugin))
  1095  	assert.NoError(t, volumetesting.VerifyZeroDetachCallCount(fakePlugin))
  1096  }
  1097  
  1098  func Test_GenerateMapVolumeFunc_Plugin_Not_Found(t *testing.T) {
  1099  	testCases := map[string]struct {
  1100  		volumePlugins  []volume.VolumePlugin
  1101  		expectErr      bool
  1102  		expectedErrMsg string
  1103  	}{
  1104  		"volumePlugin is nil": {
  1105  			volumePlugins:  []volume.VolumePlugin{},
  1106  			expectErr:      true,
  1107  			expectedErrMsg: "MapVolume.FindMapperPluginBySpec failed",
  1108  		},
  1109  		"blockVolumePlugin is nil": {
  1110  			volumePlugins:  volumetesting.NewFakeFileVolumePlugin(),
  1111  			expectErr:      true,
  1112  			expectedErrMsg: "MapVolume.FindMapperPluginBySpec failed to find BlockVolumeMapper plugin. Volume plugin is nil.",
  1113  		},
  1114  	}
  1115  
  1116  	for name, tc := range testCases {
  1117  		t.Run(name, func(t *testing.T) {
  1118  			volumePluginMgr := &volume.VolumePluginMgr{}
  1119  			volumePluginMgr.InitPlugins(tc.volumePlugins, nil, nil)
  1120  			asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
  1121  			oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
  1122  				nil, /* kubeClient */
  1123  				volumePluginMgr,
  1124  				nil, /* fakeRecorder */
  1125  				nil))
  1126  
  1127  			pod := &v1.Pod{
  1128  				ObjectMeta: metav1.ObjectMeta{
  1129  					Name: "pod1",
  1130  					UID:  "pod1uid",
  1131  				},
  1132  				Spec: v1.PodSpec{},
  1133  			}
  1134  			volumeMode := v1.PersistentVolumeBlock
  1135  			tmpSpec := &volume.Spec{PersistentVolume: &v1.PersistentVolume{Spec: v1.PersistentVolumeSpec{VolumeMode: &volumeMode}}}
  1136  			volumeToMount := operationexecutor.VolumeToMount{
  1137  				Pod:        pod,
  1138  				VolumeSpec: tmpSpec}
  1139  			err := oex.MountVolume(waitForAttachTimeout, volumeToMount, asw, false)
  1140  			// Assert
  1141  			if assert.Error(t, err) {
  1142  				assert.Contains(t, err.Error(), tc.expectedErrMsg)
  1143  			}
  1144  		})
  1145  	}
  1146  }
  1147  
  1148  func Test_GenerateUnmapVolumeFunc_Plugin_Not_Found(t *testing.T) {
  1149  	testCases := map[string]struct {
  1150  		volumePlugins  []volume.VolumePlugin
  1151  		expectErr      bool
  1152  		expectedErrMsg string
  1153  	}{
  1154  		"volumePlugin is nil": {
  1155  			volumePlugins:  []volume.VolumePlugin{},
  1156  			expectErr:      true,
  1157  			expectedErrMsg: "UnmapVolume.FindMapperPluginByName failed",
  1158  		},
  1159  		"blockVolumePlugin is nil": {
  1160  			volumePlugins:  volumetesting.NewFakeFileVolumePlugin(),
  1161  			expectErr:      true,
  1162  			expectedErrMsg: "UnmapVolume.FindMapperPluginByName failed to find BlockVolumeMapper plugin. Volume plugin is nil.",
  1163  		},
  1164  	}
  1165  
  1166  	for name, tc := range testCases {
  1167  		t.Run(name, func(t *testing.T) {
  1168  			volumePluginMgr := &volume.VolumePluginMgr{}
  1169  			volumePluginMgr.InitPlugins(tc.volumePlugins, nil, nil)
  1170  			asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
  1171  			oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
  1172  				nil, /* kubeClient */
  1173  				volumePluginMgr,
  1174  				nil, /* fakeRecorder */
  1175  				nil))
  1176  			volumeMode := v1.PersistentVolumeBlock
  1177  			tmpSpec := &volume.Spec{PersistentVolume: &v1.PersistentVolume{Spec: v1.PersistentVolumeSpec{VolumeMode: &volumeMode}}}
  1178  			volumeToUnmount := operationexecutor.MountedVolume{
  1179  				PluginName: "fake-file-plugin",
  1180  				VolumeSpec: tmpSpec}
  1181  			err := oex.UnmountVolume(volumeToUnmount, asw, "" /* podsDir */)
  1182  			// Assert
  1183  			if assert.Error(t, err) {
  1184  				assert.Contains(t, err.Error(), tc.expectedErrMsg)
  1185  			}
  1186  		})
  1187  	}
  1188  }
  1189  
  1190  func Test_GenerateUnmapDeviceFunc_Plugin_Not_Found(t *testing.T) {
  1191  	testCases := map[string]struct {
  1192  		volumePlugins  []volume.VolumePlugin
  1193  		expectErr      bool
  1194  		expectedErrMsg string
  1195  	}{
  1196  		"volumePlugin is nil": {
  1197  			volumePlugins:  []volume.VolumePlugin{},
  1198  			expectErr:      true,
  1199  			expectedErrMsg: "UnmapDevice.FindMapperPluginByName failed",
  1200  		},
  1201  		"blockVolumePlugin is nil": {
  1202  			volumePlugins:  volumetesting.NewFakeFileVolumePlugin(),
  1203  			expectErr:      true,
  1204  			expectedErrMsg: "UnmapDevice.FindMapperPluginByName failed to find BlockVolumeMapper plugin. Volume plugin is nil.",
  1205  		},
  1206  	}
  1207  
  1208  	for name, tc := range testCases {
  1209  		t.Run(name, func(t *testing.T) {
  1210  			volumePluginMgr := &volume.VolumePluginMgr{}
  1211  			volumePluginMgr.InitPlugins(tc.volumePlugins, nil, nil)
  1212  			asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
  1213  			oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
  1214  				nil, /* kubeClient */
  1215  				volumePluginMgr,
  1216  				nil, /* fakeRecorder */
  1217  				nil))
  1218  			var hostutil hostutil.HostUtils
  1219  			volumeMode := v1.PersistentVolumeBlock
  1220  			tmpSpec := &volume.Spec{PersistentVolume: &v1.PersistentVolume{Spec: v1.PersistentVolumeSpec{VolumeMode: &volumeMode}}}
  1221  			deviceToDetach := operationexecutor.AttachedVolume{VolumeSpec: tmpSpec, PluginName: "fake-file-plugin"}
  1222  			err := oex.UnmountDevice(deviceToDetach, asw, hostutil)
  1223  			// Assert
  1224  			if assert.Error(t, err) {
  1225  				assert.Contains(t, err.Error(), tc.expectedErrMsg)
  1226  			}
  1227  		})
  1228  	}
  1229  }
  1230  
  1231  // Populates desiredStateOfWorld cache with one volume/pod.
  1232  // Enables controllerAttachDetachEnabled.
  1233  // Calls Run()
  1234  // Wait for volume mounted.
  1235  // Mark volume as fsResizeRequired in ASW.
  1236  // Verifies volume's fsResizeRequired flag is cleared later.
  1237  func Test_Run_Positive_VolumeFSResizeControllerAttachEnabled(t *testing.T) {
  1238  	blockMode := v1.PersistentVolumeBlock
  1239  	fsMode := v1.PersistentVolumeFilesystem
  1240  
  1241  	var tests = []struct {
  1242  		name            string
  1243  		volumeMode      *v1.PersistentVolumeMode
  1244  		expansionFailed bool
  1245  		uncertainTest   bool
  1246  		pvName          string
  1247  		pvcSize         resource.Quantity
  1248  		pvcStatusSize   resource.Quantity
  1249  		oldPVSize       resource.Quantity
  1250  		newPVSize       resource.Quantity
  1251  	}{
  1252  		{
  1253  			name:          "expand-fs-volume",
  1254  			volumeMode:    &fsMode,
  1255  			pvName:        "pv",
  1256  			pvcSize:       resource.MustParse("10G"),
  1257  			pvcStatusSize: resource.MustParse("10G"),
  1258  			newPVSize:     resource.MustParse("15G"),
  1259  			oldPVSize:     resource.MustParse("10G"),
  1260  		},
  1261  		{
  1262  			name:          "expand-raw-block",
  1263  			volumeMode:    &blockMode,
  1264  			pvName:        "pv",
  1265  			pvcSize:       resource.MustParse("10G"),
  1266  			pvcStatusSize: resource.MustParse("10G"),
  1267  			newPVSize:     resource.MustParse("15G"),
  1268  			oldPVSize:     resource.MustParse("10G"),
  1269  		},
  1270  		{
  1271  			name:            "expand-fs-volume with in-use error",
  1272  			volumeMode:      &fsMode,
  1273  			expansionFailed: true,
  1274  			pvName:          volumetesting.FailWithInUseVolumeName,
  1275  			pvcSize:         resource.MustParse("10G"),
  1276  			pvcStatusSize:   resource.MustParse("10G"),
  1277  			newPVSize:       resource.MustParse("15G"),
  1278  			oldPVSize:       resource.MustParse("13G"),
  1279  		},
  1280  		{
  1281  			name:            "expand-fs-volume with unsupported error",
  1282  			volumeMode:      &fsMode,
  1283  			expansionFailed: false,
  1284  			pvName:          volumetesting.FailWithUnSupportedVolumeName,
  1285  			pvcSize:         resource.MustParse("10G"),
  1286  			pvcStatusSize:   resource.MustParse("10G"),
  1287  			newPVSize:       resource.MustParse("15G"),
  1288  			oldPVSize:       resource.MustParse("13G"),
  1289  		},
  1290  	}
  1291  
  1292  	for _, tc := range tests {
  1293  		t.Run(tc.name, func(t *testing.T) {
  1294  			pv := getTestPV(tc.pvName, tc.volumeMode, tc.oldPVSize)
  1295  			pvc := getTestPVC("pv", tc.volumeMode, tc.pvcSize, tc.pvcStatusSize)
  1296  			pod := getTestPod(pvc.Name)
  1297  
  1298  			// deep copy before reconciler runs to avoid data race.
  1299  			pvWithSize := pv.DeepCopy()
  1300  			node := &v1.Node{
  1301  				ObjectMeta: metav1.ObjectMeta{
  1302  					Name: string(nodeName),
  1303  				},
  1304  				Spec: v1.NodeSpec{},
  1305  				Status: v1.NodeStatus{
  1306  					VolumesAttached: []v1.AttachedVolume{
  1307  						{
  1308  							Name:       v1.UniqueVolumeName(fmt.Sprintf("fake-plugin/%s", tc.pvName)),
  1309  							DevicePath: "fake/path",
  1310  						},
  1311  					},
  1312  				},
  1313  			}
  1314  			volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgrWithNode(t, node)
  1315  			seLinuxTranslator := util.NewFakeSELinuxLabelTranslator()
  1316  			dsw := cache.NewDesiredStateOfWorld(volumePluginMgr, seLinuxTranslator)
  1317  			asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
  1318  			kubeClient := createtestClientWithPVPVC(pv, pvc, v1.AttachedVolume{
  1319  				Name:       v1.UniqueVolumeName(fmt.Sprintf("fake-plugin/%s", tc.pvName)),
  1320  				DevicePath: "fake/path",
  1321  			})
  1322  			fakeRecorder := &record.FakeRecorder{}
  1323  			fakeHandler := volumetesting.NewBlockVolumePathHandler()
  1324  			oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
  1325  				kubeClient,
  1326  				volumePluginMgr,
  1327  				fakeRecorder,
  1328  				fakeHandler))
  1329  
  1330  			reconciler := NewReconciler(
  1331  				kubeClient,
  1332  				true, /* controllerAttachDetachEnabled */
  1333  				reconcilerLoopSleepDuration,
  1334  				waitForAttachTimeout,
  1335  				nodeName,
  1336  				dsw,
  1337  				asw,
  1338  				hasAddedPods,
  1339  				oex,
  1340  				mount.NewFakeMounter(nil),
  1341  				hostutil.NewFakeHostUtil(nil),
  1342  				volumePluginMgr,
  1343  				kubeletPodsDir)
  1344  
  1345  			volumeSpec := &volume.Spec{PersistentVolume: pv}
  1346  			podName := util.GetUniquePodName(pod)
  1347  			volumeName, err := dsw.AddPodToVolume(
  1348  				podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */, nil /* seLinuxLabel */)
  1349  			// Assert
  1350  			if err != nil {
  1351  				t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
  1352  			}
  1353  			dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{volumeName})
  1354  
  1355  			// Start the reconciler to fill ASW.
  1356  			stopChan, stoppedChan := make(chan struct{}), make(chan struct{})
  1357  			go func() {
  1358  				defer close(stoppedChan)
  1359  				reconciler.Run(stopChan)
  1360  			}()
  1361  			waitForMount(t, fakePlugin, volumeName, asw)
  1362  			// Stop the reconciler.
  1363  			close(stopChan)
  1364  			<-stoppedChan
  1365  
  1366  			// Simulate what DSOWP does
  1367  			pvWithSize.Spec.Capacity[v1.ResourceStorage] = tc.newPVSize
  1368  			volumeSpec = &volume.Spec{PersistentVolume: pvWithSize}
  1369  			dsw.AddPodToVolume(podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */, nil /* seLinuxContexts */)
  1370  
  1371  			t.Logf("Changing size of the volume to %s", tc.newPVSize.String())
  1372  			newSize := tc.newPVSize.DeepCopy()
  1373  			dsw.UpdatePersistentVolumeSize(volumeName, &newSize)
  1374  
  1375  			_, _, podExistErr := asw.PodExistsInVolume(podName, volumeName, newSize, "" /* SELinuxLabel */)
  1376  			if tc.expansionFailed {
  1377  				if cache.IsFSResizeRequiredError(podExistErr) {
  1378  					t.Fatalf("volume %s should not throw fsResizeRequired error: %v", volumeName, podExistErr)
  1379  				}
  1380  			} else {
  1381  				if !cache.IsFSResizeRequiredError(podExistErr) {
  1382  					t.Fatalf("Volume should be marked as fsResizeRequired, but receive unexpected error: %v", podExistErr)
  1383  				}
  1384  				go reconciler.Run(wait.NeverStop)
  1385  
  1386  				waitErr := retryWithExponentialBackOff(testOperationBackOffDuration, func() (done bool, err error) {
  1387  					mounted, _, err := asw.PodExistsInVolume(podName, volumeName, newSize, "" /* SELinuxContext */)
  1388  					return mounted && err == nil, nil
  1389  				})
  1390  				if waitErr != nil {
  1391  					t.Fatalf("Volume resize should succeeded %v", waitErr)
  1392  				}
  1393  			}
  1394  
  1395  		})
  1396  	}
  1397  }
  1398  
  1399  func getTestPVC(pvName string, volumeMode *v1.PersistentVolumeMode, specSize, statusSize resource.Quantity) *v1.PersistentVolumeClaim {
  1400  	pvc := &v1.PersistentVolumeClaim{
  1401  		ObjectMeta: metav1.ObjectMeta{
  1402  			Name: "pvc",
  1403  			UID:  "pvcuid",
  1404  		},
  1405  		Spec: v1.PersistentVolumeClaimSpec{
  1406  			Resources: v1.VolumeResourceRequirements{
  1407  				Requests: v1.ResourceList{
  1408  					v1.ResourceStorage: specSize,
  1409  				},
  1410  			},
  1411  			VolumeName: pvName,
  1412  			VolumeMode: volumeMode,
  1413  		},
  1414  		Status: v1.PersistentVolumeClaimStatus{
  1415  			Capacity: v1.ResourceList{
  1416  				v1.ResourceStorage: statusSize,
  1417  			},
  1418  		},
  1419  	}
  1420  	return pvc
  1421  }
  1422  
  1423  func getTestPV(pvName string, volumeMode *v1.PersistentVolumeMode, pvSize resource.Quantity) *v1.PersistentVolume {
  1424  	pv := &v1.PersistentVolume{
  1425  		ObjectMeta: metav1.ObjectMeta{
  1426  			Name: pvName,
  1427  			UID:  "pvuid",
  1428  		},
  1429  		Spec: v1.PersistentVolumeSpec{
  1430  			ClaimRef:   &v1.ObjectReference{Name: "pvc"},
  1431  			VolumeMode: volumeMode,
  1432  			Capacity: v1.ResourceList{
  1433  				v1.ResourceStorage: pvSize,
  1434  			},
  1435  		},
  1436  	}
  1437  	return pv
  1438  }
  1439  
  1440  func getTestPod(claimName string) *v1.Pod {
  1441  	pod := &v1.Pod{
  1442  		ObjectMeta: metav1.ObjectMeta{
  1443  			Name: "pod1",
  1444  			UID:  "pod1uid",
  1445  		},
  1446  		Spec: v1.PodSpec{
  1447  			Volumes: []v1.Volume{
  1448  				{
  1449  					Name: "volume-name",
  1450  					VolumeSource: v1.VolumeSource{
  1451  						PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
  1452  							ClaimName: claimName,
  1453  						},
  1454  					},
  1455  				},
  1456  			},
  1457  		},
  1458  	}
  1459  	return pod
  1460  }
  1461  
  1462  func Test_UncertainDeviceGlobalMounts(t *testing.T) {
  1463  	var tests = []struct {
  1464  		name                   string
  1465  		deviceState            operationexecutor.DeviceMountState
  1466  		unmountDeviceCallCount int
  1467  		volumeName             string
  1468  		supportRemount         bool
  1469  	}{
  1470  		{
  1471  			name:                   "timed out operations should result in device marked as uncertain",
  1472  			deviceState:            operationexecutor.DeviceMountUncertain,
  1473  			unmountDeviceCallCount: 1,
  1474  			volumeName:             volumetesting.TimeoutOnMountDeviceVolumeName,
  1475  		},
  1476  		{
  1477  			name:                   "failed operation should result in not-mounted device",
  1478  			deviceState:            operationexecutor.DeviceNotMounted,
  1479  			unmountDeviceCallCount: 0,
  1480  			volumeName:             volumetesting.FailMountDeviceVolumeName,
  1481  		},
  1482  		{
  1483  			name:                   "timeout followed by failed operation should result in non-mounted device",
  1484  			deviceState:            operationexecutor.DeviceNotMounted,
  1485  			unmountDeviceCallCount: 0,
  1486  			volumeName:             volumetesting.TimeoutAndFailOnMountDeviceVolumeName,
  1487  		},
  1488  		{
  1489  			name:                   "success followed by timeout operation should result in mounted device",
  1490  			deviceState:            operationexecutor.DeviceGloballyMounted,
  1491  			unmountDeviceCallCount: 1,
  1492  			volumeName:             volumetesting.SuccessAndTimeoutDeviceName,
  1493  			supportRemount:         true,
  1494  		},
  1495  		{
  1496  			name:                   "success followed by failed operation should result in mounted device",
  1497  			deviceState:            operationexecutor.DeviceGloballyMounted,
  1498  			unmountDeviceCallCount: 1,
  1499  			volumeName:             volumetesting.SuccessAndFailOnMountDeviceName,
  1500  			supportRemount:         true,
  1501  		},
  1502  	}
  1503  
  1504  	modes := []v1.PersistentVolumeMode{v1.PersistentVolumeBlock, v1.PersistentVolumeFilesystem}
  1505  
  1506  	for modeIndex := range modes {
  1507  		for tcIndex := range tests {
  1508  			mode := modes[modeIndex]
  1509  			tc := tests[tcIndex]
  1510  			testName := fmt.Sprintf("%s [%s]", tc.name, mode)
  1511  			uniqueTestString := fmt.Sprintf("global-mount-%s", testName)
  1512  			uniquePodDir := fmt.Sprintf("%s-%x", kubeletPodsDir, md5.Sum([]byte(uniqueTestString)))
  1513  			t.Run(testName+"[", func(t *testing.T) {
  1514  				t.Parallel()
  1515  				pv := &v1.PersistentVolume{
  1516  					ObjectMeta: metav1.ObjectMeta{
  1517  						Name: tc.volumeName,
  1518  						UID:  "pvuid",
  1519  					},
  1520  					Spec: v1.PersistentVolumeSpec{
  1521  						ClaimRef:   &v1.ObjectReference{Name: "pvc"},
  1522  						VolumeMode: &mode,
  1523  					},
  1524  				}
  1525  				pvc := &v1.PersistentVolumeClaim{
  1526  					ObjectMeta: metav1.ObjectMeta{
  1527  						Name: "pvc",
  1528  						UID:  "pvcuid",
  1529  					},
  1530  					Spec: v1.PersistentVolumeClaimSpec{
  1531  						VolumeName: tc.volumeName,
  1532  						VolumeMode: &mode,
  1533  					},
  1534  				}
  1535  				pod := &v1.Pod{
  1536  					ObjectMeta: metav1.ObjectMeta{
  1537  						Name: "pod1",
  1538  						UID:  "pod1uid",
  1539  					},
  1540  					Spec: v1.PodSpec{
  1541  						Volumes: []v1.Volume{
  1542  							{
  1543  								Name: "volume-name",
  1544  								VolumeSource: v1.VolumeSource{
  1545  									PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
  1546  										ClaimName: pvc.Name,
  1547  									},
  1548  								},
  1549  							},
  1550  						},
  1551  					},
  1552  				}
  1553  
  1554  				node := &v1.Node{
  1555  					ObjectMeta: metav1.ObjectMeta{
  1556  						Name: string(nodeName),
  1557  					},
  1558  					Spec: v1.NodeSpec{},
  1559  					Status: v1.NodeStatus{
  1560  						VolumesAttached: []v1.AttachedVolume{
  1561  							{
  1562  								Name:       v1.UniqueVolumeName(fmt.Sprintf("fake-plugin/%s", tc.volumeName)),
  1563  								DevicePath: "fake/path",
  1564  							},
  1565  						},
  1566  					},
  1567  				}
  1568  				volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgrWithNode(t, node)
  1569  				fakePlugin.SupportsRemount = tc.supportRemount
  1570  				seLinuxTranslator := util.NewFakeSELinuxLabelTranslator()
  1571  
  1572  				dsw := cache.NewDesiredStateOfWorld(volumePluginMgr, seLinuxTranslator)
  1573  				asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
  1574  				kubeClient := createtestClientWithPVPVC(pv, pvc, v1.AttachedVolume{
  1575  					Name:       v1.UniqueVolumeName(fmt.Sprintf("fake-plugin/%s", tc.volumeName)),
  1576  					DevicePath: "fake/path",
  1577  				})
  1578  				fakeRecorder := &record.FakeRecorder{}
  1579  				fakeHandler := volumetesting.NewBlockVolumePathHandler()
  1580  				oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
  1581  					kubeClient,
  1582  					volumePluginMgr,
  1583  					fakeRecorder,
  1584  					fakeHandler))
  1585  
  1586  				reconciler := NewReconciler(
  1587  					kubeClient,
  1588  					true, /* controllerAttachDetachEnabled */
  1589  					reconcilerLoopSleepDuration,
  1590  					waitForAttachTimeout,
  1591  					nodeName,
  1592  					dsw,
  1593  					asw,
  1594  					hasAddedPods,
  1595  					oex,
  1596  					&mount.FakeMounter{},
  1597  					hostutil.NewFakeHostUtil(nil),
  1598  					volumePluginMgr,
  1599  					uniquePodDir)
  1600  				volumeSpec := &volume.Spec{PersistentVolume: pv}
  1601  				podName := util.GetUniquePodName(pod)
  1602  				volumeName, err := dsw.AddPodToVolume(
  1603  					podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */, nil /* seLinuxLabel */)
  1604  				// Assert
  1605  				if err != nil {
  1606  					t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
  1607  				}
  1608  				dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{volumeName})
  1609  
  1610  				// Start the reconciler to fill ASW.
  1611  				stopChan, stoppedChan := make(chan struct{}), make(chan struct{})
  1612  				go func() {
  1613  					reconciler.Run(stopChan)
  1614  					close(stoppedChan)
  1615  				}()
  1616  				waitForVolumeToExistInASW(t, volumeName, asw)
  1617  				if tc.volumeName == volumetesting.TimeoutAndFailOnMountDeviceVolumeName {
  1618  					// Wait upto 10s for reconciler to catch up
  1619  					time.Sleep(reconcilerSyncWaitDuration)
  1620  				}
  1621  
  1622  				if tc.volumeName == volumetesting.SuccessAndFailOnMountDeviceName ||
  1623  					tc.volumeName == volumetesting.SuccessAndTimeoutDeviceName {
  1624  					// wait for mount and then break it via remount
  1625  					waitForMount(t, fakePlugin, volumeName, asw)
  1626  					asw.MarkRemountRequired(podName)
  1627  					time.Sleep(reconcilerSyncWaitDuration)
  1628  				}
  1629  
  1630  				if tc.deviceState == operationexecutor.DeviceMountUncertain {
  1631  					waitForUncertainGlobalMount(t, volumeName, asw)
  1632  				}
  1633  
  1634  				if tc.deviceState == operationexecutor.DeviceGloballyMounted {
  1635  					waitForMount(t, fakePlugin, volumeName, asw)
  1636  				}
  1637  
  1638  				dsw.DeletePodFromVolume(podName, volumeName)
  1639  				waitForDetach(t, volumeName, asw)
  1640  				if mode == v1.PersistentVolumeFilesystem {
  1641  					err = volumetesting.VerifyUnmountDeviceCallCount(tc.unmountDeviceCallCount, fakePlugin)
  1642  				} else {
  1643  					if tc.unmountDeviceCallCount == 0 {
  1644  						err = volumetesting.VerifyZeroTearDownDeviceCallCount(fakePlugin)
  1645  					} else {
  1646  						err = volumetesting.VerifyTearDownDeviceCallCount(tc.unmountDeviceCallCount, fakePlugin)
  1647  					}
  1648  				}
  1649  				if err != nil {
  1650  					t.Errorf("Error verifying UnMountDeviceCallCount: %v", err)
  1651  				}
  1652  			})
  1653  		}
  1654  	}
  1655  }
  1656  
  1657  func Test_UncertainVolumeMountState(t *testing.T) {
  1658  	var tests = []struct {
  1659  		name                   string
  1660  		volumeState            operationexecutor.VolumeMountState
  1661  		unmountDeviceCallCount int
  1662  		unmountVolumeCount     int
  1663  		volumeName             string
  1664  		supportRemount         bool
  1665  		pvcStatusSize          resource.Quantity
  1666  		pvSize                 resource.Quantity
  1667  	}{
  1668  		{
  1669  			name:                   "timed out operations should result in volume marked as uncertain",
  1670  			volumeState:            operationexecutor.VolumeMountUncertain,
  1671  			unmountDeviceCallCount: 1,
  1672  			unmountVolumeCount:     1,
  1673  			volumeName:             volumetesting.TimeoutOnSetupVolumeName,
  1674  		},
  1675  		{
  1676  			name:                   "failed operation should result in not-mounted volume",
  1677  			volumeState:            operationexecutor.VolumeNotMounted,
  1678  			unmountDeviceCallCount: 1,
  1679  			unmountVolumeCount:     0,
  1680  			volumeName:             volumetesting.FailOnSetupVolumeName,
  1681  		},
  1682  		{
  1683  			name:                   "timeout followed by failed operation should result in non-mounted volume",
  1684  			volumeState:            operationexecutor.VolumeNotMounted,
  1685  			unmountDeviceCallCount: 1,
  1686  			unmountVolumeCount:     0,
  1687  			volumeName:             volumetesting.TimeoutAndFailOnSetupVolumeName,
  1688  		},
  1689  		{
  1690  			name:                   "success followed by timeout operation should result in mounted volume",
  1691  			volumeState:            operationexecutor.VolumeMounted,
  1692  			unmountDeviceCallCount: 1,
  1693  			unmountVolumeCount:     1,
  1694  			volumeName:             volumetesting.SuccessAndTimeoutSetupVolumeName,
  1695  			supportRemount:         true,
  1696  		},
  1697  		{
  1698  			name:                   "success followed by failed operation should result in mounted volume",
  1699  			volumeState:            operationexecutor.VolumeMounted,
  1700  			unmountDeviceCallCount: 1,
  1701  			unmountVolumeCount:     1,
  1702  			volumeName:             volumetesting.SuccessAndFailOnSetupVolumeName,
  1703  			supportRemount:         true,
  1704  		},
  1705  		{
  1706  			name:                   "mount success but fail to expand filesystem",
  1707  			volumeState:            operationexecutor.VolumeMountUncertain,
  1708  			unmountDeviceCallCount: 1,
  1709  			unmountVolumeCount:     1,
  1710  			volumeName:             volumetesting.FailVolumeExpansion,
  1711  			supportRemount:         true,
  1712  			pvSize:                 resource.MustParse("10G"),
  1713  			pvcStatusSize:          resource.MustParse("2G"),
  1714  		},
  1715  	}
  1716  	modes := []v1.PersistentVolumeMode{v1.PersistentVolumeBlock, v1.PersistentVolumeFilesystem}
  1717  
  1718  	for modeIndex := range modes {
  1719  		for tcIndex := range tests {
  1720  			mode := modes[modeIndex]
  1721  			tc := tests[tcIndex]
  1722  			testName := fmt.Sprintf("%s [%s]", tc.name, mode)
  1723  			uniqueTestString := fmt.Sprintf("local-mount-%s", testName)
  1724  			uniquePodDir := fmt.Sprintf("%s-%x", kubeletPodsDir, md5.Sum([]byte(uniqueTestString)))
  1725  			t.Run(testName, func(t *testing.T) {
  1726  				t.Parallel()
  1727  				pv := &v1.PersistentVolume{
  1728  					ObjectMeta: metav1.ObjectMeta{
  1729  						Name: tc.volumeName,
  1730  						UID:  "pvuid",
  1731  					},
  1732  					Spec: v1.PersistentVolumeSpec{
  1733  						ClaimRef:   &v1.ObjectReference{Name: "pvc"},
  1734  						VolumeMode: &mode,
  1735  					},
  1736  				}
  1737  				if tc.pvSize.CmpInt64(0) > 0 {
  1738  					pv.Spec.Capacity = v1.ResourceList{
  1739  						v1.ResourceStorage: tc.pvSize,
  1740  					}
  1741  				}
  1742  				pvc := &v1.PersistentVolumeClaim{
  1743  					ObjectMeta: metav1.ObjectMeta{
  1744  						Name: "pvc",
  1745  						UID:  "pvcuid",
  1746  					},
  1747  					Spec: v1.PersistentVolumeClaimSpec{
  1748  						VolumeName: tc.volumeName,
  1749  						VolumeMode: &mode,
  1750  					},
  1751  				}
  1752  				if tc.pvcStatusSize.CmpInt64(0) > 0 {
  1753  					pvc.Status = v1.PersistentVolumeClaimStatus{
  1754  						Capacity: v1.ResourceList{
  1755  							v1.ResourceStorage: tc.pvcStatusSize,
  1756  						},
  1757  					}
  1758  				}
  1759  				pod := &v1.Pod{
  1760  					ObjectMeta: metav1.ObjectMeta{
  1761  						Name: "pod1",
  1762  						UID:  "pod1uid",
  1763  					},
  1764  					Spec: v1.PodSpec{
  1765  						Volumes: []v1.Volume{
  1766  							{
  1767  								Name: "volume-name",
  1768  								VolumeSource: v1.VolumeSource{
  1769  									PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
  1770  										ClaimName: pvc.Name,
  1771  									},
  1772  								},
  1773  							},
  1774  						},
  1775  					},
  1776  				}
  1777  
  1778  				node := &v1.Node{
  1779  					ObjectMeta: metav1.ObjectMeta{
  1780  						Name: string(nodeName),
  1781  					},
  1782  					Status: v1.NodeStatus{
  1783  						VolumesAttached: []v1.AttachedVolume{
  1784  							{
  1785  								Name:       v1.UniqueVolumeName(fmt.Sprintf("fake-plugin/%s", tc.volumeName)),
  1786  								DevicePath: "fake/path",
  1787  							},
  1788  						},
  1789  					},
  1790  				}
  1791  
  1792  				volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgrWithNode(t, node)
  1793  				fakePlugin.SupportsRemount = tc.supportRemount
  1794  				seLinuxTranslator := util.NewFakeSELinuxLabelTranslator()
  1795  				dsw := cache.NewDesiredStateOfWorld(volumePluginMgr, seLinuxTranslator)
  1796  				asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
  1797  				kubeClient := createtestClientWithPVPVC(pv, pvc, v1.AttachedVolume{
  1798  					Name:       v1.UniqueVolumeName(fmt.Sprintf("fake-plugin/%s", tc.volumeName)),
  1799  					DevicePath: "fake/path",
  1800  				})
  1801  				fakeRecorder := &record.FakeRecorder{}
  1802  				fakeHandler := volumetesting.NewBlockVolumePathHandler()
  1803  				oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
  1804  					kubeClient,
  1805  					volumePluginMgr,
  1806  					fakeRecorder,
  1807  					fakeHandler))
  1808  
  1809  				reconciler := NewReconciler(
  1810  					kubeClient,
  1811  					true, /* controllerAttachDetachEnabled */
  1812  					reconcilerLoopSleepDuration,
  1813  					waitForAttachTimeout,
  1814  					nodeName,
  1815  					dsw,
  1816  					asw,
  1817  					hasAddedPods,
  1818  					oex,
  1819  					&mount.FakeMounter{},
  1820  					hostutil.NewFakeHostUtil(nil),
  1821  					volumePluginMgr,
  1822  					uniquePodDir)
  1823  				volumeSpec := &volume.Spec{PersistentVolume: pv}
  1824  				podName := util.GetUniquePodName(pod)
  1825  				volumeName, err := dsw.AddPodToVolume(
  1826  					podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */, nil /* seLinuxLabel */)
  1827  				// Assert
  1828  				if err != nil {
  1829  					t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
  1830  				}
  1831  				dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{volumeName})
  1832  
  1833  				// Start the reconciler to fill ASW.
  1834  				stopChan, stoppedChan := make(chan struct{}), make(chan struct{})
  1835  				go func() {
  1836  					reconciler.Run(stopChan)
  1837  					close(stoppedChan)
  1838  				}()
  1839  				waitForVolumeToExistInASW(t, volumeName, asw)
  1840  				// all of these tests rely on device to be globally mounted and hence waiting for global
  1841  				// mount ensures that unmountDevice is called as expected.
  1842  				waitForGlobalMount(t, volumeName, asw)
  1843  				if tc.volumeName == volumetesting.TimeoutAndFailOnSetupVolumeName {
  1844  					// Wait upto 10s for reconciler to catchup
  1845  					time.Sleep(reconcilerSyncWaitDuration)
  1846  				}
  1847  
  1848  				if tc.volumeName == volumetesting.SuccessAndFailOnSetupVolumeName ||
  1849  					tc.volumeName == volumetesting.SuccessAndTimeoutSetupVolumeName {
  1850  					// wait for mount and then break it via remount
  1851  					waitForMount(t, fakePlugin, volumeName, asw)
  1852  					asw.MarkRemountRequired(podName)
  1853  					time.Sleep(reconcilerSyncWaitDuration)
  1854  				}
  1855  
  1856  				if tc.volumeState == operationexecutor.VolumeMountUncertain {
  1857  					waitForUncertainPodMount(t, volumeName, podName, asw)
  1858  				}
  1859  
  1860  				if tc.volumeState == operationexecutor.VolumeMounted {
  1861  					waitForMount(t, fakePlugin, volumeName, asw)
  1862  				}
  1863  
  1864  				dsw.DeletePodFromVolume(podName, volumeName)
  1865  				waitForDetach(t, volumeName, asw)
  1866  
  1867  				if mode == v1.PersistentVolumeFilesystem {
  1868  					if err := volumetesting.VerifyUnmountDeviceCallCount(tc.unmountDeviceCallCount, fakePlugin); err != nil {
  1869  						t.Errorf("Error verifying UnMountDeviceCallCount: %v", err)
  1870  					}
  1871  					if err := volumetesting.VerifyTearDownCallCount(tc.unmountVolumeCount, fakePlugin); err != nil {
  1872  						t.Errorf("Error verifying UnMountDeviceCallCount: %v", err)
  1873  					}
  1874  				} else {
  1875  					if tc.unmountVolumeCount == 0 {
  1876  						if err := volumetesting.VerifyZeroUnmapPodDeviceCallCount(fakePlugin); err != nil {
  1877  							t.Errorf("Error verifying UnMountDeviceCallCount: %v", err)
  1878  						}
  1879  					} else {
  1880  						if err := volumetesting.VerifyUnmapPodDeviceCallCount(tc.unmountVolumeCount, fakePlugin); err != nil {
  1881  							t.Errorf("Error verifying UnMountDeviceCallCount: %v", err)
  1882  						}
  1883  					}
  1884  					if tc.unmountDeviceCallCount == 0 {
  1885  						if err := volumetesting.VerifyZeroTearDownDeviceCallCount(fakePlugin); err != nil {
  1886  							t.Errorf("Error verifying UnMountDeviceCallCount: %v", err)
  1887  						}
  1888  					} else {
  1889  						if err := volumetesting.VerifyTearDownDeviceCallCount(tc.unmountDeviceCallCount, fakePlugin); err != nil {
  1890  							t.Errorf("Error verifying UnMountDeviceCallCount: %v", err)
  1891  						}
  1892  					}
  1893  				}
  1894  			})
  1895  		}
  1896  	}
  1897  }
  1898  
  1899  func waitForUncertainGlobalMount(t *testing.T, volumeName v1.UniqueVolumeName, asw cache.ActualStateOfWorld) {
  1900  	// check if volume is globally mounted in uncertain state
  1901  	err := retryWithExponentialBackOff(
  1902  		testOperationBackOffDuration,
  1903  		func() (bool, error) {
  1904  			unmountedVolumes := asw.GetUnmountedVolumes()
  1905  			for _, v := range unmountedVolumes {
  1906  				if v.VolumeName == volumeName && v.DeviceMountState == operationexecutor.DeviceMountUncertain {
  1907  					return true, nil
  1908  				}
  1909  			}
  1910  			return false, nil
  1911  		},
  1912  	)
  1913  
  1914  	if err != nil {
  1915  		t.Fatalf("expected volumes %s to be mounted in uncertain state globally", volumeName)
  1916  	}
  1917  }
  1918  
  1919  func waitForGlobalMount(t *testing.T, volumeName v1.UniqueVolumeName, asw cache.ActualStateOfWorld) {
  1920  	// check if volume is globally mounted
  1921  	err := retryWithExponentialBackOff(
  1922  		testOperationBackOffDuration,
  1923  		func() (bool, error) {
  1924  			mountedVolumes := asw.GetGloballyMountedVolumes()
  1925  			for _, v := range mountedVolumes {
  1926  				if v.VolumeName == volumeName {
  1927  					return true, nil
  1928  				}
  1929  			}
  1930  			return false, nil
  1931  		},
  1932  	)
  1933  
  1934  	if err != nil {
  1935  		t.Fatalf("expected volume devices %s to be mounted globally", volumeName)
  1936  	}
  1937  }
  1938  
  1939  func waitForUncertainPodMount(t *testing.T, volumeName v1.UniqueVolumeName, podName types.UniquePodName, asw cache.ActualStateOfWorld) {
  1940  	// check if volume is locally pod mounted in uncertain state
  1941  	err := retryWithExponentialBackOff(
  1942  		testOperationBackOffDuration,
  1943  		func() (bool, error) {
  1944  			mounted, _, err := asw.PodExistsInVolume(podName, volumeName, resource.Quantity{}, "" /* SELinuxContext */)
  1945  			if mounted || err != nil {
  1946  				return false, nil
  1947  			}
  1948  			allMountedVolumes := asw.GetAllMountedVolumes()
  1949  			for _, v := range allMountedVolumes {
  1950  				if v.VolumeName == volumeName {
  1951  					return true, nil
  1952  				}
  1953  			}
  1954  			return false, nil
  1955  		},
  1956  	)
  1957  
  1958  	if err != nil {
  1959  		t.Fatalf("expected volumes %s to be mounted in uncertain state for pod", volumeName)
  1960  	}
  1961  }
  1962  
  1963  func waitForMount(
  1964  	t *testing.T,
  1965  	fakePlugin *volumetesting.FakeVolumePlugin,
  1966  	volumeName v1.UniqueVolumeName,
  1967  	asw cache.ActualStateOfWorld) {
  1968  	err := retryWithExponentialBackOff(
  1969  		testOperationBackOffDuration,
  1970  		func() (bool, error) {
  1971  			mountedVolumes := asw.GetMountedVolumes()
  1972  			for _, mountedVolume := range mountedVolumes {
  1973  				if mountedVolume.VolumeName == volumeName {
  1974  					return true, nil
  1975  				}
  1976  			}
  1977  
  1978  			return false, nil
  1979  		},
  1980  	)
  1981  
  1982  	if err != nil {
  1983  		t.Fatalf("Timed out waiting for volume %q to be attached.", volumeName)
  1984  	}
  1985  }
  1986  
  1987  func waitForVolumeToExistInASW(t *testing.T, volumeName v1.UniqueVolumeName, asw cache.ActualStateOfWorld) {
  1988  	err := retryWithExponentialBackOff(
  1989  		testOperationBackOffDuration,
  1990  		func() (bool, error) {
  1991  			if asw.VolumeExists(volumeName) {
  1992  				return true, nil
  1993  			}
  1994  			return false, nil
  1995  		},
  1996  	)
  1997  	if err != nil {
  1998  		t.Fatalf("Timed out waiting for volume %q to be exist in asw.", volumeName)
  1999  	}
  2000  }
  2001  
  2002  func waitForDetach(
  2003  	t *testing.T,
  2004  	volumeName v1.UniqueVolumeName,
  2005  	asw cache.ActualStateOfWorld) {
  2006  	err := retryWithExponentialBackOff(
  2007  		testOperationBackOffDuration,
  2008  		func() (bool, error) {
  2009  			if asw.VolumeExists(volumeName) {
  2010  				return false, nil
  2011  			}
  2012  
  2013  			return true, nil
  2014  		},
  2015  	)
  2016  
  2017  	if err != nil {
  2018  		t.Fatalf("Timed out waiting for volume %q to be detached.", volumeName)
  2019  	}
  2020  }
  2021  
  2022  func retryWithExponentialBackOff(initialDuration time.Duration, fn wait.ConditionFunc) error {
  2023  	backoff := wait.Backoff{
  2024  		Duration: initialDuration,
  2025  		Factor:   3,
  2026  		Jitter:   0,
  2027  		Steps:    6,
  2028  	}
  2029  	return wait.ExponentialBackoff(backoff, fn)
  2030  }
  2031  
  2032  func createTestClient(attachedVolumes ...v1.AttachedVolume) *fake.Clientset {
  2033  	fakeClient := &fake.Clientset{}
  2034  	if len(attachedVolumes) == 0 {
  2035  		attachedVolumes = append(attachedVolumes, v1.AttachedVolume{
  2036  			Name:       "fake-plugin/fake-device1",
  2037  			DevicePath: "fake/path",
  2038  		})
  2039  	}
  2040  	fakeClient.AddReactor("get", "nodes",
  2041  		func(action core.Action) (bool, runtime.Object, error) {
  2042  			return true, &v1.Node{
  2043  				ObjectMeta: metav1.ObjectMeta{Name: string(nodeName)},
  2044  				Status: v1.NodeStatus{
  2045  					VolumesAttached: attachedVolumes,
  2046  				},
  2047  			}, nil
  2048  		},
  2049  	)
  2050  
  2051  	fakeClient.AddReactor("*", "*", func(action core.Action) (bool, runtime.Object, error) {
  2052  		return true, nil, fmt.Errorf("no reaction implemented for %s", action)
  2053  	})
  2054  	return fakeClient
  2055  }
  2056  
  2057  func runReconciler(reconciler Reconciler) {
  2058  	go reconciler.Run(wait.NeverStop)
  2059  }
  2060  
  2061  func createtestClientWithPVPVC(pv *v1.PersistentVolume, pvc *v1.PersistentVolumeClaim, attachedVolumes ...v1.AttachedVolume) *fake.Clientset {
  2062  	fakeClient := &fake.Clientset{}
  2063  	if len(attachedVolumes) == 0 {
  2064  		attachedVolumes = append(attachedVolumes, v1.AttachedVolume{
  2065  			Name:       "fake-plugin/pv",
  2066  			DevicePath: "fake/path",
  2067  		})
  2068  	}
  2069  	fakeClient.AddReactor("get", "nodes",
  2070  		func(action core.Action) (bool, runtime.Object, error) {
  2071  			return true, &v1.Node{
  2072  				ObjectMeta: metav1.ObjectMeta{Name: string(nodeName)},
  2073  				Status: v1.NodeStatus{
  2074  					VolumesAttached: attachedVolumes,
  2075  				},
  2076  			}, nil
  2077  		})
  2078  	fakeClient.AddReactor("get", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) {
  2079  		return true, pvc, nil
  2080  	})
  2081  	fakeClient.AddReactor("get", "persistentvolumes", func(action core.Action) (bool, runtime.Object, error) {
  2082  		return true, pv, nil
  2083  	})
  2084  	fakeClient.AddReactor("patch", "persistentvolumeclaims", func(action core.Action) (bool, runtime.Object, error) {
  2085  		if action.GetSubresource() == "status" {
  2086  			return true, pvc, nil
  2087  		}
  2088  		return true, nil, fmt.Errorf("no reaction implemented for %s", action)
  2089  	})
  2090  	fakeClient.AddReactor("*", "*", func(action core.Action) (bool, runtime.Object, error) {
  2091  		return true, nil, fmt.Errorf("no reaction implemented for %s", action)
  2092  	})
  2093  	return fakeClient
  2094  }
  2095  
  2096  func Test_Run_Positive_VolumeMountControllerAttachEnabledRace(t *testing.T) {
  2097  	// Arrange
  2098  	node := &v1.Node{
  2099  		ObjectMeta: metav1.ObjectMeta{
  2100  			Name: string(nodeName),
  2101  		},
  2102  		Status: v1.NodeStatus{
  2103  			VolumesAttached: []v1.AttachedVolume{
  2104  				{
  2105  					Name:       "fake-plugin/fake-device1",
  2106  					DevicePath: "/fake/path",
  2107  				},
  2108  			},
  2109  		},
  2110  	}
  2111  	volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgrWithNode(t, node)
  2112  	seLinuxTranslator := util.NewFakeSELinuxLabelTranslator()
  2113  
  2114  	dsw := cache.NewDesiredStateOfWorld(volumePluginMgr, seLinuxTranslator)
  2115  	asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
  2116  	kubeClient := createTestClient()
  2117  	fakeRecorder := &record.FakeRecorder{}
  2118  	fakeHandler := volumetesting.NewBlockVolumePathHandler()
  2119  	oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
  2120  		kubeClient,
  2121  		volumePluginMgr,
  2122  		fakeRecorder,
  2123  		fakeHandler))
  2124  	reconciler := NewReconciler(
  2125  		kubeClient,
  2126  		true, /* controllerAttachDetachEnabled */
  2127  		reconcilerLoopSleepDuration,
  2128  		waitForAttachTimeout,
  2129  		nodeName,
  2130  		dsw,
  2131  		asw,
  2132  		hasAddedPods,
  2133  		oex,
  2134  		mount.NewFakeMounter(nil),
  2135  		hostutil.NewFakeHostUtil(nil),
  2136  		volumePluginMgr,
  2137  		kubeletPodsDir)
  2138  	pod := &v1.Pod{
  2139  		ObjectMeta: metav1.ObjectMeta{
  2140  			Name: "pod1",
  2141  			UID:  "pod1uid",
  2142  		},
  2143  		Spec: v1.PodSpec{
  2144  			Volumes: []v1.Volume{
  2145  				{
  2146  					Name: "volume-name",
  2147  					VolumeSource: v1.VolumeSource{
  2148  						GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{
  2149  							PDName: "fake-device1",
  2150  						},
  2151  					},
  2152  				},
  2153  			},
  2154  		},
  2155  	}
  2156  
  2157  	// Some steps are executes out of order in callbacks, follow the numbers.
  2158  
  2159  	// 1. Add a volume to DSW and wait until it's mounted
  2160  	volumeSpec := &volume.Spec{Volume: &pod.Spec.Volumes[0]}
  2161  	// copy before reconciler runs to avoid data race.
  2162  	volumeSpecCopy := &volume.Spec{Volume: &pod.Spec.Volumes[0]}
  2163  	podName := util.GetUniquePodName(pod)
  2164  	generatedVolumeName, err := dsw.AddPodToVolume(
  2165  		podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */, nil /* seLinuxLabel */)
  2166  	dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{generatedVolumeName})
  2167  
  2168  	if err != nil {
  2169  		t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
  2170  	}
  2171  	// Start the reconciler to fill ASW.
  2172  	stopChan, stoppedChan := make(chan struct{}), make(chan struct{})
  2173  	go func() {
  2174  		reconciler.Run(stopChan)
  2175  		close(stoppedChan)
  2176  	}()
  2177  	waitForMount(t, fakePlugin, generatedVolumeName, asw)
  2178  	// Stop the reconciler.
  2179  	close(stopChan)
  2180  	<-stoppedChan
  2181  
  2182  	finished := make(chan interface{})
  2183  	fakePlugin.Lock()
  2184  	fakePlugin.UnmountDeviceHook = func(mountPath string) error {
  2185  		// Act:
  2186  		// 3. While a volume is being unmounted, add it back to the desired state of world
  2187  		klog.InfoS("UnmountDevice called")
  2188  		var generatedVolumeNameCopy v1.UniqueVolumeName
  2189  		generatedVolumeNameCopy, err = dsw.AddPodToVolume(
  2190  			podName, pod, volumeSpecCopy, volumeSpec.Name(), "" /* volumeGidValue */, nil /* seLinuxLabel */)
  2191  		dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{generatedVolumeNameCopy})
  2192  		return nil
  2193  	}
  2194  
  2195  	fakePlugin.WaitForAttachHook = func(spec *volume.Spec, devicePath string, pod *v1.Pod, spectimeout time.Duration) (string, error) {
  2196  		// Assert
  2197  		// 4. When the volume is mounted again, expect that UnmountDevice operation did not clear devicePath
  2198  		if devicePath == "" {
  2199  			klog.ErrorS(nil, "Expected WaitForAttach called with devicePath from Node.Status")
  2200  			close(finished)
  2201  			return "", fmt.Errorf("Expected devicePath from Node.Status")
  2202  		}
  2203  		close(finished)
  2204  		return devicePath, nil
  2205  	}
  2206  	fakePlugin.Unlock()
  2207  
  2208  	// Start the reconciler again.
  2209  	go reconciler.Run(wait.NeverStop)
  2210  
  2211  	// 2. Delete the volume from DSW (and wait for callbacks)
  2212  	dsw.DeletePodFromVolume(podName, generatedVolumeName)
  2213  
  2214  	<-finished
  2215  	waitForMount(t, fakePlugin, generatedVolumeName, asw)
  2216  }
  2217  
  2218  func getFakeNode() *v1.Node {
  2219  	return &v1.Node{
  2220  		ObjectMeta: metav1.ObjectMeta{
  2221  			Name: string(nodeName),
  2222  		},
  2223  		Status: v1.NodeStatus{
  2224  			VolumesAttached: []v1.AttachedVolume{
  2225  				{
  2226  					Name:       "fake-plugin/fake-device1",
  2227  					DevicePath: "/fake/path",
  2228  				},
  2229  			},
  2230  		},
  2231  	}
  2232  }
  2233  
  2234  func getInlineFakePod(podName, podUUID, outerName, innerName string) *v1.Pod {
  2235  	pod := &v1.Pod{
  2236  		ObjectMeta: metav1.ObjectMeta{
  2237  			Name: podName,
  2238  			UID:  k8stypes.UID(podUUID),
  2239  		},
  2240  		Spec: v1.PodSpec{
  2241  			Volumes: []v1.Volume{
  2242  				{
  2243  					Name: outerName,
  2244  					VolumeSource: v1.VolumeSource{
  2245  						GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{
  2246  							PDName: innerName,
  2247  						},
  2248  					},
  2249  				},
  2250  			},
  2251  		},
  2252  	}
  2253  	return pod
  2254  }
  2255  
  2256  func getReconciler(kubeletDir string, t *testing.T, volumePaths []string, kubeClient *fake.Clientset) (Reconciler, *volumetesting.FakeVolumePlugin) {
  2257  	node := getFakeNode()
  2258  	volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgrWithNodeAndRoot(t, node, kubeletDir)
  2259  	tmpKubeletPodDir := filepath.Join(kubeletDir, "pods")
  2260  	seLinuxTranslator := util.NewFakeSELinuxLabelTranslator()
  2261  
  2262  	dsw := cache.NewDesiredStateOfWorld(volumePluginMgr, seLinuxTranslator)
  2263  	asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
  2264  	if kubeClient == nil {
  2265  		kubeClient = createTestClient()
  2266  	}
  2267  
  2268  	fakeRecorder := &record.FakeRecorder{}
  2269  	fakeHandler := volumetesting.NewBlockVolumePathHandler()
  2270  	oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
  2271  		kubeClient,
  2272  		volumePluginMgr,
  2273  		fakeRecorder,
  2274  		fakeHandler))
  2275  	mountPoints := []mount.MountPoint{}
  2276  	for _, volumePath := range volumePaths {
  2277  		mountPoints = append(mountPoints, mount.MountPoint{Path: volumePath})
  2278  	}
  2279  	rc := NewReconciler(
  2280  		kubeClient,
  2281  		true, /* controllerAttachDetachEnabled */
  2282  		reconcilerLoopSleepDuration,
  2283  		waitForAttachTimeout,
  2284  		nodeName,
  2285  		dsw,
  2286  		asw,
  2287  		hasAddedPods,
  2288  		oex,
  2289  		mount.NewFakeMounter(mountPoints),
  2290  		hostutil.NewFakeHostUtil(nil),
  2291  		volumePluginMgr,
  2292  		tmpKubeletPodDir)
  2293  	return rc, fakePlugin
  2294  }
  2295  
  2296  func TestReconcileWithUpdateReconstructedFromAPIServer(t *testing.T) {
  2297  	// Calls Run() with two reconstructed volumes.
  2298  	// Verifies the devicePaths + volume attachability are reconstructed from node.status.
  2299  
  2300  	// Arrange
  2301  	node := &v1.Node{
  2302  		ObjectMeta: metav1.ObjectMeta{
  2303  			Name: string(nodeName),
  2304  		},
  2305  		Status: v1.NodeStatus{
  2306  			VolumesAttached: []v1.AttachedVolume{
  2307  				{
  2308  					Name:       "fake-plugin/fake-device1",
  2309  					DevicePath: "fake/path",
  2310  				},
  2311  			},
  2312  		},
  2313  	}
  2314  	volumePluginMgr, fakePlugin := volumetesting.GetTestKubeletVolumePluginMgrWithNode(t, node)
  2315  	seLinuxTranslator := util.NewFakeSELinuxLabelTranslator()
  2316  	dsw := cache.NewDesiredStateOfWorld(volumePluginMgr, seLinuxTranslator)
  2317  	asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
  2318  	kubeClient := createTestClient()
  2319  	fakeRecorder := &record.FakeRecorder{}
  2320  	fakeHandler := volumetesting.NewBlockVolumePathHandler()
  2321  	oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
  2322  		kubeClient,
  2323  		volumePluginMgr,
  2324  		fakeRecorder,
  2325  		fakeHandler))
  2326  	rc := NewReconciler(
  2327  		kubeClient,
  2328  		true, /* controllerAttachDetachEnabled */
  2329  		reconcilerLoopSleepDuration,
  2330  		waitForAttachTimeout,
  2331  		nodeName,
  2332  		dsw,
  2333  		asw,
  2334  		hasAddedPods,
  2335  		oex,
  2336  		mount.NewFakeMounter(nil),
  2337  		hostutil.NewFakeHostUtil(nil),
  2338  		volumePluginMgr,
  2339  		kubeletPodsDir)
  2340  	reconciler := rc.(*reconciler)
  2341  
  2342  	// The pod has two volumes, fake-device1 is attachable, fake-device2 is not.
  2343  	pod := &v1.Pod{
  2344  		ObjectMeta: metav1.ObjectMeta{
  2345  			Name: "pod1",
  2346  			UID:  "pod1uid",
  2347  		},
  2348  		Spec: v1.PodSpec{
  2349  			Volumes: []v1.Volume{
  2350  				{
  2351  					Name: "volume-name",
  2352  					VolumeSource: v1.VolumeSource{
  2353  						GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{
  2354  							PDName: "fake-device1",
  2355  						},
  2356  					},
  2357  				},
  2358  				{
  2359  					Name: "volume-name2",
  2360  					VolumeSource: v1.VolumeSource{
  2361  						GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{
  2362  							PDName: "fake-device2",
  2363  						},
  2364  					},
  2365  				},
  2366  			},
  2367  		},
  2368  	}
  2369  
  2370  	volumeSpec1 := &volume.Spec{Volume: &pod.Spec.Volumes[0]}
  2371  	volumeName1 := util.GetUniqueVolumeName(fakePlugin.GetPluginName(), "fake-device1")
  2372  	volumeSpec2 := &volume.Spec{Volume: &pod.Spec.Volumes[1]}
  2373  	volumeName2 := util.GetUniqueVolumeName(fakePlugin.GetPluginName(), "fake-device2")
  2374  
  2375  	assert.NoError(t, asw.AddAttachUncertainReconstructedVolume(volumeName1, volumeSpec1, nodeName, ""))
  2376  	assert.NoError(t, asw.MarkDeviceAsUncertain(volumeName1, "/dev/badly/reconstructed", "/var/lib/kubelet/plugins/global1", ""))
  2377  	assert.NoError(t, asw.AddAttachUncertainReconstructedVolume(volumeName2, volumeSpec2, nodeName, ""))
  2378  	assert.NoError(t, asw.MarkDeviceAsUncertain(volumeName2, "/dev/reconstructed", "/var/lib/kubelet/plugins/global2", ""))
  2379  
  2380  	assert.False(t, reconciler.StatesHasBeenSynced())
  2381  
  2382  	reconciler.volumesNeedUpdateFromNodeStatus = append(reconciler.volumesNeedUpdateFromNodeStatus, volumeName1, volumeName2)
  2383  	// Act - run reconcile loop just once.
  2384  	// "volumesNeedUpdateFromNodeStatus" is not empty, so no unmount will be triggered.
  2385  	reconciler.reconcileNew()
  2386  
  2387  	// Assert
  2388  	assert.True(t, reconciler.StatesHasBeenSynced())
  2389  	assert.Empty(t, reconciler.volumesNeedUpdateFromNodeStatus)
  2390  
  2391  	attachedVolumes := asw.GetAttachedVolumes()
  2392  	assert.Equalf(t, len(attachedVolumes), 2, "two volumes in ASW expected")
  2393  	for _, vol := range attachedVolumes {
  2394  		if vol.VolumeName == volumeName1 {
  2395  			// devicePath + attachability must have been updated from node.status
  2396  			assert.True(t, vol.PluginIsAttachable)
  2397  			assert.Equal(t, vol.DevicePath, "fake/path")
  2398  		}
  2399  		if vol.VolumeName == volumeName2 {
  2400  			// only attachability was updated from node.status
  2401  			assert.False(t, vol.PluginIsAttachable)
  2402  			assert.Equal(t, vol.DevicePath, "/dev/reconstructed")
  2403  		}
  2404  	}
  2405  }
  2406  

View as plain text