1
2
3
4
5 package singleflight
6
7 import (
8 "bytes"
9 "errors"
10 "fmt"
11 "os"
12 "os/exec"
13 "runtime"
14 "runtime/debug"
15 "strings"
16 "sync"
17 "sync/atomic"
18 "testing"
19 "time"
20 )
21
22 func TestDo(t *testing.T) {
23 var g Group
24 v, err, _ := g.Do("key", func() (interface{}, error) {
25 return "bar", nil
26 })
27 if got, want := fmt.Sprintf("%v (%T)", v, v), "bar (string)"; got != want {
28 t.Errorf("Do = %v; want %v", got, want)
29 }
30 if err != nil {
31 t.Errorf("Do error = %v", err)
32 }
33 }
34
35 func TestDoErr(t *testing.T) {
36 var g Group
37 someErr := errors.New("Some error")
38 v, err, _ := g.Do("key", func() (interface{}, error) {
39 return nil, someErr
40 })
41 if err != someErr {
42 t.Errorf("Do error = %v; want someErr %v", err, someErr)
43 }
44 if v != nil {
45 t.Errorf("unexpected non-nil value %#v", v)
46 }
47 }
48
49 func TestDoDupSuppress(t *testing.T) {
50 var g Group
51 var wg1, wg2 sync.WaitGroup
52 c := make(chan string, 1)
53 var calls int32
54 fn := func() (interface{}, error) {
55 if atomic.AddInt32(&calls, 1) == 1 {
56
57 wg1.Done()
58 }
59 v := <-c
60 c <- v
61
62 time.Sleep(10 * time.Millisecond)
63
64 return v, nil
65 }
66
67 const n = 10
68 wg1.Add(1)
69 for i := 0; i < n; i++ {
70 wg1.Add(1)
71 wg2.Add(1)
72 go func() {
73 defer wg2.Done()
74 wg1.Done()
75 v, err, _ := g.Do("key", fn)
76 if err != nil {
77 t.Errorf("Do error: %v", err)
78 return
79 }
80 if s, _ := v.(string); s != "bar" {
81 t.Errorf("Do = %T %v; want %q", v, v, "bar")
82 }
83 }()
84 }
85 wg1.Wait()
86
87
88 c <- "bar"
89 wg2.Wait()
90 if got := atomic.LoadInt32(&calls); got <= 0 || got >= n {
91 t.Errorf("number of calls = %d; want over 0 and less than %d", got, n)
92 }
93 }
94
95
96
97 func TestForget(t *testing.T) {
98 var g Group
99
100 var (
101 firstStarted = make(chan struct{})
102 unblockFirst = make(chan struct{})
103 firstFinished = make(chan struct{})
104 )
105
106 go func() {
107 g.Do("key", func() (i interface{}, e error) {
108 close(firstStarted)
109 <-unblockFirst
110 close(firstFinished)
111 return
112 })
113 }()
114 <-firstStarted
115 g.Forget("key")
116
117 unblockSecond := make(chan struct{})
118 secondResult := g.DoChan("key", func() (i interface{}, e error) {
119 <-unblockSecond
120 return 2, nil
121 })
122
123 close(unblockFirst)
124 <-firstFinished
125
126 thirdResult := g.DoChan("key", func() (i interface{}, e error) {
127 return 3, nil
128 })
129
130 close(unblockSecond)
131 <-secondResult
132 r := <-thirdResult
133 if r.Val != 2 {
134 t.Errorf("We should receive result produced by second call, expected: 2, got %d", r.Val)
135 }
136 }
137
138 func TestDoChan(t *testing.T) {
139 var g Group
140 ch := g.DoChan("key", func() (interface{}, error) {
141 return "bar", nil
142 })
143
144 res := <-ch
145 v := res.Val
146 err := res.Err
147 if got, want := fmt.Sprintf("%v (%T)", v, v), "bar (string)"; got != want {
148 t.Errorf("Do = %v; want %v", got, want)
149 }
150 if err != nil {
151 t.Errorf("Do error = %v", err)
152 }
153 }
154
155
156
157 func TestPanicDo(t *testing.T) {
158 var g Group
159 fn := func() (interface{}, error) {
160 panic("invalid memory address or nil pointer dereference")
161 }
162
163 const n = 5
164 waited := int32(n)
165 panicCount := int32(0)
166 done := make(chan struct{})
167 for i := 0; i < n; i++ {
168 go func() {
169 defer func() {
170 if err := recover(); err != nil {
171 t.Logf("Got panic: %v\n%s", err, debug.Stack())
172 atomic.AddInt32(&panicCount, 1)
173 }
174
175 if atomic.AddInt32(&waited, -1) == 0 {
176 close(done)
177 }
178 }()
179
180 g.Do("key", fn)
181 }()
182 }
183
184 select {
185 case <-done:
186 if panicCount != n {
187 t.Errorf("Expect %d panic, but got %d", n, panicCount)
188 }
189 case <-time.After(time.Second):
190 t.Fatalf("Do hangs")
191 }
192 }
193
194 func TestGoexitDo(t *testing.T) {
195 var g Group
196 fn := func() (interface{}, error) {
197 runtime.Goexit()
198 return nil, nil
199 }
200
201 const n = 5
202 waited := int32(n)
203 done := make(chan struct{})
204 for i := 0; i < n; i++ {
205 go func() {
206 var err error
207 defer func() {
208 if err != nil {
209 t.Errorf("Error should be nil, but got: %v", err)
210 }
211 if atomic.AddInt32(&waited, -1) == 0 {
212 close(done)
213 }
214 }()
215 _, err, _ = g.Do("key", fn)
216 }()
217 }
218
219 select {
220 case <-done:
221 case <-time.After(time.Second):
222 t.Fatalf("Do hangs")
223 }
224 }
225
226 func TestPanicDoChan(t *testing.T) {
227 if runtime.GOOS == "js" {
228 t.Skipf("js does not support exec")
229 }
230
231 if os.Getenv("TEST_PANIC_DOCHAN") != "" {
232 defer func() {
233 recover()
234 }()
235
236 g := new(Group)
237 ch := g.DoChan("", func() (interface{}, error) {
238 panic("Panicking in DoChan")
239 })
240 <-ch
241 t.Fatalf("DoChan unexpectedly returned")
242 }
243
244 t.Parallel()
245
246 cmd := exec.Command(os.Args[0], "-test.run="+t.Name(), "-test.v")
247 cmd.Env = append(os.Environ(), "TEST_PANIC_DOCHAN=1")
248 out := new(bytes.Buffer)
249 cmd.Stdout = out
250 cmd.Stderr = out
251 if err := cmd.Start(); err != nil {
252 t.Fatal(err)
253 }
254
255 err := cmd.Wait()
256 t.Logf("%s:\n%s", strings.Join(cmd.Args, " "), out)
257 if err == nil {
258 t.Errorf("Test subprocess passed; want a crash due to panic in DoChan")
259 }
260 if bytes.Contains(out.Bytes(), []byte("DoChan unexpectedly")) {
261 t.Errorf("Test subprocess failed with an unexpected failure mode.")
262 }
263 if !bytes.Contains(out.Bytes(), []byte("Panicking in DoChan")) {
264 t.Errorf("Test subprocess failed, but the crash isn't caused by panicking in DoChan")
265 }
266 }
267
268 func TestPanicDoSharedByDoChan(t *testing.T) {
269 if runtime.GOOS == "js" {
270 t.Skipf("js does not support exec")
271 }
272
273 if os.Getenv("TEST_PANIC_DOCHAN") != "" {
274 blocked := make(chan struct{})
275 unblock := make(chan struct{})
276
277 g := new(Group)
278 go func() {
279 defer func() {
280 recover()
281 }()
282 g.Do("", func() (interface{}, error) {
283 close(blocked)
284 <-unblock
285 panic("Panicking in Do")
286 })
287 }()
288
289 <-blocked
290 ch := g.DoChan("", func() (interface{}, error) {
291 panic("DoChan unexpectedly executed callback")
292 })
293 close(unblock)
294 <-ch
295 t.Fatalf("DoChan unexpectedly returned")
296 }
297
298 t.Parallel()
299
300 cmd := exec.Command(os.Args[0], "-test.run="+t.Name(), "-test.v")
301 cmd.Env = append(os.Environ(), "TEST_PANIC_DOCHAN=1")
302 out := new(bytes.Buffer)
303 cmd.Stdout = out
304 cmd.Stderr = out
305 if err := cmd.Start(); err != nil {
306 t.Fatal(err)
307 }
308
309 err := cmd.Wait()
310 t.Logf("%s:\n%s", strings.Join(cmd.Args, " "), out)
311 if err == nil {
312 t.Errorf("Test subprocess passed; want a crash due to panic in Do shared by DoChan")
313 }
314 if bytes.Contains(out.Bytes(), []byte("DoChan unexpectedly")) {
315 t.Errorf("Test subprocess failed with an unexpected failure mode.")
316 }
317 if !bytes.Contains(out.Bytes(), []byte("Panicking in Do")) {
318 t.Errorf("Test subprocess failed, but the crash isn't caused by panicking in Do")
319 }
320 }
321
View as plain text