...
1 package websocket
2
3 import (
4 "net/url"
5 "sync"
6 "testing"
7 "time"
8
9 "github.com/stretchr/testify/assert"
10 )
11
12 func TestAdd(t *testing.T) {
13 m := &Manager{
14 subscribers: []subscriber{},
15 }
16
17 s1 := newSubscriber(url.Values{})
18 m.add(s1)
19
20 assert.Equal(t, 1, len(m.subscribers))
21 assert.Equal(t, s1.channel, m.subscribers[0].channel)
22
23 s2 := newSubscriber(url.Values{})
24 m.add(s2)
25
26 assert.Equal(t, 2, len(m.subscribers))
27 assert.Equal(t, s2.channel, m.subscribers[1].channel)
28 }
29
30 func TestRemove(t *testing.T) {
31 s1 := newSubscriber(url.Values{})
32 s2 := newSubscriber(url.Values{})
33 m := &Manager{
34 subscribers: []subscriber{
35 s1,
36 s2,
37 },
38 }
39 m.remove(s1)
40
41 assert.Equal(t, 1, len(m.subscribers))
42 assert.Equal(t, s2.channel, m.subscribers[0].channel)
43
44 m.remove(s2)
45
46 assert.Equal(t, 0, len(m.subscribers))
47 }
48
49 func TestSend(t *testing.T) {
50 s1 := newSubscriber(url.Values{})
51 s2 := newSubscriber(url.Values(
52 map[string][]string{
53 "topic": {"host"},
54 },
55 ))
56 s3 := newSubscriber(url.Values(
57 map[string][]string{
58 "topic": {"host", "cluster"},
59 },
60 ))
61
62 m := &Manager{
63 subscribers: []subscriber{
64 s1,
65 s2,
66 s3,
67 },
68 }
69
70 clusterEvent := Event{
71 Topic: "cluster",
72 Data: "example",
73 }
74
75
76
77 clusterResults := []*results{
78 {subscriber: s1},
79 {subscriber: s2},
80 {subscriber: s3},
81 }
82
83 wg := waitForEvents(clusterResults)
84
85 m.Send(clusterEvent)
86 wg.Wait()
87
88
89 assert.True(t, clusterResults[0].received)
90 assert.False(t, clusterResults[1].received)
91 assert.True(t, clusterResults[2].received)
92
93 hostEvent := Event{
94 Topic: "host",
95 Data: "example",
96 }
97
98
99
100 hostResults := []*results{
101 {subscriber: s1},
102 {subscriber: s2},
103 {subscriber: s3},
104 }
105
106 wg = waitForEvents(hostResults)
107
108 m.Send(hostEvent)
109 wg.Wait()
110
111
112 assert.True(t, hostResults[0].received)
113 assert.True(t, hostResults[1].received)
114 assert.True(t, hostResults[2].received)
115 }
116
117 type results struct {
118 subscriber subscriber
119 received bool
120 }
121
122 func waitForEvents(res []*results) *sync.WaitGroup {
123 var wg sync.WaitGroup
124
125 wg.Add(len(res))
126
127 for _, r := range res {
128 go func(r *results) {
129 defer wg.Done()
130
131 select {
132
133 case <-r.subscriber.channel:
134 r.received = true
135
136 case <-time.After(1 * time.Second):
137 r.received = false
138 }
139 }(r)
140 }
141 return &wg
142 }
143
View as plain text