...
1 package events
2
3 import (
4 "sync"
5 "testing"
6 )
7
8 func TestBroadcaster(t *testing.T) {
9 const nEvents = 1000
10 var sinks []Sink
11 b := NewBroadcaster()
12 for i := 0; i < 10; i++ {
13 sinks = append(sinks, newTestSink(t, nEvents))
14 b.Add(sinks[i])
15 b.Add(sinks[i])
16 }
17
18 var wg sync.WaitGroup
19 for i := 1; i <= nEvents; i++ {
20 wg.Add(1)
21 go func(event Event) {
22 if err := b.Write(event); err != nil {
23 t.Fatalf("error writing event %v: %v", event, err)
24 }
25 wg.Done()
26 }("event")
27 }
28
29 wg.Wait()
30
31 for i := range sinks {
32 b.Remove(sinks[i])
33 }
34
35
36 if err := b.Write("onemore"); err != nil {
37 t.Fatalf("unexpected error sending one more: %v", err)
38 }
39
40
41 for i := range sinks {
42 b.Add(sinks[i])
43 }
44
45 checkClose(t, b)
46
47
48 for _, sink := range sinks {
49 ts := sink.(*testSink)
50 ts.mu.Lock()
51 defer ts.mu.Unlock()
52
53 if len(ts.events) != nEvents {
54 t.Fatalf("not all events ended up in testsink: len(testSink) == %d, not %d", len(ts.events), nEvents)
55 }
56
57 if !ts.closed {
58 t.Fatalf("sink should have been closed")
59 }
60 }
61 }
62
63 func BenchmarkBroadcast10(b *testing.B) {
64 benchmarkBroadcast(b, 10)
65 }
66
67 func BenchmarkBroadcast100(b *testing.B) {
68 benchmarkBroadcast(b, 100)
69 }
70
71 func BenchmarkBroadcast1000(b *testing.B) {
72 benchmarkBroadcast(b, 1000)
73 }
74
75 func BenchmarkBroadcast10000(b *testing.B) {
76 benchmarkBroadcast(b, 10000)
77 }
78
79 func benchmarkBroadcast(b *testing.B, nsinks int) {
80
81
82
83
84 b.StopTimer()
85 var sinks []Sink
86 for i := 0; i < nsinks; i++ {
87
88 sinks = append(sinks, newTestSink(b, b.N))
89
90 }
91 b.StartTimer()
92
93
94
95
96 benchmarkSink(b, NewBroadcaster(sinks...))
97 }
98
View as plain text