1 package eventsource
2
3 import (
4 "errors"
5 "fmt"
6 "io"
7 "io/ioutil"
8 "net/http"
9 "sync"
10 "time"
11 )
12
13
14
15
// Stream is a client-side handle on a server-sent events subscription. It
// owns the HTTP request that is (re)issued for every connection and the
// channels through which events and errors are delivered to the caller.
type Stream struct {
	// c is the HTTP client that connect uses to send the request.
	c *http.Client
	// req is the request template; connect sets the SSE headers on it and
	// sends a shallow copy.
	req *http.Request
	// lastEventID is sent as the Last-Event-ID header when non-empty and is
	// overwritten with the ID of every received event that carries one.
	lastEventID string
	// readTimeout is handed to the decoder through DecoderOptionReadTimeout.
	readTimeout time.Duration
	// retryDelay computes how long to wait before each reconnection attempt.
	retryDelay *retryDelayStrategy

	// Events delivers every decoded event. The channel is unbuffered and is
	// closed when the streaming goroutine exits.
	Events chan Event

	// Errors delivers connection and decoding errors. It is only created when
	// no error handler was configured (it is nil otherwise) and is closed
	// when the streaming goroutine exits.
	Errors chan error
	// errorHandler, when non-nil, is called with each error instead of the
	// error being sent to Errors; a result with CloseNow set closes the stream.
	errorHandler StreamErrorHandler

	// Logger receives reconnection messages when non-nil. The streaming
	// goroutine reads it through getLogger, and SetLogger writes it, both
	// under mu.
	Logger Logger
	// restarter carries restart requests; it has capacity 1 so that Restart
	// never blocks.
	restarter chan struct{}
	// closer is closed by Close to tell the streaming goroutine to stop.
	closer chan struct{}
	// closeOnce guarantees closer is closed at most once.
	closeOnce sync.Once
	// mu guards Logger.
	mu sync.RWMutex
	// connections is a counter maintained by connect; once it is non-zero,
	// connect regenerates the request body through GetBody before sending.
	connections int
}
46
var (
	// ErrReadTimeout is the sentinel error describing a read timeout on a
	// stream.
	// NOTE(review): nothing in this part of the file returns it; presumably
	// the decoder reports it when the configured read timeout elapses — confirm.
	ErrReadTimeout = errors.New("Read timeout on stream")
)
52
53
// SubscriptionError describes a non-200 HTTP response received while
// connecting to a stream.
type SubscriptionError struct {
	// Code is the HTTP status code of the response.
	Code int
	// Message is the response body, if any.
	Message string
}

