1
16
17 package csi_mock
18
19 import (
20 "context"
21 "errors"
22 "fmt"
23 "time"
24
25 "github.com/onsi/ginkgo/v2"
26 "github.com/onsi/gomega"
27 "google.golang.org/grpc/codes"
28 "google.golang.org/grpc/status"
29 v1 "k8s.io/api/core/v1"
30 storagev1 "k8s.io/api/storage/v1"
31 "k8s.io/apimachinery/pkg/api/resource"
32 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
33 "k8s.io/apimachinery/pkg/util/wait"
34 "k8s.io/apimachinery/pkg/watch"
35 cachetools "k8s.io/client-go/tools/cache"
36 watchtools "k8s.io/client-go/tools/watch"
37 "k8s.io/kubernetes/test/e2e/framework"
38 e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
39 "k8s.io/kubernetes/test/e2e/storage/drivers"
40 "k8s.io/kubernetes/test/e2e/storage/utils"
41 admissionapi "k8s.io/pod-security-admission/api"
42 )
43
44 var _ = utils.SIGDescribe("CSI Mock volume storage capacity", func() {
45 f := framework.NewDefaultFramework("csi-mock-volumes-capacity")
46 f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged
47 m := newMockDriverSetup(f)
48
49 ginkgo.Context("storage capacity", func() {
50 tests := []struct {
51 name string
52 resourceExhausted bool
53 lateBinding bool
54 topology bool
55 }{
56 {
57 name: "unlimited",
58 },
59 {
60 name: "exhausted, immediate binding",
61 resourceExhausted: true,
62 },
63 {
64 name: "exhausted, late binding, no topology",
65 resourceExhausted: true,
66 lateBinding: true,
67 },
68 {
69 name: "exhausted, late binding, with topology",
70 resourceExhausted: true,
71 lateBinding: true,
72 topology: true,
73 },
74 }
75
76 createVolume := "CreateVolume"
77 deleteVolume := "DeleteVolume"
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96 deterministicCalls := []string{
97 createVolume,
98
99
100
101
102 deleteVolume,
103 }
104
105 for _, t := range tests {
106 test := t
107 ginkgo.It(test.name, ginkgo.NodeTimeout(csiPodRunningTimeout), func(ctx context.Context) {
108 var err error
109 params := testParameters{
110 lateBinding: test.lateBinding,
111 enableTopology: test.topology,
112
113
114
115
116 disableAttach: true,
117 registerDriver: true,
118 }
119
120 if test.resourceExhausted {
121 params.hooks = createPreHook("CreateVolume", func(counter int64) error {
122 if counter%2 != 0 {
123 return status.Error(codes.ResourceExhausted, "fake error")
124 }
125 return nil
126 })
127 }
128
129 m.init(ctx, params)
130 ginkgo.DeferCleanup(m.cleanup)
131
132
133
134
135
136
137 initResource, err := f.ClientSet.CoreV1().PersistentVolumeClaims(f.Namespace.Name).List(ctx, metav1.ListOptions{})
138 framework.ExpectNoError(err, "Failed to fetch initial PVC resource")
139 listWatcher := &cachetools.ListWatch{
140 WatchFunc: func(listOptions metav1.ListOptions) (watch.Interface, error) {
141 return f.ClientSet.CoreV1().PersistentVolumeClaims(f.Namespace.Name).Watch(ctx, listOptions)
142 },
143 }
144 pvcWatch, err := watchtools.NewRetryWatcher(initResource.GetResourceVersion(), listWatcher)
145 framework.ExpectNoError(err, "create PVC watch")
146 defer pvcWatch.Stop()
147
148 sc, claim, pod := m.createPod(ctx, pvcReference)
149 gomega.Expect(pod).NotTo(gomega.BeNil(), "while creating pod")
150 bindingMode := storagev1.VolumeBindingImmediate
151 if test.lateBinding {
152 bindingMode = storagev1.VolumeBindingWaitForFirstConsumer
153 }
154 gomega.Expect(*sc.VolumeBindingMode).To(gomega.Equal(bindingMode), "volume binding mode")
155
156 err = e2epod.WaitForPodNameRunningInNamespace(ctx, m.cs, pod.Name, pod.Namespace)
157 framework.ExpectNoError(err, "failed to start pod")
158 err = e2epod.DeletePodWithWait(ctx, m.cs, pod)
159 framework.ExpectNoError(err, "failed to delete pod")
160 err = m.cs.CoreV1().PersistentVolumeClaims(claim.Namespace).Delete(ctx, claim.Name, metav1.DeleteOptions{})
161 framework.ExpectNoError(err, "failed to delete claim")
162
163 normal := []csiCall{}
164 for _, method := range deterministicCalls {
165 normal = append(normal, csiCall{expectedMethod: method})
166 }
167 expected := normal
168
169
170
171
172 if test.resourceExhausted {
173 expected = []csiCall{
174 {expectedMethod: createVolume, expectedError: codes.ResourceExhausted},
175 }
176 expected = append(expected, normal...)
177 }
178
179 var calls []drivers.MockCSICall
180 err = wait.PollUntilContextCancel(ctx, time.Second, true, func(ctx context.Context) (done bool, err error) {
181 c, index, err := compareCSICalls(ctx, deterministicCalls, expected, m.driver.GetCalls)
182 if err != nil {
183 return true, fmt.Errorf("error waiting for expected CSI calls: %w", err)
184 }
185 calls = c
186 if index == 0 {
187
188 return false, nil
189 }
190 if len(expected) == index {
191
192 return true, nil
193 }
194 return false, nil
195 })
196 framework.ExpectNoError(err, "while waiting for all CSI calls")
197
198
199
200
201
202
203
204
205
206
207
208
209 ginkgo.By("Checking PVC events")
210 nodeAnnotationSet := false
211 nodeAnnotationReset := false
212 watchFailed := false
213 loop:
214 for {
215 select {
216 case event, ok := <-pvcWatch.ResultChan():
217 if !ok {
218 watchFailed = true
219 break loop
220 }
221
222 framework.Logf("PVC event %s: %#v", event.Type, event.Object)
223 switch event.Type {
224 case watch.Modified:
225 pvc, ok := event.Object.(*v1.PersistentVolumeClaim)
226 if !ok {
227 framework.Failf("PVC watch sent %#v instead of a PVC", event.Object)
228 }
229 _, set := pvc.Annotations["volume.kubernetes.io/selected-node"]
230 if set {
231 nodeAnnotationSet = true
232 } else if nodeAnnotationSet {
233 nodeAnnotationReset = true
234 }
235 case watch.Deleted:
236 break loop
237 case watch.Error:
238 watchFailed = true
239 break
240 }
241 case <-ctx.Done():
242 framework.Failf("Timeout while waiting to observe PVC list")
243 }
244 }
245
246
247 if test.resourceExhausted {
248 for _, call := range calls {
249 if call.Method == createVolume {
250 gomega.Expect(call.Error).To(gomega.ContainSubstring("code = ResourceExhausted"), "first CreateVolume error in\n%s", calls)
251 break
252 }
253 }
254
255 switch {
256 case watchFailed:
257
258
259
260 framework.Logf("PVC watch delivered incomplete data, cannot check annotation")
261 case test.lateBinding:
262 gomega.Expect(nodeAnnotationSet).To(gomega.BeTrue(), "selected-node should have been set")
263
264
265 if test.topology {
266 gomega.Expect(nodeAnnotationReset).To(gomega.BeTrue(), "selected-node should have been set")
267 } else {
268 gomega.Expect(nodeAnnotationReset).To(gomega.BeFalse(), "selected-node should not have been reset")
269 }
270 default:
271 gomega.Expect(nodeAnnotationSet).To(gomega.BeFalse(), "selected-node should not have been set")
272 gomega.Expect(nodeAnnotationReset).To(gomega.BeFalse(), "selected-node should not have been reset")
273 }
274 }
275 })
276 }
277 })
278
279
280 ginkgo.Context("CSIStorageCapacity", func() {
281 var (
282 err error
283 yes = true
284 no = false
285 )
286
287
288 tests := []struct {
289 name string
290 storageCapacity *bool
291 capacities []string
292 expectFailure bool
293 }{
294 {
295 name: "CSIStorageCapacity unused",
296 },
297 {
298 name: "CSIStorageCapacity disabled",
299 storageCapacity: &no,
300 },
301 {
302 name: "CSIStorageCapacity used, no capacity",
303 storageCapacity: &yes,
304 expectFailure: true,
305 },
306 {
307 name: "CSIStorageCapacity used, insufficient capacity",
308 storageCapacity: &yes,
309 expectFailure: true,
310 capacities: []string{"1Mi"},
311 },
312 {
313 name: "CSIStorageCapacity used, have capacity",
314 storageCapacity: &yes,
315 capacities: []string{"100Gi"},
316 },
317
318
319
320 }
321 for _, t := range tests {
322 test := t
323 ginkgo.It(t.name, ginkgo.NodeTimeout(f.Timeouts.PodStart), func(ctx context.Context) {
324 scName := "mock-csi-storage-capacity-" + f.UniqueName
325 m.init(ctx, testParameters{
326 registerDriver: true,
327 scName: scName,
328 storageCapacity: test.storageCapacity,
329 lateBinding: true,
330 })
331 ginkgo.DeferCleanup(m.cleanup)
332
333
334
335 for _, capacityStr := range test.capacities {
336 capacityQuantity := resource.MustParse(capacityStr)
337 capacity := &storagev1.CSIStorageCapacity{
338 ObjectMeta: metav1.ObjectMeta{
339 GenerateName: "fake-capacity-",
340 },
341
342 StorageClassName: scName,
343 NodeTopology: &metav1.LabelSelector{},
344 Capacity: &capacityQuantity,
345 }
346 createdCapacity, err := f.ClientSet.StorageV1().CSIStorageCapacities(f.Namespace.Name).Create(ctx, capacity, metav1.CreateOptions{})
347 framework.ExpectNoError(err, "create CSIStorageCapacity %+v", *capacity)
348 ginkgo.DeferCleanup(framework.IgnoreNotFound(f.ClientSet.StorageV1().CSIStorageCapacities(f.Namespace.Name).Delete), createdCapacity.Name, metav1.DeleteOptions{})
349 }
350
351
352
353 syncDelay := 5 * time.Second
354 time.Sleep(syncDelay)
355
356 sc, _, pod := m.createPod(ctx, pvcReference)
357 gomega.Expect(sc.Name).To(gomega.Equal(scName), "pre-selected storage class name not used")
358
359 condition := anyOf(
360 podRunning(ctx, f.ClientSet, pod.Name, pod.Namespace),
361
362
363
364 podHasStorage(ctx, f.ClientSet, pod.Name, pod.Namespace, time.Now().Add(syncDelay)),
365 )
366 err = wait.PollImmediateUntil(poll, condition, ctx.Done())
367 if test.expectFailure {
368 switch {
369 case errors.Is(err, context.DeadlineExceeded),
370 errors.Is(err, wait.ErrorInterrupted(errors.New("timed out waiting for the condition"))),
371 errors.Is(err, errNotEnoughSpace):
372
373 case err == nil:
374 framework.Fail("pod unexpectedly started to run")
375 default:
376 framework.Failf("unexpected error while waiting for pod: %v", err)
377 }
378 } else {
379 framework.ExpectNoError(err, "failed to start pod")
380 }
381
382 ginkgo.By("Deleting the previously created pod")
383 err = e2epod.DeletePodWithWait(ctx, m.cs, pod)
384 framework.ExpectNoError(err, "while deleting")
385 })
386 }
387 })
388 })
389
View as plain text