...

Source file src/github.com/launchdarkly/eventsource/stream_options.go

Documentation: github.com/launchdarkly/eventsource

     1  package eventsource
     2  
     3  import (
     4  	"net/http"
     5  	"time"
     6  )
     7  
     8  type streamOptions struct {
     9  	initialRetry        time.Duration
    10  	httpClient          *http.Client
    11  	lastEventID         string
    12  	logger              Logger
    13  	backoffMaxDelay     time.Duration
    14  	jitterRatio         float64
    15  	readTimeout         time.Duration
    16  	retryResetInterval  time.Duration
    17  	initialRetryTimeout time.Duration
    18  	errorHandler        StreamErrorHandler
    19  }
    20  
    21  // StreamOption is a common interface for optional configuration parameters that can be
    22  // used in creating a stream.
    23  type StreamOption interface {
    24  	apply(s *streamOptions) error
    25  }
    26  
    27  type readTimeoutOption struct {
    28  	timeout time.Duration
    29  }
    30  
    31  func (o readTimeoutOption) apply(s *streamOptions) error {
    32  	s.readTimeout = o.timeout
    33  	return nil
    34  }
    35  
    36  // StreamOptionReadTimeout returns an option that sets the read timeout interval for a
    37  // stream when the stream is created. If the stream does not receive new data within this
    38  // length of time, it will restart the connection.
    39  //
    40  // By default, there is no read timeout.
    41  func StreamOptionReadTimeout(timeout time.Duration) StreamOption {
    42  	return readTimeoutOption{timeout: timeout}
    43  }
    44  
    45  type initialRetryOption struct {
    46  	retry time.Duration
    47  }
    48  
    49  func (o initialRetryOption) apply(s *streamOptions) error {
    50  	s.initialRetry = o.retry
    51  	return nil
    52  }
    53  
    54  // StreamOptionInitialRetry returns an option that sets the initial retry delay for a
    55  // stream when the stream is created.
    56  //
    57  // This delay will be used the first time the stream has to be restarted; the interval will
    58  // increase exponentially on subsequent reconnections. Each time, there will also be a
    59  // pseudo-random jitter so that the actual value may be up to 50% less. So, for instance,
    60  // if you set the initial delay to 1 second, the first reconnection will use a delay between
    61  // 0.5s and 1s inclusive, and subsequent reconnections will be 1s-2s, 2s-4s, etc.
    62  //
    63  // The default value is DefaultInitialRetry. In a future version, this value may change, so
    64  // if you need a specific value it is best to set it explicitly.
    65  func StreamOptionInitialRetry(retry time.Duration) StreamOption {
    66  	return initialRetryOption{retry: retry}
    67  }
    68  
    69  type useBackoffOption struct {
    70  	maxDelay time.Duration
    71  }
    72  
    73  func (o useBackoffOption) apply(s *streamOptions) error {
    74  	s.backoffMaxDelay = o.maxDelay
    75  	return nil
    76  }
    77  
    78  // StreamOptionUseBackoff returns an option that determines whether to use an exponential
    79  // backoff for reconnection delays.
    80  //
    81  // If the maxDelay parameter is greater than zero, backoff is enabled. The retry delay interval
    82  // will be doubled (not counting jitter - see StreamOptionUseJitter) for consecutive stream
    83  // reconnections, but will never be greater than maxDelay.
    84  //
    85  // For consistency with earlier versions, this is currently zero (disabled) by default. In
    86  // a future version this may change, so if you do not want backoff behavior you should explicitly
    87  // set it to zero. It is recommended to use both backoff and jitter, to avoid "thundering herd"
    88  // behavior in the case of a server outage.
    89  func StreamOptionUseBackoff(maxDelay time.Duration) StreamOption {
    90  	return useBackoffOption{maxDelay}
    91  }
    92  
    93  type canRetryFirstConnectionOption struct {
    94  	initialRetryTimeout time.Duration
    95  }
    96  
    97  func (o canRetryFirstConnectionOption) apply(s *streamOptions) error {
    98  	s.initialRetryTimeout = o.initialRetryTimeout
    99  	return nil
   100  }
   101  
   102  // StreamOptionCanRetryFirstConnection returns an option that determines whether to apply
   103  // retry behavior to the first connection attempt for the stream.
   104  //
   105  // If the timeout is nonzero, an initial connection failure when subscribing will not cause an
   106  // error result, but will trigger the same retry logic as if an existing connection had failed.
   107  // The stream constructor will not return until a connection has been made, or until the
   108  // specified timeout expires, if the timeout is positive; if the timeout is negative, it
   109  // will continue retrying indefinitely.
   110  //
   111  // The default value is zero: an initial connection failure will not be retried.
   112  func StreamOptionCanRetryFirstConnection(initialRetryTimeout time.Duration) StreamOption {
   113  	return canRetryFirstConnectionOption{initialRetryTimeout}
   114  }
   115  
   116  type useJitterOption struct {
   117  	jitterRatio float64
   118  }
   119  
   120  func (o useJitterOption) apply(s *streamOptions) error {
   121  	s.jitterRatio = o.jitterRatio
   122  	return nil
   123  }
   124  
   125  // StreamOptionUseJitter returns an option that determines whether to use a randomized
   126  // jitter for reconnection delays.
   127  //
   128  // If jitterRatio is greater than zero, it represents a proportion up to 1.0 (100%) that will
   129  // be deducted from the retry delay interval would otherwise be used: for instance, 0.5 means
   130  // that the delay will be randomly decreased by up to 50%. A value greater than 1.0 is treated
   131  // as equal to 1.0.
   132  //
   133  // For consistency with earlier versions, this is currently disabled (zero) by default. In
   134  // a future version this may change, so if you do not want jitter you should explicitly set it
   135  // to zero. It is recommended to use both backoff and jitter, to avoid "thundering herd"
   136  // behavior in the case of a server outage.
   137  func StreamOptionUseJitter(jitterRatio float64) StreamOption {
   138  	return useJitterOption{jitterRatio}
   139  }
   140  
   141  type retryResetIntervalOption struct {
   142  	retryResetInterval time.Duration
   143  }
   144  
   145  func (o retryResetIntervalOption) apply(s *streamOptions) error {
   146  	s.retryResetInterval = o.retryResetInterval
   147  	return nil
   148  }
   149  
   150  // StreamOptionRetryResetInterval returns an option that sets the minimum amount of time that a
   151  // connection must stay open before the Stream resets its backoff delay. This is only relevant if
   152  // backoff is enabled (see StreamOptionUseBackoff).
   153  //
   154  // If a connection fails before the threshold has elapsed, the delay before reconnecting will be
   155  // greater than the last delay; if it fails after the threshold, the delay will start over at the
   156  // the initial minimum value. This prevents long delays from occurring on connections that are only
   157  // rarely restarted.
   158  //
   159  // The default value is DefaultRetryResetInterval.
   160  func StreamOptionRetryResetInterval(retryResetInterval time.Duration) StreamOption {
   161  	return retryResetIntervalOption{retryResetInterval: retryResetInterval}
   162  }
   163  
   164  type lastEventIDOption struct {
   165  	lastEventID string
   166  }
   167  
   168  func (o lastEventIDOption) apply(s *streamOptions) error {
   169  	s.lastEventID = o.lastEventID
   170  	return nil
   171  }
   172  
   173  // StreamOptionLastEventID returns an option that sets the initial last event ID for a
   174  // stream when the stream is created. If specified, this value will be sent to the server
   175  // in case it can replay missed events.
   176  func StreamOptionLastEventID(lastEventID string) StreamOption {
   177  	return lastEventIDOption{lastEventID: lastEventID}
   178  }
   179  
   180  type httpClientOption struct {
   181  	client *http.Client
   182  }
   183  
   184  func (o httpClientOption) apply(s *streamOptions) error {
   185  	if o.client != nil {
   186  		s.httpClient = o.client
   187  	}
   188  	return nil
   189  }
   190  
   191  // StreamOptionHTTPClient returns an option that overrides the default HTTP client used by
   192  // a stream when the stream is created.
   193  func StreamOptionHTTPClient(client *http.Client) StreamOption {
   194  	return httpClientOption{client: client}
   195  }
   196  
   197  type loggerOption struct {
   198  	logger Logger
   199  }
   200  
   201  func (o loggerOption) apply(s *streamOptions) error {
   202  	s.logger = o.logger
   203  	return nil
   204  }
   205  
   206  // StreamOptionLogger returns an option that sets the logger for a stream when the stream
   207  // is created (to change it later, you can use SetLogger). By default, there is no logger.
   208  func StreamOptionLogger(logger Logger) StreamOption {
   209  	return loggerOption{logger: logger}
   210  }
   211  
   212  type streamErrorHandlerOption struct {
   213  	handler StreamErrorHandler
   214  }
   215  
   216  func (o streamErrorHandlerOption) apply(s *streamOptions) error {
   217  	s.errorHandler = o.handler
   218  	return nil
   219  }
   220  
   221  // StreamOptionErrorHandler returns an option that causes a Stream to call the specified function
   222  // for stream errors.
   223  //
   224  // If non-nil, this function will be called whenever Stream encounters either a network error or an
   225  // HTTP error response status. The returned value determines whether Stream should retry as usual,
   226  // or immediately stop as if Close had been called.
   227  //
   228  // When used, this mechanism replaces the Errors channel; that channel will be pre-closed and Stream
   229  // will not push any errors to it, so the caller does not need to consume the channel.
   230  //
   231  // Note that using a handler is the only way to have control over how Stream handles errors during
   232  // the initial connection attempt, since there would be no way for the caller to consume the Errors
   233  // channel before the Subscribe method has returned.
   234  func StreamOptionErrorHandler(handler StreamErrorHandler) StreamOption {
   235  	return streamErrorHandlerOption{handler}
   236  }
   237  
   238  const (
   239  	// DefaultInitialRetry is the default value for StreamOptionalInitialRetry.
   240  	DefaultInitialRetry = time.Second * 3
   241  	// DefaultRetryResetInterval is the default value for StreamOptionRetryResetInterval.
   242  	DefaultRetryResetInterval = time.Second * 60
   243  )
   244  

View as plain text