// Error renders the error as "error <code>", followed by ": <message>" when a
// message is present.
func (e SubscriptionError) Error() string {
	if e.Message == "" {
		return fmt.Sprintf("error %d", e.Code)
	}
	return fmt.Sprintf("error %d: %s", e.Code, e.Message)
}
66
67
68
69
70 func Subscribe(url, lastEventID string) (*Stream, error) {
71 return SubscribeWithURL(url, StreamOptionLastEventID(lastEventID))
72 }
73
74
75
76 func SubscribeWithURL(url string, options ...StreamOption) (*Stream, error) {
77 req, err := http.NewRequest("GET", url, nil)
78 if err != nil {
79 return nil, err
80 }
81 return SubscribeWithRequestAndOptions(req, options...)
82 }
83
84
85
86
87 func SubscribeWithRequest(lastEventID string, request *http.Request) (*Stream, error) {
88 return SubscribeWithRequestAndOptions(request, StreamOptionLastEventID(lastEventID))
89 }
90
91
92
93
94
95 func SubscribeWith(lastEventID string, client *http.Client, request *http.Request) (*Stream, error) {
96 return SubscribeWithRequestAndOptions(request, StreamOptionHTTPClient(client),
97 StreamOptionLastEventID(lastEventID))
98 }
99
100
101
102
103
104 func SubscribeWithRequestAndOptions(request *http.Request, options ...StreamOption) (*Stream, error) {
105 defaultClient := *http.DefaultClient
106
107 configuredOptions := streamOptions{
108 httpClient: &defaultClient,
109 initialRetry: DefaultInitialRetry,
110 retryResetInterval: DefaultRetryResetInterval,
111 }
112
113 for _, o := range options {
114 if err := o.apply(&configuredOptions); err != nil {
115 return nil, err
116 }
117 }
118
119 stream := newStream(request, configuredOptions)
120
121 var initialRetryTimeoutCh <-chan time.Time
122 var lastError error
123 if configuredOptions.initialRetryTimeout > 0 {
124 initialRetryTimeoutCh = time.After(configuredOptions.initialRetryTimeout)
125 }
126 for {
127 r, err := stream.connect()
128 if err == nil {
129 go stream.stream(r)
130 return stream, nil
131 }
132 lastError = err
133 if configuredOptions.initialRetryTimeout == 0 {
134 return nil, err
135 }
136 if configuredOptions.errorHandler != nil {
137 result := configuredOptions.errorHandler(err)
138 if result.CloseNow {
139 return nil, err
140 }
141 }
142
143
144 delay := stream.retryDelay.NextRetryDelay(time.Now())
145 if configuredOptions.logger != nil {
146 configuredOptions.logger.Printf("Connection failed (%s), retrying in %0.4f secs\n", err, delay.Seconds())
147 }
148 nextRetryCh := time.After(delay)
149 select {
150 case <-initialRetryTimeoutCh:
151 if lastError == nil {
152 lastError = errors.New("timeout elapsed while waiting to connect")
153 }
154 return nil, lastError
155 case <-nextRetryCh:
156 continue
157 }
158 }
159 }
160
161 func newStream(request *http.Request, configuredOptions streamOptions) *Stream {
162 var backoff backoffStrategy
163 var jitter jitterStrategy
164 if configuredOptions.backoffMaxDelay > 0 {
165 backoff = newDefaultBackoff(configuredOptions.backoffMaxDelay)
166 }
167 if configuredOptions.jitterRatio > 0 {
168 jitter = newDefaultJitter(configuredOptions.jitterRatio, 0)
169 }
170 retryDelay := newRetryDelayStrategy(
171 configuredOptions.initialRetry,
172 configuredOptions.retryResetInterval,
173 backoff,
174 jitter,
175 )
176
177 stream := &Stream{
178 c: configuredOptions.httpClient,
179 lastEventID: configuredOptions.lastEventID,
180 readTimeout: configuredOptions.readTimeout,
181 req: request,
182 retryDelay: retryDelay,
183 Events: make(chan Event),
184 errorHandler: configuredOptions.errorHandler,
185 Logger: configuredOptions.logger,
186 restarter: make(chan struct{}, 1),
187 closer: make(chan struct{}),
188 }
189
190 if configuredOptions.errorHandler == nil {
191
192 stream.Errors = make(chan error)
193 }
194
195
196
197 setCheckRedirect(stream.c)
198
199 return stream
200 }
201
202
203
204
205
206
207
208
209
210
211 func (stream *Stream) Restart() {
212
213
214 select {
215 case stream.restarter <- struct{}{}:
216 break
217 default:
218 break
219 }
220 }
221
222
223 func (stream *Stream) Close() {
224 stream.closeOnce.Do(func() {
225 close(stream.closer)
226 })
227 }
228
229 func (stream *Stream) connect() (io.ReadCloser, error) {
230 var err error
231 var resp *http.Response
232 stream.req.Header.Set("Cache-Control", "no-cache")
233 stream.req.Header.Set("Accept", "text/event-stream")
234 if len(stream.lastEventID) > 0 {
235 stream.req.Header.Set("Last-Event-ID", stream.lastEventID)
236 }
237 req := *stream.req
238
239
240 if stream.connections > 0 && req.GetBody != nil {
241 if req.Body, err = req.GetBody(); err != nil {
242 return nil, err
243 }
244 }
245
246 if resp, err = stream.c.Do(&req); err != nil {
247 return nil, err
248 }
249 stream.connections++
250 if resp.StatusCode != 200 {
251 message, _ := ioutil.ReadAll(resp.Body)
252 _ = resp.Body.Close()
253 err = SubscriptionError{
254 Code: resp.StatusCode,
255 Message: string(message),
256 }
257 return nil, err
258 }
259 return resp.Body, nil
260 }
261
262 func (stream *Stream) stream(r io.ReadCloser) {
263 retryChan := make(chan struct{}, 1)
264
265 scheduleRetry := func() {
266 logger := stream.getLogger()
267 delay := stream.retryDelay.NextRetryDelay(time.Now())
268 if logger != nil {
269 logger.Printf("Reconnecting in %0.4f secs", delay.Seconds())
270 }
271 time.AfterFunc(delay, func() {
272 retryChan <- struct{}{}
273 })
274 }
275
276 reportErrorAndMaybeContinue := func(err error) bool {
277 if stream.errorHandler != nil {
278 result := stream.errorHandler(err)
279 if result.CloseNow {
280 stream.Close()
281 return false
282 }
283 } else if stream.Errors != nil {
284 stream.Errors <- err
285 }
286 return true
287 }
288
289 NewStream:
290 for {
291 events := make(chan Event)
292 errs := make(chan error)
293
294 if r != nil {
295 dec := NewDecoderWithOptions(r, DecoderOptionReadTimeout(stream.readTimeout))
296 go func() {
297 for {
298 ev, err := dec.Decode()
299
300 if err != nil {
301 errs <- err
302 close(errs)
303 close(events)
304 return
305 }
306 events <- ev
307 }
308 }()
309 }
310
311 discardCurrentStream := func() {
312 if r != nil {
313 _ = r.Close()
314 r = nil
315
316 for range errs {
317 }
318 for range events {
319 }
320 }
321 }
322
323 for {
324 select {
325 case <-stream.restarter:
326 discardCurrentStream()
327 scheduleRetry()
328 continue NewStream
329 case err := <-errs:
330 if !reportErrorAndMaybeContinue(err) {
331 break NewStream
332 }
333 discardCurrentStream()
334 scheduleRetry()
335 continue NewStream
336 case ev := <-events:
337 pub := ev.(*publication)
338 if pub.Retry() > 0 {
339 stream.retryDelay.SetBaseDelay(time.Duration(pub.Retry()) * time.Millisecond)
340 }
341 if len(pub.Id()) > 0 {
342 stream.lastEventID = pub.Id()
343 }
344 stream.retryDelay.SetGoodSince(time.Now())
345 stream.Events <- ev
346 case <-stream.closer:
347 discardCurrentStream()
348 break NewStream
349 case <-retryChan:
350 var err error
351 r, err = stream.connect()
352 if err != nil {
353 r = nil
354 if !reportErrorAndMaybeContinue(err) {
355 break NewStream
356 }
357 scheduleRetry()
358 }
359 continue NewStream
360 }
361 }
362 }
363
364 if stream.Errors != nil {
365 close(stream.Errors)
366 }
367 close(stream.Events)
368 }
369
370 func (stream *Stream) getRetryDelayStrategy() *retryDelayStrategy {
371 return stream.retryDelay
372 }
373
374
375 func (stream *Stream) SetLogger(logger Logger) {
376 stream.mu.Lock()
377 defer stream.mu.Unlock()
378 stream.Logger = logger
379 }
380
381 func (stream *Stream) getLogger() Logger {
382 stream.mu.RLock()
383 defer stream.mu.RUnlock()
384 return stream.Logger
385 }
386
View as plain text