
Source file src/k8s.io/kubernetes/test/e2e/storage/drivers/csi.go

Documentation: k8s.io/kubernetes/test/e2e/storage/drivers

     1  /*
     2  Copyright 2018 The Kubernetes Authors.
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     8      http://www.apache.org/licenses/LICENSE-2.0
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    17  /*
    18   * This file defines various csi volume test drivers for TestSuites.
    19   *
    20   * There are two ways to prepare test drivers:
    21   * 1) With containerized server (NFS, Ceph, iSCSI, ...)
    22   * It creates a server pod which defines one volume for the tests.
    23   * These tests work only when privileged containers are allowed, exporting
    24   * various filesystems (ex: NFS) usually needs some mounting or
    25   * other privileged magic in the server pod.
    26   *
    27   * Note that the server containers are for testing purposes only and should not
    28   * be used in production.
    29   *
    30   * 2) With server or cloud provider outside of Kubernetes (Cinder, GCE, AWS, Azure, ...)
    31   * Appropriate server or cloud provider must exist somewhere outside
    32   * the tested Kubernetes cluster. CreateVolume will create a new volume to be
    33   * used in the TestSuites for inlineVolume or DynamicPV tests.
    34   */
    36  package drivers
    38  import (
    39  	"context"
    40  	"encoding/json"
    41  	"errors"
    42  	"fmt"
    43  	"strconv"
    44  	"strings"
    45  	"sync"
    46  	"time"
    48  	"github.com/onsi/ginkgo/v2"
    49  	spb "google.golang.org/genproto/googleapis/rpc/status"
    50  	"google.golang.org/grpc/codes"
    51  	grpcstatus "google.golang.org/grpc/status"
    53  	appsv1 "k8s.io/api/apps/v1"
    54  	v1 "k8s.io/api/core/v1"
    55  	rbacv1 "k8s.io/api/rbac/v1"
    56  	storagev1 "k8s.io/api/storage/v1"
    57  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    58  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    59  	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
    60  	"k8s.io/apimachinery/pkg/util/sets"
    61  	"k8s.io/apimachinery/pkg/util/wait"
    62  	clientset "k8s.io/client-go/kubernetes"
    63  	"k8s.io/klog/v2"
    64  	"k8s.io/kubernetes/test/e2e/feature"
    65  	"k8s.io/kubernetes/test/e2e/framework"
    66  	e2enode "k8s.io/kubernetes/test/e2e/framework/node"
    67  	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
    68  	e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
    69  	e2evolume "k8s.io/kubernetes/test/e2e/framework/volume"
    70  	mockdriver "k8s.io/kubernetes/test/e2e/storage/drivers/csi-test/driver"
    71  	mockservice "k8s.io/kubernetes/test/e2e/storage/drivers/csi-test/mock/service"
    72  	"k8s.io/kubernetes/test/e2e/storage/drivers/proxy"
    73  	storageframework "k8s.io/kubernetes/test/e2e/storage/framework"
    74  	"k8s.io/kubernetes/test/e2e/storage/utils"
    76  	"google.golang.org/grpc"
    77  )
