...
1 package events
2
3 import (
4 "fmt"
5 "math/rand"
6 "sync"
7 "testing"
8 "time"
9 )
10
11 type tOrB interface {
12 Fatalf(format string, args ...interface{})
13 Logf(format string, args ...interface{})
14 }
15
16 type testSink struct {
17 t tOrB
18
19 events []Event
20 expected int
21 mu sync.Mutex
22 closed bool
23 }
24
25 func newTestSink(t tOrB, expected int) *testSink {
26 return &testSink{
27 t: t,
28 events: make([]Event, 0, expected),
29 expected: expected,
30 }
31 }
32
33 func (ts *testSink) Write(event Event) error {
34 ts.mu.Lock()
35 defer ts.mu.Unlock()
36
37 if ts.closed {
38 return ErrSinkClosed
39 }
40
41 ts.events = append(ts.events, event)
42
43 if len(ts.events) > ts.expected {
44 ts.t.Fatalf("len(ts.events) == %v, expected %v", len(ts.events), ts.expected)
45 }
46
47 return nil
48 }
49
50 func (ts *testSink) Close() error {
51 ts.mu.Lock()
52 defer ts.mu.Unlock()
53 if ts.closed {
54 return ErrSinkClosed
55 }
56
57 ts.closed = true
58
59 if len(ts.events) != ts.expected {
60 ts.t.Fatalf("len(ts.events) == %v, expected %v", len(ts.events), ts.expected)
61 }
62
63 return nil
64 }
65
66 type delayedSink struct {
67 Sink
68 delay time.Duration
69 }
70
71 func (ds *delayedSink) Write(event Event) error {
72 time.Sleep(ds.delay)
73 return ds.Sink.Write(event)
74 }
75
76 type flakySink struct {
77 Sink
78 rate float64
79 mu sync.Mutex
80 }
81
82 func (fs *flakySink) Write(event Event) error {
83 fs.mu.Lock()
84 defer fs.mu.Unlock()
85
86 if rand.Float64() < fs.rate {
87 return fmt.Errorf("error writing event: %v", event)
88 }
89
90 return fs.Sink.Write(event)
91 }
92
93 func checkClose(t *testing.T, sink Sink) {
94 if err := sink.Close(); err != nil {
95 t.Fatalf("unexpected error closing: %v", err)
96 }
97
98
99 if err := sink.Close(); err != nil {
100 t.Fatalf("unexpected error on double close: %v", err)
101 }
102
103
104 if err := sink.Write("fail"); err == nil {
105 t.Fatalf("write after closed did not have an error")
106 } else if err != ErrSinkClosed {
107 t.Fatalf("error should be ErrSinkClosed")
108 }
109 }
110
111 func benchmarkSink(b *testing.B, sink Sink) {
112 defer sink.Close()
113 var event = "myevent"
114 for i := 0; i < b.N; i++ {
115 sink.Write(event)
116 }
117 }
118
View as plain text