...
1 package events
2
3 import (
4 "fmt"
5 "math/rand"
6 "sync"
7 "sync/atomic"
8 "time"
9
10 "github.com/sirupsen/logrus"
11 )
12
13
14
15
16
17
18 type RetryingSink struct {
19 sink Sink
20 strategy RetryStrategy
21 closed chan struct{}
22 once sync.Once
23 }
24
25
26
27
28 func NewRetryingSink(sink Sink, strategy RetryStrategy) *RetryingSink {
29 rs := &RetryingSink{
30 sink: sink,
31 strategy: strategy,
32 closed: make(chan struct{}),
33 }
34
35 return rs
36 }
37
38
39
40 func (rs *RetryingSink) Write(event Event) error {
41 logger := logrus.WithField("event", event)
42
43 retry:
44 select {
45 case <-rs.closed:
46 return ErrSinkClosed
47 default:
48 }
49
50 if backoff := rs.strategy.Proceed(event); backoff > 0 {
51 select {
52 case <-time.After(backoff):
53
54
55
56
57
58 case <-rs.closed:
59 return ErrSinkClosed
60 }
61 }
62
63 if err := rs.sink.Write(event); err != nil {
64 if err == ErrSinkClosed {
65
66 return err
67 }
68
69 logger := logger.WithError(err)
70
71 if rs.strategy.Failure(event, err) {
72 logger.Errorf("retryingsink: dropped event")
73 return nil
74 }
75
76 logger.Errorf("retryingsink: error writing event, retrying")
77 goto retry
78 }
79
80 rs.strategy.Success(event)
81 return nil
82 }
83
84
85 func (rs *RetryingSink) Close() error {
86 rs.once.Do(func() {
87 close(rs.closed)
88 })
89
90 return nil
91 }
92
93 func (rs *RetryingSink) String() string {
94
95
96 rs2 := map[string]interface{}{
97 "sink": rs.sink,
98 "strategy": rs.strategy,
99 "closed": rs.closed,
100 }
101 return fmt.Sprint(rs2)
102 }
103
104
105
106
107 type RetryStrategy interface {
108
109
110
111
112
113 Proceed(event Event) time.Duration
114
115
116
117 Failure(event Event, err error) bool
118
119
120 Success(event Event)
121 }
122
123
124
125
126 type Breaker struct {
127 threshold int
128 recent int
129 last time.Time
130 backoff time.Duration
131 mu sync.Mutex
132 }
133
134 var _ RetryStrategy = &Breaker{}
135
136
137
138 func NewBreaker(threshold int, backoff time.Duration) *Breaker {
139 return &Breaker{
140 threshold: threshold,
141 backoff: backoff,
142 }
143 }
144
145
146 func (b *Breaker) Proceed(event Event) time.Duration {
147 b.mu.Lock()
148 defer b.mu.Unlock()
149
150 if b.recent < b.threshold {
151 return 0
152 }
153
154 return b.last.Add(b.backoff).Sub(time.Now())
155 }
156
157
158 func (b *Breaker) Success(event Event) {
159 b.mu.Lock()
160 defer b.mu.Unlock()
161
162 b.recent = 0
163 b.last = time.Time{}
164 }
165
166
167 func (b *Breaker) Failure(event Event, err error) bool {
168 b.mu.Lock()
169 defer b.mu.Unlock()
170
171 b.recent++
172 b.last = time.Now().UTC()
173 return false
174 }
175
176 var (
177
178
179 DefaultExponentialBackoffConfig = ExponentialBackoffConfig{
180 Base: time.Second,
181 Factor: time.Second,
182 Max: 20 * time.Second,
183 }
184 )
185
186
187
188
189
190
// ExponentialBackoffConfig configures an ExponentialBackoff strategy.
//
// After n consecutive failures the upper bound of the wait is
// Base + Factor*2^(n-1), capped at Max; the actual wait is picked at random
// below that bound.
type ExponentialBackoffConfig struct {
	// Base is added to every computed backoff bound.
	Base time.Duration

	// Factor is the duration that is doubled for each additional failure.
	// Values of zero or below fall back to the default configuration's
	// Factor.
	Factor time.Duration

	// Max caps the backoff bound. Values of zero or below fall back to the
	// default configuration's Max.
	Max time.Duration
}
202
203
204
205 type ExponentialBackoff struct {
206 failures uint64
207 config ExponentialBackoffConfig
208 }
209
210
211
212 func NewExponentialBackoff(config ExponentialBackoffConfig) *ExponentialBackoff {
213 return &ExponentialBackoff{
214 config: config,
215 }
216 }
217
218
219 func (b *ExponentialBackoff) Proceed(event Event) time.Duration {
220 return b.backoff(atomic.LoadUint64(&b.failures))
221 }
222
223
224 func (b *ExponentialBackoff) Success(event Event) {
225 atomic.StoreUint64(&b.failures, 0)
226 }
227
228
229 func (b *ExponentialBackoff) Failure(event Event, err error) bool {
230 atomic.AddUint64(&b.failures, 1)
231 return false
232 }
233
234
235
236 func (b *ExponentialBackoff) backoff(failures uint64) time.Duration {
237 if failures <= 0 {
238
239 return 0
240 }
241
242 factor := b.config.Factor
243 if factor <= 0 {
244 factor = DefaultExponentialBackoffConfig.Factor
245 }
246
247 backoff := b.config.Base + factor*time.Duration(1<<(failures-1))
248
249 max := b.config.Max
250 if max <= 0 {
251 max = DefaultExponentialBackoffConfig.Max
252 }
253
254 if backoff > max || backoff < 0 {
255 backoff = max
256 }
257
258
259 return time.Duration(rand.Int63n(int64(backoff)))
260 }
261
View as plain text