const (
	// GCEPDCSIDriverName is the name of GCE Persistent Disk CSI driver
	GCEPDCSIDriverName = "pd.csi.storage.gke.io"
	// GCEPDCSIZoneTopologyKey is the key of GCE Persistent Disk CSI zone topology
	GCEPDCSIZoneTopologyKey = "topology.gke.io/zone"

	// grpcCallPrefix marks the log lines in which a gRPC call of the mock
	// driver is recorded as JSON. LogGRPC writes it in front of every entry
	// it logs, and GetCalls searches the driver pod's log for it.
	grpcCallPrefix = "gRPCCall:"
)
// hostpathCSIDriver is the test driver for the hostpath CSI driver.
type hostpathCSIDriver struct {
	// driverInfo is what GetDriverInfo hands out.
	driverInfo storageframework.DriverInfo
	// manifests are the files that PrepareTest passes to utils.CreateFromManifests.
	manifests []string
	// volumeAttributes are the attribute maps that GetVolume cycles through.
	// When empty, SkipUnsupportedTest skips CSI inline volume patterns.
	volumeAttributes []map[string]string
}
    96  func initHostPathCSIDriver(name string, capabilities map[storageframework.Capability]bool, volumeAttributes []map[string]string, manifests ...string) storageframework.TestDriver {
    97  	return &hostpathCSIDriver{
    98  		driverInfo: storageframework.DriverInfo{
    99  			Name:        name,
   100  			MaxFileSize: storageframework.FileSizeMedium,
   101  			SupportedFsType: sets.NewString(
   102  				"", // Default fsType
   103  			),
   104  			SupportedSizeRange: e2evolume.SizeRange{
   105  				Min: "1Mi",
   106  			},
   107  			Capabilities: capabilities,
   108  			StressTestOptions: &storageframework.StressTestOptions{
   109  				NumPods:     10,
   110  				NumRestarts: 10,
   111  			},
   112  			VolumeSnapshotStressTestOptions: &storageframework.VolumeSnapshotStressTestOptions{
   113  				NumPods:      10,
   114  				NumSnapshots: 10,
   115  			},
   116  			PerformanceTestOptions: &storageframework.PerformanceTestOptions{
   117  				ProvisioningOptions: &storageframework.PerformanceTestProvisioningOptions{
   118  					VolumeSize: "1Mi",
   119  					Count:      300,
   120  					// Volume provisioning metrics are compared to a high baseline.
   121  					// Failure to pass would suggest a performance regression.
   122  					ExpectedMetrics: &storageframework.Metrics{
   123  						AvgLatency: 2 * time.Minute,
   124  						Throughput: 0.5,
   125  					},
   126  				},
   127  			},
   128  		},
   129  		manifests:        manifests,
   130  		volumeAttributes: volumeAttributes,
   131  	}
   132  }
   134  var _ storageframework.TestDriver = &hostpathCSIDriver{}
   135  var _ storageframework.DynamicPVTestDriver = &hostpathCSIDriver{}
   136  var _ storageframework.SnapshottableTestDriver = &hostpathCSIDriver{}
   137  var _ storageframework.EphemeralTestDriver = &hostpathCSIDriver{}
   139  // InitHostPathCSIDriver returns hostpathCSIDriver that implements TestDriver interface
   140  func InitHostPathCSIDriver() storageframework.TestDriver {
   141  	capabilities := map[storageframework.Capability]bool{
   142  		storageframework.CapPersistence:                    true,
   143  		storageframework.CapSnapshotDataSource:             true,
   144  		storageframework.CapMultiPODs:                      true,
   145  		storageframework.CapBlock:                          true,
   146  		storageframework.CapPVCDataSource:                  true,
   147  		storageframework.CapControllerExpansion:            true,
   148  		storageframework.CapOfflineExpansion:               true,
   149  		storageframework.CapOnlineExpansion:                true,
   150  		storageframework.CapSingleNodeVolume:               true,
   151  		storageframework.CapReadWriteOncePod:               true,
   152  		storageframework.CapMultiplePVsSameID:              true,
   153  		storageframework.CapFSResizeFromSourceNotSupported: true,
   155  		// This is needed for the
   156  		// testsuites/volumelimits.go `should support volume limits`
   157  		// test. --maxvolumespernode=10 gets
   158  		// added when patching the deployment.
   159  		storageframework.CapVolumeLimits: true,
   160  	}
   161  	return initHostPathCSIDriver("csi-hostpath",
   162  		capabilities,
   163  		// Volume attributes don't matter, but we have to provide at least one map.
   164  		[]map[string]string{
   165  			{"foo": "bar"},
   166  		},
   167  		"test/e2e/testing-manifests/storage-csi/external-attacher/rbac.yaml",
   168  		"test/e2e/testing-manifests/storage-csi/external-provisioner/rbac.yaml",
   169  		"test/e2e/testing-manifests/storage-csi/external-snapshotter/csi-snapshotter/rbac-csi-snapshotter.yaml",
   170  		"test/e2e/testing-manifests/storage-csi/external-health-monitor/external-health-monitor-controller/rbac.yaml",
   171  		"test/e2e/testing-manifests/storage-csi/external-resizer/rbac.yaml",
   172  		"test/e2e/testing-manifests/storage-csi/hostpath/hostpath/csi-hostpath-driverinfo.yaml",
   173  		"test/e2e/testing-manifests/storage-csi/hostpath/hostpath/csi-hostpath-plugin.yaml",
   174  		"test/e2e/testing-manifests/storage-csi/hostpath/hostpath/e2e-test-rbac.yaml",
   175  	)
   176  }
   178  func (h *hostpathCSIDriver) GetDriverInfo() *storageframework.DriverInfo {
   179  	return &h.driverInfo
   180  }
   182  func (h *hostpathCSIDriver) SkipUnsupportedTest(pattern storageframework.TestPattern) {
   183  	if pattern.VolType == storageframework.CSIInlineVolume && len(h.volumeAttributes) == 0 {
   184  		e2eskipper.Skipf("%s has no volume attributes defined, doesn't support ephemeral inline volumes", h.driverInfo.Name)
   185  	}
   186  }
   188  func (h *hostpathCSIDriver) GetDynamicProvisionStorageClass(ctx context.Context, config *storageframework.PerTestConfig, fsType string) *storagev1.StorageClass {
   189  	provisioner := config.GetUniqueDriverName()
   190  	parameters := map[string]string{}
   191  	ns := config.Framework.Namespace.Name
   193  	return storageframework.GetStorageClass(provisioner, parameters, nil, ns)
   194  }
   196  func (h *hostpathCSIDriver) GetVolume(config *storageframework.PerTestConfig, volumeNumber int) (map[string]string, bool, bool) {
   197  	return h.volumeAttributes[volumeNumber%len(h.volumeAttributes)], false /* not shared */, false /* read-write */
   198  }
   200  func (h *hostpathCSIDriver) GetCSIDriverName(config *storageframework.PerTestConfig) string {
   201  	return config.GetUniqueDriverName()
   202  }
   204  func (h *hostpathCSIDriver) GetSnapshotClass(ctx context.Context, config *storageframework.PerTestConfig, parameters map[string]string) *unstructured.Unstructured {
   205  	snapshotter := config.GetUniqueDriverName()
   206  	ns := config.Framework.Namespace.Name
   208  	return utils.GenerateSnapshotClassSpec(snapshotter, parameters, ns)
   209  }
   211  func (h *hostpathCSIDriver) PrepareTest(ctx context.Context, f *framework.Framework) *storageframework.PerTestConfig {
   212  	// Create secondary namespace which will be used for creating driver
   213  	driverNamespace := utils.CreateDriverNamespace(ctx, f)
   214  	driverns := driverNamespace.Name
   215  	testns := f.Namespace.Name
   217  	ginkgo.By(fmt.Sprintf("deploying %s driver", h.driverInfo.Name))
   218  	cancelLogging := utils.StartPodLogs(ctx, f, driverNamespace)
   219  	cs := f.ClientSet
   221  	// The hostpath CSI driver only works when everything runs on the same node.
   222  	node, err := e2enode.GetRandomReadySchedulableNode(ctx, cs)
   223  	framework.ExpectNoError(err)
   224  	config := &storageframework.PerTestConfig{
   225  		Driver:              h,
   226  		Prefix:              "hostpath",
   227  		Framework:           f,
   228  		ClientNodeSelection: e2epod.NodeSelection{Name: node.Name},
   229  		DriverNamespace:     driverNamespace,
   230  	}
   232  	o := utils.PatchCSIOptions{
   233  		OldDriverName:       h.driverInfo.Name,
   234  		NewDriverName:       config.GetUniqueDriverName(),
   235  		DriverContainerName: "hostpath",
   236  		DriverContainerArguments: []string{"--drivername=" + config.GetUniqueDriverName(),
   237  			// This is needed for the
   238  			// testsuites/volumelimits.go `should support volume limits`
   239  			// test.
   240  			"--maxvolumespernode=10",
   241  			// Enable volume lifecycle checks, to report failure if
   242  			// the volume is not unpublished / unstaged correctly.
   243  			"--check-volume-lifecycle=true",
   244  		},
   245  		ProvisionerContainerName: "csi-provisioner",
   246  		SnapshotterContainerName: "csi-snapshotter",
   247  		NodeName:                 node.Name,
   248  	}
   250  	err = utils.CreateFromManifests(ctx, config.Framework, driverNamespace, func(item interface{}) error {
   251  		if err := utils.PatchCSIDeployment(config.Framework, o, item); err != nil {
   252  			return err
   253  		}
   255  		// Remove csi-external-health-monitor-agent and
   256  		// csi-external-health-monitor-controller
   257  		// containers. The agent is obsolete.
   258  		// The controller is not needed for any of the
   259  		// tests and is causing too much overhead when
   260  		// running in a large cluster (see
   261  		// https://github.com/kubernetes/kubernetes/issues/102452#issuecomment-856991009).
   262  		switch item := item.(type) {
   263  		case *appsv1.StatefulSet:
   264  			var containers []v1.Container
   265  			for _, container := range item.Spec.Template.Spec.Containers {
   266  				switch container.Name {
   267  				case "csi-external-health-monitor-agent", "csi-external-health-monitor-controller":
   268  					// Remove these containers.
   269  				default:
   270  					// Keep the others.
   271  					containers = append(containers, container)
   272  				}
   273  			}
   274  			item.Spec.Template.Spec.Containers = containers
   275  		}
   276  		return nil
   277  	}, h.manifests...)
   279  	if err != nil {
   280  		framework.Failf("deploying %s driver: %v", h.driverInfo.Name, err)
   281  	}
   283  	cleanupFunc := generateDriverCleanupFunc(
   284  		f,
   285  		h.driverInfo.Name,
   286  		testns,
   287  		driverns,
   288  		cancelLogging)
   289  	ginkgo.DeferCleanup(cleanupFunc)
   291  	return config
   292  }
