1
16
17 package goroutinemap
18
19 import (
20 "fmt"
21 "testing"
22 "time"
23
24 "k8s.io/apimachinery/pkg/util/wait"
25 )
26
// Timing values shared by the tests in this file. Each expression is already
// of type time.Duration, so no explicit type is spelled out.
const (
	// testTimeout is the upper bound passed to waitChannelWithTimeout while a
	// test waits for Wait/WaitForCompletion to return.
	testTimeout = time.Minute

	// initialOperationWaitTimeShort is the first retry interval handed to
	// retryWithExponentialBackOff by most tests.
	initialOperationWaitTimeShort = 20 * time.Millisecond

	// initialOperationWaitTimeLong is the larger first retry interval, used by
	// the test that retries after a panicking operation on a map created with
	// true.
	initialOperationWaitTimeLong = 500 * time.Millisecond
)
43
44 func Test_NewGoRoutineMap_Positive_SingleOp(t *testing.T) {
45
46 grm := NewGoRoutineMap(false )
47 operationName := "operation-name"
48 operation := func() error { return nil }
49
50
51 err := grm.Run(operationName, operation)
52
53
54 if err != nil {
55 t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err)
56 }
57 }
58
59 func Test_NewGoRoutineMap_Positive_TwoOps(t *testing.T) {
60
61 grm := NewGoRoutineMap(false )
62 operation1Name := "operation1-name"
63 operation2Name := "operation2-name"
64 operation := func() error { return nil }
65
66
67 err1 := grm.Run(operation1Name, operation)
68 err2 := grm.Run(operation2Name, operation)
69
70
71 if err1 != nil {
72 t.Fatalf("NewGoRoutine %q failed. Expected: <no error> Actual: <%v>", operation1Name, err1)
73 }
74
75 if err2 != nil {
76 t.Fatalf("NewGoRoutine %q failed. Expected: <no error> Actual: <%v>", operation2Name, err2)
77 }
78 }
79
80 func Test_NewGoRoutineMap_Positive_SingleOpWithExpBackoff(t *testing.T) {
81
82 grm := NewGoRoutineMap(true )
83 operationName := "operation-name"
84 operation := func() error { return nil }
85
86
87 err := grm.Run(operationName, operation)
88
89
90 if err != nil {
91 t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err)
92 }
93 }
94
95 func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstCompletes(t *testing.T) {
96
97 grm := NewGoRoutineMap(false )
98 operationName := "operation-name"
99 operation1DoneCh := make(chan interface{}, 0 )
100 operation1 := generateCallbackFunc(operation1DoneCh)
101 err1 := grm.Run(operationName, operation1)
102 if err1 != nil {
103 t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
104 }
105 operation2 := generateNoopFunc()
106 <-operation1DoneCh
107
108
109 err2 := retryWithExponentialBackOff(
110 time.Duration(initialOperationWaitTimeShort),
111 func() (bool, error) {
112 err := grm.Run(operationName, operation2)
113 if err != nil {
114 t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err)
115 return false, nil
116 }
117 return true, nil
118 },
119 )
120
121
122 if err2 != nil {
123 t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err2)
124 }
125 }
126
127 func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstCompletesWithExpBackoff(t *testing.T) {
128
129 grm := NewGoRoutineMap(true )
130 operationName := "operation-name"
131 operation1DoneCh := make(chan interface{}, 0 )
132 operation1 := generateCallbackFunc(operation1DoneCh)
133 err1 := grm.Run(operationName, operation1)
134 if err1 != nil {
135 t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
136 }
137 operation2 := generateNoopFunc()
138 <-operation1DoneCh
139
140
141 err2 := retryWithExponentialBackOff(
142 time.Duration(initialOperationWaitTimeShort),
143 func() (bool, error) {
144 err := grm.Run(operationName, operation2)
145 if err != nil {
146 t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err)
147 return false, nil
148 }
149 return true, nil
150 },
151 )
152
153
154 if err2 != nil {
155 t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err2)
156 }
157 }
158
159 func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstPanics(t *testing.T) {
160
161 grm := NewGoRoutineMap(false )
162 operationName := "operation-name"
163 operation1 := generatePanicFunc()
164 err1 := grm.Run(operationName, operation1)
165 if err1 != nil {
166 t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
167 }
168 operation2 := generateNoopFunc()
169
170
171 err2 := retryWithExponentialBackOff(
172 time.Duration(initialOperationWaitTimeShort),
173 func() (bool, error) {
174 err := grm.Run(operationName, operation2)
175 if err != nil {
176 t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err)
177 return false, nil
178 }
179 return true, nil
180 },
181 )
182
183
184 if err2 != nil {
185 t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err2)
186 }
187 }
188
189 func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstPanicsWithExpBackoff(t *testing.T) {
190
191 grm := NewGoRoutineMap(true )
192 operationName := "operation-name"
193 operation1 := generatePanicFunc()
194 err1 := grm.Run(operationName, operation1)
195 if err1 != nil {
196 t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
197 }
198 operation2 := generateNoopFunc()
199
200
201 err2 := retryWithExponentialBackOff(
202 time.Duration(initialOperationWaitTimeLong),
203 func() (bool, error) {
204 err := grm.Run(operationName, operation2)
205 if err != nil {
206 t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err)
207 return false, nil
208 }
209 return true, nil
210 },
211 )
212
213
214 if err2 != nil {
215 t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err2)
216 }
217 }
218
219 func Test_NewGoRoutineMap_Negative_SecondOpBeforeFirstCompletes(t *testing.T) {
220
221 grm := NewGoRoutineMap(false )
222 operationName := "operation-name"
223 operation1DoneCh := make(chan interface{}, 0 )
224 operation1 := generateWaitFunc(operation1DoneCh)
225 err1 := grm.Run(operationName, operation1)
226 if err1 != nil {
227 t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
228 }
229 operation2 := generateNoopFunc()
230
231
232 err2 := grm.Run(operationName, operation2)
233
234
235 if err2 == nil {
236 t.Fatalf("NewGoRoutine did not fail. Expected: <Failed to create operation with name \"%s\". An operation with that name already exists.> Actual: <no error>", operationName)
237 }
238 if !IsAlreadyExists(err2) {
239 t.Fatalf("NewGoRoutine did not return alreadyExistsError, got: %v", err2)
240 }
241 }
242
243 func Test_NewGoRoutineMap_Negative_SecondOpBeforeFirstCompletesWithExpBackoff(t *testing.T) {
244
245 grm := NewGoRoutineMap(true )
246 operationName := "operation-name"
247 operation1DoneCh := make(chan interface{}, 0 )
248 operation1 := generateWaitFunc(operation1DoneCh)
249 err1 := grm.Run(operationName, operation1)
250 if err1 != nil {
251 t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
252 }
253 operation2 := generateNoopFunc()
254
255
256 err2 := grm.Run(operationName, operation2)
257
258
259 if err2 == nil {
260 t.Fatalf("NewGoRoutine did not fail. Expected: <Failed to create operation with name \"%s\". An operation with that name already exists.> Actual: <no error>", operationName)
261 }
262 if !IsAlreadyExists(err2) {
263 t.Fatalf("NewGoRoutine did not return alreadyExistsError, got: %v", err2)
264 }
265 }
266
267 func Test_NewGoRoutineMap_Positive_ThirdOpAfterFirstCompletes(t *testing.T) {
268
269 grm := NewGoRoutineMap(false )
270 operationName := "operation-name"
271 operation1DoneCh := make(chan interface{}, 0 )
272 operation1 := generateWaitFunc(operation1DoneCh)
273 err1 := grm.Run(operationName, operation1)
274 if err1 != nil {
275 t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
276 }
277 operation2 := generateNoopFunc()
278 operation3 := generateNoopFunc()
279
280
281 err2 := grm.Run(operationName, operation2)
282
283
284 if err2 == nil {
285 t.Fatalf("NewGoRoutine did not fail. Expected: <Failed to create operation with name \"%s\". An operation with that name already exists.> Actual: <no error>", operationName)
286 }
287 if !IsAlreadyExists(err2) {
288 t.Fatalf("NewGoRoutine did not return alreadyExistsError, got: %v", err2)
289 }
290
291
292 operation1DoneCh <- true
293 err3 := retryWithExponentialBackOff(
294 time.Duration(initialOperationWaitTimeShort),
295 func() (bool, error) {
296 err := grm.Run(operationName, operation3)
297 if err != nil {
298 t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err)
299 return false, nil
300 }
301 return true, nil
302 },
303 )
304
305
306 if err3 != nil {
307 t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err3)
308 }
309 }
310
311 func Test_NewGoRoutineMap_Positive_ThirdOpAfterFirstCompletesWithExpBackoff(t *testing.T) {
312
313 grm := NewGoRoutineMap(true )
314 operationName := "operation-name"
315 operation1DoneCh := make(chan interface{}, 0 )
316 operation1 := generateWaitFunc(operation1DoneCh)
317 err1 := grm.Run(operationName, operation1)
318 if err1 != nil {
319 t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
320 }
321 operation2 := generateNoopFunc()
322 operation3 := generateNoopFunc()
323
324
325 err2 := grm.Run(operationName, operation2)
326
327
328 if err2 == nil {
329 t.Fatalf("NewGoRoutine did not fail. Expected: <Failed to create operation with name \"%s\". An operation with that name already exists.> Actual: <no error>", operationName)
330 }
331 if !IsAlreadyExists(err2) {
332 t.Fatalf("NewGoRoutine did not return alreadyExistsError, got: %v", err2)
333 }
334
335
336 operation1DoneCh <- true
337 err3 := retryWithExponentialBackOff(
338 time.Duration(initialOperationWaitTimeShort),
339 func() (bool, error) {
340 err := grm.Run(operationName, operation3)
341 if err != nil {
342 t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err)
343 return false, nil
344 }
345 return true, nil
346 },
347 )
348
349
350 if err3 != nil {
351 t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err3)
352 }
353 }
354
355 func Test_NewGoRoutineMap_Positive_WaitEmpty(t *testing.T) {
356
357
358 grm := NewGoRoutineMap(false )
359
360
361 waitDoneCh := make(chan interface{}, 1)
362 go func() {
363 grm.Wait()
364 waitDoneCh <- true
365 }()
366
367
368 err := waitChannelWithTimeout(waitDoneCh, testTimeout)
369 if err != nil {
370 t.Errorf("Error waiting for GoRoutineMap.Wait: %v", err)
371 }
372 }
373
374 func Test_NewGoRoutineMap_Positive_WaitEmptyWithExpBackoff(t *testing.T) {
375
376
377 grm := NewGoRoutineMap(true )
378
379
380 waitDoneCh := make(chan interface{}, 1)
381 go func() {
382 grm.Wait()
383 waitDoneCh <- true
384 }()
385
386
387 err := waitChannelWithTimeout(waitDoneCh, testTimeout)
388 if err != nil {
389 t.Errorf("Error waiting for GoRoutineMap.Wait: %v", err)
390 }
391 }
392
393 func Test_NewGoRoutineMap_Positive_Wait(t *testing.T) {
394
395
396 grm := NewGoRoutineMap(false )
397 operationName := "operation-name"
398 operation1DoneCh := make(chan interface{}, 0 )
399 operation1 := generateWaitFunc(operation1DoneCh)
400 err := grm.Run(operationName, operation1)
401 if err != nil {
402 t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err)
403 }
404
405
406 waitDoneCh := make(chan interface{}, 1)
407 go func() {
408 grm.Wait()
409 waitDoneCh <- true
410 }()
411
412
413 operation1DoneCh <- true
414
415
416 err = waitChannelWithTimeout(waitDoneCh, testTimeout)
417 if err != nil {
418 t.Fatalf("Error waiting for GoRoutineMap.Wait: %v", err)
419 }
420 }
421
422 func Test_NewGoRoutineMap_Positive_WaitWithExpBackoff(t *testing.T) {
423
424
425 grm := NewGoRoutineMap(true )
426 operationName := "operation-name"
427 operation1DoneCh := make(chan interface{}, 0 )
428 operation1 := generateWaitFunc(operation1DoneCh)
429 err := grm.Run(operationName, operation1)
430 if err != nil {
431 t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err)
432 }
433
434
435 waitDoneCh := make(chan interface{}, 1)
436 go func() {
437 grm.Wait()
438 waitDoneCh <- true
439 }()
440
441
442 operation1DoneCh <- true
443
444
445 err = waitChannelWithTimeout(waitDoneCh, testTimeout)
446 if err != nil {
447 t.Fatalf("Error waiting for GoRoutineMap.Wait: %v", err)
448 }
449 }
450
451 func Test_NewGoRoutineMap_WaitForCompletionWithExpBackoff(t *testing.T) {
452 grm := NewGoRoutineMap(true )
453 operationName := "operation-err"
454
455 operation1DoneCh := make(chan interface{}, 0 )
456 operation1 := generateErrorFunc(operation1DoneCh)
457 err := grm.Run(operationName, operation1)
458 if err != nil {
459 t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err)
460 }
461
462
463 waitDoneCh := make(chan interface{}, 1)
464 go func() {
465 grm.WaitForCompletion()
466 waitDoneCh <- true
467 }()
468
469
470 operation1DoneCh <- true
471
472
473 err = waitChannelWithTimeout(waitDoneCh, testTimeout)
474 if err != nil {
475 t.Fatalf("Error waiting for GoRoutineMap.Wait: %v", err)
476 }
477 }
478
// generateCallbackFunc returns an operation that sends true on done and then
// reports success. The send blocks until a receiver (or buffer slot) is
// available.
func generateCallbackFunc(done chan<- interface{}) func() error {
	signal := func() error {
		done <- true
		return nil
	}
	return signal
}
485
// generateErrorFunc returns an operation that blocks until it can receive from
// done and then fails with a fixed error.
func generateErrorFunc(done <-chan interface{}) func() error {
	blockThenFail := func() error {
		<-done
		return fmt.Errorf("Generic error")
	}
	return blockThenFail
}
492
// generateWaitFunc returns an operation that blocks until it can receive from
// done and then reports success.
func generateWaitFunc(done <-chan interface{}) func() error {
	blockThenSucceed := func() error {
		<-done
		return nil
	}
	return blockThenSucceed
}
499
// generatePanicFunc returns an operation that always panics with the string
// "testing panic" and therefore never returns normally.
func generatePanicFunc() func() error {
	alwaysPanic := func() error {
		panic("testing panic")
	}
	return alwaysPanic
}
505
// generateNoopFunc returns an operation that does nothing and reports success.
func generateNoopFunc() func() error {
	noop := func() error {
		return nil
	}
	return noop
}
509
510 func retryWithExponentialBackOff(initialDuration time.Duration, fn wait.ConditionFunc) error {
511 backoff := wait.Backoff{
512 Duration: initialDuration,
513 Factor: 3,
514 Jitter: 0,
515 Steps: 4,
516 }
517 return wait.ExponentialBackoff(backoff, fn)
518 }
519
// waitChannelWithTimeout blocks until a value can be received from ch or until
// timeout elapses, whichever happens first. It returns nil in the former case
// and a "timeout after ..." error in the latter.
func waitChannelWithTimeout(ch <-chan interface{}, timeout time.Duration) error {
	deadline := time.NewTimer(timeout)
	defer deadline.Stop()

	var err error
	select {
	case <-deadline.C:
		err = fmt.Errorf("timeout after %v", timeout)
	case <-ch:
		// Signalled in time; err stays nil.
	}
	return err
}
532
View as plain text