package websocket import ( "net/url" "sync" "testing" "time" "github.com/stretchr/testify/assert" ) func TestAdd(t *testing.T) { m := &Manager{ subscribers: []subscriber{}, } s1 := newSubscriber(url.Values{}) m.add(s1) assert.Equal(t, 1, len(m.subscribers)) assert.Equal(t, s1.channel, m.subscribers[0].channel) s2 := newSubscriber(url.Values{}) m.add(s2) assert.Equal(t, 2, len(m.subscribers)) assert.Equal(t, s2.channel, m.subscribers[1].channel) } func TestRemove(t *testing.T) { s1 := newSubscriber(url.Values{}) s2 := newSubscriber(url.Values{}) m := &Manager{ subscribers: []subscriber{ s1, s2, }, } m.remove(s1) assert.Equal(t, 1, len(m.subscribers)) assert.Equal(t, s2.channel, m.subscribers[0].channel) m.remove(s2) assert.Equal(t, 0, len(m.subscribers)) } func TestSend(t *testing.T) { s1 := newSubscriber(url.Values{}) s2 := newSubscriber(url.Values( map[string][]string{ "topic": {"host"}, }, )) s3 := newSubscriber(url.Values( map[string][]string{ "topic": {"host", "cluster"}, }, )) m := &Manager{ subscribers: []subscriber{ s1, s2, s3, }, } clusterEvent := Event{ Topic: "cluster", Data: "example", } // prepare results struct for each subscriber as this needs to be updated in // a separate routine due to channel usage clusterResults := []*results{ {subscriber: s1}, {subscriber: s2}, {subscriber: s3}, } wg := waitForEvents(clusterResults) m.Send(clusterEvent) wg.Wait() // ensure the correct subscribers received an event assert.True(t, clusterResults[0].received) assert.False(t, clusterResults[1].received) assert.True(t, clusterResults[2].received) hostEvent := Event{ Topic: "host", Data: "example", } // prepare results struct for each subscriber as this needs to be updated in // a separate routine due to channel usage hostResults := []*results{ {subscriber: s1}, {subscriber: s2}, {subscriber: s3}, } wg = waitForEvents(hostResults) m.Send(hostEvent) wg.Wait() // ensure the correct subscribers received an event assert.True(t, hostResults[0].received) assert.True(t, hostResults[1].received) assert.True(t, hostResults[2].received) } type results struct { subscriber subscriber received bool } func waitForEvents(res []*results) *sync.WaitGroup { var wg sync.WaitGroup // create a wait group with the same length as results wg.Add(len(res)) for _, r := range res { go func(r *results) { defer wg.Done() select { // if subscriber channel receives message, set received to true case <-r.subscriber.channel: r.received = true // if subscriber channel does not receive a message within 1 second, set received to false case <-time.After(1 * time.Second): r.received = false } }(r) } return &wg }