1
16
17 package async
18
19 import (
20 "sync"
21 "testing"
22 "time"
23 )
24
25
// receiver is a test double: its F method records that it was invoked and can
// fire a one-shot callback installed with setRetryFn.
type receiver struct {
	// lock guards run and retryFn.
	lock sync.Mutex
	// run is set by F and cleared by reset.
	run bool
	// retryFn, when non-nil, is called by the next F and then cleared.
	retryFn func()
}

// F marks the receiver as having run. If a retry callback is installed it is
// invoked once, while the lock is still held, and then removed so later calls
// to F do not repeat it.
func (r *receiver) F() {
	r.lock.Lock()
	defer r.lock.Unlock()

	r.run = true
	if r.retryFn == nil {
		return
	}
	r.retryFn()
	r.retryFn = nil
}

// reset clears the run flag and reports whether F had been called since the
// previous reset.
func (r *receiver) reset() bool {
	r.lock.Lock()
	defer r.lock.Unlock()

	ran := r.run
	r.run = false
	return ran
}

// setRetryFn installs the callback that the next call to F will invoke.
// Passing nil removes any pending callback.
func (r *receiver) setRetryFn(retryFn func()) {
	r.lock.Lock()
	r.retryFn = retryFn
	r.lock.Unlock()
}
56
57
// timerUpdate describes one Reset or Stop call observed on a fakeTimer.
type timerUpdate struct {
	// active is true for a Reset and false for a Stop.
	active bool
	// next is the duration passed to Reset; it is left zero by Stop.
	next time.Duration
}

// fakeTimer is a manually driven timer and clock for tests. Time only moves
// when advance (or Sleep) is called, and every Reset/Stop is published on the
// updated channel so a test can observe it.
type fakeTimer struct {
	// c delivers the tick when the timer fires.
	c chan time.Time

	// lock guards now, timeout and active.
	lock    sync.Mutex
	now     time.Time
	timeout time.Time
	active  bool

	// updated receives one timerUpdate per Reset or Stop call.
	updated chan timerUpdate
}

// newFakeTimer returns an inactive fakeTimer whose clock starts at
// 2000-01-01 00:00:00 UTC. Both of its channels are unbuffered.
func newFakeTimer() *fakeTimer {
	return &fakeTimer{
		c:       make(chan time.Time),
		now:     time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC),
		updated: make(chan timerUpdate),
	}
}

// C returns the receive side of the tick channel.
func (ft *fakeTimer) C() <-chan time.Time {
	return ft.c
}

// Reset arms the timer to fire in from the current fake time and reports
// whether it was already active. It blocks, holding the lock, until the
// resulting timerUpdate has been received from updated.
func (ft *fakeTimer) Reset(in time.Duration) bool {
	ft.lock.Lock()
	defer ft.lock.Unlock()

	wasActive := ft.active
	ft.timeout = ft.now.Add(in)
	ft.active = true
	ft.updated <- timerUpdate{active: true, next: in}
	return wasActive
}

// Stop disarms the timer and reports whether it was active. It blocks,
// holding the lock, until the resulting timerUpdate has been received from
// updated.
func (ft *fakeTimer) Stop() bool {
	ft.lock.Lock()
	defer ft.lock.Unlock()

	wasActive := ft.active
	ft.active = false
	ft.updated <- timerUpdate{active: false}
	return wasActive
}

// Now returns the current fake time.
func (ft *fakeTimer) Now() time.Time {
	ft.lock.Lock()
	defer ft.lock.Unlock()

	current := ft.now
	return current
}

// Remaining returns the time between the current fake time and the most
// recently set timeout, regardless of whether the timer is still active.
func (ft *fakeTimer) Remaining() time.Duration {
	ft.lock.Lock()
	defer ft.lock.Unlock()

	left := ft.timeout.Sub(ft.now)
	return left
}

// Since returns how much fake time has elapsed since t.
func (ft *fakeTimer) Since(t time.Time) time.Duration {
	ft.lock.Lock()
	defer ft.lock.Unlock()

	elapsed := ft.now.Sub(t)
	return elapsed
}

// Sleep does not sleep; it moves the fake clock forward by d.
func (ft *fakeTimer) Sleep(d time.Duration) {
	ft.advance(d)
}

