1
2
3
4
5 package bundler
6
7 import (
8 "context"
9 "fmt"
10 "math"
11 "reflect"
12 "runtime"
13 "sort"
14 "sync"
15 "testing"
16 "time"
17 )
18
19 func TestBundlerCount1(t *testing.T) {
20
21 handler := &testHandler{}
22 b := NewBundler(int(0), handler.handleImmediate)
23 b.BundleCountThreshold = 1
24 b.DelayThreshold = time.Second
25
26 for i := 0; i < 3; i++ {
27 if err := b.Add(i, 1); err != nil {
28 t.Fatal(err)
29 }
30 }
31 b.Flush()
32 got := handler.bundles()
33 want := [][]int{{0}, {1}, {2}}
34 if !reflect.DeepEqual(got, want) {
35 t.Errorf("bundles: got %v, want %v", got, want)
36 }
37
38
39 tgot := quantizeTimes(handler.times(), 100*time.Millisecond)
40 twant := []int{0, 0, 0}
41 if !reflect.DeepEqual(tgot, twant) {
42 t.Errorf("times: got %v, want %v", tgot, twant)
43 }
44 }
45
46 func TestBundlerCount3(t *testing.T) {
47 handler := &testHandler{}
48 b := NewBundler(int(0), handler.handleImmediate)
49 b.BundleCountThreshold = 3
50 b.DelayThreshold = 100 * time.Millisecond
51
52
53
54 for i := 0; i < 8; i++ {
55 if err := b.Add(i, 1); err != nil {
56 t.Fatal(err)
57 }
58 }
59 time.Sleep(5 * b.DelayThreshold)
60
61
62 bgot := handler.bundles()
63 bwant := [][]int{{0, 1, 2}, {3, 4, 5}, {6, 7}}
64 if !reflect.DeepEqual(bgot, bwant) {
65 t.Errorf("bundles: got %v, want %v", bgot, bwant)
66 }
67
68 tgot := quantizeTimes(handler.times(), b.DelayThreshold)
69 if len(tgot) != 3 || tgot[0] != 0 || tgot[1] != 0 || tgot[2] == 0 {
70 t.Errorf("times: got %v, want [0, 0, non-zero]", tgot)
71 }
72 }
73
74
75
76
77 func TestBundlerCountSlowHandler(t *testing.T) {
78 handler := &testHandler{}
79 b := NewBundler(int(0), handler.handleSlow)
80 b.BundleCountThreshold = 3
81 b.DelayThreshold = 500 * time.Millisecond
82
83 for i := 0; i < 10; i++ {
84 if err := b.Add(i, 1); err != nil {
85 t.Fatal(err)
86 }
87 }
88 time.Sleep(4 * 300 * time.Millisecond)
89
90
91 bgot := handler.bundles()
92 bwant := [][]int{{0, 1, 2}, {3, 4, 5}, {6, 7, 8}, {9}}
93 if !reflect.DeepEqual(bgot, bwant) {
94 t.Errorf("bundles: got %v, want %v", bgot, bwant)
95 }
96
97 tgot := quantizeTimes(handler.times(), 100*time.Millisecond)
98
99
100 twant := []int{0, 3, 6, 9}
101 if !reflect.DeepEqual(tgot, twant) {
102 t.Errorf("times: got %v, want [0, 0, non-zero]", tgot)
103 }
104 }
105
106 func TestBundlerByteThreshold(t *testing.T) {
107 handler := &testHandler{}
108 b := NewBundler(int(0), handler.handleImmediate)
109 b.BundleCountThreshold = 10
110 b.BundleByteThreshold = 3
111
112
113
114
115
116 b.HandlerLimit = 10
117 add := func(i interface{}, s int) {
118 if err := b.Add(i, s); err != nil {
119 t.Fatal(err)
120 }
121 }
122
123 add(1, 1)
124 add(2, 2)
125
126
127 add(3, 1)
128 add(4, 1)
129 add(5, 2)
130
131
132 add(6, 1)
133 b.Flush()
134 bgot := handler.bundles()
135
136
137 sort.Slice(bgot, func(i, j int) bool {
138 return bgot[i][0] < bgot[j][0]
139 })
140 bwant := [][]int{{1, 2}, {3, 4, 5}, {6}}
141 if !reflect.DeepEqual(bgot, bwant) {
142 t.Errorf("bundles: got %v, want %v", bgot, bwant)
143 }
144 tgot := quantizeTimes(handler.times(), b.DelayThreshold)
145 twant := []int{0, 0, 0}
146 if !reflect.DeepEqual(tgot, twant) {
147 t.Errorf("times: got %v, want %v", tgot, twant)
148 }
149 }
150
151 func TestBundlerLimit(t *testing.T) {
152 handler := &testHandler{}
153 b := NewBundler(int(0), handler.handleImmediate)
154 b.BundleCountThreshold = 10
155 b.BundleByteLimit = 3
156 add := func(i interface{}, s int) {
157 if err := b.Add(i, s); err != nil {
158 t.Fatal(err)
159 }
160 }
161
162 add(1, 1)
163 add(2, 2)
164
165 add(3, 1)
166 add(4, 1)
167 add(5, 2)
168
169 add(6, 2)
170
171 b.Flush()
172 bgot := handler.bundles()
173 bwant := [][]int{{1, 2}, {3, 4}, {5}, {6}}
174 if !reflect.DeepEqual(bgot, bwant) {
175 t.Errorf("bundles: got %v, want %v", bgot, bwant)
176 }
177 tgot := quantizeTimes(handler.times(), b.DelayThreshold)
178 twant := []int{0, 0, 0, 0}
179 if !reflect.DeepEqual(tgot, twant) {
180 t.Errorf("times: got %v, want %v", tgot, twant)
181 }
182 }
183
184 func TestAddWait(t *testing.T) {
185 var (
186 mu sync.Mutex
187 events []string
188 )
189 event := func(s string) {
190 mu.Lock()
191 events = append(events, s)
192 mu.Unlock()
193 }
194
195 handlec := make(chan int)
196 done := make(chan struct{})
197 b := NewBundler(int(0), func(interface{}) {
198 <-handlec
199 event("handle")
200 })
201 b.BufferedByteLimit = 3
202 addw := func(sz int) {
203 if err := b.AddWait(context.Background(), 0, sz); err != nil {
204 t.Fatal(err)
205 }
206 event(fmt.Sprintf("addw(%d)", sz))
207 }
208
209 addw(2)
210 go func() {
211 addw(3)
212 close(done)
213 }()
214
215 time.Sleep(100 * time.Millisecond)
216 handlec <- 1
217 select {
218 case <-time.After(time.Second):
219 t.Fatal("timed out")
220 case <-done:
221 }
222 want := []string{"addw(2)", "handle", "addw(3)"}
223 if !reflect.DeepEqual(events, want) {
224 t.Errorf("got %v\nwant%v", events, want)
225 }
226 }
227
228 func TestAddWaitCancel(t *testing.T) {
229 b := NewBundler(int(0), func(interface{}) {})
230 b.BufferedByteLimit = 3
231 ctx, cancel := context.WithCancel(context.Background())
232 go func() {
233 time.Sleep(100 * time.Millisecond)
234 cancel()
235 }()
236 err := b.AddWait(ctx, 0, 4)
237 if want := context.Canceled; err != want {
238 t.Fatalf("got %v, want %v", err, want)
239 }
240 }
241
242 func TestBundlerErrors(t *testing.T) {
243
244
245 b := NewBundler(int(0), func(interface{}) { select {} })
246 b.BundleByteLimit = 3
247 b.BufferedByteLimit = 10
248
249 if got, want := b.Add(1, 4), ErrOversizedItem; got != want {
250 t.Fatalf("got %v, want %v", got, want)
251 }
252
253 for i := 0; i < 5; i++ {
254 if err := b.Add(i, 2); err != nil {
255 t.Fatal(err)
256 }
257 }
258 if got, want := b.Add(5, 1), ErrOverflow; got != want {
259 t.Fatalf("got %v, want %v", got, want)
260 }
261 }
262
263 func TestModeError(t *testing.T) {
264
265 b := NewBundler(int(0), func(interface{}) {})
266 b.BundleByteLimit = 4
267 b.BufferedByteLimit = 4
268 if err := b.Add(0, 2); err != nil {
269 t.Fatal(err)
270 }
271 if got, want := b.AddWait(context.Background(), 0, 2), errMixedMethods; got != want {
272 t.Fatalf("got %v, want %v", got, want)
273 }
274
275 b1 := NewBundler(int(0), func(interface{}) {})
276 b1.BundleByteLimit = 4
277 b1.BufferedByteLimit = 4
278 if err := b1.AddWait(context.Background(), 0, 2); err != nil {
279 t.Fatal(err)
280 }
281 if got, want := b1.Add(0, 2), errMixedMethods; got != want {
282 t.Fatalf("got %v, want %v", got, want)
283 }
284 }
285
286
287 func TestConcurrentHandlersMax(t *testing.T) {
288 const handlerLimit = 10
289 var (
290 mu sync.Mutex
291 active int
292 maxHandlers int
293 )
294 b := NewBundler(int(0), func(s interface{}) {
295 mu.Lock()
296 active++
297 if active > maxHandlers {
298 maxHandlers = active
299 }
300 if maxHandlers > handlerLimit {
301 t.Errorf("too many handlers running (got %d; want %d)", maxHandlers, handlerLimit)
302 }
303 mu.Unlock()
304 time.Sleep(1 * time.Millisecond)
305 mu.Lock()
306 active--
307 mu.Unlock()
308 })
309 b.BundleCountThreshold = 5
310 b.HandlerLimit = 10
311 defer b.Flush()
312
313 more := 0
314 for i := 0; more == 0 || i < more; i++ {
315 mu.Lock()
316 m := maxHandlers
317 mu.Unlock()
318 if m >= handlerLimit && more == 0 {
319
320 more = 2 * i
321 }
322 b.Add(i, 1)
323 }
324 }
325
326
327 func TestConcurrentFlush(t *testing.T) {
328 var (
329 mu sync.Mutex
330 items = make(map[int]bool)
331 )
332 b := NewBundler(int(0), func(s interface{}) {
333 mu.Lock()
334 for _, i := range s.([]int) {
335 items[i] = true
336 }
337 mu.Unlock()
338 time.Sleep(10 * time.Millisecond)
339 })
340 b.BundleCountThreshold = 5
341 b.HandlerLimit = 10
342 defer b.Flush()
343
344 var wg sync.WaitGroup
345 defer wg.Wait()
346 for i := 0; i < 50; i++ {
347 b.Add(i, 1)
348 if i%100 == 0 {
349 i := i
350 wg.Add(1)
351 go func() {
352 defer wg.Done()
353 b.Flush()
354 mu.Lock()
355 defer mu.Unlock()
356 for j := 0; j <= i; j++ {
357 if !items[j] {
358
359 t.Errorf("flush(%d): item %d not handled", i, j)
360 break
361 }
362 }
363 }()
364 }
365 }
366 }
367
368
369 func TestBundlerTimeBasedFlushDeadlock(t *testing.T) {
370 const (
371 goroutines = 1e3
372 iterations = 1e3
373
374 N = goroutines * iterations
375 )
376
377 var wg sync.WaitGroup
378 wg.Add(N)
379
380 flush := func(i interface{}) {
381 time.Sleep(10 * time.Millisecond)
382 buf := i.([]int)
383 for i := 0; i < len(buf); i++ {
384 wg.Done()
385 }
386 }
387
388 b := NewBundler(int(0), flush)
389 b.DelayThreshold = 10 * time.Millisecond
390 b.HandlerLimit = 1
391
392
393 b.BundleCountThreshold = math.MaxInt32
394 b.BundleByteThreshold = math.MaxInt32
395
396 ctx, cancel := context.WithCancel(context.Background())
397 time.AfterFunc(15*time.Second, cancel)
398
399 add := func(i int) {
400 for j := 0; j < iterations; j++ {
401 if err := b.AddWait(ctx, i, 1); err != nil {
402 t.Fatalf("timed out: %v", err)
403 }
404 runtime.Gosched()
405 }
406 }
407
408 for i := 0; i < goroutines; i++ {
409 go add(i)
410 }
411
412
413 wg.Wait()
414 }
415
// testHandler records the bundles passed to its handle* methods together
// with the time of each call.
type testHandler struct {
	mu sync.Mutex  // guards b and t
	b  [][]int     // bundles, in the order they were handled
	t  []time.Time // t[i] is the time at which b[i] was recorded
}
421
422 func (t *testHandler) bundles() [][]int {
423 t.mu.Lock()
424 defer t.mu.Unlock()
425 return t.b
426 }
427
428 func (t *testHandler) times() []time.Time {
429 t.mu.Lock()
430 defer t.mu.Unlock()
431 return t.t
432 }
433
434
435 func (t *testHandler) handleImmediate(b interface{}) {
436 t.mu.Lock()
437 defer t.mu.Unlock()
438 t.b = append(t.b, b.([]int))
439 t.t = append(t.t, time.Now())
440 }
441
442
443 func (t *testHandler) handleSlow(b interface{}) {
444 t.mu.Lock()
445 defer t.mu.Unlock()
446 t.b = append(t.b, b.([]int))
447 t.t = append(t.t, time.Now())
448 time.Sleep(300 * time.Millisecond)
449 }
450
451
452 func (t *testHandler) handleQuick(b interface{}) {
453 t.mu.Lock()
454 defer t.mu.Unlock()
455 t.b = append(t.b, b.([]int))
456 t.t = append(t.t, time.Now())
457 time.Sleep(time.Millisecond)
458 }
459
460
461
462
463
464
// quantizeTimes converts each time into the number of quanta q that separate
// it from times[0], rounded to the nearest whole quantum. The first element
// of the result is therefore always 0. An empty input yields a nil slice.
func quantizeTimes(times []time.Time, q time.Duration) []int {
	var quantized []int
	for i := range times {
		elapsed := times[i].Sub(times[0])
		// Adding half a quantum before the integer division rounds to the
		// nearest quantum instead of truncating.
		quantized = append(quantized, int((elapsed+q/2)/q))
	}
	return quantized
}
474
475 func TestQuantizeTimes(t *testing.T) {
476 quantum := 100 * time.Millisecond
477 for _, test := range []struct {
478 millis []int
479 want []int
480 }{
481 {[]int{10, 20, 30}, []int{0, 0, 0}},
482 {[]int{0, 49, 50, 90}, []int{0, 0, 1, 1}},
483 {[]int{0, 95, 170, 315}, []int{0, 1, 2, 3}},
484 } {
485 var times []time.Time
486 for _, ms := range test.millis {
487 times = append(times, time.Unix(0, int64(ms*1e6)))
488 }
489 got := quantizeTimes(times, quantum)
490 if !reflect.DeepEqual(got, test.want) {
491 t.Errorf("%v: got %v, want %v", test.millis, got, test.want)
492 }
493 }
494 }
495
496
497
498 func BenchmarkBundlerAdd(bench *testing.B) {
499
500 handler := &testHandler{}
501 b := NewBundler(int(0), handler.handleImmediate)
502 b.BundleCountThreshold = 1
503 b.DelayThreshold = time.Second
504
505 for i := 0; i < bench.N; i++ {
506 if err := b.Add(i, 1); err != nil {
507 bench.Fatal(err)
508 }
509 }
510 }
511
512
513
514 func BenchmarkBundlerAddAndFlush(bench *testing.B) {
515
516 handler := &testHandler{}
517 b := NewBundler(int(0), handler.handleImmediate)
518 b.BundleCountThreshold = 1
519 b.DelayThreshold = time.Second
520
521 for i := 0; i < bench.N; i++ {
522 if err := b.Add(i, 1); err != nil {
523 bench.Fatal(err)
524 }
525 }
526 b.Flush()
527 }
528
529
530
531 func BenchmarkBundlerAddAndFlushSlow1(bench *testing.B) {
532
533 handler := &testHandler{}
534 b := NewBundler(int(0), handler.handleQuick)
535 b.BundleCountThreshold = 1
536 b.DelayThreshold = time.Second
537
538 for i := 0; i < bench.N; i++ {
539 if err := b.Add(i, 1); err != nil {
540 bench.Fatal(err)
541 }
542 }
543 b.Flush()
544 }
545
546
547
548 func BenchmarkBundlerAddAndFlushSlow25(bench *testing.B) {
549
550 handler := &testHandler{}
551 b := NewBundler(int(0), handler.handleQuick)
552 b.BundleCountThreshold = 25
553 b.DelayThreshold = time.Second
554
555 for i := 0; i < bench.N; i++ {
556 if err := b.Add(i, 1); err != nil {
557 bench.Fatal(err)
558 }
559 }
560 b.Flush()
561 }
562
View as plain text