...
1 package eventsource
2
3 import (
4 "log"
5 "net/http"
6 "strings"
7 "sync"
8 )
9
10 type subscription struct {
11 channel string
12 lastEventId string
13 out chan Event
14 }
15
16 type outbound struct {
17 channels []string
18 event Event
19 }
20 type registration struct {
21 channel string
22 repository Repository
23 }
24
25 type Server struct {
26 AllowCORS bool
27 ReplayAll bool
28 BufferSize int
29 Gzip bool
30 Logger *log.Logger
31 registrations chan *registration
32 pub chan *outbound
33 subs chan *subscription
34 unregister chan *subscription
35 quit chan bool
36 isClosed bool
37 isClosedMutex sync.RWMutex
38 }
39
40
41 func NewServer() *Server {
42 srv := &Server{
43 registrations: make(chan *registration),
44 pub: make(chan *outbound),
45 subs: make(chan *subscription),
46 unregister: make(chan *subscription, 2),
47 quit: make(chan bool),
48 BufferSize: 128,
49 }
50 go srv.run()
51 return srv
52 }
53
54
55 func (srv *Server) Close() {
56 srv.quit <- true
57 srv.markServerClosed()
58 }
59
60
61 func (srv *Server) Handler(channel string) http.HandlerFunc {
62 return func(w http.ResponseWriter, req *http.Request) {
63 h := w.Header()
64 h.Set("Content-Type", "text/event-stream; charset=utf-8")
65 h.Set("Cache-Control", "no-cache, no-store, must-revalidate")
66 h.Set("Connection", "keep-alive")
67 if srv.AllowCORS {
68 h.Set("Access-Control-Allow-Origin", "*")
69 }
70 useGzip := srv.Gzip && strings.Contains(req.Header.Get("Accept-Encoding"), "gzip")
71 if useGzip {
72 h.Set("Content-Encoding", "gzip")
73 }
74 w.WriteHeader(http.StatusOK)
75
76
77
78 if srv.isServerClosed() {
79 return
80 }
81
82 sub := &subscription{
83 channel: channel,
84 lastEventId: req.Header.Get("Last-Event-ID"),
85 out: make(chan Event, srv.BufferSize),
86 }
87 srv.subs <- sub
88 flusher := w.(http.Flusher)
89 notifier := w.(http.CloseNotifier)
90 flusher.Flush()
91 enc := NewEncoder(w, useGzip)
92 for {
93 select {
94 case <-notifier.CloseNotify():
95 srv.unregister <- sub
96 return
97 case ev, ok := <-sub.out:
98 if !ok {
99 return
100 }
101 if err := enc.Encode(ev); err != nil {
102 srv.unregister <- sub
103 if srv.Logger != nil {
104 srv.Logger.Println(err)
105 }
106 return
107 }
108 flusher.Flush()
109 }
110 }
111 }
112 }
113
114
115 func (srv *Server) Register(channel string, repo Repository) {
116 srv.registrations <- ®istration{
117 channel: channel,
118 repository: repo,
119 }
120 }
121
122
123 func (srv *Server) Publish(channels []string, ev Event) {
124 srv.pub <- &outbound{
125 channels: channels,
126 event: ev,
127 }
128 }
129
130 func replay(repo Repository, sub *subscription) {
131 for ev := range repo.Replay(sub.channel, sub.lastEventId) {
132 sub.out <- ev
133 }
134 }
135
136 func (srv *Server) run() {
137 subs := make(map[string]map[*subscription]struct{})
138 repos := make(map[string]Repository)
139 for {
140 select {
141 case reg := <-srv.registrations:
142 repos[reg.channel] = reg.repository
143 case sub := <-srv.unregister:
144 delete(subs[sub.channel], sub)
145 case pub := <-srv.pub:
146 for _, c := range pub.channels {
147 for s := range subs[c] {
148 select {
149 case s.out <- pub.event:
150 default:
151 srv.unregister <- s
152 close(s.out)
153 }
154
155 }
156 }
157 case sub := <-srv.subs:
158 if _, ok := subs[sub.channel]; !ok {
159 subs[sub.channel] = make(map[*subscription]struct{})
160 }
161 subs[sub.channel][sub] = struct{}{}
162 if srv.ReplayAll || len(sub.lastEventId) > 0 {
163 repo, ok := repos[sub.channel]
164 if ok {
165 go replay(repo, sub)
166 }
167 }
168 case <-srv.quit:
169 for _, sub := range subs {
170 for s := range sub {
171 close(s.out)
172 }
173 }
174 return
175 }
176 }
177 }
178
179 func (srv *Server) isServerClosed() bool {
180 srv.isClosedMutex.RLock()
181 defer srv.isClosedMutex.RUnlock()
182 return srv.isClosed
183 }
184
185 func (srv *Server) markServerClosed() {
186 srv.isClosedMutex.Lock()
187 defer srv.isClosedMutex.Unlock()
188 srv.isClosed = true
189 }
190
View as plain text