...
1 package pool
2
3 import (
4 "errors"
5 "fmt"
6 "strconv"
7 "sync/atomic"
8 "testing"
9 "time"
10
11 "github.com/stretchr/testify/require"
12 )
13
14 func ExampleErrorPool() {
15 p := New().WithErrors()
16 for i := 0; i < 3; i++ {
17 i := i
18 p.Go(func() error {
19 if i == 2 {
20 return errors.New("oh no!")
21 }
22 return nil
23 })
24 }
25 err := p.Wait()
26 fmt.Println(err)
27
28
29 }
30
31 func TestErrorPool(t *testing.T) {
32 t.Parallel()
33
34 err1 := errors.New("err1")
35 err2 := errors.New("err2")
36
37 t.Run("panics on configuration after init", func(t *testing.T) {
38 t.Run("before wait", func(t *testing.T) {
39 t.Parallel()
40 g := New().WithErrors()
41 g.Go(func() error { return nil })
42 require.Panics(t, func() { g.WithMaxGoroutines(10) })
43 })
44
45 t.Run("after wait", func(t *testing.T) {
46 t.Parallel()
47 g := New().WithErrors()
48 g.Go(func() error { return nil })
49 _ = g.Wait()
50 require.Panics(t, func() { g.WithMaxGoroutines(10) })
51 })
52 })
53
54 t.Run("wait returns no error if no errors", func(t *testing.T) {
55 t.Parallel()
56 g := New().WithErrors()
57 g.Go(func() error { return nil })
58 require.NoError(t, g.Wait())
59 })
60
61 t.Run("wait error if func returns error", func(t *testing.T) {
62 t.Parallel()
63 g := New().WithErrors()
64 g.Go(func() error { return err1 })
65 require.ErrorIs(t, g.Wait(), err1)
66 })
67
68 t.Run("wait error is all returned errors", func(t *testing.T) {
69 t.Parallel()
70 g := New().WithErrors()
71 g.Go(func() error { return err1 })
72 g.Go(func() error { return nil })
73 g.Go(func() error { return err2 })
74 err := g.Wait()
75 require.ErrorIs(t, err, err1)
76 require.ErrorIs(t, err, err2)
77 })
78
79 t.Run("propagates panics", func(t *testing.T) {
80 t.Parallel()
81 g := New().WithErrors()
82 for i := 0; i < 10; i++ {
83 i := i
84 g.Go(func() error {
85 if i == 5 {
86 panic("fatal")
87 }
88 return nil
89 })
90 }
91 require.Panics(t, func() { _ = g.Wait() })
92 })
93
94 t.Run("limit", func(t *testing.T) {
95 t.Parallel()
96 for _, maxGoroutines := range []int{1, 10, 100} {
97 t.Run(strconv.Itoa(maxGoroutines), func(t *testing.T) {
98 g := New().WithErrors().WithMaxGoroutines(maxGoroutines)
99
100 var currentConcurrent atomic.Int64
101 taskCount := maxGoroutines * 10
102 for i := 0; i < taskCount; i++ {
103 g.Go(func() error {
104 cur := currentConcurrent.Add(1)
105 if cur > int64(maxGoroutines) {
106 return fmt.Errorf("expected no more than %d concurrent goroutine", maxGoroutines)
107 }
108 time.Sleep(time.Millisecond)
109 currentConcurrent.Add(-1)
110 return nil
111 })
112 }
113 require.NoError(t, g.Wait())
114 require.Equal(t, int64(0), currentConcurrent.Load())
115 })
116 }
117 })
118 }
119
View as plain text