...
1 package eventsource
2
3 import (
4 "io"
5 "net/http/httptest"
6 "reflect"
7 "testing"
8 "time"
9 )
10
11 const (
12 eventChannelName = "Test"
13 timeToWaitForEvent = 100 * time.Millisecond
14 )
15
16 func TestStreamSubscribeEventsChan(t *testing.T) {
17 server := NewServer()
18 httpServer := httptest.NewServer(server.Handler(eventChannelName))
19
20
21 defer httpServer.Close()
22 defer server.Close()
23
24 stream := mustSubscribe(t, httpServer.URL, "")
25
26 publishedEvent := &publication{id: "123"}
27 server.Publish([]string{eventChannelName}, publishedEvent)
28
29 select {
30 case receivedEvent := <-stream.Events:
31 if !reflect.DeepEqual(receivedEvent, publishedEvent) {
32 t.Errorf("got event %+v, want %+v", receivedEvent, publishedEvent)
33 }
34 case <-time.After(timeToWaitForEvent):
35 t.Error("Timed out waiting for event")
36 }
37 }
38
39 func TestStreamSubscribeErrorsChan(t *testing.T) {
40 server := NewServer()
41 httpServer := httptest.NewServer(server.Handler(eventChannelName))
42
43 defer httpServer.Close()
44
45 stream := mustSubscribe(t, httpServer.URL, "")
46 server.Close()
47
48 select {
49 case err := <-stream.Errors:
50 if err != io.EOF {
51 t.Errorf("got error %+v, want %+v", err, io.EOF)
52 }
53 case <-time.After(timeToWaitForEvent):
54 t.Error("Timed out waiting for error event")
55 }
56 }
57
58 func TestStreamClose(t *testing.T) {
59 server := NewServer()
60 httpServer := httptest.NewServer(server.Handler(eventChannelName))
61
62
63 defer httpServer.Close()
64 defer server.Close()
65
66 stream := mustSubscribe(t, httpServer.URL, "")
67 stream.Close()
68
69 stream.Close()
70
71 select {
72 case _, ok := <-stream.Events:
73 if ok {
74 t.Error("Expected stream.Events channel to be closed. Is still open.")
75 }
76 case <-time.After(timeToWaitForEvent):
77 t.Error("Timed out waiting for stream.Events channel to close")
78 }
79
80 select {
81 case _, ok := <-stream.Errors:
82 if ok {
83 t.Error("Expected stream.Errors channel to be closed. Is still open.")
84 }
85 case <-time.After(timeToWaitForEvent):
86 t.Error("Timed out waiting for stream.Errors channel to close")
87 }
88 }
89
90 func mustSubscribe(t *testing.T, url, lastEventId string) *Stream {
91 stream, err := Subscribe(url, lastEventId)
92 if err != nil {
93 t.Fatalf("Failed to subscribe: %s", err)
94 }
95 return stream
96 }
97
View as plain text