...

Source file src/github.com/donovanhide/eventsource/stream.go

Documentation: github.com/donovanhide/eventsource

     1  package eventsource
     2  
     3  import (
     4  	"errors"
     5  	"fmt"
     6  	"io"
     7  	"io/ioutil"
     8  	"log"
     9  	"net/http"
    10  	"sync"
    11  	"time"
    12  )
    13  
    14  // Stream handles a connection for receiving Server Sent Events.
    15  // It will try and reconnect if the connection is lost, respecting both
    16  // received retry delays and event id's.
    17  type Stream struct {
    18  	c           *http.Client
    19  	req         *http.Request
    20  	lastEventId string
    21  	retry       time.Duration
    22  	// Events emits the events received by the stream
    23  	Events chan Event
    24  	// Errors emits any errors encountered while reading events from the stream.
    25  	// It's mainly for informative purposes - the client isn't required to take any
    26  	// action when an error is encountered. The stream will always attempt to continue,
    27  	// even if that involves reconnecting to the server.
    28  	Errors chan error
    29  	// Logger is a logger that, when set, will be used for logging debug messages
    30  	Logger *log.Logger
    31  	// isClosed is a marker that the stream is/should be closed
    32  	isClosed bool
    33  	// isClosedMutex is a mutex protecting concurrent read/write access of isClosed
    34  	isClosedMutex sync.RWMutex
    35  }
    36  
    37  type SubscriptionError struct {
    38  	Code    int
    39  	Message string
    40  }
    41  
    42  func (e SubscriptionError) Error() string {
    43  	return fmt.Sprintf("%d: %s", e.Code, e.Message)
    44  }
    45  
    46  // Subscribe to the Events emitted from the specified url.
    47  // If lastEventId is non-empty it will be sent to the server in case it can replay missed events.
    48  func Subscribe(url, lastEventId string) (*Stream, error) {
    49  	req, err := http.NewRequest("GET", url, nil)
    50  	if err != nil {
    51  		return nil, err
    52  	}
    53  	return SubscribeWithRequest(lastEventId, req)
    54  }
    55  
    56  // SubscribeWithRequest will take an http.Request to setup the stream, allowing custom headers
    57  // to be specified, authentication to be configured, etc.
    58  func SubscribeWithRequest(lastEventId string, request *http.Request) (*Stream, error) {
    59  	return SubscribeWith(lastEventId, http.DefaultClient, request)
    60  }
    61  
    62  // SubscribeWith takes a http client and request providing customization over both headers and
    63  // control over the http client settings (timeouts, tls, etc)
    64  func SubscribeWith(lastEventId string, client *http.Client, request *http.Request) (*Stream, error) {
    65  	stream := &Stream{
    66  		c:           client,
    67  		req:         request,
    68  		lastEventId: lastEventId,
    69  		retry:       time.Millisecond * 3000,
    70  		Events:      make(chan Event),
    71  		Errors:      make(chan error),
    72  	}
    73  	stream.c.CheckRedirect = checkRedirect
    74  
    75  	r, err := stream.connect()
    76  	if err != nil {
    77  		return nil, err
    78  	}
    79  	go stream.stream(r)
    80  	return stream, nil
    81  }
    82  
    83  // Close will close the stream. It is safe for concurrent access and can be called multiple times.
    84  func (stream *Stream) Close() {
    85  	if stream.isStreamClosed() {
    86  		return
    87  	}
    88  
    89  	stream.markStreamClosed()
    90  	close(stream.Errors)
    91  	close(stream.Events)
    92  }
    93  
    94  func (stream *Stream) isStreamClosed() bool {
    95  	stream.isClosedMutex.RLock()
    96  	defer stream.isClosedMutex.RUnlock()
    97  	return stream.isClosed
    98  }
    99  
   100  func (stream *Stream) markStreamClosed() {
   101  	stream.isClosedMutex.Lock()
   102  	defer stream.isClosedMutex.Unlock()
   103  	stream.isClosed = true
   104  }
   105  
   106  // Go's http package doesn't copy headers across when it encounters
   107  // redirects so we need to do that manually.
   108  func checkRedirect(req *http.Request, via []*http.Request) error {
   109  	if len(via) >= 10 {
   110  		return errors.New("stopped after 10 redirects")
   111  	}
   112  	for k, vv := range via[0].Header {
   113  		for _, v := range vv {
   114  			req.Header.Add(k, v)
   115  		}
   116  	}
   117  	return nil
   118  }
   119  
   120  func (stream *Stream) connect() (r io.ReadCloser, err error) {
   121  	var resp *http.Response
   122  	stream.req.Header.Set("Cache-Control", "no-cache")
   123  	stream.req.Header.Set("Accept", "text/event-stream")
   124  	if len(stream.lastEventId) > 0 {
   125  		stream.req.Header.Set("Last-Event-ID", stream.lastEventId)
   126  	}
   127  	if resp, err = stream.c.Do(stream.req); err != nil {
   128  		return
   129  	}
   130  	if resp.StatusCode != 200 {
   131  		message, _ := ioutil.ReadAll(resp.Body)
   132  		err = SubscriptionError{
   133  			Code:    resp.StatusCode,
   134  			Message: string(message),
   135  		}
   136  	}
   137  	r = resp.Body
   138  	return
   139  }
   140  
   141  func (stream *Stream) stream(r io.ReadCloser) {
   142  	defer r.Close()
   143  
   144  	// receives events until an error is encountered
   145  	stream.receiveEvents(r)
   146  
   147  	// tries to reconnect and start the stream again
   148  	stream.retryRestartStream()
   149  }
   150  
   151  func (stream *Stream) receiveEvents(r io.ReadCloser) {
   152  	dec := NewDecoder(r)
   153  
   154  	for {
   155  		ev, err := dec.Decode()
   156  		if stream.isStreamClosed() {
   157  			return
   158  		}
   159  		if err != nil {
   160  			stream.Errors <- err
   161  			return
   162  		}
   163  
   164  		pub := ev.(*publication)
   165  		if pub.Retry() > 0 {
   166  			stream.retry = time.Duration(pub.Retry()) * time.Millisecond
   167  		}
   168  		if len(pub.Id()) > 0 {
   169  			stream.lastEventId = pub.Id()
   170  		}
   171  		stream.Events <- ev
   172  	}
   173  }
   174  
   175  func (stream *Stream) retryRestartStream() {
   176  	backoff := stream.retry
   177  	for {
   178  		if stream.Logger != nil {
   179  			stream.Logger.Printf("Reconnecting in %0.4f secs\n", backoff.Seconds())
   180  		}
   181  		time.Sleep(backoff)
   182  		if stream.isStreamClosed() {
   183  			return
   184  		}
   185  		// NOTE: because of the defer we're opening the new connection
   186  		// before closing the old one. Shouldn't be a problem in practice,
   187  		// but something to be aware of.
   188  		r, err := stream.connect()
   189  		if err == nil {
   190  			go stream.stream(r)
   191  			return
   192  		}
   193  		stream.Errors <- err
   194  		backoff *= 2
   195  	}
   196  }
   197  

View as plain text