1 package eventsource 2 3 import ( 4 "net/http" 5 "time" 6 ) 7 8 type streamOptions struct { 9 initialRetry time.Duration 10 httpClient *http.Client 11 lastEventID string 12 logger Logger 13 backoffMaxDelay time.Duration 14 jitterRatio float64 15 readTimeout time.Duration 16 retryResetInterval time.Duration 17 initialRetryTimeout time.Duration 18 errorHandler StreamErrorHandler 19 } 20 21 // StreamOption is a common interface for optional configuration parameters that can be 22 // used in creating a stream. 23 type StreamOption interface { 24 apply(s *streamOptions) error 25 } 26 27 type readTimeoutOption struct { 28 timeout time.Duration 29 } 30 31 func (o readTimeoutOption) apply(s *streamOptions) error { 32 s.readTimeout = o.timeout 33 return nil 34 } 35 36 // StreamOptionReadTimeout returns an option that sets the read timeout interval for a 37 // stream when the stream is created. If the stream does not receive new data within this 38 // length of time, it will restart the connection. 39 // 40 // By default, there is no read timeout. 41 func StreamOptionReadTimeout(timeout time.Duration) StreamOption { 42 return readTimeoutOption{timeout: timeout} 43 } 44 45 type initialRetryOption struct { 46 retry time.Duration 47 } 48 49 func (o initialRetryOption) apply(s *streamOptions) error { 50 s.initialRetry = o.retry 51 return nil 52 } 53 54 // StreamOptionInitialRetry returns an option that sets the initial retry delay for a 55 // stream when the stream is created. 56 // 57 // This delay will be used the first time the stream has to be restarted; the interval will 58 // increase exponentially on subsequent reconnections. Each time, there will also be a 59 // pseudo-random jitter so that the actual value may be up to 50% less. So, for instance, 60 // if you set the initial delay to 1 second, the first reconnection will use a delay between 61 // 0.5s and 1s inclusive, and subsequent reconnections will be 1s-2s, 2s-4s, etc. 62 // 63 // The default value is DefaultInitialRetry. In a future version, this value may change, so 64 // if you need a specific value it is best to set it explicitly. 65 func StreamOptionInitialRetry(retry time.Duration) StreamOption { 66 return initialRetryOption{retry: retry} 67 } 68 69 type useBackoffOption struct { 70 maxDelay time.Duration 71 } 72 73 func (o useBackoffOption) apply(s *streamOptions) error { 74 s.backoffMaxDelay = o.maxDelay 75 return nil 76 } 77 78 // StreamOptionUseBackoff returns an option that determines whether to use an exponential 79 // backoff for reconnection delays. 80 // 81 // If the maxDelay parameter is greater than zero, backoff is enabled. The retry delay interval 82 // will be doubled (not counting jitter - see StreamOptionUseJitter) for consecutive stream 83 // reconnections, but will never be greater than maxDelay. 84 // 85 // For consistency with earlier versions, this is currently zero (disabled) by default. In 86 // a future version this may change, so if you do not want backoff behavior you should explicitly 87 // set it to zero. It is recommended to use both backoff and jitter, to avoid "thundering herd" 88 // behavior in the case of a server outage. 89 func StreamOptionUseBackoff(maxDelay time.Duration) StreamOption { 90 return useBackoffOption{maxDelay} 91 } 92 93 type canRetryFirstConnectionOption struct { 94 initialRetryTimeout time.Duration 95 } 96 97 func (o canRetryFirstConnectionOption) apply(s *streamOptions) error { 98 s.initialRetryTimeout = o.initialRetryTimeout 99 return nil 100 } 101 102 // StreamOptionCanRetryFirstConnection returns an option that determines whether to apply 103 // retry behavior to the first connection attempt for the stream. 104 // 105 // If the timeout is nonzero, an initial connection failure when subscribing will not cause an 106 // error result, but will trigger the same retry logic as if an existing connection had failed. 107 // The stream constructor will not return until a connection has been made, or until the 108 // specified timeout expires, if the timeout is positive; if the timeout is negative, it 109 // will continue retrying indefinitely. 110 // 111 // The default value is zero: an initial connection failure will not be retried. 112 func StreamOptionCanRetryFirstConnection(initialRetryTimeout time.Duration) StreamOption { 113 return canRetryFirstConnectionOption{initialRetryTimeout} 114 } 115 116 type useJitterOption struct { 117 jitterRatio float64 118 } 119 120 func (o useJitterOption) apply(s *streamOptions) error { 121 s.jitterRatio = o.jitterRatio 122 return nil 123 } 124 125 // StreamOptionUseJitter returns an option that determines whether to use a randomized 126 // jitter for reconnection delays. 127 // 128 // If jitterRatio is greater than zero, it represents a proportion up to 1.0 (100%) that will 129 // be deducted from the retry delay interval would otherwise be used: for instance, 0.5 means 130 // that the delay will be randomly decreased by up to 50%. A value greater than 1.0 is treated 131 // as equal to 1.0. 132 // 133 // For consistency with earlier versions, this is currently disabled (zero) by default. In 134 // a future version this may change, so if you do not want jitter you should explicitly set it 135 // to zero. It is recommended to use both backoff and jitter, to avoid "thundering herd" 136 // behavior in the case of a server outage. 137 func StreamOptionUseJitter(jitterRatio float64) StreamOption { 138 return useJitterOption{jitterRatio} 139 } 140 141 type retryResetIntervalOption struct { 142 retryResetInterval time.Duration 143 } 144 145 func (o retryResetIntervalOption) apply(s *streamOptions) error { 146 s.retryResetInterval = o.retryResetInterval 147 return nil 148 } 149 150 // StreamOptionRetryResetInterval returns an option that sets the minimum amount of time that a 151 // connection must stay open before the Stream resets its backoff delay. This is only relevant if 152 // backoff is enabled (see StreamOptionUseBackoff). 153 // 154 // If a connection fails before the threshold has elapsed, the delay before reconnecting will be 155 // greater than the last delay; if it fails after the threshold, the delay will start over at the 156 // the initial minimum value. This prevents long delays from occurring on connections that are only 157 // rarely restarted. 158 // 159 // The default value is DefaultRetryResetInterval. 160 func StreamOptionRetryResetInterval(retryResetInterval time.Duration) StreamOption { 161 return retryResetIntervalOption{retryResetInterval: retryResetInterval} 162 } 163 164 type lastEventIDOption struct { 165 lastEventID string 166 } 167 168 func (o lastEventIDOption) apply(s *streamOptions) error { 169 s.lastEventID = o.lastEventID 170 return nil 171 } 172 173 // StreamOptionLastEventID returns an option that sets the initial last event ID for a 174 // stream when the stream is created. If specified, this value will be sent to the server 175 // in case it can replay missed events. 176 func StreamOptionLastEventID(lastEventID string) StreamOption { 177 return lastEventIDOption{lastEventID: lastEventID} 178 } 179 180 type httpClientOption struct { 181 client *http.Client 182 } 183 184 func (o httpClientOption) apply(s *streamOptions) error { 185 if o.client != nil { 186 s.httpClient = o.client 187 } 188 return nil 189 } 190 191 // StreamOptionHTTPClient returns an option that overrides the default HTTP client used by 192 // a stream when the stream is created. 193 func StreamOptionHTTPClient(client *http.Client) StreamOption { 194 return httpClientOption{client: client} 195 } 196 197 type loggerOption struct { 198 logger Logger 199 } 200 201 func (o loggerOption) apply(s *streamOptions) error { 202 s.logger = o.logger 203 return nil 204 } 205 206 // StreamOptionLogger returns an option that sets the logger for a stream when the stream 207 // is created (to change it later, you can use SetLogger). By default, there is no logger. 208 func StreamOptionLogger(logger Logger) StreamOption { 209 return loggerOption{logger: logger} 210 } 211 212 type streamErrorHandlerOption struct { 213 handler StreamErrorHandler 214 } 215 216 func (o streamErrorHandlerOption) apply(s *streamOptions) error { 217 s.errorHandler = o.handler 218 return nil 219 } 220 221 // StreamOptionErrorHandler returns an option that causes a Stream to call the specified function 222 // for stream errors. 223 // 224 // If non-nil, this function will be called whenever Stream encounters either a network error or an 225 // HTTP error response status. The returned value determines whether Stream should retry as usual, 226 // or immediately stop as if Close had been called. 227 // 228 // When used, this mechanism replaces the Errors channel; that channel will be pre-closed and Stream 229 // will not push any errors to it, so the caller does not need to consume the channel. 230 // 231 // Note that using a handler is the only way to have control over how Stream handles errors during 232 // the initial connection attempt, since there would be no way for the caller to consume the Errors 233 // channel before the Subscribe method has returned. 234 func StreamOptionErrorHandler(handler StreamErrorHandler) StreamOption { 235 return streamErrorHandlerOption{handler} 236 } 237 238 const ( 239 // DefaultInitialRetry is the default value for StreamOptionalInitialRetry. 240 DefaultInitialRetry = time.Second * 3 241 // DefaultRetryResetInterval is the default value for StreamOptionRetryResetInterval. 242 DefaultRetryResetInterval = time.Second * 60 243 ) 244