...
1 package events
2
3 import (
4 "fmt"
5 "sync"
6
7 "github.com/sirupsen/logrus"
8 )
9
10
11
12
13 type Broadcaster struct {
14 sinks []Sink
15 events chan Event
16 adds chan configureRequest
17 removes chan configureRequest
18
19 shutdown chan struct{}
20 closed chan struct{}
21 once sync.Once
22 }
23
24
25
26
27
28 func NewBroadcaster(sinks ...Sink) *Broadcaster {
29 b := Broadcaster{
30 sinks: sinks,
31 events: make(chan Event),
32 adds: make(chan configureRequest),
33 removes: make(chan configureRequest),
34 shutdown: make(chan struct{}),
35 closed: make(chan struct{}),
36 }
37
38
39 go b.run()
40
41 return &b
42 }
43
44
45
46
47 func (b *Broadcaster) Write(event Event) error {
48 select {
49 case b.events <- event:
50 case <-b.closed:
51 return ErrSinkClosed
52 }
53 return nil
54 }
55
56
57
58
59
60 func (b *Broadcaster) Add(sink Sink) error {
61 return b.configure(b.adds, sink)
62 }
63
64
65 func (b *Broadcaster) Remove(sink Sink) error {
66 return b.configure(b.removes, sink)
67 }
68
69 type configureRequest struct {
70 sink Sink
71 response chan error
72 }
73
74 func (b *Broadcaster) configure(ch chan configureRequest, sink Sink) error {
75 response := make(chan error, 1)
76
77 for {
78 select {
79 case ch <- configureRequest{
80 sink: sink,
81 response: response}:
82 ch = nil
83 case err := <-response:
84 return err
85 case <-b.closed:
86 return ErrSinkClosed
87 }
88 }
89 }
90
91
92
93 func (b *Broadcaster) Close() error {
94 b.once.Do(func() {
95 close(b.shutdown)
96 })
97
98 <-b.closed
99 return nil
100 }
101
102
103
104
105 func (b *Broadcaster) run() {
106 defer close(b.closed)
107 remove := func(target Sink) {
108 for i, sink := range b.sinks {
109 if sink == target {
110 b.sinks = append(b.sinks[:i], b.sinks[i+1:]...)
111 break
112 }
113 }
114 }
115
116 for {
117 select {
118 case event := <-b.events:
119 for _, sink := range b.sinks {
120 if err := sink.Write(event); err != nil {
121 if err == ErrSinkClosed {
122
123 remove(sink)
124 continue
125 }
126 logrus.WithField("event", event).WithField("events.sink", sink).WithError(err).
127 Errorf("broadcaster: dropping event")
128 }
129 }
130 case request := <-b.adds:
131
132
133
134 var found bool
135 for _, sink := range b.sinks {
136 if request.sink == sink {
137 found = true
138 break
139 }
140 }
141
142 if !found {
143 b.sinks = append(b.sinks, request.sink)
144 }
145
146 request.response <- nil
147 case request := <-b.removes:
148 remove(request.sink)
149 request.response <- nil
150 case <-b.shutdown:
151
152 for _, sink := range b.sinks {
153 if err := sink.Close(); err != nil && err != ErrSinkClosed {
154 logrus.WithField("events.sink", sink).WithError(err).
155 Errorf("broadcaster: closing sink failed")
156 }
157 }
158 return
159 }
160 }
161 }
162
163 func (b *Broadcaster) String() string {
164
165
166
167 b2 := map[string]interface{}{
168 "sinks": b.sinks,
169 "events": b.events,
170 "adds": b.adds,
171 "removes": b.removes,
172
173 "shutdown": b.shutdown,
174 "closed": b.closed,
175 }
176
177 return fmt.Sprint(b2)
178 }
179
View as plain text