1
16
17 package nestedpendingoperations
18
19 import (
20 "fmt"
21 "testing"
22 "time"
23
24 v1 "k8s.io/api/core/v1"
25 "k8s.io/apimachinery/pkg/types"
26 "k8s.io/apimachinery/pkg/util/wait"
27 "k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff"
28 volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
29 )
30
31 const (
32
33
34
35 testTimeout time.Duration = 1 * time.Minute
36
37
38
39
40 initialOperationWaitTimeShort time.Duration = 20 * time.Millisecond
41
42
43
44
45 initialOperationWaitTimeLong time.Duration = 500 * time.Millisecond
46 )
47
48 func Test_NestedPendingOperations_Positive_SingleOp(t *testing.T) {
49
50 grm := NewNestedPendingOperations(false )
51 volumeName := v1.UniqueVolumeName("volume-name")
52
53
54 err := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: noopFunc})
55
56
57 if err != nil {
58 t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err)
59 }
60 }
61
62 func Test_NestedPendingOperations_Positive_TwoOps(t *testing.T) {
63
64 grm := NewNestedPendingOperations(false )
65 volume1Name := v1.UniqueVolumeName("volume1-name")
66 volume2Name := v1.UniqueVolumeName("volume2-name")
67
68
69 err1 := grm.Run(volume1Name, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: noopFunc})
70 err2 := grm.Run(volume2Name, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: noopFunc})
71
72
73 if err1 != nil {
74 t.Fatalf("NestedPendingOperations %q failed. Expected: <no error> Actual: <%v>", volume1Name, err1)
75 }
76
77 if err2 != nil {
78 t.Fatalf("NestedPendingOperations %q failed. Expected: <no error> Actual: <%v>", volume2Name, err2)
79 }
80 }
81
82 func Test_NestedPendingOperations_Positive_TwoSubOps(t *testing.T) {
83
84 grm := NewNestedPendingOperations(false )
85 volumeName := v1.UniqueVolumeName("volume-name")
86 operation1PodName := volumetypes.UniquePodName("operation1-podname")
87 operation2PodName := volumetypes.UniquePodName("operation2-podname")
88
89
90 err1 := grm.Run(volumeName, operation1PodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: noopFunc})
91 err2 := grm.Run(volumeName, operation2PodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: noopFunc})
92
93
94 if err1 != nil {
95 t.Fatalf("NestedPendingOperations %q failed. Expected: <no error> Actual: <%v>", operation1PodName, err1)
96 }
97
98 if err2 != nil {
99 t.Fatalf("NestedPendingOperations %q failed. Expected: <no error> Actual: <%v>", operation2PodName, err2)
100 }
101 }
102
103 func Test_NestedPendingOperations_Positive_SingleOpWithExpBackoff(t *testing.T) {
104
105 grm := NewNestedPendingOperations(true )
106 volumeName := v1.UniqueVolumeName("volume-name")
107
108
109 err := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: noopFunc})
110
111
112 if err != nil {
113 t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err)
114 }
115 }
116
117 func Test_NestedPendingOperations_Positive_SecondOpAfterFirstCompletes(t *testing.T) {
118
119 grm := NewNestedPendingOperations(false )
120 volumeName := v1.UniqueVolumeName("volume-name")
121 operation1DoneCh := make(chan interface{})
122 operation1 := generateCallbackFunc(operation1DoneCh)
123 err1 := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation1})
124 if err1 != nil {
125 t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err1)
126 }
127 operation2 := noopFunc
128 <-operation1DoneCh
129
130
131 err2 := retryWithExponentialBackOff(
132 time.Duration(initialOperationWaitTimeShort),
133 func() (bool, error) {
134 err := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation2})
135 if err != nil {
136 t.Logf("Warning: NestedPendingOperations failed with %v. Will retry.", err)
137 return false, nil
138 }
139 return true, nil
140 },
141 )
142
143
144 if err2 != nil {
145 t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err2)
146 }
147 }
148
149 func Test_NestedPendingOperations_Positive_SecondOpAfterFirstCompletesWithExpBackoff(t *testing.T) {
150
151 grm := NewNestedPendingOperations(true )
152 volumeName := v1.UniqueVolumeName("volume-name")
153 operation1DoneCh := make(chan interface{})
154 operation1 := generateCallbackFunc(operation1DoneCh)
155 err1 := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation1})
156 if err1 != nil {
157 t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err1)
158 }
159 operation2 := noopFunc
160 <-operation1DoneCh
161
162
163 err2 := retryWithExponentialBackOff(
164 time.Duration(initialOperationWaitTimeShort),
165 func() (bool, error) {
166 err := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation2})
167 if err != nil {
168 t.Logf("Warning: NestedPendingOperations failed with %v. Will retry.", err)
169 return false, nil
170 }
171 return true, nil
172 },
173 )
174
175
176 if err2 != nil {
177 t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err2)
178 }
179 }
180
181 func Test_NestedPendingOperations_Positive_SecondOpAfterFirstPanics(t *testing.T) {
182
183 grm := NewNestedPendingOperations(false )
184 volumeName := v1.UniqueVolumeName("volume-name")
185 operation1 := panicFunc
186 err1 := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation1})
187 if err1 != nil {
188 t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err1)
189 }
190 operation2 := noopFunc
191
192
193 err2 := retryWithExponentialBackOff(
194 time.Duration(initialOperationWaitTimeShort),
195 func() (bool, error) {
196 err := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation2})
197 if err != nil {
198 t.Logf("Warning: NestedPendingOperations failed with %v. Will retry.", err)
199 return false, nil
200 }
201 return true, nil
202 },
203 )
204
205
206 if err2 != nil {
207 t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err2)
208 }
209 }
210
211 func Test_NestedPendingOperations_Positive_SecondOpAfterFirstPanicsWithExpBackoff(t *testing.T) {
212
213 grm := NewNestedPendingOperations(true )
214 volumeName := v1.UniqueVolumeName("volume-name")
215 operation1 := panicFunc
216 err1 := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation1})
217 if err1 != nil {
218 t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err1)
219 }
220 operation2 := noopFunc
221
222
223 err2 := retryWithExponentialBackOff(
224 time.Duration(initialOperationWaitTimeLong),
225 func() (bool, error) {
226 err := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation2})
227 if err != nil {
228 t.Logf("Warning: NestedPendingOperations failed with %v. Will retry.", err)
229 return false, nil
230 }
231 return true, nil
232 },
233 )
234
235
236 if err2 != nil {
237 t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err2)
238 }
239 }
240
241 func Test_NestedPendingOperations_Negative_SecondOpBeforeFirstCompletes(t *testing.T) {
242
243 grm := NewNestedPendingOperations(false )
244 volumeName := v1.UniqueVolumeName("volume-name")
245 operation1DoneCh := make(chan interface{})
246 operation1 := generateWaitFunc(operation1DoneCh)
247 err1 := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation1})
248 if err1 != nil {
249 t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err1)
250 }
251 operation2 := noopFunc
252
253
254 err2 := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation2})
255
256
257 if err2 == nil {
258 t.Fatalf("NestedPendingOperations did not fail. Expected: <Failed to create operation with name \"%s\". An operation with that name already exists.> Actual: <no error>", volumeName)
259 }
260 if !IsAlreadyExists(err2) {
261 t.Fatalf("NestedPendingOperations did not return alreadyExistsError, got: %v", err2)
262 }
263 }
264
265 func Test_NestedPendingOperations_Negative_SecondThirdOpWithDifferentNames(t *testing.T) {
266
267 grm := NewNestedPendingOperations(true )
268 volumeName := v1.UniqueVolumeName("volume-name")
269 op1Name := "mount_volume"
270 operation1 := errorFunc
271 err1 := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation1, OperationName: op1Name})
272 if err1 != nil {
273 t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err1)
274 }
275
276
277 operation2 := errorFunc
278 err2 := retryWithExponentialBackOff(
279 initialOperationWaitTimeShort,
280 func() (bool, error) {
281 err := grm.Run(volumeName,
282 EmptyUniquePodName,
283 EmptyNodeName,
284 volumetypes.GeneratedOperations{OperationFunc: operation2, OperationName: op1Name})
285
286 if exponentialbackoff.IsExponentialBackoff(err) {
287 return true, nil
288 }
289 return false, nil
290 },
291 )
292
293
294 if err2 != nil {
295 t.Fatalf("Expected NestedPendingOperations to fail with exponential backoff for operationKey : %s and operationName : %s", volumeName, op1Name)
296 }
297
298 operation3 := noopFunc
299 op3Name := "unmount_volume"
300
301 err3 := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation3, OperationName: op3Name})
302 if err3 != nil {
303 t.Fatalf("NestedPendingOperations failed. Expected <no error> Actual: <%v>", err3)
304 }
305 }
306
307 func Test_NestedPendingOperations_Negative_SecondSubOpBeforeFirstCompletes2(t *testing.T) {
308
309 grm := NewNestedPendingOperations(false )
310 volumeName := v1.UniqueVolumeName("volume-name")
311 operationPodName := volumetypes.UniquePodName("operation-podname")
312 operation1DoneCh := make(chan interface{})
313 operation1 := generateWaitFunc(operation1DoneCh)
314 err1 := grm.Run(volumeName, operationPodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation1})
315 if err1 != nil {
316 t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err1)
317 }
318 operation2 := noopFunc
319
320
321 err2 := grm.Run(volumeName, operationPodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation2})
322
323
324 if err2 == nil {
325 t.Fatalf("NestedPendingOperations did not fail. Expected: <Failed to create operation with name \"%s\". An operation with that name already exists.> Actual: <no error>", volumeName)
326 }
327 if !IsAlreadyExists(err2) {
328 t.Fatalf("NestedPendingOperations did not return alreadyExistsError, got: %v", err2)
329 }
330 }
331
332 func Test_NestedPendingOperations_Negative_SecondSubOpBeforeFirstCompletes(t *testing.T) {
333
334 grm := NewNestedPendingOperations(false )
335 volumeName := v1.UniqueVolumeName("volume-name")
336 operationPodName := volumetypes.UniquePodName("operation-podname")
337 operation1DoneCh := make(chan interface{})
338 operation1 := generateWaitFunc(operation1DoneCh)
339 err1 := grm.Run(volumeName, operationPodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation1})
340 if err1 != nil {
341 t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err1)
342 }
343 operation2 := noopFunc
344
345
346 err2 := grm.Run(volumeName, operationPodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation2})
347
348
349 if err2 == nil {
350 t.Fatalf("NestedPendingOperations did not fail. Expected: <Failed to create operation with name \"%s\". An operation with that name already exists.> Actual: <no error>", volumeName)
351 }
352 if !IsAlreadyExists(err2) {
353 t.Fatalf("NestedPendingOperations did not return alreadyExistsError, got: %v", err2)
354 }
355 }
356
357 func Test_NestedPendingOperations_Negative_SecondOpBeforeFirstCompletesWithExpBackoff(t *testing.T) {
358
359 grm := NewNestedPendingOperations(true )
360 volumeName := v1.UniqueVolumeName("volume-name")
361 operation1DoneCh := make(chan interface{})
362 operation1 := generateWaitFunc(operation1DoneCh)
363 err1 := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation1})
364 if err1 != nil {
365 t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err1)
366 }
367 operation2 := noopFunc
368
369
370 err2 := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation2})
371
372
373 if err2 == nil {
374 t.Fatalf("NestedPendingOperations did not fail. Expected: <Failed to create operation with name \"%s\". An operation with that name already exists.> Actual: <no error>", volumeName)
375 }
376 if !IsAlreadyExists(err2) {
377 t.Fatalf("NestedPendingOperations did not return alreadyExistsError, got: %v", err2)
378 }
379 }
380
381 func Test_NestedPendingOperations_Positive_ThirdOpAfterFirstCompletes(t *testing.T) {
382
383 grm := NewNestedPendingOperations(false )
384 volumeName := v1.UniqueVolumeName("volume-name")
385 operation1DoneCh := make(chan interface{})
386 operation1 := generateWaitFunc(operation1DoneCh)
387 err1 := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation1})
388 if err1 != nil {
389 t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err1)
390 }
391 operation2 := noopFunc
392 operation3 := noopFunc
393
394
395 err2 := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation2})
396
397
398 if err2 == nil {
399 t.Fatalf("NestedPendingOperations did not fail. Expected: <Failed to create operation with name \"%s\". An operation with that name already exists.> Actual: <no error>", volumeName)
400 }
401 if !IsAlreadyExists(err2) {
402 t.Fatalf("NestedPendingOperations did not return alreadyExistsError, got: %v", err2)
403 }
404
405
406 operation1DoneCh <- true
407 err3 := retryWithExponentialBackOff(
408 time.Duration(initialOperationWaitTimeShort),
409 func() (bool, error) {
410 err := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation3})
411 if err != nil {
412 t.Logf("Warning: NestedPendingOperations failed with %v. Will retry.", err)
413 return false, nil
414 }
415 return true, nil
416 },
417 )
418
419
420 if err3 != nil {
421 t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err3)
422 }
423 }
424
425 func Test_NestedPendingOperations_Positive_ThirdOpAfterFirstCompletesWithExpBackoff(t *testing.T) {
426
427 grm := NewNestedPendingOperations(true )
428 volumeName := v1.UniqueVolumeName("volume-name")
429 operation1DoneCh := make(chan interface{})
430 operation1 := generateWaitFunc(operation1DoneCh)
431 err1 := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation1})
432 if err1 != nil {
433 t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err1)
434 }
435 operation2 := noopFunc
436 operation3 := noopFunc
437
438
439 err2 := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation2})
440
441
442 if err2 == nil {
443 t.Fatalf("NestedPendingOperations did not fail. Expected: <Failed to create operation with name \"%s\". An operation with that name already exists.> Actual: <no error>", volumeName)
444 }
445 if !IsAlreadyExists(err2) {
446 t.Fatalf("NestedPendingOperations did not return alreadyExistsError, got: %v", err2)
447 }
448
449
450 operation1DoneCh <- true
451 err3 := retryWithExponentialBackOff(
452 time.Duration(initialOperationWaitTimeShort),
453 func() (bool, error) {
454 err := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation3})
455 if err != nil {
456 t.Logf("Warning: NestedPendingOperations failed with %v. Will retry.", err)
457 return false, nil
458 }
459 return true, nil
460 },
461 )
462
463
464 if err3 != nil {
465 t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err3)
466 }
467 }
468
469 func Test_NestedPendingOperations_Positive_WaitEmpty(t *testing.T) {
470
471
472 grm := NewNestedPendingOperations(false )
473
474
475 waitDoneCh := make(chan interface{}, 1)
476 go func() {
477 grm.Wait()
478 waitDoneCh <- true
479 }()
480
481
482 err := waitChannelWithTimeout(waitDoneCh, testTimeout)
483 if err != nil {
484 t.Errorf("Error waiting for GoRoutineMap.Wait: %v", err)
485 }
486 }
487
488 func Test_NestedPendingOperations_Positive_WaitEmptyWithExpBackoff(t *testing.T) {
489
490
491 grm := NewNestedPendingOperations(true )
492
493
494 waitDoneCh := make(chan interface{}, 1)
495 go func() {
496 grm.Wait()
497 waitDoneCh <- true
498 }()
499
500
501 err := waitChannelWithTimeout(waitDoneCh, testTimeout)
502 if err != nil {
503 t.Errorf("Error waiting for GoRoutineMap.Wait: %v", err)
504 }
505 }
506
507 func Test_NestedPendingOperations_Positive_Wait(t *testing.T) {
508
509
510 grm := NewNestedPendingOperations(false )
511 volumeName := v1.UniqueVolumeName("volume-name")
512 operation1DoneCh := make(chan interface{})
513 operation1 := generateWaitFunc(operation1DoneCh)
514 err := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation1})
515 if err != nil {
516 t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err)
517 }
518
519
520 waitDoneCh := make(chan interface{}, 1)
521 go func() {
522 grm.Wait()
523 waitDoneCh <- true
524 }()
525
526
527 operation1DoneCh <- true
528
529
530 err = waitChannelWithTimeout(waitDoneCh, testTimeout)
531 if err != nil {
532 t.Fatalf("Error waiting for GoRoutineMap.Wait: %v", err)
533 }
534 }
535
536 func Test_NestedPendingOperations_Positive_WaitWithExpBackoff(t *testing.T) {
537
538
539 grm := NewNestedPendingOperations(true )
540 volumeName := v1.UniqueVolumeName("volume-name")
541 operation1DoneCh := make(chan interface{})
542 operation1 := generateWaitFunc(operation1DoneCh)
543 err := grm.Run(volumeName, EmptyUniquePodName, EmptyNodeName, volumetypes.GeneratedOperations{OperationFunc: operation1})
544 if err != nil {
545 t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err)
546 }
547
548
549 waitDoneCh := make(chan interface{}, 1)
550 go func() {
551 grm.Wait()
552 waitDoneCh <- true
553 }()
554
555
556 operation1DoneCh <- true
557
558
559 err = waitChannelWithTimeout(waitDoneCh, testTimeout)
560 if err != nil {
561 t.Fatalf("Error waiting for GoRoutineMap.Wait: %v", err)
562 }
563 }
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596 func Test_NestedPendingOperations_SecondOpBeforeFirstCompletes(t *testing.T) {
597 const (
598 keyNone = iota
599 keyVolume
600 keyVolumePod
601 keyVolumeNode
602 )
603
604 type testCase struct {
605 testID int
606 keyTypes []int
607 expectPass bool
608 }
609
610 tests := []testCase{
611 {testID: 1, keyTypes: []int{keyNone, keyNone}, expectPass: true},
612 {testID: 2, keyTypes: []int{keyNone, keyVolume}, expectPass: true},
613 {testID: 3, keyTypes: []int{keyNone, keyVolumePod}, expectPass: true},
614 {testID: 4, keyTypes: []int{keyNone, keyVolumeNode}, expectPass: true},
615 {testID: 5, keyTypes: []int{keyVolume, keyNone}, expectPass: true},
616 {testID: 6, keyTypes: []int{keyVolume, keyVolumePod}, expectPass: false},
617 {testID: 7, keyTypes: []int{keyVolume, keyVolumeNode}, expectPass: false},
618 {testID: 8, keyTypes: []int{keyVolumePod, keyNone}, expectPass: true},
619 {testID: 9, keyTypes: []int{keyVolumePod, keyVolume}, expectPass: false},
620 {testID: 10, keyTypes: []int{keyVolumeNode, keyNone}, expectPass: true},
621 {testID: 11, keyTypes: []int{keyVolumeNode, keyVolume}, expectPass: false},
622 {testID: 12, keyTypes: []int{keyVolumeNode, keyVolumeNode}, expectPass: false},
623 }
624
625 for _, test := range tests {
626 var (
627 volumeNames []v1.UniqueVolumeName
628 podNames []volumetypes.UniquePodName
629 nodeNames []types.NodeName
630 )
631 for _, keyType := range test.keyTypes {
632 var (
633 v v1.UniqueVolumeName
634 p volumetypes.UniquePodName
635 n types.NodeName
636 )
637 switch keyType {
638 case keyNone:
639 v = EmptyUniqueVolumeName
640 p = EmptyUniquePodName
641 n = EmptyNodeName
642 case keyVolume:
643 v = v1.UniqueVolumeName("volume-name")
644 p = EmptyUniquePodName
645 n = EmptyNodeName
646 case keyVolumePod:
647 v = v1.UniqueVolumeName("volume-name")
648 p = volumetypes.UniquePodName("operation-podname")
649 n = EmptyNodeName
650 case keyVolumeNode:
651 v = v1.UniqueVolumeName("volume-name")
652 p = EmptyUniquePodName
653 n = types.NodeName("operation-nodename")
654 }
655 volumeNames = append(volumeNames, v)
656 podNames = append(podNames, p)
657 nodeNames = append(nodeNames, n)
658 }
659
660 t.Run(fmt.Sprintf("Test %d", test.testID), func(t *testing.T) {
661 if test.expectPass {
662 testConcurrentOperationsPositive(t,
663 volumeNames[0], podNames[0], nodeNames[0],
664 volumeNames[1], podNames[1], nodeNames[1],
665 )
666 } else {
667 testConcurrentOperationsNegative(t,
668 volumeNames[0], podNames[0], nodeNames[0],
669 volumeNames[1], podNames[1], nodeNames[1],
670 )
671 }
672 })
673
674 }
675
676 }
677
678 func Test_NestedPendingOperations_Positive_Issue_88355(t *testing.T) {
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698 const (
699 mainVolumeName = "main-volume"
700 opZVolumeName = "other-volume"
701 node1 = "node1"
702 node2 = "node2"
703
704
705
706 delay = 50 * time.Millisecond
707
708
709 reconcilerPeriod = 100 * time.Millisecond
710 )
711
712 grm := NewNestedPendingOperations(true )
713 opZContinueCh := make(chan interface{})
714 op1ContinueCh := make(chan interface{})
715 op2ContinueCh := make(chan interface{})
716 operationZ := generateWaitFunc(opZContinueCh)
717 operation1 := generateWaitFunc(op1ContinueCh)
718 operation2 := generateWaitWithErrorFunc(op2ContinueCh)
719 operation3 := noopFunc
720 operation4 := noopFunc
721
722 errZ := grm.Run(opZVolumeName, "" , "" , volumetypes.GeneratedOperations{OperationFunc: operationZ})
723 if errZ != nil {
724 t.Fatalf("NestedPendingOperations failed for operationZ. Expected: <no error> Actual: <%v>", errZ)
725 }
726
727 err1 := grm.Run(mainVolumeName, "" , node1, volumetypes.GeneratedOperations{OperationFunc: operation1})
728 if err1 != nil {
729 t.Fatalf("NestedPendingOperations failed for operation1. Expected: <no error> Actual: <%v>", err1)
730 }
731
732 err2 := grm.Run(mainVolumeName, "" , node2, volumetypes.GeneratedOperations{OperationFunc: operation2})
733 if err2 != nil {
734 t.Fatalf("NestedPendingOperations failed for operation2. Expected: <no error> Actual: <%v>", err2)
735 }
736
737 opZContinueCh <- true
738 time.Sleep(delay)
739 op2ContinueCh <- true
740 time.Sleep(delay)
741 op1ContinueCh <- true
742 time.Sleep(delay)
743
744 for {
745 err3 := grm.Run(mainVolumeName, "" , node2, volumetypes.GeneratedOperations{OperationFunc: operation3})
746 if err3 == nil {
747 break
748 } else if !exponentialbackoff.IsExponentialBackoff(err3) {
749 t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err3)
750 }
751 time.Sleep(reconcilerPeriod)
752 }
753
754 time.Sleep(delay)
755
756 err4 := grm.Run(mainVolumeName, "" , node2, volumetypes.GeneratedOperations{OperationFunc: operation4})
757 if err4 != nil {
758 t.Fatalf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err4)
759 }
760 }
761
762
763
764 func testConcurrentOperationsPositive(
765 t *testing.T,
766 volumeName1 v1.UniqueVolumeName,
767 podName1 volumetypes.UniquePodName,
768 nodeName1 types.NodeName,
769 volumeName2 v1.UniqueVolumeName,
770 podName2 volumetypes.UniquePodName,
771 nodeName2 types.NodeName) {
772
773
774 grm := NewNestedPendingOperations(false )
775 operation1DoneCh := make(chan interface{})
776 operation1 := generateWaitFunc(operation1DoneCh)
777 err1 := grm.Run(volumeName1, podName1, nodeName1 , volumetypes.GeneratedOperations{OperationFunc: operation1})
778 if err1 != nil {
779 t.Errorf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err1)
780 }
781 operation2 := noopFunc
782
783
784 err2 := grm.Run(volumeName2, podName2, nodeName2, volumetypes.GeneratedOperations{OperationFunc: operation2})
785
786
787 if err2 != nil {
788 t.Errorf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err2)
789 }
790 }
791
792
793
794 func testConcurrentOperationsNegative(
795 t *testing.T,
796 volumeName1 v1.UniqueVolumeName,
797 podName1 volumetypes.UniquePodName,
798 nodeName1 types.NodeName,
799 volumeName2 v1.UniqueVolumeName,
800 podName2 volumetypes.UniquePodName,
801 nodeName2 types.NodeName) {
802
803
804 grm := NewNestedPendingOperations(false )
805 operation1DoneCh := make(chan interface{})
806 operation1 := generateWaitFunc(operation1DoneCh)
807 err1 := grm.Run(volumeName1, podName1, nodeName1 , volumetypes.GeneratedOperations{OperationFunc: operation1})
808 if err1 != nil {
809 t.Errorf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err1)
810 }
811 operation2 := noopFunc
812
813
814 err2 := grm.Run(volumeName2, podName2, nodeName2, volumetypes.GeneratedOperations{OperationFunc: operation2})
815
816
817 if err2 == nil {
818 t.Errorf("NestedPendingOperations did not fail. Expected an operation to already exist")
819 }
820 if !IsAlreadyExists(err2) {
821 t.Errorf("NestedPendingOperations did not return alreadyExistsError, got: %v", err2)
822 }
823 }
824
825
826
827 func generateCallbackFunc(done chan<- interface{}) func() volumetypes.OperationContext {
828 return func() volumetypes.OperationContext {
829 done <- true
830 return volumetypes.NewOperationContext(nil, nil, false)
831 }
832 }
833
834 func generateWaitFunc(done <-chan interface{}) func() volumetypes.OperationContext {
835 return func() volumetypes.OperationContext {
836 <-done
837 return volumetypes.NewOperationContext(nil, nil, false)
838 }
839 }
840
841 func panicFunc() volumetypes.OperationContext {
842 panic("testing panic")
843 }
844
845 func errorFunc() volumetypes.OperationContext {
846 return volumetypes.NewOperationContext(fmt.Errorf("placeholder1"), fmt.Errorf("placeholder2"), false)
847 }
848
849 func generateWaitWithErrorFunc(done <-chan interface{}) func() volumetypes.OperationContext {
850 return func() volumetypes.OperationContext {
851 <-done
852 return volumetypes.NewOperationContext(fmt.Errorf("placeholder1"), fmt.Errorf("placeholder2"), false)
853 }
854 }
855
856 func noopFunc() volumetypes.OperationContext {
857 return volumetypes.NewOperationContext(nil, nil, false)
858 }
859
860 func retryWithExponentialBackOff(initialDuration time.Duration, fn wait.ConditionFunc) error {
861 backoff := wait.Backoff{
862 Duration: initialDuration,
863 Factor: 3,
864 Jitter: 0,
865 Steps: 4,
866 }
867 return wait.ExponentialBackoff(backoff, fn)
868 }
869
870 func waitChannelWithTimeout(ch <-chan interface{}, timeout time.Duration) error {
871 timer := time.NewTimer(timeout)
872 defer timer.Stop()
873
874 select {
875 case <-ch:
876
877 return nil
878 case <-timer.C:
879 return fmt.Errorf("timeout after %v", timeout)
880 }
881 }
882
883 func Test_NestedPendingOperations_OperationExists_PendingFirst(t *testing.T) {
884
885 grm := NewNestedPendingOperations(true )
886 volumeName := v1.UniqueVolumeName("test-volume")
887 podName1 := volumetypes.UniquePodName("pod1")
888 podName2 := volumetypes.UniquePodName("pod2")
889 podName3 := volumetypes.UniquePodName("pod3")
890 podName4 := EmptyUniquePodName
891 nodeName := EmptyNodeName
892
893
894
895 delay := 50 * time.Millisecond
896
897
898 operation1DoneCh := make(chan interface{})
899 operation1 := generateWaitWithErrorFunc(operation1DoneCh)
900 err1 := grm.Run(volumeName, podName1, nodeName , volumetypes.GeneratedOperations{OperationFunc: operation1, OperationName: "umount"})
901 if err1 != nil {
902 t.Errorf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err1)
903 }
904
905
906 operation2DoneCh := make(chan interface{})
907 operation2 := generateWaitWithErrorFunc(operation2DoneCh)
908 err2 := grm.Run(volumeName, podName2, nodeName , volumetypes.GeneratedOperations{OperationFunc: operation2, OperationName: "umount"})
909 if err2 != nil {
910 t.Errorf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err2)
911 }
912
913
914 operation3DoneCh := make(chan interface{})
915 operation3 := generateWaitFunc(operation3DoneCh)
916 defer func() {
917 close(operation3DoneCh)
918 }()
919 err3 := grm.Run(volumeName, podName3, nodeName , volumetypes.GeneratedOperations{OperationFunc: operation3, OperationName: "umount"})
920 if err3 != nil {
921 t.Errorf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err3)
922 }
923
924 operation1DoneCh <- true
925 operation2DoneCh <- true
926 time.Sleep(delay)
927
928
929 operation4DoneCh := make(chan interface{})
930 operation4 := generateWaitFunc(operation4DoneCh)
931 defer func() {
932 close(operation4DoneCh)
933 }()
934 err4 := grm.Run(volumeName, podName4, nodeName , volumetypes.GeneratedOperations{OperationFunc: operation4, OperationName: "mount"})
935
936
937 if err4 == nil {
938 t.Errorf("NestedPendingOperations did not fail. Expected an operation to already exist")
939 }
940 if !IsAlreadyExists(err4) {
941 t.Errorf("NestedPendingOperations did not return alreadyExistsError, got: %v", err4)
942 }
943 }
944
945 func Test_NestedPendingOperations_OperationExists_ExactMatchFirstNoPending(t *testing.T) {
946
947 grm := NewNestedPendingOperations(true )
948 volumeName := v1.UniqueVolumeName("test-volume")
949 podName1 := volumetypes.UniquePodName("pod1")
950 podName2 := volumetypes.UniquePodName("pod2")
951 podName3 := volumetypes.UniquePodName("pod3")
952 podName4 := EmptyUniquePodName
953 nodeName := EmptyNodeName
954
955
956
957 delay := 50 * time.Millisecond
958 backoffDelay := 500 * time.Millisecond
959
960
961 operation1DoneCh := make(chan interface{})
962 operation1 := generateWaitWithErrorFunc(operation1DoneCh)
963 err1 := grm.Run(volumeName, podName1, nodeName , volumetypes.GeneratedOperations{OperationFunc: operation1, OperationName: "umount"})
964 if err1 != nil {
965 t.Errorf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err1)
966 }
967
968
969 operation2DoneCh := make(chan interface{})
970 operation2 := generateWaitWithErrorFunc(operation2DoneCh)
971 err2 := grm.Run(volumeName, podName2, nodeName , volumetypes.GeneratedOperations{OperationFunc: operation2, OperationName: "umount"})
972 if err2 != nil {
973 t.Errorf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err2)
974 }
975
976
977 operation3DoneCh := make(chan interface{})
978 operation3 := generateWaitWithErrorFunc(operation3DoneCh)
979 err3 := grm.Run(volumeName, podName3, nodeName , volumetypes.GeneratedOperations{OperationFunc: operation3, OperationName: "umount"})
980 if err3 != nil {
981 t.Errorf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err3)
982 }
983
984 operation1DoneCh <- true
985 operation2DoneCh <- true
986 operation3DoneCh <- true
987 time.Sleep(delay)
988
989
990 operation4DoneCh := make(chan interface{})
991 operation4 := generateWaitWithErrorFunc(operation4DoneCh)
992 err4 := grm.Run(volumeName, podName4, nodeName , volumetypes.GeneratedOperations{OperationFunc: operation4, OperationName: "mount"})
993 if err4 != nil {
994 t.Errorf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err4)
995 }
996
997 operation4DoneCh <- true
998
999
1000 time.Sleep(backoffDelay)
1001 operation5 := noopFunc
1002 err5 := grm.Run(volumeName, podName2, nodeName, volumetypes.GeneratedOperations{OperationFunc: operation5, OperationName: "umount"})
1003 if err5 != nil {
1004 t.Errorf("NestedPendingOperations failed. Expected: <no error> Actual: <%v>", err5)
1005 }
1006 time.Sleep(delay)
1007
1008
1009
1010 grm.(*nestedPendingOperations).lock.Lock()
1011 defer grm.(*nestedPendingOperations).lock.Unlock()
1012 for _, op := range grm.(*nestedPendingOperations).operations {
1013 if op.key.podName == podName2 {
1014 t.Errorf("NestedPendingOperations failed. Operation for pod2 should be removed")
1015 }
1016 }
1017
1018 }
1019
View as plain text