...

Source file src/edge-infra.dev/pkg/sds/interlock/websocket/websocket_test.go

Documentation: edge-infra.dev/pkg/sds/interlock/websocket

     1  package websocket
     2  
     3  import (
     4  	"net/url"
     5  	"sync"
     6  	"testing"
     7  	"time"
     8  
     9  	"github.com/stretchr/testify/assert"
    10  )
    11  
    12  func TestAdd(t *testing.T) {
    13  	m := &Manager{
    14  		subscribers: []subscriber{},
    15  	}
    16  
    17  	s1 := newSubscriber(url.Values{})
    18  	m.add(s1)
    19  
    20  	assert.Equal(t, 1, len(m.subscribers))
    21  	assert.Equal(t, s1.channel, m.subscribers[0].channel)
    22  
    23  	s2 := newSubscriber(url.Values{})
    24  	m.add(s2)
    25  
    26  	assert.Equal(t, 2, len(m.subscribers))
    27  	assert.Equal(t, s2.channel, m.subscribers[1].channel)
    28  }
    29  
    30  func TestRemove(t *testing.T) {
    31  	s1 := newSubscriber(url.Values{})
    32  	s2 := newSubscriber(url.Values{})
    33  	m := &Manager{
    34  		subscribers: []subscriber{
    35  			s1,
    36  			s2,
    37  		},
    38  	}
    39  	m.remove(s1)
    40  
    41  	assert.Equal(t, 1, len(m.subscribers))
    42  	assert.Equal(t, s2.channel, m.subscribers[0].channel)
    43  
    44  	m.remove(s2)
    45  
    46  	assert.Equal(t, 0, len(m.subscribers))
    47  }
    48  
    49  func TestSend(t *testing.T) {
    50  	s1 := newSubscriber(url.Values{})
    51  	s2 := newSubscriber(url.Values(
    52  		map[string][]string{
    53  			"topic": {"host"},
    54  		},
    55  	))
    56  	s3 := newSubscriber(url.Values(
    57  		map[string][]string{
    58  			"topic": {"host", "cluster"},
    59  		},
    60  	))
    61  
    62  	m := &Manager{
    63  		subscribers: []subscriber{
    64  			s1,
    65  			s2,
    66  			s3,
    67  		},
    68  	}
    69  
    70  	clusterEvent := Event{
    71  		Topic: "cluster",
    72  		Data:  "example",
    73  	}
    74  
    75  	// prepare results struct for each subscriber as this needs to be updated in
    76  	// a separate routine due to channel usage
    77  	clusterResults := []*results{
    78  		{subscriber: s1},
    79  		{subscriber: s2},
    80  		{subscriber: s3},
    81  	}
    82  
    83  	wg := waitForEvents(clusterResults)
    84  
    85  	m.Send(clusterEvent)
    86  	wg.Wait()
    87  
    88  	// ensure the correct subscribers received an event
    89  	assert.True(t, clusterResults[0].received)
    90  	assert.False(t, clusterResults[1].received)
    91  	assert.True(t, clusterResults[2].received)
    92  
    93  	hostEvent := Event{
    94  		Topic: "host",
    95  		Data:  "example",
    96  	}
    97  
    98  	// prepare results struct for each subscriber as this needs to be updated in
    99  	// a separate routine due to channel usage
   100  	hostResults := []*results{
   101  		{subscriber: s1},
   102  		{subscriber: s2},
   103  		{subscriber: s3},
   104  	}
   105  
   106  	wg = waitForEvents(hostResults)
   107  
   108  	m.Send(hostEvent)
   109  	wg.Wait()
   110  
   111  	// ensure the correct subscribers received an event
   112  	assert.True(t, hostResults[0].received)
   113  	assert.True(t, hostResults[1].received)
   114  	assert.True(t, hostResults[2].received)
   115  }
   116  
   117  type results struct {
   118  	subscriber subscriber
   119  	received   bool
   120  }
   121  
   122  func waitForEvents(res []*results) *sync.WaitGroup {
   123  	var wg sync.WaitGroup
   124  	// create a wait group with the same length as results
   125  	wg.Add(len(res))
   126  
   127  	for _, r := range res {
   128  		go func(r *results) {
   129  			defer wg.Done()
   130  
   131  			select {
   132  			// if subscriber channel receives message, set received to true
   133  			case <-r.subscriber.channel:
   134  				r.received = true
   135  			// if subscriber channel does not receive a message within 1 second, set received to false
   136  			case <-time.After(1 * time.Second):
   137  				r.received = false
   138  			}
   139  		}(r)
   140  	}
   141  	return &wg
   142  }
   143  

View as plain text