
Source file src/k8s.io/kubernetes/test/e2e/storage/csi_mock/base.go

Documentation: k8s.io/kubernetes/test/e2e/storage/csi_mock

    17  package csi_mock
    19  import (
    20  	"context"
    21  	"errors"
    22  	"fmt"
    23  	"reflect"
    24  	"strconv"
    25  	"strings"
    26  	"sync/atomic"
    27  	"time"
    29  	csipbv1 "github.com/container-storage-interface/spec/lib/go/csi"
    30  	"github.com/onsi/ginkgo/v2"
    31  	"google.golang.org/grpc/codes"
    32  	v1 "k8s.io/api/core/v1"
    33  	storagev1 "k8s.io/api/storage/v1"
    34  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    35  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    36  	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
    37  	"k8s.io/apimachinery/pkg/fields"
    38  	utilerrors "k8s.io/apimachinery/pkg/util/errors"
    39  	"k8s.io/apimachinery/pkg/util/sets"
    40  	"k8s.io/apimachinery/pkg/util/wait"
    41  	clientset "k8s.io/client-go/kubernetes"
    42  	"k8s.io/kubernetes/test/e2e/framework"
    43  	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
    44  	e2epv "k8s.io/kubernetes/test/e2e/framework/pv"
    45  	"k8s.io/kubernetes/test/e2e/storage/drivers"
    46  	storageframework "k8s.io/kubernetes/test/e2e/storage/framework"
    47  	"k8s.io/kubernetes/test/e2e/storage/testsuites"
    48  	"k8s.io/kubernetes/test/e2e/storage/utils"
    49  	"k8s.io/kubernetes/test/utils/format"
    50  	imageutils "k8s.io/kubernetes/test/utils/image"
    51  )
// Timeouts and wait periods shared by the CSI mock driver tests.
const (
	csiNodeLimitUpdateTimeout  = 5 * time.Minute
	csiPodUnschedulableTimeout = 5 * time.Minute
	csiResizeWaitPeriod        = 5 * time.Minute
	csiVolumeAttachmentTimeout = 7 * time.Minute
	// How long to wait for the Resizing condition to appear on a PVC.
	csiResizingConditionWait = 2 * time.Minute
	// Time for starting a pod with a volume.
	csiPodRunningTimeout = 5 * time.Minute
	// How long to wait for kubelet to unstage a volume after a pod is deleted.
	csiUnstageWaitTimeout = 1 * time.Minute
)
// csiCall represents an expected call from Kubernetes to the CSI mock driver
// and its expected return value.
// When matching an expected csiCall against real CSI mock driver output, one
// csiCall matches *one or more* calls with the same method and error code.
// This is due to exponential backoff in Kubernetes, where the test cannot
// expect an exact number of call repetitions.
type csiCall struct {
	expectedMethod string            // CSI method name the call must have
	expectedError  codes.Code        // gRPC status code the call must return
	expectedSecret map[string]string // NOTE(review): presumably the secrets the request must carry; confirm against the matcher
	// This is a mark for the test itself to delete the tested pod *after*
	// this csiCall is received.
	deletePod bool
}
// testParameters collects the per-test knobs of a mock driver test.
// mockDriverSetup.init copies most of them into drivers.CSIMockDriverOpts;
// lateBinding, enableResizing, scName and enableCSINodeExpandSecret are also
// consulted by the pod creation helpers when they build the storage class.
// A non-nil hooks value makes init run the mock driver in embedded mode.
type testParameters struct {
	disableAttach       bool
	attachLimit         int
	registerDriver      bool
	lateBinding         bool
	enableTopology      bool
	podInfo             *bool
	storageCapacity     *bool
	scName              string // pre-selected storage class name; must be unique in the cluster
	enableResizing      bool   // enable resizing for both CSI mock driver and storageClass.
	enableNodeExpansion bool   // enable node expansion for CSI mock driver
	// Disables resizing on the CSI mock driver only: it overrides
	// enableResizing for the driver while the storage class keeps it.
	disableResizingOnDriver       bool
	enableSnapshot                bool
	enableVolumeMountGroup        bool // enable the VOLUME_MOUNT_GROUP node capability in the CSI mock driver.
	hooks                         *drivers.Hooks
	tokenRequests                 []storagev1.TokenRequest
	requiresRepublish             *bool
	fsGroupPolicy                 *storagev1.FSGroupPolicy
	enableSELinuxMount            *bool
	enableRecoverExpansionFailure bool
	enableCSINodeExpandSecret     bool
}
// mockDriverSetup holds the state of one mock CSI driver test: the client and
// framework it runs against, the deployed driver with its per-test config and
// provisioner name, the test parameters, and every pod, claim, storage class
// and volume snapshot class recorded in it so that cleanup can delete them.
type mockDriverSetup struct {
	cs          clientset.Interface
	config      *storageframework.PerTestConfig
	pods        []*v1.Pod
	pvcs        []*v1.PersistentVolumeClaim
	sc          map[string]*storagev1.StorageClass
	vsc         map[string]*unstructured.Unstructured
	driver      drivers.MockCSITestDriver
	provisioner string
	tp          testParameters
	f           *framework.Framework
}
// volumeType selects how mockDriverSetup.createPod attaches the mock volume
// to the pod it starts.
type volumeType string

