package testsuites

import (
	"context"
	"fmt"
	"regexp"
	"strings"
	"time"

	"github.com/onsi/ginkgo/v2"

	v1 "k8s.io/api/core/v1"
	storagev1 "k8s.io/api/storage/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/apimachinery/pkg/util/wait"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/component-helpers/storage/ephemeral"
	migrationplugins "k8s.io/csi-translation-lib/plugins"
	volumeutil "k8s.io/kubernetes/pkg/volume/util"
	"k8s.io/kubernetes/test/e2e/framework"
	e2enode "k8s.io/kubernetes/test/e2e/framework/node"
	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
	e2epv "k8s.io/kubernetes/test/e2e/framework/pv"
	storageframework "k8s.io/kubernetes/test/e2e/storage/framework"
	storageutils "k8s.io/kubernetes/test/e2e/storage/utils"
	admissionapi "k8s.io/pod-security-admission/api"
)

type volumeLimitsTestSuite struct {
	tsInfo storageframework.TestSuiteInfo
}

const (
	// The tests create many volumes at once, so the generic pod startup and
	// PV deletion timeouts are multiplied by this factor.
	testSlowMultiplier = 10

	// How long to wait for the CSINode object to report the driver's
	// allocatable volume count.
	csiNodeInfoTimeout = 2 * time.Minute
)

var _ storageframework.TestSuite = &volumeLimitsTestSuite{}

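// A driver's e2e test file typically wires this suite in through one of the
// Init functions below, roughly like this (illustrative sketch only; the real
// registration lives in the per-driver test code):
//
//	var suites = []func() storageframework.TestSuite{
//		testsuites.InitVolumeLimitsTestSuite,
//	}
//	// inside a Ginkgo container for the driver:
//	storageframework.DefineTestSuites(curDriver, suites)
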
// InitCustomVolumeLimitsTestSuite returns a volumeLimitsTestSuite that
// implements the TestSuite interface, using the given test patterns.
func InitCustomVolumeLimitsTestSuite(patterns []storageframework.TestPattern) storageframework.TestSuite {
	return &volumeLimitsTestSuite{
		tsInfo: storageframework.TestSuiteInfo{
			Name:         "volumeLimits",
			TestPatterns: patterns,
		},
	}
}

// InitVolumeLimitsTestSuite returns a volumeLimitsTestSuite that implements
// the TestSuite interface, using the default test patterns (dynamically
// provisioned filesystem PVs and generic ephemeral volumes).
func InitVolumeLimitsTestSuite() storageframework.TestSuite {
	patterns := []storageframework.TestPattern{
		storageframework.FsVolModeDynamicPV,
		storageframework.DefaultFsGenericEphemeralVolume,
	}
	return InitCustomVolumeLimitsTestSuite(patterns)
}

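// GetTestSuiteInfo returns the TestSuiteInfo of this test suite.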
func (t *volumeLimitsTestSuite) GetTestSuiteInfo() storageframework.TestSuiteInfo {
	return t.tsInfo
}

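// SkipUnsupportedTests is a no-op here; each test below skips itself when the
// driver does not report the CapVolumeLimits capability.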
func (t *volumeLimitsTestSuite) SkipUnsupportedTests(driver storageframework.TestDriver, pattern storageframework.TestPattern) {
}

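// DefineTests registers the volume limits tests for the given driver and test
// pattern: one serial test that fills a node up to its reported volume limit
// and one test that verifies the limits are published for all nodes.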
func (t *volumeLimitsTestSuite) DefineTests(driver storageframework.TestDriver, pattern storageframework.TestPattern) {
	type local struct {
		config *storageframework.PerTestConfig

		cs clientset.Interface
		ns *v1.Namespace

		resource *storageframework.VolumeResource

		// Names of all PVCs created by the test.
		pvcNames []string

		// Names of all pods created by the test.
		podNames []string

		// Names of all PVs bound to the PVCs above.
		pvNames sets.String
	}
	var (
		l local
	)

	// Beware that it also registers an AfterEach which renders f unusable.
	// Any code using f must run inside an It or Context callback.
	f := framework.NewFrameworkWithCustomTimeouts("volumelimits", storageframework.GetDriverTimeouts(driver))
	f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged

	// This checks that the scheduler's max volume count predicate works as
	// expected. A randomly chosen node should be able to handle as many
	// volumes as it reports as its limit (CSINode allocatable count or the
	// in-tree allocatable resource on the node).
	// The test uses a single pod with many volumes to work around any max pod
	// limit on the node, and then one extra pod with one more volume, which
	// should stay Pending with a condition saying that it is unschedulable
	// because of the volume limit.
	// BEWARE: the test may create a lot of volumes and it is really slow.
	f.It("should support volume limits", f.WithSerial(), func(ctx context.Context) {
		driverInfo := driver.GetDriverInfo()
		if !driverInfo.Capabilities[storageframework.CapVolumeLimits] {
			ginkgo.Skip(fmt.Sprintf("driver %s does not support volume limits", driverInfo.Name))
		}
		dDriver, ok := driver.(storageframework.DynamicPVTestDriver)
		if !ok {
			framework.Failf("Test driver does not provide dynamically created volumes")
		}

		l.ns = f.Namespace
		l.cs = f.ClientSet

		l.config = driver.PrepareTest(ctx, f)

		ginkgo.By("Picking a node")
		// Some CSI drivers are deployed to a single node (e.g. csi-hostpath),
		// so use that node if the test config pins one; otherwise pick a
		// random ready and schedulable node.
		nodeName := l.config.ClientNodeSelection.Name
		if nodeName == "" {
			node, err := e2enode.GetRandomReadySchedulableNode(ctx, f.ClientSet)
			framework.ExpectNoError(err)
			nodeName = node.Name
		}
		framework.Logf("Selected node %s", nodeName)

		ginkgo.By("Checking node limits")
		limit, err := getNodeLimits(ctx, l.cs, l.config, nodeName, driverInfo)
		framework.ExpectNoError(err)

		framework.Logf("Node %s can handle %d volumes of driver %s", nodeName, limit, driverInfo.Name)

		testVolumeSizeRange := t.GetTestSuiteInfo().SupportedSizeRange
		driverVolumeSizeRange := dDriver.GetDriverInfo().SupportedSizeRange
		claimSize, err := storageutils.GetSizeRangesIntersection(testVolumeSizeRange, driverVolumeSizeRange)
		framework.ExpectNoError(err, "determine intersection of test size range %+v and driver size range %+v", testVolumeSizeRange, driverVolumeSizeRange)

		l.resource = storageframework.CreateVolumeResource(ctx, driver, l.config, pattern, testVolumeSizeRange)
		ginkgo.DeferCleanup(l.resource.CleanupResource)
		// Use a closure so that the pod/PVC/PV names recorded during the test
		// are evaluated at cleanup time, not at registration time.
		ginkgo.DeferCleanup(func(ctx context.Context) error {
			return cleanupTest(ctx, l.cs, l.ns.Name, l.podNames, l.pvcNames, l.pvNames, testSlowMultiplier*f.Timeouts.PVDelete)
		})

		selection := e2epod.NodeSelection{Name: nodeName}

		if pattern.VolType == storageframework.GenericEphemeralVolume {
			// Create one pod per volume: generic ephemeral volumes are bound
			// to the lifetime of their pod.
			ginkgo.By(fmt.Sprintf("Creating %d Pod(s) with one volume each", limit))
			for i := 0; i < limit; i++ {
				pod := StartInPodWithVolumeSource(ctx, l.cs, *l.resource.VolSource, l.ns.Name, "volume-limits", "sleep 1000000", selection)
				l.podNames = append(l.podNames, pod.Name)
				l.pvcNames = append(l.pvcNames, ephemeral.VolumeClaimName(pod, &pod.Spec.Volumes[0]))
			}
		} else {
			// Create all PVCs first, then a single pod that uses all of them.
			var pvcs []*v1.PersistentVolumeClaim
			ginkgo.By(fmt.Sprintf("Creating %d PVC(s)", limit))
			for i := 0; i < limit; i++ {
				pvc := e2epv.MakePersistentVolumeClaim(e2epv.PersistentVolumeClaimConfig{
					ClaimSize:        claimSize,
					StorageClassName: &l.resource.Sc.Name,
				}, l.ns.Name)
				pvc, err = l.cs.CoreV1().PersistentVolumeClaims(l.ns.Name).Create(ctx, pvc, metav1.CreateOptions{})
				framework.ExpectNoError(err)
				l.pvcNames = append(l.pvcNames, pvc.Name)
				pvcs = append(pvcs, pvc)
			}

			ginkgo.By("Creating pod to use all PVC(s)")
			podConfig := e2epod.Config{
				NS:            l.ns.Name,
				PVCs:          pvcs,
				SeLinuxLabel:  e2epv.SELinuxLabel,
				NodeSelection: selection,
			}
			pod, err := e2epod.MakeSecPod(&podConfig)
			framework.ExpectNoError(err)
			pod, err = l.cs.CoreV1().Pods(l.ns.Name).Create(ctx, pod, metav1.CreateOptions{})
			framework.ExpectNoError(err)
			l.podNames = append(l.podNames, pod.Name)
		}

		ginkgo.By("Waiting for all PVCs to get Bound")
		l.pvNames, err = waitForAllPVCsBound(ctx, l.cs, testSlowMultiplier*f.Timeouts.PVBound, l.ns.Name, l.pvcNames)
		framework.ExpectNoError(err)

		ginkgo.By("Waiting for the pod(s) running")
		for _, podName := range l.podNames {
			err = e2epod.WaitTimeoutForPodRunningInNamespace(ctx, l.cs, podName, l.ns.Name, testSlowMultiplier*f.Timeouts.PodStart)
			framework.ExpectNoError(err)
		}

		ginkgo.By("Creating an extra pod with one volume to exceed the limit")
		pod := StartInPodWithVolumeSource(ctx, l.cs, *l.resource.VolSource, l.ns.Name, "volume-limits-exceeded", "sleep 10000", selection)
		l.podNames = append(l.podNames, pod.Name)

		ginkgo.By("Waiting for the pod to get unschedulable with the right message")
		err = e2epod.WaitForPodCondition(ctx, l.cs, l.ns.Name, pod.Name, "Unschedulable", f.Timeouts.PodStart, func(pod *v1.Pod) (bool, error) {
			if pod.Status.Phase == v1.PodPending {
				// The scheduler reports the volume-limit failure in the
				// PodScheduled condition message, e.g. "... exceed max volume count".
				reg, err := regexp.Compile(`max.+volume.+count`)
				if err != nil {
					return false, err
				}
				for _, cond := range pod.Status.Conditions {
					matched := reg.MatchString(cond.Message)
					if cond.Type == v1.PodScheduled && cond.Status == v1.ConditionFalse && cond.Reason == "Unschedulable" && matched {
						return true, nil
					}
				}
			}
			if pod.Status.Phase != v1.PodPending {
				return true, fmt.Errorf("expected pod to be in phase Pending, but got phase: %v", pod.Status.Phase)
			}
			return false, nil
		})
		framework.ExpectNoError(err)
	})

	ginkgo.It("should verify that all csinodes have volume limits", func(ctx context.Context) {
		driverInfo := driver.GetDriverInfo()
		if !driverInfo.Capabilities[storageframework.CapVolumeLimits] {
			ginkgo.Skip(fmt.Sprintf("driver %s does not support volume limits", driverInfo.Name))
		}

		l.ns = f.Namespace
		l.cs = f.ClientSet

		l.config = driver.PrepareTest(ctx, f)

		nodeNames := []string{}
		if l.config.ClientNodeSelection.Name != "" {
			// Some CSI drivers are deployed to a single node (e.g. csi-hostpath),
			// so check just that node instead of all schedulable nodes.
			nodeNames = append(nodeNames, l.config.ClientNodeSelection.Name)
		} else {
			nodeList, err := e2enode.GetReadySchedulableNodes(ctx, f.ClientSet)
			framework.ExpectNoError(err)
			for _, node := range nodeList.Items {
				nodeNames = append(nodeNames, node.Name)
			}
		}

		for _, nodeName := range nodeNames {
			ginkgo.By("Checking csinode limits")
			_, err := getNodeLimits(ctx, l.cs, l.config, nodeName, driverInfo)
			if err != nil {
				framework.Failf("Expected volume limits to be set, error: %v", err)
			}
		}
	})
}

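// cleanupTest deletes the given pods and PVCs and then waits, up to the given
// timeout, for the corresponding PVs to disappear. It returns an aggregated
// error if any deletion fails or the PVs are not removed in time.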
func cleanupTest(ctx context.Context, cs clientset.Interface, ns string, podNames, pvcNames []string, pvNames sets.String, timeout time.Duration) error {
	var cleanupErrors []string
	for _, podName := range podNames {
		err := cs.CoreV1().Pods(ns).Delete(ctx, podName, metav1.DeleteOptions{})
		if err != nil {
			cleanupErrors = append(cleanupErrors, fmt.Sprintf("failed to delete pod %s: %s", podName, err))
		}
	}
	for _, pvcName := range pvcNames {
		err := cs.CoreV1().PersistentVolumeClaims(ns).Delete(ctx, pvcName, metav1.DeleteOptions{})
		if err != nil && !apierrors.IsNotFound(err) {
			cleanupErrors = append(cleanupErrors, fmt.Sprintf("failed to delete PVC %s: %s", pvcName, err))
		}
	}
	// Wait for the PVs to be deleted too. This implicitly waits for the pod and
	// PVC deletion above to finish (PVC protection) and makes sure the test does
	// not leave orphaned PVs behind when the driver is torn down right after the
	// test ends.
	err := wait.Poll(5*time.Second, timeout, func() (bool, error) {
		existing := 0
		for _, pvName := range pvNames.UnsortedList() {
			_, err := cs.CoreV1().PersistentVolumes().Get(ctx, pvName, metav1.GetOptions{})
			if err == nil {
				existing++
			} else {
				if apierrors.IsNotFound(err) {
					pvNames.Delete(pvName)
				} else {
					framework.Logf("Failed to get PV %s: %s", pvName, err)
				}
			}
		}
		if existing > 0 {
			framework.Logf("Waiting for %d PVs to be deleted", existing)
			return false, nil
		}
		return true, nil
	})
	if err != nil {
		cleanupErrors = append(cleanupErrors, fmt.Sprintf("timed out waiting for PVs to be deleted: %s", err))
	}
	if len(cleanupErrors) != 0 {
		return fmt.Errorf("test cleanup failed: %s", strings.Join(cleanupErrors, "; "))
	}
	return nil
}

// waitForAllPVCsBound waits until all the given PVCs in the namespace are Bound
// and returns the names of the PVs they are bound to.
func waitForAllPVCsBound(ctx context.Context, cs clientset.Interface, timeout time.Duration, ns string, pvcNames []string) (sets.String, error) {
	pvNames := sets.NewString()
	err := wait.Poll(5*time.Second, timeout, func() (bool, error) {
		unbound := 0
		for _, pvcName := range pvcNames {
			pvc, err := cs.CoreV1().PersistentVolumeClaims(ns).Get(ctx, pvcName, metav1.GetOptions{})
			if err != nil {
				return false, err
			}
			if pvc.Status.Phase != v1.ClaimBound {
				unbound++
			} else {
				pvNames.Insert(pvc.Spec.VolumeName)
			}
		}
		if unbound > 0 {
			framework.Logf("%d/%d of PVCs are Bound", pvNames.Len(), len(pvcNames))
			return false, nil
		}
		return true, nil
	})
	if err != nil {
		return nil, fmt.Errorf("error waiting for all PVCs to be bound: %w", err)
	}
	return pvNames, nil
}

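// getNodeLimits returns the volume limit of the given driver on the given node,
// read either from the node's in-tree allocatable resource or from the CSINode
// object, depending on whether the driver has an in-tree plugin name.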
func getNodeLimits(ctx context.Context, cs clientset.Interface, config *storageframework.PerTestConfig, nodeName string, driverInfo *storageframework.DriverInfo) (int, error) {
	if len(driverInfo.InTreePluginName) == 0 {
		return getCSINodeLimits(ctx, cs, config, nodeName, driverInfo)
	}
	return getInTreeNodeLimits(ctx, cs, nodeName, driverInfo)
}

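// getInTreeNodeLimits reads the volume limit of an in-tree volume plugin from
// the node's status.allocatable.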
func getInTreeNodeLimits(ctx context.Context, cs clientset.Interface, nodeName string, driverInfo *storageframework.DriverInfo) (int, error) {
	node, err := cs.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
	if err != nil {
		return 0, err
	}

	var allocatableKey string
	switch driverInfo.InTreePluginName {
	case migrationplugins.AWSEBSInTreePluginName:
		allocatableKey = volumeutil.EBSVolumeLimitKey
	case migrationplugins.GCEPDInTreePluginName:
		allocatableKey = volumeutil.GCEVolumeLimitKey
	case migrationplugins.CinderInTreePluginName:
		allocatableKey = volumeutil.CinderVolumeLimitKey
	case migrationplugins.AzureDiskInTreePluginName:
		allocatableKey = volumeutil.AzureVolumeLimitKey
	default:
		return 0, fmt.Errorf("unknown in-tree volume plugin name: %s", driverInfo.InTreePluginName)
	}

	limit, ok := node.Status.Allocatable[v1.ResourceName(allocatableKey)]
	if !ok {
		return 0, fmt.Errorf("node %s does not contain status.allocatable[%s] for volume plugin %s", nodeName, allocatableKey, driverInfo.InTreePluginName)
	}
	return int(limit.Value()), nil
}

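// getCSINodeLimits reads the volume limit of a CSI driver from the Allocatable
// count published in the node's CSINode object, retrying until the driver has
// registered itself or csiNodeInfoTimeout expires.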
func getCSINodeLimits(ctx context.Context, cs clientset.Interface, config *storageframework.PerTestConfig, nodeName string, driverInfo *storageframework.DriverInfo) (int, error) {
	// Retry with a timeout: the driver may have just been installed and it can
	// take a while until kubelet publishes everything in the CSINode object.
	var limit int
	err := wait.PollImmediate(2*time.Second, csiNodeInfoTimeout, func() (bool, error) {
		csiNode, err := cs.StorageV1().CSINodes().Get(ctx, nodeName, metav1.GetOptions{})
		if err != nil {
			framework.Logf("%s", err)
			return false, nil
		}
		var csiDriver *storagev1.CSINodeDriver
		for i, c := range csiNode.Spec.Drivers {
			if c.Name == driverInfo.Name || c.Name == config.GetUniqueDriverName() {
				csiDriver = &csiNode.Spec.Drivers[i]
				break
			}
		}
		if csiDriver == nil {
			framework.Logf("CSINodeInfo does not have driver %s yet", driverInfo.Name)
			return false, nil
		}
		if csiDriver.Allocatable == nil {
			return false, fmt.Errorf("CSINodeInfo does not have Allocatable for driver %s", driverInfo.Name)
		}
		if csiDriver.Allocatable.Count == nil {
			return false, fmt.Errorf("CSINodeInfo does not have Allocatable.Count for driver %s", driverInfo.Name)
		}
		limit = int(*csiDriver.Allocatable.Count)
		return true, nil
	})
	if err != nil {
		return 0, fmt.Errorf("could not get CSINode limit for driver %s: %w", driverInfo.Name, err)
	}
	return limit, nil
}