1
16
17 package operationexecutor
18
19 import (
20 "fmt"
21 "k8s.io/klog/v2"
22 "strconv"
23 "testing"
24 "time"
25
26 v1 "k8s.io/api/core/v1"
27 "k8s.io/apimachinery/pkg/api/resource"
28 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
29 "k8s.io/apimachinery/pkg/types"
30 "k8s.io/apimachinery/pkg/util/uuid"
31 csitrans "k8s.io/csi-translation-lib"
32 "k8s.io/klog/v2/ktesting"
33 "k8s.io/kubernetes/pkg/volume"
34 "k8s.io/kubernetes/pkg/volume/util/hostutil"
35 volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
36 )
37
38 const (
39 numVolumesToMount = 2
40 numAttachableVolumesToUnmount = 2
41 numNonAttachableVolumesToUnmount = 2
42 numDevicesToUnmount = 2
43 numVolumesToAttach = 2
44 numVolumesToDetach = 2
45 numVolumesToVerifyAttached = 2
46 numVolumesToVerifyControllerAttached = 2
47 numVolumesToMap = 2
48 numAttachableVolumesToUnmap = 2
49 numNonAttachableVolumesToUnmap = 2
50 numDevicesToUnmap = 2
51 )
52
53 var _ OperationGenerator = &fakeOperationGenerator{}
54
55 func TestOperationExecutor_MountVolume_ConcurrentMountForNonAttachableAndNonDevicemountablePlugins(t *testing.T) {
56
57 ch, quit, oe := setup()
58 volumesToMount := make([]VolumeToMount, numVolumesToMount)
59 secretName := "secret-volume"
60 volumeName := v1.UniqueVolumeName(secretName)
61
62
63 for i := range volumesToMount {
64 podName := "pod-" + strconv.Itoa(i+1)
65 pod := getTestPodWithSecret(podName, secretName)
66 volumesToMount[i] = VolumeToMount{
67 Pod: pod,
68 VolumeName: volumeName,
69 PluginIsAttachable: false,
70 PluginIsDeviceMountable: false,
71 ReportedInUse: true,
72 }
73 oe.MountVolume(0 , volumesToMount[i], nil , false )
74 }
75
76
77 if !isOperationRunConcurrently(ch, quit, numVolumesToMount) {
78 t.Fatalf("Unable to start mount operations in Concurrent for non-attachable volumes")
79 }
80 }
81
82 func TestOperationExecutor_MountVolume_ConcurrentMountForAttachablePlugins(t *testing.T) {
83 t.Parallel()
84
85
86 ch, quit, oe := setup()
87 volumesToMount := make([]VolumeToMount, numVolumesToAttach)
88 pdName := "pd-volume"
89 volumeName := v1.UniqueVolumeName(pdName)
90
91 for i := range volumesToMount {
92 podName := "pod-" + strconv.Itoa(i+1)
93 pod := getTestPodWithGCEPD(podName, pdName)
94 volumesToMount[i] = VolumeToMount{
95 Pod: pod,
96 VolumeName: volumeName,
97 PluginIsAttachable: true,
98 ReportedInUse: true,
99 }
100 oe.MountVolume(0 , volumesToMount[i], nil , false )
101 }
102
103
104 if !isOperationRunSerially(ch, quit) {
105 t.Fatalf("Mount operations should not start concurrently for attachable volumes")
106 }
107 }
108
109 func TestOperationExecutor_MountVolume_ConcurrentMountForDeviceMountablePlugins(t *testing.T) {
110 t.Parallel()
111
112
113 ch, quit, oe := setup()
114 volumesToMount := make([]VolumeToMount, numVolumesToAttach)
115 pdName := "pd-volume"
116 volumeName := v1.UniqueVolumeName(pdName)
117
118 for i := range volumesToMount {
119 podName := "pod-" + strconv.Itoa(i+1)
120 pod := getTestPodWithGCEPD(podName, pdName)
121 volumesToMount[i] = VolumeToMount{
122 Pod: pod,
123 VolumeName: volumeName,
124 PluginIsDeviceMountable: true,
125 ReportedInUse: true,
126 }
127 oe.MountVolume(0 , volumesToMount[i], nil , false )
128 }
129
130
131 if !isOperationRunSerially(ch, quit) {
132 t.Fatalf("Mount operations should not start concurrently for devicemountable volumes")
133 }
134 }
135
136 func TestOperationExecutor_UnmountVolume_ConcurrentUnmountForAllPlugins(t *testing.T) {
137
138 ch, quit, oe := setup()
139 volumesToUnmount := make([]MountedVolume, numAttachableVolumesToUnmount+numNonAttachableVolumesToUnmount)
140 pdName := "pd-volume"
141 secretName := "secret-volume"
142
143
144 for i := 0; i < numNonAttachableVolumesToUnmount+numAttachableVolumesToUnmount; i++ {
145 podName := "pod-" + strconv.Itoa(i+1)
146 if i < numNonAttachableVolumesToUnmount {
147 pod := getTestPodWithSecret(podName, secretName)
148 volumesToUnmount[i] = MountedVolume{
149 PodName: volumetypes.UniquePodName(podName),
150 VolumeName: v1.UniqueVolumeName(secretName),
151 PodUID: pod.UID,
152 }
153 } else {
154 pod := getTestPodWithGCEPD(podName, pdName)
155 volumesToUnmount[i] = MountedVolume{
156 PodName: volumetypes.UniquePodName(podName),
157 VolumeName: v1.UniqueVolumeName(pdName),
158 PodUID: pod.UID,
159 }
160 }
161 oe.UnmountVolume(volumesToUnmount[i], nil , "" )
162 }
163
164
165 if !isOperationRunConcurrently(ch, quit, numNonAttachableVolumesToUnmount+numAttachableVolumesToUnmount) {
166 t.Fatalf("Unable to start unmount operations concurrently for volume plugins")
167 }
168 }
169
170 func TestOperationExecutor_UnmountDeviceConcurrently(t *testing.T) {
171 t.Parallel()
172
173
174 ch, quit, oe := setup()
175 attachedVolumes := make([]AttachedVolume, numDevicesToUnmount)
176 pdName := "pd-volume"
177
178
179 for i := range attachedVolumes {
180 attachedVolumes[i] = AttachedVolume{
181 VolumeName: v1.UniqueVolumeName(pdName),
182 NodeName: "node-name",
183 }
184 oe.UnmountDevice(attachedVolumes[i], nil , nil )
185 }
186
187
188 if !isOperationRunSerially(ch, quit) {
189 t.Fatalf("Unmount device operations should not start concurrently")
190 }
191 }
192
193 func TestOperationExecutor_AttachSingleNodeVolumeConcurrentlyToSameNode(t *testing.T) {
194 t.Parallel()
195
196
197 ch, quit, oe := setup()
198 volumesToAttach := make([]VolumeToAttach, numVolumesToAttach)
199 pdName := "pd-volume"
200
201
202 for i := range volumesToAttach {
203 volumesToAttach[i] = VolumeToAttach{
204 VolumeName: v1.UniqueVolumeName(pdName),
205 NodeName: "node",
206 VolumeSpec: &volume.Spec{
207 PersistentVolume: &v1.PersistentVolume{
208 Spec: v1.PersistentVolumeSpec{
209 AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce},
210 },
211 },
212 },
213 }
214 logger, _ := ktesting.NewTestContext(t)
215 oe.AttachVolume(logger, volumesToAttach[i], nil )
216 }
217
218
219 if !isOperationRunSerially(ch, quit) {
220 t.Fatalf("Attach volume operations should not start concurrently")
221 }
222 }
223
224 func TestOperationExecutor_AttachMultiNodeVolumeConcurrentlyToSameNode(t *testing.T) {
225 t.Parallel()
226
227
228 ch, quit, oe := setup()
229 volumesToAttach := make([]VolumeToAttach, numVolumesToAttach)
230 pdName := "pd-volume"
231
232
233 for i := range volumesToAttach {
234 volumesToAttach[i] = VolumeToAttach{
235 VolumeName: v1.UniqueVolumeName(pdName),
236 NodeName: "node",
237 VolumeSpec: &volume.Spec{
238 PersistentVolume: &v1.PersistentVolume{
239 Spec: v1.PersistentVolumeSpec{
240 AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadOnlyMany},
241 },
242 },
243 },
244 }
245 logger, _ := ktesting.NewTestContext(t)
246 oe.AttachVolume(logger, volumesToAttach[i], nil )
247 }
248
249
250 if !isOperationRunSerially(ch, quit) {
251 t.Fatalf("Attach volume operations should not start concurrently")
252 }
253 }
254
255 func TestOperationExecutor_AttachSingleNodeVolumeConcurrentlyToDifferentNodes(t *testing.T) {
256 t.Parallel()
257
258
259 ch, quit, oe := setup()
260 volumesToAttach := make([]VolumeToAttach, numVolumesToAttach)
261 pdName := "pd-volume"
262
263
264 for i := range volumesToAttach {
265 volumesToAttach[i] = VolumeToAttach{
266 VolumeName: v1.UniqueVolumeName(pdName),
267 NodeName: types.NodeName(fmt.Sprintf("node%d", i)),
268 VolumeSpec: &volume.Spec{
269 PersistentVolume: &v1.PersistentVolume{
270 Spec: v1.PersistentVolumeSpec{
271 AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce},
272 },
273 },
274 },
275 }
276 logger, _ := ktesting.NewTestContext(t)
277 oe.AttachVolume(logger, volumesToAttach[i], nil )
278 }
279
280
281 if !isOperationRunSerially(ch, quit) {
282 t.Fatalf("Attach volume operations should not start concurrently")
283 }
284 }
285
286 func TestOperationExecutor_AttachMultiNodeVolumeConcurrentlyToDifferentNodes(t *testing.T) {
287
288 ch, quit, oe := setup()
289 volumesToAttach := make([]VolumeToAttach, numVolumesToAttach)
290 pdName := "pd-volume"
291
292
293 for i := range volumesToAttach {
294 volumesToAttach[i] = VolumeToAttach{
295 VolumeName: v1.UniqueVolumeName(pdName),
296 NodeName: types.NodeName(fmt.Sprintf("node%d", i)),
297 VolumeSpec: &volume.Spec{
298 PersistentVolume: &v1.PersistentVolume{
299 Spec: v1.PersistentVolumeSpec{
300 AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadOnlyMany},
301 },
302 },
303 },
304 }
305 logger, _ := ktesting.NewTestContext(t)
306 oe.AttachVolume(logger, volumesToAttach[i], nil )
307 }
308
309
310 if !isOperationRunConcurrently(ch, quit, numVolumesToAttach) {
311 t.Fatalf("Attach volume operations should not execute serially")
312 }
313 }
314
315 func TestOperationExecutor_DetachSingleNodeVolumeConcurrentlyFromSameNode(t *testing.T) {
316 t.Parallel()
317
318
319 ch, quit, oe := setup()
320 attachedVolumes := make([]AttachedVolume, numVolumesToDetach)
321 pdName := "pd-volume"
322
323
324 for i := range attachedVolumes {
325 attachedVolumes[i] = AttachedVolume{
326 VolumeName: v1.UniqueVolumeName(pdName),
327 NodeName: "node",
328 VolumeSpec: &volume.Spec{
329 PersistentVolume: &v1.PersistentVolume{
330 Spec: v1.PersistentVolumeSpec{
331 AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce},
332 },
333 },
334 },
335 }
336 logger, _ := ktesting.NewTestContext(t)
337 oe.DetachVolume(logger, attachedVolumes[i], true , nil )
338 }
339
340
341 if !isOperationRunSerially(ch, quit) {
342 t.Fatalf("DetachVolume operations should not run concurrently")
343 }
344 }
345
346 func TestOperationExecutor_DetachMultiNodeVolumeConcurrentlyFromSameNode(t *testing.T) {
347 t.Parallel()
348
349
350 ch, quit, oe := setup()
351 attachedVolumes := make([]AttachedVolume, numVolumesToDetach)
352 pdName := "pd-volume"
353
354
355 for i := range attachedVolumes {
356 attachedVolumes[i] = AttachedVolume{
357 VolumeName: v1.UniqueVolumeName(pdName),
358 NodeName: "node",
359 VolumeSpec: &volume.Spec{
360 PersistentVolume: &v1.PersistentVolume{
361 Spec: v1.PersistentVolumeSpec{
362 AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadOnlyMany},
363 },
364 },
365 },
366 }
367 logger, _ := ktesting.NewTestContext(t)
368 oe.DetachVolume(logger, attachedVolumes[i], true , nil )
369 }
370
371
372 if !isOperationRunSerially(ch, quit) {
373 t.Fatalf("DetachVolume operations should not run concurrently")
374 }
375 }
376
377 func TestOperationExecutor_DetachMultiNodeVolumeConcurrentlyFromDifferentNodes(t *testing.T) {
378
379 ch, quit, oe := setup()
380 attachedVolumes := make([]AttachedVolume, numVolumesToDetach)
381 pdName := "pd-volume"
382
383
384 for i := range attachedVolumes {
385 attachedVolumes[i] = AttachedVolume{
386 VolumeName: v1.UniqueVolumeName(pdName),
387 NodeName: types.NodeName(fmt.Sprintf("node%d", i)),
388 VolumeSpec: &volume.Spec{
389 PersistentVolume: &v1.PersistentVolume{
390 Spec: v1.PersistentVolumeSpec{
391 AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadOnlyMany},
392 },
393 },
394 },
395 }
396 logger, _ := ktesting.NewTestContext(t)
397 oe.DetachVolume(logger, attachedVolumes[i], true , nil )
398 }
399
400
401 if !isOperationRunConcurrently(ch, quit, numVolumesToDetach) {
402 t.Fatalf("Attach volume operations should not execute serially")
403 }
404 }
405
406 func TestOperationExecutor_VerifyVolumesAreAttachedConcurrentlyOnSameNode(t *testing.T) {
407
408 ch, quit, oe := setup()
409
410
411 for i := 0; i < numVolumesToVerifyAttached; i++ {
412 oe.VerifyVolumesAreAttachedPerNode(nil , "node-name", nil )
413 }
414
415
416 if !isOperationRunConcurrently(ch, quit, numVolumesToVerifyAttached) {
417 t.Fatalf("VerifyVolumesAreAttached operation is not being run concurrently")
418 }
419 }
420
421 func TestOperationExecutor_VerifyVolumesAreAttachedConcurrentlyOnDifferentNodes(t *testing.T) {
422
423 ch, quit, oe := setup()
424
425
426 for i := 0; i < numVolumesToVerifyAttached; i++ {
427 oe.VerifyVolumesAreAttachedPerNode(
428 nil,
429 types.NodeName(fmt.Sprintf("node-name-%d", i)),
430 nil )
431 }
432
433
434 if !isOperationRunConcurrently(ch, quit, numVolumesToVerifyAttached) {
435 t.Fatalf("VerifyVolumesAreAttached operation is not being run concurrently")
436 }
437 }
438
439 func TestOperationExecutor_VerifyControllerAttachedVolumeConcurrently(t *testing.T) {
440 t.Parallel()
441
442
443 ch, quit, oe := setup()
444 volumesToMount := make([]VolumeToMount, numVolumesToVerifyControllerAttached)
445 pdName := "pd-volume"
446
447
448 for i := range volumesToMount {
449 volumesToMount[i] = VolumeToMount{
450 VolumeName: v1.UniqueVolumeName(pdName),
451 }
452 logger, _ := ktesting.NewTestContext(t)
453 oe.VerifyControllerAttachedVolume(logger, volumesToMount[i], types.NodeName("node-name"), nil )
454 }
455
456
457 if !isOperationRunSerially(ch, quit) {
458 t.Fatalf("VerifyControllerAttachedVolume should not run concurrently")
459 }
460 }
461
462 func TestOperationExecutor_MountVolume_ConcurrentMountForNonAttachablePlugins_VolumeMode_Block(t *testing.T) {
463
464 ch, quit, oe := setup()
465 volumesToMount := make([]VolumeToMount, numVolumesToMap)
466 secretName := "secret-volume"
467 volumeName := v1.UniqueVolumeName(secretName)
468 volumeMode := v1.PersistentVolumeBlock
469 tmpSpec := &volume.Spec{PersistentVolume: &v1.PersistentVolume{Spec: v1.PersistentVolumeSpec{VolumeMode: &volumeMode}}}
470
471
472 for i := range volumesToMount {
473 podName := "pod-" + strconv.Itoa(i+1)
474 pod := getTestPodWithSecret(podName, secretName)
475 volumesToMount[i] = VolumeToMount{
476 Pod: pod,
477 VolumeName: volumeName,
478 PluginIsAttachable: false,
479 ReportedInUse: true,
480 VolumeSpec: tmpSpec,
481 }
482 oe.MountVolume(0 , volumesToMount[i], nil , false)
483 }
484
485
486 if !isOperationRunConcurrently(ch, quit, numVolumesToMap) {
487 t.Fatalf("Unable to start map operations in Concurrent for non-attachable volumes")
488 }
489 }
490
491 func TestOperationExecutor_MountVolume_ConcurrentMountForAttachablePlugins_VolumeMode_Block(t *testing.T) {
492 t.Parallel()
493
494
495 ch, quit, oe := setup()
496 volumesToMount := make([]VolumeToMount, numVolumesToAttach)
497 pdName := "pd-volume"
498 volumeName := v1.UniqueVolumeName(pdName)
499 volumeMode := v1.PersistentVolumeBlock
500 tmpSpec := &volume.Spec{PersistentVolume: &v1.PersistentVolume{Spec: v1.PersistentVolumeSpec{VolumeMode: &volumeMode}}}
501
502
503 for i := range volumesToMount {
504 podName := "pod-" + strconv.Itoa(i+1)
505 pod := getTestPodWithGCEPD(podName, pdName)
506 volumesToMount[i] = VolumeToMount{
507 Pod: pod,
508 VolumeName: volumeName,
509 PluginIsAttachable: true,
510 ReportedInUse: true,
511 VolumeSpec: tmpSpec,
512 }
513 oe.MountVolume(0 , volumesToMount[i], nil , false)
514 }
515
516
517 if !isOperationRunSerially(ch, quit) {
518 t.Fatalf("Map operations should not start concurrently for attachable volumes")
519 }
520 }
521
522 func TestOperationExecutor_UnmountVolume_ConcurrentUnmountForAllPlugins_VolumeMode_Block(t *testing.T) {
523
524 ch, quit, oe := setup()
525 volumesToUnmount := make([]MountedVolume, numAttachableVolumesToUnmap+numNonAttachableVolumesToUnmap)
526 pdName := "pd-volume"
527 secretName := "secret-volume"
528 volumeMode := v1.PersistentVolumeBlock
529 tmpSpec := &volume.Spec{PersistentVolume: &v1.PersistentVolume{Spec: v1.PersistentVolumeSpec{VolumeMode: &volumeMode}}}
530
531
532 for i := 0; i < numNonAttachableVolumesToUnmap+numAttachableVolumesToUnmap; i++ {
533 podName := "pod-" + strconv.Itoa(i+1)
534 if i < numNonAttachableVolumesToUnmap {
535 pod := getTestPodWithSecret(podName, secretName)
536 volumesToUnmount[i] = MountedVolume{
537 PodName: volumetypes.UniquePodName(podName),
538 VolumeName: v1.UniqueVolumeName(secretName),
539 PodUID: pod.UID,
540 VolumeSpec: tmpSpec,
541 }
542 } else {
543 pod := getTestPodWithGCEPD(podName, pdName)
544 volumesToUnmount[i] = MountedVolume{
545 PodName: volumetypes.UniquePodName(podName),
546 VolumeName: v1.UniqueVolumeName(pdName),
547 PodUID: pod.UID,
548 VolumeSpec: tmpSpec,
549 }
550 }
551 oe.UnmountVolume(volumesToUnmount[i], nil , "" )
552 }
553
554
555 if !isOperationRunConcurrently(ch, quit, numNonAttachableVolumesToUnmap+numAttachableVolumesToUnmap) {
556 t.Fatalf("Unable to start unmap operations concurrently for volume plugins")
557 }
558 }
559
560 func TestOperationExecutor_UnmountDeviceConcurrently_VolumeMode_Block(t *testing.T) {
561 t.Parallel()
562
563
564 ch, quit, oe := setup()
565 attachedVolumes := make([]AttachedVolume, numDevicesToUnmap)
566 pdName := "pd-volume"
567 volumeMode := v1.PersistentVolumeBlock
568 tmpSpec := &volume.Spec{PersistentVolume: &v1.PersistentVolume{Spec: v1.PersistentVolumeSpec{VolumeMode: &volumeMode}}}
569
570
571 for i := range attachedVolumes {
572 attachedVolumes[i] = AttachedVolume{
573 VolumeName: v1.UniqueVolumeName(pdName),
574 NodeName: "node-name",
575 VolumeSpec: tmpSpec,
576 }
577 oe.UnmountDevice(attachedVolumes[i], nil , nil )
578 }
579
580
581 if !isOperationRunSerially(ch, quit) {
582 t.Fatalf("Unmap device operations should not start concurrently")
583 }
584 }
585
586 type fakeOperationGenerator struct {
587 ch chan interface{}
588 quit chan interface{}
589 }
590
591 func newFakeOperationGenerator(ch chan interface{}, quit chan interface{}) OperationGenerator {
592 return &fakeOperationGenerator{
593 ch: ch,
594 quit: quit,
595 }
596 }
597
598 func (fopg *fakeOperationGenerator) GenerateMountVolumeFunc(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorldMounterUpdater ActualStateOfWorldMounterUpdater, isRemount bool) volumetypes.GeneratedOperations {
599 opFunc := func() volumetypes.OperationContext {
600 startOperationAndBlock(fopg.ch, fopg.quit)
601 return volumetypes.NewOperationContext(nil, nil, false)
602 }
603 return volumetypes.GeneratedOperations{
604 OperationFunc: opFunc,
605 }
606 }
607 func (fopg *fakeOperationGenerator) GenerateUnmountVolumeFunc(volumeToUnmount MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, podsDir string) (volumetypes.GeneratedOperations, error) {
608 opFunc := func() volumetypes.OperationContext {
609 startOperationAndBlock(fopg.ch, fopg.quit)
610 return volumetypes.NewOperationContext(nil, nil, false)
611 }
612 return volumetypes.GeneratedOperations{
613 OperationFunc: opFunc,
614 }, nil
615 }
616 func (fopg *fakeOperationGenerator) GenerateAttachVolumeFunc(logger klog.Logger, volumeToAttach VolumeToAttach, actualStateOfWorld ActualStateOfWorldAttacherUpdater) volumetypes.GeneratedOperations {
617 opFunc := func() volumetypes.OperationContext {
618 startOperationAndBlock(fopg.ch, fopg.quit)
619 return volumetypes.NewOperationContext(nil, nil, false)
620 }
621 return volumetypes.GeneratedOperations{
622 OperationFunc: opFunc,
623 }
624 }
625 func (fopg *fakeOperationGenerator) GenerateDetachVolumeFunc(logger klog.Logger, volumeToDetach AttachedVolume, verifySafeToDetach bool, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) {
626 opFunc := func() volumetypes.OperationContext {
627 startOperationAndBlock(fopg.ch, fopg.quit)
628 return volumetypes.NewOperationContext(nil, nil, false)
629 }
630 return volumetypes.GeneratedOperations{
631 OperationFunc: opFunc,
632 }, nil
633 }
634 func (fopg *fakeOperationGenerator) GenerateVolumesAreAttachedFunc(attachedVolumes []AttachedVolume, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) {
635 opFunc := func() volumetypes.OperationContext {
636 startOperationAndBlock(fopg.ch, fopg.quit)
637 return volumetypes.NewOperationContext(nil, nil, false)
638 }
639 return volumetypes.GeneratedOperations{
640 OperationFunc: opFunc,
641 }, nil
642 }
643 func (fopg *fakeOperationGenerator) GenerateUnmountDeviceFunc(deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, hostutil hostutil.HostUtils) (volumetypes.GeneratedOperations, error) {
644 opFunc := func() volumetypes.OperationContext {
645 startOperationAndBlock(fopg.ch, fopg.quit)
646 return volumetypes.NewOperationContext(nil, nil, false)
647 }
648 return volumetypes.GeneratedOperations{
649 OperationFunc: opFunc,
650 }, nil
651 }
652 func (fopg *fakeOperationGenerator) GenerateVerifyControllerAttachedVolumeFunc(logger klog.Logger, volumeToMount VolumeToMount, nodeName types.NodeName, actualStateOfWorld ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) {
653 opFunc := func() volumetypes.OperationContext {
654 startOperationAndBlock(fopg.ch, fopg.quit)
655 return volumetypes.NewOperationContext(nil, nil, false)
656 }
657 return volumetypes.GeneratedOperations{
658 OperationFunc: opFunc,
659 }, nil
660 }
661
662 func (fopg *fakeOperationGenerator) GenerateExpandVolumeFunc(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume) (volumetypes.GeneratedOperations, error) {
663 opFunc := func() volumetypes.OperationContext {
664 startOperationAndBlock(fopg.ch, fopg.quit)
665 return volumetypes.NewOperationContext(nil, nil, false)
666 }
667 return volumetypes.GeneratedOperations{
668 OperationFunc: opFunc,
669 }, nil
670 }
671
672 func (fopg *fakeOperationGenerator) GenerateExpandAndRecoverVolumeFunc(pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume, resizerName string) (volumetypes.GeneratedOperations, error) {
673 opFunc := func() volumetypes.OperationContext {
674 startOperationAndBlock(fopg.ch, fopg.quit)
675 return volumetypes.NewOperationContext(nil, nil, false)
676 }
677 return volumetypes.GeneratedOperations{
678 OperationFunc: opFunc,
679 }, nil
680 }
681
682 func (fopg *fakeOperationGenerator) GenerateExpandInUseVolumeFunc(volumeToMount VolumeToMount, actualStateOfWorld ActualStateOfWorldMounterUpdater, currentSize resource.Quantity) (volumetypes.GeneratedOperations, error) {
683 opFunc := func() volumetypes.OperationContext {
684 startOperationAndBlock(fopg.ch, fopg.quit)
685 return volumetypes.NewOperationContext(nil, nil, false)
686 }
687 return volumetypes.GeneratedOperations{
688 OperationFunc: opFunc,
689 }, nil
690 }
691
692 func (fopg *fakeOperationGenerator) GenerateBulkVolumeVerifyFunc(
693 pluginNodeVolumes map[types.NodeName][]*volume.Spec,
694 pluginNane string,
695 volumeSpecMap map[*volume.Spec]v1.UniqueVolumeName,
696 actualStateOfWorldAttacherUpdater ActualStateOfWorldAttacherUpdater) (volumetypes.GeneratedOperations, error) {
697 opFunc := func() volumetypes.OperationContext {
698 startOperationAndBlock(fopg.ch, fopg.quit)
699 return volumetypes.NewOperationContext(nil, nil, false)
700 }
701 return volumetypes.GeneratedOperations{
702 OperationFunc: opFunc,
703 }, nil
704 }
705
706 func (fopg *fakeOperationGenerator) GenerateMapVolumeFunc(waitForAttachTimeout time.Duration, volumeToMount VolumeToMount, actualStateOfWorldMounterUpdater ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) {
707 opFunc := func() volumetypes.OperationContext {
708 startOperationAndBlock(fopg.ch, fopg.quit)
709 return volumetypes.NewOperationContext(nil, nil, false)
710 }
711 return volumetypes.GeneratedOperations{
712 OperationFunc: opFunc,
713 }, nil
714 }
715
716 func (fopg *fakeOperationGenerator) GenerateUnmapVolumeFunc(volumeToUnmount MountedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater) (volumetypes.GeneratedOperations, error) {
717 opFunc := func() volumetypes.OperationContext {
718 startOperationAndBlock(fopg.ch, fopg.quit)
719 return volumetypes.NewOperationContext(nil, nil, false)
720 }
721 return volumetypes.GeneratedOperations{
722 OperationFunc: opFunc,
723 }, nil
724 }
725
726 func (fopg *fakeOperationGenerator) GenerateUnmapDeviceFunc(deviceToDetach AttachedVolume, actualStateOfWorld ActualStateOfWorldMounterUpdater, hostutil hostutil.HostUtils) (volumetypes.GeneratedOperations, error) {
727 opFunc := func() volumetypes.OperationContext {
728 startOperationAndBlock(fopg.ch, fopg.quit)
729 return volumetypes.NewOperationContext(nil, nil, false)
730 }
731 return volumetypes.GeneratedOperations{
732 OperationFunc: opFunc,
733 }, nil
734 }
735
736 func (fopg *fakeOperationGenerator) GetVolumePluginMgr() *volume.VolumePluginMgr {
737 return nil
738 }
739
740 func (fopg *fakeOperationGenerator) GetCSITranslator() InTreeToCSITranslator {
741 return csitrans.New()
742 }
743
744 func getTestPodWithSecret(podName, secretName string) *v1.Pod {
745 return &v1.Pod{
746 ObjectMeta: metav1.ObjectMeta{
747 Name: podName,
748 UID: types.UID(podName),
749 },
750 Spec: v1.PodSpec{
751 Volumes: []v1.Volume{
752 {
753 Name: secretName,
754 VolumeSource: v1.VolumeSource{
755 Secret: &v1.SecretVolumeSource{
756 SecretName: secretName,
757 },
758 },
759 },
760 },
761 Containers: []v1.Container{
762 {
763 Name: "secret-volume-test",
764 Image: "registry.k8s.io/mounttest:0.8",
765 Args: []string{
766 "--file_content=/etc/secret-volume/data-1",
767 "--file_mode=/etc/secret-volume/data-1"},
768 VolumeMounts: []v1.VolumeMount{
769 {
770 Name: secretName,
771 MountPath: "/data",
772 },
773 },
774 },
775 },
776 RestartPolicy: v1.RestartPolicyNever,
777 },
778 }
779 }
780
781 func getTestPodWithGCEPD(podName, pdName string) *v1.Pod {
782 return &v1.Pod{
783 ObjectMeta: metav1.ObjectMeta{
784 Name: podName,
785 UID: types.UID(podName + string(uuid.NewUUID())),
786 },
787 Spec: v1.PodSpec{
788 Volumes: []v1.Volume{
789 {
790 Name: pdName,
791 VolumeSource: v1.VolumeSource{
792 GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{
793 PDName: pdName,
794 FSType: "ext4",
795 ReadOnly: false,
796 },
797 },
798 },
799 },
800 Containers: []v1.Container{
801 {
802 Name: "pd-volume-test",
803 Image: "registry.k8s.io/mounttest:0.8",
804 Args: []string{
805 "--file_content=/etc/pd-volume/data-1",
806 },
807 VolumeMounts: []v1.VolumeMount{
808 {
809 Name: pdName,
810 MountPath: "/data",
811 },
812 },
813 },
814 },
815 RestartPolicy: v1.RestartPolicyNever,
816 },
817 }
818 }
819
820 func isOperationRunSerially(ch <-chan interface{}, quit chan<- interface{}) bool {
821 defer close(quit)
822 numOperationsStarted := 0
823 loop:
824 for {
825 select {
826 case <-ch:
827 numOperationsStarted++
828 if numOperationsStarted > 1 {
829 return false
830 }
831 case <-time.After(5 * time.Second):
832 break loop
833 }
834 }
835 return true
836 }
837
838 func isOperationRunConcurrently(ch <-chan interface{}, quit chan<- interface{}, numOperationsToRun int) bool {
839 defer close(quit)
840 numOperationsStarted := 0
841 loop:
842 for {
843 select {
844 case <-ch:
845 numOperationsStarted++
846 if numOperationsStarted == numOperationsToRun {
847 return true
848 }
849 case <-time.After(5 * time.Second):
850 break loop
851 }
852 }
853 return false
854 }
855
856 func setup() (chan interface{}, chan interface{}, OperationExecutor) {
857 ch, quit := make(chan interface{}), make(chan interface{})
858 return ch, quit, NewOperationExecutor(newFakeOperationGenerator(ch, quit))
859 }
860
861
862
863 func startOperationAndBlock(ch chan<- interface{}, quit <-chan interface{}) {
864 ch <- nil
865 <-quit
866 }
867
View as plain text