...
1 package events
2
3 import (
4 "fmt"
5 "sync"
6 "testing"
7 )
8
9 func TestChannel(t *testing.T) {
10 const nevents = 100
11
12 sink := NewChannel(0)
13
14 go func() {
15 var wg sync.WaitGroup
16 for i := 1; i <= nevents; i++ {
17 event := "event-" + fmt.Sprint(i)
18 wg.Add(1)
19 go func(event Event) {
20 defer wg.Done()
21 if err := sink.Write(event); err != nil {
22 t.Fatalf("error writing event: %v", err)
23 }
24 }(event)
25 }
26 wg.Wait()
27 sink.Close()
28
29
30 for i := 1; i <= nevents; i++ {
31 if err := sink.Write(i); err != ErrSinkClosed {
32 t.Fatalf("unexpected error: %v != %v", err, ErrSinkClosed)
33 }
34 }
35 }()
36
37 var received int
38 loop:
39 for {
40 select {
41 case <-sink.C:
42 received++
43 case <-sink.Done():
44 break loop
45 }
46 }
47
48 sink.Close()
49 _, ok := <-sink.Done()
50 if ok {
51 t.Fatalf("done should be a closed channel")
52 }
53
54 if received != nevents {
55 t.Fatalf("events did not make it through sink: %v != %v", received, nevents)
56 }
57 }
58
View as plain text