// mockCSIDriver is the test driver for the CSI mock driver. Most fields are
// copied from CSIMockDriverOpts by InitMockCSIDriver and consumed by
// PrepareTest.
type mockCSIDriver struct {
	// driverInfo is what GetDriverInfo hands out.
	driverInfo storageframework.DriverInfo
	// manifests are the files that PrepareTest passes to utils.CreateFromManifests.
	manifests []string
	// podInfo and storageCapacity are forwarded to utils.PatchCSIOptions.
	podInfo         *bool
	storageCapacity *bool
	// attachable is the negation of CSIMockDriverOpts.DisableAttach.
	attachable bool
	// attachLimit is handed to the driver; the in-cluster driver only gets
	// the --attach-limit argument when the value is greater than zero.
	attachLimit         int
	enableTopology      bool
	enableNodeExpansion bool
	// hooks are invoked by interceptGRPC around each gRPC call.
	hooks Hooks
	// tokenRequests, requiresRepublish and fsGroupPolicy are forwarded to
	// utils.PatchCSIOptions.
	tokenRequests     []storagev1.TokenRequest
	requiresRepublish *bool
	fsGroupPolicy     *storagev1.FSGroupPolicy
	// enableVolumeMountGroup is passed to the embedded mock service as
	// VolumeMountGroupRequired.
	enableVolumeMountGroup bool
	// embedded selects between running the mock driver inside the test
	// process (true) and inside the cluster (false).
	embedded bool
	// calls collects the gRPC calls seen by interceptGRPC.
	calls MockCSICalls
	// embeddedCSIDriver is created by PrepareTest when embedded is true.
	embeddedCSIDriver *mockdriver.CSIDriver
	// enableSELinuxMount is forwarded to utils.PatchCSIOptions as SELinuxMount.
	enableSELinuxMount *bool
	// enableRecoverExpansionFailure makes PrepareTest request the
	// "RecoverVolumeExpansionFailure=true" feature for "csi-resizer".
	enableRecoverExpansionFailure bool

	// Additional values set during PrepareTest
	clientSet       clientset.Interface
	driverNamespace *v1.Namespace
}
// Hooks are callbacks that run while the mock driver handles gRPC calls.
//
// At the moment, only generic pre- and post-function call
// hooks are implemented. Those hooks can cast the request and
// response values if needed. More hooks inside specific
// functions could be added if needed.
type Hooks struct {
	// Pre is called before invoking the mock driver's implementation of a method.
	// If either a non-nil reply or error are returned, then those are returned to the caller
	// and neither the implementation nor Post is invoked.
	Pre func(ctx context.Context, method string, request interface{}) (reply interface{}, err error)

	// Post is called after invoking the mock driver's implementation of a method.
	// What it returns is used as actual result.
	Post func(ctx context.Context, method string, request, reply interface{}, err error) (finalReply interface{}, finalErr error)
}
// MockCSITestDriver provides additional functions specific to the CSI mock driver.
// It is the type returned by InitMockCSIDriver.
type MockCSITestDriver interface {
	storageframework.DynamicPVTestDriver

	// GetCalls returns all currently observed gRPC calls. Only valid
	// after PrepareTest.
	GetCalls(ctx context.Context) ([]MockCSICall, error)
}
// CSIMockDriverOpts defines the options that InitMockCSIDriver uses to
// configure the CSI mock driver and to pick the manifests deployed with it.
type CSIMockDriverOpts struct {
	// RegisterDriver adds the csi-mock-driverinfo.yaml manifest.
	RegisterDriver bool
	// DisableAttach makes the driver non-attachable and leaves out the
	// csi-mock-driver-attacher.yaml manifest.
	DisableAttach bool
	// PodInfo and StorageCapacity are forwarded to utils.PatchCSIOptions.
	PodInfo         *bool
	StorageCapacity *bool
	// AttachLimit is handed to the driver; the in-cluster driver only gets
	// the --attach-limit argument when the value is greater than zero.
	AttachLimit    int
	EnableTopology bool
	// EnableResizing adds the csi-mock-driver-resizer.yaml manifest.
	EnableResizing      bool
	EnableNodeExpansion bool
	// EnableSnapshot adds the csi-mock-driver-snapshotter.yaml manifest.
	EnableSnapshot bool
	// EnableVolumeMountGroup is passed to the embedded mock service as
	// VolumeMountGroupRequired.
	EnableVolumeMountGroup bool
	// TokenRequests, RequiresRepublish, FSGroupPolicy and EnableSELinuxMount
	// are forwarded to utils.PatchCSIOptions.
	TokenRequests      []storagev1.TokenRequest
	RequiresRepublish  *bool
	FSGroupPolicy      *storagev1.FSGroupPolicy
	EnableSELinuxMount *bool
	// EnableRecoverExpansionFailure requests the
	// "RecoverVolumeExpansionFailure=true" feature for "csi-resizer".
	EnableRecoverExpansionFailure bool

	// Embedded defines whether the CSI mock driver runs
	// inside the cluster (false, the default) or just a proxy
	// runs inside the cluster and all gRPC calls are handled
	// inside the e2e.test binary.
	Embedded bool

	// Hooks that will be called if (and only if!) the embedded
	// mock driver is used. Beware that hooks are invoked
	// asynchronously in different goroutines.
	Hooks Hooks
}
// MockCSICall is a reduced view of one logged CSI gRPC call. It is filled by
// decoding the JSON log entry and keeps only the method name, the volume
// context and secrets of the request, and the error information.
type MockCSICall struct {
	json string // full log entry

	// Method is the gRPC method. LogGRPC and GetCalls strip the service
	// name, i.e. "/csi.v1.Identity/Probe" becomes "Probe".
	Method  string
	Request struct {
		VolumeContext map[string]string `json:"volume_context"`
		Secrets       map[string]string `json:"secrets"`
	}
	// FullError holds the gRPC status code and message of a failed call.
	FullError struct {
		Code    codes.Code `json:"code"`
		Message string     `json:"message"`
	}
	// Error is the error as a plain string, "" on no error.
	Error string
}
// MockCSICalls is a thread-safe storage for MockCSICall instances.
// Its methods take the mutex before touching the recorded calls.
type MockCSICalls struct {
	// calls holds the recorded calls in the order in which they were added.
	calls []MockCSICall
	// mutex guards calls.
	mutex sync.Mutex
}
   397  // Get returns all currently recorded calls.
   398  func (c *MockCSICalls) Get() []MockCSICall {
   399  	c.mutex.Lock()
   400  	defer c.mutex.Unlock()
   402  	return c.calls[:]
   403  }
   405  // Add appends one new call at the end.
   406  func (c *MockCSICalls) Add(call MockCSICall) {
   407  	c.mutex.Lock()
   408  	defer c.mutex.Unlock()
   410  	c.calls = append(c.calls, call)
   411  }
   413  // LogGRPC takes individual parameters from the mock CSI driver and adds them.
   414  func (c *MockCSICalls) LogGRPC(method string, request, reply interface{}, err error) {
   415  	// Encoding to JSON and decoding mirrors the traditional way of capturing calls.
   416  	// Probably could be simplified now...
   417  	logMessage := struct {
   418  		Method   string
   419  		Request  interface{}
   420  		Response interface{}
   421  		// Error as string, for backward compatibility.
   422  		// "" on no error.
   423  		Error string
   424  		// Full error dump, to be able to parse out full gRPC error code and message separately in a test.
   425  		FullError *spb.Status
   426  	}{
   427  		Method:   method,
   428  		Request:  request,
   429  		Response: reply,
   430  	}
   432  	if err != nil {
   433  		logMessage.Error = err.Error()
   434  		logMessage.FullError = grpcstatus.Convert(err).Proto()
   435  	}
   437  	msg, _ := json.Marshal(logMessage)
   438  	call := MockCSICall{
   439  		json: string(msg),
   440  	}
   441  	json.Unmarshal(msg, &call)
   443  	klog.Infof("%s %s", grpcCallPrefix, string(msg))
   445  	// Trim gRPC service name, i.e. "/csi.v1.Identity/Probe" -> "Probe"
   446  	methodParts := strings.Split(call.Method, "/")
   447  	call.Method = methodParts[len(methodParts)-1]
   449  	c.Add(call)
   450  }
   452  var _ storageframework.TestDriver = &mockCSIDriver{}
   453  var _ storageframework.DynamicPVTestDriver = &mockCSIDriver{}
   454  var _ storageframework.SnapshottableTestDriver = &mockCSIDriver{}
   456  // InitMockCSIDriver returns a mockCSIDriver that implements TestDriver interface
   457  func InitMockCSIDriver(driverOpts CSIMockDriverOpts) MockCSITestDriver {
   458  	driverManifests := []string{
   459  		"test/e2e/testing-manifests/storage-csi/external-attacher/rbac.yaml",
   460  		"test/e2e/testing-manifests/storage-csi/external-provisioner/rbac.yaml",
   461  		"test/e2e/testing-manifests/storage-csi/external-resizer/rbac.yaml",
   462  		"test/e2e/testing-manifests/storage-csi/external-snapshotter/csi-snapshotter/rbac-csi-snapshotter.yaml",
   463  		"test/e2e/testing-manifests/storage-csi/mock/csi-mock-rbac.yaml",
   464  		"test/e2e/testing-manifests/storage-csi/mock/csi-storageclass.yaml",
   465  	}
   466  	if driverOpts.Embedded {
   467  		driverManifests = append(driverManifests, "test/e2e/testing-manifests/storage-csi/mock/csi-mock-proxy.yaml")
   468  	} else {
   469  		driverManifests = append(driverManifests, "test/e2e/testing-manifests/storage-csi/mock/csi-mock-driver.yaml")
   470  	}
   472  	if driverOpts.RegisterDriver {
   473  		driverManifests = append(driverManifests, "test/e2e/testing-manifests/storage-csi/mock/csi-mock-driverinfo.yaml")
   474  	}
   476  	if !driverOpts.DisableAttach {
   477  		driverManifests = append(driverManifests, "test/e2e/testing-manifests/storage-csi/mock/csi-mock-driver-attacher.yaml")
   478  	}
   480  	if driverOpts.EnableResizing {
   481  		driverManifests = append(driverManifests, "test/e2e/testing-manifests/storage-csi/mock/csi-mock-driver-resizer.yaml")
   482  	}
   484  	if driverOpts.EnableSnapshot {
   485  		driverManifests = append(driverManifests, "test/e2e/testing-manifests/storage-csi/mock/csi-mock-driver-snapshotter.yaml")
   486  	}
   488  	return &mockCSIDriver{
   489  		driverInfo: storageframework.DriverInfo{
   490  			Name:        "csi-mock",
   491  			MaxFileSize: storageframework.FileSizeMedium,
   492  			SupportedFsType: sets.NewString(
   493  				"", // Default fsType
   494  			),
   495  			Capabilities: map[storageframework.Capability]bool{
   496  				storageframework.CapPersistence:       false,
   497  				storageframework.CapFsGroup:           false,
   498  				storageframework.CapExec:              false,
   499  				storageframework.CapVolumeLimits:      true,
   500  				storageframework.CapMultiplePVsSameID: true,
   501  			},
   502  		},
   503  		manifests:                     driverManifests,
   504  		podInfo:                       driverOpts.PodInfo,
   505  		storageCapacity:               driverOpts.StorageCapacity,
   506  		enableTopology:                driverOpts.EnableTopology,
   507  		attachable:                    !driverOpts.DisableAttach,
   508  		attachLimit:                   driverOpts.AttachLimit,
   509  		enableNodeExpansion:           driverOpts.EnableNodeExpansion,
   510  		tokenRequests:                 driverOpts.TokenRequests,
   511  		requiresRepublish:             driverOpts.RequiresRepublish,
   512  		fsGroupPolicy:                 driverOpts.FSGroupPolicy,
   513  		enableVolumeMountGroup:        driverOpts.EnableVolumeMountGroup,
   514  		enableSELinuxMount:            driverOpts.EnableSELinuxMount,
   515  		enableRecoverExpansionFailure: driverOpts.EnableRecoverExpansionFailure,
   516  		embedded:                      driverOpts.Embedded,
   517  		hooks:                         driverOpts.Hooks,
   518  	}
   519  }
   521  func (m *mockCSIDriver) GetDriverInfo() *storageframework.DriverInfo {
   522  	return &m.driverInfo
   523  }
