1 package eventsource
2
3 import (
4 "net/http"
5 "strings"
6 "sync"
7 "time"
8 )
9
10 type subscription struct {
11 channel string
12 lastEventID string
13 out chan<- eventOrComment
14 }
15
16 type eventOrComment interface{}
17
18 type outbound struct {
19 channels []string
20 eventOrComment eventOrComment
21 ackCh chan<- struct{}
22 }
23
24 type registration struct {
25 channel string
26 repository Repository
27 }
28
29 type unregistration struct {
30 channel string
31 forceDisconnect bool
32 }
33
34 type comment struct {
35 value string
36 }
37
38 type eventBatch struct {
39 events <-chan Event
40 }
41
42
43
44 type Server struct {
45 AllowCORS bool
46 ReplayAll bool
47 BufferSize int
48 Gzip bool
49 MaxConnTime time.Duration
50 Logger Logger
51 registrations chan *registration
52 unregistrations chan *unregistration
53 pub chan *outbound
54 subs chan *subscription
55 unsubs chan *subscription
56 quit chan bool
57 isClosed bool
58 isClosedMutex sync.RWMutex
59 }
60
61
62 func NewServer() *Server {
63 srv := &Server{
64 registrations: make(chan *registration),
65 unregistrations: make(chan *unregistration),
66 pub: make(chan *outbound),
67 subs: make(chan *subscription),
68 unsubs: make(chan *subscription, 2),
69 quit: make(chan bool),
70 BufferSize: 128,
71 }
72 go srv.run()
73 return srv
74 }
75
76
77 func (srv *Server) Close() {
78 srv.quit <- true
79 srv.markServerClosed()
80 }
81
82
83
84
85
86
87 func (srv *Server) Handler(channel string) http.HandlerFunc {
88 return func(w http.ResponseWriter, req *http.Request) {
89 h := w.Header()
90 h.Set("Content-Type", "text/event-stream; charset=utf-8")
91 h.Set("Cache-Control", "no-cache, no-store, must-revalidate")
92 h.Set("Connection", "keep-alive")
93 if srv.AllowCORS {
94 h.Set("Access-Control-Allow-Origin", "*")
95 }
96 useGzip := srv.Gzip && strings.Contains(req.Header.Get("Accept-Encoding"), "gzip")
97 if useGzip {
98 h.Set("Content-Encoding", "gzip")
99 }
100 w.WriteHeader(http.StatusOK)
101
102
103
104 if srv.isServerClosed() {
105 return
106 }
107
108 var maxConnTimeCh <-chan time.Time
109 if srv.MaxConnTime > 0 {
110 t := time.NewTimer(srv.MaxConnTime)
111 defer t.Stop()
112 maxConnTimeCh = t.C
113 }
114
115 eventCh := make(chan eventOrComment, srv.BufferSize)
116 sub := &subscription{
117 channel: channel,
118 lastEventID: req.Header.Get("Last-Event-ID"),
119 out: eventCh,
120 }
121 srv.subs <- sub
122 flusher := w.(http.Flusher)
123 flusher.Flush()
124 enc := NewEncoder(w, useGzip)
125
126 writeEventOrComment := func(ec eventOrComment) bool {
127 if err := enc.Encode(ec); err != nil {
128 srv.unsubs <- sub
129 if srv.Logger != nil {
130 srv.Logger.Println(err)
131 }
132 return false
133 }
134 flusher.Flush()
135 return true
136 }
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155 var readMainCh <-chan eventOrComment = eventCh
156 var readBatchCh <-chan Event
157 closedNormally := false
158 closeNotify := req.Context().Done()
159
160 ReadLoop:
161 for {
162 select {
163 case <-closeNotify:
164 break ReadLoop
165 case <-maxConnTimeCh:
166 break ReadLoop
167 case ev, ok := <-readMainCh:
168 if !ok {
169 closedNormally = true
170 break ReadLoop
171 }
172 if batch, ok := ev.(eventBatch); ok {
173 readBatchCh = batch.events
174 readMainCh = nil
175 } else if !writeEventOrComment(ev) {
176 break ReadLoop
177 }
178 case ev, ok := <-readBatchCh:
179 if !ok {
180 readBatchCh = nil
181 readMainCh = eventCh
182 } else if !writeEventOrComment(ev) {
183 break ReadLoop
184 }
185 }
186 }
187 if !closedNormally {
188 srv.unsubs <- sub
189 }
190 }
191 }
192
193
194
195
196
197
198 func (srv *Server) Register(channel string, repo Repository) {
199 srv.registrations <- ®istration{
200 channel: channel,
201 repository: repo,
202 }
203 }
204
205
206
207
208
209
210
211
212 func (srv *Server) Unregister(channel string, forceDisconnect bool) {
213 srv.unregistrations <- &unregistration{
214 channel: channel,
215 forceDisconnect: forceDisconnect,
216 }
217 }
218
219
220 func (srv *Server) Publish(channels []string, ev Event) {
221 srv.pub <- &outbound{
222 channels: channels,
223 eventOrComment: ev,
224 }
225 }
226
227
228
229
230
231
232
233
234
235 func (srv *Server) PublishWithAcknowledgment(channels []string, ev Event) <-chan struct{} {
236 ackCh := make(chan struct{}, 1)
237 srv.pub <- &outbound{
238 channels: channels,
239 eventOrComment: ev,
240 ackCh: ackCh,
241 }
242 return ackCh
243 }
244
245
246 func (srv *Server) PublishComment(channels []string, text string) {
247 srv.pub <- &outbound{
248 channels: channels,
249 eventOrComment: comment{value: text},
250 }
251 }
252
253 func (srv *Server) run() {
254
255 subs := make(map[string]map[*subscription]struct{})
256 repos := make(map[string]Repository)
257 trySend := func(sub *subscription, ec eventOrComment) {
258 if !sub.send(ec) {
259 sub.close()
260 delete(subs[sub.channel], sub)
261 }
262 }
263 for {
264 select {
265 case reg := <-srv.registrations:
266 repos[reg.channel] = reg.repository
267 case unreg := <-srv.unregistrations:
268 delete(repos, unreg.channel)
269 previousSubs := subs[unreg.channel]
270 delete(subs, unreg.channel)
271 if unreg.forceDisconnect {
272 for s := range previousSubs {
273 s.close()
274 }
275 }
276 case sub := <-srv.unsubs:
277 delete(subs[sub.channel], sub)
278 case pub := <-srv.pub:
279 for _, c := range pub.channels {
280 for s := range subs[c] {
281 trySend(s, pub.eventOrComment)
282 }
283 }
284 if pub.ackCh != nil {
285 select {
286
287
288 case pub.ackCh <- struct{}{}:
289 default:
290 }
291 }
292 case sub := <-srv.subs:
293 if _, ok := subs[sub.channel]; !ok {
294 subs[sub.channel] = make(map[*subscription]struct{})
295 }
296 subs[sub.channel][sub] = struct{}{}
297 if srv.ReplayAll || len(sub.lastEventID) > 0 {
298 repo, ok := repos[sub.channel]
299 if ok {
300 batchCh := repo.Replay(sub.channel, sub.lastEventID)
301 if batchCh != nil {
302 trySend(sub, eventBatch{events: batchCh})
303 }
304 }
305 }
306 case <-srv.quit:
307 for _, sub := range subs {
308 for s := range sub {
309 s.close()
310 }
311 }
312 return
313 }
314 }
315 }
316
317 func (srv *Server) isServerClosed() bool {
318 srv.isClosedMutex.RLock()
319 defer srv.isClosedMutex.RUnlock()
320 return srv.isClosed
321 }
322
323 func (srv *Server) markServerClosed() {
324 srv.isClosedMutex.Lock()
325 defer srv.isClosedMutex.Unlock()
326 srv.isClosed = true
327 }
328
329
330
331
332
333
334
335
336
337 func (s *subscription) send(e eventOrComment) bool {
338 if s.out == nil {
339 return true
340 }
341 select {
342 case s.out <- e:
343 return true
344 default:
345 s.close()
346 return false
347 }
348 }
349
350
351
352
353 func (s *subscription) close() {
354 close(s.out)
355 s.out = nil
356 }
357
View as plain text