...
1 package eventsource
2
3 import (
4 "errors"
5 "fmt"
6 "io"
7 "io/ioutil"
8 "log"
9 "net/http"
10 "sync"
11 "time"
12 )
13
14
15
16
17 type Stream struct {
18 c *http.Client
19 req *http.Request
20 lastEventId string
21 retry time.Duration
22
23 Events chan Event
24
25
26
27
28 Errors chan error
29
30 Logger *log.Logger
31
32 isClosed bool
33
34 isClosedMutex sync.RWMutex
35 }
36
// SubscriptionError reports a subscription attempt that was answered with a
// non-200 HTTP status. Code is the status code and Message is the response
// body returned by the server.
type SubscriptionError struct {
	Code    int
	Message string
}

// Error implements the error interface, rendering "<code>: <message>".
func (e SubscriptionError) Error() string {
	return fmt.Sprint(e.Code) + ": " + e.Message
}
45
46
47
48 func Subscribe(url, lastEventId string) (*Stream, error) {
49 req, err := http.NewRequest("GET", url, nil)
50 if err != nil {
51 return nil, err
52 }
53 return SubscribeWithRequest(lastEventId, req)
54 }
55
56
57
58 func SubscribeWithRequest(lastEventId string, request *http.Request) (*Stream, error) {
59 return SubscribeWith(lastEventId, http.DefaultClient, request)
60 }
61
62
63
64 func SubscribeWith(lastEventId string, client *http.Client, request *http.Request) (*Stream, error) {
65 stream := &Stream{
66 c: client,
67 req: request,
68 lastEventId: lastEventId,
69 retry: time.Millisecond * 3000,
70 Events: make(chan Event),
71 Errors: make(chan error),
72 }
73 stream.c.CheckRedirect = checkRedirect
74
75 r, err := stream.connect()
76 if err != nil {
77 return nil, err
78 }
79 go stream.stream(r)
80 return stream, nil
81 }
82
83
84 func (stream *Stream) Close() {
85 if stream.isStreamClosed() {
86 return
87 }
88
89 stream.markStreamClosed()
90 close(stream.Errors)
91 close(stream.Events)
92 }
93
94 func (stream *Stream) isStreamClosed() bool {
95 stream.isClosedMutex.RLock()
96 defer stream.isClosedMutex.RUnlock()
97 return stream.isClosed
98 }
99
100 func (stream *Stream) markStreamClosed() {
101 stream.isClosedMutex.Lock()
102 defer stream.isClosedMutex.Unlock()
103 stream.isClosed = true
104 }
105
106
107
// checkRedirect is the redirect policy installed on the stream's client.
// It refuses to follow more than 10 redirects and otherwise makes sure the
// redirected request carries every header of the original (first) request.
//
// Headers are replaced rather than appended: net/http already forwards most
// headers of the initial request to the redirected one, so appending would
// send those values twice.
func checkRedirect(req *http.Request, via []*http.Request) error {
	if len(via) >= 10 {
		return errors.New("stopped after 10 redirects")
	}
	if len(via) == 0 {
		// Nothing to copy from; net/http always passes at least one entry,
		// this only guards direct callers.
		return nil
	}
	if req.Header == nil {
		req.Header = make(http.Header)
	}

	original := via[0].Header
	// First drop whatever is already present for the keys being copied, then
	// add the original values, so each header ends up with exactly the
	// original's values.
	for k := range original {
		req.Header.Del(k)
	}
	for k, vv := range original {
		for _, v := range vv {
			req.Header.Add(k, v)
		}
	}
	return nil
}
119
120 func (stream *Stream) connect() (r io.ReadCloser, err error) {
121 var resp *http.Response
122 stream.req.Header.Set("Cache-Control", "no-cache")
123 stream.req.Header.Set("Accept", "text/event-stream")
124 if len(stream.lastEventId) > 0 {
125 stream.req.Header.Set("Last-Event-ID", stream.lastEventId)
126 }
127 if resp, err = stream.c.Do(stream.req); err != nil {
128 return
129 }
130 if resp.StatusCode != 200 {
131 message, _ := ioutil.ReadAll(resp.Body)
132 err = SubscriptionError{
133 Code: resp.StatusCode,
134 Message: string(message),
135 }
136 }
137 r = resp.Body
138 return
139 }
140
141 func (stream *Stream) stream(r io.ReadCloser) {
142 defer r.Close()
143
144
145 stream.receiveEvents(r)
146
147
148 stream.retryRestartStream()
149 }
150
151 func (stream *Stream) receiveEvents(r io.ReadCloser) {
152 dec := NewDecoder(r)
153
154 for {
155 ev, err := dec.Decode()
156 if stream.isStreamClosed() {
157 return
158 }
159 if err != nil {
160 stream.Errors <- err
161 return
162 }
163
164 pub := ev.(*publication)
165 if pub.Retry() > 0 {
166 stream.retry = time.Duration(pub.Retry()) * time.Millisecond
167 }
168 if len(pub.Id()) > 0 {
169 stream.lastEventId = pub.Id()
170 }
171 stream.Events <- ev
172 }
173 }
174
175 func (stream *Stream) retryRestartStream() {
176 backoff := stream.retry
177 for {
178 if stream.Logger != nil {
179 stream.Logger.Printf("Reconnecting in %0.4f secs\n", backoff.Seconds())
180 }
181 time.Sleep(backoff)
182 if stream.isStreamClosed() {
183 return
184 }
185
186
187
188 r, err := stream.connect()
189 if err == nil {
190 go stream.stream(r)
191 return
192 }
193 stream.Errors <- err
194 backoff *= 2
195 }
196 }
197
View as plain text