...
1 package stream
2
3 import (
4 "fmt"
5 "sync/atomic"
6 "testing"
7 "time"
8
9 "github.com/stretchr/testify/require"
10 )
11
12 func ExampleStream() {
13 times := []int{20, 52, 16, 45, 4, 80}
14
15 stream := New()
16 for _, millis := range times {
17 dur := time.Duration(millis) * time.Millisecond
18 stream.Go(func() Callback {
19 time.Sleep(dur)
20
21 return func() { fmt.Println(dur) }
22 })
23 }
24 stream.Wait()
25
26
27
28
29
30
31
32
33 }
34
35 func TestStream(t *testing.T) {
36 t.Parallel()
37
38 t.Run("simple", func(t *testing.T) {
39 t.Parallel()
40 s := New()
41 var res []int
42 for i := 0; i < 5; i++ {
43 i := i
44 s.Go(func() Callback {
45 i *= 2
46 return func() {
47 res = append(res, i)
48 }
49 })
50 }
51 s.Wait()
52 require.Equal(t, []int{0, 2, 4, 6, 8}, res)
53 })
54
55 t.Run("max goroutines", func(t *testing.T) {
56 t.Parallel()
57 s := New().WithMaxGoroutines(5)
58 var currentTaskCount atomic.Int64
59 var currentCallbackCount atomic.Int64
60 for i := 0; i < 50; i++ {
61 s.Go(func() Callback {
62 curr := currentTaskCount.Add(1)
63 if curr > 5 {
64 t.Fatal("too many concurrent tasks being executed")
65 }
66 defer currentTaskCount.Add(-1)
67
68 time.Sleep(time.Millisecond)
69
70 return func() {
71 curr := currentCallbackCount.Add(1)
72 if curr > 1 {
73 t.Fatal("too many concurrent callbacks being executed")
74 }
75
76 time.Sleep(time.Millisecond)
77
78 defer currentCallbackCount.Add(-1)
79 }
80 })
81 }
82 s.Wait()
83 })
84
85 t.Run("panic in task is propagated", func(t *testing.T) {
86 t.Parallel()
87 s := New().WithMaxGoroutines(5)
88 s.Go(func() Callback {
89 panic("something really bad happened in the task")
90 })
91 require.Panics(t, s.Wait)
92 })
93
94 t.Run("panic in callback is propagated", func(t *testing.T) {
95 t.Parallel()
96 s := New().WithMaxGoroutines(5)
97 s.Go(func() Callback {
98 return func() {
99 panic("something really bad happened in the callback")
100 }
101 })
102 require.Panics(t, s.Wait)
103 })
104
105 t.Run("panic in callback does not block producers", func(t *testing.T) {
106 t.Parallel()
107 s := New().WithMaxGoroutines(5)
108 s.Go(func() Callback {
109 return func() {
110 panic("something really bad happened in the callback")
111 }
112 })
113 for i := 0; i < 100; i++ {
114 s.Go(func() Callback {
115 return func() {}
116 })
117 }
118 require.Panics(t, s.Wait)
119 })
120 }
121
122 func BenchmarkStream(b *testing.B) {
123 b.Run("startup and teardown", func(b *testing.B) {
124 for i := 0; i < b.N; i++ {
125 s := New()
126 s.Go(func() Callback { return func() {} })
127 s.Wait()
128 }
129 })
130
131 b.Run("per task", func(b *testing.B) {
132 n := 0
133 s := New()
134 for i := 0; i < b.N; i++ {
135 s.Go(func() Callback {
136 return func() {
137 n += 1
138 }
139 })
140 }
141 s.Wait()
142 })
143 }
144
View as plain text