1
2
3
4
5
6
7
8
9
10
11
12
13
14 package test
15
16 import (
17 "bytes"
18 "context"
19 "errors"
20 "fmt"
21 "io"
22 "net"
23 "net/http"
24 "os"
25 "os/exec"
26 "path/filepath"
27 "regexp"
28 "strings"
29 "sync"
30 "syscall"
31 "testing"
32 "time"
33
34 httptransport "github.com/go-openapi/runtime/client"
35 "github.com/go-openapi/strfmt"
36
37 apiclient "github.com/prometheus/alertmanager/api/v2/client"
38 "github.com/prometheus/alertmanager/api/v2/client/general"
39 "github.com/prometheus/alertmanager/api/v2/models"
40 "github.com/prometheus/alertmanager/cli/format"
41 )
42
// amtool is the path of the amtool binary exercised by these tests, relative
// to this package's directory. AmtoolOk checks that it has been built.
const amtool = "../../../amtool"
47
48
49
50 type AcceptanceTest struct {
51 *testing.T
52
53 opts *AcceptanceOpts
54
55 amc *AlertmanagerCluster
56 collectors []*Collector
57
58 actions map[float64][]func()
59 }
60
61
// AcceptanceOpts configures an AcceptanceTest.
type AcceptanceOpts struct {
	// RoutePrefix is passed to Alertmanager as --web.route-prefix and is
	// prepended to every URL the helpers build.
	RoutePrefix string

	// Tolerance is not used in this part of the file; presumably it is the
	// timing slack allowed when collectors compare results — TODO confirm.
	Tolerance time.Duration

	// baseTime is the wall-clock origin of the scenario timeline, stamped by
	// NewAcceptanceTest.
	baseTime time.Time
}
67
68 func (opts *AcceptanceOpts) alertString(a *models.GettableAlert) string {
69 if a.EndsAt == nil || time.Time(*a.EndsAt).IsZero() {
70 return fmt.Sprintf("%v[%v:]", a, opts.relativeTime(time.Time(*a.StartsAt)))
71 }
72 return fmt.Sprintf("%v[%v:%v]", a, opts.relativeTime(time.Time(*a.StartsAt)), opts.relativeTime(time.Time(*a.EndsAt)))
73 }
74
75
76
77 func (opts *AcceptanceOpts) expandTime(rel float64) time.Time {
78 return opts.baseTime.Add(time.Duration(rel * float64(time.Second)))
79 }
80
81
82
83 func (opts *AcceptanceOpts) relativeTime(act time.Time) float64 {
84 return float64(act.Sub(opts.baseTime)) / float64(time.Second)
85 }
86
87
88
89 func NewAcceptanceTest(t *testing.T, opts *AcceptanceOpts) *AcceptanceTest {
90 test := &AcceptanceTest{
91 T: t,
92 opts: opts,
93 actions: map[float64][]func(){},
94 }
95
96
97
98 opts.baseTime = time.Now()
99
100 return test
101 }
102
103
// freeAddress asks the OS for an unused IPv4 TCP port on localhost and
// returns it as "host:port". The probe listener is closed before returning,
// so the port is very likely, but not guaranteed, to still be free when
// the caller binds it. It panics if the port cannot be obtained or released.
func freeAddress() string {
	listener, err := net.Listen("tcp4", "localhost:0")
	if err != nil {
		panic(err)
	}
	addr := listener.Addr().String()
	if err := listener.Close(); err != nil {
		panic(err)
	}
	return addr
}
119
120
121
122 func AmtoolOk() (bool, error) {
123 stat, err := os.Stat(amtool)
124 if err != nil {
125 return false, fmt.Errorf("error accessing amtool command, try 'make build' to generate the file. %w", err)
126 } else if stat.IsDir() {
127 return false, fmt.Errorf("file %s is a directory, expecting a binary executable file", amtool)
128 }
129 return true, nil
130 }
131
132
133 func (t *AcceptanceTest) Do(at float64, f func()) {
134 t.actions[at] = append(t.actions[at], f)
135 }
136
137
138
139 func (t *AcceptanceTest) AlertmanagerCluster(conf string, size int) *AlertmanagerCluster {
140 amc := AlertmanagerCluster{}
141
142 for i := 0; i < size; i++ {
143 am := &Alertmanager{
144 t: t,
145 opts: t.opts,
146 }
147
148 dir, err := os.MkdirTemp("", "am_test")
149 if err != nil {
150 t.Fatal(err)
151 }
152 am.dir = dir
153
154 cf, err := os.Create(filepath.Join(dir, "config.yml"))
155 if err != nil {
156 t.Fatal(err)
157 }
158 am.confFile = cf
159 am.UpdateConfig(conf)
160
161 am.apiAddr = freeAddress()
162 am.clusterAddr = freeAddress()
163
164 transport := httptransport.New(am.apiAddr, t.opts.RoutePrefix+"/api/v2/", nil)
165 am.clientV2 = apiclient.New(transport, strfmt.Default)
166
167 amc.ams = append(amc.ams, am)
168 }
169
170 t.amc = &amc
171
172 return &amc
173 }
174
175
176 func (t *AcceptanceTest) Collector(name string) *Collector {
177 co := &Collector{
178 t: t.T,
179 name: name,
180 opts: t.opts,
181 collected: map[float64][]models.GettableAlerts{},
182 expected: map[Interval][]models.GettableAlerts{},
183 }
184 t.collectors = append(t.collectors, co)
185
186 return co
187 }
188
189
190
191 func (t *AcceptanceTest) Run() {
192 errc := make(chan error)
193
194 for _, am := range t.amc.ams {
195 am.errc = errc
196 defer func(am *Alertmanager) {
197 am.Terminate()
198 am.cleanup()
199 t.Logf("stdout:\n%v", am.cmd.Stdout)
200 t.Logf("stderr:\n%v", am.cmd.Stderr)
201 }(am)
202 }
203
204 err := t.amc.Start()
205 if err != nil {
206 t.T.Fatal(err)
207 }
208
209 go t.runActions()
210
211 var latest float64
212 for _, coll := range t.collectors {
213 if l := coll.latest(); l > latest {
214 latest = l
215 }
216 }
217
218 deadline := t.opts.expandTime(latest)
219
220 select {
221 case <-time.After(time.Until(deadline)):
222
223 case err := <-errc:
224 t.Error(err)
225 }
226 }
227
228
229 func (t *AcceptanceTest) runActions() {
230 var wg sync.WaitGroup
231
232 for at, fs := range t.actions {
233 ts := t.opts.expandTime(at)
234 wg.Add(len(fs))
235
236 for _, f := range fs {
237 go func(f func()) {
238 time.Sleep(time.Until(ts))
239 f()
240 wg.Done()
241 }(f)
242 }
243 }
244
245 wg.Wait()
246 }
247
// buffer is a bytes.Buffer guarded by a mutex. It can therefore serve as a
// child process's Stdout/Stderr while the test reads it from another
// goroutine.
type buffer struct {
	b   bytes.Buffer
	mtx sync.Mutex
}