// SkipUnsupportedTest is a no-op: the mock driver does not skip any test pattern.
func (m *mockCSIDriver) SkipUnsupportedTest(pattern storageframework.TestPattern) {
}
   528  func (m *mockCSIDriver) GetDynamicProvisionStorageClass(ctx context.Context, config *storageframework.PerTestConfig, fsType string) *storagev1.StorageClass {
   529  	provisioner := config.GetUniqueDriverName()
   530  	parameters := map[string]string{}
   531  	ns := config.Framework.Namespace.Name
   533  	return storageframework.GetStorageClass(provisioner, parameters, nil, ns)
   534  }
   536  func (m *mockCSIDriver) GetSnapshotClass(ctx context.Context, config *storageframework.PerTestConfig, parameters map[string]string) *unstructured.Unstructured {
   537  	snapshotter := m.driverInfo.Name + "-" + config.Framework.UniqueName
   538  	ns := config.Framework.Namespace.Name
   540  	return utils.GenerateSnapshotClassSpec(snapshotter, parameters, ns)
   541  }
// PrepareTest deploys the CSI mock driver for one test and returns the
// per-test configuration.
//
// It creates a separate driver namespace, picks a random ready schedulable
// node, and then either starts the embedded mock driver inside the test
// process (connected to the proxy pod) or assembles the command line arguments
// for the in-cluster driver. Afterwards the manifests are created with the
// patched driver name "csi-mock-" + f.UniqueName, and a cleanup callback is
// registered with ginkgo. A deployment failure fails the test.
func (m *mockCSIDriver) PrepareTest(ctx context.Context, f *framework.Framework) *storageframework.PerTestConfig {
	// Remembered for GetCalls, which reads the driver pod's log.
	m.clientSet = f.ClientSet

	// Create secondary namespace which will be used for creating driver
	m.driverNamespace = utils.CreateDriverNamespace(ctx, f)
	driverns := m.driverNamespace.Name
	testns := f.Namespace.Name

	if m.embedded {
		ginkgo.By("deploying csi mock proxy")
	} else {
		ginkgo.By("deploying csi mock driver")
	}
	cancelLogging := utils.StartPodLogs(ctx, f, m.driverNamespace)
	cs := f.ClientSet

	// pods should be scheduled on the node
	node, err := e2enode.GetRandomReadySchedulableNode(ctx, cs)
	framework.ExpectNoError(err)

	// embeddedCleanup stays a no-op unless the embedded driver gets started below.
	embeddedCleanup := func() {}
	// containerArgs is only filled in for the in-cluster driver.
	containerArgs := []string{}
	if m.embedded {
		// Run embedded CSI driver.
		//
		// For now we start exactly one instance which implements controller,
		// node and identity services. It matches with the one pod that we run
		// inside the cluster. The name and namespace of that one is deterministic,
		// so we know what to connect to.
		//
		// Long-term we could also deploy one central controller and multiple
		// node instances, with knowledge about provisioned volumes shared in
		// this process.
		podname := "csi-mockplugin-0"
		containername := "mock"

		// Must keep running even after the test context is cancelled
		// for cleanup callbacks.
		//
		// Note that this ctx shadows the function parameter for the rest
		// of this branch only.
		ctx, cancel := context.WithCancel(context.Background())
		serviceConfig := mockservice.Config{
			DisableAttach:            !m.attachable,
			DriverName:               "csi-mock-" + f.UniqueName,
			AttachLimit:              int64(m.attachLimit),
			NodeExpansionRequired:    m.enableNodeExpansion,
			VolumeMountGroupRequired: m.enableVolumeMountGroup,
			EnableTopology:           m.enableTopology,
			// The IO helper targets the "busybox" container of the pod,
			// while the proxy connection below targets container "mock".
			IO: proxy.PodDirIO{
				F:             f,
				Namespace:     m.driverNamespace.Name,
				PodName:       podname,
				ContainerName: "busybox",
			},
		}
		// The same service object serves all three CSI services.
		s := mockservice.New(serviceConfig)
		servers := &mockdriver.CSIDriverServers{
			Controller: s,
			Identity:   s,
			Node:       s,
		}
		m.embeddedCSIDriver = mockdriver.NewCSIDriver(servers)
		l, err := proxy.Listen(ctx, f.ClientSet, f.ClientConfig(),
			proxy.Addr{
				Namespace:     m.driverNamespace.Name,
				PodName:       podname,
				ContainerName: containername,
				Port:          9000,
			},
		)

		framework.ExpectNoError(err, "start connecting to proxy pod")
		// interceptGRPC records every call and runs the configured hooks.
		err = m.embeddedCSIDriver.Start(l, m.interceptGRPC)
		framework.ExpectNoError(err, "start mock driver")

		embeddedCleanup = func() {
			// Kill all goroutines and delete resources of the mock driver.
			m.embeddedCSIDriver.Stop()
			l.Close()
			cancel()
		}
	} else {
		// When using the mock driver inside the cluster it has to be reconfigured
		// via command line parameters.
		containerArgs = append(containerArgs, "--drivername=csi-mock-"+f.UniqueName)

		if m.attachable {
			containerArgs = append(containerArgs, "--enable-attach")
		}

		if m.enableTopology {
			containerArgs = append(containerArgs, "--enable-topology")
		}

		if m.attachLimit > 0 {
			containerArgs = append(containerArgs, "--attach-limit", strconv.Itoa(m.attachLimit))
		}

		if m.enableNodeExpansion {
			containerArgs = append(containerArgs, "--node-expand-required=true")
		}
	}

	config := &storageframework.PerTestConfig{
		Driver:              m,
		Prefix:              "mock",
		Framework:           f,
		ClientNodeSelection: e2epod.NodeSelection{Name: node.Name},
		DriverNamespace:     m.driverNamespace,
	}

	// Options for patching the objects loaded from the manifests.
	o := utils.PatchCSIOptions{
		OldDriverName:            "csi-mock",
		NewDriverName:            "csi-mock-" + f.UniqueName,
		DriverContainerName:      "mock",
		DriverContainerArguments: containerArgs,
		ProvisionerContainerName: "csi-provisioner",
		NodeName:                 node.Name,
		PodInfo:                  m.podInfo,
		StorageCapacity:          m.storageCapacity,
		CanAttach:                &m.attachable,
		VolumeLifecycleModes: &[]storagev1.VolumeLifecycleMode{
			storagev1.VolumeLifecyclePersistent,
			storagev1.VolumeLifecycleEphemeral,
		},
		TokenRequests:     m.tokenRequests,
		RequiresRepublish: m.requiresRepublish,
		FSGroupPolicy:     m.fsGroupPolicy,
		SELinuxMount:      m.enableSELinuxMount,
		Features:          map[string][]string{},
	}

	if m.enableRecoverExpansionFailure {
		o.Features["csi-resizer"] = []string{"RecoverVolumeExpansionFailure=true"}
	}
	// The callback is applied to each object loaded from the manifests.
	err = utils.CreateFromManifests(ctx, f, m.driverNamespace, func(item interface{}) error {
		if err := utils.PatchCSIDeployment(config.Framework, o, item); err != nil {
			return err
		}

		switch item := item.(type) {
		case *rbacv1.ClusterRole:
			if strings.HasPrefix(item.Name, "external-snapshotter-runner") {
				// Re-enable access to secrets for the snapshotter sidecar for
				// https://github.com/kubernetes/kubernetes/blob/6ede5ca95f78478fa627ecfea8136e0dff34436b/test/e2e/storage/csi_mock_volume.go#L1539-L1548
				// It was disabled in https://github.com/kubernetes-csi/external-snapshotter/blob/501cc505846c03ee665355132f2da0ce7d5d747d/deploy/kubernetes/csi-snapshotter/rbac-csi-snapshotter.yaml#L26-L32
				item.Rules = append(item.Rules, rbacv1.PolicyRule{
					APIGroups: []string{""},
					Resources: []string{"secrets"},
					Verbs:     []string{"get", "list"},
				})
			}
		}

		return nil
	}, m.manifests...)

	if err != nil {
		framework.Failf("deploying csi mock driver: %v", err)
	}

	driverCleanupFunc := generateDriverCleanupFunc(
		f,
		"mock",
		testns,
		driverns,
		cancelLogging)

	// The embedded driver is shut down before the driver cleanup runs.
	ginkgo.DeferCleanup(func(ctx context.Context) {
		embeddedCleanup()
		driverCleanupFunc(ctx)
	})

	return config
}
   717  func (m *mockCSIDriver) interceptGRPC(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
   718  	defer func() {
   719  		// Always log the call and its final result,
   720  		// regardless whether the result was from the real
   721  		// implementation or a hook.
   722  		m.calls.LogGRPC(info.FullMethod, req, resp, err)
   723  	}()
   725  	if m.hooks.Pre != nil {
   726  		resp, err = m.hooks.Pre(ctx, info.FullMethod, req)
   727  		if resp != nil || err != nil {
   728  			return
   729  		}
   730  	}
   731  	resp, err = handler(ctx, req)
   732  	if m.hooks.Post != nil {
   733  		resp, err = m.hooks.Post(ctx, info.FullMethod, req, resp, err)
   734  	}
   735  	return
   736  }
   738  func (m *mockCSIDriver) GetCalls(ctx context.Context) ([]MockCSICall, error) {
   739  	if m.embedded {
   740  		return m.calls.Get(), nil
   741  	}
   743  	if m.driverNamespace == nil {
   744  		return nil, errors.New("PrepareTest not called yet")
   745  	}
   747  	// Name of CSI driver pod name (it's in a StatefulSet with a stable name)
   748  	driverPodName := "csi-mockplugin-0"
   749  	// Name of CSI driver container name
   750  	driverContainerName := "mock"
   752  	// Load logs of driver pod
   753  	log, err := e2epod.GetPodLogs(ctx, m.clientSet, m.driverNamespace.Name, driverPodName, driverContainerName)
   754  	if err != nil {
   755  		return nil, fmt.Errorf("could not load CSI driver logs: %w", err)
   756  	}
   758  	logLines := strings.Split(log, "\n")
   759  	var calls []MockCSICall
   760  	for _, line := range logLines {
   761  		index := strings.Index(line, grpcCallPrefix)
   762  		if index == -1 {
   763  			continue
   764  		}
   765  		line = line[index+len(grpcCallPrefix):]
   766  		call := MockCSICall{
   767  			json: string(line),
   768  		}
   769  		err := json.Unmarshal([]byte(line), &call)
   770  		if err != nil {
   771  			framework.Logf("Could not parse CSI driver log line %q: %s", line, err)
   772  			continue
   773  		}
   775  		// Trim gRPC service name, i.e. "/csi.v1.Identity/Probe" -> "Probe"
   776  		methodParts := strings.Split(call.Method, "/")
   777  		call.Method = methodParts[len(methodParts)-1]
   779  		calls = append(calls, call)
   780  	}
   781  	return calls, nil
   782  }
   784  // gce-pd
   785  type gcePDCSIDriver struct {
   786  	driverInfo storageframework.DriverInfo
   787  }
   789  var _ storageframework.TestDriver = &gcePDCSIDriver{}
   790  var _ storageframework.DynamicPVTestDriver = &gcePDCSIDriver{}
   791  var _ storageframework.SnapshottableTestDriver = &gcePDCSIDriver{}
   793  // InitGcePDCSIDriver returns gcePDCSIDriver that implements TestDriver interface
   794  func InitGcePDCSIDriver() storageframework.TestDriver {
   795  	return &gcePDCSIDriver{
   796  		driverInfo: storageframework.DriverInfo{
   797  			Name:        GCEPDCSIDriverName,
   798  			TestTags:    []interface{}{framework.WithSerial()},
   799  			MaxFileSize: storageframework.FileSizeMedium,
   800  			SupportedSizeRange: e2evolume.SizeRange{
   801  				Min: "5Gi",
   802  			},
   803  			SupportedFsType: sets.NewString(
   804  				"", // Default fsType
   805  				"ext2",
   806  				"ext3",
   807  				"ext4",
   808  				"xfs",
   809  			),
   810  			SupportedMountOption: sets.NewString("debug", "nouid32"),
   811  			Capabilities: map[storageframework.Capability]bool{
   812  				storageframework.CapPersistence: true,
   813  				storageframework.CapBlock:       true,
   814  				storageframework.CapFsGroup:     true,
   815  				storageframework.CapExec:        true,
   816  				storageframework.CapMultiPODs:   true,
   817  				// GCE supports volume limits, but the test creates large
   818  				// number of volumes and times out test suites.
   819  				storageframework.CapVolumeLimits:                   false,
   820  				storageframework.CapTopology:                       true,
   821  				storageframework.CapControllerExpansion:            true,
   822  				storageframework.CapOfflineExpansion:               true,
   823  				storageframework.CapOnlineExpansion:                true,
   824  				storageframework.CapNodeExpansion:                  true,
   825  				storageframework.CapSnapshotDataSource:             true,
   826  				storageframework.CapReadWriteOncePod:               true,
   827  				storageframework.CapMultiplePVsSameID:              true,
   828  				storageframework.CapFSResizeFromSourceNotSupported: true, //TODO: remove when CI tests use the fixed driver with: https://github.com/kubernetes-sigs/gcp-compute-persistent-disk-csi-driver/pull/972
   829  			},
   830  			RequiredAccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce},
   831  			TopologyKeys:        []string{GCEPDCSIZoneTopologyKey},
   832  			StressTestOptions: &storageframework.StressTestOptions{
   833  				NumPods:     10,
   834  				NumRestarts: 10,
   835  			},
   836  			VolumeSnapshotStressTestOptions: &storageframework.VolumeSnapshotStressTestOptions{
   837  				// GCE only allows for one snapshot per volume to be created at a time,
   838  				// which can cause test timeouts. We reduce the likelihood of test timeouts
   839  				// by increasing the number of pods (and volumes) and reducing the number
   840  				// of snapshots per volume.
   841  				NumPods:      20,
   842  				NumSnapshots: 2,
   843  			},
   844  		},
   845  	}
   846  }
   848  func (g *gcePDCSIDriver) GetDriverInfo() *storageframework.DriverInfo {
   849  	return &g.driverInfo
   850  }
   852  func (g *gcePDCSIDriver) SkipUnsupportedTest(pattern storageframework.TestPattern) {
   853  	e2eskipper.SkipUnlessProviderIs("gce", "gke")
   854  	if pattern.FsType == "xfs" {
   855  		e2eskipper.SkipUnlessNodeOSDistroIs("ubuntu", "custom")
   856  	}
   857  	for _, tag := range pattern.TestTags {
   858  		if framework.TagsEqual(tag, feature.Windows) {
   859  			e2eskipper.Skipf("Skipping tests for windows since CSI does not support it yet")
   860  		}
   861  	}
   862  }
   864  func (g *gcePDCSIDriver) GetDynamicProvisionStorageClass(ctx context.Context, config *storageframework.PerTestConfig, fsType string) *storagev1.StorageClass {
   865  	ns := config.Framework.Namespace.Name
   866  	provisioner := g.driverInfo.Name
   868  	parameters := map[string]string{"type": "pd-standard"}
   869  	if fsType != "" {
   870  		parameters["csi.storage.k8s.io/fstype"] = fsType
   871  	}
   872  	delayedBinding := storagev1.VolumeBindingWaitForFirstConsumer
   874  	return storageframework.GetStorageClass(provisioner, parameters, &delayedBinding, ns)
   875  }
   877  func (g *gcePDCSIDriver) GetSnapshotClass(ctx context.Context, config *storageframework.PerTestConfig, parameters map[string]string) *unstructured.Unstructured {
   878  	snapshotter := g.driverInfo.Name
   879  	ns := config.Framework.Namespace.Name
   881  	return utils.GenerateSnapshotClassSpec(snapshotter, parameters, ns)
   882  }
   884  func (g *gcePDCSIDriver) PrepareTest(ctx context.Context, f *framework.Framework) *storageframework.PerTestConfig {
   885  	testns := f.Namespace.Name
   886  	cfg := &storageframework.PerTestConfig{
   887  		Driver:    g,
   888  		Prefix:    "gcepd",
   889  		Framework: f,
   890  	}
   892  	if framework.ProviderIs("gke") {
   893  		framework.Logf("The csi gce-pd driver is automatically installed in GKE. Skipping driver installation.")
   894  		return cfg
   895  	}
   897  	// Check if the cluster is already running gce-pd CSI Driver
   898  	deploy, err := f.ClientSet.AppsV1().Deployments("gce-pd-csi-driver").Get(ctx, "csi-gce-pd-controller", metav1.GetOptions{})
   899  	if err == nil && deploy != nil {
   900  		framework.Logf("The csi gce-pd driver is already installed.")
   901  		return cfg
   902  	}
   903  	ginkgo.By("deploying csi gce-pd driver")
   904  	// Create secondary namespace which will be used for creating driver
   905  	driverNamespace := utils.CreateDriverNamespace(ctx, f)
   906  	driverns := driverNamespace.Name
   908  	cancelLogging := utils.StartPodLogs(ctx, f, driverNamespace)
   909  	// It would be safer to rename the gcePD driver, but that
   910  	// hasn't been done before either and attempts to do so now led to
   911  	// errors during driver registration, therefore it is disabled
   912  	// by passing a nil function below.
   913  	//
   914  	// These are the options which would have to be used:
   915  	// o := utils.PatchCSIOptions{
   916  	// 	OldDriverName:            g.driverInfo.Name,
   917  	// 	NewDriverName:            storageframework.GetUniqueDriverName(g),
   918  	// 	DriverContainerName:      "gce-driver",
   919  	// 	ProvisionerContainerName: "csi-external-provisioner",
   920  	// }
   921  	createGCESecrets(f.ClientSet, driverns)
   923  	manifests := []string{
   924  		"test/e2e/testing-manifests/storage-csi/external-attacher/rbac.yaml",
   925  		"test/e2e/testing-manifests/storage-csi/external-provisioner/rbac.yaml",
   926  		"test/e2e/testing-manifests/storage-csi/gce-pd/csi-controller-rbac.yaml",
   927  		"test/e2e/testing-manifests/storage-csi/gce-pd/node_ds.yaml",
   928  		"test/e2e/testing-manifests/storage-csi/gce-pd/controller_ss.yaml",
   929  	}
   931  	err = utils.CreateFromManifests(ctx, f, driverNamespace, nil, manifests...)
   932  	if err != nil {
   933  		framework.Failf("deploying csi gce-pd driver: %v", err)
   934  	}
   936  	if err = WaitForCSIDriverRegistrationOnAllNodes(ctx, GCEPDCSIDriverName, f.ClientSet); err != nil {
   937  		framework.Failf("waiting for csi driver node registration on: %v", err)
   938  	}
   940  	cleanupFunc := generateDriverCleanupFunc(
   941  		f,
   942  		"gce-pd",
   943  		testns,
   944  		driverns,
   945  		cancelLogging)
   946  	ginkgo.DeferCleanup(cleanupFunc)
   948  	return &storageframework.PerTestConfig{
   949  		Driver:          g,
   950  		Prefix:          "gcepd",
   951  		Framework:       f,
   952  		DriverNamespace: driverNamespace,
   953  	}
   954  }
   956  // WaitForCSIDriverRegistrationOnAllNodes waits for the CSINode object to be updated
   957  // with the given driver on all schedulable nodes.
   958  func WaitForCSIDriverRegistrationOnAllNodes(ctx context.Context, driverName string, cs clientset.Interface) error {
   959  	nodes, err := e2enode.GetReadySchedulableNodes(ctx, cs)
   960  	if err != nil {
   961  		return err
   962  	}
   963  	for _, node := range nodes.Items {
   964  		if err := WaitForCSIDriverRegistrationOnNode(ctx, node.Name, driverName, cs); err != nil {
   965  			return err
   966  		}
   967  	}
   968  	return nil
   969  }
   971  // WaitForCSIDriverRegistrationOnNode waits for the CSINode object generated by the node-registrar on a certain node
   972  func WaitForCSIDriverRegistrationOnNode(ctx context.Context, nodeName string, driverName string, cs clientset.Interface) error {
   973  	framework.Logf("waiting for CSIDriver %v to register on node %v", driverName, nodeName)
   975  	// About 8.6 minutes timeout
   976  	backoff := wait.Backoff{
   977  		Duration: 2 * time.Second,
   978  		Factor:   1.5,
   979  		Steps:    12,
   980  	}
   982  	waitErr := wait.ExponentialBackoff(backoff, func() (bool, error) {
   983  		csiNode, err := cs.StorageV1().CSINodes().Get(ctx, nodeName, metav1.GetOptions{})
   984  		if err != nil && !apierrors.IsNotFound(err) {
   985  			return false, err
   986  		}
   987  		for _, driver := range csiNode.Spec.Drivers {
   988  			if driver.Name == driverName {
   989  				return true, nil
   990  			}
   991  		}
   992  		return false, nil
   993  	})
   994  	if waitErr != nil {
   995  		return fmt.Errorf("error waiting for CSI driver %s registration on node %s: %v", driverName, nodeName, waitErr)
   996  	}
   997  	return nil
   998  }
  1000  func tryFunc(f func()) error {
  1001  	var err error
  1002  	if f == nil {
  1003  		return nil
  1004  	}
  1005  	defer func() {
  1006  		if recoverError := recover(); recoverError != nil {
  1007  			err = fmt.Errorf("%v", recoverError)
  1008  		}
  1009  	}()
  1010  	f()
  1011  	return err
  1012  }
  1014  func generateDriverCleanupFunc(
  1015  	f *framework.Framework,
  1016  	driverName, testns, driverns string,
  1017  	cancelLogging func()) func(ctx context.Context) {
  1019  	// Cleanup CSI driver and namespaces. This function needs to be idempotent and can be
  1020  	// concurrently called from defer (or AfterEach) and AfterSuite action hooks.
  1021  	cleanupFunc := func(ctx context.Context) {
  1022  		ginkgo.By(fmt.Sprintf("deleting the test namespace: %s", testns))
  1023  		// Delete the primary namespace but it's okay to fail here because this namespace will
  1024  		// also be deleted by framework.Aftereach hook
  1025  		_ = tryFunc(func() { f.DeleteNamespace(ctx, testns) })
  1027  		ginkgo.By(fmt.Sprintf("uninstalling csi %s driver", driverName))
  1028  		_ = tryFunc(cancelLogging)
  1030  		ginkgo.By(fmt.Sprintf("deleting the driver namespace: %s", driverns))
  1031  		_ = tryFunc(func() { f.DeleteNamespace(ctx, driverns) })
  1032  	}
  1034  	return cleanupFunc
  1035  }

View as plain text