1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package githubapp
16
17 import (
18 "context"
19 "sync/atomic"
20 "time"
21
22 "github.com/pkg/errors"
23 "github.com/rcrowley/go-metrics"
24 "github.com/rs/zerolog"
25 )
26
// Names of the metrics registered by WithSchedulingMetrics.
const (
	// MetricsKeyQueueLength names the gauge that reports how many dispatches
	// are currently waiting in the scheduler's queue.
	MetricsKeyQueueLength = "github.event.queued"
	// MetricsKeyActiveWorkers names the gauge that reports how many handlers
	// are currently executing.
	MetricsKeyActiveWorkers = "github.event.workers"
	// MetricsKeyEventAge names the histogram of how long, in milliseconds, a
	// dispatch waited in the queue before a worker picked it up.
	MetricsKeyEventAge = "github.event.age"
	// MetricsKeyDroppedEvents names the counter of dispatches rejected because
	// the queue was full.
	MetricsKeyDroppedEvents = "github.event.dropped"
)
33
const (
	// Parameters of the exponentially-decaying sample that backs the event age
	// histogram created in WithSchedulingMetrics.
	// NOTE(review): these look like the values go-metrics uses in its own
	// examples; confirm the origin before tuning them.
	histogramReservoirSize = 1028
	histogramAlpha         = 0.015
)
39
var (
	// ErrCapacityExceeded is returned by the queue-backed scheduler's Schedule
	// when the queue is full and the dispatch is dropped.
	ErrCapacityExceeded = errors.New("scheduler: capacity exceeded")
)
43
44
// Dispatch bundles an event with the handler that should process it.
type Dispatch struct {
	// Handler is the handler invoked by Execute.
	Handler EventHandler

	// EventType, DeliveryID, and Payload are passed to Handler unchanged.
	// NOTE(review): presumably the GitHub webhook event name, delivery ID,
	// and raw request body; confirm against the code that builds a Dispatch.
	EventType  string
	DeliveryID string
	Payload    []byte
}
52
53
54 func (d Dispatch) Execute(ctx context.Context) error {
55 return d.Handler.Handle(ctx, d.EventType, d.DeliveryID, d.Payload)
56 }
57
58
59
60
61
62
// AsyncErrorCallback is invoked by the asynchronous schedulers when a handler
// returns a non-nil error or panics. The handler's error is passed as err; if
// the handler panicked, err is a HandlerPanicError carrying the recovered
// value and a stack trace.
type AsyncErrorCallback func(ctx context.Context, d Dispatch, err error)
64
65
// DefaultAsyncErrorCallback is the error callback installed by AsyncScheduler
// and QueueAsyncScheduler when no other callback is configured. It forwards to
// the callback produced by MetricsAsyncErrorCallback(nil).
func DefaultAsyncErrorCallback(ctx context.Context, d Dispatch, err error) {
	defaultAsyncErrorCallback(ctx, d, err)
}
69
// defaultAsyncErrorCallback is the callback behind DefaultAsyncErrorCallback,
// built once with a nil registry.
// NOTE(review): presumably a nil registry makes errorCounter fall back to the
// default go-metrics registry; confirm against errorCounter.
var defaultAsyncErrorCallback = MetricsAsyncErrorCallback(nil)
71
72
73 func MetricsAsyncErrorCallback(reg metrics.Registry) AsyncErrorCallback {
74 return func(ctx context.Context, d Dispatch, err error) {
75 zerolog.Ctx(ctx).Error().Err(err).Msg("Unexpected error handling webhook")
76 errorCounter(reg, d.EventType).Inc(1)
77 }
78 }
79
80
81
// ContextDeriver produces the context a handler runs with from the context
// that was passed to Schedule. The asynchronous schedulers call it before
// handing the dispatch off for execution.
type ContextDeriver func(context.Context) context.Context
83
84
85
86 func DefaultContextDeriver(ctx context.Context) context.Context {
87 newCtx := context.Background()
88
89
90
91 newCtx = InitializeResponder(newCtx)
92
93 return zerolog.Ctx(ctx).WithContext(newCtx)
94 }
95
96
97
98
99
100
101
102
103
104
105
// Scheduler decides how a Dispatch is executed.
//
// The implementations in this file behave as follows: the default scheduler
// runs the handler synchronously and returns its error; the async scheduler
// starts a goroutine per dispatch and always returns nil; the queue scheduler
// enqueues the dispatch for a fixed pool of workers and returns
// ErrCapacityExceeded when the queue is full.
type Scheduler interface {
	// Schedule arranges for d to be executed. The returned error reports
	// either a scheduling failure or, for synchronous schedulers, the
	// handler's own error.
	Schedule(ctx context.Context, d Dispatch) error
}
109
110
// SchedulerOption configures the state shared by the asynchronous schedulers.
// Options are accepted by AsyncScheduler and QueueAsyncScheduler and are
// applied in the order given.
type SchedulerOption func(*scheduler)
112
113
114
115 func WithAsyncErrorCallback(onError AsyncErrorCallback) SchedulerOption {
116 return func(s *scheduler) {
117 if onError != nil {
118 s.onError = onError
119 }
120 }
121 }
122
123
124
125 func WithContextDeriver(deriver ContextDeriver) SchedulerOption {
126 return func(s *scheduler) {
127 if deriver != nil {
128 s.deriver = deriver
129 }
130 }
131 }
132
133
134 func WithSchedulingMetrics(r metrics.Registry) SchedulerOption {
135 return func(s *scheduler) {
136 metrics.NewRegisteredFunctionalGauge(MetricsKeyQueueLength, r, func() int64 {
137 return int64(len(s.queue))
138 })
139 metrics.NewRegisteredFunctionalGauge(MetricsKeyActiveWorkers, r, func() int64 {
140 return atomic.LoadInt64(&s.activeWorkers)
141 })
142
143 sample := metrics.NewExpDecaySample(histogramReservoirSize, histogramAlpha)
144 s.eventAge = metrics.NewRegisteredHistogram(MetricsKeyEventAge, r, sample)
145 s.dropped = metrics.NewRegisteredCounter(MetricsKeyDroppedEvents, r)
146 }
147 }
148
// queueDispatch is one entry in the queue scheduler's channel.
type queueDispatch struct {
	// ctx is the already-derived context the handler will run with.
	ctx context.Context
	// t is the time the entry was created in Schedule; workers use it to
	// report how long the entry waited.
	t time.Time
	// d is the dispatch to execute.
	d Dispatch
}
154
155
// scheduler holds the state and helpers shared by the asynchronous scheduler
// implementations, which embed it.
type scheduler struct {
	// onError is called by safeExecute when a handler returns an error or
	// panics; it is skipped when nil.
	onError AsyncErrorCallback
	// deriver builds the handler context in derive; when nil the incoming
	// context is used as-is.
	deriver ContextDeriver

	// activeWorkers counts handlers currently executing. It is read and
	// written only through sync/atomic.
	activeWorkers int64
	// queue carries pending dispatches to the workers. It is allocated by
	// QueueAsyncScheduler only.
	queue chan queueDispatch

	// eventAge and dropped are set by WithSchedulingMetrics; users nil-check
	// them before recording.
	eventAge metrics.Histogram
	dropped  metrics.Counter
}
166
167 func (s *scheduler) safeExecute(ctx context.Context, d Dispatch) {
168 var err error
169 defer func() {
170 atomic.AddInt64(&s.activeWorkers, -1)
171 if r := recover(); r != nil {
172 err = HandlerPanicError{
173 value: r,
174 stack: getStack(1),
175 }
176 }
177 if err != nil && s.onError != nil {
178 s.onError(ctx, d, err)
179 }
180 }()
181
182 atomic.AddInt64(&s.activeWorkers, 1)
183 err = d.Execute(ctx)
184 }
185
186 func (s *scheduler) derive(ctx context.Context) context.Context {
187 if s.deriver == nil {
188 return ctx
189 }
190 return s.deriver(ctx)
191 }
192
193
194
195 func DefaultScheduler() Scheduler {
196 return &defaultScheduler{}
197 }
198
// defaultScheduler is the stateless synchronous Scheduler returned by
// DefaultScheduler.
type defaultScheduler struct{}
200
// Schedule executes d immediately with the caller's context and returns the
// handler's error.
func (s *defaultScheduler) Schedule(ctx context.Context, d Dispatch) error {
	return d.Execute(ctx)
}
204
205
206
207 func AsyncScheduler(opts ...SchedulerOption) Scheduler {
208 s := &asyncScheduler{
209 scheduler: scheduler{
210 deriver: DefaultContextDeriver,
211 onError: DefaultAsyncErrorCallback,
212 },
213 }
214 for _, opt := range opts {
215 opt(&s.scheduler)
216 }
217 return s
218 }
219
// asyncScheduler is the goroutine-per-dispatch Scheduler returned by
// AsyncScheduler. It embeds the shared scheduler state.
type asyncScheduler struct {
	scheduler
}
223
224 func (s *asyncScheduler) Schedule(ctx context.Context, d Dispatch) error {
225 go s.safeExecute(s.derive(ctx), d)
226 return nil
227 }
228
229
230
231
232 func QueueAsyncScheduler(queueSize int, workers int, opts ...SchedulerOption) Scheduler {
233 if queueSize < 0 {
234 panic("QueueAsyncScheduler: queue size must be non-negative")
235 }
236 if workers < 1 {
237 panic("QueueAsyncScheduler: worker count must be positive")
238 }
239
240 s := &queueScheduler{
241 scheduler: scheduler{
242 deriver: DefaultContextDeriver,
243 onError: DefaultAsyncErrorCallback,
244 queue: make(chan queueDispatch, queueSize),
245 },
246 }
247 for _, opt := range opts {
248 opt(&s.scheduler)
249 }
250
251 for i := 0; i < workers; i++ {
252 go func() {
253 for d := range s.queue {
254 if s.eventAge != nil {
255 s.eventAge.Update(time.Since(d.t).Milliseconds())
256 }
257 s.safeExecute(d.ctx, d.d)
258 }
259 }()
260 }
261
262 return s
263 }
264
// queueScheduler is the queue-and-worker-pool Scheduler returned by
// QueueAsyncScheduler. It embeds the shared scheduler state.
type queueScheduler struct {
	scheduler
}
268
269 func (s *queueScheduler) Schedule(ctx context.Context, d Dispatch) error {
270 select {
271 case s.queue <- queueDispatch{ctx: s.derive(ctx), t: time.Now(), d: d}:
272 default:
273 if s.dropped != nil {
274 s.dropped.Inc(1)
275 }
276 return ErrCapacityExceeded
277 }
278 return nil
279 }
280
View as plain text