// Write appends p under the lock and reports how many bytes were written.
func (b *buffer) Write(p []byte) (int, error) {
	b.mtx.Lock()
	defer b.mtx.Unlock()

	n, err := b.b.Write(p)
	return n, err
}

// String returns a copy of everything written so far, taken under the lock.
func (b *buffer) String() string {
	b.mtx.Lock()
	defer b.mtx.Unlock()

	contents := b.b.String()
	return contents
}
264
265
266
267 type Alertmanager struct {
268 t *AcceptanceTest
269 opts *AcceptanceOpts
270
271 apiAddr string
272 clusterAddr string
273 clientV2 *apiclient.AlertmanagerAPI
274 cmd *exec.Cmd
275 confFile *os.File
276 dir string
277
278 errc chan<- error
279 }
280
281
282
283 type AlertmanagerCluster struct {
284 ams []*Alertmanager
285 }
286
287
288 func (amc *AlertmanagerCluster) Start() error {
289 var peerFlags []string
290 for _, am := range amc.ams {
291 peerFlags = append(peerFlags, "--cluster.peer="+am.clusterAddr)
292 }
293
294 for _, am := range amc.ams {
295 err := am.Start(peerFlags)
296 if err != nil {
297 return fmt.Errorf("starting alertmanager cluster: %v", err.Error())
298 }
299 }
300
301 for _, am := range amc.ams {
302 err := am.WaitForCluster(len(amc.ams))
303 if err != nil {
304 return fmt.Errorf("waiting alertmanager cluster: %v", err.Error())
305 }
306 }
307
308 return nil
309 }
310
311
312 func (amc *AlertmanagerCluster) Members() []*Alertmanager {
313 return amc.ams
314 }
315
316
317 func (am *Alertmanager) Start(additionalArg []string) error {
318 am.t.Helper()
319 args := []string{
320 "--config.file", am.confFile.Name(),
321 "--log.level", "debug",
322 "--web.listen-address", am.apiAddr,
323 "--storage.path", am.dir,
324 "--cluster.listen-address", am.clusterAddr,
325 "--cluster.settle-timeout", "0s",
326 }
327 if am.opts.RoutePrefix != "" {
328 args = append(args, "--web.route-prefix", am.opts.RoutePrefix)
329 }
330 args = append(args, additionalArg...)
331
332 cmd := exec.Command("../../../alertmanager", args...)
333
334 if am.cmd == nil {
335 var outb, errb buffer
336 cmd.Stdout = &outb
337 cmd.Stderr = &errb
338 } else {
339 cmd.Stdout = am.cmd.Stdout
340 cmd.Stderr = am.cmd.Stderr
341 }
342 am.cmd = cmd
343
344 if err := am.cmd.Start(); err != nil {
345 return fmt.Errorf("starting alertmanager failed: %s", err)
346 }
347
348 go func() {
349 if err := am.cmd.Wait(); err != nil {
350 am.errc <- err
351 }
352 }()
353
354 time.Sleep(50 * time.Millisecond)
355 for i := 0; i < 10; i++ {
356 resp, err := http.Get(am.getURL("/"))
357 if err != nil {
358 time.Sleep(500 * time.Millisecond)
359 continue
360 }
361 defer resp.Body.Close()
362 if resp.StatusCode != http.StatusOK {
363 return fmt.Errorf("starting alertmanager failed: expected HTTP status '200', got '%d'", resp.StatusCode)
364 }
365 _, err = io.ReadAll(resp.Body)
366 if err != nil {
367 return fmt.Errorf("starting alertmanager failed: %s", err)
368 }
369 return nil
370 }
371 return fmt.Errorf("starting alertmanager failed: timeout")
372 }
373
374
375
376 func (am *Alertmanager) WaitForCluster(size int) error {
377 params := general.NewGetStatusParams()
378 params.WithContext(context.Background())
379 var status general.GetStatusOK
380
381
382 for i := 0; i < 20; i++ {
383 status, err := am.clientV2.General.GetStatus(params)
384 if err != nil {
385 return err
386 }
387
388 if len(status.Payload.Cluster.Peers) == size {
389 return nil
390 }
391 time.Sleep(100 * time.Millisecond)
392 }
393
394 return fmt.Errorf(
395 "failed to wait for Alertmanager instance %q to join cluster: expected %v peers, but got %v",
396 am.clusterAddr,
397 size,
398 len(status.Payload.Cluster.Peers),
399 )
400 }
401
402
403
404 func (amc *AlertmanagerCluster) Terminate() {
405 for _, am := range amc.ams {
406 am.Terminate()
407 }
408 }
409
410
411
412 func (am *Alertmanager) Terminate() {
413 am.t.Helper()
414 if err := syscall.Kill(am.cmd.Process.Pid, syscall.SIGTERM); err != nil {
415 am.t.Fatalf("Error sending SIGTERM to Alertmanager process: %v", err)
416 }
417 }
418
419
420 func (amc *AlertmanagerCluster) Reload() {
421 for _, am := range amc.ams {
422 am.Reload()
423 }
424 }
425
426
427 func (am *Alertmanager) Reload() {
428 am.t.Helper()
429 if err := syscall.Kill(am.cmd.Process.Pid, syscall.SIGHUP); err != nil {
430 am.t.Fatalf("Error sending SIGHUP to Alertmanager process: %v", err)
431 }
432 }
433
434 func (am *Alertmanager) cleanup() {
435 am.t.Helper()
436 if err := os.RemoveAll(am.confFile.Name()); err != nil {
437 am.t.Errorf("Error removing test config file %q: %v", am.confFile.Name(), err)
438 }
439 }
440
441
442
443 func Version() (string, error) {
444 cmd := exec.Command(amtool, "--version")
445 out, err := cmd.CombinedOutput()
446 if err != nil {
447 return "", err
448 }
449
450 versionRE := regexp.MustCompile(`^amtool, version (\d+\.\d+\.\d+) *`)
451 matched := versionRE.FindStringSubmatch(string(out))
452 if len(matched) != 2 {
453 return "", errors.New("Unable to match version info regex: " + string(out))
454 }
455 return matched[1], nil
456 }
457
458
459
460 func (am *Alertmanager) AddAlertsAt(at float64, alerts ...*TestAlert) {
461 am.t.Do(at, func() {
462 am.AddAlerts(alerts...)
463 })
464 }
465
466
467 func (am *Alertmanager) AddAlerts(alerts ...*TestAlert) {
468 for _, alert := range alerts {
469 out, err := am.addAlertCommand(alert)
470 if err != nil {
471 am.t.Errorf("Error adding alert: %v\nOutput: %s", err, string(out))
472 }
473 }
474 }
475
476 func (am *Alertmanager) addAlertCommand(alert *TestAlert) ([]byte, error) {
477 amURLFlag := "--alertmanager.url=" + am.getURL("/")
478 args := []string{amURLFlag, "alert", "add"}
479 for key, val := range alert.labels {
480 args = append(args, key+"="+val)
481 }
482 startsAt := strfmt.DateTime(am.opts.expandTime(alert.startsAt))
483 args = append(args, "--start="+startsAt.String())
484 if alert.endsAt > alert.startsAt {
485 endsAt := strfmt.DateTime(am.opts.expandTime(alert.endsAt))
486 args = append(args, "--end="+endsAt.String())
487 }
488 cmd := exec.Command(amtool, args...)
489 return cmd.CombinedOutput()
490 }
491
492
493 func (am *Alertmanager) QueryAlerts() ([]TestAlert, error) {
494 amURLFlag := "--alertmanager.url=" + am.getURL("/")
495 cmd := exec.Command(amtool, amURLFlag, "alert", "query")
496 output, err := cmd.CombinedOutput()
497 if err != nil {
498 return nil, err
499 }
500 return parseAlertQueryResponse(output)
501 }
502
503 func parseAlertQueryResponse(data []byte) ([]TestAlert, error) {
504 alerts := []TestAlert{}
505 lines := strings.Split(string(data), "\n")
506 header, lines := lines[0], lines[1:len(lines)-1]
507 startTimePos := strings.Index(header, "Starts At")
508 if startTimePos == -1 {
509 return alerts, errors.New("Invalid header: " + header)
510 }
511 summPos := strings.Index(header, "Summary")
512 if summPos == -1 {
513 return alerts, errors.New("Invalid header: " + header)
514 }
515 for _, line := range lines {
516 alertName := strings.TrimSpace(line[0:startTimePos])
517 startTime := strings.TrimSpace(line[startTimePos:summPos])
518 startsAt, err := time.Parse(format.DefaultDateFormat, startTime)
519 if err != nil {
520 return alerts, err
521 }
522 summary := strings.TrimSpace(line[summPos:])
523 alert := TestAlert{
524 labels: models.LabelSet{"name": alertName},
525 startsAt: float64(startsAt.Unix()),
526 summary: summary,
527 }
528 alerts = append(alerts, alert)
529 }
530 return alerts, nil
531 }
532
533
534 func (amc *AlertmanagerCluster) SetSilence(at float64, sil *TestSilence) {
535 for _, am := range amc.ams {
536 am.SetSilence(at, sil)
537 }
538 }
539
540
541 func (am *Alertmanager) SetSilence(at float64, sil *TestSilence) {
542 out, err := am.addSilenceCommand(sil)
543 if err != nil {
544 am.t.T.Errorf("Unable to set silence %v %v", err, string(out))
545 }
546 }
547
548
549 func (am *Alertmanager) addSilenceCommand(sil *TestSilence) ([]byte, error) {
550 amURLFlag := "--alertmanager.url=" + am.getURL("/")
551 args := []string{amURLFlag, "silence", "add"}
552 if sil.comment != "" {
553 args = append(args, "--comment="+sil.comment)
554 }
555 args = append(args, sil.match...)
556 cmd := exec.Command(amtool, args...)
557 return cmd.CombinedOutput()
558 }
559
560
561 func (am *Alertmanager) QuerySilence() ([]TestSilence, error) {
562 amURLFlag := "--alertmanager.url=" + am.getURL("/")
563 args := []string{amURLFlag, "silence", "query"}
564 cmd := exec.Command(amtool, args...)
565 out, err := cmd.CombinedOutput()
566 if err != nil {
567 am.t.T.Error("Silence query command failed: ", err)
568 }
569 return parseSilenceQueryResponse(out)
570 }
571
// silenceHeaderFields are the column headings printed by
// `amtool silence query`, in display order. parseSilenceQueryResponse finds
// the columns by searching the header line for entries 1 through 4.
var silenceHeaderFields = []string{
	"ID",
	"Matchers",
	"Ends At",
	"Created By",
	"Comment",
}
573
574 func parseSilenceQueryResponse(data []byte) ([]TestSilence, error) {
575 sils := []TestSilence{}
576 lines := strings.Split(string(data), "\n")
577 header, lines := lines[0], lines[1:len(lines)-1]
578 matchersPos := strings.Index(header, silenceHeaderFields[1])
579 if matchersPos == -1 {
580 return sils, errors.New("Invalid header: " + header)
581 }
582 endsAtPos := strings.Index(header, silenceHeaderFields[2])
583 if endsAtPos == -1 {
584 return sils, errors.New("Invalid header: " + header)
585 }
586 createdByPos := strings.Index(header, silenceHeaderFields[3])
587 if createdByPos == -1 {
588 return sils, errors.New("Invalid header: " + header)
589 }
590 commentPos := strings.Index(header, silenceHeaderFields[4])
591 if commentPos == -1 {
592 return sils, errors.New("Invalid header: " + header)
593 }
594 for _, line := range lines {
595 id := strings.TrimSpace(line[0:matchersPos])
596 matchers := strings.TrimSpace(line[matchersPos:endsAtPos])
597 endsAtString := strings.TrimSpace(line[endsAtPos:createdByPos])
598 endsAt, err := time.Parse(format.DefaultDateFormat, endsAtString)
599 if err != nil {
600 return sils, err
601 }
602 createdBy := strings.TrimSpace(line[createdByPos:commentPos])
603 comment := strings.TrimSpace(line[commentPos:])
604 silence := TestSilence{
605 id: id,
606 endsAt: float64(endsAt.Unix()),
607 match: strings.Split(matchers, " "),
608 createdBy: createdBy,
609 comment: comment,
610 }
611 sils = append(sils, silence)
612 }
613 return sils, nil
614 }
615
616
617 func (amc *AlertmanagerCluster) DelSilence(at float64, sil *TestSilence) {
618 for _, am := range amc.ams {
619 am.DelSilence(at, sil)
620 }
621 }
622
623
624 func (am *Alertmanager) DelSilence(at float64, sil *TestSilence) {
625 output, err := am.expireSilenceCommand(sil)
626 if err != nil {
627 am.t.Errorf("Error expiring silence %v: %s", string(output), err)
628 return
629 }
630 }
631
632
633 func (am *Alertmanager) expireSilenceCommand(sil *TestSilence) ([]byte, error) {
634 amURLFlag := "--alertmanager.url=" + am.getURL("/")
635 args := []string{amURLFlag, "silence", "expire", sil.ID()}
636 cmd := exec.Command(amtool, args...)
637 return cmd.CombinedOutput()
638 }
639
640
641
642 func (amc *AlertmanagerCluster) UpdateConfig(conf string) {
643 for _, am := range amc.ams {
644 am.UpdateConfig(conf)
645 }
646 }
647
648
649
650 func (am *Alertmanager) UpdateConfig(conf string) {
651 if _, err := am.confFile.WriteString(conf); err != nil {
652 am.t.Fatal(err)
653 return
654 }
655 if err := am.confFile.Sync(); err != nil {
656 am.t.Fatal(err)
657 return
658 }
659 }
660
661 func (am *Alertmanager) getURL(path string) string {
662 return fmt.Sprintf("http://%s%s%s", am.apiAddr, am.opts.RoutePrefix, path)
663 }
664
View as plain text