...

Source file src/k8s.io/kubernetes/test/e2e/storage/persistent_volumes-local.go

Documentation: k8s.io/kubernetes/test/e2e/storage

     1  /*
     2  Copyright 2018 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package storage
    18  
    19  import (
    20  	"context"
    21  	"errors"
    22  	"fmt"
    23  	"math/rand"
    24  	"path/filepath"
    25  	"strconv"
    26  	"strings"
    27  	"sync"
    28  	"time"
    29  
    30  	"github.com/onsi/ginkgo/v2"
    31  	"github.com/onsi/gomega"
    32  
    33  	appsv1 "k8s.io/api/apps/v1"
    34  	v1 "k8s.io/api/core/v1"
    35  	storagev1 "k8s.io/api/storage/v1"
    36  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    37  	"k8s.io/apimachinery/pkg/api/resource"
    38  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    39  	utilerrors "k8s.io/apimachinery/pkg/util/errors"
    40  	"k8s.io/apimachinery/pkg/util/sets"
    41  	"k8s.io/apimachinery/pkg/util/wait"
    42  	"k8s.io/apimachinery/pkg/watch"
    43  	clientset "k8s.io/client-go/kubernetes"
    44  	"k8s.io/kubernetes/test/e2e/framework"
    45  	e2enode "k8s.io/kubernetes/test/e2e/framework/node"
    46  	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
    47  	e2eoutput "k8s.io/kubernetes/test/e2e/framework/pod/output"
    48  	e2epv "k8s.io/kubernetes/test/e2e/framework/pv"
    49  	e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
    50  	e2estatefulset "k8s.io/kubernetes/test/e2e/framework/statefulset"
    51  	e2evolume "k8s.io/kubernetes/test/e2e/framework/volume"
    52  	"k8s.io/kubernetes/test/e2e/storage/utils"
    53  	imageutils "k8s.io/kubernetes/test/utils/image"
    54  	admissionapi "k8s.io/pod-security-admission/api"
    55  )
    56  
// localTestConfig bundles the state shared by the local volume tests.
// A fresh value is built by the suite-level BeforeEach before every spec.
type localTestConfig struct {
	ns           string                         // namespace of the running test (f.Namespace.Name)
	nodes        []v1.Node                      // ready schedulable nodes, bounded by maxNodes
	randomNode   *v1.Node                       // element of nodes picked at random for single-node tests
	client       clientset.Interface            // API client taken from the framework
	timeouts     *framework.TimeoutContext      // timeouts taken from the framework
	scName       string                         // StorageClass name: testSCPrefix + "-" + namespace
	discoveryDir string                         // hostBase joined with the namespace name
	hostExec     utils.HostExec                 // created with utils.NewHostExec
	ltrMgr       utils.LocalTestResourceManager // created with utils.NewLocalResourceManager on top of hostExec
}
    68  
// localVolumeType names a flavor of local volume exercised by this suite.
// setupLocalVolumeMap translates each value to the utils.LocalVolumeType
// used to set the volume up.
type localVolumeType string

const (
	// DirectoryLocalVolumeType is the default local volume type, aka a directory
	DirectoryLocalVolumeType localVolumeType = "dir"
	// DirectoryLinkLocalVolumeType is like DirectoryLocalVolumeType,
	// but it's a symbolic link to a directory
	DirectoryLinkLocalVolumeType localVolumeType = "dir-link"
	// DirectoryBindMountedLocalVolumeType is like DirectoryLocalVolumeType
	// but bind mounted
	DirectoryBindMountedLocalVolumeType localVolumeType = "dir-bindmounted"
	// DirectoryLinkBindMountedLocalVolumeType is like DirectoryLocalVolumeType,
	// but it's a symbolic link to a self bind mounted directory.
	// Note that bind mounting at a symbolic link actually mounts at the
	// directory it links to.
	DirectoryLinkBindMountedLocalVolumeType localVolumeType = "dir-link-bindmounted"
	// TmpfsLocalVolumeType creates a tmpfs and mounts it
	TmpfsLocalVolumeType localVolumeType = "tmpfs"
	// GCELocalSSDVolumeType tests based on local ssd at /mnt/disks/by-uuid/
	GCELocalSSDVolumeType localVolumeType = "gce-localssd-scsi-fs"
	// BlockLocalVolumeType creates a local file, formats it, and maps it as a block device.
	BlockLocalVolumeType localVolumeType = "block"
	// BlockFsWithFormatLocalVolumeType creates a local file serving as the backing for block device,
	// formats it, and mounts it to use as FS mode local volume.
	BlockFsWithFormatLocalVolumeType localVolumeType = "blockfswithformat"
	// BlockFsWithoutFormatLocalVolumeType creates a local file serving as the backing for block device,
	// does not format it manually, and mounts it to use as FS mode local volume.
	BlockFsWithoutFormatLocalVolumeType localVolumeType = "blockfswithoutformat"
)
    98  
// setupLocalVolumeMap maps every localVolumeType of this suite to the
// utils.LocalVolumeType (local test resource type) used to set it up.
// Its keys also define the volume types for which the per-volume-type
// test contexts are generated.
// Note that BlockLocalVolumeType and BlockFsWithoutFormatLocalVolumeType
// both map to utils.LocalVolumeBlock.
var setupLocalVolumeMap = map[localVolumeType]utils.LocalVolumeType{
	GCELocalSSDVolumeType:                   utils.LocalVolumeGCELocalSSD,
	TmpfsLocalVolumeType:                    utils.LocalVolumeTmpfs,
	DirectoryLocalVolumeType:                utils.LocalVolumeDirectory,
	DirectoryLinkLocalVolumeType:            utils.LocalVolumeDirectoryLink,
	DirectoryBindMountedLocalVolumeType:     utils.LocalVolumeDirectoryBindMounted,
	DirectoryLinkBindMountedLocalVolumeType: utils.LocalVolumeDirectoryLinkBindMounted,
	BlockLocalVolumeType:                    utils.LocalVolumeBlock, // block device in Block mode
	BlockFsWithFormatLocalVolumeType:        utils.LocalVolumeBlockFS,
	BlockFsWithoutFormatLocalVolumeType:     utils.LocalVolumeBlock, // block device in Filesystem mode (default in this test suite)
}
   111  
// localTestVolume ties a local test resource to the PersistentVolume and
// PersistentVolumeClaim that the tests create for it.
type localTestVolume struct {
	// Local test resource
	ltr *utils.LocalTestResource
	// PVC for this volume
	pvc *v1.PersistentVolumeClaim
	// PV for this volume
	pv *v1.PersistentVolume
	// Type of local volume
	localVolumeType localVolumeType
}
   122  
const (
	// hostBase is the host directory handed to the local resource manager
	// and used as the parent of the per-namespace discovery directory.
	// TODO: This may not be available/writable on all images.
	hostBase = "/tmp"
	// Path to the first volume in the test containers
	// created via createLocalPod or makeLocalPod
	// leveraging pv_util.MakePod
	volumeDir = "/mnt/volume1"
	// testFile created in setupLocalVolume
	testFile = "test-file"
	// testFileContent written into testFile
	testFileContent = "test-file-content"
	// testSCPrefix is the prefix of the StorageClass name; the test
	// namespace name is appended to it.
	testSCPrefix = "local-volume-test-storageclass"

	// A sample request size
	testRequestSize = "10Mi"

	// Max number of nodes to use for testing
	maxNodes = 5
)
   142  
var (
	// storage class volume binding modes
	// (variables rather than constants because their addresses are passed
	// to setupStorageClass)
	waitMode      = storagev1.VolumeBindingWaitForFirstConsumer
	immediateMode = storagev1.VolumeBindingImmediate

	// Common SELinux options applied to test pods, e.g. the stress test pods
	selinuxLabel = &v1.SELinuxOptions{
		Level: "s0:c0,c1"}
)
   152  
   153  var _ = utils.SIGDescribe("PersistentVolumes-local", func() {
   154  	f := framework.NewDefaultFramework("persistent-local-volumes-test")
   155  	f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged
   156  
   157  	var (
   158  		config *localTestConfig
   159  		scName string
   160  	)
   161  
   162  	ginkgo.BeforeEach(func(ctx context.Context) {
   163  		nodes, err := e2enode.GetBoundedReadySchedulableNodes(ctx, f.ClientSet, maxNodes)
   164  		framework.ExpectNoError(err)
   165  
   166  		scName = fmt.Sprintf("%v-%v", testSCPrefix, f.Namespace.Name)
   167  		// Choose a random node
   168  		randomNode := &nodes.Items[rand.Intn(len(nodes.Items))]
   169  
   170  		hostExec := utils.NewHostExec(f)
   171  		ltrMgr := utils.NewLocalResourceManager("local-volume-test", hostExec, hostBase)
   172  		config = &localTestConfig{
   173  			ns:           f.Namespace.Name,
   174  			client:       f.ClientSet,
   175  			timeouts:     f.Timeouts,
   176  			nodes:        nodes.Items,
   177  			randomNode:   randomNode,
   178  			scName:       scName,
   179  			discoveryDir: filepath.Join(hostBase, f.Namespace.Name),
   180  			hostExec:     hostExec,
   181  			ltrMgr:       ltrMgr,
   182  		}
   183  	})
   184  
	// Generate one context per local volume type listed in setupLocalVolumeMap.
	// Each context sets up a StorageClass plus a single PV/PVC pair on the
	// random node and runs the same set of pod-level tests against it.
	for tempTestVolType := range setupLocalVolumeMap {

		// New variable required for ginkgo test closures
		testVolType := tempTestVolType
		// args collects the arguments for f.Context: the context text,
		// optional decorators and finally the context body.
		args := []interface{}{fmt.Sprintf("[Volume type: %s]", testVolType)}
		if testVolType == GCELocalSSDVolumeType {
			args = append(args, framework.WithSerial())
		}
		testMode := immediateMode

		args = append(args, func() {
			// Volume under test, (re)created by the BeforeEach below.
			var testVol *localTestVolume

			ginkgo.BeforeEach(func(ctx context.Context) {
				if testVolType == GCELocalSSDVolumeType {
					SkipUnlessLocalSSDExists(ctx, config, "scsi", "fs", config.randomNode)
				}
				setupStorageClass(ctx, config, &testMode)
				testVols := setupLocalVolumesPVCsPVs(ctx, config, testVolType, config.randomNode, 1, testMode)
				if len(testVols) > 0 {
					testVol = testVols[0]
				} else {
					framework.Failf("Failed to get a test volume")
				}
			})

			// NOTE(review): testVol is never reset to nil between specs, so after
			// a failed or skipped BeforeEach this may see the volume of an earlier
			// spec of the same context — confirm this is intended.
			ginkgo.AfterEach(func(ctx context.Context) {
				if testVol != nil {
					cleanupLocalVolumes(ctx, config, []*localTestVolume{testVol})
					cleanupStorageClass(ctx, config)
				} else {
					framework.Failf("no test volume to cleanup")
				}
			})

			ginkgo.Context("One pod requesting one prebound PVC", func() {
				var (
					pod1    *v1.Pod
					pod1Err error
				)

				// Start pod1 on the volume and write testFileContent so that
				// both specs below begin with known file content.
				ginkgo.BeforeEach(func(ctx context.Context) {
					ginkgo.By("Creating pod1")
					pod1, pod1Err = createLocalPod(ctx, config, testVol, nil)
					framework.ExpectNoError(pod1Err)
					verifyLocalPod(ctx, config, testVol, pod1, config.randomNode.Name)

					writeCmd := createWriteCmd(volumeDir, testFile, testFileContent, testVol.localVolumeType)

					ginkgo.By("Writing in pod1")
					podRWCmdExec(f, pod1, writeCmd)
				})

				ginkgo.AfterEach(func(ctx context.Context) {
					ginkgo.By("Deleting pod1")
					e2epod.DeletePodOrFail(ctx, config.client, config.ns, pod1.Name)
				})

				ginkgo.It("should be able to mount volume and read from pod1", func(ctx context.Context) {
					ginkgo.By("Reading in pod1")
					// testFileContent was written in BeforeEach
					testReadFileContent(f, volumeDir, testFile, testFileContent, pod1, testVolType)
				})

				ginkgo.It("should be able to mount volume and write from pod1", func(ctx context.Context) {
					// testFileContent was written in BeforeEach
					testReadFileContent(f, volumeDir, testFile, testFileContent, pod1, testVolType)

					ginkgo.By("Writing in pod1")
					// The volume's ltr.Path string is used as the new file content.
					writeCmd := createWriteCmd(volumeDir, testFile, testVol.ltr.Path /*writeTestFileContent*/, testVolType)
					podRWCmdExec(f, pod1, writeCmd)
				})
			})

			ginkgo.Context("Two pods mounting a local volume at the same time", func() {
				ginkgo.It("should be able to write from pod1 and read from pod2", func(ctx context.Context) {
					twoPodsReadWriteTest(ctx, f, config, testVol)
				})
			})

			ginkgo.Context("Two pods mounting a local volume one after the other", func() {
				ginkgo.It("should be able to write from pod1 and read from pod2", func(ctx context.Context) {
					twoPodsReadWriteSerialTest(ctx, f, config, testVol)
				})
			})

			ginkgo.Context("Set fsGroup for local volume", func() {
				ginkgo.BeforeEach(func() {
					if testVolType == BlockLocalVolumeType {
						e2eskipper.Skipf("We don't set fsGroup on block device, skipped.")
					}
				})

				f.It("should set fsGroup for one pod", f.WithSlow(), func(ctx context.Context) {
					ginkgo.By("Checking fsGroup is set")
					pod := createPodWithFsGroupTest(ctx, config, testVol, 1234, 1234)
					ginkgo.By("Deleting pod")
					e2epod.DeletePodOrFail(ctx, config.client, config.ns, pod.Name)
				})

				f.It("should set same fsGroup for two pods simultaneously", f.WithSlow(), func(ctx context.Context) {
					fsGroup := int64(1234)
					ginkgo.By("Create first pod and check fsGroup is set")
					pod1 := createPodWithFsGroupTest(ctx, config, testVol, fsGroup, fsGroup)
					ginkgo.By("Create second pod with same fsGroup and check fsGroup is correct")
					pod2 := createPodWithFsGroupTest(ctx, config, testVol, fsGroup, fsGroup)
					ginkgo.By("Deleting first pod")
					e2epod.DeletePodOrFail(ctx, config.client, config.ns, pod1.Name)
					ginkgo.By("Deleting second pod")
					e2epod.DeletePodOrFail(ctx, config.client, config.ns, pod2.Name)
				})

				f.It("should set different fsGroup for second pod if first pod is deleted", f.WithFlaky(), func(ctx context.Context) {
					// TODO: Disabled temporarily, remove [Flaky] tag after #73168 is fixed.
					fsGroup1, fsGroup2 := int64(1234), int64(4321)
					ginkgo.By("Create first pod and check fsGroup is set")
					pod1 := createPodWithFsGroupTest(ctx, config, testVol, fsGroup1, fsGroup1)
					ginkgo.By("Deleting first pod")
					// Unlike the other specs, wait for the first pod to be gone
					// before the second pod is created.
					err := e2epod.DeletePodWithWait(ctx, config.client, pod1)
					framework.ExpectNoError(err, "while deleting first pod")
					ginkgo.By("Create second pod and check fsGroup is the new one")
					pod2 := createPodWithFsGroupTest(ctx, config, testVol, fsGroup2, fsGroup2)
					ginkgo.By("Deleting second pod")
					e2epod.DeletePodOrFail(ctx, config.client, config.ns, pod2.Name)
				})
			})
		})
		f.Context(args...)
	}
   314  
   315  	f.Context("Local volume that cannot be mounted", f.WithSlow(), func() {
   316  		// TODO:
   317  		// - check for these errors in unit tests instead
   318  		ginkgo.It("should fail due to non-existent path", func(ctx context.Context) {
   319  			testVol := &localTestVolume{
   320  				ltr: &utils.LocalTestResource{
   321  					Node: config.randomNode,
   322  					Path: "/non-existent/location/nowhere",
   323  				},
   324  				localVolumeType: DirectoryLocalVolumeType,
   325  			}
   326  			ginkgo.By("Creating local PVC and PV")
   327  			createLocalPVCsPVs(ctx, config, []*localTestVolume{testVol}, immediateMode)
   328  			pod, err := createLocalPod(ctx, config, testVol, nil)
   329  			framework.ExpectError(err)
   330  			err = e2epod.WaitTimeoutForPodRunningInNamespace(ctx, config.client, pod.Name, pod.Namespace, f.Timeouts.PodStart)
   331  			framework.ExpectError(err)
   332  			cleanupLocalPVCsPVs(ctx, config, []*localTestVolume{testVol})
   333  		})
   334  
   335  		ginkgo.It("should fail due to wrong node", func(ctx context.Context) {
   336  			if len(config.nodes) < 2 {
   337  				e2eskipper.Skipf("Runs only when number of nodes >= 2")
   338  			}
   339  
   340  			testVols := setupLocalVolumesPVCsPVs(ctx, config, DirectoryLocalVolumeType, config.randomNode, 1, immediateMode)
   341  			testVol := testVols[0]
   342  
   343  			conflictNodeName := config.nodes[0].Name
   344  			if conflictNodeName == config.randomNode.Name {
   345  				conflictNodeName = config.nodes[1].Name
   346  			}
   347  			pod := makeLocalPodWithNodeName(config, testVol, conflictNodeName)
   348  			pod, err := config.client.CoreV1().Pods(config.ns).Create(ctx, pod, metav1.CreateOptions{})
   349  			framework.ExpectNoError(err)
   350  
   351  			err = e2epod.WaitTimeoutForPodRunningInNamespace(ctx, config.client, pod.Name, pod.Namespace, f.Timeouts.PodStart)
   352  			framework.ExpectError(err)
   353  
   354  			cleanupLocalVolumes(ctx, config, []*localTestVolume{testVol})
   355  		})
   356  	})
   357  
   358  	ginkgo.Context("Pod with node different from PV's NodeAffinity", func() {
   359  		var (
   360  			testVol          *localTestVolume
   361  			volumeType       localVolumeType
   362  			conflictNodeName string
   363  		)
   364  
   365  		ginkgo.BeforeEach(func(ctx context.Context) {
   366  			if len(config.nodes) < 2 {
   367  				e2eskipper.Skipf("Runs only when number of nodes >= 2")
   368  			}
   369  
   370  			volumeType = DirectoryLocalVolumeType
   371  			setupStorageClass(ctx, config, &immediateMode)
   372  			testVols := setupLocalVolumesPVCsPVs(ctx, config, volumeType, config.randomNode, 1, immediateMode)
   373  			conflictNodeName = config.nodes[0].Name
   374  			if conflictNodeName == config.randomNode.Name {
   375  				conflictNodeName = config.nodes[1].Name
   376  			}
   377  
   378  			testVol = testVols[0]
   379  		})
   380  
   381  		ginkgo.AfterEach(func(ctx context.Context) {
   382  			cleanupLocalVolumes(ctx, config, []*localTestVolume{testVol})
   383  			cleanupStorageClass(ctx, config)
   384  		})
   385  
   386  		ginkgo.It("should fail scheduling due to different NodeAffinity", func(ctx context.Context) {
   387  			testPodWithNodeConflict(ctx, config, testVol, conflictNodeName, makeLocalPodWithNodeAffinity)
   388  		})
   389  
   390  		ginkgo.It("should fail scheduling due to different NodeSelector", func(ctx context.Context) {
   391  			testPodWithNodeConflict(ctx, config, testVol, conflictNodeName, makeLocalPodWithNodeSelector)
   392  		})
   393  	})
   394  
   395  	f.Context("StatefulSet with pod affinity", f.WithSlow(), func() {
   396  		var testVols map[string][]*localTestVolume
   397  		const (
   398  			ssReplicas  = 3
   399  			volsPerNode = 6
   400  		)
   401  
   402  		ginkgo.BeforeEach(func(ctx context.Context) {
   403  			setupStorageClass(ctx, config, &waitMode)
   404  
   405  			testVols = map[string][]*localTestVolume{}
   406  			for i, node := range config.nodes {
   407  				// The PVCs created here won't be used
   408  				ginkgo.By(fmt.Sprintf("Setting up local volumes on node %q", node.Name))
   409  				vols := setupLocalVolumesPVCsPVs(ctx, config, DirectoryLocalVolumeType, &config.nodes[i], volsPerNode, waitMode)
   410  				testVols[node.Name] = vols
   411  			}
   412  		})
   413  
   414  		ginkgo.AfterEach(func(ctx context.Context) {
   415  			for _, vols := range testVols {
   416  				cleanupLocalVolumes(ctx, config, vols)
   417  			}
   418  			cleanupStorageClass(ctx, config)
   419  		})
   420  
   421  		ginkgo.It("should use volumes spread across nodes when pod has anti-affinity", func(ctx context.Context) {
   422  			if len(config.nodes) < ssReplicas {
   423  				e2eskipper.Skipf("Runs only when number of nodes >= %v", ssReplicas)
   424  			}
   425  			ginkgo.By("Creating a StatefulSet with pod anti-affinity on nodes")
   426  			ss := createStatefulSet(ctx, config, ssReplicas, volsPerNode, true, false)
   427  			validateStatefulSet(ctx, config, ss, true)
   428  		})
   429  
   430  		ginkgo.It("should use volumes on one node when pod has affinity", func(ctx context.Context) {
   431  			ginkgo.By("Creating a StatefulSet with pod affinity on nodes")
   432  			ss := createStatefulSet(ctx, config, ssReplicas, volsPerNode/ssReplicas, false, false)
   433  			validateStatefulSet(ctx, config, ss, false)
   434  		})
   435  
   436  		ginkgo.It("should use volumes spread across nodes when pod management is parallel and pod has anti-affinity", func(ctx context.Context) {
   437  			if len(config.nodes) < ssReplicas {
   438  				e2eskipper.Skipf("Runs only when number of nodes >= %v", ssReplicas)
   439  			}
   440  			ginkgo.By("Creating a StatefulSet with pod anti-affinity on nodes")
   441  			ss := createStatefulSet(ctx, config, ssReplicas, 1, true, true)
   442  			validateStatefulSet(ctx, config, ss, true)
   443  		})
   444  
   445  		ginkgo.It("should use volumes on one node when pod management is parallel and pod has affinity", func(ctx context.Context) {
   446  			ginkgo.By("Creating a StatefulSet with pod affinity on nodes")
   447  			ss := createStatefulSet(ctx, config, ssReplicas, 1, false, true)
   448  			validateStatefulSet(ctx, config, ss, false)
   449  		})
   450  	})
   451  
   452  	f.Context("Stress with local volumes", f.WithSerial(), func() {
   453  		var (
   454  			allLocalVolumes = make(map[string][]*localTestVolume)
   455  			volType         = TmpfsLocalVolumeType
   456  		)
   457  
   458  		const (
   459  			volsPerNode = 10 // Make this non-divisable by volsPerPod to increase changes of partial binding failure
   460  			volsPerPod  = 3
   461  			podsFactor  = 4
   462  		)
   463  
   464  		ginkgo.BeforeEach(func(ctx context.Context) {
   465  			setupStorageClass(ctx, config, &waitMode)
   466  			ginkgo.DeferCleanup(cleanupStorageClass, config)
   467  
   468  			for i, node := range config.nodes {
   469  				ginkgo.By(fmt.Sprintf("Setting up %d local volumes on node %q", volsPerNode, node.Name))
   470  				allLocalVolumes[node.Name] = setupLocalVolumes(ctx, config, volType, &config.nodes[i], volsPerNode)
   471  			}
   472  			ginkgo.By(fmt.Sprintf("Create %d PVs", volsPerNode*len(config.nodes)))
   473  			var err error
   474  			for _, localVolumes := range allLocalVolumes {
   475  				for _, localVolume := range localVolumes {
   476  					pvConfig := makeLocalPVConfig(config, localVolume)
   477  					localVolume.pv, err = e2epv.CreatePV(ctx, config.client, f.Timeouts, e2epv.MakePersistentVolume(pvConfig))
   478  					framework.ExpectNoError(err)
   479  				}
   480  			}
   481  			ginkgo.DeferCleanup(func(ctx context.Context) {
   482  				ginkgo.By("Clean all PVs")
   483  				for nodeName, localVolumes := range allLocalVolumes {
   484  					ginkgo.By(fmt.Sprintf("Cleaning up %d local volumes on node %q", len(localVolumes), nodeName))
   485  					cleanupLocalVolumes(ctx, config, localVolumes)
   486  				}
   487  			})
   488  			ginkgo.By("Start a goroutine to recycle unbound PVs")
   489  			backgroundCtx, cancel := context.WithCancel(context.Background())
   490  			var wg sync.WaitGroup
   491  			wg.Add(1)
   492  			ginkgo.DeferCleanup(func() {
   493  				ginkgo.By("Stop and wait for recycle goroutine to finish")
   494  				cancel()
   495  				wg.Wait()
   496  			})
   497  			go func() {
   498  				defer ginkgo.GinkgoRecover()
   499  				defer wg.Done()
   500  				w, err := config.client.CoreV1().PersistentVolumes().Watch(backgroundCtx, metav1.ListOptions{})
   501  				framework.ExpectNoError(err)
   502  				if w == nil {
   503  					return
   504  				}
   505  				defer w.Stop()
   506  				for {
   507  					select {
   508  					case event := <-w.ResultChan():
   509  						if event.Type != watch.Modified {
   510  							continue
   511  						}
   512  						pv, ok := event.Object.(*v1.PersistentVolume)
   513  						if !ok {
   514  							continue
   515  						}
   516  						if pv.Status.Phase == v1.VolumeBound || pv.Status.Phase == v1.VolumeAvailable {
   517  							continue
   518  						}
   519  						pv, err = config.client.CoreV1().PersistentVolumes().Get(backgroundCtx, pv.Name, metav1.GetOptions{})
   520  						if apierrors.IsNotFound(err) || errors.Is(err, context.Canceled) {
   521  							continue
   522  						}
   523  						// Delete and create a new PV for same local volume storage
   524  						ginkgo.By(fmt.Sprintf("Delete %q and create a new PV for same local volume storage", pv.Name))
   525  						for _, localVolumes := range allLocalVolumes {
   526  							for _, localVolume := range localVolumes {
   527  								if localVolume.pv.Name != pv.Name {
   528  									continue
   529  								}
   530  								err = config.client.CoreV1().PersistentVolumes().Delete(backgroundCtx, pv.Name, metav1.DeleteOptions{})
   531  								if apierrors.IsNotFound(err) || errors.Is(err, context.Canceled) {
   532  									continue
   533  								}
   534  								framework.ExpectNoError(err)
   535  								pvConfig := makeLocalPVConfig(config, localVolume)
   536  								localVolume.pv, err = e2epv.CreatePV(backgroundCtx, config.client, f.Timeouts, e2epv.MakePersistentVolume(pvConfig))
   537  								if errors.Is(err, context.Canceled) {
   538  									continue
   539  								}
   540  								framework.ExpectNoError(err)
   541  							}
   542  						}
   543  					case <-backgroundCtx.Done():
   544  						return
   545  					}
   546  				}
   547  			}()
   548  		})
   549  
   550  		ginkgo.It("should be able to process many pods and reuse local volumes", func(ctx context.Context) {
   551  			var (
   552  				podsLock sync.Mutex
   553  				// Have one extra pod pending
   554  				numConcurrentPods = volsPerNode/volsPerPod*len(config.nodes) + 1
   555  				totalPods         = numConcurrentPods * podsFactor
   556  				numCreated        = 0
   557  				numFinished       = 0
   558  				pods              = map[string]*v1.Pod{}
   559  			)
   560  
   561  			// Create pods gradually instead of all at once because scheduler has
   562  			// exponential backoff
   563  			ginkgo.By(fmt.Sprintf("Creating %v pods periodically", numConcurrentPods))
   564  			stop := make(chan struct{})
   565  			go wait.Until(func() {
   566  				defer ginkgo.GinkgoRecover()
   567  				podsLock.Lock()
   568  				defer podsLock.Unlock()
   569  
   570  				if numCreated >= totalPods {
   571  					// Created all the pods for the test
   572  					return
   573  				}
   574  
   575  				if len(pods) > numConcurrentPods/2 {
   576  					// Too many outstanding pods
   577  					return
   578  				}
   579  
   580  				for i := 0; i < numConcurrentPods; i++ {
   581  					pvcs := []*v1.PersistentVolumeClaim{}
   582  					for j := 0; j < volsPerPod; j++ {
   583  						pvc := e2epv.MakePersistentVolumeClaim(makeLocalPVCConfig(config, volType), config.ns)
   584  						pvc, err := e2epv.CreatePVC(ctx, config.client, config.ns, pvc)
   585  						framework.ExpectNoError(err)
   586  						pvcs = append(pvcs, pvc)
   587  					}
   588  					podConfig := e2epod.Config{
   589  						NS:           config.ns,
   590  						PVCs:         pvcs,
   591  						Command:      "sleep 1",
   592  						SeLinuxLabel: selinuxLabel,
   593  					}
   594  					pod, err := e2epod.MakeSecPod(&podConfig)
   595  					framework.ExpectNoError(err)
   596  					pod, err = config.client.CoreV1().Pods(config.ns).Create(ctx, pod, metav1.CreateOptions{})
   597  					framework.ExpectNoError(err)
   598  					pods[pod.Name] = pod
   599  					numCreated++
   600  				}
   601  			}, 2*time.Second, stop)
   602  
   603  			defer func() {
   604  				close(stop)
   605  				podsLock.Lock()
   606  				defer podsLock.Unlock()
   607  
   608  				for _, pod := range pods {
   609  					if err := deletePodAndPVCs(ctx, config, pod); err != nil {
   610  						framework.Logf("Deleting pod %v failed: %v", pod.Name, err)
   611  					}
   612  				}
   613  			}()
   614  
   615  			ginkgo.By("Waiting for all pods to complete successfully")
   616  			const completeTimeout = 5 * time.Minute
   617  			waitErr := wait.PollUntilContextTimeout(ctx, time.Second, completeTimeout, true, func(ctx context.Context) (done bool, err error) {
   618  				podsList, err := config.client.CoreV1().Pods(config.ns).List(ctx, metav1.ListOptions{})
   619  				if err != nil {
   620  					return false, err
   621  				}
   622  
   623  				podsLock.Lock()
   624  				defer podsLock.Unlock()
   625  
   626  				for _, pod := range podsList.Items {
   627  					if pod.Status.Phase == v1.PodSucceeded {
   628  						// Delete pod and its PVCs
   629  						if err := deletePodAndPVCs(ctx, config, &pod); err != nil {
   630  							return false, err
   631  						}
   632  						delete(pods, pod.Name)
   633  						numFinished++
   634  						framework.Logf("%v/%v pods finished", numFinished, totalPods)
   635  					}
   636  				}
   637  
   638  				return numFinished == totalPods, nil
   639  			})
   640  			framework.ExpectNoError(waitErr, "some pods failed to complete within %v", completeTimeout)
   641  		})
   642  	})
   643  })
   644  
   645  func deletePodAndPVCs(ctx context.Context, config *localTestConfig, pod *v1.Pod) error {
   646  	framework.Logf("Deleting pod %v", pod.Name)
   647  	if err := config.client.CoreV1().Pods(config.ns).Delete(ctx, pod.Name, metav1.DeleteOptions{}); err != nil {
   648  		return err
   649  	}
   650  
   651  	// Delete PVCs
   652  	for _, vol := range pod.Spec.Volumes {
   653  		pvcSource := vol.VolumeSource.PersistentVolumeClaim
   654  		if pvcSource != nil {
   655  			if err := e2epv.DeletePersistentVolumeClaim(ctx, config.client, pvcSource.ClaimName, config.ns); err != nil {
   656  				return err
   657  			}
   658  		}
   659  	}
   660  	return nil
   661  }
   662  
