// Copyright 2018 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package test

import (
	"bytes"
	"context"
	"fmt"
	"net"
	"os"
	"os/exec"
	"path/filepath"
	"sync"
	"syscall"
	"testing"
	"time"

	apiclient "github.com/prometheus/alertmanager/api/v2/client"
	"github.com/prometheus/alertmanager/api/v2/client/alert"
	"github.com/prometheus/alertmanager/api/v2/client/general"
	"github.com/prometheus/alertmanager/api/v2/client/silence"
	"github.com/prometheus/alertmanager/api/v2/models"

	httptransport "github.com/go-openapi/runtime/client"
	"github.com/go-openapi/strfmt"
)

// AcceptanceTest provides a declarative definition of given inputs and
// expected outputs of an Alertmanager setup.
type AcceptanceTest struct {
	*testing.T

	opts *AcceptanceOpts

	amc        *AlertmanagerCluster
	collectors []*Collector

	actions map[float64][]func()
}

// AcceptanceOpts defines configuration parameters for an acceptance test.
type AcceptanceOpts struct {
	RoutePrefix string
	Tolerance   time.Duration
	baseTime    time.Time
}

// alertString formats an alert together with its relative start and end times.
func (opts *AcceptanceOpts) alertString(a *models.GettableAlert) string {
	if a.EndsAt == nil || time.Time(*a.EndsAt).IsZero() {
		return fmt.Sprintf("%v[%v:]", a, opts.relativeTime(time.Time(*a.StartsAt)))
	}
	return fmt.Sprintf("%v[%v:%v]", a, opts.relativeTime(time.Time(*a.StartsAt)), opts.relativeTime(time.Time(*a.EndsAt)))
}

// expandTime returns the absolute time for the given relative time,
// calculated from the test's base time.
func (opts *AcceptanceOpts) expandTime(rel float64) time.Time {
	return opts.baseTime.Add(time.Duration(rel * float64(time.Second)))
}

// relativeTime returns the relative time for the given absolute time,
// calculated from the test's base time.
func (opts *AcceptanceOpts) relativeTime(act time.Time) float64 {
	return float64(act.Sub(opts.baseTime)) / float64(time.Second)
}

// NewAcceptanceTest returns a new acceptance test with the base time
// set to the current time.
func NewAcceptanceTest(t *testing.T, opts *AcceptanceOpts) *AcceptanceTest {
	test := &AcceptanceTest{
		T:       t,
		opts:    opts,
		actions: map[float64][]func(){},
	}

	// All relative times used by the test are expanded against the moment
	// the test is created.
	opts.baseTime = time.Now()

	return test
}

// freeAddress returns a new listen address not currently in use.
func freeAddress() string {
	// Let the OS allocate a free address and close it again, hoping that
	// it is still free when Alertmanager starts listening on it.
	l, err := net.Listen("tcp4", "localhost:0")
	if err != nil {
		panic(err)
	}
	defer func() {
		if err := l.Close(); err != nil {
			panic(err)
		}
	}()

	return l.Addr().String()
}

// Do registers the given function to be executed at the given relative time.
func (t *AcceptanceTest) Do(at float64, f func()) {
	t.actions[at] = append(t.actions[at], f)
}

// AlertmanagerCluster returns a new cluster of the given size, where each
// Alertmanager instance is configured with the given configuration and
// listens on a random free port.
func (t *AcceptanceTest) AlertmanagerCluster(conf string, size int) *AlertmanagerCluster {
	amc := AlertmanagerCluster{}

	for i := 0; i < size; i++ {
		am := &Alertmanager{
			t:    t,
			opts: t.opts,
		}

		dir, err := os.MkdirTemp("", "am_test")
		if err != nil {
			t.Fatal(err)
		}
		am.dir = dir

		cf, err := os.Create(filepath.Join(dir, "config.yml"))
		if err != nil {
			t.Fatal(err)
		}
		am.confFile = cf
		am.UpdateConfig(conf)

		am.apiAddr = freeAddress()
		am.clusterAddr = freeAddress()

		transport := httptransport.New(am.apiAddr, t.opts.RoutePrefix+"/api/v2/", nil)
		am.clientV2 = apiclient.New(transport, strfmt.Default)

		amc.ams = append(amc.ams, am)
	}

	t.amc = &amc

	return &amc
}

// Collector returns a new collector bound to the test instance.
func (t *AcceptanceTest) Collector(name string) *Collector {
	co := &Collector{
		t:         t.T,
		name:      name,
		opts:      t.opts,
		collected: map[float64][]models.GettableAlerts{},
		expected:  map[Interval][]models.GettableAlerts{},
	}
	t.collectors = append(t.collectors, co)

	return co
}

// Run starts all Alertmanagers and executes the scheduled actions against
// them. It then waits until the latest expected notification interval has
// passed, or fails early if an Alertmanager process exits with an error.
func (t *AcceptanceTest) Run() {
	errc := make(chan error)

	for _, am := range t.amc.ams {
		am.errc = errc
		defer func(am *Alertmanager) {
			am.Terminate()
			am.cleanup()
			t.Logf("stdout:\n%v", am.cmd.Stdout)
			t.Logf("stderr:\n%v", am.cmd.Stderr)
		}(am)
	}

	err := t.amc.Start()
	if err != nil {
		t.T.Fatal(err)
	}

	go t.runActions()

	var latest float64
	for _, coll := range t.collectors {
		if l := coll.latest(); l > latest {
			latest = l
		}
	}

	deadline := t.opts.expandTime(latest)

	select {
	case <-time.After(time.Until(deadline)):
		// continue
	case err := <-errc:
		t.Error(err)
	}
}

// runActions performs the stored actions at the defined times.
func (t *AcceptanceTest) runActions() {
	var wg sync.WaitGroup

	for at, fs := range t.actions {
		ts := t.opts.expandTime(at)
		wg.Add(len(fs))

		for _, f := range fs {
			go func(f func()) {
				time.Sleep(time.Until(ts))
				f()
				wg.Done()
			}(f)
		}
	}

	wg.Wait()
}

// buffer is a bytes.Buffer that is safe for concurrent use.
type buffer struct {
	b   bytes.Buffer
	mtx sync.Mutex
}

func (b *buffer) Write(p []byte) (int, error) {
	b.mtx.Lock()
	defer b.mtx.Unlock()
	return b.b.Write(p)
}

func (b *buffer) String() string {
	b.mtx.Lock()
	defer b.mtx.Unlock()
	return b.b.String()
}

// Alertmanager encapsulates an Alertmanager process and allows
// declaring alerts being pushed to it at fixed points in time.
type Alertmanager struct {
	t    *AcceptanceTest
	opts *AcceptanceOpts

	apiAddr     string
	clusterAddr string
	clientV2    *apiclient.AlertmanagerAPI
	cmd         *exec.Cmd
	confFile    *os.File
	dir         string

	errc chan<- error
}

// AlertmanagerCluster represents a group of Alertmanager instances
// acting as a single cluster.
type AlertmanagerCluster struct {
	ams []*Alertmanager
}

// Start starts all Alertmanager instances and waits until each of them has
// joined the cluster.
func (amc *AlertmanagerCluster) Start() error {
	var peerFlags []string
	for _, am := range amc.ams {
		peerFlags = append(peerFlags, "--cluster.peer="+am.clusterAddr)
	}

	for _, am := range amc.ams {
		err := am.Start(peerFlags)
		if err != nil {
			return fmt.Errorf("failed to start alertmanager cluster: %v", err.Error())
		}
	}

	for _, am := range amc.ams {
		err := am.WaitForCluster(len(amc.ams))
		if err != nil {
			return fmt.Errorf("failed to wait for Alertmanager instance %q to join cluster: %v", am.clusterAddr, err.Error())
		}
	}

	return nil
}

// Members returns the underlying Alertmanager instances.
func (amc *AlertmanagerCluster) Members() []*Alertmanager {
	return amc.ams
}

// Start launches the Alertmanager process and waits until its API responds.
func (am *Alertmanager) Start(additionalArg []string) error {
	am.t.Helper()
	args := []string{
		"--config.file", am.confFile.Name(),
		"--log.level", "debug",
		"--web.listen-address", am.apiAddr,
		"--storage.path", am.dir,
		"--cluster.listen-address", am.clusterAddr,
		"--cluster.settle-timeout", "0s",
	}
	if am.opts.RoutePrefix != "" {
		args = append(args, "--web.route-prefix", am.opts.RoutePrefix)
	}
	args = append(args, additionalArg...)

	cmd := exec.Command("../../../alertmanager", args...)

	if am.cmd == nil {
		var outb, errb buffer
		cmd.Stdout = &outb
		cmd.Stderr = &errb
	} else {
		cmd.Stdout = am.cmd.Stdout
		cmd.Stderr = am.cmd.Stderr
	}
	am.cmd = cmd

	if err := am.cmd.Start(); err != nil {
		return err
	}

	go func() {
		if err := am.cmd.Wait(); err != nil {
			am.errc <- err
		}
	}()

	time.Sleep(50 * time.Millisecond)
	var lastErr error
	for i := 0; i < 10; i++ {
		_, lastErr = am.clientV2.General.GetStatus(nil)
		if lastErr == nil {
			return nil
		}
		time.Sleep(500 * time.Millisecond)
	}
	return fmt.Errorf("unable to get a successful response from the Alertmanager: %v", lastErr)
}

// WaitForCluster waits for the Alertmanager instance to join a cluster of the
// given size.
func (am *Alertmanager) WaitForCluster(size int) error {
	params := general.NewGetStatusParams()
	params.WithContext(context.Background())
	var status *general.GetStatusOK

	// Poll for up to 2 seconds.
	for i := 0; i < 20; i++ {
		var err error
		status, err = am.clientV2.General.GetStatus(params)
		if err != nil {
			return err
		}

		if len(status.Payload.Cluster.Peers) == size {
			return nil
		}
		time.Sleep(100 * time.Millisecond)
	}

	return fmt.Errorf(
		"expected %v peers, but got %v",
		size,
		len(status.Payload.Cluster.Peers),
	)
}

// Terminate sends the termination signal to all Alertmanager processes in the
// cluster.
func (amc *AlertmanagerCluster) Terminate() {
	for _, am := range amc.ams {
		am.Terminate()
	}
}

// Terminate sends SIGTERM to the underlying Alertmanager process.
func (am *Alertmanager) Terminate() {
	am.t.Helper()
	if err := syscall.Kill(am.cmd.Process.Pid, syscall.SIGTERM); err != nil {
		am.t.Logf("Error sending SIGTERM to Alertmanager process: %v", err)
	}
}

// Reload sends the reload signal to all Alertmanager instances.
func (amc *AlertmanagerCluster) Reload() {
	for _, am := range amc.ams {
		am.Reload()
	}
}

// Reload sends SIGHUP to the Alertmanager process to trigger a configuration
// reload.
func (am *Alertmanager) Reload() {
	am.t.Helper()
	if err := syscall.Kill(am.cmd.Process.Pid, syscall.SIGHUP); err != nil {
		am.t.Fatalf("Error sending SIGHUP to Alertmanager process: %v", err)
	}
}

// cleanup removes the test configuration file.
func (am *Alertmanager) cleanup() {
	am.t.Helper()
	if err := os.RemoveAll(am.confFile.Name()); err != nil {
		am.t.Errorf("Error removing test config file %q: %v", am.confFile.Name(), err)
	}
}

// Push declares alerts that are to be pushed to all Alertmanager
// servers at a relative point in time.
func (amc *AlertmanagerCluster) Push(at float64, alerts ...*TestAlert) {
	for _, am := range amc.ams {
		am.Push(at, alerts...)
	}
}

// Push declares alerts that are to be pushed to the Alertmanager
// server at a relative point in time.
func (am *Alertmanager) Push(at float64, alerts ...*TestAlert) {
	var cas models.PostableAlerts
	for i := range alerts {
		a := alerts[i].nativeAlert(am.opts)
		alert := &models.PostableAlert{
			Alert: models.Alert{
				Labels:       a.Labels,
				GeneratorURL: a.GeneratorURL,
			},
			Annotations: a.Annotations,
		}
		if a.StartsAt != nil {
			alert.StartsAt = *a.StartsAt
		}
		if a.EndsAt != nil {
			alert.EndsAt = *a.EndsAt
		}
		cas = append(cas, alert)
	}

	am.t.Do(at, func() {
		params := alert.PostAlertsParams{}
		params.WithContext(context.Background()).WithAlerts(cas)

		_, err := am.clientV2.Alert.PostAlerts(&params)
		if err != nil {
			am.t.Errorf("Error pushing %v: %v", cas, err)
		}
	})
}

// SetSilence updates or creates the given silence on all cluster members.
func (amc *AlertmanagerCluster) SetSilence(at float64, sil *TestSilence) {
	for _, am := range amc.ams {
		am.SetSilence(at, sil)
	}
}

// SetSilence updates or creates the given silence at the given relative time.
func (am *Alertmanager) SetSilence(at float64, sil *TestSilence) {
	am.t.Do(at, func() {
		resp, err := am.clientV2.Silence.PostSilences(
			silence.NewPostSilencesParams().WithSilence(
				&models.PostableSilence{
					Silence: *sil.nativeSilence(am.opts),
				},
			),
		)
		if err != nil {
			am.t.Errorf("Error setting silence %v: %s", sil, err)
			return
		}
		sil.SetID(resp.Payload.SilenceID)
	})
}

// DelSilence deletes the given silence on all cluster members at the given
// relative time.
func (amc *AlertmanagerCluster) DelSilence(at float64, sil *TestSilence) {
	for _, am := range amc.ams {
		am.DelSilence(at, sil)
	}
}

// DelSilence deletes the given silence at the given relative time.
func (am *Alertmanager) DelSilence(at float64, sil *TestSilence) {
	am.t.Do(at, func() {
		_, err := am.clientV2.Silence.DeleteSilence(
			silence.NewDeleteSilenceParams().WithSilenceID(strfmt.UUID(sil.ID())),
		)
		if err != nil {
			am.t.Errorf("Error deleting silence %v: %s", sil, err)
		}
	})
}

// UpdateConfig rewrites the configuration file for all Alertmanager instances.
// It does not initiate a configuration reload.
func (amc *AlertmanagerCluster) UpdateConfig(conf string) {
	for _, am := range amc.ams {
		am.UpdateConfig(conf)
	}
}

// UpdateConfig rewrites the configuration file for the Alertmanager. It does
// not initiate a configuration reload.
func (am *Alertmanager) UpdateConfig(conf string) {
	if _, err := am.confFile.WriteString(conf); err != nil {
		am.t.Fatal(err)
	}
	if err := am.confFile.Sync(); err != nil {
		am.t.Fatal(err)
	}
}

// Client returns a client for the Alertmanager's API v2 endpoint.
func (am *Alertmanager) Client() *apiclient.AlertmanagerAPI {
	return am.clientV2
}
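
// The sketch below shows how the pieces above are typically wired together in
// an acceptance test. It is a minimal, illustrative example: the timing
// helpers (At, Between), the TestAlert constructor (Alert), and the collector
// expectations (Want, Check) are assumed to be provided elsewhere in this
// test package, and conf stands in for a real Alertmanager configuration that
// routes notifications to the collected receiver.
//
//	func TestMinimalAcceptance(t *testing.T) {
//		at := NewAcceptanceTest(t, &AcceptanceOpts{Tolerance: 150 * time.Millisecond})
//
//		// Collect the notifications the cluster sends out.
//		co := at.Collector("webhook")
//
//		// Start a single-node cluster with the given configuration.
//		amc := at.AlertmanagerCluster(conf, 1)
//
//		// Fire an alert one second into the test and expect its
//		// notification between seconds two and three.
//		amc.Push(At(1), Alert("alertname", "test").Active(1))
//		co.Want(Between(2, 3), Alert("alertname", "test").Active(1))
//
//		at.Run()
//		t.Log(co.Check())
//	}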