...

Source file src/github.com/prometheus/alertmanager/test/with_api_v2/acceptance.go

Documentation: github.com/prometheus/alertmanager/test/with_api_v2

     1  // Copyright 2018 Prometheus Team
     2  // Licensed under the Apache License, Version 2.0 (the "License");
     3  // you may not use this file except in compliance with the License.
     4  // You may obtain a copy of the License at
     5  //
     6  // http://www.apache.org/licenses/LICENSE-2.0
     7  //
     8  // Unless required by applicable law or agreed to in writing, software
     9  // distributed under the License is distributed on an "AS IS" BASIS,
    10  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    11  // See the License for the specific language governing permissions and
    12  // limitations under the License.
    13  
    14  package test
    15  
    16  import (
    17  	"bytes"
    18  	"context"
    19  	"fmt"
    20  	"net"
    21  	"os"
    22  	"os/exec"
    23  	"path/filepath"
    24  	"sync"
    25  	"syscall"
    26  	"testing"
    27  	"time"
    28  
    29  	apiclient "github.com/prometheus/alertmanager/api/v2/client"
    30  	"github.com/prometheus/alertmanager/api/v2/client/alert"
    31  	"github.com/prometheus/alertmanager/api/v2/client/general"
    32  	"github.com/prometheus/alertmanager/api/v2/client/silence"
    33  	"github.com/prometheus/alertmanager/api/v2/models"
    34  
    35  	httptransport "github.com/go-openapi/runtime/client"
    36  	"github.com/go-openapi/strfmt"
    37  )
    38  
    39  // AcceptanceTest provides declarative definition of given inputs and expected
    40  // output of an Alertmanager setup.
    41  type AcceptanceTest struct {
    42  	*testing.T
    43  
    44  	opts *AcceptanceOpts
    45  
    46  	amc        *AlertmanagerCluster
    47  	collectors []*Collector
    48  
    49  	actions map[float64][]func()
    50  }
    51  
    52  // AcceptanceOpts defines configuration parameters for an acceptance test.
    53  type AcceptanceOpts struct {
    54  	RoutePrefix string
    55  	Tolerance   time.Duration
    56  	baseTime    time.Time
    57  }
    58  
    59  func (opts *AcceptanceOpts) alertString(a *models.GettableAlert) string {
    60  	if a.EndsAt == nil || time.Time(*a.EndsAt).IsZero() {
    61  		return fmt.Sprintf("%v[%v:]", a, opts.relativeTime(time.Time(*a.StartsAt)))
    62  	}
    63  	return fmt.Sprintf("%v[%v:%v]", a, opts.relativeTime(time.Time(*a.StartsAt)), opts.relativeTime(time.Time(*a.EndsAt)))
    64  }
    65  
    66  // expandTime returns the absolute time for the relative time
    67  // calculated from the test's base time.
    68  func (opts *AcceptanceOpts) expandTime(rel float64) time.Time {
    69  	return opts.baseTime.Add(time.Duration(rel * float64(time.Second)))
    70  }
    71  
    72  // expandTime returns the relative time for the given time
    73  // calculated from the test's base time.
    74  func (opts *AcceptanceOpts) relativeTime(act time.Time) float64 {
    75  	return float64(act.Sub(opts.baseTime)) / float64(time.Second)
    76  }
    77  
    78  // NewAcceptanceTest returns a new acceptance test with the base time
    79  // set to the current time.
    80  func NewAcceptanceTest(t *testing.T, opts *AcceptanceOpts) *AcceptanceTest {
    81  	test := &AcceptanceTest{
    82  		T:       t,
    83  		opts:    opts,
    84  		actions: map[float64][]func(){},
    85  	}
    86  	// TODO: Should this really be set during creation time? Why not do this
    87  	// during Run() time, maybe there is something else long happening between
    88  	// creation and running.
    89  	opts.baseTime = time.Now()
    90  
    91  	return test
    92  }
    93  
    94  // freeAddress returns a new listen address not currently in use.
    95  func freeAddress() string {
    96  	// Let the OS allocate a free address, close it and hope
    97  	// it is still free when starting Alertmanager.
    98  	l, err := net.Listen("tcp4", "localhost:0")
    99  	if err != nil {
   100  		panic(err)
   101  	}
   102  	defer func() {
   103  		if err := l.Close(); err != nil {
   104  			panic(err)
   105  		}
   106  	}()
   107  
   108  	return l.Addr().String()
   109  }
   110  
   111  // Do sets the given function to be executed at the given time.
   112  func (t *AcceptanceTest) Do(at float64, f func()) {
   113  	t.actions[at] = append(t.actions[at], f)
   114  }
   115  
   116  // AlertmanagerCluster returns a new AlertmanagerCluster that allows starting a
   117  // cluster of Alertmanager instances on random ports.
   118  func (t *AcceptanceTest) AlertmanagerCluster(conf string, size int) *AlertmanagerCluster {
   119  	amc := AlertmanagerCluster{}
   120  
   121  	for i := 0; i < size; i++ {
   122  		am := &Alertmanager{
   123  			t:    t,
   124  			opts: t.opts,
   125  		}
   126  
   127  		dir, err := os.MkdirTemp("", "am_test")
   128  		if err != nil {
   129  			t.Fatal(err)
   130  		}
   131  		am.dir = dir
   132  
   133  		cf, err := os.Create(filepath.Join(dir, "config.yml"))
   134  		if err != nil {
   135  			t.Fatal(err)
   136  		}
   137  		am.confFile = cf
   138  		am.UpdateConfig(conf)
   139  
   140  		am.apiAddr = freeAddress()
   141  		am.clusterAddr = freeAddress()
   142  
   143  		transport := httptransport.New(am.apiAddr, t.opts.RoutePrefix+"/api/v2/", nil)
   144  		am.clientV2 = apiclient.New(transport, strfmt.Default)
   145  
   146  		amc.ams = append(amc.ams, am)
   147  	}
   148  
   149  	t.amc = &amc
   150  
   151  	return &amc
   152  }
   153  
   154  // Collector returns a new collector bound to the test instance.
   155  func (t *AcceptanceTest) Collector(name string) *Collector {
   156  	co := &Collector{
   157  		t:         t.T,
   158  		name:      name,
   159  		opts:      t.opts,
   160  		collected: map[float64][]models.GettableAlerts{},
   161  		expected:  map[Interval][]models.GettableAlerts{},
   162  	}
   163  	t.collectors = append(t.collectors, co)
   164  
   165  	return co
   166  }
   167  
   168  // Run starts all Alertmanagers and runs queries against them. It then checks
   169  // whether all expected notifications have arrived at the expected receiver.
   170  func (t *AcceptanceTest) Run() {
   171  	errc := make(chan error)
   172  
   173  	for _, am := range t.amc.ams {
   174  		am.errc = errc
   175  		defer func(am *Alertmanager) {
   176  			am.Terminate()
   177  			am.cleanup()
   178  			t.Logf("stdout:\n%v", am.cmd.Stdout)
   179  			t.Logf("stderr:\n%v", am.cmd.Stderr)
   180  		}(am)
   181  	}
   182  
   183  	err := t.amc.Start()
   184  	if err != nil {
   185  		t.T.Fatal(err)
   186  	}
   187  
   188  	go t.runActions()
   189  
   190  	var latest float64
   191  	for _, coll := range t.collectors {
   192  		if l := coll.latest(); l > latest {
   193  			latest = l
   194  		}
   195  	}
   196  
   197  	deadline := t.opts.expandTime(latest)
   198  
   199  	select {
   200  	case <-time.After(time.Until(deadline)):
   201  		// continue
   202  	case err := <-errc:
   203  		t.Error(err)
   204  	}
   205  }
   206  
   207  // runActions performs the stored actions at the defined times.
   208  func (t *AcceptanceTest) runActions() {
   209  	var wg sync.WaitGroup
   210  
   211  	for at, fs := range t.actions {
   212  		ts := t.opts.expandTime(at)
   213  		wg.Add(len(fs))
   214  
   215  		for _, f := range fs {
   216  			go func(f func()) {
   217  				time.Sleep(time.Until(ts))
   218  				f()
   219  				wg.Done()
   220  			}(f)
   221  		}
   222  	}
   223  
   224  	wg.Wait()
   225  }
   226  
   227  type buffer struct {
   228  	b   bytes.Buffer
   229  	mtx sync.Mutex
   230  }
   231  
   232  func (b *buffer) Write(p []byte) (int, error) {
   233  	b.mtx.Lock()
   234  	defer b.mtx.Unlock()
   235  	return b.b.Write(p)
   236  }
   237  
   238  func (b *buffer) String() string {
   239  	b.mtx.Lock()
   240  	defer b.mtx.Unlock()
   241  	return b.b.String()
   242  }
   243  
   244  // Alertmanager encapsulates an Alertmanager process and allows
   245  // declaring alerts being pushed to it at fixed points in time.
   246  type Alertmanager struct {
   247  	t    *AcceptanceTest
   248  	opts *AcceptanceOpts
   249  
   250  	apiAddr     string
   251  	clusterAddr string
   252  	clientV2    *apiclient.AlertmanagerAPI
   253  	cmd         *exec.Cmd
   254  	confFile    *os.File
   255  	dir         string
   256  
   257  	errc chan<- error
   258  }
   259  
   260  // AlertmanagerCluster represents a group of Alertmanager instances
   261  // acting as a cluster.
   262  type AlertmanagerCluster struct {
   263  	ams []*Alertmanager
   264  }
   265  
   266  // Start the Alertmanager cluster and wait until it is ready to receive.
   267  func (amc *AlertmanagerCluster) Start() error {
   268  	var peerFlags []string
   269  	for _, am := range amc.ams {
   270  		peerFlags = append(peerFlags, "--cluster.peer="+am.clusterAddr)
   271  	}
   272  
   273  	for _, am := range amc.ams {
   274  		err := am.Start(peerFlags)
   275  		if err != nil {
   276  			return fmt.Errorf("failed to start alertmanager cluster: %v", err.Error())
   277  		}
   278  	}
   279  
   280  	for _, am := range amc.ams {
   281  		err := am.WaitForCluster(len(amc.ams))
   282  		if err != nil {
   283  			return fmt.Errorf("failed to wait for Alertmanager instance %q to join cluster: %v", am.clusterAddr, err.Error())
   284  		}
   285  	}
   286  
   287  	return nil
   288  }
   289  
   290  // Members returns the underlying slice of cluster members.
   291  func (amc *AlertmanagerCluster) Members() []*Alertmanager {
   292  	return amc.ams
   293  }
   294  
   295  // Start the alertmanager and wait until it is ready to receive.
   296  func (am *Alertmanager) Start(additionalArg []string) error {
   297  	am.t.Helper()
   298  	args := []string{
   299  		"--config.file", am.confFile.Name(),
   300  		"--log.level", "debug",
   301  		"--web.listen-address", am.apiAddr,
   302  		"--storage.path", am.dir,
   303  		"--cluster.listen-address", am.clusterAddr,
   304  		"--cluster.settle-timeout", "0s",
   305  	}
   306  	if am.opts.RoutePrefix != "" {
   307  		args = append(args, "--web.route-prefix", am.opts.RoutePrefix)
   308  	}
   309  	args = append(args, additionalArg...)
   310  
   311  	cmd := exec.Command("../../../alertmanager", args...)
   312  
   313  	if am.cmd == nil {
   314  		var outb, errb buffer
   315  		cmd.Stdout = &outb
   316  		cmd.Stderr = &errb
   317  	} else {
   318  		cmd.Stdout = am.cmd.Stdout
   319  		cmd.Stderr = am.cmd.Stderr
   320  	}
   321  	am.cmd = cmd
   322  
   323  	if err := am.cmd.Start(); err != nil {
   324  		return err
   325  	}
   326  
   327  	go func() {
   328  		if err := am.cmd.Wait(); err != nil {
   329  			am.errc <- err
   330  		}
   331  	}()
   332  
   333  	time.Sleep(50 * time.Millisecond)
   334  	var lastErr error
   335  	for i := 0; i < 10; i++ {
   336  		_, lastErr = am.clientV2.General.GetStatus(nil)
   337  		if lastErr == nil {
   338  			return nil
   339  		}
   340  		time.Sleep(500 * time.Millisecond)
   341  	}
   342  	return fmt.Errorf("unable to get a successful response from the Alertmanager: %v", lastErr)
   343  }
   344  
   345  // WaitForCluster waits for the Alertmanager instance to join a cluster with the
   346  // given size.
   347  func (am *Alertmanager) WaitForCluster(size int) error {
   348  	params := general.NewGetStatusParams()
   349  	params.WithContext(context.Background())
   350  	var status *general.GetStatusOK
   351  
   352  	// Poll for 2s
   353  	for i := 0; i < 20; i++ {
   354  		var err error
   355  		status, err = am.clientV2.General.GetStatus(params)
   356  		if err != nil {
   357  			return err
   358  		}
   359  
   360  		if len(status.Payload.Cluster.Peers) == size {
   361  			return nil
   362  		}
   363  		time.Sleep(100 * time.Millisecond)
   364  	}
   365  
   366  	return fmt.Errorf(
   367  		"expected %v peers, but got %v",
   368  		size,
   369  		len(status.Payload.Cluster.Peers),
   370  	)
   371  }
   372  
   373  // Terminate kills the underlying Alertmanager cluster processes and removes intermediate
   374  // data.
   375  func (amc *AlertmanagerCluster) Terminate() {
   376  	for _, am := range amc.ams {
   377  		am.Terminate()
   378  	}
   379  }
   380  
   381  // Terminate kills the underlying Alertmanager process and remove intermediate
   382  // data.
   383  func (am *Alertmanager) Terminate() {
   384  	am.t.Helper()
   385  	if err := syscall.Kill(am.cmd.Process.Pid, syscall.SIGTERM); err != nil {
   386  		am.t.Logf("Error sending SIGTERM to Alertmanager process: %v", err)
   387  	}
   388  }
   389  
   390  // Reload sends the reloading signal to the Alertmanager instances.
   391  func (amc *AlertmanagerCluster) Reload() {
   392  	for _, am := range amc.ams {
   393  		am.Reload()
   394  	}
   395  }
   396  
   397  // Reload sends the reloading signal to the Alertmanager process.
   398  func (am *Alertmanager) Reload() {
   399  	am.t.Helper()
   400  	if err := syscall.Kill(am.cmd.Process.Pid, syscall.SIGHUP); err != nil {
   401  		am.t.Fatalf("Error sending SIGHUP to Alertmanager process: %v", err)
   402  	}
   403  }
   404  
   405  func (am *Alertmanager) cleanup() {
   406  	am.t.Helper()
   407  	if err := os.RemoveAll(am.confFile.Name()); err != nil {
   408  		am.t.Errorf("Error removing test config file %q: %v", am.confFile.Name(), err)
   409  	}
   410  }
   411  
   412  // Push declares alerts that are to be pushed to the Alertmanager
   413  // servers at a relative point in time.
   414  func (amc *AlertmanagerCluster) Push(at float64, alerts ...*TestAlert) {
   415  	for _, am := range amc.ams {
   416  		am.Push(at, alerts...)
   417  	}
   418  }
   419  
   420  // Push declares alerts that are to be pushed to the Alertmanager
   421  // server at a relative point in time.
   422  func (am *Alertmanager) Push(at float64, alerts ...*TestAlert) {
   423  	var cas models.PostableAlerts
   424  	for i := range alerts {
   425  		a := alerts[i].nativeAlert(am.opts)
   426  		alert := &models.PostableAlert{
   427  			Alert: models.Alert{
   428  				Labels:       a.Labels,
   429  				GeneratorURL: a.GeneratorURL,
   430  			},
   431  			Annotations: a.Annotations,
   432  		}
   433  		if a.StartsAt != nil {
   434  			alert.StartsAt = *a.StartsAt
   435  		}
   436  		if a.EndsAt != nil {
   437  			alert.EndsAt = *a.EndsAt
   438  		}
   439  		cas = append(cas, alert)
   440  	}
   441  
   442  	am.t.Do(at, func() {
   443  		params := alert.PostAlertsParams{}
   444  		params.WithContext(context.Background()).WithAlerts(cas)
   445  
   446  		_, err := am.clientV2.Alert.PostAlerts(&params)
   447  		if err != nil {
   448  			am.t.Errorf("Error pushing %v: %v", cas, err)
   449  		}
   450  	})
   451  }
   452  
   453  // SetSilence updates or creates the given Silence.
   454  func (amc *AlertmanagerCluster) SetSilence(at float64, sil *TestSilence) {
   455  	for _, am := range amc.ams {
   456  		am.SetSilence(at, sil)
   457  	}
   458  }
   459  
   460  // SetSilence updates or creates the given Silence.
   461  func (am *Alertmanager) SetSilence(at float64, sil *TestSilence) {
   462  	am.t.Do(at, func() {
   463  		resp, err := am.clientV2.Silence.PostSilences(
   464  			silence.NewPostSilencesParams().WithSilence(
   465  				&models.PostableSilence{
   466  					Silence: *sil.nativeSilence(am.opts),
   467  				},
   468  			),
   469  		)
   470  		if err != nil {
   471  			am.t.Errorf("Error setting silence %v: %s", sil, err)
   472  			return
   473  		}
   474  		sil.SetID(resp.Payload.SilenceID)
   475  	})
   476  }
   477  
   478  // DelSilence deletes the silence with the sid at the given time.
   479  func (amc *AlertmanagerCluster) DelSilence(at float64, sil *TestSilence) {
   480  	for _, am := range amc.ams {
   481  		am.DelSilence(at, sil)
   482  	}
   483  }
   484  
   485  // DelSilence deletes the silence with the sid at the given time.
   486  func (am *Alertmanager) DelSilence(at float64, sil *TestSilence) {
   487  	am.t.Do(at, func() {
   488  		_, err := am.clientV2.Silence.DeleteSilence(
   489  			silence.NewDeleteSilenceParams().WithSilenceID(strfmt.UUID(sil.ID())),
   490  		)
   491  		if err != nil {
   492  			am.t.Errorf("Error deleting silence %v: %s", sil, err)
   493  		}
   494  	})
   495  }
   496  
   497  // UpdateConfig rewrites the configuration file for the Alertmanager cluster. It
   498  // does not initiate config reloading.
   499  func (amc *AlertmanagerCluster) UpdateConfig(conf string) {
   500  	for _, am := range amc.ams {
   501  		am.UpdateConfig(conf)
   502  	}
   503  }
   504  
   505  // UpdateConfig rewrites the configuration file for the Alertmanager. It does not
   506  // initiate config reloading.
   507  func (am *Alertmanager) UpdateConfig(conf string) {
   508  	if _, err := am.confFile.WriteString(conf); err != nil {
   509  		am.t.Fatal(err)
   510  	}
   511  	if err := am.confFile.Sync(); err != nil {
   512  		am.t.Fatal(err)
   513  	}
   514  }
   515  
   516  // Client returns a client to interact with the API v2 endpoint.
   517  func (am *Alertmanager) Client() *apiclient.AlertmanagerAPI {
   518  	return am.clientV2
   519  }
   520  

View as plain text