...

Source file src/k8s.io/kubernetes/pkg/volume/util/operationexecutor/operation_executor_test.go

Documentation: k8s.io/kubernetes/pkg/volume/util/operationexecutor

     1  /*
     2  Copyright 2016 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package operationexecutor
    18  
    19  import (
    20  	"fmt"
    21  	"k8s.io/klog/v2"
    22  	"strconv"
    23  	"testing"
    24  	"time"
    25  
    26  	v1 "k8s.io/api/core/v1"
    27  	"k8s.io/apimachinery/pkg/api/resource"
    28  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    29  	"k8s.io/apimachinery/pkg/types"
    30  	"k8s.io/apimachinery/pkg/util/uuid"
    31  	csitrans "k8s.io/csi-translation-lib"
    32  	"k8s.io/klog/v2/ktesting"
    33  	"k8s.io/kubernetes/pkg/volume"
    34  	"k8s.io/kubernetes/pkg/volume/util/hostutil"
    35  	volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
    36  )
    37  
// Operation counts used by the tests in this file to size their input slices
// and loops. Each scenario submits two operations, which is enough to tell
// serial execution apart from concurrent execution.
const (
	numVolumesToMount                    = 2
	numAttachableVolumesToUnmount        = 2
	numNonAttachableVolumesToUnmount     = 2
	numDevicesToUnmount                  = 2
	numVolumesToAttach                   = 2
	numVolumesToDetach                   = 2
	numVolumesToVerifyAttached           = 2
	numVolumesToVerifyControllerAttached = 2
	numVolumesToMap                      = 2
	numAttachableVolumesToUnmap          = 2
	numNonAttachableVolumesToUnmap       = 2
	numDevicesToUnmap                    = 2
)
    52  
// Compile-time check that *fakeOperationGenerator implements OperationGenerator.
var _ OperationGenerator = &fakeOperationGenerator{}
    54  
    55  func TestOperationExecutor_MountVolume_ConcurrentMountForNonAttachableAndNonDevicemountablePlugins(t *testing.T) {
    56  	// Arrange
    57  	ch, quit, oe := setup()
    58  	volumesToMount := make([]VolumeToMount, numVolumesToMount)
    59  	secretName := "secret-volume"
    60  	volumeName := v1.UniqueVolumeName(secretName)
    61  
    62  	// Act
    63  	for i := range volumesToMount {
    64  		podName := "pod-" + strconv.Itoa(i+1)
    65  		pod := getTestPodWithSecret(podName, secretName)
    66  		volumesToMount[i] = VolumeToMount{
    67  			Pod:                     pod,
    68  			VolumeName:              volumeName,
    69  			PluginIsAttachable:      false, // this field determines whether the plugin is attachable
    70  			PluginIsDeviceMountable: false, // this field determines whether the plugin is devicemountable
    71  			ReportedInUse:           true,
    72  		}
    73  		oe.MountVolume(0 /* waitForAttachTimeOut */, volumesToMount[i], nil /* actualStateOfWorldMounterUpdater */, false /* isRemount */)
    74  	}
    75  
    76  	// Assert
    77  	if !isOperationRunConcurrently(ch, quit, numVolumesToMount) {
    78  		t.Fatalf("Unable to start mount operations in Concurrent for non-attachable volumes")
    79  	}
    80  }
    81  
    82  func TestOperationExecutor_MountVolume_ConcurrentMountForAttachablePlugins(t *testing.T) {
    83  	t.Parallel()
    84  
    85  	// Arrange
    86  	ch, quit, oe := setup()
    87  	volumesToMount := make([]VolumeToMount, numVolumesToAttach)
    88  	pdName := "pd-volume"
    89  	volumeName := v1.UniqueVolumeName(pdName)
    90  	// Act
    91  	for i := range volumesToMount {
    92  		podName := "pod-" + strconv.Itoa(i+1)
    93  		pod := getTestPodWithGCEPD(podName, pdName)
    94  		volumesToMount[i] = VolumeToMount{
    95  			Pod:                pod,
    96  			VolumeName:         volumeName,
    97  			PluginIsAttachable: true, // this field determines whether the plugin is attachable
    98  			ReportedInUse:      true,
    99  		}
   100  		oe.MountVolume(0 /* waitForAttachTimeout */, volumesToMount[i], nil /* actualStateOfWorldMounterUpdater */, false /* isRemount */)
   101  	}
   102  
   103  	// Assert
   104  	if !isOperationRunSerially(ch, quit) {
   105  		t.Fatalf("Mount operations should not start concurrently for attachable volumes")
   106  	}
   107  }
   108  
   109  func TestOperationExecutor_MountVolume_ConcurrentMountForDeviceMountablePlugins(t *testing.T) {
   110  	t.Parallel()
   111  
   112  	// Arrange
   113  	ch, quit, oe := setup()
   114  	volumesToMount := make([]VolumeToMount, numVolumesToAttach)
   115  	pdName := "pd-volume"
   116  	volumeName := v1.UniqueVolumeName(pdName)
   117  	// Act
   118  	for i := range volumesToMount {
   119  		podName := "pod-" + strconv.Itoa(i+1)
   120  		pod := getTestPodWithGCEPD(podName, pdName)
   121  		volumesToMount[i] = VolumeToMount{
   122  			Pod:                     pod,
   123  			VolumeName:              volumeName,
   124  			PluginIsDeviceMountable: true, // this field determines whether the plugin is devicemountable
   125  			ReportedInUse:           true,
   126  		}
   127  		oe.MountVolume(0 /* waitForAttachTimeout */, volumesToMount[i], nil /* actualStateOfWorldMounterUpdater */, false /* isRemount */)
   128  	}
   129  
   130  	// Assert
   131  	if !isOperationRunSerially(ch, quit) {
   132  		t.Fatalf("Mount operations should not start concurrently for devicemountable volumes")
   133  	}
   134  }
   135  
   136  func TestOperationExecutor_UnmountVolume_ConcurrentUnmountForAllPlugins(t *testing.T) {
   137  	// Arrange
   138  	ch, quit, oe := setup()
   139  	volumesToUnmount := make([]MountedVolume, numAttachableVolumesToUnmount+numNonAttachableVolumesToUnmount)
   140  	pdName := "pd-volume"
   141  	secretName := "secret-volume"
   142  
   143  	// Act
   144  	for i := 0; i < numNonAttachableVolumesToUnmount+numAttachableVolumesToUnmount; i++ {
   145  		podName := "pod-" + strconv.Itoa(i+1)
   146  		if i < numNonAttachableVolumesToUnmount {
   147  			pod := getTestPodWithSecret(podName, secretName)
   148  			volumesToUnmount[i] = MountedVolume{
   149  				PodName:    volumetypes.UniquePodName(podName),
   150  				VolumeName: v1.UniqueVolumeName(secretName),
   151  				PodUID:     pod.UID,
   152  			}
   153  		} else {
   154  			pod := getTestPodWithGCEPD(podName, pdName)
   155  			volumesToUnmount[i] = MountedVolume{
   156  				PodName:    volumetypes.UniquePodName(podName),
   157  				VolumeName: v1.UniqueVolumeName(pdName),
   158  				PodUID:     pod.UID,
   159  			}
   160  		}
   161  		oe.UnmountVolume(volumesToUnmount[i], nil /* actualStateOfWorldMounterUpdater */, "" /*podsDir*/)
   162  	}
   163  
   164  	// Assert
   165  	if !isOperationRunConcurrently(ch, quit, numNonAttachableVolumesToUnmount+numAttachableVolumesToUnmount) {
   166  		t.Fatalf("Unable to start unmount operations concurrently for volume plugins")
   167  	}
   168  }
   169  
   170  func TestOperationExecutor_UnmountDeviceConcurrently(t *testing.T) {
   171  	t.Parallel()
   172  
   173  	// Arrange
   174  	ch, quit, oe := setup()
   175  	attachedVolumes := make([]AttachedVolume, numDevicesToUnmount)
   176  	pdName := "pd-volume"
   177  
   178  	// Act
   179  	for i := range attachedVolumes {
   180  		attachedVolumes[i] = AttachedVolume{
   181  			VolumeName: v1.UniqueVolumeName(pdName),
   182  			NodeName:   "node-name",
   183  		}
   184  		oe.UnmountDevice(attachedVolumes[i], nil /* actualStateOfWorldMounterUpdater */, nil /* mount.Interface */)
   185  	}
   186  
   187  	// Assert
   188  	if !isOperationRunSerially(ch, quit) {
   189  		t.Fatalf("Unmount device operations should not start concurrently")
   190  	}
   191  }
   192  
   193  func TestOperationExecutor_AttachSingleNodeVolumeConcurrentlyToSameNode(t *testing.T) {
   194  	t.Parallel()
   195  
   196  	// Arrange
   197  	ch, quit, oe := setup()
   198  	volumesToAttach := make([]VolumeToAttach, numVolumesToAttach)
   199  	pdName := "pd-volume"
   200  
   201  	// Act
   202  	for i := range volumesToAttach {
   203  		volumesToAttach[i] = VolumeToAttach{
   204  			VolumeName: v1.UniqueVolumeName(pdName),
   205  			NodeName:   "node",
   206  			VolumeSpec: &volume.Spec{
   207  				PersistentVolume: &v1.PersistentVolume{
   208  					Spec: v1.PersistentVolumeSpec{
   209  						AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce},
   210  					},
   211  				},
   212  			},
   213  		}
   214  		logger, _ := ktesting.NewTestContext(t)
   215  		oe.AttachVolume(logger, volumesToAttach[i], nil /* actualStateOfWorldAttacherUpdater */)
   216  	}
   217  
   218  	// Assert
   219  	if !isOperationRunSerially(ch, quit) {
   220  		t.Fatalf("Attach volume operations should not start concurrently")
   221  	}
   222  }
   223  
   224  func TestOperationExecutor_AttachMultiNodeVolumeConcurrentlyToSameNode(t *testing.T) {
   225  	t.Parallel()
   226  
   227  	// Arrange
   228  	ch, quit, oe := setup()
   229  	volumesToAttach := make([]VolumeToAttach, numVolumesToAttach)
   230  	pdName := "pd-volume"
   231  
   232  	// Act
   233  	for i := range volumesToAttach {
   234  		volumesToAttach[i] = VolumeToAttach{
   235  			VolumeName: v1.UniqueVolumeName(pdName),
   236  			NodeName:   "node",
   237  			VolumeSpec: &volume.Spec{
   238  				PersistentVolume: &v1.PersistentVolume{
   239  					Spec: v1.PersistentVolumeSpec{
   240  						AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadOnlyMany},
   241  					},
   242  				},
   243  			},
   244  		}
   245  		logger, _ := ktesting.NewTestContext(t)
   246  		oe.AttachVolume(logger, volumesToAttach[i], nil /* actualStateOfWorldAttacherUpdater */)
   247  	}
   248  
   249  	// Assert
   250  	if !isOperationRunSerially(ch, quit) {
   251  		t.Fatalf("Attach volume operations should not start concurrently")
   252  	}
   253  }
   254  
   255  func TestOperationExecutor_AttachSingleNodeVolumeConcurrentlyToDifferentNodes(t *testing.T) {
   256  	t.Parallel()
   257  
   258  	// Arrange
   259  	ch, quit, oe := setup()
   260  	volumesToAttach := make([]VolumeToAttach, numVolumesToAttach)
   261  	pdName := "pd-volume"
   262  
   263  	// Act
   264  	for i := range volumesToAttach {
   265  		volumesToAttach[i] = VolumeToAttach{
   266  			VolumeName: v1.UniqueVolumeName(pdName),
   267  			NodeName:   types.NodeName(fmt.Sprintf("node%d", i)),
   268  			VolumeSpec: &volume.Spec{
   269  				PersistentVolume: &v1.PersistentVolume{
   270  					Spec: v1.PersistentVolumeSpec{
   271  						AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce},
   272  					},
   273  				},
   274  			},
   275  		}
   276  		logger, _ := ktesting.NewTestContext(t)
   277  		oe.AttachVolume(logger, volumesToAttach[i], nil /* actualStateOfWorldAttacherUpdater */)
   278  	}
   279  
   280  	// Assert
   281  	if !isOperationRunSerially(ch, quit) {
   282  		t.Fatalf("Attach volume operations should not start concurrently")
   283  	}
   284  }
   285  
   286  func TestOperationExecutor_AttachMultiNodeVolumeConcurrentlyToDifferentNodes(t *testing.T) {
   287  	// Arrange
   288  	ch, quit, oe := setup()
   289  	volumesToAttach := make([]VolumeToAttach, numVolumesToAttach)
   290  	pdName := "pd-volume"
   291  
   292  	// Act
   293  	for i := range volumesToAttach {
   294  		volumesToAttach[i] = VolumeToAttach{
   295  			VolumeName: v1.UniqueVolumeName(pdName),
   296  			NodeName:   types.NodeName(fmt.Sprintf("node%d", i)),
   297  			VolumeSpec: &volume.Spec{
   298  				PersistentVolume: &v1.PersistentVolume{
   299  					Spec: v1.PersistentVolumeSpec{
   300  						AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadOnlyMany},
   301  					},
   302  				},
   303  			},
   304  		}
   305  		logger, _ := ktesting.NewTestContext(t)
   306  		oe.AttachVolume(logger, volumesToAttach[i], nil /* actualStateOfWorldAttacherUpdater */)
   307  	}
   308  
   309  	// Assert
   310  	if !isOperationRunConcurrently(ch, quit, numVolumesToAttach) {
   311  		t.Fatalf("Attach volume operations should not execute serially")
   312  	}
   313  }
   314  
   315  func TestOperationExecutor_DetachSingleNodeVolumeConcurrentlyFromSameNode(t *testing.T) {
   316  	t.Parallel()
   317  
   318  	// Arrange
   319  	ch, quit, oe := setup()
   320  	attachedVolumes := make([]AttachedVolume, numVolumesToDetach)
   321  	pdName := "pd-volume"
   322  
   323  	// Act
   324  	for i := range attachedVolumes {
   325  		attachedVolumes[i] = AttachedVolume{
   326  			VolumeName: v1.UniqueVolumeName(pdName),
   327  			NodeName:   "node",
   328  			VolumeSpec: &volume.Spec{
   329  				PersistentVolume: &v1.PersistentVolume{
   330  					Spec: v1.PersistentVolumeSpec{
   331  						AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce},
   332  					},
   333  				},
   334  			},
   335  		}
   336  		logger, _ := ktesting.NewTestContext(t)
   337  		oe.DetachVolume(logger, attachedVolumes[i], true /* verifySafeToDetach */, nil /* actualStateOfWorldAttacherUpdater */)
   338  	}
   339  
   340  	// Assert
   341  	if !isOperationRunSerially(ch, quit) {
   342  		t.Fatalf("DetachVolume operations should not run concurrently")
   343  	}
   344  }
   345  
   346  func TestOperationExecutor_DetachMultiNodeVolumeConcurrentlyFromSameNode(t *testing.T) {
   347  	t.Parallel()
   348  
   349  	// Arrange
   350  	ch, quit, oe := setup()
   351  	attachedVolumes := make([]AttachedVolume, numVolumesToDetach)
   352  	pdName := "pd-volume"
   353  
   354  	// Act
   355  	for i := range attachedVolumes {
   356  		attachedVolumes[i] = AttachedVolume{
   357  			VolumeName: v1.UniqueVolumeName(pdName),
   358  			NodeName:   "node",
   359  			VolumeSpec: &volume.Spec{
   360  				PersistentVolume: &v1.PersistentVolume{
   361  					Spec: v1.PersistentVolumeSpec{
   362  						AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadOnlyMany},
   363  					},
   364  				},
   365  			},
   366  		}
   367  		logger, _ := ktesting.NewTestContext(t)
   368  		oe.DetachVolume(logger, attachedVolumes[i], true /* verifySafeToDetach */, nil /* actualStateOfWorldAttacherUpdater */)
   369  	}
   370  
   371  	// Assert
   372  	if !isOperationRunSerially(ch, quit) {
   373  		t.Fatalf("DetachVolume operations should not run concurrently")
   374  	}
   375  }
   376  
   377  func TestOperationExecutor_DetachMultiNodeVolumeConcurrentlyFromDifferentNodes(t *testing.T) {
   378  	// Arrange
   379  	ch, quit, oe := setup()
   380  	attachedVolumes := make([]AttachedVolume, numVolumesToDetach)
   381  	pdName := "pd-volume"
   382  
   383  	// Act
   384  	for i := range attachedVolumes {
   385  		attachedVolumes[i] = AttachedVolume{
   386  			VolumeName: v1.UniqueVolumeName(pdName),
   387  			NodeName:   types.NodeName(fmt.Sprintf("node%d", i)),
   388  			VolumeSpec: &volume.Spec{
   389  				PersistentVolume: &v1.PersistentVolume{
   390  					Spec: v1.PersistentVolumeSpec{
   391  						AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadOnlyMany},
   392  					},
   393  				},
   394  			},
   395  		}
   396  		logger, _ := ktesting.NewTestContext(t)
   397  		oe.DetachVolume(logger, attachedVolumes[i], true /* verifySafeToDetach */, nil /* actualStateOfWorldAttacherUpdater */)
   398  	}
   399  
   400  	// Assert
   401  	if !isOperationRunConcurrently(ch, quit, numVolumesToDetach) {
   402  		t.Fatalf("Attach volume operations should not execute serially")
   403  	}
   404  }
   405  
   406  func TestOperationExecutor_VerifyVolumesAreAttachedConcurrentlyOnSameNode(t *testing.T) {
   407  	// Arrange
   408  	ch, quit, oe := setup()
   409  
   410  	// Act
   411  	for i := 0; i < numVolumesToVerifyAttached; i++ {
   412  		oe.VerifyVolumesAreAttachedPerNode(nil /* attachedVolumes */, "node-name", nil /* actualStateOfWorldAttacherUpdater */)
   413  	}
   414  
   415  	// Assert
   416  	if !isOperationRunConcurrently(ch, quit, numVolumesToVerifyAttached) {
   417  		t.Fatalf("VerifyVolumesAreAttached operation is not being run concurrently")
   418  	}
   419  }
   420  
   421  func TestOperationExecutor_VerifyVolumesAreAttachedConcurrentlyOnDifferentNodes(t *testing.T) {
   422  	// Arrange
   423  	ch, quit, oe := setup()
   424  
   425  	// Act
   426  	for i := 0; i < numVolumesToVerifyAttached; i++ {
   427  		oe.VerifyVolumesAreAttachedPerNode(
   428  			nil, /* attachedVolumes */
   429  			types.NodeName(fmt.Sprintf("node-name-%d", i)),
   430  			nil /* actualStateOfWorldAttacherUpdater */)
   431  	}
   432  
   433  	// Assert
   434  	if !isOperationRunConcurrently(ch, quit, numVolumesToVerifyAttached) {
   435  		t.Fatalf("VerifyVolumesAreAttached operation is not being run concurrently")
   436  	}
   437  }
   438  
   439  func TestOperationExecutor_VerifyControllerAttachedVolumeConcurrently(t *testing.T) {
   440  	t.Parallel()
   441  
   442  	// Arrange
   443  	ch, quit, oe := setup()
   444  	volumesToMount := make([]VolumeToMount, numVolumesToVerifyControllerAttached)
   445  	pdName := "pd-volume"
   446  
   447  	// Act
   448  	for i := range volumesToMount {
   449  		volumesToMount[i] = VolumeToMount{
   450  			VolumeName: v1.UniqueVolumeName(pdName),
   451  		}
   452  		logger, _ := ktesting.NewTestContext(t)
   453  		oe.VerifyControllerAttachedVolume(logger, volumesToMount[i], types.NodeName("node-name"), nil /* actualStateOfWorldMounterUpdater */)
   454  	}
   455  
   456  	// Assert
   457  	if !isOperationRunSerially(ch, quit) {
   458  		t.Fatalf("VerifyControllerAttachedVolume should not run concurrently")
   459  	}
   460  }
   461  
   462  func TestOperationExecutor_MountVolume_ConcurrentMountForNonAttachablePlugins_VolumeMode_Block(t *testing.T) {
   463  	// Arrange
   464  	ch, quit, oe := setup()
   465  	volumesToMount := make([]VolumeToMount, numVolumesToMap)
   466  	secretName := "secret-volume"
   467  	volumeName := v1.UniqueVolumeName(secretName)
   468  	volumeMode := v1.PersistentVolumeBlock
   469  	tmpSpec := &volume.Spec{PersistentVolume: &v1.PersistentVolume{Spec: v1.PersistentVolumeSpec{VolumeMode: &volumeMode}}}
   470  
   471  	// Act
   472  	for i := range volumesToMount {
   473  		podName := "pod-" + strconv.Itoa(i+1)
   474  		pod := getTestPodWithSecret(podName, secretName)
   475  		volumesToMount[i] = VolumeToMount{
   476  			Pod:                pod,
   477  			VolumeName:         volumeName,
   478  			PluginIsAttachable: false, // this field determines whether the plugin is attachable
   479  			ReportedInUse:      true,
   480  			VolumeSpec:         tmpSpec,
   481  		}
   482  		oe.MountVolume(0 /* waitForAttachTimeOut */, volumesToMount[i], nil /* actualStateOfWorldMounterUpdater */, false)
   483  	}
   484  
   485  	// Assert
   486  	if !isOperationRunConcurrently(ch, quit, numVolumesToMap) {
   487  		t.Fatalf("Unable to start map operations in Concurrent for non-attachable volumes")
   488  	}
   489  }
   490  
   491  func TestOperationExecutor_MountVolume_ConcurrentMountForAttachablePlugins_VolumeMode_Block(t *testing.T) {
   492  	t.Parallel()
   493  
   494  	// Arrange
   495  	ch, quit, oe := setup()
   496  	volumesToMount := make([]VolumeToMount, numVolumesToAttach)
   497  	pdName := "pd-volume"
   498  	volumeName := v1.UniqueVolumeName(pdName)
   499  	volumeMode := v1.PersistentVolumeBlock
   500  	tmpSpec := &volume.Spec{PersistentVolume: &v1.PersistentVolume{Spec: v1.PersistentVolumeSpec{VolumeMode: &volumeMode}}}
   501  
   502  	// Act
   503  	for i := range volumesToMount {
   504  		podName := "pod-" + strconv.Itoa(i+1)
   505  		pod := getTestPodWithGCEPD(podName, pdName)
   506  		volumesToMount[i] = VolumeToMount{
   507  			Pod:                pod,
   508  			VolumeName:         volumeName,
   509  			PluginIsAttachable: true, // this field determines whether the plugin is attachable
   510  			ReportedInUse:      true,
   511  			VolumeSpec:         tmpSpec,
   512  		}
   513  		oe.MountVolume(0 /* waitForAttachTimeout */, volumesToMount[i], nil /* actualStateOfWorldMounterUpdater */, false)
   514  	}
   515  
   516  	// Assert
   517  	if !isOperationRunSerially(ch, quit) {
   518  		t.Fatalf("Map operations should not start concurrently for attachable volumes")
   519  	}
   520  }
   521  
   522  func TestOperationExecutor_UnmountVolume_ConcurrentUnmountForAllPlugins_VolumeMode_Block(t *testing.T) {
   523  	// Arrange
   524  	ch, quit, oe := setup()
   525  	volumesToUnmount := make([]MountedVolume, numAttachableVolumesToUnmap+numNonAttachableVolumesToUnmap)
   526  	pdName := "pd-volume"
   527  	secretName := "secret-volume"
   528  	volumeMode := v1.PersistentVolumeBlock
   529  	tmpSpec := &volume.Spec{PersistentVolume: &v1.PersistentVolume{Spec: v1.PersistentVolumeSpec{VolumeMode: &volumeMode}}}
   530  
   531  	// Act
   532  	for i := 0; i < numNonAttachableVolumesToUnmap+numAttachableVolumesToUnmap; i++ {
   533  		podName := "pod-" + strconv.Itoa(i+1)
   534  		if i < numNonAttachableVolumesToUnmap {
   535  			pod := getTestPodWithSecret(podName, secretName)
   536  			volumesToUnmount[i] = MountedVolume{
   537  				PodName:    volumetypes.UniquePodName(podName),
   538  				VolumeName: v1.UniqueVolumeName(secretName),
   539  				PodUID:     pod.UID,
   540  				VolumeSpec: tmpSpec,
   541  			}
   542  		} else {
   543  			pod := getTestPodWithGCEPD(podName, pdName)
   544  			volumesToUnmount[i] = MountedVolume{
   545  				PodName:    volumetypes.UniquePodName(podName),
   546  				VolumeName: v1.UniqueVolumeName(pdName),
   547  				PodUID:     pod.UID,
   548  				VolumeSpec: tmpSpec,
   549  			}
   550  		}
   551  		oe.UnmountVolume(volumesToUnmount[i], nil /* actualStateOfWorldMounterUpdater */, "" /* podsDir */)
   552  	}
   553  
   554  	// Assert
   555  	if !isOperationRunConcurrently(ch, quit, numNonAttachableVolumesToUnmap+numAttachableVolumesToUnmap) {
   556  		t.Fatalf("Unable to start unmap operations concurrently for volume plugins")
   557  	}
   558  }
   559  
   560  func TestOperationExecutor_UnmountDeviceConcurrently_VolumeMode_Block(t *testing.T) {
   561  	t.Parallel()
   562  
   563  	// Arrange
   564  	ch, quit, oe := setup()
   565  	attachedVolumes := make([]AttachedVolume, numDevicesToUnmap)
   566  	pdName := "pd-volume"
   567  	volumeMode := v1.PersistentVolumeBlock
   568  	tmpSpec := &volume.Spec{PersistentVolume: &v1.PersistentVolume{Spec: v1.PersistentVolumeSpec{VolumeMode: &volumeMode}}}
   569  
   570  	// Act
   571  	for i := range attachedVolumes {
   572  		attachedVolumes[i] = AttachedVolume{
   573  			VolumeName: v1.UniqueVolumeName(pdName),
   574  			NodeName:   "node-name",
   575  			VolumeSpec: tmpSpec,
   576  		}
   577  		oe.UnmountDevice(attachedVolumes[i], nil /* actualStateOfWorldMounterUpdater */, nil /* mount.Interface */)
   578  	}
   579  
   580  	// Assert
   581  	if !isOperationRunSerially(ch, quit) {
   582  		t.Fatalf("Unmap device operations should not start concurrently")
   583  	}
   584  }
   585  