// Supported volume types: csiEphemeral starts the pod with an inline CSI
// volume, genericEphemeral with an ephemeral volume claim template, and
// pvcReference with a separately created PersistentVolumeClaim.
var (
	csiEphemeral     = volumeType("CSI")
	genericEphemeral = volumeType("Ephemeral")
	pvcReference     = volumeType("PVC")
)
// Miscellaneous constants: a poll interval (NOTE(review): its users are not
// visible in this part of the file), snapshot-related finalizer names, and the
// message used for errNotEnoughSpace.
const (
	poll                           = 2 * time.Second
	pvcAsSourceProtectionFinalizer = "snapshot.storage.kubernetes.io/pvc-as-source-protection"
	volumeSnapshotContentFinalizer = "snapshot.storage.kubernetes.io/volumesnapshotcontent-bound-protection"
	volumeSnapshotBoundFinalizer   = "snapshot.storage.kubernetes.io/volumesnapshot-bound-protection"
	errReasonNotEnoughSpace        = "node(s) did not have enough free storage"
	// Storage class parameter keys that createPod sets when
	// enableCSINodeExpandSecret is requested.
	csiNodeExpandSecretKey          = "csi.storage.k8s.io/node-expand-secret-name"
	csiNodeExpandSecretNamespaceKey = "csi.storage.k8s.io/node-expand-secret-namespace"
)
// Sentinel errors. errNotEnoughSpace carries the errReasonNotEnoughSpace
// message. NOTE(review): where these are returned or compared is outside the
// visible part of the file.
var (
	errPodCompleted   = fmt.Errorf("pod ran to completion")
	errNotEnoughSpace = errors.New(errReasonNotEnoughSpace)
)
   144  func newMockDriverSetup(f *framework.Framework) *mockDriverSetup {
   145  	return &mockDriverSetup{
   146  		cs:  f.ClientSet,
   147  		sc:  make(map[string]*storagev1.StorageClass),
   148  		vsc: make(map[string]*unstructured.Unstructured),
   149  		f:   f,
   150  	}
   151  }
   153  func (m *mockDriverSetup) init(ctx context.Context, tp testParameters) {
   154  	m.cs = m.f.ClientSet
   155  	m.tp = tp
   157  	var err error
   158  	driverOpts := drivers.CSIMockDriverOpts{
   159  		RegisterDriver:                tp.registerDriver,
   160  		PodInfo:                       tp.podInfo,
   161  		StorageCapacity:               tp.storageCapacity,
   162  		EnableTopology:                tp.enableTopology,
   163  		AttachLimit:                   tp.attachLimit,
   164  		DisableAttach:                 tp.disableAttach,
   165  		EnableResizing:                tp.enableResizing,
   166  		EnableNodeExpansion:           tp.enableNodeExpansion,
   167  		EnableSnapshot:                tp.enableSnapshot,
   168  		EnableVolumeMountGroup:        tp.enableVolumeMountGroup,
   169  		TokenRequests:                 tp.tokenRequests,
   170  		RequiresRepublish:             tp.requiresRepublish,
   171  		FSGroupPolicy:                 tp.fsGroupPolicy,
   172  		EnableSELinuxMount:            tp.enableSELinuxMount,
   173  		EnableRecoverExpansionFailure: tp.enableRecoverExpansionFailure,
   174  	}
   176  	// At the moment, only tests which need hooks are
   177  	// using the embedded CSI mock driver. The rest run
   178  	// the driver inside the cluster although they could
   179  	// changed to use embedding merely by setting
   180  	// driverOpts.embedded to true.
   181  	//
   182  	// Not enabling it for all tests minimizes
   183  	// the risk that the introduction of embedded breaks
   184  	// some existings tests and avoids a dependency
   185  	// on port forwarding, which is important if some of
   186  	// these tests are supposed to become part of
   187  	// conformance testing (port forwarding isn't
   188  	// currently required).
   189  	if tp.hooks != nil {
   190  		driverOpts.Embedded = true
   191  		driverOpts.Hooks = *tp.hooks
   192  	}
   194  	// this just disable resizing on driver, keeping resizing on SC enabled.
   195  	if tp.disableResizingOnDriver {
   196  		driverOpts.EnableResizing = false
   197  	}
   199  	m.driver = drivers.InitMockCSIDriver(driverOpts)
   200  	config := m.driver.PrepareTest(ctx, m.f)
   201  	m.config = config
   202  	m.provisioner = config.GetUniqueDriverName()
   204  	if tp.registerDriver {
   205  		err = waitForCSIDriver(m.cs, m.config.GetUniqueDriverName())
   206  		framework.ExpectNoError(err, "Failed to get CSIDriver %v", m.config.GetUniqueDriverName())
   207  		ginkgo.DeferCleanup(destroyCSIDriver, m.cs, m.config.GetUniqueDriverName())
   208  	}
   210  	// Wait for the CSIDriver actually get deployed and CSINode object to be generated.
   211  	// This indicates the mock CSI driver pod is up and running healthy.
   212  	err = drivers.WaitForCSIDriverRegistrationOnNode(ctx, m.config.ClientNodeSelection.Name, m.config.GetUniqueDriverName(), m.cs)
   213  	framework.ExpectNoError(err, "Failed to register CSIDriver %v", m.config.GetUniqueDriverName())
   214  }
   216  func (m *mockDriverSetup) cleanup(ctx context.Context) {
   217  	cs := m.f.ClientSet
   218  	var errs []error
   220  	for _, pod := range m.pods {
   221  		ginkgo.By(fmt.Sprintf("Deleting pod %s", pod.Name))
   222  		errs = append(errs, e2epod.DeletePodWithWait(ctx, cs, pod))
   223  	}
   225  	for _, claim := range m.pvcs {
   226  		ginkgo.By(fmt.Sprintf("Deleting claim %s", claim.Name))
   227  		claim, err := cs.CoreV1().PersistentVolumeClaims(claim.Namespace).Get(context.TODO(), claim.Name, metav1.GetOptions{})
   228  		if err == nil {
   229  			if err := cs.CoreV1().PersistentVolumeClaims(claim.Namespace).Delete(context.TODO(), claim.Name, metav1.DeleteOptions{}); err != nil {
   230  				errs = append(errs, err)
   231  			}
   232  			if claim.Spec.VolumeName != "" {
   233  				errs = append(errs, e2epv.WaitForPersistentVolumeDeleted(ctx, cs, claim.Spec.VolumeName, framework.Poll, 2*time.Minute))
   234  			}
   235  		}
   236  	}
   238  	for _, sc := range m.sc {
   239  		ginkgo.By(fmt.Sprintf("Deleting storageclass %s", sc.Name))
   240  		cs.StorageV1().StorageClasses().Delete(context.TODO(), sc.Name, metav1.DeleteOptions{})
   241  	}
   243  	for _, vsc := range m.vsc {
   244  		ginkgo.By(fmt.Sprintf("Deleting volumesnapshotclass %s", vsc.GetName()))
   245  		m.config.Framework.DynamicClient.Resource(utils.SnapshotClassGVR).Delete(context.TODO(), vsc.GetName(), metav1.DeleteOptions{})
   246  	}
   248  	err := utilerrors.NewAggregate(errs)
   249  	framework.ExpectNoError(err, "while cleaning up after test")
   250  }
   252  func (m *mockDriverSetup) update(o utils.PatchCSIOptions) {
   253  	item, err := m.cs.StorageV1().CSIDrivers().Get(context.TODO(), m.config.GetUniqueDriverName(), metav1.GetOptions{})
   254  	framework.ExpectNoError(err, "Failed to get CSIDriver %v", m.config.GetUniqueDriverName())
   256  	err = utils.PatchCSIDeployment(nil, o, item)
   257  	framework.ExpectNoError(err, "Failed to apply %v to CSIDriver object %v", o, m.config.GetUniqueDriverName())
   259  	_, err = m.cs.StorageV1().CSIDrivers().Update(context.TODO(), item, metav1.UpdateOptions{})
   260  	framework.ExpectNoError(err, "Failed to update CSIDriver %v", m.config.GetUniqueDriverName())
   261  }
   263  func (m *mockDriverSetup) createPod(ctx context.Context, withVolume volumeType) (class *storagev1.StorageClass, claim *v1.PersistentVolumeClaim, pod *v1.Pod) {
   264  	ginkgo.By("Creating pod")
   265  	f := m.f
   267  	sc := m.driver.GetDynamicProvisionStorageClass(ctx, m.config, "")
   268  	if m.tp.enableCSINodeExpandSecret {
   269  		if sc.Parameters == nil {
   270  			parameters := map[string]string{
   271  				csiNodeExpandSecretKey:          "test-secret",
   272  				csiNodeExpandSecretNamespaceKey: f.Namespace.Name,
   273  			}
   274  			sc.Parameters = parameters
   275  		} else {
   276  			sc.Parameters[csiNodeExpandSecretKey] = "test-secret"
   277  			sc.Parameters[csiNodeExpandSecretNamespaceKey] = f.Namespace.Name
   278  		}
   279  	}
   280  	scTest := testsuites.StorageClassTest{
   281  		Name:                 m.driver.GetDriverInfo().Name,
   282  		Timeouts:             f.Timeouts,
   283  		Provisioner:          sc.Provisioner,
   284  		Parameters:           sc.Parameters,
   285  		ClaimSize:            "1Gi",
   286  		ExpectedSize:         "1Gi",
   287  		DelayBinding:         m.tp.lateBinding,
   288  		AllowVolumeExpansion: m.tp.enableResizing,
   289  	}
   291  	// The mock driver only works when everything runs on a single node.
   292  	nodeSelection := m.config.ClientNodeSelection
   293  	switch withVolume {
   294  	case csiEphemeral:
   295  		pod = startPausePodInline(f.ClientSet, scTest, nodeSelection, f.Namespace.Name)
   296  	case genericEphemeral:
   297  		class, pod = startPausePodGenericEphemeral(f.ClientSet, scTest, nodeSelection, m.tp.scName, f.Namespace.Name)
   298  		if class != nil {
   299  			m.sc[class.Name] = class
   300  		}
   301  		claim = &v1.PersistentVolumeClaim{
   302  			ObjectMeta: metav1.ObjectMeta{
   303  				Name:      pod.Name + "-" + pod.Spec.Volumes[0].Name,
   304  				Namespace: f.Namespace.Name,
   305  			},
   306  		}
   307  	case pvcReference:
   308  		class, claim, pod = startPausePod(ctx, f.ClientSet, scTest, nodeSelection, m.tp.scName, f.Namespace.Name)
   309  		if class != nil {
   310  			m.sc[class.Name] = class
   311  		}
   312  		if claim != nil {
   313  			m.pvcs = append(m.pvcs, claim)
   314  		}
   315  	}
   316  	if pod != nil {
   317  		m.pods = append(m.pods, pod)
   318  	}
   319  	return // result variables set above
   320  }
   322  func (m *mockDriverSetup) createPodWithPVC(pvc *v1.PersistentVolumeClaim) (*v1.Pod, error) {
   323  	f := m.f
   325  	nodeSelection := m.config.ClientNodeSelection
   326  	pod, err := startPausePodWithClaim(m.cs, pvc, nodeSelection, f.Namespace.Name)
   327  	if pod != nil {
   328  		m.pods = append(m.pods, pod)
   329  	}
   330  	return pod, err
   331  }
   333  func (m *mockDriverSetup) createPodWithFSGroup(ctx context.Context, fsGroup *int64) (*storagev1.StorageClass, *v1.PersistentVolumeClaim, *v1.Pod) {
   334  	f := m.f
   336  	ginkgo.By("Creating pod with fsGroup")
   337  	nodeSelection := m.config.ClientNodeSelection
   338  	sc := m.driver.GetDynamicProvisionStorageClass(ctx, m.config, "")
   339  	scTest := testsuites.StorageClassTest{
   340  		Name:                 m.driver.GetDriverInfo().Name,
   341  		Provisioner:          sc.Provisioner,
   342  		Parameters:           sc.Parameters,
   343  		ClaimSize:            "1Gi",
   344  		ExpectedSize:         "1Gi",
   345  		DelayBinding:         m.tp.lateBinding,
   346  		AllowVolumeExpansion: m.tp.enableResizing,
   347  	}
   348  	class, claim, pod := startBusyBoxPod(ctx, f.ClientSet, scTest, nodeSelection, m.tp.scName, f.Namespace.Name, fsGroup)
   350  	if class != nil {
   351  		m.sc[class.Name] = class
   352  	}
   353  	if claim != nil {
   354  		m.pvcs = append(m.pvcs, claim)
   355  	}
   357  	if pod != nil {
   358  		m.pods = append(m.pods, pod)
   359  	}
   361  	return class, claim, pod
   362  }
   364  func (m *mockDriverSetup) createPodWithSELinux(ctx context.Context, accessModes []v1.PersistentVolumeAccessMode, mountOptions []string, seLinuxOpts *v1.SELinuxOptions) (*storagev1.StorageClass, *v1.PersistentVolumeClaim, *v1.Pod) {
   365  	ginkgo.By("Creating pod with SELinux context")
   366  	f := m.f
   367  	nodeSelection := m.config.ClientNodeSelection
   368  	sc := m.driver.GetDynamicProvisionStorageClass(ctx, m.config, "")
   369  	scTest := testsuites.StorageClassTest{
   370  		Name:                 m.driver.GetDriverInfo().Name,
   371  		Provisioner:          sc.Provisioner,
   372  		Parameters:           sc.Parameters,
   373  		ClaimSize:            "1Gi",
   374  		ExpectedSize:         "1Gi",
   375  		DelayBinding:         m.tp.lateBinding,
   376  		AllowVolumeExpansion: m.tp.enableResizing,
   377  		MountOptions:         mountOptions,
   378  	}
   379  	class, claim := createClaim(ctx, f.ClientSet, scTest, nodeSelection, m.tp.scName, f.Namespace.Name, accessModes)
   380  	pod, err := startPausePodWithSELinuxOptions(f.ClientSet, claim, nodeSelection, f.Namespace.Name, seLinuxOpts)
   381  	framework.ExpectNoError(err, "Failed to create pause pod with SELinux context %s: %v", seLinuxOpts, err)
   383  	if class != nil {
   384  		m.sc[class.Name] = class
   385  	}
   386  	if claim != nil {
   387  		m.pvcs = append(m.pvcs, claim)
   388  	}
   390  	if pod != nil {
   391  		m.pods = append(m.pods, pod)
   392  	}
   394  	return class, claim, pod
   395  }
   397  func waitForCSIDriver(cs clientset.Interface, driverName string) error {
   398  	timeout := 4 * time.Minute
   400  	framework.Logf("waiting up to %v for CSIDriver %q", timeout, driverName)
   401  	for start := time.Now(); time.Since(start) < timeout; time.Sleep(framework.Poll) {
   402  		_, err := cs.StorageV1().CSIDrivers().Get(context.TODO(), driverName, metav1.GetOptions{})
   403  		if !apierrors.IsNotFound(err) {
   404  			return err
   405  		}
   406  	}
   407  	return fmt.Errorf("gave up after waiting %v for CSIDriver %q", timeout, driverName)
   408  }
   410  func destroyCSIDriver(cs clientset.Interface, driverName string) {
   411  	driverGet, err := cs.StorageV1().CSIDrivers().Get(context.TODO(), driverName, metav1.GetOptions{})
   412  	if err == nil {
   413  		framework.Logf("deleting %s.%s: %s", driverGet.TypeMeta.APIVersion, driverGet.TypeMeta.Kind, driverGet.ObjectMeta.Name)
   414  		// Uncomment the following line to get full dump of CSIDriver object
   415  		// framework.Logf("%s", framework.PrettyPrint(driverGet))
   416  		cs.StorageV1().CSIDrivers().Delete(context.TODO(), driverName, metav1.DeleteOptions{})
   417  	}
   418  }
   420  func newStorageClass(t testsuites.StorageClassTest, ns string, prefix string) *storagev1.StorageClass {
   421  	pluginName := t.Provisioner
   422  	if pluginName == "" {
   423  		pluginName = getDefaultPluginName()
   424  	}
   425  	if prefix == "" {
   426  		prefix = "sc"
   427  	}
   428  	bindingMode := storagev1.VolumeBindingImmediate
   429  	if t.DelayBinding {
   430  		bindingMode = storagev1.VolumeBindingWaitForFirstConsumer
   431  	}
   432  	if t.Parameters == nil {
   433  		t.Parameters = make(map[string]string)
   434  	}
   436  	if framework.NodeOSDistroIs("windows") {
   437  		// fstype might be forced from outside, in that case skip setting a default
   438  		if _, exists := t.Parameters["fstype"]; !exists {
   439  			t.Parameters["fstype"] = e2epv.GetDefaultFSType()
   440  			framework.Logf("settings a default fsType=%s in the storage class", t.Parameters["fstype"])
   441  		}
   442  	}
   444  	sc := getStorageClass(pluginName, t.Parameters, &bindingMode, t.MountOptions, ns, prefix)
   445  	if t.AllowVolumeExpansion {
   446  		sc.AllowVolumeExpansion = &t.AllowVolumeExpansion
   447  	}
   448  	return sc
   449  }
   451  func getStorageClass(
   452  	provisioner string,
   453  	parameters map[string]string,
   454  	bindingMode *storagev1.VolumeBindingMode,
   455  	mountOptions []string,
   456  	ns string,
   457  	prefix string,
   458  ) *storagev1.StorageClass {
   459  	if bindingMode == nil {
   460  		defaultBindingMode := storagev1.VolumeBindingImmediate
   461  		bindingMode = &defaultBindingMode
   462  	}
   463  	return &storagev1.StorageClass{
   464  		TypeMeta: metav1.TypeMeta{
   465  			Kind: "StorageClass",
   466  		},
   467  		ObjectMeta: metav1.ObjectMeta{
   468  			// Name must be unique, so let's base it on namespace name and the prefix (the prefix is test specific)
   469  			GenerateName: ns + "-" + prefix,
   470  		},
   471  		Provisioner:       provisioner,
   472  		Parameters:        parameters,
   473  		VolumeBindingMode: bindingMode,
   474  		MountOptions:      mountOptions,
   475  	}
   476  }
   478  func getDefaultPluginName() string {
   479  	switch {
   480  	case framework.ProviderIs("gke"), framework.ProviderIs("gce"):
   481  		return "kubernetes.io/gce-pd"
   482  	case framework.ProviderIs("aws"):
   483  		return "kubernetes.io/aws-ebs"
   484  	case framework.ProviderIs("vsphere"):
   485  		return "kubernetes.io/vsphere-volume"
   486  	case framework.ProviderIs("azure"):
   487  		return "kubernetes.io/azure-disk"
   488  	}
   489  	return ""
   490  }
   492  func createSC(cs clientset.Interface, t testsuites.StorageClassTest, scName, ns string) *storagev1.StorageClass {
   493  	class := newStorageClass(t, ns, "")
   494  	if scName != "" {
   495  		class.Name = scName
   496  	}
   497  	var err error
   498  	_, err = cs.StorageV1().StorageClasses().Get(context.TODO(), class.Name, metav1.GetOptions{})
   499  	if err != nil {
   500  		class, err = cs.StorageV1().StorageClasses().Create(context.TODO(), class, metav1.CreateOptions{})
   501  		framework.ExpectNoError(err, "Failed to create class: %v", err)
   502  	}
   504  	return class
   505  }
   507  func createClaim(ctx context.Context, cs clientset.Interface, t testsuites.StorageClassTest, node e2epod.NodeSelection, scName, ns string, accessModes []v1.PersistentVolumeAccessMode) (*storagev1.StorageClass, *v1.PersistentVolumeClaim) {
   508  	class := createSC(cs, t, scName, ns)
   509  	claim := e2epv.MakePersistentVolumeClaim(e2epv.PersistentVolumeClaimConfig{
   510  		ClaimSize:        t.ClaimSize,
   511  		StorageClassName: &(class.Name),
   512  		VolumeMode:       &t.VolumeMode,
   513  		AccessModes:      accessModes,
   514  	}, ns)
   515  	claim, err := cs.CoreV1().PersistentVolumeClaims(ns).Create(context.TODO(), claim, metav1.CreateOptions{})
   516  	framework.ExpectNoError(err, "Failed to create claim: %v", err)
   518  	if !t.DelayBinding {
   519  		pvcClaims := []*v1.PersistentVolumeClaim{claim}
   520  		_, err = e2epv.WaitForPVClaimBoundPhase(ctx, cs, pvcClaims, framework.ClaimProvisionTimeout)
   521  		framework.ExpectNoError(err, "Failed waiting for PVC to be bound: %v", err)
   522  	}
   523  	return class, claim
   524  }
   526  func startPausePod(ctx context.Context, cs clientset.Interface, t testsuites.StorageClassTest, node e2epod.NodeSelection, scName, ns string) (*storagev1.StorageClass, *v1.PersistentVolumeClaim, *v1.Pod) {
   527  	class, claim := createClaim(ctx, cs, t, node, scName, ns, nil)
   529  	pod, err := startPausePodWithClaim(cs, claim, node, ns)
   530  	framework.ExpectNoError(err, "Failed to create pause pod: %v", err)
   531  	return class, claim, pod
   532  }
   534  func startBusyBoxPod(ctx context.Context, cs clientset.Interface, t testsuites.StorageClassTest, node e2epod.NodeSelection, scName, ns string, fsGroup *int64) (*storagev1.StorageClass, *v1.PersistentVolumeClaim, *v1.Pod) {
   535  	class, claim := createClaim(ctx, cs, t, node, scName, ns, nil)
   536  	pod, err := startBusyBoxPodWithClaim(cs, claim, node, ns, fsGroup)
   537  	framework.ExpectNoError(err, "Failed to create busybox pod: %v", err)
   538  	return class, claim, pod
   539  }
   541  func startPausePodInline(cs clientset.Interface, t testsuites.StorageClassTest, node e2epod.NodeSelection, ns string) *v1.Pod {
   542  	pod, err := startPausePodWithInlineVolume(cs,
   543  		&v1.CSIVolumeSource{
   544  			Driver: t.Provisioner,
   545  		},
   546  		node, ns)
   547  	framework.ExpectNoError(err, "Failed to create pod: %v", err)
   548  	return pod
   549  }
   551  func startPausePodGenericEphemeral(cs clientset.Interface, t testsuites.StorageClassTest, node e2epod.NodeSelection, scName, ns string) (*storagev1.StorageClass, *v1.Pod) {
   552  	class := createSC(cs, t, scName, ns)
   553  	claim := e2epv.MakePersistentVolumeClaim(e2epv.PersistentVolumeClaimConfig{
   554  		ClaimSize:        t.ClaimSize,
   555  		StorageClassName: &(class.Name),
   556  		VolumeMode:       &t.VolumeMode,
   557  	}, ns)
   558  	pod, err := startPausePodWithVolumeSource(cs, v1.VolumeSource{
   559  		Ephemeral: &v1.EphemeralVolumeSource{
   560  			VolumeClaimTemplate: &v1.PersistentVolumeClaimTemplate{Spec: claim.Spec}},
   561  	}, node, ns)
   562  	framework.ExpectNoError(err, "Failed to create pod: %v", err)
   563  	return class, pod
   564  }
   566  func startPausePodWithClaim(cs clientset.Interface, pvc *v1.PersistentVolumeClaim, node e2epod.NodeSelection, ns string) (*v1.Pod, error) {
   567  	return startPausePodWithVolumeSource(cs,
   568  		v1.VolumeSource{
   569  			PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
   570  				ClaimName: pvc.Name,
   571  				ReadOnly:  false,
   572  			},
   573  		},
   574  		node, ns)
   575  }
   577  func startBusyBoxPodWithClaim(cs clientset.Interface, pvc *v1.PersistentVolumeClaim, node e2epod.NodeSelection, ns string, fsGroup *int64) (*v1.Pod, error) {
   578  	return startBusyBoxPodWithVolumeSource(cs,
   579  		v1.VolumeSource{
   580  			PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
   581  				ClaimName: pvc.Name,
   582  				ReadOnly:  false,
   583  			},
   584  		},
   585  		node, ns, fsGroup)
   586  }
   588  func startPausePodWithInlineVolume(cs clientset.Interface, inlineVolume *v1.CSIVolumeSource, node e2epod.NodeSelection, ns string) (*v1.Pod, error) {
   589  	return startPausePodWithVolumeSource(cs,
   590  		v1.VolumeSource{
   591  			CSI: inlineVolume,
   592  		},
   593  		node, ns)
   594  }
   596  func startPausePodWithVolumeSource(cs clientset.Interface, volumeSource v1.VolumeSource, node e2epod.NodeSelection, ns string) (*v1.Pod, error) {
   597  	pod := &v1.Pod{
   598  		ObjectMeta: metav1.ObjectMeta{
   599  			GenerateName: "pvc-volume-tester-",
   600  		},
   601  		Spec: v1.PodSpec{
   602  			Containers: []v1.Container{
   603  				{
   604  					Name:  "volume-tester",
   605  					Image: imageutils.GetE2EImage(imageutils.Pause),
   606  					VolumeMounts: []v1.VolumeMount{
   607  						{
   608  							Name:      "my-volume",
   609  							MountPath: "/mnt/test",
   610  						},
   611  					},
   612  				},
   613  			},
   614  			RestartPolicy: v1.RestartPolicyNever,
   615  			Volumes: []v1.Volume{
   616  				{
   617  					Name:         "my-volume",
   618  					VolumeSource: volumeSource,
   619  				},
   620  			},
   621  		},
   622  	}
   623  	e2epod.SetNodeSelection(&pod.Spec, node)
   624  	return cs.CoreV1().Pods(ns).Create(context.TODO(), pod, metav1.CreateOptions{})
   625  }
   627  func startBusyBoxPodWithVolumeSource(cs clientset.Interface, volumeSource v1.VolumeSource, node e2epod.NodeSelection, ns string, fsGroup *int64) (*v1.Pod, error) {
   628  	pod := &v1.Pod{
   629  		ObjectMeta: metav1.ObjectMeta{
   630  			GenerateName: "pvc-volume-tester-",
   631  		},
   632  		Spec: v1.PodSpec{
   633  			Containers: []v1.Container{
   634  				{
   635  					Name:  "volume-tester",
   636  					Image: framework.BusyBoxImage,
   637  					VolumeMounts: []v1.VolumeMount{
   638  						{
   639  							Name:      "my-volume",
   640  							MountPath: "/mnt/test",
   641  						},
   642  					},
   643  					Command: e2epod.GenerateScriptCmd("while true ; do sleep 2; done"),
   644  				},
   645  			},
   646  			SecurityContext: &v1.PodSecurityContext{
   647  				FSGroup: fsGroup,
   648  			},
   649  			RestartPolicy: v1.RestartPolicyNever,
   650  			Volumes: []v1.Volume{
   651  				{
   652  					Name:         "my-volume",
   653  					VolumeSource: volumeSource,
   654  				},
   655  			},
   656  		},
   657  	}
   658  	e2epod.SetNodeSelection(&pod.Spec, node)
   659  	return cs.CoreV1().Pods(ns).Create(context.TODO(), pod, metav1.CreateOptions{})
   660  }
   662  func startPausePodWithSELinuxOptions(cs clientset.Interface, pvc *v1.PersistentVolumeClaim, node e2epod.NodeSelection, ns string, seLinuxOpts *v1.SELinuxOptions) (*v1.Pod, error) {
   663  	pod := &v1.Pod{
   664  		ObjectMeta: metav1.ObjectMeta{
   665  			GenerateName: "pvc-volume-tester-",
   666  		},
   667  		Spec: v1.PodSpec{
   668  			SecurityContext: &v1.PodSecurityContext{
   669  				SELinuxOptions: seLinuxOpts,
   670  			},
   671  			Containers: []v1.Container{
   672  				{
   673  					Name:  "volume-tester",
   674  					Image: imageutils.GetE2EImage(imageutils.Pause),
   675  					VolumeMounts: []v1.VolumeMount{
   676  						{
   677  							Name:      "my-volume",
   678  							MountPath: "/mnt/test",
   679  						},
   680  					},
   681  				},
   682  			},
   683  			RestartPolicy: v1.RestartPolicyNever,
   684  			Volumes: []v1.Volume{
   685  				{
   686  					Name: "my-volume",
   687  					VolumeSource: v1.VolumeSource{
   688  						PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
   689  							ClaimName: pvc.Name,
   690  							ReadOnly:  false,
   691  						},
   692  					},
   693  				},
   694  			},
   695  		},
   696  	}
   697  	if node.Name != "" {
   698  		// Force schedule the pod to skip scheduler RWOP checks
   699  		framework.Logf("Forcing node name %s", node.Name)
   700  		pod.Spec.NodeName = node.Name
   701  	}
   702  	e2epod.SetNodeSelection(&pod.Spec, node)
   703  	return cs.CoreV1().Pods(ns).Create(context.TODO(), pod, metav1.CreateOptions{})
   704  }
   706  // checkNodePublishVolume goes through all calls to the mock driver and checks that at least one NodePublishVolume call had expected attributes.
   707  // If a matched call is found but it has unexpected attributes, checkNodePublishVolume skips it and continues searching.
   708  func checkNodePublishVolume(ctx context.Context, getCalls func(ctx context.Context) ([]drivers.MockCSICall, error), pod *v1.Pod, expectPodInfo, ephemeralVolume, csiInlineVolumesEnabled, csiServiceAccountTokenEnabled bool) error {
   709  	expectedAttributes := map[string]string{}
   710  	unexpectedAttributeKeys := sets.New[string]()
   711  	if expectPodInfo {
   712  		expectedAttributes["csi.storage.k8s.io/pod.name"] = pod.Name
   713  		expectedAttributes["csi.storage.k8s.io/pod.namespace"] = pod.Namespace
   714  		expectedAttributes["csi.storage.k8s.io/pod.uid"] = string(pod.UID)
   715  		expectedAttributes["csi.storage.k8s.io/serviceAccount.name"] = "default"
   716  	} else {
   717  		unexpectedAttributeKeys.Insert("csi.storage.k8s.io/pod.name")
   718  		unexpectedAttributeKeys.Insert("csi.storage.k8s.io/pod.namespace")
   719  		unexpectedAttributeKeys.Insert("csi.storage.k8s.io/pod.uid")
   720  		unexpectedAttributeKeys.Insert("csi.storage.k8s.io/serviceAccount.name")
   721  	}
   722  	if csiInlineVolumesEnabled {
   723  		// This is only passed in 1.15 when the CSIInlineVolume feature gate is set.
   724  		expectedAttributes["csi.storage.k8s.io/ephemeral"] = strconv.FormatBool(ephemeralVolume)
   725  	} else {
   726  		unexpectedAttributeKeys.Insert("csi.storage.k8s.io/ephemeral")
   727  	}
   729  	if csiServiceAccountTokenEnabled {
   730  		expectedAttributes["csi.storage.k8s.io/serviceAccount.tokens"] = "<nonempty>"
   731  	} else {
   732  		unexpectedAttributeKeys.Insert("csi.storage.k8s.io/serviceAccount.tokens")
   733  	}
   735  	calls, err := getCalls(ctx)
   736  	if err != nil {
   737  		return err
   738  	}
   740  	var volumeContexts []map[string]string
   741  	for _, call := range calls {
   742  		if call.Method != "NodePublishVolume" {
   743  			continue
   744  		}
   746  		volumeCtx := call.Request.VolumeContext
   748  		// Check that NodePublish had expected attributes
   749  		foundAttributes := sets.NewString()
   750  		for k, v := range expectedAttributes {
   751  			vv, found := volumeCtx[k]
   752  			if found && (v == vv || (v == "<nonempty>" && len(vv) != 0)) {
   753  				foundAttributes.Insert(k)
   754  			}
   755  		}
   756  		if foundAttributes.Len() != len(expectedAttributes) {
   757  			framework.Logf("Skipping the NodePublishVolume call: expected attribute %+v, got %+v", format.Object(expectedAttributes, 1), format.Object(volumeCtx, 1))
   758  			continue
   759  		}
   761  		// Check that NodePublish had no unexpected attributes
   762  		unexpectedAttributes := make(map[string]string)
   763  		for k := range volumeCtx {
   764  			if unexpectedAttributeKeys.Has(k) {
   765  				unexpectedAttributes[k] = volumeCtx[k]
   766  			}
   767  		}
   768  		if len(unexpectedAttributes) != 0 {
   769  			framework.Logf("Skipping the NodePublishVolume call because it contains unexpected attributes %+v", format.Object(unexpectedAttributes, 1))
   770  			continue
   771  		}
   773  		return nil
   774  	}
   776  	if len(volumeContexts) == 0 {
   777  		return fmt.Errorf("NodePublishVolume was never called")
   778  	}
   780  	return fmt.Errorf("NodePublishVolume was called %d times, but no call had expected attributes %s or calls have unwanted attributes key %+v", len(volumeContexts), format.Object(expectedAttributes, 1), unexpectedAttributeKeys.UnsortedList())
   781  }
   783  // createFSGroupRequestPreHook creates a hook that records the fsGroup passed in
   784  // through NodeStageVolume and NodePublishVolume calls.
   785  func createFSGroupRequestPreHook(nodeStageFsGroup, nodePublishFsGroup *string) *drivers.Hooks {
   786  	return &drivers.Hooks{
   787  		Pre: func(ctx context.Context, fullMethod string, request interface{}) (reply interface{}, err error) {
   788  			nodeStageRequest, ok := request.(*csipbv1.NodeStageVolumeRequest)
   789  			if ok {
   790  				mountVolume := nodeStageRequest.GetVolumeCapability().GetMount()
   791  				if mountVolume != nil {
   792  					*nodeStageFsGroup = mountVolume.VolumeMountGroup
   793  				}
   794  			}
   795  			nodePublishRequest, ok := request.(*csipbv1.NodePublishVolumeRequest)
   796  			if ok {
   797  				mountVolume := nodePublishRequest.GetVolumeCapability().GetMount()
   798  				if mountVolume != nil {
   799  					*nodePublishFsGroup = mountVolume.VolumeMountGroup
   800  				}
   801  			}
   802  			return nil, nil
   803  		},
   804  	}
   805  }
   807  // createPreHook counts invocations of a certain method (identified by a substring in the full gRPC method name).
   808  func createPreHook(method string, callback func(counter int64) error) *drivers.Hooks {
   809  	var counter int64
   811  	return &drivers.Hooks{
   812  		Pre: func() func(ctx context.Context, fullMethod string, request interface{}) (reply interface{}, err error) {
   813  			return func(ctx context.Context, fullMethod string, request interface{}) (reply interface{}, err error) {
   814  				if strings.Contains(fullMethod, method) {
   815  					counter := atomic.AddInt64(&counter, 1)
   816  					return nil, callback(counter)
   817  				}
   818  				return nil, nil
   819  			}
   820  		}(),
   821  	}
   822  }
   824  // compareCSICalls compares expectedCalls with logs of the mock driver.
   825  // It returns index of the first expectedCall that was *not* received
   826  // yet or error when calls do not match.
   827  // All repeated calls to the CSI mock driver (e.g. due to exponential backoff)
   828  // are squashed and checked against single expectedCallSequence item.
   829  //
   830  // Only permanent errors are returned. Other errors are logged and no
   831  // calls are returned. The caller is expected to retry.
   832  func compareCSICalls(ctx context.Context, trackedCalls []string, expectedCallSequence []csiCall, getCalls func(ctx context.Context) ([]drivers.MockCSICall, error)) ([]drivers.MockCSICall, int, error) {
   833  	allCalls, err := getCalls(ctx)
   834  	if err != nil {
   835  		framework.Logf("intermittent (?) log retrieval error, proceeding without output: %v", err)
   836  		return nil, 0, nil
   837  	}
   839  	// Remove all repeated and ignored calls
   840  	tracked := sets.NewString(trackedCalls...)
   841  	var calls []drivers.MockCSICall
   842  	var last drivers.MockCSICall
   843  	for _, c := range allCalls {
   844  		if !tracked.Has(c.Method) {
   845  			continue
   846  		}
   847  		if c.Method != last.Method || c.FullError.Code != last.FullError.Code {
   848  			last = c
   849  			calls = append(calls, c)
   850  		}
   851  		// This call is the same as the last one, ignore it.
   852  	}
   854  	for i, c := range calls {
   855  		if i >= len(expectedCallSequence) {
   856  			// Log all unexpected calls first, return error below outside the loop.
   857  			framework.Logf("Unexpected CSI driver call: %s (%v)", c.Method, c.FullError)
   858  			continue
   859  		}
   861  		// Compare current call with expected call
   862  		expectedCall := expectedCallSequence[i]
   863  		if c.Method != expectedCall.expectedMethod || c.FullError.Code != expectedCall.expectedError {
   864  			return allCalls, i, fmt.Errorf("Unexpected CSI call %d: expected %s (%d), got %s (%d)", i, expectedCall.expectedMethod, expectedCall.expectedError, c.Method, c.FullError.Code)
   865  		}
   867  		// if the secret is not nil, compare it
   868  		if expectedCall.expectedSecret != nil {
   869  			if !reflect.DeepEqual(expectedCall.expectedSecret, c.Request.Secrets) {
   870  				return allCalls, i, fmt.Errorf("Unexpected secret: expected %v, got %v", expectedCall.expectedSecret, c.Request.Secrets)
   871  			}
   872  		}
   874  	}
   875  	if len(calls) > len(expectedCallSequence) {
   876  		return allCalls, len(expectedCallSequence), fmt.Errorf("Received %d unexpected CSI driver calls", len(calls)-len(expectedCallSequence))
   877  	}
   878  	// All calls were correct
   879  	return allCalls, len(calls), nil
   881  }
   883  // createSELinuxMountPreHook creates a hook that records the mountOptions passed in
   884  // through NodeStageVolume and NodePublishVolume calls.
   885  func createSELinuxMountPreHook(nodeStageMountOpts, nodePublishMountOpts *[]string, stageCalls, unstageCalls, publishCalls, unpublishCalls *atomic.Int32) *drivers.Hooks {
   886  	return &drivers.Hooks{
   887  		Pre: func(ctx context.Context, fullMethod string, request interface{}) (reply interface{}, err error) {
   888  			switch req := request.(type) {
   889  			case *csipbv1.NodeStageVolumeRequest:
   890  				stageCalls.Add(1)
   891  				mountVolume := req.GetVolumeCapability().GetMount()
   892  				if mountVolume != nil {
   893  					*nodeStageMountOpts = mountVolume.MountFlags
   894  				}
   895  			case *csipbv1.NodePublishVolumeRequest:
   896  				publishCalls.Add(1)
   897  				mountVolume := req.GetVolumeCapability().GetMount()
   898  				if mountVolume != nil {
   899  					*nodePublishMountOpts = mountVolume.MountFlags
   900  				}
   901  			case *csipbv1.NodeUnstageVolumeRequest:
   902  				unstageCalls.Add(1)
   903  			case *csipbv1.NodeUnpublishVolumeRequest:
   904  				unpublishCalls.Add(1)
   905  			}
   906  			return nil, nil
   907  		},
   908  	}
   909  }
   911  // A lot of this code was copied from e2e/framework. It would be nicer
   912  // if it could be reused - see https://github.com/kubernetes/kubernetes/issues/92754
   913  func podRunning(ctx context.Context, c clientset.Interface, podName, namespace string) wait.ConditionFunc {
   914  	return func() (bool, error) {
   915  		pod, err := c.CoreV1().Pods(namespace).Get(ctx, podName, metav1.GetOptions{})
   916  		if err != nil {
   917  			return false, err
   918  		}
   919  		switch pod.Status.Phase {
   920  		case v1.PodRunning:
   921  			return true, nil
   922  		case v1.PodFailed, v1.PodSucceeded:
   923  			return false, errPodCompleted
   924  		}
   925  		return false, nil
   926  	}
   927  }
   929  func podHasStorage(ctx context.Context, c clientset.Interface, podName, namespace string, when time.Time) wait.ConditionFunc {
   930  	// Check for events of this pod. Copied from test/e2e/common/container_probe.go.
   931  	expectedEvent := fields.Set{
   932  		"involvedObject.kind":      "Pod",
   933  		"involvedObject.name":      podName,
   934  		"involvedObject.namespace": namespace,
   935  		"reason":                   "FailedScheduling",
   936  	}.AsSelector().String()
   937  	options := metav1.ListOptions{
   938  		FieldSelector: expectedEvent,
   939  	}
   940  	// copied from test/e2e/framework/events/events.go
   941  	return func() (bool, error) {
   942  		// We cannot be sure here whether it has enough storage, only when
   943  		// it hasn't. In that case we abort waiting with a special error.
   944  		events, err := c.CoreV1().Events(namespace).List(ctx, options)
   945  		if err != nil {
   946  			return false, fmt.Errorf("got error while getting events: %w", err)
   947  		}
   948  		for _, event := range events.Items {
   949  			if /* event.CreationTimestamp.After(when) &&
   950  			 */strings.Contains(event.Message, errReasonNotEnoughSpace) {
   951  				return false, errNotEnoughSpace
   952  			}
   953  		}
   954  		return false, nil
   955  	}
   956  }
   958  func anyOf(conditions ...wait.ConditionFunc) wait.ConditionFunc {
   959  	return func() (bool, error) {
   960  		for _, condition := range conditions {
   961  			done, err := condition()
   962  			if err != nil {
   963  				return false, err
   964  			}
   965  			if done {
   966  				return true, nil
   967  			}
   968  		}
   969  		return false, nil
   970  	}
   971  }
   973  func waitForMaxVolumeCondition(pod *v1.Pod, cs clientset.Interface) error {
   974  	waitErr := wait.PollImmediate(10*time.Second, csiPodUnschedulableTimeout, func() (bool, error) {
   975  		pod, err := cs.CoreV1().Pods(pod.Namespace).Get(context.TODO(), pod.Name, metav1.GetOptions{})
   976  		if err != nil {
   977  			return false, err
   978  		}
   979  		for _, c := range pod.Status.Conditions {
   980  			// Conformance tests cannot rely on specific output of optional fields (e.g., Reason
   981  			// and Message) because these fields are not suject to the deprecation policy.
   982  			if c.Type == v1.PodScheduled && c.Status == v1.ConditionFalse && c.Reason != "" && c.Message != "" {
   983  				return true, nil
   984  			}
   985  		}
   986  		return false, nil
   987  	})
   988  	if waitErr != nil {
   989  		return fmt.Errorf("error waiting for pod %s/%s to have max volume condition: %v", pod.Namespace, pod.Name, waitErr)
   990  	}
   991  	return nil
   992  }