// advance moves the fake clock forward by d. If the timer is active and the
// new time has reached the timeout, the timer is deactivated and the timeout
// value is sent on c; that send blocks, with the lock held, until it is
// received.
func (ft *fakeTimer) advance(d time.Duration) {
	ft.lock.Lock()
	defer ft.lock.Unlock()

	ft.now = ft.now.Add(d)
	if !ft.active || ft.now.Before(ft.timeout) {
		return
	}
	ft.active = false
	ft.c <- ft.timeout
}
151
152
153
154 func checkTimer(name string, t *testing.T, upd timerUpdate, active bool, next time.Duration) {
155 if upd.active != active {
156 t.Fatalf("%s: expected timer active=%v", name, active)
157 }
158 if active && upd.next != next {
159 t.Fatalf("%s: expected timer to be %v, got %v", name, next, upd.next)
160 }
161 }
162
163
164 func checkReceiver(name string, t *testing.T, receiver *receiver, expected bool) {
165 triggered := receiver.reset()
166 if expected && !triggered {
167 t.Fatalf("%s: function should have been called", name)
168 } else if !expected && triggered {
169 t.Fatalf("%s: function should not have been called", name)
170 }
171 }
172
173
// Interval bounds handed to construct by every test in this file.
var (
	minInterval = 1 * time.Second
	maxInterval = 10 * time.Second
)
176
177 func waitForReset(name string, t *testing.T, timer *fakeTimer, obj *receiver, expectCall bool, expectNext time.Duration) {
178 upd := <-timer.updated
179 checkReceiver(name, t, obj, expectCall)
180 checkReceiver(name, t, obj, false)
181 checkTimer(name, t, upd, false, 0)
182 upd = <-timer.updated
183 checkTimer(name, t, upd, true, expectNext)
184 }
185
186 func waitForRun(name string, t *testing.T, timer *fakeTimer, obj *receiver) {
187 waitForReset(name, t, timer, obj, true, maxInterval)
188 }
189
190 func waitForRunWithRetry(name string, t *testing.T, timer *fakeTimer, obj *receiver, expectNext time.Duration) {
191
192 waitForRun(name, t, timer, obj)
193 waitForReset(name, t, timer, obj, false, expectNext)
194 }
195
196 func waitForDefer(name string, t *testing.T, timer *fakeTimer, obj *receiver, expectNext time.Duration) {
197 waitForReset(name, t, timer, obj, false, expectNext)
198 }
199
200 func waitForNothing(name string, t *testing.T, timer *fakeTimer, obj *receiver) {
201 select {
202 case <-timer.c:
203 t.Fatalf("%s: unexpected timer tick", name)
204 case upd := <-timer.updated:
205 t.Fatalf("%s: unexpected timer update %v", name, upd)
206 default:
207 }
208 checkReceiver(name, t, obj, false)
209 }
210
// Test_BoundedFrequencyRunnerNoBurst drives a runner built with minInterval
// (1s), maxInterval (10s) and a fifth construct argument of 1 (presumably the
// burst size -- confirm against construct) through a scripted timeline on a
// fake timer. After each step it checks whether the function was called and
// what the timer was reset to. The steps are strictly order-dependent.
func Test_BoundedFrequencyRunnerNoBurst(t *testing.T) {
	obj := &receiver{}
	timer := newFakeTimer()
	runner := construct("test-runner", obj.F, minInterval, maxInterval, 1, timer)
	stop := make(chan struct{})

	var upd timerUpdate

	// Start the loop: expect the timer to be armed for maxInterval and no call.
	go runner.Loop(stop)
	upd = <-timer.updated
	checkTimer("init", t, upd, true, maxInterval)
	checkReceiver("init", t, obj, false)

	// First Run: expect an immediate call.
	runner.Run()
	waitForRun("first run", t, timer, obj)

	// 500ms after the first run: expect the Run to be deferred by 500ms.
	timer.advance(500 * time.Millisecond)
	runner.Run()
	waitForDefer("too soon after first", t, timer, obj, 500*time.Millisecond)

	// 999ms after the first run: expect the Run to be deferred by 1ms.
	timer.advance(499 * time.Millisecond)
	runner.Run()
	waitForDefer("still too soon after first", t, timer, obj, 1*time.Millisecond)

	// 1000ms after the first run: expect the deferred run without a new Run.
	timer.advance(1 * time.Millisecond)
	waitForRun("second run", t, timer, obj)

	// Run right after the second run: expect a deferral of a full second.
	runner.Run()
	waitForDefer("too soon after second", t, timer, obj, 1*time.Second)

	// 1ms after the second run: expect a deferral of 999ms.
	timer.advance(1 * time.Millisecond)
	runner.Run()
	waitForDefer("still too soon after second", t, timer, obj, 999*time.Millisecond)

	// 999ms after the second run: expect nothing to have happened yet.
	timer.advance(998 * time.Millisecond)
	waitForNothing("premature", t, timer, obj)

	// 1000ms after the second run: expect the deferred run.
	timer.advance(1 * time.Millisecond)
	waitForRun("third run", t, timer, obj)

	// 1s after the third run with no Run requested: expect nothing.
	timer.advance(1 * time.Second)
	waitForNothing("minInterval", t, timer, obj)

	// 10s after the third run: expect a call even though Run was not requested.
	timer.advance(9 * time.Second)
	waitForRun("maxInterval", t, timer, obj)

	// 1ms after the maxInterval run: expect a Run to be deferred by 999ms.
	timer.advance(1 * time.Millisecond)
	runner.Run()
	waitForDefer("too soon after maxInterval run", t, timer, obj, 999*time.Millisecond)

	// 1000ms after the maxInterval run: expect the deferred run.
	timer.advance(999 * time.Millisecond)
	waitForRun("fifth run", t, timer, obj)

	// Ask the loop to stop.
	stop <- struct{}{}

	// Receive one more timer update. Reset and Stop both send on the
	// unbuffered updated channel, so the sender would otherwise stay blocked.
	// NOTE(review): presumably the loop stops the timer on exit -- confirm.
	<-timer.updated
}
284
// Test_BoundedFrequencyRunnerBurst drives a runner built with minInterval
// (1s), maxInterval (10s) and a fifth construct argument of 2 (presumably the
// burst size -- confirm against construct) through a scripted timeline on a
// fake timer. It checks that two back-to-back runs are allowed before further
// Run requests are deferred. The steps are strictly order-dependent.
func Test_BoundedFrequencyRunnerBurst(t *testing.T) {
	obj := &receiver{}
	timer := newFakeTimer()
	runner := construct("test-runner", obj.F, minInterval, maxInterval, 2, timer)
	stop := make(chan struct{})

	var upd timerUpdate

	// Start the loop: expect the timer to be armed for maxInterval and no call.
	go runner.Loop(stop)
	upd = <-timer.updated
	checkTimer("init", t, upd, true, maxInterval)
	checkReceiver("init", t, obj, false)

	// First Run: expect an immediate call.
	runner.Run()
	waitForRun("first run", t, timer, obj)

	// 1ms after the first run: expect a second immediate call.
	timer.advance(1 * time.Millisecond)
	runner.Run()
	waitForRun("second run", t, timer, obj)

	// 498ms after the second run: expect the Run to be deferred by 502ms.
	timer.advance(498 * time.Millisecond)
	runner.Run()
	waitForDefer("too soon after second", t, timer, obj, 502*time.Millisecond)

	// 499ms after the second run: expect the Run to be deferred by 501ms.
	timer.advance(1 * time.Millisecond)
	runner.Run()
	waitForDefer("too soon after second 2", t, timer, obj, 501*time.Millisecond)

	// 500ms after the second run: expect the Run to be deferred by 500ms.
	timer.advance(1 * time.Millisecond)
	runner.Run()
	waitForDefer("too soon after second 3", t, timer, obj, 500*time.Millisecond)

	// 999ms after the second run (1000ms after the first): expect no
	// spontaneous activity, but an explicit Run is expected to be called
	// immediately.
	timer.advance(499 * time.Millisecond)
	waitForNothing("not minInterval", t, timer, obj)
	runner.Run()
	waitForRun("third run", t, timer, obj)

	// 1ms after the third run: expect the Run to be deferred by 999ms.
	timer.advance(1 * time.Millisecond)
	runner.Run()
	waitForDefer("too soon after third", t, timer, obj, 999*time.Millisecond)

	// 999ms after the third run: expect the Run to be deferred by 1ms.
	timer.advance(998 * time.Millisecond)
	runner.Run()
	waitForDefer("too soon after third 2", t, timer, obj, 1*time.Millisecond)

	// 1000ms after the third run: expect the deferred run.
	timer.advance(1 * time.Millisecond)
	waitForRun("fourth run", t, timer, obj)

	// 2s after the fourth run: expect two Runs in a row to be called
	// immediately and the third to be deferred by a full second.
	timer.advance(2 * time.Second)
	runner.Run()
	waitForRun("fifth run", t, timer, obj)
	runner.Run()
	waitForRun("sixth run", t, timer, obj)
	runner.Run()
	waitForDefer("too soon after sixth", t, timer, obj, 1*time.Second)

	// 1s after the sixth run: expect the deferred run.
	timer.advance(1 * time.Second)
	waitForRun("seventh run", t, timer, obj)

	// 10s after the seventh run: expect a call without any Run request.
	timer.advance(10 * time.Second)
	waitForRun("maxInterval", t, timer, obj)

	// Ask the loop to stop.
	stop <- struct{}{}

	// Receive one more timer update. Reset and Stop both send on the
	// unbuffered updated channel, so the sender would otherwise stay blocked.
	// NOTE(review): presumably the loop stops the timer on exit -- confirm.
	<-timer.updated
}
368
// Test_BoundedFrequencyRunnerRetryAfter exercises RetryAfter on a runner built
// with minInterval (1s), maxInterval (10s) and a fifth construct argument of 1
// (presumably the burst size -- confirm against construct). RetryAfter is
// called from inside the function, directly from the test, and from a
// goroutine started by the function. The steps are strictly order-dependent.
func Test_BoundedFrequencyRunnerRetryAfter(t *testing.T) {
	obj := &receiver{}
	timer := newFakeTimer()
	runner := construct("test-runner", obj.F, minInterval, maxInterval, 1, timer)
	stop := make(chan struct{})

	var upd timerUpdate

	// Start the loop: expect the timer to be armed for maxInterval and no call.
	go runner.Loop(stop)
	upd = <-timer.updated
	checkTimer("init", t, upd, true, maxInterval)
	checkReceiver("init", t, obj, false)

	// The function itself requests a retry in 5s during the first run: expect
	// the call, then the timer to be re-armed for 5s.
	obj.setRetryFn(func() { runner.RetryAfter(5 * time.Second) })
	runner.Run()
	waitForRunWithRetry("first run", t, timer, obj, 5*time.Second)

	// 1s after the first run, with no Run requested: expect nothing.
	timer.advance(time.Second)
	waitForNothing("minInterval, nothing queued", t, timer, obj)

	// 5s after the first run: expect the retry to be called.
	timer.advance(4 * time.Second)
	waitForRun("retry", t, timer, obj)

	// 499ms after the retry: expect the Run to be deferred by 501ms.
	timer.advance(499 * time.Millisecond)
	runner.Run()
	waitForDefer("too soon after retry", t, timer, obj, 501*time.Millisecond)

	// 1000ms after the retry, then RetryAfter called from the test: expect
	// the deferred run, then the timer to be re-armed for 5s.
	timer.advance(501 * time.Millisecond)
	runner.RetryAfter(5 * time.Second)
	waitForRunWithRetry("second run", t, timer, obj, 5*time.Second)

	// 1s after the second run, with no Run requested: expect nothing.
	timer.advance(time.Second)
	waitForNothing("minInterval, nothing queued", t, timer, obj)

	// An explicit Run: expect an immediate call and the timer re-armed for
	// maxInterval.
	runner.Run()
	waitForRun("third run", t, timer, obj)

	// 5s after the second run, when the earlier retry would have been due:
	// expect nothing.
	timer.advance(4 * time.Second)
	waitForNothing("retry cancelled", t, timer, obj)

	// The function starts a goroutine that calls RetryAfter after a real
	// 100ms sleep: expect the call, then the timer to be re-armed for 5s.
	obj.setRetryFn(func() {
		go func() {
			time.Sleep(100 * time.Millisecond)
			runner.RetryAfter(5 * time.Second)
		}()
	})
	runner.Run()
	waitForRunWithRetry("fourth run", t, timer, obj, 5*time.Second)

	// 100ms (fake time) after the fourth run: expect the Run to be deferred
	// by 900ms.
	timer.advance(100 * time.Millisecond)
	runner.Run()
	waitForDefer("too soon after fourth run", t, timer, obj, 900*time.Millisecond)

	// 1000ms after the fourth run: expect the deferred run.
	timer.advance(900 * time.Millisecond)
	waitForRun("fifth run", t, timer, obj)

	// 5s after the fourth run, when its retry would have been due: expect
	// nothing.
	timer.advance(4 * time.Second)
	waitForNothing("retry cancelled", t, timer, obj)

	// 9s after the fifth run: expect nothing. At 10s: expect a call without
	// any Run request.
	timer.advance(5 * time.Second)
	waitForNothing("premature", t, timer, obj)
	timer.advance(time.Second)
	waitForRun("maxInterval", t, timer, obj)

	// Ask the loop to stop.
	stop <- struct{}{}

	// Receive one more timer update. Reset and Stop both send on the
	// unbuffered updated channel, so the sender would otherwise stay blocked.
	// NOTE(review): presumably the loop stops the timer on exit -- confirm.
	<-timer.updated
}
454
View as plain text