...
1 package notifications
2
3 import (
4 "container/list"
5 "fmt"
6 "sync"
7 "time"
8
9 "github.com/sirupsen/logrus"
10 )
11
12
13
14
15
16
17
18
19
20 type Broadcaster struct {
21 sinks []Sink
22 events chan []Event
23 closed chan chan struct{}
24 }
25
26
27
28
29
30
31 func NewBroadcaster(sinks ...Sink) *Broadcaster {
32 b := Broadcaster{
33 sinks: sinks,
34 events: make(chan []Event),
35 closed: make(chan chan struct{}),
36 }
37
38
39 go b.run()
40
41 return &b
42 }
43
44
45
46
47
48 func (b *Broadcaster) Write(events ...Event) error {
49 select {
50 case b.events <- events:
51 case <-b.closed:
52 return ErrSinkClosed
53 }
54 return nil
55 }
56
57
58
59 func (b *Broadcaster) Close() error {
60 logrus.Infof("broadcaster: closing")
61 select {
62 case <-b.closed:
63
64 return fmt.Errorf("broadcaster: already closed")
65 default:
66
67 closed := make(chan struct{})
68 b.closed <- closed
69 close(b.closed)
70 <-closed
71 return nil
72 }
73 }
74
75
76
77
78 func (b *Broadcaster) run() {
79 for {
80 select {
81 case block := <-b.events:
82 for _, sink := range b.sinks {
83 if err := sink.Write(block...); err != nil {
84 logrus.Errorf("broadcaster: error writing events to %v, these events will be lost: %v", sink, err)
85 }
86 }
87 case closing := <-b.closed:
88
89
90 for _, sink := range b.sinks {
91 if err := sink.Close(); err != nil {
92 logrus.Errorf("broadcaster: error closing sink %v: %v", sink, err)
93 }
94 }
95 closing <- struct{}{}
96
97 logrus.Debugf("broadcaster: closed")
98 return
99 }
100 }
101 }
102
103
104
105
106 type eventQueue struct {
107 sink Sink
108 events *list.List
109 listeners []eventQueueListener
110 cond *sync.Cond
111 mu sync.Mutex
112 closed bool
113 }
114
115
116 type eventQueueListener interface {
117 ingress(events ...Event)
118 egress(events ...Event)
119 }
120
121
122
123 func newEventQueue(sink Sink, listeners ...eventQueueListener) *eventQueue {
124 eq := eventQueue{
125 sink: sink,
126 events: list.New(),
127 listeners: listeners,
128 }
129
130 eq.cond = sync.NewCond(&eq.mu)
131 go eq.run()
132 return &eq
133 }
134
135
136
137 func (eq *eventQueue) Write(events ...Event) error {
138 eq.mu.Lock()
139 defer eq.mu.Unlock()
140
141 if eq.closed {
142 return ErrSinkClosed
143 }
144
145 for _, listener := range eq.listeners {
146 listener.ingress(events...)
147 }
148 eq.events.PushBack(events)
149 eq.cond.Signal()
150
151 return nil
152 }
153
154
155 func (eq *eventQueue) Close() error {
156 eq.mu.Lock()
157 defer eq.mu.Unlock()
158
159 if eq.closed {
160 return fmt.Errorf("eventqueue: already closed")
161 }
162
163
164 eq.closed = true
165 eq.cond.Signal()
166 eq.cond.Wait()
167
168 return eq.sink.Close()
169 }
170
171
172 func (eq *eventQueue) run() {
173 for {
174 block := eq.next()
175
176 if block == nil {
177 return
178 }
179
180 if err := eq.sink.Write(block...); err != nil {
181 logrus.Warnf("eventqueue: error writing events to %v, these events will be lost: %v", eq.sink, err)
182 }
183
184 for _, listener := range eq.listeners {
185 listener.egress(block...)
186 }
187 }
188 }
189
190
191
192
193 func (eq *eventQueue) next() []Event {
194 eq.mu.Lock()
195 defer eq.mu.Unlock()
196
197 for eq.events.Len() < 1 {
198 if eq.closed {
199 eq.cond.Broadcast()
200 return nil
201 }
202
203 eq.cond.Wait()
204 }
205
206 front := eq.events.Front()
207 block := front.Value.([]Event)
208 eq.events.Remove(front)
209
210 return block
211 }
212
213
214
215 type ignoredSink struct {
216 Sink
217 ignoreMediaTypes map[string]bool
218 ignoreActions map[string]bool
219 }
220
221 func newIgnoredSink(sink Sink, ignored []string, ignoreActions []string) Sink {
222 if len(ignored) == 0 {
223 return sink
224 }
225
226 ignoredMap := make(map[string]bool)
227 for _, mediaType := range ignored {
228 ignoredMap[mediaType] = true
229 }
230
231 ignoredActionsMap := make(map[string]bool)
232 for _, action := range ignoreActions {
233 ignoredActionsMap[action] = true
234 }
235
236 return &ignoredSink{
237 Sink: sink,
238 ignoreMediaTypes: ignoredMap,
239 ignoreActions: ignoredActionsMap,
240 }
241 }
242
243
244
245 func (imts *ignoredSink) Write(events ...Event) error {
246 var kept []Event
247 for _, e := range events {
248 if !imts.ignoreMediaTypes[e.Target.MediaType] {
249 kept = append(kept, e)
250 }
251 }
252 if len(kept) == 0 {
253 return nil
254 }
255
256 var results []Event
257 for _, e := range kept {
258 if !imts.ignoreActions[e.Action] {
259 results = append(results, e)
260 }
261 }
262 if len(results) == 0 {
263 return nil
264 }
265 return imts.Sink.Write(results...)
266 }
267
268
269
270
271
272
273 type retryingSink struct {
274 mu sync.Mutex
275 sink Sink
276 closed bool
277
278
279 failures struct {
280 threshold int
281 recent int
282 last time.Time
283 backoff time.Duration
284 }
285 }
286
287
288
289
290
291
292
293
294 func newRetryingSink(sink Sink, threshold int, backoff time.Duration) *retryingSink {
295 rs := &retryingSink{
296 sink: sink,
297 }
298 rs.failures.threshold = threshold
299 rs.failures.backoff = backoff
300
301 return rs
302 }
303
304
305
306 func (rs *retryingSink) Write(events ...Event) error {
307 rs.mu.Lock()
308 defer rs.mu.Unlock()
309
310 retry:
311
312 if rs.closed {
313 return ErrSinkClosed
314 }
315
316 if !rs.proceed() {
317 logrus.Warnf("%v encountered too many errors, backing off", rs.sink)
318 rs.wait(rs.failures.backoff)
319 goto retry
320 }
321
322 if err := rs.write(events...); err != nil {
323 if err == ErrSinkClosed {
324
325 return err
326 }
327
328 logrus.Errorf("retryingsink: error writing events: %v, retrying", err)
329 goto retry
330 }
331
332 return nil
333 }
334
335
336 func (rs *retryingSink) Close() error {
337 rs.mu.Lock()
338 defer rs.mu.Unlock()
339
340 if rs.closed {
341 return fmt.Errorf("retryingsink: already closed")
342 }
343
344 rs.closed = true
345 return rs.sink.Close()
346 }
347
348
349
350 func (rs *retryingSink) write(events ...Event) error {
351 if err := rs.sink.Write(events...); err != nil {
352 rs.failure()
353 return err
354 }
355
356 rs.reset()
357 return nil
358 }
359
360
361
362 func (rs *retryingSink) wait(backoff time.Duration) {
363 rs.mu.Unlock()
364 defer rs.mu.Lock()
365
366
367 time.Sleep(backoff)
368 }
369
370
371 func (rs *retryingSink) reset() {
372 rs.failures.recent = 0
373 rs.failures.last = time.Time{}
374 }
375
376
377 func (rs *retryingSink) failure() {
378 rs.failures.recent++
379 rs.failures.last = time.Now().UTC()
380 }
381
382
383
384 func (rs *retryingSink) proceed() bool {
385 return rs.failures.recent < rs.failures.threshold ||
386 time.Now().UTC().After(rs.failures.last.Add(rs.failures.backoff))
387 }
388
View as plain text