...

Source file src/github.com/donovanhide/eventsource/stream_test.go

Documentation: github.com/donovanhide/eventsource

     1  package eventsource
     2  
     3  import (
     4  	"io"
     5  	"net/http/httptest"
     6  	"reflect"
     7  	"testing"
     8  	"time"
     9  )
    10  
    11  const (
    12  	eventChannelName   = "Test"
    13  	timeToWaitForEvent = 100 * time.Millisecond
    14  )
    15  
    16  func TestStreamSubscribeEventsChan(t *testing.T) {
    17  	server := NewServer()
    18  	httpServer := httptest.NewServer(server.Handler(eventChannelName))
    19  	// The server has to be closed before the httpServer is closed.
    20  	// Otherwise the httpServer has still an open connection and it can not close.
    21  	defer httpServer.Close()
    22  	defer server.Close()
    23  
    24  	stream := mustSubscribe(t, httpServer.URL, "")
    25  
    26  	publishedEvent := &publication{id: "123"}
    27  	server.Publish([]string{eventChannelName}, publishedEvent)
    28  
    29  	select {
    30  	case receivedEvent := <-stream.Events:
    31  		if !reflect.DeepEqual(receivedEvent, publishedEvent) {
    32  			t.Errorf("got event %+v, want %+v", receivedEvent, publishedEvent)
    33  		}
    34  	case <-time.After(timeToWaitForEvent):
    35  		t.Error("Timed out waiting for event")
    36  	}
    37  }
    38  
    39  func TestStreamSubscribeErrorsChan(t *testing.T) {
    40  	server := NewServer()
    41  	httpServer := httptest.NewServer(server.Handler(eventChannelName))
    42  
    43  	defer httpServer.Close()
    44  
    45  	stream := mustSubscribe(t, httpServer.URL, "")
    46  	server.Close()
    47  
    48  	select {
    49  	case err := <-stream.Errors:
    50  		if err != io.EOF {
    51  			t.Errorf("got error %+v, want %+v", err, io.EOF)
    52  		}
    53  	case <-time.After(timeToWaitForEvent):
    54  		t.Error("Timed out waiting for error event")
    55  	}
    56  }
    57  
    58  func TestStreamClose(t *testing.T) {
    59  	server := NewServer()
    60  	httpServer := httptest.NewServer(server.Handler(eventChannelName))
    61  	// The server has to be closed before the httpServer is closed.
    62  	// Otherwise the httpServer has still an open connection and it can not close.
    63  	defer httpServer.Close()
    64  	defer server.Close()
    65  
    66  	stream := mustSubscribe(t, httpServer.URL, "")
    67  	stream.Close()
    68  	// its safe to Close the stream multiple times
    69  	stream.Close()
    70  
    71  	select {
    72  	case _, ok := <-stream.Events:
    73  		if ok {
    74  			t.Error("Expected stream.Events channel to be closed. Is still open.")
    75  		}
    76  	case <-time.After(timeToWaitForEvent):
    77  		t.Error("Timed out waiting for stream.Events channel to close")
    78  	}
    79  
    80  	select {
    81  	case _, ok := <-stream.Errors:
    82  		if ok {
    83  			t.Error("Expected stream.Errors channel to be closed. Is still open.")
    84  		}
    85  	case <-time.After(timeToWaitForEvent):
    86  		t.Error("Timed out waiting for stream.Errors channel to close")
    87  	}
    88  }
    89  
    90  func mustSubscribe(t *testing.T, url, lastEventId string) *Stream {
    91  	stream, err := Subscribe(url, lastEventId)
    92  	if err != nil {
    93  		t.Fatalf("Failed to subscribe: %s", err)
    94  	}
    95  	return stream
    96  }
    97  

View as plain text