1
16
17 package volumescheduling
18
19
20
21 import (
22 "context"
23 "fmt"
24 "strconv"
25 "strings"
26 "testing"
27 "time"
28
29 "k8s.io/klog/v2"
30
31 v1 "k8s.io/api/core/v1"
32 storagev1 "k8s.io/api/storage/v1"
33 "k8s.io/apimachinery/pkg/api/resource"
34 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
35 "k8s.io/apimachinery/pkg/util/rand"
36 "k8s.io/apimachinery/pkg/util/sets"
37 "k8s.io/apimachinery/pkg/util/wait"
38 "k8s.io/client-go/informers"
39 clientset "k8s.io/client-go/kubernetes"
40 "k8s.io/client-go/util/workqueue"
41 "k8s.io/kubernetes/pkg/controller/volume/persistentvolume"
42 "k8s.io/kubernetes/pkg/scheduler/framework/plugins/nodevolumelimits"
43 "k8s.io/kubernetes/pkg/volume"
44 volumetest "k8s.io/kubernetes/pkg/volume/testing"
45 testutil "k8s.io/kubernetes/test/integration/util"
46 imageutils "k8s.io/kubernetes/test/utils/image"
47 )
48
49 type testConfig struct {
50 client clientset.Interface
51 ns string
52 stop <-chan struct{}
53 teardown func()
54 }
55
56 var (
57
58 deletePeriod = int64(0)
59 deleteOption = metav1.DeleteOptions{GracePeriodSeconds: &deletePeriod}
60
61 modeWait = storagev1.VolumeBindingWaitForFirstConsumer
62 modeImmediate = storagev1.VolumeBindingImmediate
63
64 classWait = "wait"
65 classImmediate = "immediate"
66 classDynamic = "dynamic"
67 classTopoMismatch = "topomismatch"
68
69 sharedClasses = map[string]*storagev1.StorageClass{
70 classImmediate: makeStorageClass(classImmediate, &modeImmediate),
71 classWait: makeStorageClass(classWait, &modeWait),
72 }
73 )
74
75 const (
76 node1 = "node-1"
77 node2 = "node-2"
78 podLimit = 50
79 volsPerPod = 3
80 nodeAffinityLabelKey = "kubernetes.io/hostname"
81 provisionerPluginName = "mock-provisioner.kubernetes.io"
82 )
83
84 type testPV struct {
85 name string
86 scName string
87 preboundPVC string
88 node string
89 }
90
91 type testPVC struct {
92 name string
93 scName string
94 preboundPV string
95 }
96
97 func TestVolumeBinding(t *testing.T) {
98 config := setupCluster(t, "volume-scheduling-", 2, 0, 0)
99 defer config.teardown()
100
101 cases := map[string]struct {
102 pod *v1.Pod
103 pvs []*testPV
104 pvcs []*testPVC
105
106 unboundPvcs []*testPVC
107 unboundPvs []*testPV
108 shouldFail bool
109 }{
110 "immediate can bind": {
111 pod: makePod("pod-i-canbind", config.ns, []string{"pvc-i-canbind"}),
112 pvs: []*testPV{{"pv-i-canbind", classImmediate, "", node1}},
113 pvcs: []*testPVC{{"pvc-i-canbind", classImmediate, ""}},
114 },
115 "immediate cannot bind": {
116 pod: makePod("pod-i-cannotbind", config.ns, []string{"pvc-i-cannotbind"}),
117 unboundPvcs: []*testPVC{{"pvc-i-cannotbind", classImmediate, ""}},
118 shouldFail: true,
119 },
120 "immediate pvc prebound": {
121 pod: makePod("pod-i-pvc-prebound", config.ns, []string{"pvc-i-prebound"}),
122 pvs: []*testPV{{"pv-i-pvc-prebound", classImmediate, "", node1}},
123 pvcs: []*testPVC{{"pvc-i-prebound", classImmediate, "pv-i-pvc-prebound"}},
124 },
125 "immediate pv prebound": {
126 pod: makePod("pod-i-pv-prebound", config.ns, []string{"pvc-i-pv-prebound"}),
127 pvs: []*testPV{{"pv-i-prebound", classImmediate, "pvc-i-pv-prebound", node1}},
128 pvcs: []*testPVC{{"pvc-i-pv-prebound", classImmediate, ""}},
129 },
130 "wait can bind": {
131 pod: makePod("pod-w-canbind", config.ns, []string{"pvc-w-canbind"}),
132 pvs: []*testPV{{"pv-w-canbind", classWait, "", node1}},
133 pvcs: []*testPVC{{"pvc-w-canbind", classWait, ""}},
134 },
135 "wait cannot bind": {
136 pod: makePod("pod-w-cannotbind", config.ns, []string{"pvc-w-cannotbind"}),
137 unboundPvcs: []*testPVC{{"pvc-w-cannotbind", classWait, ""}},
138 shouldFail: true,
139 },
140 "wait pvc prebound": {
141 pod: makePod("pod-w-pvc-prebound", config.ns, []string{"pvc-w-prebound"}),
142 pvs: []*testPV{{"pv-w-pvc-prebound", classWait, "", node1}},
143 pvcs: []*testPVC{{"pvc-w-prebound", classWait, "pv-w-pvc-prebound"}},
144 },
145 "wait pv prebound": {
146 pod: makePod("pod-w-pv-prebound", config.ns, []string{"pvc-w-pv-prebound"}),
147 pvs: []*testPV{{"pv-w-prebound", classWait, "pvc-w-pv-prebound", node1}},
148 pvcs: []*testPVC{{"pvc-w-pv-prebound", classWait, ""}},
149 },
150 "wait can bind two": {
151 pod: makePod("pod-w-canbind-2", config.ns, []string{"pvc-w-canbind-2", "pvc-w-canbind-3"}),
152 pvs: []*testPV{
153 {"pv-w-canbind-2", classWait, "", node2},
154 {"pv-w-canbind-3", classWait, "", node2},
155 },
156 pvcs: []*testPVC{
157 {"pvc-w-canbind-2", classWait, ""},
158 {"pvc-w-canbind-3", classWait, ""},
159 },
160 unboundPvs: []*testPV{
161 {"pv-w-canbind-5", classWait, "", node1},
162 },
163 },
164 "wait cannot bind two": {
165 pod: makePod("pod-w-cannotbind-2", config.ns, []string{"pvc-w-cannotbind-1", "pvc-w-cannotbind-2"}),
166 unboundPvcs: []*testPVC{
167 {"pvc-w-cannotbind-1", classWait, ""},
168 {"pvc-w-cannotbind-2", classWait, ""},
169 },
170 unboundPvs: []*testPV{
171 {"pv-w-cannotbind-1", classWait, "", node2},
172 {"pv-w-cannotbind-2", classWait, "", node1},
173 },
174 shouldFail: true,
175 },
176 "mix immediate and wait": {
177 pod: makePod("pod-mix-bound", config.ns, []string{"pvc-w-canbind-4", "pvc-i-canbind-2"}),
178 pvs: []*testPV{
179 {"pv-w-canbind-4", classWait, "", node1},
180 {"pv-i-canbind-2", classImmediate, "", node1},
181 },
182 pvcs: []*testPVC{
183 {"pvc-w-canbind-4", classWait, ""},
184 {"pvc-i-canbind-2", classImmediate, ""},
185 },
186 },
187 }
188
189 for name, test := range cases {
190 klog.Infof("Running test %v", name)
191
192
193 suffix := rand.String(4)
194 classes := map[string]*storagev1.StorageClass{}
195 classes[classImmediate] = makeStorageClass(fmt.Sprintf("immediate-%v", suffix), &modeImmediate)
196 classes[classWait] = makeStorageClass(fmt.Sprintf("wait-%v", suffix), &modeWait)
197 for _, sc := range classes {
198 if _, err := config.client.StorageV1().StorageClasses().Create(context.TODO(), sc, metav1.CreateOptions{}); err != nil {
199 t.Fatalf("Failed to create StorageClass %q: %v", sc.Name, err)
200 }
201 }
202
203
204 for _, pvConfig := range test.pvs {
205 pv := makePV(pvConfig.name, classes[pvConfig.scName].Name, pvConfig.preboundPVC, config.ns, pvConfig.node)
206 if _, err := config.client.CoreV1().PersistentVolumes().Create(context.TODO(), pv, metav1.CreateOptions{}); err != nil {
207 t.Fatalf("Failed to create PersistentVolume %q: %v", pv.Name, err)
208 }
209 }
210
211 for _, pvConfig := range test.unboundPvs {
212 pv := makePV(pvConfig.name, classes[pvConfig.scName].Name, pvConfig.preboundPVC, config.ns, pvConfig.node)
213 if _, err := config.client.CoreV1().PersistentVolumes().Create(context.TODO(), pv, metav1.CreateOptions{}); err != nil {
214 t.Fatalf("Failed to create PersistentVolume %q: %v", pv.Name, err)
215 }
216 }
217
218
219
220 for _, pvConfig := range test.pvs {
221 if err := waitForPVPhase(config.client, pvConfig.name, v1.VolumeAvailable); err != nil {
222 t.Fatalf("PersistentVolume %q failed to become available: %v", pvConfig.name, err)
223 }
224 }
225
226 for _, pvConfig := range test.unboundPvs {
227 if err := waitForPVPhase(config.client, pvConfig.name, v1.VolumeAvailable); err != nil {
228 t.Fatalf("PersistentVolume %q failed to become available: %v", pvConfig.name, err)
229 }
230 }
231
232
233 for _, pvcConfig := range test.pvcs {
234 pvc := makePVC(pvcConfig.name, config.ns, &classes[pvcConfig.scName].Name, pvcConfig.preboundPV)
235 if _, err := config.client.CoreV1().PersistentVolumeClaims(config.ns).Create(context.TODO(), pvc, metav1.CreateOptions{}); err != nil {
236 t.Fatalf("Failed to create PersistentVolumeClaim %q: %v", pvc.Name, err)
237 }
238 }
239 for _, pvcConfig := range test.unboundPvcs {
240 pvc := makePVC(pvcConfig.name, config.ns, &classes[pvcConfig.scName].Name, pvcConfig.preboundPV)
241 if _, err := config.client.CoreV1().PersistentVolumeClaims(config.ns).Create(context.TODO(), pvc, metav1.CreateOptions{}); err != nil {
242 t.Fatalf("Failed to create PersistentVolumeClaim %q: %v", pvc.Name, err)
243 }
244 }
245
246
247 if _, err := config.client.CoreV1().Pods(config.ns).Create(context.TODO(), test.pod, metav1.CreateOptions{}); err != nil {
248 t.Fatalf("Failed to create Pod %q: %v", test.pod.Name, err)
249 }
250 if test.shouldFail {
251 if err := waitForPodUnschedulable(config.client, test.pod); err != nil {
252 t.Errorf("Pod %q was not unschedulable: %v", test.pod.Name, err)
253 }
254 } else {
255 if err := waitForPodToSchedule(config.client, test.pod); err != nil {
256 t.Errorf("Failed to schedule Pod %q: %v", test.pod.Name, err)
257 }
258 }
259
260
261 for _, pvc := range test.pvcs {
262 validatePVCPhase(t, config.client, pvc.name, config.ns, v1.ClaimBound, false)
263 }
264 for _, pvc := range test.unboundPvcs {
265 validatePVCPhase(t, config.client, pvc.name, config.ns, v1.ClaimPending, false)
266 }
267 for _, pv := range test.pvs {
268 validatePVPhase(t, config.client, pv.name, v1.VolumeBound)
269 }
270 for _, pv := range test.unboundPvs {
271 validatePVPhase(t, config.client, pv.name, v1.VolumeAvailable)
272 }
273
274
275 deleteTestObjects(config.client, config.ns, deleteOption)
276 }
277 }
278
279
280 func TestVolumeBindingRescheduling(t *testing.T) {
281 config := setupCluster(t, "volume-scheduling-", 2, 0, 0)
282 defer config.teardown()
283
284 storageClassName := "local-storage"
285
286 cases := map[string]struct {
287 pod *v1.Pod
288 pvcs []*testPVC
289 pvs []*testPV
290 trigger func(config *testConfig)
291 shouldFail bool
292 }{
293 "reschedule on WaitForFirstConsumer dynamic storage class add": {
294 pod: makePod("pod-reschedule-onclassadd-dynamic", config.ns, []string{"pvc-reschedule-onclassadd-dynamic"}),
295 pvcs: []*testPVC{
296 {"pvc-reschedule-onclassadd-dynamic", "", ""},
297 },
298 trigger: func(config *testConfig) {
299 sc := makeDynamicProvisionerStorageClass(storageClassName, &modeWait, nil)
300 if _, err := config.client.StorageV1().StorageClasses().Create(context.TODO(), sc, metav1.CreateOptions{}); err != nil {
301 t.Fatalf("Failed to create StorageClass %q: %v", sc.Name, err)
302 }
303 },
304 shouldFail: false,
305 },
306 "reschedule on WaitForFirstConsumer static storage class add": {
307 pod: makePod("pod-reschedule-onclassadd-static", config.ns, []string{"pvc-reschedule-onclassadd-static"}),
308 pvcs: []*testPVC{
309 {"pvc-reschedule-onclassadd-static", "", ""},
310 },
311 trigger: func(config *testConfig) {
312 sc := makeStorageClass(storageClassName, &modeWait)
313 if _, err := config.client.StorageV1().StorageClasses().Create(context.TODO(), sc, metav1.CreateOptions{}); err != nil {
314 t.Fatalf("Failed to create StorageClass %q: %v", sc.Name, err)
315 }
316
317 pv := makePV("pv-reschedule-onclassadd-static", storageClassName, "", "", node1)
318 if pv, err := config.client.CoreV1().PersistentVolumes().Create(context.TODO(), pv, metav1.CreateOptions{}); err != nil {
319 t.Fatalf("Failed to create PersistentVolume %q: %v", pv.Name, err)
320 }
321 },
322 shouldFail: false,
323 },
324 "reschedule on delay binding PVC add": {
325 pod: makePod("pod-reschedule-onpvcadd", config.ns, []string{"pvc-reschedule-onpvcadd"}),
326 pvs: []*testPV{
327 {
328 name: "pv-reschedule-onpvcadd",
329 scName: classWait,
330 node: node1,
331 },
332 },
333 trigger: func(config *testConfig) {
334 pvc := makePVC("pvc-reschedule-onpvcadd", config.ns, &classWait, "")
335 if _, err := config.client.CoreV1().PersistentVolumeClaims(config.ns).Create(context.TODO(), pvc, metav1.CreateOptions{}); err != nil {
336 t.Fatalf("Failed to create PersistentVolumeClaim %q: %v", pvc.Name, err)
337 }
338 },
339 shouldFail: false,
340 },
341 }
342
343 for name, test := range cases {
344 klog.Infof("Running test %v", name)
345
346 if test.pod == nil {
347 t.Fatal("pod is required for this test")
348 }
349
350
351 for _, pvcConfig := range test.pvcs {
352 pvc := makePVC(pvcConfig.name, config.ns, &storageClassName, "")
353 if _, err := config.client.CoreV1().PersistentVolumeClaims(config.ns).Create(context.TODO(), pvc, metav1.CreateOptions{}); err != nil {
354 t.Fatalf("Failed to create PersistentVolumeClaim %q: %v", pvc.Name, err)
355 }
356 }
357
358
359 for _, pvConfig := range test.pvs {
360 pv := makePV(pvConfig.name, sharedClasses[pvConfig.scName].Name, pvConfig.preboundPVC, config.ns, pvConfig.node)
361 if _, err := config.client.CoreV1().PersistentVolumes().Create(context.TODO(), pv, metav1.CreateOptions{}); err != nil {
362 t.Fatalf("Failed to create PersistentVolume %q: %v", pv.Name, err)
363 }
364 }
365
366
367 if _, err := config.client.CoreV1().Pods(config.ns).Create(context.TODO(), test.pod, metav1.CreateOptions{}); err != nil {
368 t.Fatalf("Failed to create Pod %q: %v", test.pod.Name, err)
369 }
370
371
372 klog.Infof("Waiting for pod is unschedulable")
373 if err := waitForPodUnschedulable(config.client, test.pod); err != nil {
374 t.Errorf("Failed as Pod %s was not unschedulable: %v", test.pod.Name, err)
375 }
376
377
378 test.trigger(config)
379
380
381 if !test.shouldFail {
382 klog.Infof("Waiting for pod is scheduled")
383 if err := waitForPodToSchedule(config.client, test.pod); err != nil {
384 t.Errorf("Failed to schedule Pod %q: %v", test.pod.Name, err)
385 }
386 } else {
387 klog.Infof("Waiting for pod is unschedulable")
388 if err := waitForPodUnschedulable(config.client, test.pod); err != nil {
389 t.Errorf("Failed as Pod %s was not unschedulable: %v", test.pod.Name, err)
390 }
391 }
392
393
394 deleteTestObjects(config.client, config.ns, deleteOption)
395 }
396 }
397
398
399
400 func TestVolumeBindingStress(t *testing.T) {
401 testVolumeBindingStress(t, 0, false, 0)
402 }
403
404
405
406
407
408 func TestVolumeBindingStressWithSchedulerResync(t *testing.T) {
409 testVolumeBindingStress(t, time.Second, false, 0)
410 }
411
412
413 func TestVolumeBindingDynamicStressFast(t *testing.T) {
414 testVolumeBindingStress(t, 0, true, 0)
415 }
416
417
418 func TestVolumeBindingDynamicStressSlow(t *testing.T) {
419 testVolumeBindingStress(t, 0, true, 10)
420 }
421
422 func testVolumeBindingStress(t *testing.T, schedulerResyncPeriod time.Duration, dynamic bool, provisionDelaySeconds int) {
423 config := setupCluster(t, "volume-binding-stress-", 1, schedulerResyncPeriod, provisionDelaySeconds)
424 defer config.teardown()
425
426
427
428 t.Setenv(nodevolumelimits.KubeMaxPDVols, fmt.Sprintf("%v", podLimit*volsPerPod))
429
430 scName := &classWait
431 if dynamic {
432 scName = &classDynamic
433 sc := makeDynamicProvisionerStorageClass(*scName, &modeWait, nil)
434 if _, err := config.client.StorageV1().StorageClasses().Create(context.TODO(), sc, metav1.CreateOptions{}); err != nil {
435 t.Fatalf("Failed to create StorageClass %q: %v", sc.Name, err)
436 }
437 }
438
439 klog.Infof("Start creating PVs and PVCs")
440
441 podVolumesCount := podLimit * volsPerPod
442 pvs := make([]*v1.PersistentVolume, podVolumesCount)
443 pvcs := make([]*v1.PersistentVolumeClaim, podVolumesCount)
444 workqueue.ParallelizeUntil(context.TODO(), 16, podVolumesCount, func(i int) {
445 var (
446 pv *v1.PersistentVolume
447 pvc *v1.PersistentVolumeClaim
448 pvName = fmt.Sprintf("pv-stress-%v", i)
449 pvcName = fmt.Sprintf("pvc-stress-%v", i)
450 )
451
452 if !dynamic {
453 if rand.Int()%2 == 0 {
454
455 pv = makePV(pvName, *scName, "", "", node1)
456 } else {
457
458 pv = makePV(pvName, classImmediate, pvcName, config.ns, node1)
459 }
460 if pv, err := config.client.CoreV1().PersistentVolumes().Create(context.TODO(), pv, metav1.CreateOptions{}); err != nil {
461 t.Fatalf("Failed to create PersistentVolume %q: %v", pv.Name, err)
462 }
463 pvs[i] = pv
464 }
465 if pv != nil && pv.Spec.ClaimRef != nil && pv.Spec.ClaimRef.Name == pvcName {
466 pvc = makePVC(pvcName, config.ns, &classImmediate, pv.Name)
467 } else {
468 pvc = makePVC(pvcName, config.ns, scName, "")
469 }
470 if pvc, err := config.client.CoreV1().PersistentVolumeClaims(config.ns).Create(context.TODO(), pvc, metav1.CreateOptions{}); err != nil {
471 t.Fatalf("Failed to create PersistentVolumeClaim %q: %v", pvc.Name, err)
472 }
473 pvcs[i] = pvc
474 })
475
476 klog.Infof("Start creating Pods")
477 pods := make([]*v1.Pod, podLimit)
478 workqueue.ParallelizeUntil(context.TODO(), 16, podLimit, func(i int) {
479
480 podPvcs := []string{}
481 for j := i * volsPerPod; j < (i+1)*volsPerPod; j++ {
482 podPvcs = append(podPvcs, pvcs[j].Name)
483 }
484
485 pod := makePod(fmt.Sprintf("pod%03d", i), config.ns, podPvcs)
486 if pod, err := config.client.CoreV1().Pods(config.ns).Create(context.TODO(), pod, metav1.CreateOptions{}); err != nil {
487 t.Fatalf("Failed to create Pod %q: %v", pod.Name, err)
488 }
489 pods[i] = pod
490 })
491
492 klog.Infof("Start validating pod scheduled")
493
494 workqueue.ParallelizeUntil(context.TODO(), 16, len(pods), func(i int) {
495 pod := pods[i]
496
497
498 if err := waitForPodToScheduleWithTimeout(config.client, pod, 2*time.Minute); err != nil {
499 t.Errorf("Failed to schedule Pod %q: %v", pod.Name, err)
500 }
501 })
502
503 klog.Infof("Start validating PVCs scheduled")
504
505 workqueue.ParallelizeUntil(context.TODO(), 16, len(pvcs), func(i int) {
506 validatePVCPhase(t, config.client, pvcs[i].Name, config.ns, v1.ClaimBound, dynamic)
507 })
508
509
510 if !dynamic {
511 klog.Infof("Start validating PVs scheduled")
512 workqueue.ParallelizeUntil(context.TODO(), 16, len(pvs), func(i int) {
513 validatePVPhase(t, config.client, pvs[i].Name, v1.VolumeBound)
514 })
515 }
516 }
517
518 func testVolumeBindingWithAffinity(t *testing.T, anti bool, numNodes, numPods, numPVsFirstNode int) {
519 config := setupCluster(t, "volume-pod-affinity-", numNodes, 0, 0)
520 defer config.teardown()
521
522 pods := []*v1.Pod{}
523 pvcs := []*v1.PersistentVolumeClaim{}
524
525
526 for i := 0; i < numPVsFirstNode; i++ {
527 pv := makePV(fmt.Sprintf("pv-node1-%v", i), classWait, "", "", node1)
528 if pv, err := config.client.CoreV1().PersistentVolumes().Create(context.TODO(), pv, metav1.CreateOptions{}); err != nil {
529 t.Fatalf("Failed to create PersistentVolume %q: %v", pv.Name, err)
530 }
531 }
532
533
534 for i := 2; i <= numNodes; i++ {
535 pv := makePV(fmt.Sprintf("pv-node%v-0", i), classWait, "", "", fmt.Sprintf("node-%v", i))
536 if pv, err := config.client.CoreV1().PersistentVolumes().Create(context.TODO(), pv, metav1.CreateOptions{}); err != nil {
537 t.Fatalf("Failed to create PersistentVolume %q: %v", pv.Name, err)
538 }
539 }
540
541
542 for i := 0; i < numPods; i++ {
543
544 pvc := makePVC(fmt.Sprintf("pvc-%v", i), config.ns, &classWait, "")
545 if pvc, err := config.client.CoreV1().PersistentVolumeClaims(config.ns).Create(context.TODO(), pvc, metav1.CreateOptions{}); err != nil {
546 t.Fatalf("Failed to create PersistentVolumeClaim %q: %v", pvc.Name, err)
547 }
548 pvcs = append(pvcs, pvc)
549
550
551 pod := makePod(fmt.Sprintf("pod%03d", i), config.ns, []string{pvc.Name})
552 pod.Spec.Affinity = &v1.Affinity{}
553 affinityTerms := []v1.PodAffinityTerm{
554 {
555 LabelSelector: &metav1.LabelSelector{
556 MatchExpressions: []metav1.LabelSelectorRequirement{
557 {
558 Key: "app",
559 Operator: metav1.LabelSelectorOpIn,
560 Values: []string{"volume-binding-test"},
561 },
562 },
563 },
564 TopologyKey: nodeAffinityLabelKey,
565 },
566 }
567 if anti {
568 pod.Spec.Affinity.PodAntiAffinity = &v1.PodAntiAffinity{
569 RequiredDuringSchedulingIgnoredDuringExecution: affinityTerms,
570 }
571 } else {
572 pod.Spec.Affinity.PodAffinity = &v1.PodAffinity{
573 RequiredDuringSchedulingIgnoredDuringExecution: affinityTerms,
574 }
575 }
576
577 if pod, err := config.client.CoreV1().Pods(config.ns).Create(context.TODO(), pod, metav1.CreateOptions{}); err != nil {
578 t.Fatalf("Failed to create Pod %q: %v", pod.Name, err)
579 }
580 pods = append(pods, pod)
581 }
582
583
584 scheduledNodes := sets.NewString()
585 for _, pod := range pods {
586 if err := waitForPodToSchedule(config.client, pod); err != nil {
587 t.Errorf("Failed to schedule Pod %q: %v", pod.Name, err)
588 } else {
589
590 pod, err = config.client.CoreV1().Pods(config.ns).Get(context.TODO(), pod.Name, metav1.GetOptions{})
591 if err != nil {
592 t.Fatalf("Failed to get Pod %q: %v", pod.Name, err)
593 }
594 if pod.Spec.NodeName == "" {
595 t.Fatalf("Pod %q node name unset after scheduling", pod.Name)
596 }
597 scheduledNodes.Insert(pod.Spec.NodeName)
598 }
599 }
600
601
602 if anti {
603
604 if scheduledNodes.Len() != numPods {
605 t.Errorf("Pods were scheduled across %v nodes instead of %v", scheduledNodes.Len(), numPods)
606 }
607 } else {
608
609 if scheduledNodes.Len() != 1 {
610 t.Errorf("Pods were scheduled across %v nodes instead of %v", scheduledNodes.Len(), 1)
611 }
612 }
613
614
615 for _, pvc := range pvcs {
616 validatePVCPhase(t, config.client, pvc.Name, config.ns, v1.ClaimBound, false)
617 }
618 }
619
620 func TestVolumeBindingWithAntiAffinity(t *testing.T) {
621 numNodes := 10
622
623 numPods := numNodes
624
625 numPVsFirstNode := 10 * numNodes
626
627 testVolumeBindingWithAffinity(t, true, numNodes, numPods, numPVsFirstNode)
628 }
629
630 func TestVolumeBindingWithAffinity(t *testing.T) {
631 numPods := 10
632
633 numNodes := 10 * numPods
634
635 numPVsFirstNode := numPods
636
637 testVolumeBindingWithAffinity(t, true, numNodes, numPods, numPVsFirstNode)
638 }
639
640 func TestPVAffinityConflict(t *testing.T) {
641 config := setupCluster(t, "volume-scheduling-", 3, 0, 0)
642 defer config.teardown()
643
644 pv := makePV("local-pv", classImmediate, "", "", node1)
645 pvc := makePVC("local-pvc", config.ns, &classImmediate, "")
646
647
648 if _, err := config.client.CoreV1().PersistentVolumes().Create(context.TODO(), pv, metav1.CreateOptions{}); err != nil {
649 t.Fatalf("Failed to create PersistentVolume %q: %v", pv.Name, err)
650 }
651
652
653 if _, err := config.client.CoreV1().PersistentVolumeClaims(config.ns).Create(context.TODO(), pvc, metav1.CreateOptions{}); err != nil {
654 t.Fatalf("Failed to create PersistentVolumeClaim %q: %v", pvc.Name, err)
655 }
656
657
658 if err := waitForPVCBound(config.client, pvc); err != nil {
659 t.Fatalf("PVC %q failed to bind: %v", pvc.Name, err)
660 }
661
662 nodeMarkers := []interface{}{
663 markNodeAffinity,
664 markNodeSelector,
665 }
666 for i := 0; i < len(nodeMarkers); i++ {
667 podName := "local-pod-" + strconv.Itoa(i+1)
668 pod := makePod(podName, config.ns, []string{"local-pvc"})
669 nodeMarkers[i].(func(*v1.Pod, string))(pod, "node-2")
670
671 if _, err := config.client.CoreV1().Pods(config.ns).Create(context.TODO(), pod, metav1.CreateOptions{}); err != nil {
672 t.Fatalf("Failed to create Pod %q: %v", pod.Name, err)
673 }
674
675 if err := waitForPodUnschedulable(config.client, pod); err != nil {
676 t.Errorf("Failed as Pod %s was not unschedulable: %v", pod.Name, err)
677 }
678
679 p, err := config.client.CoreV1().Pods(config.ns).Get(context.TODO(), podName, metav1.GetOptions{})
680 if err != nil {
681 t.Fatalf("Failed to access Pod %s status: %v", podName, err)
682 }
683 if strings.Compare(string(p.Status.Phase), "Pending") != 0 {
684 t.Fatalf("Failed as Pod %s was in: %s state and not in expected: Pending state", podName, p.Status.Phase)
685 }
686 if strings.Compare(p.Status.Conditions[0].Reason, "Unschedulable") != 0 {
687 t.Fatalf("Failed as Pod %s reason was: %s but expected: Unschedulable", podName, p.Status.Conditions[0].Reason)
688 }
689 if !strings.Contains(p.Status.Conditions[0].Message, "node(s) didn't match Pod's node affinity") {
690 t.Fatalf("Failed as Pod's %s failure message does not contain expected message: node(s) didn't match Pod's node affinity. Got message %q", podName, p.Status.Conditions[0].Message)
691 }
692
693 if err := config.client.CoreV1().Pods(config.ns).Delete(context.TODO(), podName, metav1.DeleteOptions{}); err != nil {
694 t.Fatalf("Failed to delete Pod %s: %v", podName, err)
695 }
696 }
697 }
698
699 func TestVolumeProvision(t *testing.T) {
700 config := setupCluster(t, "volume-scheduling", 1, 0, 0)
701 defer config.teardown()
702
703 type testcaseType struct {
704 pod *v1.Pod
705 pvs []*testPV
706 boundPvcs []*testPVC
707 provisionedPvcs []*testPVC
708
709 unboundPvcs []*testPVC
710 shouldFail bool
711 }
712
713 cases := map[string]testcaseType{
714 "wait provisioned": {
715 pod: makePod("pod-pvc-canprovision", config.ns, []string{"pvc-canprovision"}),
716 provisionedPvcs: []*testPVC{{"pvc-canprovision", classWait, ""}},
717 },
718 "topolgy unsatisfied": {
719 pod: makePod("pod-pvc-topomismatch", config.ns, []string{"pvc-topomismatch"}),
720 unboundPvcs: []*testPVC{{"pvc-topomismatch", classTopoMismatch, ""}},
721 shouldFail: true,
722 },
723 "wait one bound, one provisioned": {
724 pod: makePod("pod-pvc-canbind-or-provision", config.ns, []string{"pvc-w-canbind", "pvc-canprovision"}),
725 pvs: []*testPV{{"pv-w-canbind", classWait, "", node1}},
726 boundPvcs: []*testPVC{{"pvc-w-canbind", classWait, ""}},
727 provisionedPvcs: []*testPVC{{"pvc-canprovision", classWait, ""}},
728 },
729 "one immediate pv prebound, one wait provisioned": {
730 pod: makePod("pod-i-pv-prebound-w-provisioned", config.ns, []string{"pvc-i-pv-prebound", "pvc-canprovision"}),
731 pvs: []*testPV{{"pv-i-prebound", classImmediate, "pvc-i-pv-prebound", node1}},
732 boundPvcs: []*testPVC{{"pvc-i-pv-prebound", classImmediate, ""}},
733 provisionedPvcs: []*testPVC{{"pvc-canprovision", classWait, ""}},
734 },
735 "wait one pv prebound, one provisioned": {
736 pod: makePod("pod-w-pv-prebound-w-provisioned", config.ns, []string{"pvc-w-pv-prebound", "pvc-canprovision"}),
737 pvs: []*testPV{{"pv-w-prebound", classWait, "pvc-w-pv-prebound", node1}},
738 boundPvcs: []*testPVC{{"pvc-w-pv-prebound", classWait, ""}},
739 provisionedPvcs: []*testPVC{{"pvc-canprovision", classWait, ""}},
740 },
741 "immediate provisioned by controller": {
742 pod: makePod("pod-i-unbound", config.ns, []string{"pvc-controller-provisioned"}),
743
744
745
746 boundPvcs: []*testPVC{{"pvc-controller-provisioned", classImmediate, ""}},
747 },
748 }
749
750 run := func(t *testing.T, test testcaseType) {
751 t.Log("Creating StorageClass")
752 suffix := rand.String(4)
753 classes := map[string]*storagev1.StorageClass{}
754 classes[classImmediate] = makeDynamicProvisionerStorageClass(fmt.Sprintf("immediate-%v", suffix), &modeImmediate, nil)
755 classes[classWait] = makeDynamicProvisionerStorageClass(fmt.Sprintf("wait-%v", suffix), &modeWait, nil)
756 topo := []v1.TopologySelectorTerm{
757 {
758 MatchLabelExpressions: []v1.TopologySelectorLabelRequirement{
759 {
760 Key: nodeAffinityLabelKey,
761 Values: []string{node2},
762 },
763 },
764 },
765 }
766 classes[classTopoMismatch] = makeDynamicProvisionerStorageClass(fmt.Sprintf("topomismatch-%v", suffix), &modeWait, topo)
767 for _, sc := range classes {
768 if _, err := config.client.StorageV1().StorageClasses().Create(context.TODO(), sc, metav1.CreateOptions{}); err != nil {
769 t.Fatalf("Failed to create StorageClass %q: %v", sc.Name, err)
770 }
771 }
772
773 t.Log("Creating PVs")
774 for _, pvConfig := range test.pvs {
775 pv := makePV(pvConfig.name, classes[pvConfig.scName].Name, pvConfig.preboundPVC, config.ns, pvConfig.node)
776 if _, err := config.client.CoreV1().PersistentVolumes().Create(context.TODO(), pv, metav1.CreateOptions{}); err != nil {
777 t.Fatalf("Failed to create PersistentVolume %q: %v", pv.Name, err)
778 }
779 if err := waitForPVPhase(config.client, pvConfig.name, v1.VolumeAvailable); err != nil {
780 t.Fatalf("PersistentVolume %q failed to become available: %v", pvConfig.name, err)
781 }
782 }
783
784 t.Log("Creating PVCs")
785 for _, pvcConfig := range test.boundPvcs {
786 pvc := makePVC(pvcConfig.name, config.ns, &classes[pvcConfig.scName].Name, pvcConfig.preboundPV)
787 if _, err := config.client.CoreV1().PersistentVolumeClaims(config.ns).Create(context.TODO(), pvc, metav1.CreateOptions{}); err != nil {
788 t.Fatalf("Failed to create PersistentVolumeClaim %q: %v", pvc.Name, err)
789 }
790 }
791
792 t.Log("Creating unbound PVCs")
793 for _, pvcConfig := range test.unboundPvcs {
794 pvc := makePVC(pvcConfig.name, config.ns, &classes[pvcConfig.scName].Name, pvcConfig.preboundPV)
795 if _, err := config.client.CoreV1().PersistentVolumeClaims(config.ns).Create(context.TODO(), pvc, metav1.CreateOptions{}); err != nil {
796 t.Fatalf("Failed to create PersistentVolumeClaim %q: %v", pvc.Name, err)
797 }
798 }
799
800 t.Log("Creating unbound PVCs which should be dynamically provisioned")
801 for _, pvcConfig := range test.provisionedPvcs {
802 pvc := makePVC(pvcConfig.name, config.ns, &classes[pvcConfig.scName].Name, pvcConfig.preboundPV)
803 if _, err := config.client.CoreV1().PersistentVolumeClaims(config.ns).Create(context.TODO(), pvc, metav1.CreateOptions{}); err != nil {
804 t.Fatalf("Failed to create PersistentVolumeClaim %q: %v", pvc.Name, err)
805 }
806 }
807
808 t.Log("Creating the pod to schedule")
809 if _, err := config.client.CoreV1().Pods(config.ns).Create(context.TODO(), test.pod, metav1.CreateOptions{}); err != nil {
810 t.Fatalf("Failed to create Pod %q: %v", test.pod.Name, err)
811 }
812 if test.shouldFail {
813 if err := waitForPodUnschedulable(config.client, test.pod); err != nil {
814 t.Errorf("Pod %q was not unschedulable: %v", test.pod.Name, err)
815 }
816 } else {
817 if err := waitForPodToSchedule(config.client, test.pod); err != nil {
818 t.Errorf("Failed to schedule Pod %q: %v", test.pod.Name, err)
819 }
820 }
821
822 t.Log("Validating PVC/PV binding")
823 for _, pvc := range test.boundPvcs {
824 validatePVCPhase(t, config.client, pvc.name, config.ns, v1.ClaimBound, false)
825 }
826 for _, pvc := range test.unboundPvcs {
827 validatePVCPhase(t, config.client, pvc.name, config.ns, v1.ClaimPending, false)
828 }
829 for _, pvc := range test.provisionedPvcs {
830 validatePVCPhase(t, config.client, pvc.name, config.ns, v1.ClaimBound, true)
831 }
832 for _, pv := range test.pvs {
833 validatePVPhase(t, config.client, pv.name, v1.VolumeBound)
834 }
835
836
837 t.Log("Deleting test objects")
838 deleteTestObjects(config.client, config.ns, deleteOption)
839 }
840
841 for name, test := range cases {
842 t.Run(name, func(t *testing.T) { run(t, test) })
843 }
844 }
845
846
847 func TestCapacity(t *testing.T) {
848 config := setupCluster(t, "volume-scheduling", 1, 0, 0)
849 defer config.teardown()
850
851 type testcaseType struct {
852 pod *v1.Pod
853 pvcs []*testPVC
854 haveCapacity bool
855 capacitySupported bool
856 }
857
858 cases := map[string]testcaseType{
859 "baseline": {
860 pod: makePod("pod-pvc-canprovision", config.ns, []string{"pvc-canprovision"}),
861 pvcs: []*testPVC{{"pvc-canprovision", classWait, ""}},
862 },
863 "out of space": {
864 pod: makePod("pod-pvc-canprovision", config.ns, []string{"pvc-canprovision"}),
865 pvcs: []*testPVC{{"pvc-canprovision", classWait, ""}},
866 capacitySupported: true,
867 },
868 "with space": {
869 pod: makePod("pod-pvc-canprovision", config.ns, []string{"pvc-canprovision"}),
870 pvcs: []*testPVC{{"pvc-canprovision", classWait, ""}},
871 capacitySupported: true,
872 haveCapacity: true,
873 },
874 "ignored": {
875 pod: makePod("pod-pvc-canprovision", config.ns, []string{"pvc-canprovision"}),
876 pvcs: []*testPVC{{"pvc-canprovision", classWait, ""}},
877 haveCapacity: true,
878 },
879 }
880
881 run := func(t *testing.T, test testcaseType) {
882
883 suffix := rand.String(4)
884 classes := map[string]*storagev1.StorageClass{}
885 classes[classImmediate] = makeDynamicProvisionerStorageClass(fmt.Sprintf("immediate-%v", suffix), &modeImmediate, nil)
886 classes[classWait] = makeDynamicProvisionerStorageClass(fmt.Sprintf("wait-%v", suffix), &modeWait, nil)
887 topo := []v1.TopologySelectorTerm{
888 {
889 MatchLabelExpressions: []v1.TopologySelectorLabelRequirement{
890 {
891 Key: nodeAffinityLabelKey,
892 Values: []string{node2},
893 },
894 },
895 },
896 }
897 classes[classTopoMismatch] = makeDynamicProvisionerStorageClass(fmt.Sprintf("topomismatch-%v", suffix), &modeWait, topo)
898 for _, sc := range classes {
899 if _, err := config.client.StorageV1().StorageClasses().Create(context.TODO(), sc, metav1.CreateOptions{}); err != nil {
900 t.Fatalf("Failed to create StorageClass %q: %v", sc.Name, err)
901 }
902 }
903
904
905
906 if test.capacitySupported {
907 if _, err := config.client.StorageV1().CSIDrivers().Create(context.TODO(),
908 &storagev1.CSIDriver{
909 ObjectMeta: metav1.ObjectMeta{
910 Name: provisionerPluginName,
911 },
912 Spec: storagev1.CSIDriverSpec{
913 StorageCapacity: &test.capacitySupported,
914 },
915 },
916 metav1.CreateOptions{}); err != nil {
917 t.Fatalf("Failed to create CSIDriver: %v", err)
918 }
919
920
921
922
923 time.Sleep(5 * time.Second)
924 }
925
926
927 if test.haveCapacity {
928 if _, err := config.client.StorageV1().CSIStorageCapacities("default").Create(context.TODO(),
929 &storagev1.CSIStorageCapacity{
930 ObjectMeta: metav1.ObjectMeta{
931 GenerateName: "foo-",
932 },
933 StorageClassName: classes[classWait].Name,
934 NodeTopology: &metav1.LabelSelector{},
935
936 Capacity: resource.NewQuantity(6*1024*1024*1024, resource.BinarySI),
937 },
938 metav1.CreateOptions{}); err != nil {
939 t.Fatalf("Failed to create CSIStorageCapacity: %v", err)
940 }
941 }
942
943
944 for _, pvcConfig := range test.pvcs {
945 pvc := makePVC(pvcConfig.name, config.ns, &classes[pvcConfig.scName].Name, pvcConfig.preboundPV)
946 if _, err := config.client.CoreV1().PersistentVolumeClaims(config.ns).Create(context.TODO(), pvc, metav1.CreateOptions{}); err != nil {
947 t.Fatalf("Failed to create PersistentVolumeClaim %q: %v", pvc.Name, err)
948 }
949 }
950
951
952 if _, err := config.client.CoreV1().Pods(config.ns).Create(context.TODO(), test.pod, metav1.CreateOptions{}); err != nil {
953 t.Fatalf("Failed to create Pod %q: %v", test.pod.Name, err)
954 }
955
956
957 shouldFail := test.capacitySupported && !test.haveCapacity
958 if shouldFail {
959 if err := waitForPodUnschedulable(config.client, test.pod); err != nil {
960 t.Errorf("Pod %q was not unschedulable: %v", test.pod.Name, err)
961 }
962 } else {
963 if err := waitForPodToSchedule(config.client, test.pod); err != nil {
964 t.Errorf("Failed to schedule Pod %q: %v", test.pod.Name, err)
965 }
966 }
967
968
969 for _, pvc := range test.pvcs {
970 if shouldFail {
971 validatePVCPhase(t, config.client, pvc.name, config.ns, v1.ClaimPending, false)
972 } else {
973 validatePVCPhase(t, config.client, pvc.name, config.ns, v1.ClaimBound, true)
974 }
975 }
976
977
978 deleteTestObjects(config.client, config.ns, deleteOption)
979 }
980
981 for name, test := range cases {
982 t.Run(name, func(t *testing.T) { run(t, test) })
983 }
984 }
985
986
987
988
989 func TestRescheduleProvisioning(t *testing.T) {
990 testCtx := testutil.InitTestAPIServer(t, "reschedule-volume-provision", nil)
991
992 clientset := testCtx.ClientSet
993 ns := testCtx.NS.Name
994
995 defer func() {
996 deleteTestObjects(clientset, ns, metav1.DeleteOptions{})
997 }()
998
999 ctrl, informerFactory, err := initPVController(t, testCtx, 0)
1000 if err != nil {
1001 t.Fatalf("Failed to create PV controller: %v", err)
1002 }
1003
1004
1005 testNode := makeNode(1)
1006 if _, err := clientset.CoreV1().Nodes().Create(context.TODO(), testNode, metav1.CreateOptions{}); err != nil {
1007 t.Fatalf("Failed to create Node %q: %v", testNode.Name, err)
1008 }
1009 scName := "fail-provision"
1010 sc := makeDynamicProvisionerStorageClass(scName, &modeWait, nil)
1011
1012 sc.Parameters[volumetest.ExpectProvisionFailureKey] = ""
1013 if _, err := clientset.StorageV1().StorageClasses().Create(context.TODO(), sc, metav1.CreateOptions{}); err != nil {
1014 t.Fatalf("Failed to create StorageClass %q: %v", sc.Name, err)
1015 }
1016
1017
1018 pvcName := "pvc-fail-to-provision"
1019 pvc := makePVC(pvcName, ns, &scName, "")
1020 pvc.Annotations = map[string]string{"volume.kubernetes.io/selected-node": node1}
1021 pvc, err = clientset.CoreV1().PersistentVolumeClaims(ns).Create(context.TODO(), pvc, metav1.CreateOptions{})
1022 if err != nil {
1023 t.Fatalf("Failed to create PersistentVolumeClaim %q: %v", pvc.Name, err)
1024 }
1025
1026 selectedNodeAnn, exist := pvc.Annotations["volume.kubernetes.io/selected-node"]
1027 if !exist || selectedNodeAnn != node1 {
1028 t.Fatalf("Created pvc is not annotated as expected")
1029 }
1030
1031
1032 go ctrl.Run(testCtx.Ctx)
1033 informerFactory.Start(testCtx.Ctx.Done())
1034 informerFactory.WaitForCacheSync(testCtx.Ctx.Done())
1035
1036
1037 if err := waitForProvisionAnn(clientset, pvc, false); err != nil {
1038 t.Errorf("Expect to reschedule provision for PVC %v/%v, but still found selected-node annotation on it", ns, pvcName)
1039 }
1040 }
1041
1042 func setupCluster(t *testing.T, nsName string, numberOfNodes int, resyncPeriod time.Duration, provisionDelaySeconds int) *testConfig {
1043 testCtx := testutil.InitTestSchedulerWithOptions(t, testutil.InitTestAPIServer(t, nsName, nil), resyncPeriod)
1044 testutil.SyncSchedulerInformerFactory(testCtx)
1045 go testCtx.Scheduler.Run(testCtx.Ctx)
1046
1047 clientset := testCtx.ClientSet
1048 ns := testCtx.NS.Name
1049
1050 ctrl, informerFactory, err := initPVController(t, testCtx, provisionDelaySeconds)
1051 if err != nil {
1052 t.Fatalf("Failed to create PV controller: %v", err)
1053 }
1054 go ctrl.Run(testCtx.Ctx)
1055
1056 informerFactory.Start(testCtx.Ctx.Done())
1057 informerFactory.WaitForCacheSync(testCtx.Ctx.Done())
1058
1059
1060
1061 for i := 0; i < numberOfNodes; i++ {
1062 testNode := makeNode(i + 1)
1063 if _, err := clientset.CoreV1().Nodes().Create(context.TODO(), testNode, metav1.CreateOptions{}); err != nil {
1064 t.Fatalf("Failed to create Node %q: %v", testNode.Name, err)
1065 }
1066 }
1067
1068
1069 for _, sc := range sharedClasses {
1070 if _, err := clientset.StorageV1().StorageClasses().Create(context.TODO(), sc, metav1.CreateOptions{}); err != nil {
1071 t.Fatalf("Failed to create StorageClass %q: %v", sc.Name, err)
1072 }
1073 }
1074
1075 return &testConfig{
1076 client: clientset,
1077 ns: ns,
1078 stop: testCtx.Ctx.Done(),
1079 teardown: func() {
1080 klog.Infof("test cluster %q start to tear down", ns)
1081 deleteTestObjects(clientset, ns, metav1.DeleteOptions{})
1082 },
1083 }
1084 }
1085
1086 func initPVController(t *testing.T, testCtx *testutil.TestContext, provisionDelaySeconds int) (*persistentvolume.PersistentVolumeController, informers.SharedInformerFactory, error) {
1087 clientset := testCtx.ClientSet
1088
1089 informerFactory := informers.NewSharedInformerFactory(clientset, 0)
1090
1091
1092 host := volumetest.NewFakeVolumeHost(t, "/tmp/fake", nil, nil)
1093 plugin := &volumetest.FakeVolumePlugin{
1094 PluginName: provisionerPluginName,
1095 Host: host,
1096 Config: volume.VolumeConfig{},
1097 LastProvisionerOptions: volume.VolumeOptions{},
1098 ProvisionDelaySeconds: provisionDelaySeconds,
1099 NewAttacherCallCount: 0,
1100 NewDetacherCallCount: 0,
1101 Mounters: nil,
1102 Unmounters: nil,
1103 Attachers: nil,
1104 Detachers: nil,
1105 }
1106 plugins := []volume.VolumePlugin{plugin}
1107
1108 params := persistentvolume.ControllerParameters{
1109 KubeClient: clientset,
1110
1111
1112 SyncPeriod: 5 * time.Second,
1113 VolumePlugins: plugins,
1114 Cloud: nil,
1115 ClusterName: "volume-test-cluster",
1116 VolumeInformer: informerFactory.Core().V1().PersistentVolumes(),
1117 ClaimInformer: informerFactory.Core().V1().PersistentVolumeClaims(),
1118 ClassInformer: informerFactory.Storage().V1().StorageClasses(),
1119 PodInformer: informerFactory.Core().V1().Pods(),
1120 NodeInformer: informerFactory.Core().V1().Nodes(),
1121 EnableDynamicProvisioning: true,
1122 }
1123 ctrl, err := persistentvolume.NewController(testCtx.Ctx, params)
1124 if err != nil {
1125 return nil, nil, err
1126 }
1127
1128 return ctrl, informerFactory, nil
1129 }
1130
1131 func deleteTestObjects(client clientset.Interface, ns string, option metav1.DeleteOptions) {
1132 client.CoreV1().Pods(ns).DeleteCollection(context.TODO(), option, metav1.ListOptions{})
1133 client.CoreV1().PersistentVolumeClaims(ns).DeleteCollection(context.TODO(), option, metav1.ListOptions{})
1134 client.CoreV1().PersistentVolumes().DeleteCollection(context.TODO(), option, metav1.ListOptions{})
1135 client.StorageV1().StorageClasses().DeleteCollection(context.TODO(), option, metav1.ListOptions{})
1136 client.StorageV1().CSIDrivers().DeleteCollection(context.TODO(), option, metav1.ListOptions{})
1137 client.StorageV1().CSIStorageCapacities("default").DeleteCollection(context.TODO(), option, metav1.ListOptions{})
1138 }
1139
1140 func makeStorageClass(name string, mode *storagev1.VolumeBindingMode) *storagev1.StorageClass {
1141 return &storagev1.StorageClass{
1142 ObjectMeta: metav1.ObjectMeta{
1143 Name: name,
1144 },
1145 Provisioner: "kubernetes.io/no-provisioner",
1146 VolumeBindingMode: mode,
1147 }
1148 }
1149
1150 func makeDynamicProvisionerStorageClass(name string, mode *storagev1.VolumeBindingMode, allowedTopologies []v1.TopologySelectorTerm) *storagev1.StorageClass {
1151 return &storagev1.StorageClass{
1152 ObjectMeta: metav1.ObjectMeta{
1153 Name: name,
1154 },
1155 Provisioner: provisionerPluginName,
1156 VolumeBindingMode: mode,
1157 AllowedTopologies: allowedTopologies,
1158 Parameters: map[string]string{},
1159 }
1160 }
1161
1162 func makePV(name, scName, pvcName, ns, node string) *v1.PersistentVolume {
1163 pv := &v1.PersistentVolume{
1164 ObjectMeta: metav1.ObjectMeta{
1165 Name: name,
1166 Annotations: map[string]string{},
1167 },
1168 Spec: v1.PersistentVolumeSpec{
1169 Capacity: v1.ResourceList{
1170 v1.ResourceName(v1.ResourceStorage): resource.MustParse("5Gi"),
1171 },
1172 AccessModes: []v1.PersistentVolumeAccessMode{
1173 v1.ReadWriteOnce,
1174 },
1175 StorageClassName: scName,
1176 PersistentVolumeSource: v1.PersistentVolumeSource{
1177 Local: &v1.LocalVolumeSource{
1178 Path: "/test-path",
1179 },
1180 },
1181 },
1182 }
1183
1184 if pvcName != "" {
1185 pv.Spec.ClaimRef = &v1.ObjectReference{Name: pvcName, Namespace: ns}
1186 }
1187
1188 if node != "" {
1189 pv.Spec.NodeAffinity = &v1.VolumeNodeAffinity{
1190 Required: &v1.NodeSelector{
1191 NodeSelectorTerms: []v1.NodeSelectorTerm{
1192 {
1193 MatchExpressions: []v1.NodeSelectorRequirement{
1194 {
1195 Key: nodeAffinityLabelKey,
1196 Operator: v1.NodeSelectorOpIn,
1197 Values: []string{node},
1198 },
1199 },
1200 },
1201 },
1202 },
1203 }
1204 }
1205
1206 return pv
1207 }
1208
1209 func makePVC(name, ns string, scName *string, volumeName string) *v1.PersistentVolumeClaim {
1210 return &v1.PersistentVolumeClaim{
1211 ObjectMeta: metav1.ObjectMeta{
1212 Name: name,
1213 Namespace: ns,
1214 },
1215 Spec: v1.PersistentVolumeClaimSpec{
1216 AccessModes: []v1.PersistentVolumeAccessMode{
1217 v1.ReadWriteOnce,
1218 },
1219 Resources: v1.VolumeResourceRequirements{
1220 Requests: v1.ResourceList{
1221 v1.ResourceName(v1.ResourceStorage): resource.MustParse("5Gi"),
1222 },
1223 },
1224 StorageClassName: scName,
1225 VolumeName: volumeName,
1226 },
1227 }
1228 }
1229
1230 func makePod(name, ns string, pvcs []string) *v1.Pod {
1231 volumes := []v1.Volume{}
1232 for i, pvc := range pvcs {
1233 volumes = append(volumes, v1.Volume{
1234 Name: fmt.Sprintf("vol%v", i),
1235 VolumeSource: v1.VolumeSource{
1236 PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
1237 ClaimName: pvc,
1238 },
1239 },
1240 })
1241 }
1242
1243 return &v1.Pod{
1244 ObjectMeta: metav1.ObjectMeta{
1245 Name: name,
1246 Namespace: ns,
1247 Labels: map[string]string{
1248 "app": "volume-binding-test",
1249 },
1250 },
1251 Spec: v1.PodSpec{
1252 Containers: []v1.Container{
1253 {
1254 Name: "write-pod",
1255 Image: imageutils.GetE2EImage(imageutils.BusyBox),
1256 Command: []string{"/bin/sh"},
1257 Args: []string{"-c", "while true; do sleep 1; done"},
1258 },
1259 },
1260 Volumes: volumes,
1261 },
1262 }
1263 }
1264
1265
1266 func makeNode(index int) *v1.Node {
1267 name := fmt.Sprintf("node-%d", index)
1268 return &v1.Node{
1269 ObjectMeta: metav1.ObjectMeta{
1270 Name: name,
1271 Labels: map[string]string{nodeAffinityLabelKey: name},
1272 },
1273 Spec: v1.NodeSpec{Unschedulable: false},
1274 Status: v1.NodeStatus{
1275 Capacity: v1.ResourceList{
1276 v1.ResourcePods: *resource.NewQuantity(podLimit, resource.DecimalSI),
1277 },
1278 Conditions: []v1.NodeCondition{
1279 {
1280 Type: v1.NodeReady,
1281 Status: v1.ConditionTrue,
1282 Reason: fmt.Sprintf("schedulable condition"),
1283 LastHeartbeatTime: metav1.Time{Time: time.Now()},
1284 },
1285 },
1286 },
1287 }
1288 }
1289
1290 func validatePVCPhase(t *testing.T, client clientset.Interface, pvcName string, ns string, phase v1.PersistentVolumeClaimPhase, isProvisioned bool) {
1291 claim, err := client.CoreV1().PersistentVolumeClaims(ns).Get(context.TODO(), pvcName, metav1.GetOptions{})
1292 if err != nil {
1293 t.Errorf("Failed to get PVC %v/%v: %v", ns, pvcName, err)
1294 }
1295
1296 if claim.Status.Phase != phase {
1297 t.Errorf("PVC %v/%v phase not %v, got %v", ns, pvcName, phase, claim.Status.Phase)
1298 }
1299
1300
1301 if phase == v1.ClaimBound {
1302 if err := validateProvisionAnn(claim, isProvisioned); err != nil {
1303 t.Errorf("Provisoning annotation on PVC %v/%v not as expected: %v", ns, pvcName, err)
1304 }
1305 }
1306 }
1307
1308 func validateProvisionAnn(claim *v1.PersistentVolumeClaim, volIsProvisioned bool) error {
1309 selectedNode, provisionAnnoExist := claim.Annotations["volume.kubernetes.io/selected-node"]
1310 if volIsProvisioned {
1311 if !provisionAnnoExist {
1312 return fmt.Errorf("PVC %v/%v expected to be provisioned, but no selected-node annotation found", claim.Namespace, claim.Name)
1313 }
1314 if selectedNode != node1 {
1315 return fmt.Errorf("PVC %v/%v expected to be annotated as %v, but got %v", claim.Namespace, claim.Name, node1, selectedNode)
1316 }
1317 }
1318 if !volIsProvisioned && provisionAnnoExist {
1319 return fmt.Errorf("PVC %v/%v not expected to be provisioned, but found selected-node annotation", claim.Namespace, claim.Name)
1320 }
1321
1322 return nil
1323 }
1324
1325 func waitForProvisionAnn(client clientset.Interface, pvc *v1.PersistentVolumeClaim, annShouldExist bool) error {
1326 return wait.Poll(time.Second, 30*time.Second, func() (bool, error) {
1327 claim, err := client.CoreV1().PersistentVolumeClaims(pvc.Namespace).Get(context.TODO(), pvc.Name, metav1.GetOptions{})
1328 if err != nil {
1329 return false, err
1330 }
1331 if err := validateProvisionAnn(claim, annShouldExist); err == nil {
1332 return true, nil
1333 }
1334 return false, nil
1335 })
1336 }
1337
1338 func validatePVPhase(t *testing.T, client clientset.Interface, pvName string, phase v1.PersistentVolumePhase) {
1339 pv, err := client.CoreV1().PersistentVolumes().Get(context.TODO(), pvName, metav1.GetOptions{})
1340 if err != nil {
1341 t.Errorf("Failed to get PV %v: %v", pvName, err)
1342 }
1343
1344 if pv.Status.Phase != phase {
1345 t.Errorf("PV %v phase not %v, got %v", pvName, phase, pv.Status.Phase)
1346 }
1347 }
1348
1349 func waitForPVPhase(client clientset.Interface, pvName string, phase v1.PersistentVolumePhase) error {
1350 return wait.PollImmediate(time.Second, 30*time.Second, func() (bool, error) {
1351 pv, err := client.CoreV1().PersistentVolumes().Get(context.TODO(), pvName, metav1.GetOptions{})
1352 if err != nil {
1353 return false, err
1354 }
1355
1356 if pv.Status.Phase == phase {
1357 return true, nil
1358 }
1359 return false, nil
1360 })
1361 }
1362
1363 func waitForPVCBound(client clientset.Interface, pvc *v1.PersistentVolumeClaim) error {
1364 return wait.Poll(time.Second, 30*time.Second, func() (bool, error) {
1365 claim, err := client.CoreV1().PersistentVolumeClaims(pvc.Namespace).Get(context.TODO(), pvc.Name, metav1.GetOptions{})
1366 if err != nil {
1367 return false, err
1368 }
1369 if claim.Status.Phase == v1.ClaimBound {
1370 return true, nil
1371 }
1372 return false, nil
1373 })
1374 }
1375
1376 func markNodeAffinity(pod *v1.Pod, node string) {
1377 affinity := &v1.Affinity{
1378 NodeAffinity: &v1.NodeAffinity{
1379 RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
1380 NodeSelectorTerms: []v1.NodeSelectorTerm{
1381 {
1382 MatchExpressions: []v1.NodeSelectorRequirement{
1383 {
1384 Key: nodeAffinityLabelKey,
1385 Operator: v1.NodeSelectorOpIn,
1386 Values: []string{node},
1387 },
1388 },
1389 },
1390 },
1391 },
1392 },
1393 }
1394 pod.Spec.Affinity = affinity
1395 }
1396
1397 func markNodeSelector(pod *v1.Pod, node string) {
1398 ns := map[string]string{
1399 nodeAffinityLabelKey: node,
1400 }
1401 pod.Spec.NodeSelector = ns
1402 }
1403
View as plain text