...
1 package events
2
3 import (
4 "fmt"
5 "sync"
6 "testing"
7 "time"
8 )
9
10 func TestQueue(t *testing.T) {
11 const nevents = 1000
12
13 ts := newTestSink(t, nevents)
14 eq := NewQueue(
15
16 &delayedSink{
17 Sink: ts,
18 delay: time.Millisecond * 1,
19 })
20 time.Sleep(10 * time.Millisecond)
21
22 var wg sync.WaitGroup
23 for i := 1; i <= nevents; i++ {
24 wg.Add(1)
25 go func(event Event) {
26 if err := eq.Write(event); err != nil {
27 t.Fatalf("error writing event: %v", err)
28 }
29 wg.Done()
30 }("event-" + fmt.Sprint(i))
31 }
32
33 wg.Wait()
34 checkClose(t, eq)
35
36 ts.mu.Lock()
37 defer ts.mu.Unlock()
38
39 if len(ts.events) != nevents {
40 t.Fatalf("events did not make it to the sink: %d != %d", len(ts.events), 1000)
41 }
42
43 if !ts.closed {
44 t.Fatalf("sink should have been closed")
45 }
46 }
47
View as plain text