1
2
3
4
5
6
7
8
9
10
11
12
13
14 package mem
15
16 import (
17 "context"
18 "fmt"
19 "reflect"
20 "strconv"
21 "sync"
22 "testing"
23 "time"
24
25 "github.com/go-kit/log"
26 "github.com/kylelemons/godebug/pretty"
27 "github.com/prometheus/client_golang/prometheus"
28 "github.com/prometheus/common/model"
29 "github.com/stretchr/testify/require"
30 "go.uber.org/atomic"
31
32 "github.com/prometheus/alertmanager/store"
33 "github.com/prometheus/alertmanager/types"
34 )
35
36 var (
37 t0 = time.Now()
38 t1 = t0.Add(100 * time.Millisecond)
39
40 alert1 = &types.Alert{
41 Alert: model.Alert{
42 Labels: model.LabelSet{"bar": "foo"},
43 Annotations: model.LabelSet{"foo": "bar"},
44 StartsAt: t0,
45 EndsAt: t1,
46 GeneratorURL: "http://example.com/prometheus",
47 },
48 UpdatedAt: t0,
49 Timeout: false,
50 }
51
52 alert2 = &types.Alert{
53 Alert: model.Alert{
54 Labels: model.LabelSet{"bar": "foo2"},
55 Annotations: model.LabelSet{"foo": "bar2"},
56 StartsAt: t0,
57 EndsAt: t1,
58 GeneratorURL: "http://example.com/prometheus",
59 },
60 UpdatedAt: t0,
61 Timeout: false,
62 }
63
64 alert3 = &types.Alert{
65 Alert: model.Alert{
66 Labels: model.LabelSet{"bar": "foo3"},
67 Annotations: model.LabelSet{"foo": "bar3"},
68 StartsAt: t0,
69 EndsAt: t1,
70 GeneratorURL: "http://example.com/prometheus",
71 },
72 UpdatedAt: t0,
73 Timeout: false,
74 }
75 )
76
77 func init() {
78 pretty.CompareConfig.IncludeUnexported = true
79 }
80
81
82
83
84
85
86
87 func TestAlertsSubscribePutStarvation(t *testing.T) {
88 marker := types.NewMarker(prometheus.NewRegistry())
89 alerts, err := NewAlerts(context.Background(), marker, 30*time.Minute, noopCallback{}, log.NewNopLogger(), nil)
90 if err != nil {
91 t.Fatal(err)
92 }
93
94 iterator := alerts.Subscribe()
95
96 alertsToInsert := []*types.Alert{}
97
98 for i := 0; i < alertChannelLength+1; i++ {
99 alertsToInsert = append(alertsToInsert, &types.Alert{
100 Alert: model.Alert{
101
102 Labels: model.LabelSet{"iteration": model.LabelValue(strconv.Itoa(i))},
103 Annotations: model.LabelSet{"foo": "bar"},
104 StartsAt: t0,
105 EndsAt: t1,
106 GeneratorURL: "http://example.com/prometheus",
107 },
108 UpdatedAt: t0,
109 Timeout: false,
110 })
111 }
112
113 putIsDone := make(chan struct{})
114 putsErr := make(chan error, 1)
115 go func() {
116 if err := alerts.Put(alertsToInsert...); err != nil {
117 putsErr <- err
118 return
119 }
120
121 putIsDone <- struct{}{}
122 }()
123
124
125 time.Sleep(100 * time.Millisecond)
126 iterator.Close()
127
128 select {
129 case <-putsErr:
130 t.Fatal(err)
131 case <-putIsDone:
132
133 case <-time.After(100 * time.Millisecond):
134 t.Fatal("expected `alerts.Put` and `iterator.Close` not to starve each other")
135 }
136 }
137
138 func TestAlertsPut(t *testing.T) {
139 marker := types.NewMarker(prometheus.NewRegistry())
140 alerts, err := NewAlerts(context.Background(), marker, 30*time.Minute, noopCallback{}, log.NewNopLogger(), nil)
141 if err != nil {
142 t.Fatal(err)
143 }
144
145 insert := []*types.Alert{alert1, alert2, alert3}
146
147 if err := alerts.Put(insert...); err != nil {
148 t.Fatalf("Insert failed: %s", err)
149 }
150
151 for i, a := range insert {
152 res, err := alerts.Get(a.Fingerprint())
153 if err != nil {
154 t.Fatalf("retrieval error: %s", err)
155 }
156 if !alertsEqual(res, a) {
157 t.Errorf("Unexpected alert: %d", i)
158 t.Fatalf(pretty.Compare(res, a))
159 }
160 }
161 }
162
163 func TestAlertsSubscribe(t *testing.T) {
164 marker := types.NewMarker(prometheus.NewRegistry())
165
166 ctx, cancel := context.WithCancel(context.Background())
167 defer cancel()
168 alerts, err := NewAlerts(ctx, marker, 30*time.Minute, noopCallback{}, log.NewNopLogger(), nil)
169 if err != nil {
170 t.Fatal(err)
171 }
172
173
174 if err := alerts.Put(alert1); err != nil {
175 t.Fatalf("Insert failed: %s", err)
176 }
177
178 expectedAlerts := map[model.Fingerprint]*types.Alert{
179 alert1.Fingerprint(): alert1,
180 alert2.Fingerprint(): alert2,
181 alert3.Fingerprint(): alert3,
182 }
183
184
185 var (
186 nb = 100
187 fatalc = make(chan string, nb)
188 wg sync.WaitGroup
189 )
190 wg.Add(nb)
191 for i := 0; i < nb; i++ {
192 go func(i int) {
193 defer wg.Done()
194
195 it := alerts.Subscribe()
196 defer it.Close()
197
198 received := make(map[model.Fingerprint]struct{})
199 for {
200 select {
201 case got, ok := <-it.Next():
202 if !ok {
203 fatalc <- fmt.Sprintf("Iterator %d closed", i)
204 return
205 }
206 if it.Err() != nil {
207 fatalc <- fmt.Sprintf("Iterator %d: %v", i, it.Err())
208 return
209 }
210 expected := expectedAlerts[got.Fingerprint()]
211 if !alertsEqual(got, expected) {
212 fatalc <- fmt.Sprintf("Unexpected alert (iterator %d)\n%s", i, pretty.Compare(got, expected))
213 return
214 }
215 received[got.Fingerprint()] = struct{}{}
216 if len(received) == len(expectedAlerts) {
217 return
218 }
219 case <-time.After(5 * time.Second):
220 fatalc <- fmt.Sprintf("Unexpected number of alerts for iterator %d, got: %d, expected: %d", i, len(received), len(expectedAlerts))
221 return
222 }
223 }
224 }(i)
225 }
226
227
228 if err := alerts.Put(alert2); err != nil {
229 t.Fatalf("Insert failed: %s", err)
230 }
231 if err := alerts.Put(alert3); err != nil {
232 t.Fatalf("Insert failed: %s", err)
233 }
234
235 wg.Wait()
236 close(fatalc)
237 fatal, ok := <-fatalc
238 if ok {
239 t.Fatalf(fatal)
240 }
241 }
242
243 func TestAlertsGetPending(t *testing.T) {
244 marker := types.NewMarker(prometheus.NewRegistry())
245 alerts, err := NewAlerts(context.Background(), marker, 30*time.Minute, noopCallback{}, log.NewNopLogger(), nil)
246 if err != nil {
247 t.Fatal(err)
248 }
249
250 if err := alerts.Put(alert1, alert2); err != nil {
251 t.Fatalf("Insert failed: %s", err)
252 }
253
254 expectedAlerts := map[model.Fingerprint]*types.Alert{
255 alert1.Fingerprint(): alert1,
256 alert2.Fingerprint(): alert2,
257 }
258 iterator := alerts.GetPending()
259 for actual := range iterator.Next() {
260 expected := expectedAlerts[actual.Fingerprint()]
261 if !alertsEqual(actual, expected) {
262 t.Errorf("Unexpected alert")
263 t.Fatalf(pretty.Compare(actual, expected))
264 }
265 }
266
267 if err := alerts.Put(alert3); err != nil {
268 t.Fatalf("Insert failed: %s", err)
269 }
270
271 expectedAlerts = map[model.Fingerprint]*types.Alert{
272 alert1.Fingerprint(): alert1,
273 alert2.Fingerprint(): alert2,
274 alert3.Fingerprint(): alert3,
275 }
276 iterator = alerts.GetPending()
277 for actual := range iterator.Next() {
278 expected := expectedAlerts[actual.Fingerprint()]
279 if !alertsEqual(actual, expected) {
280 t.Errorf("Unexpected alert")
281 t.Fatalf(pretty.Compare(actual, expected))
282 }
283 }
284 }
285
286 func TestAlertsGC(t *testing.T) {
287 marker := types.NewMarker(prometheus.NewRegistry())
288 alerts, err := NewAlerts(context.Background(), marker, 200*time.Millisecond, noopCallback{}, log.NewNopLogger(), nil)
289 if err != nil {
290 t.Fatal(err)
291 }
292
293 insert := []*types.Alert{alert1, alert2, alert3}
294
295 if err := alerts.Put(insert...); err != nil {
296 t.Fatalf("Insert failed: %s", err)
297 }
298
299 for _, a := range insert {
300 marker.SetActiveOrSilenced(a.Fingerprint(), 0, nil, nil)
301 marker.SetInhibited(a.Fingerprint())
302 if !marker.Active(a.Fingerprint()) {
303 t.Errorf("error setting status: %v", a)
304 }
305 }
306
307 time.Sleep(300 * time.Millisecond)
308
309 for i, a := range insert {
310 _, err := alerts.Get(a.Fingerprint())
311 require.Error(t, err)
312 require.Equal(t, store.ErrNotFound, err, fmt.Sprintf("alert %d didn't get GC'd: %v", i, err))
313
314 s := marker.Status(a.Fingerprint())
315 if s.State != types.AlertStateUnprocessed {
316 t.Errorf("marker %d didn't get GC'd: %v", i, s)
317 }
318 }
319 }
320
321 func TestAlertsStoreCallback(t *testing.T) {
322 cb := &limitCountCallback{limit: 3}
323
324 marker := types.NewMarker(prometheus.NewRegistry())
325 alerts, err := NewAlerts(context.Background(), marker, 200*time.Millisecond, cb, log.NewNopLogger(), nil)
326 if err != nil {
327 t.Fatal(err)
328 }
329
330 err = alerts.Put(alert1, alert2, alert3)
331 if err != nil {
332 t.Fatal(err)
333 }
334 if num := cb.alerts.Load(); num != 3 {
335 t.Fatalf("unexpected number of alerts in the store, expected %v, got %v", 3, num)
336 }
337
338 alert1Mod := *alert1
339 alert1Mod.Annotations = model.LabelSet{"foo": "bar", "new": "test"}
340
341 alert4 := &types.Alert{
342 Alert: model.Alert{
343 Labels: model.LabelSet{"bar4": "foo4"},
344 Annotations: model.LabelSet{"foo4": "bar4"},
345 StartsAt: t0,
346 EndsAt: t1,
347 GeneratorURL: "http://example.com/prometheus",
348 },
349 UpdatedAt: t0,
350 Timeout: false,
351 }
352
353 err = alerts.Put(&alert1Mod, alert4)
354
355 if err != nil {
356 t.Fatalf("unexpected error %v", err)
357 }
358
359 if num := cb.alerts.Load(); num != 3 {
360 t.Fatalf("unexpected number of alerts in the store, expected %v, got %v", 3, num)
361 }
362
363
364 a, err := alerts.Get(alert1.Fingerprint())
365 if err != nil {
366 t.Fatal(err)
367 }
368 if !alertsEqual(a, &alert1Mod) {
369 t.Errorf("Unexpected alert")
370 t.Fatalf(pretty.Compare(a, &alert1Mod))
371 }
372
373
374 time.Sleep(300 * time.Millisecond)
375
376 if num := cb.alerts.Load(); num != 0 {
377 t.Fatalf("unexpected number of alerts in the store, expected %v, got %v", 0, num)
378 }
379
380 err = alerts.Put(alert4)
381 if err != nil {
382 t.Fatal(err)
383 }
384 }
385
386 func TestAlerts_Count(t *testing.T) {
387 marker := types.NewMarker(prometheus.NewRegistry())
388 alerts, err := NewAlerts(context.Background(), marker, 200*time.Millisecond, nil, log.NewNopLogger(), nil)
389 require.NoError(t, err)
390
391 states := []types.AlertState{types.AlertStateActive, types.AlertStateSuppressed, types.AlertStateUnprocessed}
392
393 countByState := func(st types.AlertState) int {
394 return alerts.count(st)
395 }
396 countTotal := func() int {
397 var count int
398 for _, st := range states {
399 count += countByState(st)
400 }
401 return count
402 }
403
404
405 require.Equal(t, 0, countTotal())
406
407
408 now := time.Now()
409 a1 := &types.Alert{
410 Alert: model.Alert{
411 Labels: model.LabelSet{"bar": "foo"},
412 Annotations: model.LabelSet{"foo": "bar"},
413 StartsAt: now,
414 EndsAt: now.Add(400 * time.Millisecond),
415 GeneratorURL: "http://example.com/prometheus",
416 },
417 UpdatedAt: now,
418 Timeout: false,
419 }
420
421 alerts.Put(a1)
422 require.Equal(t, 1, countByState(types.AlertStateUnprocessed))
423 require.Equal(t, 1, countTotal())
424 require.Eventually(t, func() bool {
425
426 return countTotal() == 0
427 }, 600*time.Millisecond, 100*time.Millisecond)
428
429 now = time.Now()
430 a2 := &types.Alert{
431 Alert: model.Alert{
432 Labels: model.LabelSet{"bar": "foo"},
433 Annotations: model.LabelSet{"foo": "bar"},
434 StartsAt: now,
435 EndsAt: now.Add(400 * time.Millisecond),
436 GeneratorURL: "http://example.com/prometheus",
437 },
438 UpdatedAt: now,
439 Timeout: false,
440 }
441
442
443 alerts.Put(a2)
444 marker.SetActiveOrSilenced(a2.Fingerprint(), 1, []string{"1"}, nil)
445 require.Equal(t, 1, countByState(types.AlertStateSuppressed))
446 require.Equal(t, 1, countTotal())
447
448 require.Eventually(t, func() bool {
449
450 return countTotal() == 0
451 }, 600*time.Millisecond, 100*time.Millisecond)
452 }
453
454 func alertsEqual(a1, a2 *types.Alert) bool {
455 if a1 == nil || a2 == nil {
456 return false
457 }
458 if !reflect.DeepEqual(a1.Labels, a2.Labels) {
459 return false
460 }
461 if !reflect.DeepEqual(a1.Annotations, a2.Annotations) {
462 return false
463 }
464 if a1.GeneratorURL != a2.GeneratorURL {
465 return false
466 }
467 if !a1.StartsAt.Equal(a2.StartsAt) {
468 return false
469 }
470 if !a1.EndsAt.Equal(a2.EndsAt) {
471 return false
472 }
473 if !a1.UpdatedAt.Equal(a2.UpdatedAt) {
474 return false
475 }
476 return a1.Timeout == a2.Timeout
477 }
478
479 type limitCountCallback struct {
480 alerts atomic.Int32
481 limit int
482 }
483
484 var errTooManyAlerts = fmt.Errorf("too many alerts")
485
486 func (l *limitCountCallback) PreStore(_ *types.Alert, existing bool) error {
487 if existing {
488 return nil
489 }
490
491 if int(l.alerts.Load())+1 > l.limit {
492 return errTooManyAlerts
493 }
494
495 return nil
496 }
497
498 func (l *limitCountCallback) PostStore(_ *types.Alert, existing bool) {
499 if !existing {
500 l.alerts.Inc()
501 }
502 }
503
504 func (l *limitCountCallback) PostDelete(_ *types.Alert) {
505 l.alerts.Dec()
506 }
507
View as plain text