...
1 package pool
2
3 import (
4 "fmt"
5 "sort"
6 "strconv"
7 "sync/atomic"
8 "testing"
9 "time"
10
11 "github.com/stretchr/testify/require"
12 )
13
14 func ExampleResultPool() {
15 p := NewWithResults[int]()
16 for i := 0; i < 10; i++ {
17 i := i
18 p.Go(func() int {
19 return i * 2
20 })
21 }
22 res := p.Wait()
23
24 sort.Ints(res)
25 fmt.Println(res)
26
27
28
29 }
30
31 func TestResultGroup(t *testing.T) {
32 t.Parallel()
33
34 t.Run("panics on configuration after init", func(t *testing.T) {
35 t.Run("before wait", func(t *testing.T) {
36 t.Parallel()
37 g := NewWithResults[int]()
38 g.Go(func() int { return 0 })
39 require.Panics(t, func() { g.WithMaxGoroutines(10) })
40 })
41
42 t.Run("after wait", func(t *testing.T) {
43 t.Parallel()
44 g := NewWithResults[int]()
45 g.Go(func() int { return 0 })
46 require.Panics(t, func() { g.WithMaxGoroutines(10) })
47 })
48 })
49
50 t.Run("basic", func(t *testing.T) {
51 t.Parallel()
52 g := NewWithResults[int]()
53 expected := []int{}
54 for i := 0; i < 100; i++ {
55 i := i
56 expected = append(expected, i)
57 g.Go(func() int {
58 return i
59 })
60 }
61 res := g.Wait()
62 sort.Ints(res)
63 require.Equal(t, expected, res)
64 })
65
66 t.Run("limit", func(t *testing.T) {
67 t.Parallel()
68 for _, maxGoroutines := range []int{1, 10, 100} {
69 t.Run(strconv.Itoa(maxGoroutines), func(t *testing.T) {
70 g := NewWithResults[int]().WithMaxGoroutines(maxGoroutines)
71
72 var currentConcurrent atomic.Int64
73 var errCount atomic.Int64
74 taskCount := maxGoroutines * 10
75 expected := make([]int, taskCount)
76 for i := 0; i < taskCount; i++ {
77 i := i
78 expected[i] = i
79 g.Go(func() int {
80 cur := currentConcurrent.Add(1)
81 if cur > int64(maxGoroutines) {
82 errCount.Add(1)
83 }
84 time.Sleep(time.Millisecond)
85 currentConcurrent.Add(-1)
86 return i
87 })
88 }
89 res := g.Wait()
90 sort.Ints(res)
91 require.Equal(t, expected, res)
92 require.Equal(t, int64(0), errCount.Load())
93 require.Equal(t, int64(0), currentConcurrent.Load())
94 })
95 }
96 })
97 }
98
View as plain text