// fakeOperationGenerator is a test double for OperationGenerator. Every
// operation it generates reports its start on ch and then blocks until quit
// is closed (see startOperationAndBlock).
type fakeOperationGenerator struct {
	// ch receives one value each time a generated operation starts running.
	ch chan interface{}
	// quit is closed by the test helpers to release blocked operations.
	quit chan interface{}
}
   590  
   591  func newFakeOperationGenerator(ch chan interface{}, quit chan interface{}) OperationGenerator {
   592  	return &fakeOperationGenerator{
   593  		ch:   ch,
   594  		quit: quit,
   595  	}
   596  }
   597  
   598  func (fopg *fakeOperationGenerator) GenerateMountVolumeFunc(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorldMounterUpdater ActualStateOfWorldMounterUpdater, isRemount bool) volumetypes.GeneratedOperations {
   599  	opFunc := func() volumetypes.OperationContext {
   600  		startOperationAndBlock(fopg.ch, fopg.quit)
   601  		return volumetypes.NewOperationContext(nil, nil, false)
   602  	}
   603  	return volumetypes.GeneratedOperations{
   604  		OperationFunc: opFunc,
   605  	}
   606  }
   607  func (fopg *fakeOperationGenerator) GenerateUnmountVolumeFunc(volumeToUnmount MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, podsDir string) (volumetypes.GeneratedOperations, error) {
   608  	opFunc := func() volumetypes.OperationContext {
   609  		startOperationAndBlock(fopg.ch, fopg.quit)
   610  		return volumetypes.NewOperationContext(nil, nil, false)
   611  	}
   612  	return volumetypes.GeneratedOperations{
   613  		OperationFunc: opFunc,
   614  	}, nil
   615  }
   616  func (fopg *fakeOperationGenerator) GenerateAttachVolumeFunc(logger klog.Logger, volumeToAttach VolumeToAttach, actualStateOfWorld ActualStateOfWorldAttacherUpdater) volumetypes.GeneratedOperations {
   617  	opFunc := func() volumetypes.OperationContext {
   618  		startOperationAndBlock(fopg.ch, fopg.quit)
   619  		return volumetypes.NewOperationContext(nil, nil, false)
   620  	}
   621  	return volumetypes.GeneratedOperations{
   622  		OperationFunc: opFunc,
   623  	}
   624  }
   625  func (fopg *fakeOperationGenerator) GenerateDetachVolumeFunc(logger klog.Logger, volumeToDetach AttachedVolume, verifySafeToDetach bool, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) {
   626  	opFunc := func() volumetypes.OperationContext {
   627  		startOperationAndBlock(fopg.ch, fopg.quit)
   628  		return volumetypes.NewOperationContext(nil, nil, false)
   629  	}
   630  	return volumetypes.GeneratedOperations{
   631  		OperationFunc: opFunc,
   632  	}, nil
   633  }
   634  func (fopg *fakeOperationGenerator) GenerateVolumesAreAttachedFunc(attachedVolumes []AttachedVolume, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) {
   635  	opFunc := func() volumetypes.OperationContext {
   636  		startOperationAndBlock(fopg.ch, fopg.quit)
   637  		return volumetypes.NewOperationContext(nil, nil, false)
   638  	}
   639  	return volumetypes.GeneratedOperations{
   640  		OperationFunc: opFunc,
   641  	}, nil
   642  }
   643  func (fopg *fakeOperationGenerator) GenerateUnmountDeviceFunc(deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, hostutil hostutil.HostUtils) (volumetypes.GeneratedOperations, error) {
   644  	opFunc := func() volumetypes.OperationContext {
   645  		startOperationAndBlock(fopg.ch, fopg.quit)
   646  		return volumetypes.NewOperationContext(nil, nil, false)
   647  	}
   648  	return volumetypes.GeneratedOperations{
   649  		OperationFunc: opFunc,
   650  	}, nil
   651  }
   652  func (fopg *fakeOperationGenerator) GenerateVerifyControllerAttachedVolumeFunc(logger klog.Logger, volumeToMount VolumeToMount, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) {
   653  	opFunc := func() volumetypes.OperationContext {
   654  		startOperationAndBlock(fopg.ch, fopg.quit)
   655  		return volumetypes.NewOperationContext(nil, nil, false)
   656  	}
   657  	return volumetypes.GeneratedOperations{
   658  		OperationFunc: opFunc,
   659  	}, nil
   660  }
   661  
   662  func (fopg *fakeOperationGenerator) GenerateExpandVolumeFunc(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume) (volumetypes.GeneratedOperations, error) {
   663  	opFunc := func() volumetypes.OperationContext {
   664  		startOperationAndBlock(fopg.ch, fopg.quit)
   665  		return volumetypes.NewOperationContext(nil, nil, false)
   666  	}
   667  	return volumetypes.GeneratedOperations{
   668  		OperationFunc: opFunc,
   669  	}, nil
   670  }
   671  
   672  func (fopg *fakeOperationGenerator) GenerateExpandAndRecoverVolumeFunc(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume, resizerName string) (volumetypes.GeneratedOperations, error) {
   673  	opFunc := func() volumetypes.OperationContext {
   674  		startOperationAndBlock(fopg.ch, fopg.quit)
   675  		return volumetypes.NewOperationContext(nil, nil, false)
   676  	}
   677  	return volumetypes.GeneratedOperations{
   678  		OperationFunc: opFunc,
   679  	}, nil
   680  }
   681  
   682  func (fopg *fakeOperationGenerator) GenerateExpandInUseVolumeFunc(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater, currentSize resource.Quantity) (volumetypes.GeneratedOperations, error) {
   683  	opFunc := func() volumetypes.OperationContext {
   684  		startOperationAndBlock(fopg.ch, fopg.quit)
   685  		return volumetypes.NewOperationContext(nil, nil, false)
   686  	}
   687  	return volumetypes.GeneratedOperations{
   688  		OperationFunc: opFunc,
   689  	}, nil
   690  }
   691  
   692  func (fopg *fakeOperationGenerator) GenerateBulkVolumeVerifyFunc(
   693  	pluginNodeVolumes map[types.NodeName][]*volume.Spec,
   694  	pluginNane string,
   695  	volumeSpecMap map[*volume.Spec]v1.UniqueVolumeName,
   696  	actualStateOfWorldAttacherUpdater ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) {
   697  	opFunc := func() volumetypes.OperationContext {
   698  		startOperationAndBlock(fopg.ch, fopg.quit)
   699  		return volumetypes.NewOperationContext(nil, nil, false)
   700  	}
   701  	return volumetypes.GeneratedOperations{
   702  		OperationFunc: opFunc,
   703  	}, nil
   704  }
   705  
   706  func (fopg *fakeOperationGenerator) GenerateMapVolumeFunc(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorldMounterUpdater ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) {
   707  	opFunc := func() volumetypes.OperationContext {
   708  		startOperationAndBlock(fopg.ch, fopg.quit)
   709  		return volumetypes.NewOperationContext(nil, nil, false)
   710  	}
   711  	return volumetypes.GeneratedOperations{
   712  		OperationFunc: opFunc,
   713  	}, nil
   714  }
   715  
   716  func (fopg *fakeOperationGenerator) GenerateUnmapVolumeFunc(volumeToUnmount MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) {
   717  	opFunc := func() volumetypes.OperationContext {
   718  		startOperationAndBlock(fopg.ch, fopg.quit)
   719  		return volumetypes.NewOperationContext(nil, nil, false)
   720  	}
   721  	return volumetypes.GeneratedOperations{
   722  		OperationFunc: opFunc,
   723  	}, nil
   724  }
   725  
   726  func (fopg *fakeOperationGenerator) GenerateUnmapDeviceFunc(deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, hostutil hostutil.HostUtils) (volumetypes.GeneratedOperations, error) {
   727  	opFunc := func() volumetypes.OperationContext {
   728  		startOperationAndBlock(fopg.ch, fopg.quit)
   729  		return volumetypes.NewOperationContext(nil, nil, false)
   730  	}
   731  	return volumetypes.GeneratedOperations{
   732  		OperationFunc: opFunc,
   733  	}, nil
   734  }
   735  
   736  func (fopg *fakeOperationGenerator) GetVolumePluginMgr() *volume.VolumePluginMgr {
   737  	return nil
   738  }
   739  
   740  func (fopg *fakeOperationGenerator) GetCSITranslator() InTreeToCSITranslator {
   741  	return csitrans.New()
   742  }
   743  
   744  func getTestPodWithSecret(podName, secretName string) *v1.Pod {
   745  	return &v1.Pod{
   746  		ObjectMeta: metav1.ObjectMeta{
   747  			Name: podName,
   748  			UID:  types.UID(podName),
   749  		},
   750  		Spec: v1.PodSpec{
   751  			Volumes: []v1.Volume{
   752  				{
   753  					Name: secretName,
   754  					VolumeSource: v1.VolumeSource{
   755  						Secret: &v1.SecretVolumeSource{
   756  							SecretName: secretName,
   757  						},
   758  					},
   759  				},
   760  			},
   761  			Containers: []v1.Container{
   762  				{
   763  					Name:  "secret-volume-test",
   764  					Image: "registry.k8s.io/mounttest:0.8",
   765  					Args: []string{
   766  						"--file_content=/etc/secret-volume/data-1",
   767  						"--file_mode=/etc/secret-volume/data-1"},
   768  					VolumeMounts: []v1.VolumeMount{
   769  						{
   770  							Name:      secretName,
   771  							MountPath: "/data",
   772  						},
   773  					},
   774  				},
   775  			},
   776  			RestartPolicy: v1.RestartPolicyNever,
   777  		},
   778  	}
   779  }
   780  
   781  func getTestPodWithGCEPD(podName, pdName string) *v1.Pod {
   782  	return &v1.Pod{
   783  		ObjectMeta: metav1.ObjectMeta{
   784  			Name: podName,
   785  			UID:  types.UID(podName + string(uuid.NewUUID())),
   786  		},
   787  		Spec: v1.PodSpec{
   788  			Volumes: []v1.Volume{
   789  				{
   790  					Name: pdName,
   791  					VolumeSource: v1.VolumeSource{
   792  						GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{
   793  							PDName:   pdName,
   794  							FSType:   "ext4",
   795  							ReadOnly: false,
   796  						},
   797  					},
   798  				},
   799  			},
   800  			Containers: []v1.Container{
   801  				{
   802  					Name:  "pd-volume-test",
   803  					Image: "registry.k8s.io/mounttest:0.8",
   804  					Args: []string{
   805  						"--file_content=/etc/pd-volume/data-1",
   806  					},
   807  					VolumeMounts: []v1.VolumeMount{
   808  						{
   809  							Name:      pdName,
   810  							MountPath: "/data",
   811  						},
   812  					},
   813  				},
   814  			},
   815  			RestartPolicy: v1.RestartPolicyNever,
   816  		},
   817  	}
   818  }
   819  
   820  func isOperationRunSerially(ch <-chan interface{}, quit chan<- interface{}) bool {
   821  	defer close(quit)
   822  	numOperationsStarted := 0
   823  loop:
   824  	for {
   825  		select {
   826  		case <-ch:
   827  			numOperationsStarted++
   828  			if numOperationsStarted > 1 {
   829  				return false
   830  			}
   831  		case <-time.After(5 * time.Second):
   832  			break loop
   833  		}
   834  	}
   835  	return true
   836  }
   837  
   838  func isOperationRunConcurrently(ch <-chan interface{}, quit chan<- interface{}, numOperationsToRun int) bool {
   839  	defer close(quit)
   840  	numOperationsStarted := 0
   841  loop:
   842  	for {
   843  		select {
   844  		case <-ch:
   845  			numOperationsStarted++
   846  			if numOperationsStarted == numOperationsToRun {
   847  				return true
   848  			}
   849  		case <-time.After(5 * time.Second):
   850  			break loop
   851  		}
   852  	}
   853  	return false
   854  }
   855  
   856  func setup() (chan interface{}, chan interface{}, OperationExecutor) {
   857  	ch, quit := make(chan interface{}), make(chan interface{})
   858  	return ch, quit, NewOperationExecutor(newFakeOperationGenerator(ch, quit))
   859  }
   860  
   861  // This function starts by writing to ch and blocks on the quit channel
   862  // until it is closed by the currently running test
   863  func startOperationAndBlock(ch chan<- interface{}, quit <-chan interface{}) {
   864  	ch <- nil
   865  	<-quit
   866  }
   867  

View as plain text