// makeLocalPodWith is the signature of the helpers that build a pod object
// for the given local volume and node name (for example
// makeLocalPodWithNodeAffinity and makeLocalPodWithNodeSelector). The returned
// pod is created through the API by the caller.
type makeLocalPodWith func(config *localTestConfig, volume *localTestVolume, nodeName string) *v1.Pod
   664  
   665  func testPodWithNodeConflict(ctx context.Context, config *localTestConfig, testVol *localTestVolume, nodeName string, makeLocalPodFunc makeLocalPodWith) {
   666  	ginkgo.By(fmt.Sprintf("local-volume-type: %s", testVol.localVolumeType))
   667  
   668  	pod := makeLocalPodFunc(config, testVol, nodeName)
   669  	pod, err := config.client.CoreV1().Pods(config.ns).Create(ctx, pod, metav1.CreateOptions{})
   670  	framework.ExpectNoError(err)
   671  
   672  	err = e2epod.WaitForPodNameUnschedulableInNamespace(ctx, config.client, pod.Name, pod.Namespace)
   673  	framework.ExpectNoError(err)
   674  }
   675  
   676  // The tests below are run against multiple mount point types
   677  
   678  // Test two pods at the same time, write from pod1, and read from pod2
   679  func twoPodsReadWriteTest(ctx context.Context, f *framework.Framework, config *localTestConfig, testVol *localTestVolume) {
   680  	ginkgo.By("Creating pod1 to write to the PV")
   681  	pod1, pod1Err := createLocalPod(ctx, config, testVol, nil)
   682  	framework.ExpectNoError(pod1Err)
   683  	verifyLocalPod(ctx, config, testVol, pod1, config.randomNode.Name)
   684  
   685  	writeCmd := createWriteCmd(volumeDir, testFile, testFileContent, testVol.localVolumeType)
   686  
   687  	ginkgo.By("Writing in pod1")
   688  	podRWCmdExec(f, pod1, writeCmd)
   689  
   690  	// testFileContent was written after creating pod1
   691  	testReadFileContent(f, volumeDir, testFile, testFileContent, pod1, testVol.localVolumeType)
   692  
   693  	ginkgo.By("Creating pod2 to read from the PV")
   694  	pod2, pod2Err := createLocalPod(ctx, config, testVol, nil)
   695  	framework.ExpectNoError(pod2Err)
   696  	verifyLocalPod(ctx, config, testVol, pod2, config.randomNode.Name)
   697  
   698  	// testFileContent was written after creating pod1
   699  	testReadFileContent(f, volumeDir, testFile, testFileContent, pod2, testVol.localVolumeType)
   700  
   701  	writeCmd = createWriteCmd(volumeDir, testFile, testVol.ltr.Path /*writeTestFileContent*/, testVol.localVolumeType)
   702  
   703  	ginkgo.By("Writing in pod2")
   704  	podRWCmdExec(f, pod2, writeCmd)
   705  
   706  	ginkgo.By("Reading in pod1")
   707  	testReadFileContent(f, volumeDir, testFile, testVol.ltr.Path, pod1, testVol.localVolumeType)
   708  
   709  	ginkgo.By("Deleting pod1")
   710  	e2epod.DeletePodOrFail(ctx, config.client, config.ns, pod1.Name)
   711  	ginkgo.By("Deleting pod2")
   712  	e2epod.DeletePodOrFail(ctx, config.client, config.ns, pod2.Name)
   713  }
   714  
   715  // Test two pods one after other, write from pod1, and read from pod2
   716  func twoPodsReadWriteSerialTest(ctx context.Context, f *framework.Framework, config *localTestConfig, testVol *localTestVolume) {
   717  	ginkgo.By("Creating pod1")
   718  	pod1, pod1Err := createLocalPod(ctx, config, testVol, nil)
   719  	framework.ExpectNoError(pod1Err)
   720  	verifyLocalPod(ctx, config, testVol, pod1, config.randomNode.Name)
   721  
   722  	writeCmd := createWriteCmd(volumeDir, testFile, testFileContent, testVol.localVolumeType)
   723  
   724  	ginkgo.By("Writing in pod1")
   725  	podRWCmdExec(f, pod1, writeCmd)
   726  
   727  	// testFileContent was written after creating pod1
   728  	testReadFileContent(f, volumeDir, testFile, testFileContent, pod1, testVol.localVolumeType)
   729  
   730  	ginkgo.By("Deleting pod1")
   731  	e2epod.DeletePodOrFail(ctx, config.client, config.ns, pod1.Name)
   732  
   733  	ginkgo.By("Creating pod2")
   734  	pod2, pod2Err := createLocalPod(ctx, config, testVol, nil)
   735  	framework.ExpectNoError(pod2Err)
   736  	verifyLocalPod(ctx, config, testVol, pod2, config.randomNode.Name)
   737  
   738  	ginkgo.By("Reading in pod2")
   739  	testReadFileContent(f, volumeDir, testFile, testFileContent, pod2, testVol.localVolumeType)
   740  
   741  	ginkgo.By("Deleting pod2")
   742  	e2epod.DeletePodOrFail(ctx, config.client, config.ns, pod2.Name)
   743  }
   744  
   745  // Test creating pod with fsGroup, and check fsGroup is expected fsGroup.
   746  func createPodWithFsGroupTest(ctx context.Context, config *localTestConfig, testVol *localTestVolume, fsGroup int64, expectedFsGroup int64) *v1.Pod {
   747  	pod, err := createLocalPod(ctx, config, testVol, &fsGroup)
   748  	framework.ExpectNoError(err)
   749  	_, err = e2eoutput.LookForStringInPodExec(config.ns, pod.Name, []string{"stat", "-c", "%g", volumeDir}, strconv.FormatInt(expectedFsGroup, 10), time.Second*3)
   750  	framework.ExpectNoError(err, "failed to get expected fsGroup %d on directory %s in pod %s", fsGroup, volumeDir, pod.Name)
   751  	return pod
   752  }
   753  
   754  func setupStorageClass(ctx context.Context, config *localTestConfig, mode *storagev1.VolumeBindingMode) {
   755  	sc := &storagev1.StorageClass{
   756  		ObjectMeta: metav1.ObjectMeta{
   757  			Name: config.scName,
   758  		},
   759  		Provisioner:       "kubernetes.io/no-provisioner",
   760  		VolumeBindingMode: mode,
   761  	}
   762  
   763  	_, err := config.client.StorageV1().StorageClasses().Create(ctx, sc, metav1.CreateOptions{})
   764  	framework.ExpectNoError(err)
   765  }
   766  
   767  func cleanupStorageClass(ctx context.Context, config *localTestConfig) {
   768  	framework.ExpectNoError(config.client.StorageV1().StorageClasses().Delete(ctx, config.scName, metav1.DeleteOptions{}))
   769  }
   770  
   771  // podNode wraps RunKubectl to get node where pod is running
   772  func podNodeName(ctx context.Context, config *localTestConfig, pod *v1.Pod) (string, error) {
   773  	runtimePod, runtimePodErr := config.client.CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{})
   774  	return runtimePod.Spec.NodeName, runtimePodErr
   775  }
   776  
   777  // setupLocalVolumes sets up directories to use for local PV
   778  func setupLocalVolumes(ctx context.Context, config *localTestConfig, localVolumeType localVolumeType, node *v1.Node, count int) []*localTestVolume {
   779  	vols := []*localTestVolume{}
   780  	for i := 0; i < count; i++ {
   781  		ltrType, ok := setupLocalVolumeMap[localVolumeType]
   782  		if !ok {
   783  			framework.Failf("Invalid localVolumeType: %v", localVolumeType)
   784  		}
   785  		ltr := config.ltrMgr.Create(ctx, node, ltrType, nil)
   786  		vols = append(vols, &localTestVolume{
   787  			ltr:             ltr,
   788  			localVolumeType: localVolumeType,
   789  		})
   790  	}
   791  	return vols
   792  }
   793  
   794  func cleanupLocalPVCsPVs(ctx context.Context, config *localTestConfig, volumes []*localTestVolume) {
   795  	for _, volume := range volumes {
   796  		ginkgo.By("Cleaning up PVC and PV")
   797  		errs := e2epv.PVPVCCleanup(ctx, config.client, config.ns, volume.pv, volume.pvc)
   798  		if len(errs) > 0 {
   799  			framework.Failf("Failed to delete PV and/or PVC: %v", utilerrors.NewAggregate(errs))
   800  		}
   801  	}
   802  }
   803  
   804  // Deletes the PVC/PV, and launches a pod with hostpath volume to remove the test directory
   805  func cleanupLocalVolumes(ctx context.Context, config *localTestConfig, volumes []*localTestVolume) {
   806  	cleanupLocalPVCsPVs(ctx, config, volumes)
   807  
   808  	for _, volume := range volumes {
   809  		config.ltrMgr.Remove(ctx, volume.ltr)
   810  	}
   811  }
   812  
   813  func verifyLocalVolume(ctx context.Context, config *localTestConfig, volume *localTestVolume) {
   814  	framework.ExpectNoError(e2epv.WaitOnPVandPVC(ctx, config.client, config.timeouts, config.ns, volume.pv, volume.pvc))
   815  }
   816  
   817  func verifyLocalPod(ctx context.Context, config *localTestConfig, volume *localTestVolume, pod *v1.Pod, expectedNodeName string) {
   818  	podNodeName, err := podNodeName(ctx, config, pod)
   819  	framework.ExpectNoError(err)
   820  	framework.Logf("pod %q created on Node %q", pod.Name, podNodeName)
   821  	gomega.Expect(podNodeName).To(gomega.Equal(expectedNodeName))
   822  }
   823  
   824  func makeLocalPVCConfig(config *localTestConfig, volumeType localVolumeType) e2epv.PersistentVolumeClaimConfig {
   825  	pvcConfig := e2epv.PersistentVolumeClaimConfig{
   826  		AccessModes:      []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce},
   827  		StorageClassName: &config.scName,
   828  	}
   829  	if volumeType == BlockLocalVolumeType {
   830  		pvcVolumeMode := v1.PersistentVolumeBlock
   831  		pvcConfig.VolumeMode = &pvcVolumeMode
   832  	}
   833  	return pvcConfig
   834  }
   835  
   836  func makeLocalPVConfig(config *localTestConfig, volume *localTestVolume) e2epv.PersistentVolumeConfig {
   837  	// TODO: hostname may not be the best option
   838  	nodeKey := "kubernetes.io/hostname"
   839  	if volume.ltr.Node.Labels == nil {
   840  		framework.Failf("Node does not have labels")
   841  	}
   842  	nodeValue, found := volume.ltr.Node.Labels[nodeKey]
   843  	if !found {
   844  		framework.Failf("Node does not have required label %q", nodeKey)
   845  	}
   846  
   847  	pvConfig := e2epv.PersistentVolumeConfig{
   848  		PVSource: v1.PersistentVolumeSource{
   849  			Local: &v1.LocalVolumeSource{
   850  				Path: volume.ltr.Path,
   851  			},
   852  		},
   853  		NamePrefix:       "local-pv",
   854  		StorageClassName: config.scName,
   855  		NodeAffinity: &v1.VolumeNodeAffinity{
   856  			Required: &v1.NodeSelector{
   857  				NodeSelectorTerms: []v1.NodeSelectorTerm{
   858  					{
   859  						MatchExpressions: []v1.NodeSelectorRequirement{
   860  							{
   861  								Key:      nodeKey,
   862  								Operator: v1.NodeSelectorOpIn,
   863  								Values:   []string{nodeValue},
   864  							},
   865  						},
   866  					},
   867  				},
   868  			},
   869  		},
   870  	}
   871  
   872  	if volume.localVolumeType == BlockLocalVolumeType {
   873  		pvVolumeMode := v1.PersistentVolumeBlock
   874  		pvConfig.VolumeMode = &pvVolumeMode
   875  	}
   876  	return pvConfig
   877  }
   878  
   879  // Creates a PVC and PV with prebinding
   880  func createLocalPVCsPVs(ctx context.Context, config *localTestConfig, volumes []*localTestVolume, mode storagev1.VolumeBindingMode) {
   881  	var err error
   882  
   883  	for _, volume := range volumes {
   884  		pvcConfig := makeLocalPVCConfig(config, volume.localVolumeType)
   885  		pvConfig := makeLocalPVConfig(config, volume)
   886  
   887  		volume.pv, volume.pvc, err = e2epv.CreatePVPVC(ctx, config.client, config.timeouts, pvConfig, pvcConfig, config.ns, false)
   888  		framework.ExpectNoError(err)
   889  	}
   890  
   891  	if mode == storagev1.VolumeBindingImmediate {
   892  		for _, volume := range volumes {
   893  			verifyLocalVolume(ctx, config, volume)
   894  		}
   895  	} else {
   896  		// Verify PVCs are not bound by waiting for phase==bound with a timeout and asserting that we hit the timeout.
   897  		// There isn't really a great way to verify this without making the test be slow...
   898  		const bindTimeout = 10 * time.Second
   899  		waitErr := wait.PollImmediate(time.Second, bindTimeout, func() (done bool, err error) {
   900  			for _, volume := range volumes {
   901  				pvc, err := config.client.CoreV1().PersistentVolumeClaims(volume.pvc.Namespace).Get(ctx, volume.pvc.Name, metav1.GetOptions{})
   902  				if err != nil {
   903  					return false, fmt.Errorf("failed to get PVC %s/%s: %w", volume.pvc.Namespace, volume.pvc.Name, err)
   904  				}
   905  				if pvc.Status.Phase != v1.ClaimPending {
   906  					return true, nil
   907  				}
   908  			}
   909  			return false, nil
   910  		})
   911  		if wait.Interrupted(waitErr) {
   912  			framework.Logf("PVCs were not bound within %v (that's good)", bindTimeout)
   913  			waitErr = nil
   914  		}
   915  		framework.ExpectNoError(waitErr, "Error making sure PVCs are not bound")
   916  	}
   917  }
   918  
   919  func makeLocalPodWithNodeAffinity(config *localTestConfig, volume *localTestVolume, nodeName string) (pod *v1.Pod) {
   920  	affinity := &v1.Affinity{
   921  		NodeAffinity: &v1.NodeAffinity{
   922  			RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
   923  				NodeSelectorTerms: []v1.NodeSelectorTerm{
   924  					{
   925  						MatchExpressions: []v1.NodeSelectorRequirement{
   926  							{
   927  								Key:      "kubernetes.io/hostname",
   928  								Operator: v1.NodeSelectorOpIn,
   929  								Values:   []string{nodeName},
   930  							},
   931  						},
   932  					},
   933  				},
   934  			},
   935  		},
   936  	}
   937  	podConfig := e2epod.Config{
   938  		NS:            config.ns,
   939  		PVCs:          []*v1.PersistentVolumeClaim{volume.pvc},
   940  		SeLinuxLabel:  selinuxLabel,
   941  		NodeSelection: e2epod.NodeSelection{Affinity: affinity},
   942  	}
   943  	pod, err := e2epod.MakeSecPod(&podConfig)
   944  	if pod == nil || err != nil {
   945  		return
   946  	}
   947  	pod.Spec.Affinity = affinity
   948  	return
   949  }
   950  
   951  func makeLocalPodWithNodeSelector(config *localTestConfig, volume *localTestVolume, nodeName string) (pod *v1.Pod) {
   952  	ns := map[string]string{
   953  		"kubernetes.io/hostname": nodeName,
   954  	}
   955  	podConfig := e2epod.Config{
   956  		NS:            config.ns,
   957  		PVCs:          []*v1.PersistentVolumeClaim{volume.pvc},
   958  		SeLinuxLabel:  selinuxLabel,
   959  		NodeSelection: e2epod.NodeSelection{Selector: ns},
   960  	}
   961  	pod, err := e2epod.MakeSecPod(&podConfig)
   962  	if pod == nil || err != nil {
   963  		return
   964  	}
   965  	return
   966  }
   967  
   968  func makeLocalPodWithNodeName(config *localTestConfig, volume *localTestVolume, nodeName string) (pod *v1.Pod) {
   969  	podConfig := e2epod.Config{
   970  		NS:           config.ns,
   971  		PVCs:         []*v1.PersistentVolumeClaim{volume.pvc},
   972  		SeLinuxLabel: selinuxLabel,
   973  	}
   974  	pod, err := e2epod.MakeSecPod(&podConfig)
   975  	if pod == nil || err != nil {
   976  		return
   977  	}
   978  
   979  	e2epod.SetNodeAffinity(&pod.Spec, nodeName)
   980  	return
   981  }
   982  
   983  func createLocalPod(ctx context.Context, config *localTestConfig, volume *localTestVolume, fsGroup *int64) (*v1.Pod, error) {
   984  	ginkgo.By("Creating a pod")
   985  	podConfig := e2epod.Config{
   986  		NS:           config.ns,
   987  		PVCs:         []*v1.PersistentVolumeClaim{volume.pvc},
   988  		SeLinuxLabel: selinuxLabel,
   989  		FsGroup:      fsGroup,
   990  	}
   991  	return e2epod.CreateSecPod(ctx, config.client, &podConfig, config.timeouts.PodStart)
   992  }
   993  
   994  func createWriteCmd(testDir string, testFile string, writeTestFileContent string, volumeType localVolumeType) string {
   995  	if volumeType == BlockLocalVolumeType {
   996  		// testDir is the block device.
   997  		testFileDir := filepath.Join("/tmp", testDir)
   998  		testFilePath := filepath.Join(testFileDir, testFile)
   999  		// Create a file containing the testFileContent.
  1000  		writeTestFileCmd := fmt.Sprintf("mkdir -p %s; echo %s > %s", testFileDir, writeTestFileContent, testFilePath)
  1001  		// sudo is needed when using ssh exec to node.
  1002  		// sudo is not needed and does not exist in some containers (e.g. busybox), when using pod exec.
  1003  		sudoCmd := fmt.Sprintf("SUDO_CMD=$(which sudo); echo ${SUDO_CMD}")
  1004  		// Write the testFileContent into the block device.
  1005  		writeBlockCmd := fmt.Sprintf("${SUDO_CMD} dd if=%s of=%s bs=512 count=100", testFilePath, testDir)
  1006  		// Cleanup the file containing testFileContent.
  1007  		deleteTestFileCmd := fmt.Sprintf("rm %s", testFilePath)
  1008  		return fmt.Sprintf("%s && %s && %s && %s", writeTestFileCmd, sudoCmd, writeBlockCmd, deleteTestFileCmd)
  1009  	}
  1010  	testFilePath := filepath.Join(testDir, testFile)
  1011  	return fmt.Sprintf("mkdir -p %s; echo %s > %s", testDir, writeTestFileContent, testFilePath)
  1012  }
  1013  
  1014  func createReadCmd(testFileDir string, testFile string, volumeType localVolumeType) string {
  1015  	if volumeType == BlockLocalVolumeType {
  1016  		// Create the command to read the beginning of the block device and print it in ascii.
  1017  		return fmt.Sprintf("hexdump -n 100 -e '100 \"%%_p\"' %s | head -1", testFileDir)
  1018  	}
  1019  	// Create the command to read (aka cat) a file.
  1020  	testFilePath := filepath.Join(testFileDir, testFile)
  1021  	return fmt.Sprintf("cat %s", testFilePath)
  1022  }
  1023  
  1024  // Read testFile and evaluate whether it contains the testFileContent
  1025  func testReadFileContent(f *framework.Framework, testFileDir string, testFile string, testFileContent string, pod *v1.Pod, volumeType localVolumeType) {
  1026  	readCmd := createReadCmd(testFileDir, testFile, volumeType)
  1027  	readOut := podRWCmdExec(f, pod, readCmd)
  1028  	gomega.Expect(readOut).To(gomega.ContainSubstring(testFileContent))
  1029  }
  1030  
  1031  // Execute a read or write command in a pod.
  1032  // Fail on error
  1033  func podRWCmdExec(f *framework.Framework, pod *v1.Pod, cmd string) string {
  1034  	stdout, stderr, err := e2evolume.PodExec(f, pod, cmd)
  1035  	framework.Logf("podRWCmdExec cmd: %q, out: %q, stderr: %q, err: %v", cmd, stdout, stderr, err)
  1036  	framework.ExpectNoError(err)
  1037  	return stdout
  1038  }
  1039  
  1040  // Initialize test volume on node
  1041  // and create local PVC and PV
  1042  func setupLocalVolumesPVCsPVs(
  1043  	ctx context.Context,
  1044  	config *localTestConfig,
  1045  	localVolumeType localVolumeType,
  1046  	node *v1.Node,
  1047  	count int,
  1048  	mode storagev1.VolumeBindingMode) []*localTestVolume {
  1049  
  1050  	ginkgo.By("Initializing test volumes")
  1051  	testVols := setupLocalVolumes(ctx, config, localVolumeType, node, count)
  1052  
  1053  	ginkgo.By("Creating local PVCs and PVs")
  1054  	createLocalPVCsPVs(ctx, config, testVols, mode)
  1055  
  1056  	return testVols
  1057  }
  1058  
  1059  // newLocalClaim creates a new persistent volume claim.
  1060  func newLocalClaimWithName(config *localTestConfig, name string) *v1.PersistentVolumeClaim {
  1061  	claim := v1.PersistentVolumeClaim{
  1062  		ObjectMeta: metav1.ObjectMeta{
  1063  			Name:      name,
  1064  			Namespace: config.ns,
  1065  		},
  1066  		Spec: v1.PersistentVolumeClaimSpec{
  1067  			StorageClassName: &config.scName,
  1068  			AccessModes: []v1.PersistentVolumeAccessMode{
  1069  				v1.ReadWriteOnce,
  1070  			},
  1071  			Resources: v1.VolumeResourceRequirements{
  1072  				Requests: v1.ResourceList{
  1073  					v1.ResourceName(v1.ResourceStorage): resource.MustParse(testRequestSize),
  1074  				},
  1075  			},
  1076  		},
  1077  	}
  1078  
  1079  	return &claim
  1080  }
  1081  
  1082  func createStatefulSet(ctx context.Context, config *localTestConfig, ssReplicas int32, volumeCount int, anti, parallel bool) *appsv1.StatefulSet {
  1083  	mounts := []v1.VolumeMount{}
  1084  	claims := []v1.PersistentVolumeClaim{}
  1085  	for i := 0; i < volumeCount; i++ {
  1086  		name := fmt.Sprintf("vol%v", i+1)
  1087  		pvc := newLocalClaimWithName(config, name)
  1088  		mounts = append(mounts, v1.VolumeMount{Name: name, MountPath: "/" + name})
  1089  		claims = append(claims, *pvc)
  1090  	}
  1091  
  1092  	podAffinityTerms := []v1.PodAffinityTerm{
  1093  		{
  1094  			LabelSelector: &metav1.LabelSelector{
  1095  				MatchExpressions: []metav1.LabelSelectorRequirement{
  1096  					{
  1097  						Key:      "app",
  1098  						Operator: metav1.LabelSelectorOpIn,
  1099  						Values:   []string{"local-volume-test"},
  1100  					},
  1101  				},
  1102  			},
  1103  			TopologyKey: "kubernetes.io/hostname",
  1104  		},
  1105  	}
  1106  
  1107  	affinity := v1.Affinity{}
  1108  	if anti {
  1109  		affinity.PodAntiAffinity = &v1.PodAntiAffinity{
  1110  			RequiredDuringSchedulingIgnoredDuringExecution: podAffinityTerms,
  1111  		}
  1112  	} else {
  1113  		affinity.PodAffinity = &v1.PodAffinity{
  1114  			RequiredDuringSchedulingIgnoredDuringExecution: podAffinityTerms,
  1115  		}
  1116  	}
  1117  
  1118  	labels := map[string]string{"app": "local-volume-test"}
  1119  	spec := &appsv1.StatefulSet{
  1120  		ObjectMeta: metav1.ObjectMeta{
  1121  			Name:      "local-volume-statefulset",
  1122  			Namespace: config.ns,
  1123  		},
  1124  		Spec: appsv1.StatefulSetSpec{
  1125  			Selector: &metav1.LabelSelector{
  1126  				MatchLabels: map[string]string{"app": "local-volume-test"},
  1127  			},
  1128  			Replicas: &ssReplicas,
  1129  			Template: v1.PodTemplateSpec{
  1130  				ObjectMeta: metav1.ObjectMeta{
  1131  					Labels: labels,
  1132  				},
  1133  				Spec: v1.PodSpec{
  1134  					Containers: []v1.Container{
  1135  						{
  1136  							Name:         "nginx",
  1137  							Image:        imageutils.GetE2EImage(imageutils.Nginx),
  1138  							VolumeMounts: mounts,
  1139  						},
  1140  					},
  1141  					Affinity: &affinity,
  1142  				},
  1143  			},
  1144  			VolumeClaimTemplates: claims,
  1145  			ServiceName:          "test-service",
  1146  		},
  1147  	}
  1148  
  1149  	if parallel {
  1150  		spec.Spec.PodManagementPolicy = appsv1.ParallelPodManagement
  1151  	}
  1152  
  1153  	ss, err := config.client.AppsV1().StatefulSets(config.ns).Create(ctx, spec, metav1.CreateOptions{})
  1154  	framework.ExpectNoError(err)
  1155  
  1156  	e2estatefulset.WaitForRunningAndReady(ctx, config.client, ssReplicas, ss)
  1157  	return ss
  1158  }
  1159  
  1160  func validateStatefulSet(ctx context.Context, config *localTestConfig, ss *appsv1.StatefulSet, anti bool) {
  1161  	pods := e2estatefulset.GetPodList(ctx, config.client, ss)
  1162  
  1163  	nodes := sets.NewString()
  1164  	for _, pod := range pods.Items {
  1165  		nodes.Insert(pod.Spec.NodeName)
  1166  	}
  1167  
  1168  	if anti {
  1169  		// Verify that each pod is on a different node
  1170  		gomega.Expect(pods.Items).To(gomega.HaveLen(nodes.Len()))
  1171  	} else {
  1172  		// Verify that all pods are on same node.
  1173  		gomega.Expect(nodes.Len()).To(gomega.Equal(1))
  1174  	}
  1175  
  1176  	// Validate all PVCs are bound
  1177  	for _, pod := range pods.Items {
  1178  		for _, volume := range pod.Spec.Volumes {
  1179  			pvcSource := volume.VolumeSource.PersistentVolumeClaim
  1180  			if pvcSource != nil {
  1181  				err := e2epv.WaitForPersistentVolumeClaimPhase(ctx,
  1182  					v1.ClaimBound, config.client, config.ns, pvcSource.ClaimName, framework.Poll, time.Second)
  1183  				framework.ExpectNoError(err)
  1184  			}
  1185  		}
  1186  	}
  1187  }
  1188  
  1189  // SkipUnlessLocalSSDExists takes in an ssdInterface (scsi/nvme) and a filesystemType (fs/block)
  1190  // and skips if a disk of that type does not exist on the node
  1191  func SkipUnlessLocalSSDExists(ctx context.Context, config *localTestConfig, ssdInterface, filesystemType string, node *v1.Node) {
  1192  	ssdCmd := fmt.Sprintf("ls -1 /mnt/disks/by-uuid/google-local-ssds-%s-%s/ | wc -l", ssdInterface, filesystemType)
  1193  	res, err := config.hostExec.Execute(ctx, ssdCmd, node)
  1194  	utils.LogResult(res)
  1195  	framework.ExpectNoError(err)
  1196  	num, err := strconv.Atoi(strings.TrimSpace(res.Stdout))
  1197  	framework.ExpectNoError(err)
  1198  	if num < 1 {
  1199  		e2eskipper.Skipf("Requires at least 1 %s %s localSSD ", ssdInterface, filesystemType)
  1200  	}
  1201  }
  1202  

View as plain text