...

Source file src/k8s.io/kubernetes/test/e2e/storage/csi_mock/base.go

Documentation: k8s.io/kubernetes/test/e2e/storage/csi_mock

     1  /*
     2  Copyright 2022 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package csi_mock
    18  
    19  import (
    20  	"context"
    21  	"errors"
    22  	"fmt"
    23  	"reflect"
    24  	"strconv"
    25  	"strings"
    26  	"sync/atomic"
    27  	"time"
    28  
    29  	csipbv1 "github.com/container-storage-interface/spec/lib/go/csi"
    30  	"github.com/onsi/ginkgo/v2"
    31  	"google.golang.org/grpc/codes"
    32  	v1 "k8s.io/api/core/v1"
    33  	storagev1 "k8s.io/api/storage/v1"
    34  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    35  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    36  	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
    37  	"k8s.io/apimachinery/pkg/fields"
    38  	utilerrors "k8s.io/apimachinery/pkg/util/errors"
    39  	"k8s.io/apimachinery/pkg/util/sets"
    40  	"k8s.io/apimachinery/pkg/util/wait"
    41  	clientset "k8s.io/client-go/kubernetes"
    42  	"k8s.io/kubernetes/test/e2e/framework"
    43  	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
    44  	e2epv "k8s.io/kubernetes/test/e2e/framework/pv"
    45  	"k8s.io/kubernetes/test/e2e/storage/drivers"
    46  	storageframework "k8s.io/kubernetes/test/e2e/storage/framework"
    47  	"k8s.io/kubernetes/test/e2e/storage/testsuites"
    48  	"k8s.io/kubernetes/test/e2e/storage/utils"
    49  	"k8s.io/kubernetes/test/utils/format"
    50  	imageutils "k8s.io/kubernetes/test/utils/image"
    51  )
    52  
    53  const (
    54  	csiNodeLimitUpdateTimeout  = 5 * time.Minute
    55  	csiPodUnschedulableTimeout = 5 * time.Minute
    56  	csiResizeWaitPeriod        = 5 * time.Minute
    57  	csiVolumeAttachmentTimeout = 7 * time.Minute
    58  	// how long to wait for Resizing Condition on PVC to appear
    59  	csiResizingConditionWait = 2 * time.Minute
    60  
    61  	// Time for starting a pod with a volume.
    62  	csiPodRunningTimeout = 5 * time.Minute
    63  
    64  	// How log to wait for kubelet to unstage a volume after a pod is deleted
    65  	csiUnstageWaitTimeout = 1 * time.Minute
    66  )
    67  
    68  // csiCall represents an expected call from Kubernetes to CSI mock driver and
    69  // expected return value.
    70  // When matching expected csiCall with a real CSI mock driver output, one csiCall
    71  // matches *one or more* calls with the same method and error code.
    72  // This is due to exponential backoff in Kubernetes, where the test cannot expect
    73  // exact number of call repetitions.
    74  type csiCall struct {
    75  	expectedMethod string
    76  	expectedError  codes.Code
    77  	expectedSecret map[string]string
    78  	// This is a mark for the test itself to delete the tested pod *after*
    79  	// this csiCall is received.
    80  	deletePod bool
    81  }
    82  
    83  type testParameters struct {
    84  	disableAttach       bool
    85  	attachLimit         int
    86  	registerDriver      bool
    87  	lateBinding         bool
    88  	enableTopology      bool
    89  	podInfo             *bool
    90  	storageCapacity     *bool
    91  	scName              string // pre-selected storage class name; must be unique in the cluster
    92  	enableResizing      bool   // enable resizing for both CSI mock driver and storageClass.
    93  	enableNodeExpansion bool   // enable node expansion for CSI mock driver
    94  	// just disable resizing on driver it overrides enableResizing flag for CSI mock driver
    95  	disableResizingOnDriver       bool
    96  	enableSnapshot                bool
    97  	enableVolumeMountGroup        bool // enable the VOLUME_MOUNT_GROUP node capability in the CSI mock driver.
    98  	hooks                         *drivers.Hooks
    99  	tokenRequests                 []storagev1.TokenRequest
   100  	requiresRepublish             *bool
   101  	fsGroupPolicy                 *storagev1.FSGroupPolicy
   102  	enableSELinuxMount            *bool
   103  	enableRecoverExpansionFailure bool
   104  	enableCSINodeExpandSecret     bool
   105  }
   106  
   107  type mockDriverSetup struct {
   108  	cs          clientset.Interface
   109  	config      *storageframework.PerTestConfig
   110  	pods        []*v1.Pod
   111  	pvcs        []*v1.PersistentVolumeClaim
   112  	sc          map[string]*storagev1.StorageClass
   113  	vsc         map[string]*unstructured.Unstructured
   114  	driver      drivers.MockCSITestDriver
   115  	provisioner string
   116  	tp          testParameters
   117  	f           *framework.Framework
   118  }
   119  
   120  type volumeType string
   121  
   122  var (
   123  	csiEphemeral     = volumeType("CSI")
   124  	genericEphemeral = volumeType("Ephemeral")
   125  	pvcReference     = volumeType("PVC")
   126  )
   127  
   128  const (
   129  	poll                           = 2 * time.Second
   130  	pvcAsSourceProtectionFinalizer = "snapshot.storage.kubernetes.io/pvc-as-source-protection"
   131  	volumeSnapshotContentFinalizer = "snapshot.storage.kubernetes.io/volumesnapshotcontent-bound-protection"
   132  	volumeSnapshotBoundFinalizer   = "snapshot.storage.kubernetes.io/volumesnapshot-bound-protection"
   133  	errReasonNotEnoughSpace        = "node(s) did not have enough free storage"
   134  
   135  	csiNodeExpandSecretKey          = "csi.storage.k8s.io/node-expand-secret-name"
   136  	csiNodeExpandSecretNamespaceKey = "csi.storage.k8s.io/node-expand-secret-namespace"
   137  )
   138  
   139  var (
   140  	errPodCompleted   = fmt.Errorf("pod ran to completion")
   141  	errNotEnoughSpace = errors.New(errReasonNotEnoughSpace)
   142  )
   143  
   144  func newMockDriverSetup(f *framework.Framework) *mockDriverSetup {
   145  	return &mockDriverSetup{
   146  		cs:  f.ClientSet,
   147  		sc:  make(map[string]*storagev1.StorageClass),
   148  		vsc: make(map[string]*unstructured.Unstructured),
   149  		f:   f,
   150  	}
   151  }
   152  
   153  func (m *mockDriverSetup) init(ctx context.Context, tp testParameters) {
   154  	m.cs = m.f.ClientSet
   155  	m.tp = tp
   156  
   157  	var err error
   158  	driverOpts := drivers.CSIMockDriverOpts{
   159  		RegisterDriver:                tp.registerDriver,
   160  		PodInfo:                       tp.podInfo,
   161  		StorageCapacity:               tp.storageCapacity,
   162  		EnableTopology:                tp.enableTopology,
   163  		AttachLimit:                   tp.attachLimit,
   164  		DisableAttach:                 tp.disableAttach,
   165  		EnableResizing:                tp.enableResizing,
   166  		EnableNodeExpansion:           tp.enableNodeExpansion,
   167  		EnableSnapshot:                tp.enableSnapshot,
   168  		EnableVolumeMountGroup:        tp.enableVolumeMountGroup,
   169  		TokenRequests:                 tp.tokenRequests,
   170  		RequiresRepublish:             tp.requiresRepublish,
   171  		FSGroupPolicy:                 tp.fsGroupPolicy,
   172  		EnableSELinuxMount:            tp.enableSELinuxMount,
   173  		EnableRecoverExpansionFailure: tp.enableRecoverExpansionFailure,
   174  	}
   175  
   176  	// At the moment, only tests which need hooks are
   177  	// using the embedded CSI mock driver. The rest run
   178  	// the driver inside the cluster although they could
   179  	// changed to use embedding merely by setting
   180  	// driverOpts.embedded to true.
   181  	//
   182  	// Not enabling it for all tests minimizes
   183  	// the risk that the introduction of embedded breaks
   184  	// some existings tests and avoids a dependency
   185  	// on port forwarding, which is important if some of
   186  	// these tests are supposed to become part of
   187  	// conformance testing (port forwarding isn't
   188  	// currently required).
   189  	if tp.hooks != nil {
   190  		driverOpts.Embedded = true
   191  		driverOpts.Hooks = *tp.hooks
   192  	}
   193  
   194  	// this just disable resizing on driver, keeping resizing on SC enabled.
   195  	if tp.disableResizingOnDriver {
   196  		driverOpts.EnableResizing = false
   197  	}
   198  
   199  	m.driver = drivers.InitMockCSIDriver(driverOpts)
   200  	config := m.driver.PrepareTest(ctx, m.f)
   201  	m.config = config
   202  	m.provisioner = config.GetUniqueDriverName()
   203  
   204  	if tp.registerDriver {
   205  		err = waitForCSIDriver(m.cs, m.config.GetUniqueDriverName())
   206  		framework.ExpectNoError(err, "Failed to get CSIDriver %v", m.config.GetUniqueDriverName())
   207  		ginkgo.DeferCleanup(destroyCSIDriver, m.cs, m.config.GetUniqueDriverName())
   208  	}
   209  
   210  	// Wait for the CSIDriver actually get deployed and CSINode object to be generated.
   211  	// This indicates the mock CSI driver pod is up and running healthy.
   212  	err = drivers.WaitForCSIDriverRegistrationOnNode(ctx, m.config.ClientNodeSelection.Name, m.config.GetUniqueDriverName(), m.cs)
   213  	framework.ExpectNoError(err, "Failed to register CSIDriver %v", m.config.GetUniqueDriverName())
   214  }
   215  
   216  func (m *mockDriverSetup) cleanup(ctx context.Context) {
   217  	cs := m.f.ClientSet
   218  	var errs []error
   219  
   220  	for _, pod := range m.pods {
   221  		ginkgo.By(fmt.Sprintf("Deleting pod %s", pod.Name))
   222  		errs = append(errs, e2epod.DeletePodWithWait(ctx, cs, pod))
   223  	}
   224  
   225  	for _, claim := range m.pvcs {
   226  		ginkgo.By(fmt.Sprintf("Deleting claim %s", claim.Name))
   227  		claim, err := cs.CoreV1().PersistentVolumeClaims(claim.Namespace).Get(context.TODO(), claim.Name, metav1.GetOptions{})
   228  		if err == nil {
   229  			if err := cs.CoreV1().PersistentVolumeClaims(claim.Namespace).Delete(context.TODO(), claim.Name, metav1.DeleteOptions{}); err != nil {
   230  				errs = append(errs, err)
   231  			}
   232  			if claim.Spec.VolumeName != "" {
   233  				errs = append(errs, e2epv.WaitForPersistentVolumeDeleted(ctx, cs, claim.Spec.VolumeName, framework.Poll, 2*time.Minute))
   234  			}
   235  		}
   236  	}
   237  
   238  	for _, sc := range m.sc {
   239  		ginkgo.By(fmt.Sprintf("Deleting storageclass %s", sc.Name))
   240  		cs.StorageV1().StorageClasses().Delete(context.TODO(), sc.Name, metav1.DeleteOptions{})
   241  	}
   242  
   243  	for _, vsc := range m.vsc {
   244  		ginkgo.By(fmt.Sprintf("Deleting volumesnapshotclass %s", vsc.GetName()))
   245  		m.config.Framework.DynamicClient.Resource(utils.SnapshotClassGVR).Delete(context.TODO(), vsc.GetName(), metav1.DeleteOptions{})
   246  	}
   247  
   248  	err := utilerrors.NewAggregate(errs)
   249  	framework.ExpectNoError(err, "while cleaning up after test")
   250  }
   251  
   252  func (m *mockDriverSetup) update(o utils.PatchCSIOptions) {
   253  	item, err := m.cs.StorageV1().CSIDrivers().Get(context.TODO(), m.config.GetUniqueDriverName(), metav1.GetOptions{})
   254  	framework.ExpectNoError(err, "Failed to get CSIDriver %v", m.config.GetUniqueDriverName())
   255  
   256  	err = utils.PatchCSIDeployment(nil, o, item)
   257  	framework.ExpectNoError(err, "Failed to apply %v to CSIDriver object %v", o, m.config.GetUniqueDriverName())
   258  
   259  	_, err = m.cs.StorageV1().CSIDrivers().Update(context.TODO(), item, metav1.UpdateOptions{})
   260  	framework.ExpectNoError(err, "Failed to update CSIDriver %v", m.config.GetUniqueDriverName())
   261  }
   262  
   263  func (m *mockDriverSetup) createPod(ctx context.Context, withVolume volumeType) (class *storagev1.StorageClass, claim *v1.PersistentVolumeClaim, pod *v1.Pod) {
   264  	ginkgo.By("Creating pod")
   265  	f := m.f
   266  
   267  	sc := m.driver.GetDynamicProvisionStorageClass(ctx, m.config, "")
   268  	if m.tp.enableCSINodeExpandSecret {
   269  		if sc.Parameters == nil {
   270  			parameters := map[string]string{
   271  				csiNodeExpandSecretKey:          "test-secret",
   272  				csiNodeExpandSecretNamespaceKey: f.Namespace.Name,
   273  			}
   274  			sc.Parameters = parameters
   275  		} else {
   276  			sc.Parameters[csiNodeExpandSecretKey] = "test-secret"
   277  			sc.Parameters[csiNodeExpandSecretNamespaceKey] = f.Namespace.Name
   278  		}
   279  	}
   280  	scTest := testsuites.StorageClassTest{
   281  		Name:                 m.driver.GetDriverInfo().Name,
   282  		Timeouts:             f.Timeouts,
   283  		Provisioner:          sc.Provisioner,
   284  		Parameters:           sc.Parameters,
   285  		ClaimSize:            "1Gi",
   286  		ExpectedSize:         "1Gi",
   287  		DelayBinding:         m.tp.lateBinding,
   288  		AllowVolumeExpansion: m.tp.enableResizing,
   289  	}
   290  
   291  	// The mock driver only works when everything runs on a single node.
   292  	nodeSelection := m.config.ClientNodeSelection
   293  	switch withVolume {
   294  	case csiEphemeral:
   295  		pod = startPausePodInline(f.ClientSet, scTest, nodeSelection, f.Namespace.Name)
   296  	case genericEphemeral:
   297  		class, pod = startPausePodGenericEphemeral(f.ClientSet, scTest, nodeSelection, m.tp.scName, f.Namespace.Name)
   298  		if class != nil {
   299  			m.sc[class.Name] = class
   300  		}
   301  		claim = &v1.PersistentVolumeClaim{
   302  			ObjectMeta: metav1.ObjectMeta{
   303  				Name:      pod.Name + "-" + pod.Spec.Volumes[0].Name,
   304  				Namespace: f.Namespace.Name,
   305  			},
   306  		}
   307  	case pvcReference:
   308  		class, claim, pod = startPausePod(ctx, f.ClientSet, scTest, nodeSelection, m.tp.scName, f.Namespace.Name)
   309  		if class != nil {
   310  			m.sc[class.Name] = class
   311  		}
   312  		if claim != nil {
   313  			m.pvcs = append(m.pvcs, claim)
   314  		}
   315  	}
   316  	if pod != nil {
   317  		m.pods = append(m.pods, pod)
   318  	}
   319  	return // result variables set above
   320  }
   321  
   322  func (m *mockDriverSetup) createPodWithPVC(pvc *v1.PersistentVolumeClaim) (*v1.Pod, error) {
   323  	f := m.f
   324  
   325  	nodeSelection := m.config.ClientNodeSelection
   326  	pod, err := startPausePodWithClaim(m.cs, pvc, nodeSelection, f.Namespace.Name)
   327  	if pod != nil {
   328  		m.pods = append(m.pods, pod)
   329  	}
   330  	return pod, err
   331  }
   332  
   333  func (m *mockDriverSetup) createPodWithFSGroup(ctx context.Context, fsGroup *int64) (*storagev1.StorageClass, *v1.PersistentVolumeClaim, *v1.Pod) {
   334  	f := m.f
   335  
   336  	ginkgo.By("Creating pod with fsGroup")
   337  	nodeSelection := m.config.ClientNodeSelection
   338  	sc := m.driver.GetDynamicProvisionStorageClass(ctx, m.config, "")
   339  	scTest := testsuites.StorageClassTest{
   340  		Name:                 m.driver.GetDriverInfo().Name,
   341  		Provisioner:          sc.Provisioner,
   342  		Parameters:           sc.Parameters,
   343  		ClaimSize:            "1Gi",
   344  		ExpectedSize:         "1Gi",
   345  		DelayBinding:         m.tp.lateBinding,
   346  		AllowVolumeExpansion: m.tp.enableResizing,
   347  	}
   348  	class, claim, pod := startBusyBoxPod(ctx, f.ClientSet, scTest, nodeSelection, m.tp.scName, f.Namespace.Name, fsGroup)
   349  
   350  	if class != nil {
   351  		m.sc[class.Name] = class
   352  	}
   353  	if claim != nil {
   354  		m.pvcs = append(m.pvcs, claim)
   355  	}
   356  
   357  	if pod != nil {
   358  		m.pods = append(m.pods, pod)
   359  	}
   360  
   361  	return class, claim, pod
   362  }
   363  
   364  func (m *mockDriverSetup) createPodWithSELinux(ctx context.Context, accessModes []v1.PersistentVolumeAccessMode, mountOptions []string, seLinuxOpts *v1.SELinuxOptions) (*storagev1.StorageClass, *v1.PersistentVolumeClaim, *v1.Pod) {
   365  	ginkgo.By("Creating pod with SELinux context")
   366  	f := m.f
   367  	nodeSelection := m.config.ClientNodeSelection
   368  	sc := m.driver.GetDynamicProvisionStorageClass(ctx, m.config, "")
   369  	scTest := testsuites.StorageClassTest{
   370  		Name:                 m.driver.GetDriverInfo().Name,
   371  		Provisioner:          sc.Provisioner,
   372  		Parameters:           sc.Parameters,
   373  		ClaimSize:            "1Gi",
   374  		ExpectedSize:         "1Gi",
   375  		DelayBinding:         m.tp.lateBinding,
   376  		AllowVolumeExpansion: m.tp.enableResizing,
   377  		MountOptions:         mountOptions,
   378  	}
   379  	class, claim := createClaim(ctx, f.ClientSet, scTest, nodeSelection, m.tp.scName, f.Namespace.Name, accessModes)
   380  	pod, err := startPausePodWithSELinuxOptions(f.ClientSet, claim, nodeSelection, f.Namespace.Name, seLinuxOpts)
   381  	framework.ExpectNoError(err, "Failed to create pause pod with SELinux context %s: %v", seLinuxOpts, err)
   382  
   383  	if class != nil {
   384  		m.sc[class.Name] = class
   385  	}
   386  	if claim != nil {
   387  		m.pvcs = append(m.pvcs, claim)
   388  	}
   389  
   390  	if pod != nil {
   391  		m.pods = append(m.pods, pod)
   392  	}
   393  
   394  	return class, claim, pod
   395  }
   396  
   397  func waitForCSIDriver(cs clientset.Interface, driverName string) error {
   398  	timeout := 4 * time.Minute
   399  
   400  	framework.Logf("waiting up to %v for CSIDriver %q", timeout, driverName)
   401  	for start := time.Now(); time.Since(start) < timeout; time.Sleep(framework.Poll) {
   402  		_, err := cs.StorageV1().CSIDrivers().Get(context.TODO(), driverName, metav1.GetOptions{})
   403  		if !apierrors.IsNotFound(err) {
   404  			return err
   405  		}
   406  	}
   407  	return fmt.Errorf("gave up after waiting %v for CSIDriver %q", timeout, driverName)
   408  }
   409  
   410  func destroyCSIDriver(cs clientset.Interface, driverName string) {
   411  	driverGet, err := cs.StorageV1().CSIDrivers().Get(context.TODO(), driverName, metav1.GetOptions{})
   412  	if err == nil {
   413  		framework.Logf("deleting %s.%s: %s", driverGet.TypeMeta.APIVersion, driverGet.TypeMeta.Kind, driverGet.ObjectMeta.Name)
   414  		// Uncomment the following line to get full dump of CSIDriver object
   415  		// framework.Logf("%s", framework.PrettyPrint(driverGet))
   416  		cs.StorageV1().CSIDrivers().Delete(context.TODO(), driverName, metav1.DeleteOptions{})
   417  	}
   418  }
   419  
   420  func newStorageClass(t testsuites.StorageClassTest, ns string, prefix string) *storagev1.StorageClass {
   421  	pluginName := t.Provisioner
   422  	if pluginName == "" {
   423  		pluginName = getDefaultPluginName()
   424  	}
   425  	if prefix == "" {
   426  		prefix = "sc"
   427  	}
   428  	bindingMode := storagev1.VolumeBindingImmediate
   429  	if t.DelayBinding {
   430  		bindingMode = storagev1.VolumeBindingWaitForFirstConsumer
   431  	}
   432  	if t.Parameters == nil {
   433  		t.Parameters = make(map[string]string)
   434  	}
   435  
   436  	if framework.NodeOSDistroIs("windows") {
   437  		// fstype might be forced from outside, in that case skip setting a default
   438  		if _, exists := t.Parameters["fstype"]; !exists {
   439  			t.Parameters["fstype"] = e2epv.GetDefaultFSType()
   440  			framework.Logf("settings a default fsType=%s in the storage class", t.Parameters["fstype"])
   441  		}
   442  	}
   443  
   444  	sc := getStorageClass(pluginName, t.Parameters, &bindingMode, t.MountOptions, ns, prefix)
   445  	if t.AllowVolumeExpansion {
   446  		sc.AllowVolumeExpansion = &t.AllowVolumeExpansion
   447  	}
   448  	return sc
   449  }
   450  
   451  func getStorageClass(
   452  	provisioner string,
   453  	parameters map[string]string,
   454  	bindingMode *storagev1.VolumeBindingMode,
   455  	mountOptions []string,
   456  	ns string,
   457  	prefix string,
   458  ) *storagev1.StorageClass {
   459  	if bindingMode == nil {
   460  		defaultBindingMode := storagev1.VolumeBindingImmediate
   461  		bindingMode = &defaultBindingMode
   462  	}
   463  	return &storagev1.StorageClass{
   464  		TypeMeta: metav1.TypeMeta{
   465  			Kind: "StorageClass",
   466  		},
   467  		ObjectMeta: metav1.ObjectMeta{
   468  			// Name must be unique, so let's base it on namespace name and the prefix (the prefix is test specific)
   469  			GenerateName: ns + "-" + prefix,
   470  		},
   471  		Provisioner:       provisioner,
   472  		Parameters:        parameters,
   473  		VolumeBindingMode: bindingMode,
   474  		MountOptions:      mountOptions,
   475  	}
   476  }
   477  
   478  func getDefaultPluginName() string {
   479  	switch {
   480  	case framework.ProviderIs("gke"), framework.ProviderIs("gce"):
   481  		return "kubernetes.io/gce-pd"
   482  	case framework.ProviderIs("aws"):
   483  		return "kubernetes.io/aws-ebs"
   484  	case framework.ProviderIs("vsphere"):
   485  		return "kubernetes.io/vsphere-volume"
   486  	case framework.ProviderIs("azure"):
   487  		return "kubernetes.io/azure-disk"
   488  	}
   489  	return ""
   490  }
   491  
   492  func createSC(cs clientset.Interface, t testsuites.StorageClassTest, scName, ns string) *storagev1.StorageClass {
   493  	class := newStorageClass(t, ns, "")
   494  	if scName != "" {
   495  		class.Name = scName
   496  	}
   497  	var err error
   498  	_, err = cs.StorageV1().StorageClasses().Get(context.TODO(), class.Name, metav1.GetOptions{})
   499  	if err != nil {
   500  		class, err = cs.StorageV1().StorageClasses().Create(context.TODO(), class, metav1.CreateOptions{})
   501  		framework.ExpectNoError(err, "Failed to create class: %v", err)
   502  	}
   503  
   504  	return class
   505  }
   506  
   507  func createClaim(ctx context.Context, cs clientset.Interface, t testsuites.StorageClassTest, node e2epod.NodeSelection, scName, ns string, accessModes []v1.PersistentVolumeAccessMode) (*storagev1.StorageClass, *v1.PersistentVolumeClaim) {
   508  	class := createSC(cs, t, scName, ns)
   509  	claim := e2epv.MakePersistentVolumeClaim(e2epv.PersistentVolumeClaimConfig{
   510  		ClaimSize:        t.ClaimSize,
   511  		StorageClassName: &(class.Name),
   512  		VolumeMode:       &t.VolumeMode,
   513  		AccessModes:      accessModes,
   514  	}, ns)
   515  	claim, err := cs.CoreV1().PersistentVolumeClaims(ns).Create(context.TODO(), claim, metav1.CreateOptions{})
   516  	framework.ExpectNoError(err, "Failed to create claim: %v", err)
   517  
   518  	if !t.DelayBinding {
   519  		pvcClaims := []*v1.PersistentVolumeClaim{claim}
   520  		_, err = e2epv.WaitForPVClaimBoundPhase(ctx, cs, pvcClaims, framework.ClaimProvisionTimeout)
   521  		framework.ExpectNoError(err, "Failed waiting for PVC to be bound: %v", err)
   522  	}
   523  	return class, claim
   524  }
   525  
   526  func startPausePod(ctx context.Context, cs clientset.Interface, t testsuites.StorageClassTest, node e2epod.NodeSelection, scName, ns string) (*storagev1.StorageClass, *v1.PersistentVolumeClaim, *v1.Pod) {
   527  	class, claim := createClaim(ctx, cs, t, node, scName, ns, nil)
   528  
   529  	pod, err := startPausePodWithClaim(cs, claim, node, ns)
   530  	framework.ExpectNoError(err, "Failed to create pause pod: %v", err)
   531  	return class, claim, pod
   532  }
   533  
   534  func startBusyBoxPod(ctx context.Context, cs clientset.Interface, t testsuites.StorageClassTest, node e2epod.NodeSelection, scName, ns string, fsGroup *int64) (*storagev1.StorageClass, *v1.PersistentVolumeClaim, *v1.Pod) {
   535  	class, claim := createClaim(ctx, cs, t, node, scName, ns, nil)
   536  	pod, err := startBusyBoxPodWithClaim(cs, claim, node, ns, fsGroup)
   537  	framework.ExpectNoError(err, "Failed to create busybox pod: %v", err)
   538  	return class, claim, pod
   539  }
   540  
   541  func startPausePodInline(cs clientset.Interface, t testsuites.StorageClassTest, node e2epod.NodeSelection, ns string) *v1.Pod {
   542  	pod, err := startPausePodWithInlineVolume(cs,
   543  		&v1.CSIVolumeSource{
   544  			Driver: t.Provisioner,
   545  		},
   546  		node, ns)
   547  	framework.ExpectNoError(err, "Failed to create pod: %v", err)
   548  	return pod
   549  }
   550  
   551  func startPausePodGenericEphemeral(cs clientset.Interface, t testsuites.StorageClassTest, node e2epod.NodeSelection, scName, ns string) (*storagev1.StorageClass, *v1.Pod) {
   552  	class := createSC(cs, t, scName, ns)
   553  	claim := e2epv.MakePersistentVolumeClaim(e2epv.PersistentVolumeClaimConfig{
   554  		ClaimSize:        t.ClaimSize,
   555  		StorageClassName: &(class.Name),
   556  		VolumeMode:       &t.VolumeMode,
   557  	}, ns)
   558  	pod, err := startPausePodWithVolumeSource(cs, v1.VolumeSource{
   559  		Ephemeral: &v1.EphemeralVolumeSource{
   560  			VolumeClaimTemplate: &v1.PersistentVolumeClaimTemplate{Spec: claim.Spec}},
   561  	}, node, ns)
   562  	framework.ExpectNoError(err, "Failed to create pod: %v", err)
   563  	return class, pod
   564  }
   565  
   566  func startPausePodWithClaim(cs clientset.Interface, pvc *v1.PersistentVolumeClaim, node e2epod.NodeSelection, ns string) (*v1.Pod, error) {
   567  	return startPausePodWithVolumeSource(cs,
   568  		v1.VolumeSource{
   569  			PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
   570  				ClaimName: pvc.Name,
   571  				ReadOnly:  false,
   572  			},
   573  		},
   574  		node, ns)
   575  }
   576  
   577  func startBusyBoxPodWithClaim(cs clientset.Interface, pvc *v1.PersistentVolumeClaim, node e2epod.NodeSelection, ns string, fsGroup *int64) (*v1.Pod, error) {
   578  	return startBusyBoxPodWithVolumeSource(cs,
   579  		v1.VolumeSource{
   580  			PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
   581  				ClaimName: pvc.Name,
   582  				ReadOnly:  false,
   583  			},
   584  		},
   585  		node, ns, fsGroup)
   586  }
   587  
   588  func startPausePodWithInlineVolume(cs clientset.Interface, inlineVolume *v1.CSIVolumeSource, node e2epod.NodeSelection, ns string) (*v1.Pod, error) {
   589  	return startPausePodWithVolumeSource(cs,
   590  		v1.VolumeSource{
   591  			CSI: inlineVolume,
   592  		},
   593  		node, ns)
   594  }
   595  
   596  func startPausePodWithVolumeSource(cs clientset.Interface, volumeSource v1.VolumeSource, node e2epod.NodeSelection, ns string) (*v1.Pod, error) {
   597  	pod := &v1.Pod{
   598  		ObjectMeta: metav1.ObjectMeta{
   599  			GenerateName: "pvc-volume-tester-",
   600  		},
   601  		Spec: v1.PodSpec{
   602  			Containers: []v1.Container{
   603  				{
   604  					Name:  "volume-tester",
   605  					Image: imageutils.GetE2EImage(imageutils.Pause),
   606  					VolumeMounts: []v1.VolumeMount{
   607  						{
   608  							Name:      "my-volume",
   609  							MountPath: "/mnt/test",
   610  						},
   611  					},
   612  				},
   613  			},
   614  			RestartPolicy: v1.RestartPolicyNever,
   615  			Volumes: []v1.Volume{
   616  				{
   617  					Name:         "my-volume",
   618  					VolumeSource: volumeSource,
   619  				},
   620  			},
   621  		},
   622  	}
   623  	e2epod.SetNodeSelection(&pod.Spec, node)
   624  	return cs.CoreV1().Pods(ns).Create(context.TODO(), pod, metav1.CreateOptions{})
   625  }
   626  
   627  func startBusyBoxPodWithVolumeSource(cs clientset.Interface, volumeSource v1.VolumeSource, node e2epod.NodeSelection, ns string, fsGroup *int64) (*v1.Pod, error) {
   628  	pod := &v1.Pod{
   629  		ObjectMeta: metav1.ObjectMeta{
   630  			GenerateName: "pvc-volume-tester-",
   631  		},
   632  		Spec: v1.PodSpec{
   633  			Containers: []v1.Container{
   634  				{
   635  					Name:  "volume-tester",
   636  					Image: framework.BusyBoxImage,
   637  					VolumeMounts: []v1.VolumeMount{
   638  						{
   639  							Name:      "my-volume",
   640  							MountPath: "/mnt/test",
   641  						},
   642  					},
   643  					Command: e2epod.GenerateScriptCmd("while true ; do sleep 2; done"),
   644  				},
   645  			},
   646  			SecurityContext: &v1.PodSecurityContext{
   647  				FSGroup: fsGroup,
   648  			},
   649  			RestartPolicy: v1.RestartPolicyNever,
   650  			Volumes: []v1.Volume{
   651  				{
   652  					Name:         "my-volume",
   653  					VolumeSource: volumeSource,
   654  				},
   655  			},
   656  		},
   657  	}
   658  	e2epod.SetNodeSelection(&pod.Spec, node)
   659  	return cs.CoreV1().Pods(ns).Create(context.TODO(), pod, metav1.CreateOptions{})
   660  }
   661  
   662  func startPausePodWithSELinuxOptions(cs clientset.Interface, pvc *v1.PersistentVolumeClaim, node e2epod.NodeSelection, ns string, seLinuxOpts *v1.SELinuxOptions) (*v1.Pod, error) {
   663  	pod := &v1.Pod{
   664  		ObjectMeta: metav1.ObjectMeta{
   665  			GenerateName: "pvc-volume-tester-",
   666  		},
   667  		Spec: v1.PodSpec{
   668  			SecurityContext: &v1.PodSecurityContext{
   669  				SELinuxOptions: seLinuxOpts,
   670  			},
   671  			Containers: []v1.Container{
   672  				{
   673  					Name:  "volume-tester",
   674  					Image: imageutils.GetE2EImage(imageutils.Pause),
   675  					VolumeMounts: []v1.VolumeMount{
   676  						{
   677  							Name:      "my-volume",
   678  							MountPath: "/mnt/test",
   679  						},
   680  					},
   681  				},
   682  			},
   683  			RestartPolicy: v1.RestartPolicyNever,
   684  			Volumes: []v1.Volume{
   685  				{
   686  					Name: "my-volume",
   687  					VolumeSource: v1.VolumeSource{
   688  						PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
   689  							ClaimName: pvc.Name,
   690  							ReadOnly:  false,
   691  						},
   692  					},
   693  				},
   694  			},
   695  		},
   696  	}
   697  	if node.Name != "" {
   698  		// Force schedule the pod to skip scheduler RWOP checks
   699  		framework.Logf("Forcing node name %s", node.Name)
   700  		pod.Spec.NodeName = node.Name
   701  	}
   702  	e2epod.SetNodeSelection(&pod.Spec, node)
   703  	return cs.CoreV1().Pods(ns).Create(context.TODO(), pod, metav1.CreateOptions{})
   704  }
   705  
   706  // checkNodePublishVolume goes through all calls to the mock driver and checks that at least one NodePublishVolume call had expected attributes.
   707  // If a matched call is found but it has unexpected attributes, checkNodePublishVolume skips it and continues searching.
   708  func checkNodePublishVolume(ctx context.Context, getCalls func(ctx context.Context) ([]drivers.MockCSICall, error), pod *v1.Pod, expectPodInfo, ephemeralVolume, csiInlineVolumesEnabled, csiServiceAccountTokenEnabled bool) error {
   709  	expectedAttributes := map[string]string{}
   710  	unexpectedAttributeKeys := sets.New[string]()
   711  	if expectPodInfo {
   712  		expectedAttributes["csi.storage.k8s.io/pod.name"] = pod.Name
   713  		expectedAttributes["csi.storage.k8s.io/pod.namespace"] = pod.Namespace
   714  		expectedAttributes["csi.storage.k8s.io/pod.uid"] = string(pod.UID)
   715  		expectedAttributes["csi.storage.k8s.io/serviceAccount.name"] = "default"
   716  	} else {
   717  		unexpectedAttributeKeys.Insert("csi.storage.k8s.io/pod.name")
   718  		unexpectedAttributeKeys.Insert("csi.storage.k8s.io/pod.namespace")
   719  		unexpectedAttributeKeys.Insert("csi.storage.k8s.io/pod.uid")
   720  		unexpectedAttributeKeys.Insert("csi.storage.k8s.io/serviceAccount.name")
   721  	}
   722  	if csiInlineVolumesEnabled {
   723  		// This is only passed in 1.15 when the CSIInlineVolume feature gate is set.
   724  		expectedAttributes["csi.storage.k8s.io/ephemeral"] = strconv.FormatBool(ephemeralVolume)
   725  	} else {
   726  		unexpectedAttributeKeys.Insert("csi.storage.k8s.io/ephemeral")
   727  	}
   728  
   729  	if csiServiceAccountTokenEnabled {
   730  		expectedAttributes["csi.storage.k8s.io/serviceAccount.tokens"] = "<nonempty>"
   731  	} else {
   732  		unexpectedAttributeKeys.Insert("csi.storage.k8s.io/serviceAccount.tokens")
   733  	}
   734  
   735  	calls, err := getCalls(ctx)
   736  	if err != nil {
   737  		return err
   738  	}
   739  
   740  	var volumeContexts []map[string]string
   741  	for _, call := range calls {
   742  		if call.Method != "NodePublishVolume" {
   743  			continue
   744  		}
   745  
   746  		volumeCtx := call.Request.VolumeContext
   747  
   748  		// Check that NodePublish had expected attributes
   749  		foundAttributes := sets.NewString()
   750  		for k, v := range expectedAttributes {
   751  			vv, found := volumeCtx[k]
   752  			if found && (v == vv || (v == "<nonempty>" && len(vv) != 0)) {
   753  				foundAttributes.Insert(k)
   754  			}
   755  		}
   756  		if foundAttributes.Len() != len(expectedAttributes) {
   757  			framework.Logf("Skipping the NodePublishVolume call: expected attribute %+v, got %+v", format.Object(expectedAttributes, 1), format.Object(volumeCtx, 1))
   758  			continue
   759  		}
   760  
   761  		// Check that NodePublish had no unexpected attributes
   762  		unexpectedAttributes := make(map[string]string)
   763  		for k := range volumeCtx {
   764  			if unexpectedAttributeKeys.Has(k) {
   765  				unexpectedAttributes[k] = volumeCtx[k]
   766  			}
   767  		}
   768  		if len(unexpectedAttributes) != 0 {
   769  			framework.Logf("Skipping the NodePublishVolume call because it contains unexpected attributes %+v", format.Object(unexpectedAttributes, 1))
   770  			continue
   771  		}
   772  
   773  		return nil
   774  	}
   775  
   776  	if len(volumeContexts) == 0 {
   777  		return fmt.Errorf("NodePublishVolume was never called")
   778  	}
   779  
   780  	return fmt.Errorf("NodePublishVolume was called %d times, but no call had expected attributes %s or calls have unwanted attributes key %+v", len(volumeContexts), format.Object(expectedAttributes, 1), unexpectedAttributeKeys.UnsortedList())
   781  }
   782  
   783  // createFSGroupRequestPreHook creates a hook that records the fsGroup passed in
   784  // through NodeStageVolume and NodePublishVolume calls.
   785  func createFSGroupRequestPreHook(nodeStageFsGroup, nodePublishFsGroup *string) *drivers.Hooks {
   786  	return &drivers.Hooks{
   787  		Pre: func(ctx context.Context, fullMethod string, request interface{}) (reply interface{}, err error) {
   788  			nodeStageRequest, ok := request.(*csipbv1.NodeStageVolumeRequest)
   789  			if ok {
   790  				mountVolume := nodeStageRequest.GetVolumeCapability().GetMount()
   791  				if mountVolume != nil {
   792  					*nodeStageFsGroup = mountVolume.VolumeMountGroup
   793  				}
   794  			}
   795  			nodePublishRequest, ok := request.(*csipbv1.NodePublishVolumeRequest)
   796  			if ok {
   797  				mountVolume := nodePublishRequest.GetVolumeCapability().GetMount()
   798  				if mountVolume != nil {
   799  					*nodePublishFsGroup = mountVolume.VolumeMountGroup
   800  				}
   801  			}
   802  			return nil, nil
   803  		},
   804  	}
   805  }
   806  
   807  // createPreHook counts invocations of a certain method (identified by a substring in the full gRPC method name).
   808  func createPreHook(method string, callback func(counter int64) error) *drivers.Hooks {
   809  	var counter int64
   810  
   811  	return &drivers.Hooks{
   812  		Pre: func() func(ctx context.Context, fullMethod string, request interface{}) (reply interface{}, err error) {
   813  			return func(ctx context.Context, fullMethod string, request interface{}) (reply interface{}, err error) {
   814  				if strings.Contains(fullMethod, method) {
   815  					counter := atomic.AddInt64(&counter, 1)
   816  					return nil, callback(counter)
   817  				}
   818  				return nil, nil
   819  			}
   820  		}(),
   821  	}
   822  }
   823  
   824  // compareCSICalls compares expectedCalls with logs of the mock driver.
   825  // It returns index of the first expectedCall that was *not* received
   826  // yet or error when calls do not match.
   827  // All repeated calls to the CSI mock driver (e.g. due to exponential backoff)
   828  // are squashed and checked against single expectedCallSequence item.
   829  //
   830  // Only permanent errors are returned. Other errors are logged and no
   831  // calls are returned. The caller is expected to retry.
   832  func compareCSICalls(ctx context.Context, trackedCalls []string, expectedCallSequence []csiCall, getCalls func(ctx context.Context) ([]drivers.MockCSICall, error)) ([]drivers.MockCSICall, int, error) {
   833  	allCalls, err := getCalls(ctx)
   834  	if err != nil {
   835  		framework.Logf("intermittent (?) log retrieval error, proceeding without output: %v", err)
   836  		return nil, 0, nil
   837  	}
   838  
   839  	// Remove all repeated and ignored calls
   840  	tracked := sets.NewString(trackedCalls...)
   841  	var calls []drivers.MockCSICall
   842  	var last drivers.MockCSICall
   843  	for _, c := range allCalls {
   844  		if !tracked.Has(c.Method) {
   845  			continue
   846  		}
   847  		if c.Method != last.Method || c.FullError.Code != last.FullError.Code {
   848  			last = c
   849  			calls = append(calls, c)
   850  		}
   851  		// This call is the same as the last one, ignore it.
   852  	}
   853  
   854  	for i, c := range calls {
   855  		if i >= len(expectedCallSequence) {
   856  			// Log all unexpected calls first, return error below outside the loop.
   857  			framework.Logf("Unexpected CSI driver call: %s (%v)", c.Method, c.FullError)
   858  			continue
   859  		}
   860  
   861  		// Compare current call with expected call
   862  		expectedCall := expectedCallSequence[i]
   863  		if c.Method != expectedCall.expectedMethod || c.FullError.Code != expectedCall.expectedError {
   864  			return allCalls, i, fmt.Errorf("Unexpected CSI call %d: expected %s (%d), got %s (%d)", i, expectedCall.expectedMethod, expectedCall.expectedError, c.Method, c.FullError.Code)
   865  		}
   866  
   867  		// if the secret is not nil, compare it
   868  		if expectedCall.expectedSecret != nil {
   869  			if !reflect.DeepEqual(expectedCall.expectedSecret, c.Request.Secrets) {
   870  				return allCalls, i, fmt.Errorf("Unexpected secret: expected %v, got %v", expectedCall.expectedSecret, c.Request.Secrets)
   871  			}
   872  		}
   873  
   874  	}
   875  	if len(calls) > len(expectedCallSequence) {
   876  		return allCalls, len(expectedCallSequence), fmt.Errorf("Received %d unexpected CSI driver calls", len(calls)-len(expectedCallSequence))
   877  	}
   878  	// All calls were correct
   879  	return allCalls, len(calls), nil
   880  
   881  }
   882  
   883  // createSELinuxMountPreHook creates a hook that records the mountOptions passed in
   884  // through NodeStageVolume and NodePublishVolume calls.
   885  func createSELinuxMountPreHook(nodeStageMountOpts, nodePublishMountOpts *[]string, stageCalls, unstageCalls, publishCalls, unpublishCalls *atomic.Int32) *drivers.Hooks {
   886  	return &drivers.Hooks{
   887  		Pre: func(ctx context.Context, fullMethod string, request interface{}) (reply interface{}, err error) {
   888  			switch req := request.(type) {
   889  			case *csipbv1.NodeStageVolumeRequest:
   890  				stageCalls.Add(1)
   891  				mountVolume := req.GetVolumeCapability().GetMount()
   892  				if mountVolume != nil {
   893  					*nodeStageMountOpts = mountVolume.MountFlags
   894  				}
   895  			case *csipbv1.NodePublishVolumeRequest:
   896  				publishCalls.Add(1)
   897  				mountVolume := req.GetVolumeCapability().GetMount()
   898  				if mountVolume != nil {
   899  					*nodePublishMountOpts = mountVolume.MountFlags
   900  				}
   901  			case *csipbv1.NodeUnstageVolumeRequest:
   902  				unstageCalls.Add(1)
   903  			case *csipbv1.NodeUnpublishVolumeRequest:
   904  				unpublishCalls.Add(1)
   905  			}
   906  			return nil, nil
   907  		},
   908  	}
   909  }
   910  
   911  // A lot of this code was copied from e2e/framework. It would be nicer
   912  // if it could be reused - see https://github.com/kubernetes/kubernetes/issues/92754
   913  func podRunning(ctx context.Context, c clientset.Interface, podName, namespace string) wait.ConditionFunc {
   914  	return func() (bool, error) {
   915  		pod, err := c.CoreV1().Pods(namespace).Get(ctx, podName, metav1.GetOptions{})
   916  		if err != nil {
   917  			return false, err
   918  		}
   919  		switch pod.Status.Phase {
   920  		case v1.PodRunning:
   921  			return true, nil
   922  		case v1.PodFailed, v1.PodSucceeded:
   923  			return false, errPodCompleted
   924  		}
   925  		return false, nil
   926  	}
   927  }
   928  
   929  func podHasStorage(ctx context.Context, c clientset.Interface, podName, namespace string, when time.Time) wait.ConditionFunc {
   930  	// Check for events of this pod. Copied from test/e2e/common/container_probe.go.
   931  	expectedEvent := fields.Set{
   932  		"involvedObject.kind":      "Pod",
   933  		"involvedObject.name":      podName,
   934  		"involvedObject.namespace": namespace,
   935  		"reason":                   "FailedScheduling",
   936  	}.AsSelector().String()
   937  	options := metav1.ListOptions{
   938  		FieldSelector: expectedEvent,
   939  	}
   940  	// copied from test/e2e/framework/events/events.go
   941  	return func() (bool, error) {
   942  		// We cannot be sure here whether it has enough storage, only when
   943  		// it hasn't. In that case we abort waiting with a special error.
   944  		events, err := c.CoreV1().Events(namespace).List(ctx, options)
   945  		if err != nil {
   946  			return false, fmt.Errorf("got error while getting events: %w", err)
   947  		}
   948  		for _, event := range events.Items {
   949  			if /* event.CreationTimestamp.After(when) &&
   950  			 */strings.Contains(event.Message, errReasonNotEnoughSpace) {
   951  				return false, errNotEnoughSpace
   952  			}
   953  		}
   954  		return false, nil
   955  	}
   956  }
   957  
   958  func anyOf(conditions ...wait.ConditionFunc) wait.ConditionFunc {
   959  	return func() (bool, error) {
   960  		for _, condition := range conditions {
   961  			done, err := condition()
   962  			if err != nil {
   963  				return false, err
   964  			}
   965  			if done {
   966  				return true, nil
   967  			}
   968  		}
   969  		return false, nil
   970  	}
   971  }
   972  
   973  func waitForMaxVolumeCondition(pod *v1.Pod, cs clientset.Interface) error {
   974  	waitErr := wait.PollImmediate(10*time.Second, csiPodUnschedulableTimeout, func() (bool, error) {
   975  		pod, err := cs.CoreV1().Pods(pod.Namespace).Get(context.TODO(), pod.Name, metav1.GetOptions{})
   976  		if err != nil {
   977  			return false, err
   978  		}
   979  		for _, c := range pod.Status.Conditions {
   980  			// Conformance tests cannot rely on specific output of optional fields (e.g., Reason
   981  			// and Message) because these fields are not suject to the deprecation policy.
   982  			if c.Type == v1.PodScheduled && c.Status == v1.ConditionFalse && c.Reason != "" && c.Message != "" {
   983  				return true, nil
   984  			}
   985  		}
   986  		return false, nil
   987  	})
   988  	if waitErr != nil {
   989  		return fmt.Errorf("error waiting for pod %s/%s to have max volume condition: %v", pod.Namespace, pod.Name, waitErr)
   990  	}
   991  	return nil
   992  }
   993  

View as plain text