package dispatch

import (
	"context"
	"fmt"
	"reflect"
	"sort"
	"sync"
	"testing"
	"time"

	"github.com/go-kit/log"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
	"github.com/prometheus/common/model"
	"github.com/stretchr/testify/require"

	"github.com/prometheus/alertmanager/config"
	"github.com/prometheus/alertmanager/notify"
	"github.com/prometheus/alertmanager/provider/mem"
	"github.com/prometheus/alertmanager/types"
)

func TestAggrGroup(t *testing.T) {
	lset := model.LabelSet{
		"a": "v1",
		"b": "v2",
	}
	opts := &RouteOpts{
		Receiver: "n1",
		GroupBy: map[model.LabelName]struct{}{
			"a": {},
			"b": {},
		},
		GroupWait:      1 * time.Second,
		GroupInterval:  300 * time.Millisecond,
		RepeatInterval: 1 * time.Hour,
	}
	route := &Route{
		RouteOpts: *opts,
	}

	var (
		a1 = &types.Alert{
			Alert: model.Alert{
				Labels: model.LabelSet{
					"a": "v1",
					"b": "v2",
					"c": "v3",
				},
				StartsAt: time.Now().Add(time.Minute),
				EndsAt:   time.Now().Add(time.Hour),
			},
			UpdatedAt: time.Now(),
		}
		a2 = &types.Alert{
			Alert: model.Alert{
				Labels: model.LabelSet{
					"a": "v1",
					"b": "v2",
					"c": "v4",
				},
				StartsAt: time.Now().Add(-time.Hour),
				EndsAt:   time.Now().Add(2 * time.Hour),
			},
			UpdatedAt: time.Now(),
		}
		a3 = &types.Alert{
			Alert: model.Alert{
				Labels: model.LabelSet{
					"a": "v1",
					"b": "v2",
					"c": "v5",
				},
				StartsAt: time.Now().Add(time.Minute),
				EndsAt:   time.Now().Add(5 * time.Minute),
			},
			UpdatedAt: time.Now(),
		}
	)

	var (
		last       = time.Now()
		current    = time.Now()
		lastCurMtx = &sync.Mutex{}
		alertsCh   = make(chan types.AlertSlice)
	)

	ntfy := func(ctx context.Context, alerts ...*types.Alert) bool {
		if _, ok := notify.Now(ctx); !ok {
			t.Errorf("now missing")
		}
		if _, ok := notify.GroupKey(ctx); !ok {
			t.Errorf("group key missing")
		}
		if lbls, ok := notify.GroupLabels(ctx); !ok || !reflect.DeepEqual(lbls, lset) {
			t.Errorf("wrong group labels: %q", lbls)
		}
		if rcv, ok := notify.ReceiverName(ctx); !ok || rcv != opts.Receiver {
			t.Errorf("wrong receiver: %q", rcv)
		}
		if ri, ok := notify.RepeatInterval(ctx); !ok || ri != opts.RepeatInterval {
			t.Errorf("wrong repeat interval: %q", ri)
		}

		lastCurMtx.Lock()
		last = current
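		// Back-date the flush time by a millisecond so the timing checks on the
		// receiving side are not racy.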
		current = time.Now().Add(-time.Millisecond)
		lastCurMtx.Unlock()

		alertsCh <- types.AlertSlice(alerts)

		return true
	}

	removeEndsAt := func(as types.AlertSlice) types.AlertSlice {
		for i, a := range as {
			ac := *a
			ac.EndsAt = time.Time{}
			as[i] = &ac
		}
		return as
	}

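	// Regular case: the initial batch is flushed only after group_wait has elapsed.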
	ag := newAggrGroup(context.Background(), lset, route, nil, log.NewNopLogger())
	go ag.run(ntfy)

	ag.insert(a1)

	select {
	case <-time.After(2 * opts.GroupWait):
		t.Fatalf("expected initial batch after group_wait")

	case batch := <-alertsCh:
		lastCurMtx.Lock()
		s := time.Since(last)
		lastCurMtx.Unlock()
		if s < opts.GroupWait {
			t.Fatalf("received batch too early after %v", s)
		}
		exp := removeEndsAt(types.AlertSlice{a1})
		sort.Sort(batch)

		if !reflect.DeepEqual(batch, exp) {
			t.Fatalf("expected alerts %v but got %v", exp, batch)
		}
	}

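	// Each newly inserted alert triggers another flush after group_interval.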
	for i := 0; i < 3; i++ {
		ag.insert(a3)

		select {
		case <-time.After(2 * opts.GroupInterval):
			t.Fatalf("expected new batch after group interval but received none")

		case batch := <-alertsCh:
			lastCurMtx.Lock()
			s := time.Since(last)
			lastCurMtx.Unlock()
			if s < opts.GroupInterval {
				t.Fatalf("received batch too early after %v", s)
			}
			exp := removeEndsAt(types.AlertSlice{a1, a3})
			sort.Sort(batch)

			if !reflect.DeepEqual(batch, exp) {
				t.Fatalf("expected alerts %v but got %v", exp, batch)
			}
		}
	}

	ag.stop()

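	// If the group contains an alert that started far enough in the past (a2),
	// the initial group_wait is skipped and the first batch is sent immediately.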
	ag = newAggrGroup(context.Background(), lset, route, nil, log.NewNopLogger())
	go ag.run(ntfy)

	ag.insert(a1)
	ag.insert(a2)

	select {
	case <-time.After(opts.GroupWait / 2):
		t.Fatalf("expected immediate alert but received none")

	case batch := <-alertsCh:
		exp := removeEndsAt(types.AlertSlice{a1, a2})
		sort.Sort(batch)

		if !reflect.DeepEqual(batch, exp) {
			t.Fatalf("expected alerts %v but got %v", exp, batch)
		}
	}

	for i := 0; i < 3; i++ {
		ag.insert(a3)

		select {
		case <-time.After(2 * opts.GroupInterval):
			t.Fatalf("expected new batch after group interval but received none")

		case batch := <-alertsCh:
			lastCurMtx.Lock()
			s := time.Since(last)
			lastCurMtx.Unlock()
			if s < opts.GroupInterval {
				t.Fatalf("received batch too early after %v", s)
			}
			exp := removeEndsAt(types.AlertSlice{a1, a2, a3})
			sort.Sort(batch)

			if !reflect.DeepEqual(batch, exp) {
				t.Fatalf("expected alerts %v but got %v", exp, batch)
			}
		}
	}

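	// Resolve all alerts: the resolved batch is flushed and the group becomes empty.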
	a1r, a2r, a3r := *a1, *a2, *a3
	resolved := types.AlertSlice{&a1r, &a2r, &a3r}
	for _, a := range resolved {
		a.EndsAt = time.Now()
		ag.insert(a)
	}

	select {
	case <-time.After(2 * opts.GroupInterval):
		t.Fatalf("expected new batch after group interval but received none")

	case batch := <-alertsCh:
		lastCurMtx.Lock()
		s := time.Since(last)
		lastCurMtx.Unlock()
		if s < opts.GroupInterval {
			t.Fatalf("received batch too early after %v", s)
		}
		sort.Sort(batch)

		if !reflect.DeepEqual(batch, resolved) {
			t.Fatalf("expected alerts %v but got %v", resolved, batch)
		}

		if !ag.empty() {
			t.Fatalf("expected aggregation group to be empty after resolving alerts: %v", ag)
		}
	}

	ag.stop()
}

func TestGroupLabels(t *testing.T) {
	a := &types.Alert{
		Alert: model.Alert{
			Labels: model.LabelSet{
				"a": "v1",
				"b": "v2",
				"c": "v3",
			},
		},
	}

	route := &Route{
		RouteOpts: RouteOpts{
			GroupBy: map[model.LabelName]struct{}{
				"a": {},
				"b": {},
			},
			GroupByAll: false,
		},
	}

	expLs := model.LabelSet{
		"a": "v1",
		"b": "v2",
	}

	ls := getGroupLabels(a, route)

	if !reflect.DeepEqual(ls, expLs) {
		t.Fatalf("expected labels are %v, but got %v", expLs, ls)
	}
}

func TestGroupByAllLabels(t *testing.T) {
	a := &types.Alert{
		Alert: model.Alert{
			Labels: model.LabelSet{
				"a": "v1",
				"b": "v2",
				"c": "v3",
			},
		},
	}

	route := &Route{
		RouteOpts: RouteOpts{
			GroupBy:    map[model.LabelName]struct{}{},
			GroupByAll: true,
		},
	}

	expLs := model.LabelSet{
		"a": "v1",
		"b": "v2",
		"c": "v3",
	}

	ls := getGroupLabels(a, route)

	if !reflect.DeepEqual(ls, expLs) {
		t.Fatalf("expected labels are %v, but got %v", expLs, ls)
	}
}

func TestGroups(t *testing.T) {
	confData := `receivers:
- name: 'kafka'
- name: 'prod'
- name: 'testing'

route:
  group_by: ['alertname']
  group_wait: 10ms
  group_interval: 10ms
  receiver: 'prod'
  routes:
  - match:
      env: 'testing'
    receiver: 'testing'
    group_by: ['alertname', 'service']
  - match:
      env: 'prod'
    receiver: 'prod'
    group_by: ['alertname', 'service', 'cluster']
    continue: true
  - match:
      kafka: 'yes'
    receiver: 'kafka'
    group_by: ['alertname', 'service', 'cluster']`
	conf, err := config.Load(confData)
	if err != nil {
		t.Fatal(err)
	}

	logger := log.NewNopLogger()
	route := NewRoute(conf.Route, nil)
	marker := types.NewMarker(prometheus.NewRegistry())
	alerts, err := mem.NewAlerts(context.Background(), marker, time.Hour, nil, logger, nil)
	if err != nil {
		t.Fatal(err)
	}
	defer alerts.Close()

	timeout := func(d time.Duration) time.Duration { return time.Duration(0) }
	recorder := &recordStage{alerts: make(map[string]map[model.Fingerprint]*types.Alert)}
	dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, nil, logger, NewDispatcherMetrics(false, prometheus.NewRegistry()))
	go dispatcher.Run()
	defer dispatcher.Stop()

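	// Create alerts. The dispatcher will automatically create the aggregation groups.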
	inputAlerts := []*types.Alert{
		// Matches only the parent route.
		newAlert(model.LabelSet{"alertname": "OtherAlert", "cluster": "cc", "service": "dd"}),
		// Matches the first sub-route.
		newAlert(model.LabelSet{"env": "testing", "alertname": "TestingAlert", "service": "api", "instance": "inst1"}),
		// Both match the second sub-route and end up in the same group.
		newAlert(model.LabelSet{"env": "prod", "alertname": "HighErrorRate", "cluster": "aa", "service": "api", "instance": "inst1"}),
		newAlert(model.LabelSet{"env": "prod", "alertname": "HighErrorRate", "cluster": "aa", "service": "api", "instance": "inst2"}),
		// Matches the second sub-route but a different group (cluster bb).
		newAlert(model.LabelSet{"env": "prod", "alertname": "HighErrorRate", "cluster": "bb", "service": "api", "instance": "inst1"}),
		// Matches both the second and the third sub-route.
		newAlert(model.LabelSet{"env": "prod", "alertname": "HighLatency", "cluster": "bb", "service": "db", "kafka": "yes", "instance": "inst3"}),
	}
	alerts.Put(inputAlerts...)

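	// Let the alerts be processed; 7 notifications are expected because the last
	// alert matches two receivers.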
	for i := 0; len(recorder.Alerts()) != 7 && i < 10; i++ {
		time.Sleep(200 * time.Millisecond)
	}
	require.Equal(t, 7, len(recorder.Alerts()))

	alertGroups, receivers := dispatcher.Groups(
		func(*Route) bool {
			return true
		}, func(*types.Alert, time.Time) bool {
			return true
		},
	)

	require.Equal(t, AlertGroups{
		&AlertGroup{
			Alerts: []*types.Alert{inputAlerts[0]},
			Labels: model.LabelSet{
				"alertname": "OtherAlert",
			},
			Receiver: "prod",
		},
		&AlertGroup{
			Alerts: []*types.Alert{inputAlerts[1]},
			Labels: model.LabelSet{
				"alertname": "TestingAlert",
				"service":   "api",
			},
			Receiver: "testing",
		},
		&AlertGroup{
			Alerts: []*types.Alert{inputAlerts[2], inputAlerts[3]},
			Labels: model.LabelSet{
				"alertname": "HighErrorRate",
				"service":   "api",
				"cluster":   "aa",
			},
			Receiver: "prod",
		},
		&AlertGroup{
			Alerts: []*types.Alert{inputAlerts[4]},
			Labels: model.LabelSet{
				"alertname": "HighErrorRate",
				"service":   "api",
				"cluster":   "bb",
			},
			Receiver: "prod",
		},
		&AlertGroup{
			Alerts: []*types.Alert{inputAlerts[5]},
			Labels: model.LabelSet{
				"alertname": "HighLatency",
				"service":   "db",
				"cluster":   "bb",
			},
			Receiver: "kafka",
		},
		&AlertGroup{
			Alerts: []*types.Alert{inputAlerts[5]},
			Labels: model.LabelSet{
				"alertname": "HighLatency",
				"service":   "db",
				"cluster":   "bb",
			},
			Receiver: "prod",
		},
	}, alertGroups)
	require.Equal(t, map[model.Fingerprint][]string{
		inputAlerts[0].Fingerprint(): {"prod"},
		inputAlerts[1].Fingerprint(): {"testing"},
		inputAlerts[2].Fingerprint(): {"prod"},
		inputAlerts[3].Fingerprint(): {"prod"},
		inputAlerts[4].Fingerprint(): {"prod"},
		inputAlerts[5].Fingerprint(): {"kafka", "prod"},
	}, receivers)
}

func TestGroupsWithLimits(t *testing.T) {
	confData := `receivers:
- name: 'kafka'
- name: 'prod'
- name: 'testing'

route:
  group_by: ['alertname']
  group_wait: 10ms
  group_interval: 10ms
  receiver: 'prod'
  routes:
  - match:
      env: 'testing'
    receiver: 'testing'
    group_by: ['alertname', 'service']
  - match:
      env: 'prod'
    receiver: 'prod'
    group_by: ['alertname', 'service', 'cluster']
    continue: true
  - match:
      kafka: 'yes'
    receiver: 'kafka'
    group_by: ['alertname', 'service', 'cluster']`
	conf, err := config.Load(confData)
	if err != nil {
		t.Fatal(err)
	}

	logger := log.NewNopLogger()
	route := NewRoute(conf.Route, nil)
	marker := types.NewMarker(prometheus.NewRegistry())
	alerts, err := mem.NewAlerts(context.Background(), marker, time.Hour, nil, logger, nil)
	if err != nil {
		t.Fatal(err)
	}
	defer alerts.Close()

	timeout := func(d time.Duration) time.Duration { return time.Duration(0) }
	recorder := &recordStage{alerts: make(map[string]map[model.Fingerprint]*types.Alert)}
	lim := limits{groups: 6}
	m := NewDispatcherMetrics(true, prometheus.NewRegistry())
	dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, lim, logger, m)
	go dispatcher.Run()
	defer dispatcher.Stop()

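	// Create alerts. The dispatcher will create 6 aggregation groups for them.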
	inputAlerts := []*types.Alert{
		newAlert(model.LabelSet{"alertname": "OtherAlert", "cluster": "cc", "service": "dd"}),
		newAlert(model.LabelSet{"env": "testing", "alertname": "TestingAlert", "service": "api", "instance": "inst1"}),
		newAlert(model.LabelSet{"env": "prod", "alertname": "HighErrorRate", "cluster": "aa", "service": "api", "instance": "inst1"}),
		newAlert(model.LabelSet{"env": "prod", "alertname": "HighErrorRate", "cluster": "aa", "service": "api", "instance": "inst2"}),
		newAlert(model.LabelSet{"env": "prod", "alertname": "HighErrorRate", "cluster": "bb", "service": "api", "instance": "inst1"}),
		newAlert(model.LabelSet{"env": "prod", "alertname": "HighLatency", "cluster": "bb", "service": "db", "kafka": "yes", "instance": "inst3"}),
	}
	err = alerts.Put(inputAlerts...)
	if err != nil {
		t.Fatal(err)
	}

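	// Let the alerts be processed; 7 notifications are expected because the last
	// alert matches two receivers.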
	for i := 0; len(recorder.Alerts()) != 7 && i < 10; i++ {
		time.Sleep(200 * time.Millisecond)
	}
	require.Equal(t, 7, len(recorder.Alerts()))

	routeFilter := func(*Route) bool { return true }
	alertFilter := func(*types.Alert, time.Time) bool { return true }

	alertGroups, _ := dispatcher.Groups(routeFilter, alertFilter)
	require.Len(t, alertGroups, 6)

	require.Equal(t, 0.0, testutil.ToFloat64(m.aggrGroupLimitReached))

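	// Insert one more alert that would create a 7th group. The limit of 6 groups
	// is reached, so no new group is created and the limit metric is incremented.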
	err = alerts.Put(newAlert(model.LabelSet{"env": "prod", "alertname": "NewAlert", "cluster": "new-cluster", "service": "db"}))
	if err != nil {
		t.Fatal(err)
	}

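	// Wait for the aggregation-group limit metric to be incremented.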
	for i := 0; testutil.ToFloat64(m.aggrGroupLimitReached) == 0 && i < 10; i++ {
		time.Sleep(200 * time.Millisecond)
	}
	require.Equal(t, 1.0, testutil.ToFloat64(m.aggrGroupLimitReached))

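	// Verify there are still only 6 groups.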
	alertGroups, _ = dispatcher.Groups(routeFilter, alertFilter)
	require.Len(t, alertGroups, 6)
}

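// recordStage is a notify stage that records the alerts it receives, keyed by
// group key and fingerprint.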
type recordStage struct {
	mtx    sync.RWMutex
	alerts map[string]map[model.Fingerprint]*types.Alert
}

func (r *recordStage) Alerts() []*types.Alert {
	r.mtx.RLock()
	defer r.mtx.RUnlock()
	alerts := make([]*types.Alert, 0)
	for k := range r.alerts {
		for _, a := range r.alerts[k] {
			alerts = append(alerts, a)
		}
	}
	return alerts
}

func (r *recordStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
	r.mtx.Lock()
	defer r.mtx.Unlock()
	gk, ok := notify.GroupKey(ctx)
	if !ok {
		panic("GroupKey not present!")
	}
	if _, ok := r.alerts[gk]; !ok {
		r.alerts[gk] = make(map[model.Fingerprint]*types.Alert)
	}
	for _, a := range alerts {
		r.alerts[gk][a.Fingerprint()] = a
	}
	return ctx, nil, nil
}

var (
	// Alerts created by newAlert start one minute in the past...
	t0 = time.Now().Add(-time.Minute)
	// ...and end one minute in the future, so they remain active while the tests run.
	t1 = t0.Add(2 * time.Minute)
)

func newAlert(labels model.LabelSet) *types.Alert {
	return &types.Alert{
		Alert: model.Alert{
			Labels:       labels,
			Annotations:  model.LabelSet{"foo": "bar"},
			StartsAt:     t0,
			EndsAt:       t1,
			GeneratorURL: "http://example.com/prometheus",
		},
		UpdatedAt: t0,
		Timeout:   false,
	}
}

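// TestDispatcherRace starts and immediately stops the dispatcher to surface
// data races under the race detector.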
func TestDispatcherRace(t *testing.T) {
	logger := log.NewNopLogger()
	marker := types.NewMarker(prometheus.NewRegistry())
	alerts, err := mem.NewAlerts(context.Background(), marker, time.Hour, nil, logger, nil)
	if err != nil {
		t.Fatal(err)
	}
	defer alerts.Close()

	timeout := func(d time.Duration) time.Duration { return time.Duration(0) }
	dispatcher := NewDispatcher(alerts, nil, nil, marker, timeout, nil, logger, NewDispatcherMetrics(false, prometheus.NewRegistry()))
	go dispatcher.Run()
	dispatcher.Stop()
}

func TestDispatcherRaceOnFirstAlertNotDeliveredWhenGroupWaitIsZero(t *testing.T) {
	const numAlerts = 5000

	logger := log.NewNopLogger()
	marker := types.NewMarker(prometheus.NewRegistry())
	alerts, err := mem.NewAlerts(context.Background(), marker, time.Hour, nil, logger, nil)
	if err != nil {
		t.Fatal(err)
	}
	defer alerts.Close()

	route := &Route{
		RouteOpts: RouteOpts{
			Receiver:       "default",
			GroupBy:        map[model.LabelName]struct{}{"alertname": {}},
			GroupWait:      0,
			GroupInterval:  1 * time.Hour,
			RepeatInterval: 1 * time.Hour,
		},
	}

	timeout := func(d time.Duration) time.Duration { return d }
	recorder := &recordStage{alerts: make(map[string]map[model.Fingerprint]*types.Alert)}
	dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, nil, logger, NewDispatcherMetrics(false, prometheus.NewRegistry()))
	go dispatcher.Run()
	defer dispatcher.Stop()

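	// Push all alerts; each has a unique alertname and therefore ends up in its
	// own aggregation group.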
	for i := 0; i < numAlerts; i++ {
		alert := newAlert(model.LabelSet{"alertname": model.LabelValue(fmt.Sprintf("Alert_%d", i))})
		require.NoError(t, alerts.Put(alert))
	}

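	// Wait until all alerts have been notified or the deadline expires.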
	for deadline := time.Now().Add(5 * time.Second); time.Now().Before(deadline); {
		if len(recorder.Alerts()) >= numAlerts {
			break
		}

		// Throttle the polling.
		time.Sleep(10 * time.Millisecond)
	}

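	// Since group_wait is zero, every alert should have been delivered without delay.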
	require.Equal(t, numAlerts, len(recorder.Alerts()))
}

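// limits is a test implementation of the dispatcher's limits, capping the
// number of aggregation groups.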
type limits struct {
	groups int
}

func (l limits) MaxNumberOfAggregationGroups() int {
	return l.groups
}