...

Source file src/k8s.io/kubernetes/test/e2e/storage/testsuites/volumelimits.go

Documentation: k8s.io/kubernetes/test/e2e/storage/testsuites

     1  /*
     2  Copyright 2019 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package testsuites
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"regexp"
    23  	"strings"
    24  	"time"
    25  
    26  	"github.com/onsi/ginkgo/v2"
    27  
    28  	v1 "k8s.io/api/core/v1"
    29  	storagev1 "k8s.io/api/storage/v1"
    30  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    31  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    32  	"k8s.io/apimachinery/pkg/util/sets"
    33  	"k8s.io/apimachinery/pkg/util/wait"
    34  	clientset "k8s.io/client-go/kubernetes"
    35  	"k8s.io/component-helpers/storage/ephemeral"
    36  	migrationplugins "k8s.io/csi-translation-lib/plugins" // volume plugin names are exported nicely there
    37  	volumeutil "k8s.io/kubernetes/pkg/volume/util"
    38  	"k8s.io/kubernetes/test/e2e/framework"
    39  	e2enode "k8s.io/kubernetes/test/e2e/framework/node"
    40  	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
    41  	e2epv "k8s.io/kubernetes/test/e2e/framework/pv"
    42  	storageframework "k8s.io/kubernetes/test/e2e/storage/framework"
    43  	storageutils "k8s.io/kubernetes/test/e2e/storage/utils"
    44  	admissionapi "k8s.io/pod-security-admission/api"
    45  )
    46  
    47  type volumeLimitsTestSuite struct {
    48  	tsInfo storageframework.TestSuiteInfo
    49  }
    50  
    51  const (
    52  	// The test uses generic pod startup / PV deletion timeouts. As it creates
    53  	// many more volumes at once, these timeouts are multiplied by this number.
    54  	// Using the real number of volumes (e.g. 128 on GCE) would make the test far too slow.
    55  	testSlowMultiplier = 10
    56  
    57  	// How long to wait until the CSINode object gets the attach limit from the installed CSI driver.
    58  	csiNodeInfoTimeout = 2 * time.Minute
    59  )
    60  
    61  var _ storageframework.TestSuite = &volumeLimitsTestSuite{}
    62  
    63  // InitCustomVolumeLimitsTestSuite returns a volumeLimitsTestSuite that implements the TestSuite interface
    64  // using custom test patterns.
    65  func InitCustomVolumeLimitsTestSuite(patterns []storageframework.TestPattern) storageframework.TestSuite {
    66  	return &volumeLimitsTestSuite{
    67  		tsInfo: storageframework.TestSuiteInfo{
    68  			Name:         "volumeLimits",
    69  			TestPatterns: patterns,
    70  		},
    71  	}
    72  }
    73  
    74  // InitVolumeLimitsTestSuite returns a volumeLimitsTestSuite that implements the TestSuite interface
    75  // using the testsuite's default patterns.
    76  func InitVolumeLimitsTestSuite() storageframework.TestSuite {
    77  	patterns := []storageframework.TestPattern{
    78  		storageframework.FsVolModeDynamicPV,
    79  		storageframework.DefaultFsGenericEphemeralVolume,
    80  	}
    81  	return InitCustomVolumeLimitsTestSuite(patterns)
    82  }
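        // Illustrative sketch only (not part of this file): a driver's e2e test
        // registration would typically list this suite alongside the other
        // testsuites, roughly:
        //
        //	suites := []func() storageframework.TestSuite{
        //		InitVolumeLimitsTestSuite,
        //		// ... other suites ...
        //	}
        //
        // The exact wiring depends on the driver's own registration code and is
        // intentionally not shown here.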
    83  
    84  func (t *volumeLimitsTestSuite) GetTestSuiteInfo() storageframework.TestSuiteInfo {
    85  	return t.tsInfo
    86  }
    87  
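        // SkipUnsupportedTests is a no-op for this suite; the CapVolumeLimits
        // capability is checked inside the individual tests instead.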
    88  func (t *volumeLimitsTestSuite) SkipUnsupportedTests(driver storageframework.TestDriver, pattern storageframework.TestPattern) {
    89  }
    90  
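        // DefineTests defines two tests: a serial test that fills a node up to its
        // reported volume limit and expects one extra pod to stay Pending as
        // unschedulable, and a test that verifies every relevant CSINode reports a
        // volume limit for the driver.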
    91  func (t *volumeLimitsTestSuite) DefineTests(driver storageframework.TestDriver, pattern storageframework.TestPattern) {
    92  	type local struct {
    93  		config *storageframework.PerTestConfig
    94  
    95  		cs clientset.Interface
    96  		ns *v1.Namespace
    97  		// VolumeResource contains pv, pvc, sc, etc. of the first pod created
    98  		resource *storageframework.VolumeResource
    99  
   100  		// All created PVCs
   101  		pvcNames []string
   102  
   103  		// All created Pods
   104  		podNames []string
   105  
   106  		// All created PVs, incl. the one in resource
   107  		pvNames sets.String
   108  	}
   109  	var (
   110  		l local
   111  	)
   112  
   113  	// Beware that it also registers an AfterEach which renders f unusable. Any code using
   114  	// f must run inside an It or Context callback.
   115  	f := framework.NewFrameworkWithCustomTimeouts("volumelimits", storageframework.GetDriverTimeouts(driver))
   116  	f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged
   117  
   118  	// This checks that CSIMaxVolumeLimitChecker works as expected.
   119  	// A randomly chosen node should be able to handle as many CSI volumes as
   120  	// it claims to handle in CSINode.Spec.Drivers[x].Allocatable.
   121  	// The test uses a single pod with a lot of volumes to work around any
   122  	// max pod limit on a node.
   123  	// One extra pod with a CSI volume should then get stuck Pending with a
   124  	// condition that says it is unschedulable because of the volume limit.
   125  	// BEWARE: the test may create a lot of volumes and it is really slow.
   126  	f.It("should support volume limits", f.WithSerial(), func(ctx context.Context) {
   127  		driverInfo := driver.GetDriverInfo()
   128  		if !driverInfo.Capabilities[storageframework.CapVolumeLimits] {
   129  			ginkgo.Skip(fmt.Sprintf("driver %s does not support volume limits", driverInfo.Name))
   130  		}
   131  		dDriver, ok := driver.(storageframework.DynamicPVTestDriver)
   132  		if !ok {
   133  			framework.Failf("Test driver does not provide dynamically created volumes")
   134  		}
   135  
   136  		l.ns = f.Namespace
   137  		l.cs = f.ClientSet
   138  
   139  		l.config = driver.PrepareTest(ctx, f)
   140  
   141  		ginkgo.By("Picking a node")
   142  		// Some CSI drivers are deployed to a single node (e.g. csi-hostpath),
   143  		// so we use that node instead of picking a random one.
   144  		nodeName := l.config.ClientNodeSelection.Name
   145  		if nodeName == "" {
   146  			node, err := e2enode.GetRandomReadySchedulableNode(ctx, f.ClientSet)
   147  			framework.ExpectNoError(err)
   148  			nodeName = node.Name
   149  		}
   150  		framework.Logf("Selected node %s", nodeName)
   151  
   152  		ginkgo.By("Checking node limits")
   153  		limit, err := getNodeLimits(ctx, l.cs, l.config, nodeName, driverInfo)
   154  		framework.ExpectNoError(err)
   155  
   156  		framework.Logf("Node %s can handle %d volumes of driver %s", nodeName, limit, driverInfo.Name)
   157  		// Create a storage class and generate a PVC. Do not instantiate the PVC yet; keep it for the last pod.
   158  		testVolumeSizeRange := t.GetTestSuiteInfo().SupportedSizeRange
   159  		driverVolumeSizeRange := dDriver.GetDriverInfo().SupportedSizeRange
   160  		claimSize, err := storageutils.GetSizeRangesIntersection(testVolumeSizeRange, driverVolumeSizeRange)
   161  		framework.ExpectNoError(err, "determine intersection of test size range %+v and driver size range %+v", testVolumeSizeRange, driverVolumeSizeRange)
   162  
   163  		l.resource = storageframework.CreateVolumeResource(ctx, driver, l.config, pattern, testVolumeSizeRange)
   164  		ginkgo.DeferCleanup(l.resource.CleanupResource)
   165  		ginkgo.DeferCleanup(cleanupTest, l.cs, l.ns.Name, l.podNames, l.pvcNames, l.pvNames, testSlowMultiplier*f.Timeouts.PVDelete)
   166  
   167  		selection := e2epod.NodeSelection{Name: nodeName}
   168  
   169  		if pattern.VolType == storageframework.GenericEphemeralVolume {
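        			// A generic ephemeral volume's PVC is owned by its pod, so each
        			// volume gets a pod of its own here.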
   170  			// Create <limit> Pods.
   171  			ginkgo.By(fmt.Sprintf("Creating %d Pod(s) with one volume each", limit))
   172  			for i := 0; i < limit; i++ {
   173  				pod := StartInPodWithVolumeSource(ctx, l.cs, *l.resource.VolSource, l.ns.Name, "volume-limits", "sleep 1000000", selection)
   174  				l.podNames = append(l.podNames, pod.Name)
   175  				l.pvcNames = append(l.pvcNames, ephemeral.VolumeClaimName(pod, &pod.Spec.Volumes[0]))
   176  			}
   177  		} else {
   178  			// Create <limit> PVCs for one gigantic pod.
   179  			var pvcs []*v1.PersistentVolumeClaim
   180  			ginkgo.By(fmt.Sprintf("Creating %d PVC(s)", limit))
   181  			for i := 0; i < limit; i++ {
   182  				pvc := e2epv.MakePersistentVolumeClaim(e2epv.PersistentVolumeClaimConfig{
   183  					ClaimSize:        claimSize,
   184  					StorageClassName: &l.resource.Sc.Name,
   185  				}, l.ns.Name)
   186  				pvc, err = l.cs.CoreV1().PersistentVolumeClaims(l.ns.Name).Create(ctx, pvc, metav1.CreateOptions{})
   187  				framework.ExpectNoError(err)
   188  				l.pvcNames = append(l.pvcNames, pvc.Name)
   189  				pvcs = append(pvcs, pvc)
   190  			}
   191  
   192  			ginkgo.By("Creating pod to use all PVC(s)")
   193  			podConfig := e2epod.Config{
   194  				NS:            l.ns.Name,
   195  				PVCs:          pvcs,
   196  				SeLinuxLabel:  e2epv.SELinuxLabel,
   197  				NodeSelection: selection,
   198  			}
   199  			pod, err := e2epod.MakeSecPod(&podConfig)
   200  			framework.ExpectNoError(err)
   201  			pod, err = l.cs.CoreV1().Pods(l.ns.Name).Create(ctx, pod, metav1.CreateOptions{})
   202  			framework.ExpectNoError(err)
   203  			l.podNames = append(l.podNames, pod.Name)
   204  		}
   205  
   206  		ginkgo.By("Waiting for all PVCs to get Bound")
   207  		l.pvNames, err = waitForAllPVCsBound(ctx, l.cs, testSlowMultiplier*f.Timeouts.PVBound, l.ns.Name, l.pvcNames)
   208  		framework.ExpectNoError(err)
   209  
   210  		ginkgo.By("Waiting for the pod(s) to be running")
   211  		for _, podName := range l.podNames {
   212  			err = e2epod.WaitTimeoutForPodRunningInNamespace(ctx, l.cs, podName, l.ns.Name, testSlowMultiplier*f.Timeouts.PodStart)
   213  			framework.ExpectNoError(err)
   214  		}
   215  
   216  		ginkgo.By("Creating an extra pod with one volume to exceed the limit")
   217  		pod := StartInPodWithVolumeSource(ctx, l.cs, *l.resource.VolSource, l.ns.Name, "volume-limits-exceeded", "sleep 10000", selection)
   218  		l.podNames = append(l.podNames, pod.Name)
   219  
   220  		ginkgo.By("Waiting for the pod to get unschedulable with the right message")
   221  		err = e2epod.WaitForPodCondition(ctx, l.cs, l.ns.Name, pod.Name, "Unschedulable", f.Timeouts.PodStart, func(pod *v1.Pod) (bool, error) {
   222  			if pod.Status.Phase == v1.PodPending {
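        				// The scheduler's volume-limits error message contains "max volume count"
        				// (e.g. "... node(s) exceed max volume count"), so match it loosely here.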
   223  				reg, err := regexp.Compile(`max.+volume.+count`)
   224  				if err != nil {
   225  					return false, err
   226  				}
   227  				for _, cond := range pod.Status.Conditions {
   228  					matched := reg.MatchString(cond.Message)
   229  					if cond.Type == v1.PodScheduled && cond.Status == v1.ConditionFalse && cond.Reason == "Unschedulable" && matched {
   230  						return true, nil
   231  					}
   232  				}
   233  			}
   234  			if pod.Status.Phase != v1.PodPending {
   235  				return true, fmt.Errorf("expected pod to be in phase Pending, but got phase: %v", pod.Status.Phase)
   236  			}
   237  			return false, nil
   238  		})
   239  		framework.ExpectNoError(err)
   240  	})
   241  
   242  	ginkgo.It("should verify that all csinodes have volume limits", func(ctx context.Context) {
   243  		driverInfo := driver.GetDriverInfo()
   244  		if !driverInfo.Capabilities[storageframework.CapVolumeLimits] {
   245  			ginkgo.Skip(fmt.Sprintf("driver %s does not support volume limits", driverInfo.Name))
   246  		}
   247  
   248  		l.ns = f.Namespace
   249  		l.cs = f.ClientSet
   250  
   251  		l.config = driver.PrepareTest(ctx, f)
   252  
   253  		nodeNames := []string{}
   254  		if l.config.ClientNodeSelection.Name != "" {
   255  			// Some CSI drivers are deployed to a single node (e.g. csi-hostpath),
   256  			// so we check that node instead of checking all of them
   257  			nodeNames = append(nodeNames, l.config.ClientNodeSelection.Name)
   258  		} else {
   259  			nodeList, err := e2enode.GetReadySchedulableNodes(ctx, f.ClientSet)
   260  			framework.ExpectNoError(err)
   261  			for _, node := range nodeList.Items {
   262  				nodeNames = append(nodeNames, node.Name)
   263  			}
   264  		}
   265  
   266  		for _, nodeName := range nodeNames {
   267  			ginkgo.By("Checking csinode limits")
   268  			_, err := getNodeLimits(ctx, l.cs, l.config, nodeName, driverInfo)
   269  			if err != nil {
   270  				framework.Failf("Expected volume limits to be set, error: %v", err)
   271  			}
   272  		}
   273  	})
   274  }
   275  
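        // cleanupTest deletes all pods and PVCs created by the test and then waits
        // until all PVs in pvNames are gone, so the test does not leave orphan PVs
        // behind. All cleanup errors are collected and returned as a single error.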
   276  func cleanupTest(ctx context.Context, cs clientset.Interface, ns string, podNames, pvcNames []string, pvNames sets.String, timeout time.Duration) error {
   277  	var cleanupErrors []string
   278  	for _, podName := range podNames {
   279  		err := cs.CoreV1().Pods(ns).Delete(ctx, podName, metav1.DeleteOptions{})
   280  		if err != nil {
   281  			cleanupErrors = append(cleanupErrors, fmt.Sprintf("failed to delete pod %s: %s", podName, err))
   282  		}
   283  	}
   284  	for _, pvcName := range pvcNames {
   285  		err := cs.CoreV1().PersistentVolumeClaims(ns).Delete(ctx, pvcName, metav1.DeleteOptions{})
   286  		if err != nil && !apierrors.IsNotFound(err) {
   287  			cleanupErrors = append(cleanupErrors, fmt.Sprintf("failed to delete PVC %s: %s", pvcName, err))
   288  		}
   289  	}
   290  	// Wait for the PVs to be deleted. Because of PVC protection, this implicitly also waits for the
   291  	// pods and PVCs to be deleted. We watch the PVs to make sure that the test does not leave orphan
   292  	// PVs behind when a CSI driver is destroyed just after the test ends.
   293  	err := wait.Poll(5*time.Second, timeout, func() (bool, error) {
   294  		existing := 0
   295  		for _, pvName := range pvNames.UnsortedList() {
   296  			_, err := cs.CoreV1().PersistentVolumes().Get(ctx, pvName, metav1.GetOptions{})
   297  			if err == nil {
   298  				existing++
   299  			} else {
   300  				if apierrors.IsNotFound(err) {
   301  					pvNames.Delete(pvName)
   302  				} else {
   303  					framework.Logf("Failed to get PV %s: %s", pvName, err)
   304  				}
   305  			}
   306  		}
   307  		if existing > 0 {
   308  			framework.Logf("Waiting for %d PVs to be deleted", existing)
   309  			return false, nil
   310  		}
   311  		return true, nil
   312  	})
   313  	if err != nil {
   314  		cleanupErrors = append(cleanupErrors, fmt.Sprintf("timed out waiting for PVs to be deleted: %s", err))
   315  	}
   316  	if len(cleanupErrors) != 0 {
   317  		return fmt.Errorf("test cleanup failed: %s", strings.Join(cleanupErrors, "; "))
   318  	}
   319  	return nil
   320  }
   321  
   322  // waitForAllPVCsBound waits until the given PVCs are all bound. It then returns the names of the bound PVs as a set.
   323  func waitForAllPVCsBound(ctx context.Context, cs clientset.Interface, timeout time.Duration, ns string, pvcNames []string) (sets.String, error) {
   324  	pvNames := sets.NewString()
   325  	err := wait.Poll(5*time.Second, timeout, func() (bool, error) {
   326  		unbound := 0
   327  		for _, pvcName := range pvcNames {
   328  			pvc, err := cs.CoreV1().PersistentVolumeClaims(ns).Get(ctx, pvcName, metav1.GetOptions{})
   329  			if err != nil {
   330  				return false, err
   331  			}
   332  			if pvc.Status.Phase != v1.ClaimBound {
   333  				unbound++
   334  			} else {
   335  				pvNames.Insert(pvc.Spec.VolumeName)
   336  			}
   337  		}
   338  		if unbound > 0 {
   339  			framework.Logf("%d/%d PVCs are Bound", pvNames.Len(), len(pvcNames))
   340  			return false, nil
   341  		}
   342  		return true, nil
   343  	})
   344  	if err != nil {
   345  		return nil, fmt.Errorf("error waiting for all PVCs to be bound: %w", err)
   346  	}
   347  	return pvNames, nil
   348  }
   349  
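        // getNodeLimits returns the volume limit of the given node, reading it from
        // node.status.allocatable for in-tree plugins and from the CSINode object
        // for CSI drivers.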
   350  func getNodeLimits(ctx context.Context, cs clientset.Interface, config *storageframework.PerTestConfig, nodeName string, driverInfo *storageframework.DriverInfo) (int, error) {
   351  	if len(driverInfo.InTreePluginName) == 0 {
   352  		return getCSINodeLimits(ctx, cs, config, nodeName, driverInfo)
   353  	}
   354  	return getInTreeNodeLimits(ctx, cs, nodeName, driverInfo)
   355  }
   356  
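        // getInTreeNodeLimits reads the attachable-volume limit of an in-tree plugin
        // from node.status.allocatable, using the plugin-specific allocatable key
        // (e.g. volumeutil.EBSVolumeLimitKey for the AWS EBS plugin).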
   357  func getInTreeNodeLimits(ctx context.Context, cs clientset.Interface, nodeName string, driverInfo *storageframework.DriverInfo) (int, error) {
   358  	node, err := cs.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
   359  	if err != nil {
   360  		return 0, err
   361  	}
   362  
   363  	var allocatableKey string
   364  	switch driverInfo.InTreePluginName {
   365  	case migrationplugins.AWSEBSInTreePluginName:
   366  		allocatableKey = volumeutil.EBSVolumeLimitKey
   367  	case migrationplugins.GCEPDInTreePluginName:
   368  		allocatableKey = volumeutil.GCEVolumeLimitKey
   369  	case migrationplugins.CinderInTreePluginName:
   370  		allocatableKey = volumeutil.CinderVolumeLimitKey
   371  	case migrationplugins.AzureDiskInTreePluginName:
   372  		allocatableKey = volumeutil.AzureVolumeLimitKey
   373  	default:
   374  		return 0, fmt.Errorf("unknown in-tree volume plugin name: %s", driverInfo.InTreePluginName)
   375  	}
   376  
   377  	limit, ok := node.Status.Allocatable[v1.ResourceName(allocatableKey)]
   378  	if !ok {
   379  		return 0, fmt.Errorf("node %s does not contain status.allocatable[%s] for volume plugin %s", nodeName, allocatableKey, driverInfo.InTreePluginName)
   380  	}
   381  	return int(limit.Value()), nil
   382  }
   383  
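        // getCSINodeLimits reads the attach limit for the given driver from
        // CSINode.Spec.Drivers[].Allocatable.Count. It retries for up to
        // csiNodeInfoTimeout because kubelet publishes the CSINode object
        // asynchronously after the driver has been installed.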
   384  func getCSINodeLimits(ctx context.Context, cs clientset.Interface, config *storageframework.PerTestConfig, nodeName string, driverInfo *storageframework.DriverInfo) (int, error) {
   385  	// Retry with a timeout, the driver might just have been installed and kubelet takes a while to publish everything.
   386  	var limit int
   387  	err := wait.PollImmediate(2*time.Second, csiNodeInfoTimeout, func() (bool, error) {
   388  		csiNode, err := cs.StorageV1().CSINodes().Get(ctx, nodeName, metav1.GetOptions{})
   389  		if err != nil {
   390  			framework.Logf("%s", err)
   391  			return false, nil
   392  		}
   393  		var csiDriver *storagev1.CSINodeDriver
   394  		for i, c := range csiNode.Spec.Drivers {
   395  			if c.Name == driverInfo.Name || c.Name == config.GetUniqueDriverName() {
   396  				csiDriver = &csiNode.Spec.Drivers[i]
   397  				break
   398  			}
   399  		}
   400  		if csiDriver == nil {
   401  			framework.Logf("CSINodeInfo does not have driver %s yet", driverInfo.Name)
   402  			return false, nil
   403  		}
   404  		if csiDriver.Allocatable == nil {
   405  			return false, fmt.Errorf("CSINodeInfo does not have Allocatable for driver %s", driverInfo.Name)
   406  		}
   407  		if csiDriver.Allocatable.Count == nil {
   408  			return false, fmt.Errorf("CSINodeInfo does not have Allocatable.Count for driver %s", driverInfo.Name)
   409  		}
   410  		limit = int(*csiDriver.Allocatable.Count)
   411  		return true, nil
   412  	})
   413  	if err != nil {
   414  		return 0, fmt.Errorf("could not get CSINode limit for driver %s: %w", driverInfo.Name, err)
   415  	}
   416  	return limit, nil
   417  }
   418  
