...
1 package pool
2
3 import (
4 "fmt"
5 "strconv"
6 "sync/atomic"
7 "testing"
8 "time"
9
10 "github.com/stretchr/testify/require"
11 )
12
13 func ExamplePool() {
14 p := New().WithMaxGoroutines(3)
15 for i := 0; i < 5; i++ {
16 p.Go(func() {
17 fmt.Println("conc")
18 })
19 }
20 p.Wait()
21
22
23
24
25
26
27 }
28
29 func TestPool(t *testing.T) {
30 t.Parallel()
31
32 t.Run("basic", func(t *testing.T) {
33 t.Parallel()
34
35 g := New()
36 var completed atomic.Int64
37 for i := 0; i < 100; i++ {
38 g.Go(func() {
39 time.Sleep(1 * time.Millisecond)
40 completed.Add(1)
41 })
42 }
43 g.Wait()
44 require.Equal(t, completed.Load(), int64(100))
45 })
46
47 t.Run("panics on configuration after init", func(t *testing.T) {
48 t.Run("before wait", func(t *testing.T) {
49 t.Parallel()
50 g := New()
51 g.Go(func() {})
52 require.Panics(t, func() { g.WithMaxGoroutines(10) })
53 })
54
55 t.Run("after wait", func(t *testing.T) {
56 t.Parallel()
57 g := New()
58 g.Go(func() {})
59 g.Wait()
60 require.Panics(t, func() { g.WithMaxGoroutines(10) })
61 })
62 })
63
64 t.Run("limit", func(t *testing.T) {
65 t.Parallel()
66 for _, maxConcurrent := range []int{1, 10, 100} {
67 t.Run(strconv.Itoa(maxConcurrent), func(t *testing.T) {
68 g := New().WithMaxGoroutines(maxConcurrent)
69
70 var currentConcurrent atomic.Int64
71 var errCount atomic.Int64
72 taskCount := maxConcurrent * 10
73 for i := 0; i < taskCount; i++ {
74 g.Go(func() {
75 cur := currentConcurrent.Add(1)
76 if cur > int64(maxConcurrent) {
77 errCount.Add(1)
78 }
79 time.Sleep(time.Millisecond)
80 currentConcurrent.Add(-1)
81 })
82 }
83 g.Wait()
84 require.Equal(t, int64(0), errCount.Load())
85 require.Equal(t, int64(0), currentConcurrent.Load())
86 })
87 }
88 })
89
90 t.Run("propagate panic", func(t *testing.T) {
91 t.Parallel()
92 g := New()
93 for i := 0; i < 10; i++ {
94 i := i
95 g.Go(func() {
96 if i == 5 {
97 panic(i)
98 }
99 })
100 }
101 require.Panics(t, g.Wait)
102 })
103
104 t.Run("panics do not exhaust goroutines", func(t *testing.T) {
105 t.Parallel()
106 g := New().WithMaxGoroutines(2)
107 for i := 0; i < 10; i++ {
108 g.Go(func() {
109 panic(42)
110 })
111 }
112 require.Panics(t, g.Wait)
113 })
114
115 t.Run("panics on invalid WithMaxGoroutines", func(t *testing.T) {
116 t.Parallel()
117 require.Panics(t, func() { New().WithMaxGoroutines(0) })
118 })
119
120 t.Run("returns correct MaxGoroutines", func(t *testing.T) {
121 t.Parallel()
122 p := New().WithMaxGoroutines(42)
123 require.Equal(t, 42, p.MaxGoroutines())
124 })
125 }
126
127 func BenchmarkPool(b *testing.B) {
128 b.Run("startup and teardown", func(b *testing.B) {
129 for i := 0; i < b.N; i++ {
130 p := New()
131 p.Go(func() {})
132 p.Wait()
133 }
134 })
135
136 b.Run("per task", func(b *testing.B) {
137 p := New()
138 f := func() {}
139 for i := 0; i < b.N; i++ {
140 p.Go(f)
141 }
142 p.Wait()
143 })
144 }
145
View as plain text