1
2
3
4
5
6
7
8
9
10
11
12
13
14 package dispatch
15
16 import (
17 "context"
18 "fmt"
19 "sort"
20 "sync"
21 "time"
22
23 "github.com/go-kit/log"
24 "github.com/go-kit/log/level"
25 "github.com/prometheus/client_golang/prometheus"
26 "github.com/prometheus/common/model"
27
28 "github.com/prometheus/alertmanager/notify"
29 "github.com/prometheus/alertmanager/provider"
30 "github.com/prometheus/alertmanager/store"
31 "github.com/prometheus/alertmanager/types"
32 )
33
34
// DispatcherMetrics bundles the Prometheus collectors updated by a Dispatcher.
type DispatcherMetrics struct {
	// aggrGroups is the gauge of currently active aggregation groups.
	aggrGroups prometheus.Gauge

	// processingDuration summarizes how long routing a received alert takes.
	processingDuration prometheus.Summary

	// aggrGroupLimitReached counts alerts for which no new aggregation group
	// could be created because the configured limit was hit.
	aggrGroupLimitReached prometheus.Counter
}
40
41
42 func NewDispatcherMetrics(registerLimitMetrics bool, r prometheus.Registerer) *DispatcherMetrics {
43 m := DispatcherMetrics{
44 aggrGroups: prometheus.NewGauge(
45 prometheus.GaugeOpts{
46 Name: "alertmanager_dispatcher_aggregation_groups",
47 Help: "Number of active aggregation groups",
48 },
49 ),
50 processingDuration: prometheus.NewSummary(
51 prometheus.SummaryOpts{
52 Name: "alertmanager_dispatcher_alert_processing_duration_seconds",
53 Help: "Summary of latencies for the processing of alerts.",
54 },
55 ),
56 aggrGroupLimitReached: prometheus.NewCounter(
57 prometheus.CounterOpts{
58 Name: "alertmanager_dispatcher_aggregation_group_limit_reached_total",
59 Help: "Number of times when dispatcher failed to create new aggregation group due to limit.",
60 },
61 ),
62 }
63
64 if r != nil {
65 r.MustRegister(m.aggrGroups, m.processingDuration)
66 if registerLimitMetrics {
67 r.MustRegister(m.aggrGroupLimitReached)
68 }
69 }
70
71 return &m
72 }
73
74
75
76 type Dispatcher struct {
77 route *Route
78 alerts provider.Alerts
79 stage notify.Stage
80 metrics *DispatcherMetrics
81 limits Limits
82
83 timeout func(time.Duration) time.Duration
84
85 mtx sync.RWMutex
86 aggrGroupsPerRoute map[*Route]map[model.Fingerprint]*aggrGroup
87 aggrGroupsNum int
88
89 done chan struct{}
90 ctx context.Context
91 cancel func()
92
93 logger log.Logger
94 }
95
96
// Limits describes the limits a Dispatcher enforces.
type Limits interface {
	// MaxNumberOfAggregationGroups returns the maximum number of aggregation
	// groups the dispatcher may hold at the same time. A value of 0 or less
	// disables the limit. Once the limit is reached, an alert that would need
	// a new group is not dispatched; alerts for existing groups are unaffected.
	MaxNumberOfAggregationGroups() int
}
103
104
105 func NewDispatcher(
106 ap provider.Alerts,
107 r *Route,
108 s notify.Stage,
109 mk types.Marker,
110 to func(time.Duration) time.Duration,
111 lim Limits,
112 l log.Logger,
113 m *DispatcherMetrics,
114 ) *Dispatcher {
115 if lim == nil {
116 lim = nilLimits{}
117 }
118
119 disp := &Dispatcher{
120 alerts: ap,
121 stage: s,
122 route: r,
123 timeout: to,
124 logger: log.With(l, "component", "dispatcher"),
125 metrics: m,
126 limits: lim,
127 }
128 return disp
129 }
130
131
132 func (d *Dispatcher) Run() {
133 d.done = make(chan struct{})
134
135 d.mtx.Lock()
136 d.aggrGroupsPerRoute = map[*Route]map[model.Fingerprint]*aggrGroup{}
137 d.aggrGroupsNum = 0
138 d.metrics.aggrGroups.Set(0)
139 d.ctx, d.cancel = context.WithCancel(context.Background())
140 d.mtx.Unlock()
141
142 d.run(d.alerts.Subscribe())
143 close(d.done)
144 }
145
146 func (d *Dispatcher) run(it provider.AlertIterator) {
147 cleanup := time.NewTicker(30 * time.Second)
148 defer cleanup.Stop()
149
150 defer it.Close()
151
152 for {
153 select {
154 case alert, ok := <-it.Next():
155 if !ok {
156
157 if err := it.Err(); err != nil {
158 level.Error(d.logger).Log("msg", "Error on alert update", "err", err)
159 }
160 return
161 }
162
163 level.Debug(d.logger).Log("msg", "Received alert", "alert", alert)
164
165
166 if err := it.Err(); err != nil {
167 level.Error(d.logger).Log("msg", "Error on alert update", "err", err)
168 continue
169 }
170
171 now := time.Now()
172 for _, r := range d.route.Match(alert.Labels) {
173 d.processAlert(alert, r)
174 }
175 d.metrics.processingDuration.Observe(time.Since(now).Seconds())
176
177 case <-cleanup.C:
178 d.mtx.Lock()
179
180 for _, groups := range d.aggrGroupsPerRoute {
181 for _, ag := range groups {
182 if ag.empty() {
183 ag.stop()
184 delete(groups, ag.fingerprint())
185 d.aggrGroupsNum--
186 d.metrics.aggrGroups.Dec()
187 }
188 }
189 }
190
191 d.mtx.Unlock()
192
193 case <-d.ctx.Done():
194 return
195 }
196 }
197 }
198
199
// AlertGroup is a snapshot of one aggregation group as returned by
// Dispatcher.Groups.
type AlertGroup struct {
	// Alerts are the group's alerts that passed the caller's alert filter.
	Alerts types.AlertSlice

	// Labels are the labels the group is aggregated by.
	Labels model.LabelSet

	// Receiver is the receiver name configured on the group's route.
	Receiver string
}
205
206 type AlertGroups []*AlertGroup
207
208 func (ag AlertGroups) Swap(i, j int) { ag[i], ag[j] = ag[j], ag[i] }
209 func (ag AlertGroups) Less(i, j int) bool {
210 if ag[i].Labels.Equal(ag[j].Labels) {
211 return ag[i].Receiver < ag[j].Receiver
212 }
213 return ag[i].Labels.Before(ag[j].Labels)
214 }
215 func (ag AlertGroups) Len() int { return len(ag) }
216
217
218 func (d *Dispatcher) Groups(routeFilter func(*Route) bool, alertFilter func(*types.Alert, time.Time) bool) (AlertGroups, map[model.Fingerprint][]string) {
219 groups := AlertGroups{}
220
221 d.mtx.RLock()
222 defer d.mtx.RUnlock()
223
224
225
226
227 receivers := map[model.Fingerprint][]string{}
228
229 now := time.Now()
230 for route, ags := range d.aggrGroupsPerRoute {
231 if !routeFilter(route) {
232 continue
233 }
234
235 for _, ag := range ags {
236 receiver := route.RouteOpts.Receiver
237 alertGroup := &AlertGroup{
238 Labels: ag.labels,
239 Receiver: receiver,
240 }
241
242 alerts := ag.alerts.List()
243 filteredAlerts := make([]*types.Alert, 0, len(alerts))
244 for _, a := range alerts {
245 if !alertFilter(a, now) {
246 continue
247 }
248
249 fp := a.Fingerprint()
250 if r, ok := receivers[fp]; ok {
251
252
253 receivers[fp] = append(r, receiver)
254 } else {
255
256
257 receivers[fp] = []string{receiver}
258 }
259
260 filteredAlerts = append(filteredAlerts, a)
261 }
262 if len(filteredAlerts) == 0 {
263 continue
264 }
265 alertGroup.Alerts = filteredAlerts
266
267 groups = append(groups, alertGroup)
268 }
269 }
270 sort.Sort(groups)
271 for i := range groups {
272 sort.Sort(groups[i].Alerts)
273 }
274 for i := range receivers {
275 sort.Strings(receivers[i])
276 }
277
278 return groups, receivers
279 }
280
281
282 func (d *Dispatcher) Stop() {
283 if d == nil {
284 return
285 }
286 d.mtx.Lock()
287 if d.cancel == nil {
288 d.mtx.Unlock()
289 return
290 }
291 d.cancel()
292 d.cancel = nil
293 d.mtx.Unlock()
294
295 <-d.done
296 }
297
298
299
300
301 type notifyFunc func(context.Context, ...*types.Alert) bool
302
303
304
305 func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) {
306 groupLabels := getGroupLabels(alert, route)
307
308 fp := groupLabels.Fingerprint()
309
310 d.mtx.Lock()
311 defer d.mtx.Unlock()
312
313 routeGroups, ok := d.aggrGroupsPerRoute[route]
314 if !ok {
315 routeGroups = map[model.Fingerprint]*aggrGroup{}
316 d.aggrGroupsPerRoute[route] = routeGroups
317 }
318
319 ag, ok := routeGroups[fp]
320 if ok {
321 ag.insert(alert)
322 return
323 }
324
325
326 if limit := d.limits.MaxNumberOfAggregationGroups(); limit > 0 && d.aggrGroupsNum >= limit {
327 d.metrics.aggrGroupLimitReached.Inc()
328 level.Error(d.logger).Log("msg", "Too many aggregation groups, cannot create new group for alert", "groups", d.aggrGroupsNum, "limit", limit, "alert", alert.Name())
329 return
330 }
331
332 ag = newAggrGroup(d.ctx, groupLabels, route, d.timeout, d.logger)
333 routeGroups[fp] = ag
334 d.aggrGroupsNum++
335 d.metrics.aggrGroups.Inc()
336
337
338
339
340 ag.insert(alert)
341
342 go ag.run(func(ctx context.Context, alerts ...*types.Alert) bool {
343 _, _, err := d.stage.Exec(ctx, d.logger, alerts...)
344 if err != nil {
345 lvl := level.Error(d.logger)
346 if ctx.Err() == context.Canceled {
347
348
349
350 lvl = level.Debug(d.logger)
351 }
352 lvl.Log("msg", "Notify for alerts failed", "num_alerts", len(alerts), "err", err)
353 }
354 return err == nil
355 })
356 }
357
358 func getGroupLabels(alert *types.Alert, route *Route) model.LabelSet {
359 groupLabels := model.LabelSet{}
360 for ln, lv := range alert.Labels {
361 if _, ok := route.RouteOpts.GroupBy[ln]; ok || route.RouteOpts.GroupByAll {
362 groupLabels[ln] = lv
363 }
364 }
365
366 return groupLabels
367 }
368
369
370
371
372 type aggrGroup struct {
373 labels model.LabelSet
374 opts *RouteOpts
375 logger log.Logger
376 routeKey string
377
378 alerts *store.Alerts
379 ctx context.Context
380 cancel func()
381 done chan struct{}
382 next *time.Timer
383 timeout func(time.Duration) time.Duration
384
385 mtx sync.RWMutex
386 hasFlushed bool
387 }
388
389
390 func newAggrGroup(ctx context.Context, labels model.LabelSet, r *Route, to func(time.Duration) time.Duration, logger log.Logger) *aggrGroup {
391 if to == nil {
392 to = func(d time.Duration) time.Duration { return d }
393 }
394 ag := &aggrGroup{
395 labels: labels,
396 routeKey: r.Key(),
397 opts: &r.RouteOpts,
398 timeout: to,
399 alerts: store.NewAlerts(),
400 done: make(chan struct{}),
401 }
402 ag.ctx, ag.cancel = context.WithCancel(ctx)
403
404 ag.logger = log.With(logger, "aggrGroup", ag)
405
406
407
408 ag.next = time.NewTimer(ag.opts.GroupWait)
409
410 return ag
411 }
412
// fingerprint returns the fingerprint of the group's labels.
func (ag *aggrGroup) fingerprint() model.Fingerprint {
	return ag.labels.Fingerprint()
}
416
// GroupKey returns the group's key: the route key and the group labels,
// joined by a colon.
func (ag *aggrGroup) GroupKey() string {
	return fmt.Sprintf("%s:%s", ag.routeKey, ag.labels)
}
420
// String implements fmt.Stringer by returning the group key.
func (ag *aggrGroup) String() string {
	return ag.GroupKey()
}
424
425 func (ag *aggrGroup) run(nf notifyFunc) {
426 defer close(ag.done)
427 defer ag.next.Stop()
428
429 for {
430 select {
431 case now := <-ag.next.C:
432
433
434 ctx, cancel := context.WithTimeout(ag.ctx, ag.timeout(ag.opts.GroupInterval))
435
436
437
438
439
440 ctx = notify.WithNow(ctx, now)
441
442
443 ctx = notify.WithGroupKey(ctx, ag.GroupKey())
444 ctx = notify.WithGroupLabels(ctx, ag.labels)
445 ctx = notify.WithReceiverName(ctx, ag.opts.Receiver)
446 ctx = notify.WithRepeatInterval(ctx, ag.opts.RepeatInterval)
447 ctx = notify.WithMuteTimeIntervals(ctx, ag.opts.MuteTimeIntervals)
448 ctx = notify.WithActiveTimeIntervals(ctx, ag.opts.ActiveTimeIntervals)
449
450
451 ag.mtx.Lock()
452 ag.next.Reset(ag.opts.GroupInterval)
453 ag.hasFlushed = true
454 ag.mtx.Unlock()
455
456 ag.flush(func(alerts ...*types.Alert) bool {
457 return nf(ctx, alerts...)
458 })
459
460 cancel()
461
462 case <-ag.ctx.Done():
463 return
464 }
465 }
466 }
467
// stop cancels the group's context and blocks until ag.done is closed, which
// happens when the group's run loop returns.
func (ag *aggrGroup) stop() {
	// Cancelling the context also aborts a notification that is in flight
	// with a context derived from it.
	ag.cancel()
	<-ag.done
}
474
475
476 func (ag *aggrGroup) insert(alert *types.Alert) {
477 if err := ag.alerts.Set(alert); err != nil {
478 level.Error(ag.logger).Log("msg", "error on set alert", "err", err)
479 }
480
481
482
483 ag.mtx.Lock()
484 defer ag.mtx.Unlock()
485 if !ag.hasFlushed && alert.StartsAt.Add(ag.opts.GroupWait).Before(time.Now()) {
486 ag.next.Reset(0)
487 }
488 }
489
// empty reports whether the group currently holds no alerts.
func (ag *aggrGroup) empty() bool {
	return ag.alerts.Empty()
}
493
494
495 func (ag *aggrGroup) flush(notify func(...*types.Alert) bool) {
496 if ag.empty() {
497 return
498 }
499
500 var (
501 alerts = ag.alerts.List()
502 alertsSlice = make(types.AlertSlice, 0, len(alerts))
503 now = time.Now()
504 )
505 for _, alert := range alerts {
506 a := *alert
507
508 if !a.ResolvedAt(now) {
509 a.EndsAt = time.Time{}
510 }
511 alertsSlice = append(alertsSlice, &a)
512 }
513 sort.Stable(alertsSlice)
514
515 level.Debug(ag.logger).Log("msg", "flushing", "alerts", fmt.Sprintf("%v", alertsSlice))
516
517 if notify(alertsSlice...) {
518 for _, a := range alertsSlice {
519
520
521 fp := a.Fingerprint()
522 got, err := ag.alerts.Get(fp)
523 if err != nil {
524
525 level.Error(ag.logger).Log("msg", "failed to get alert", "err", err, "alert", a.String())
526 continue
527 }
528 if a.Resolved() && got.UpdatedAt == a.UpdatedAt {
529 if err := ag.alerts.Delete(fp); err != nil {
530 level.Error(ag.logger).Log("msg", "error on delete alert", "err", err, "alert", a.String())
531 }
532 }
533 }
534 }
535 }
536
// nilLimits is the Limits implementation used when the caller supplies none.
type nilLimits struct{}

// MaxNumberOfAggregationGroups returns 0, which the dispatcher treats as
// "no limit".
func (n nilLimits) MaxNumberOfAggregationGroups() int { return 0 }
540
View as plain text