...

Source file src/github.com/launchdarkly/eventsource/stream.go

Documentation: github.com/launchdarkly/eventsource

     1  package eventsource
     2  
     3  import (
     4  	"errors"
     5  	"fmt"
     6  	"io"
     7  	"io/ioutil"
     8  	"net/http"
     9  	"sync"
    10  	"time"
    11  )
    12  
    13  // Stream handles a connection for receiving Server Sent Events.
    14  // It will try and reconnect if the connection is lost, respecting both
    15  // received retry delays and event id's.
    16  type Stream struct {
    17  	c           *http.Client
    18  	req         *http.Request
    19  	lastEventID string
    20  	readTimeout time.Duration
    21  	retryDelay  *retryDelayStrategy
    22  	// Events emits the events received by the stream
    23  	Events chan Event
    24  	// Errors emits any errors encountered while reading events from the stream.
    25  	//
    26  	// Errors during initialization of the stream are not pushed to this channel, since until the
    27  	// Subscribe method has returned the caller would not be able to consume the channel. If you have
    28  	// configured the Stream to be able to retry on initialization errors, but you still want to know
    29  	// about those errors or control how they are handled, use StreamOptionErrorHandler.
    30  	//
    31  	// If an error handler has been specified with StreamOptionErrorHandler, the Errors channel is
    32  	// not used and will be nil.
    33  	Errors       chan error
    34  	errorHandler StreamErrorHandler
    35  	// Logger is a logger that, when set, will be used for logging informational messages.
    36  	//
    37  	// This field is exported for backward compatibility, but should not be set directly because
    38  	// it may be used by multiple goroutines. Use SetLogger instead.
    39  	Logger      Logger
    40  	restarter   chan struct{}
    41  	closer      chan struct{}
    42  	closeOnce   sync.Once
    43  	mu          sync.RWMutex
    44  	connections int
    45  }
    46  
    47  var (
    48  	// ErrReadTimeout is the error that will be emitted if a stream was closed due to not
    49  	// receiving any data within the configured read timeout interval.
    50  	ErrReadTimeout = errors.New("Read timeout on stream")
    51  )
    52  
    53  // SubscriptionError is an error object returned from a stream when there is an HTTP error.
    54  type SubscriptionError struct {
    55  	Code    int
    56  	Message string
    57  }
    58  
    59  func (e SubscriptionError) Error() string {
    60  	s := fmt.Sprintf("error %d", e.Code)
    61  	if e.Message != "" {
    62  		s = s + ": " + e.Message
    63  	}
    64  	return s
    65  }
    66  
    67  // Subscribe to the Events emitted from the specified url.
    68  // If lastEventId is non-empty it will be sent to the server in case it can replay missed events.
    69  // Deprecated: use SubscribeWithURL instead.
    70  func Subscribe(url, lastEventID string) (*Stream, error) {
    71  	return SubscribeWithURL(url, StreamOptionLastEventID(lastEventID))
    72  }
    73  
    74  // SubscribeWithURL subscribes to the Events emitted from the specified URL. The stream can
    75  // be configured by providing any number of StreamOption values.
    76  func SubscribeWithURL(url string, options ...StreamOption) (*Stream, error) {
    77  	req, err := http.NewRequest("GET", url, nil)
    78  	if err != nil {
    79  		return nil, err
    80  	}
    81  	return SubscribeWithRequestAndOptions(req, options...)
    82  }
    83  
    84  // SubscribeWithRequest will take an http.Request to set up the stream, allowing custom headers
    85  // to be specified, authentication to be configured, etc.
    86  // Deprecated: use SubscribeWithRequestAndOptions instead.
    87  func SubscribeWithRequest(lastEventID string, request *http.Request) (*Stream, error) {
    88  	return SubscribeWithRequestAndOptions(request, StreamOptionLastEventID(lastEventID))
    89  }
    90  
    91  // SubscribeWith takes a HTTP client and request providing customization over both headers and
    92  // control over the HTTP client settings (timeouts, tls, etc)
    93  // If request.Body is set, then request.GetBody should also be set so that we can reissue the request
    94  // Deprecated: use SubscribeWithRequestAndOptions instead.
    95  func SubscribeWith(lastEventID string, client *http.Client, request *http.Request) (*Stream, error) {
    96  	return SubscribeWithRequestAndOptions(request, StreamOptionHTTPClient(client),
    97  		StreamOptionLastEventID(lastEventID))
    98  }
    99  
   100  // SubscribeWithRequestAndOptions takes an initial http.Request to set up the stream - allowing
   101  // custom headers, authentication, etc. to be configured - and also takes any number of
   102  // StreamOption values to set other properties of the stream, such as timeouts or a specific
   103  // HTTP client to use.
   104  func SubscribeWithRequestAndOptions(request *http.Request, options ...StreamOption) (*Stream, error) {
   105  	defaultClient := *http.DefaultClient
   106  
   107  	configuredOptions := streamOptions{
   108  		httpClient:         &defaultClient,
   109  		initialRetry:       DefaultInitialRetry,
   110  		retryResetInterval: DefaultRetryResetInterval,
   111  	}
   112  
   113  	for _, o := range options {
   114  		if err := o.apply(&configuredOptions); err != nil {
   115  			return nil, err
   116  		}
   117  	}
   118  
   119  	stream := newStream(request, configuredOptions)
   120  
   121  	var initialRetryTimeoutCh <-chan time.Time
   122  	var lastError error
   123  	if configuredOptions.initialRetryTimeout > 0 {
   124  		initialRetryTimeoutCh = time.After(configuredOptions.initialRetryTimeout)
   125  	}
   126  	for {
   127  		r, err := stream.connect()
   128  		if err == nil {
   129  			go stream.stream(r)
   130  			return stream, nil
   131  		}
   132  		lastError = err
   133  		if configuredOptions.initialRetryTimeout == 0 {
   134  			return nil, err
   135  		}
   136  		if configuredOptions.errorHandler != nil {
   137  			result := configuredOptions.errorHandler(err)
   138  			if result.CloseNow {
   139  				return nil, err
   140  			}
   141  		}
   142  		// We never push errors to the Errors channel during initialization-- the caller would have no way to
   143  		// consume the channel, since we haven't returned a Stream instance.
   144  		delay := stream.retryDelay.NextRetryDelay(time.Now())
   145  		if configuredOptions.logger != nil {
   146  			configuredOptions.logger.Printf("Connection failed (%s), retrying in %0.4f secs\n", err, delay.Seconds())
   147  		}
   148  		nextRetryCh := time.After(delay)
   149  		select {
   150  		case <-initialRetryTimeoutCh:
   151  			if lastError == nil {
   152  				lastError = errors.New("timeout elapsed while waiting to connect")
   153  			}
   154  			return nil, lastError
   155  		case <-nextRetryCh:
   156  			continue
   157  		}
   158  	}
   159  }
   160  
   161  func newStream(request *http.Request, configuredOptions streamOptions) *Stream {
   162  	var backoff backoffStrategy
   163  	var jitter jitterStrategy
   164  	if configuredOptions.backoffMaxDelay > 0 {
   165  		backoff = newDefaultBackoff(configuredOptions.backoffMaxDelay)
   166  	}
   167  	if configuredOptions.jitterRatio > 0 {
   168  		jitter = newDefaultJitter(configuredOptions.jitterRatio, 0)
   169  	}
   170  	retryDelay := newRetryDelayStrategy(
   171  		configuredOptions.initialRetry,
   172  		configuredOptions.retryResetInterval,
   173  		backoff,
   174  		jitter,
   175  	)
   176  
   177  	stream := &Stream{
   178  		c:            configuredOptions.httpClient,
   179  		lastEventID:  configuredOptions.lastEventID,
   180  		readTimeout:  configuredOptions.readTimeout,
   181  		req:          request,
   182  		retryDelay:   retryDelay,
   183  		Events:       make(chan Event),
   184  		errorHandler: configuredOptions.errorHandler,
   185  		Logger:       configuredOptions.logger,
   186  		restarter:    make(chan struct{}, 1),
   187  		closer:       make(chan struct{}),
   188  	}
   189  
   190  	if configuredOptions.errorHandler == nil {
   191  		// The Errors channel is only used if there is no error handler.
   192  		stream.Errors = make(chan error)
   193  	}
   194  
   195  	// override checkRedirect to include headers before go1.8
   196  	// we'd prefer to skip this because it is not thread-safe and breaks golang race condition checking
   197  	setCheckRedirect(stream.c)
   198  
   199  	return stream
   200  }
   201  
   202  // Restart forces the stream to drop the currently active connection and attempt to connect again, in the
   203  // same way it would if the connection had failed. There will be a delay before reconnection, as defined
   204  // by the Stream configuration (StreamOptionInitialRetry, StreamOptionUseBackoff, etc.).
   205  //
   206  // This method is safe for concurrent access. Its behavior is asynchronous: Restart returns immediately
   207  // and the connection is restarted as soon as possible from another goroutine after that. It is possible
   208  // for additional events from the original connection to be delivered during that interval.ssible.
   209  //
   210  // If the stream has already been closed with Close, Restart has no effect.
   211  func (stream *Stream) Restart() {
   212  	// Note the non-blocking send: if there's already been a Restart call that hasn't been processed yet,
   213  	// we'll just leave that one in the channel.
   214  	select {
   215  	case stream.restarter <- struct{}{}:
   216  		break
   217  	default:
   218  		break
   219  	}
   220  }
   221  
   222  // Close closes the stream permanently. It is safe for concurrent access and can be called multiple times.
   223  func (stream *Stream) Close() {
   224  	stream.closeOnce.Do(func() {
   225  		close(stream.closer)
   226  	})
   227  }
   228  
   229  func (stream *Stream) connect() (io.ReadCloser, error) {
   230  	var err error
   231  	var resp *http.Response
   232  	stream.req.Header.Set("Cache-Control", "no-cache")
   233  	stream.req.Header.Set("Accept", "text/event-stream")
   234  	if len(stream.lastEventID) > 0 {
   235  		stream.req.Header.Set("Last-Event-ID", stream.lastEventID)
   236  	}
   237  	req := *stream.req
   238  
   239  	// All but the initial connection will need to regenerate the body
   240  	if stream.connections > 0 && req.GetBody != nil {
   241  		if req.Body, err = req.GetBody(); err != nil {
   242  			return nil, err
   243  		}
   244  	}
   245  
   246  	if resp, err = stream.c.Do(&req); err != nil {
   247  		return nil, err
   248  	}
   249  	stream.connections++
   250  	if resp.StatusCode != 200 {
   251  		message, _ := ioutil.ReadAll(resp.Body)
   252  		_ = resp.Body.Close()
   253  		err = SubscriptionError{
   254  			Code:    resp.StatusCode,
   255  			Message: string(message),
   256  		}
   257  		return nil, err
   258  	}
   259  	return resp.Body, nil
   260  }
   261  
   262  func (stream *Stream) stream(r io.ReadCloser) {
   263  	retryChan := make(chan struct{}, 1)
   264  
   265  	scheduleRetry := func() {
   266  		logger := stream.getLogger()
   267  		delay := stream.retryDelay.NextRetryDelay(time.Now())
   268  		if logger != nil {
   269  			logger.Printf("Reconnecting in %0.4f secs", delay.Seconds())
   270  		}
   271  		time.AfterFunc(delay, func() {
   272  			retryChan <- struct{}{}
   273  		})
   274  	}
   275  
   276  	reportErrorAndMaybeContinue := func(err error) bool {
   277  		if stream.errorHandler != nil {
   278  			result := stream.errorHandler(err)
   279  			if result.CloseNow {
   280  				stream.Close()
   281  				return false
   282  			}
   283  		} else if stream.Errors != nil {
   284  			stream.Errors <- err
   285  		}
   286  		return true
   287  	}
   288  
   289  NewStream:
   290  	for {
   291  		events := make(chan Event)
   292  		errs := make(chan error)
   293  
   294  		if r != nil {
   295  			dec := NewDecoderWithOptions(r, DecoderOptionReadTimeout(stream.readTimeout))
   296  			go func() {
   297  				for {
   298  					ev, err := dec.Decode()
   299  
   300  					if err != nil {
   301  						errs <- err
   302  						close(errs)
   303  						close(events)
   304  						return
   305  					}
   306  					events <- ev
   307  				}
   308  			}()
   309  		}
   310  
   311  		discardCurrentStream := func() {
   312  			if r != nil {
   313  				_ = r.Close()
   314  				r = nil
   315  				// allow the decoding goroutine to terminate
   316  				for range errs {
   317  				}
   318  				for range events {
   319  				}
   320  			}
   321  		}
   322  
   323  		for {
   324  			select {
   325  			case <-stream.restarter:
   326  				discardCurrentStream()
   327  				scheduleRetry()
   328  				continue NewStream
   329  			case err := <-errs:
   330  				if !reportErrorAndMaybeContinue(err) {
   331  					break NewStream
   332  				}
   333  				discardCurrentStream()
   334  				scheduleRetry()
   335  				continue NewStream
   336  			case ev := <-events:
   337  				pub := ev.(*publication)
   338  				if pub.Retry() > 0 {
   339  					stream.retryDelay.SetBaseDelay(time.Duration(pub.Retry()) * time.Millisecond)
   340  				}
   341  				if len(pub.Id()) > 0 {
   342  					stream.lastEventID = pub.Id()
   343  				}
   344  				stream.retryDelay.SetGoodSince(time.Now())
   345  				stream.Events <- ev
   346  			case <-stream.closer:
   347  				discardCurrentStream()
   348  				break NewStream
   349  			case <-retryChan:
   350  				var err error
   351  				r, err = stream.connect()
   352  				if err != nil {
   353  					r = nil
   354  					if !reportErrorAndMaybeContinue(err) {
   355  						break NewStream
   356  					}
   357  					scheduleRetry()
   358  				}
   359  				continue NewStream
   360  			}
   361  		}
   362  	}
   363  
   364  	if stream.Errors != nil {
   365  		close(stream.Errors)
   366  	}
   367  	close(stream.Events)
   368  }
   369  
   370  func (stream *Stream) getRetryDelayStrategy() *retryDelayStrategy { // nolint:megacheck // unused except by tests
   371  	return stream.retryDelay
   372  }
   373  
   374  // SetLogger sets the Logger field in a thread-safe manner.
   375  func (stream *Stream) SetLogger(logger Logger) {
   376  	stream.mu.Lock()
   377  	defer stream.mu.Unlock()
   378  	stream.Logger = logger
   379  }
   380  
   381  func (stream *Stream) getLogger() Logger {
   382  	stream.mu.RLock()
   383  	defer stream.mu.RUnlock()
   384  	return stream.Logger
   385  }
   386  

View as plain text