
Source file: src/github.com/prometheus/alertmanager/dispatch/dispatch_test.go

Documentation: github.com/prometheus/alertmanager/dispatch

// Copyright 2018 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package dispatch

import (
	"context"
	"fmt"
	"reflect"
	"sort"
	"sync"
	"testing"
	"time"

	"github.com/go-kit/log"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
	"github.com/prometheus/common/model"
	"github.com/stretchr/testify/require"

	"github.com/prometheus/alertmanager/config"
	"github.com/prometheus/alertmanager/notify"
	"github.com/prometheus/alertmanager/provider/mem"
	"github.com/prometheus/alertmanager/types"
)

func TestAggrGroup(t *testing.T) {
	lset := model.LabelSet{
		"a": "v1",
		"b": "v2",
	}
	opts := &RouteOpts{
		Receiver: "n1",
		GroupBy: map[model.LabelName]struct{}{
			"a": {},
			"b": {},
		},
		GroupWait:      1 * time.Second,
		GroupInterval:  300 * time.Millisecond,
		RepeatInterval: 1 * time.Hour,
	}
	route := &Route{
		RouteOpts: *opts,
	}

	var (
		a1 = &types.Alert{
			Alert: model.Alert{
				Labels: model.LabelSet{
					"a": "v1",
					"b": "v2",
					"c": "v3",
				},
				StartsAt: time.Now().Add(time.Minute),
				EndsAt:   time.Now().Add(time.Hour),
			},
			UpdatedAt: time.Now(),
		}
		a2 = &types.Alert{
			Alert: model.Alert{
				Labels: model.LabelSet{
					"a": "v1",
					"b": "v2",
					"c": "v4",
				},
				StartsAt: time.Now().Add(-time.Hour),
				EndsAt:   time.Now().Add(2 * time.Hour),
			},
			UpdatedAt: time.Now(),
		}
		a3 = &types.Alert{
			Alert: model.Alert{
				Labels: model.LabelSet{
					"a": "v1",
					"b": "v2",
					"c": "v5",
				},
				StartsAt: time.Now().Add(time.Minute),
				EndsAt:   time.Now().Add(5 * time.Minute),
			},
			UpdatedAt: time.Now(),
		}
	)

	var (
		last       = time.Now()
		current    = time.Now()
		lastCurMtx = &sync.Mutex{}
		alertsCh   = make(chan types.AlertSlice)
	)

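	// ntfy stands in for the notification pipeline: it checks that the
	// aggregation group populated the context, records when each flush
	// happened, and hands the batch back to the test over alertsCh.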
	ntfy := func(ctx context.Context, alerts ...*types.Alert) bool {
		// Validate that the context is properly populated.
		if _, ok := notify.Now(ctx); !ok {
			t.Errorf("now missing")
		}
		if _, ok := notify.GroupKey(ctx); !ok {
			t.Errorf("group key missing")
		}
		if lbls, ok := notify.GroupLabels(ctx); !ok || !reflect.DeepEqual(lbls, lset) {
			t.Errorf("wrong group labels: %q", lbls)
		}
		if rcv, ok := notify.ReceiverName(ctx); !ok || rcv != opts.Receiver {
			t.Errorf("wrong receiver: %q", rcv)
		}
		if ri, ok := notify.RepeatInterval(ctx); !ok || ri != opts.RepeatInterval {
			t.Errorf("wrong repeat interval: %q", ri)
		}

		lastCurMtx.Lock()
		last = current
		// Subtract a millisecond to allow for races.
		current = time.Now().Add(-time.Millisecond)
		lastCurMtx.Unlock()

		alertsCh <- types.AlertSlice(alerts)

		return true
	}

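	// removeEndsAt returns copies of the given alerts with EndsAt cleared,
	// mirroring how the aggregation group flushes alerts that have not yet
	// resolved.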
	removeEndsAt := func(as types.AlertSlice) types.AlertSlice {
		for i, a := range as {
			ac := *a
			ac.EndsAt = time.Time{}
			as[i] = &ac
		}
		return as
	}

	// Test the regular situation where we wait for group_wait to send out alerts.
	ag := newAggrGroup(context.Background(), lset, route, nil, log.NewNopLogger())
	go ag.run(ntfy)

	ag.insert(a1)

	select {
	case <-time.After(2 * opts.GroupWait):
		t.Fatalf("expected initial batch after group_wait")

	case batch := <-alertsCh:
		lastCurMtx.Lock()
		s := time.Since(last)
		lastCurMtx.Unlock()
		if s < opts.GroupWait {
			t.Fatalf("received batch too early after %v", s)
		}
		exp := removeEndsAt(types.AlertSlice{a1})
		sort.Sort(batch)

		if !reflect.DeepEqual(batch, exp) {
			t.Fatalf("expected alerts %v but got %v", exp, batch)
		}
	}

	for i := 0; i < 3; i++ {
		// A new alert should come in after the group interval.
		ag.insert(a3)

		select {
		case <-time.After(2 * opts.GroupInterval):
			t.Fatalf("expected new batch after group interval but received none")

		case batch := <-alertsCh:
			lastCurMtx.Lock()
			s := time.Since(last)
			lastCurMtx.Unlock()
			if s < opts.GroupInterval {
				t.Fatalf("received batch too early after %v", s)
			}
			exp := removeEndsAt(types.AlertSlice{a1, a3})
			sort.Sort(batch)

			if !reflect.DeepEqual(batch, exp) {
				t.Fatalf("expected alerts %v but got %v", exp, batch)
			}
		}
	}

	ag.stop()

	// Add an alert that started more than group_interval in the past. We expect
	// immediate flushing.
	// Finally, set all alerts to be resolved. After a successful notification the
	// aggregation group should empty itself.
	ag = newAggrGroup(context.Background(), lset, route, nil, log.NewNopLogger())
	go ag.run(ntfy)

	ag.insert(a1)
	ag.insert(a2)

	// a2 lies way in the past so the initial group_wait should be skipped.
	select {
	case <-time.After(opts.GroupWait / 2):
		t.Fatalf("expected immediate alert but received none")

	case batch := <-alertsCh:
		exp := removeEndsAt(types.AlertSlice{a1, a2})
		sort.Sort(batch)

		if !reflect.DeepEqual(batch, exp) {
			t.Fatalf("expected alerts %v but got %v", exp, batch)
		}
	}

	for i := 0; i < 3; i++ {
		// A new alert should come in after the group interval.
		ag.insert(a3)

		select {
		case <-time.After(2 * opts.GroupInterval):
			t.Fatalf("expected new batch after group interval but received none")

		case batch := <-alertsCh:
			lastCurMtx.Lock()
			s := time.Since(last)
			lastCurMtx.Unlock()
			if s < opts.GroupInterval {
				t.Fatalf("received batch too early after %v", s)
			}
			exp := removeEndsAt(types.AlertSlice{a1, a2, a3})
			sort.Sort(batch)

			if !reflect.DeepEqual(batch, exp) {
				t.Fatalf("expected alerts %v but got %v", exp, batch)
			}
		}
	}

	// Resolve all alerts; they should be removed after the next batch has been sent.
	a1r, a2r, a3r := *a1, *a2, *a3
	resolved := types.AlertSlice{&a1r, &a2r, &a3r}
	for _, a := range resolved {
		a.EndsAt = time.Now()
		ag.insert(a)
	}

	select {
	case <-time.After(2 * opts.GroupInterval):
		t.Fatalf("expected new batch after group interval but received none")

	case batch := <-alertsCh:
		lastCurMtx.Lock()
		s := time.Since(last)
		lastCurMtx.Unlock()
		if s < opts.GroupInterval {
			t.Fatalf("received batch too early after %v", s)
		}
		sort.Sort(batch)

		if !reflect.DeepEqual(batch, resolved) {
			t.Fatalf("expected alerts %v but got %v", resolved, batch)
		}

		if !ag.empty() {
			t.Fatalf("Expected aggregation group to be empty after resolving alerts: %v", ag)
		}
	}

	ag.stop()
}

func TestGroupLabels(t *testing.T) {
	a := &types.Alert{
		Alert: model.Alert{
			Labels: model.LabelSet{
				"a": "v1",
				"b": "v2",
				"c": "v3",
			},
		},
	}

	route := &Route{
		RouteOpts: RouteOpts{
			GroupBy: map[model.LabelName]struct{}{
				"a": {},
				"b": {},
			},
			GroupByAll: false,
		},
	}

	expLs := model.LabelSet{
		"a": "v1",
		"b": "v2",
	}

	ls := getGroupLabels(a, route)

	if !reflect.DeepEqual(ls, expLs) {
		t.Fatalf("expected labels are %v, but got %v", expLs, ls)
	}
}

func TestGroupByAllLabels(t *testing.T) {
	a := &types.Alert{
		Alert: model.Alert{
			Labels: model.LabelSet{
				"a": "v1",
				"b": "v2",
				"c": "v3",
			},
		},
	}

	route := &Route{
		RouteOpts: RouteOpts{
			GroupBy:    map[model.LabelName]struct{}{},
			GroupByAll: true,
		},
	}

	expLs := model.LabelSet{
		"a": "v1",
		"b": "v2",
		"c": "v3",
	}

	ls := getGroupLabels(a, route)

	if !reflect.DeepEqual(ls, expLs) {
		t.Fatalf("expected labels are %v, but got %v", expLs, ls)
	}
}

func TestGroups(t *testing.T) {
	confData := `receivers:
- name: 'kafka'
- name: 'prod'
- name: 'testing'

route:
  group_by: ['alertname']
  group_wait: 10ms
  group_interval: 10ms
  receiver: 'prod'
  routes:
  - match:
      env: 'testing'
    receiver: 'testing'
    group_by: ['alertname', 'service']
  - match:
      env: 'prod'
    receiver: 'prod'
    group_by: ['alertname', 'service', 'cluster']
    continue: true
  - match:
      kafka: 'yes'
    receiver: 'kafka'
    group_by: ['alertname', 'service', 'cluster']`
	conf, err := config.Load(confData)
	if err != nil {
		t.Fatal(err)
	}

	logger := log.NewNopLogger()
	route := NewRoute(conf.Route, nil)
	marker := types.NewMarker(prometheus.NewRegistry())
	alerts, err := mem.NewAlerts(context.Background(), marker, time.Hour, nil, logger, nil)
	if err != nil {
		t.Fatal(err)
	}
	defer alerts.Close()

	timeout := func(d time.Duration) time.Duration { return time.Duration(0) }
	recorder := &recordStage{alerts: make(map[string]map[model.Fingerprint]*types.Alert)}
	dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, nil, logger, NewDispatcherMetrics(false, prometheus.NewRegistry()))
	go dispatcher.Run()
	defer dispatcher.Stop()

	// Create alerts. The dispatcher will automatically create the groups.
	inputAlerts := []*types.Alert{
		// Matches the parent route.
		newAlert(model.LabelSet{"alertname": "OtherAlert", "cluster": "cc", "service": "dd"}),
		// Matches the first sub-route.
		newAlert(model.LabelSet{"env": "testing", "alertname": "TestingAlert", "service": "api", "instance": "inst1"}),
		// Matches the second sub-route.
		newAlert(model.LabelSet{"env": "prod", "alertname": "HighErrorRate", "cluster": "aa", "service": "api", "instance": "inst1"}),
		newAlert(model.LabelSet{"env": "prod", "alertname": "HighErrorRate", "cluster": "aa", "service": "api", "instance": "inst2"}),
		// Matches the second sub-route.
		newAlert(model.LabelSet{"env": "prod", "alertname": "HighErrorRate", "cluster": "bb", "service": "api", "instance": "inst1"}),
		// Matches the second and third sub-route.
		newAlert(model.LabelSet{"env": "prod", "alertname": "HighLatency", "cluster": "bb", "service": "db", "kafka": "yes", "instance": "inst3"}),
	}
	alerts.Put(inputAlerts...)

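	// The sixth alert matches both the second and the third sub-route, so the
	// six input alerts show up as seven recorded alerts.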
	// Let alerts get processed.
	for i := 0; len(recorder.Alerts()) != 7 && i < 10; i++ {
		time.Sleep(200 * time.Millisecond)
	}
	require.Equal(t, 7, len(recorder.Alerts()))

	alertGroups, receivers := dispatcher.Groups(
		func(*Route) bool {
			return true
		}, func(*types.Alert, time.Time) bool {
			return true
		},
	)

	require.Equal(t, AlertGroups{
		&AlertGroup{
			Alerts: []*types.Alert{inputAlerts[0]},
			Labels: model.LabelSet{
				"alertname": "OtherAlert",
			},
			Receiver: "prod",
		},
		&AlertGroup{
			Alerts: []*types.Alert{inputAlerts[1]},
			Labels: model.LabelSet{
				"alertname": "TestingAlert",
				"service":   "api",
			},
			Receiver: "testing",
		},
		&AlertGroup{
			Alerts: []*types.Alert{inputAlerts[2], inputAlerts[3]},
			Labels: model.LabelSet{
				"alertname": "HighErrorRate",
				"service":   "api",
				"cluster":   "aa",
			},
			Receiver: "prod",
		},
		&AlertGroup{
			Alerts: []*types.Alert{inputAlerts[4]},
			Labels: model.LabelSet{
				"alertname": "HighErrorRate",
				"service":   "api",
				"cluster":   "bb",
			},
			Receiver: "prod",
		},
		&AlertGroup{
			Alerts: []*types.Alert{inputAlerts[5]},
			Labels: model.LabelSet{
				"alertname": "HighLatency",
				"service":   "db",
				"cluster":   "bb",
			},
			Receiver: "kafka",
		},
		&AlertGroup{
			Alerts: []*types.Alert{inputAlerts[5]},
			Labels: model.LabelSet{
				"alertname": "HighLatency",
				"service":   "db",
				"cluster":   "bb",
			},
			Receiver: "prod",
		},
	}, alertGroups)
	require.Equal(t, map[model.Fingerprint][]string{
		inputAlerts[0].Fingerprint(): {"prod"},
		inputAlerts[1].Fingerprint(): {"testing"},
		inputAlerts[2].Fingerprint(): {"prod"},
		inputAlerts[3].Fingerprint(): {"prod"},
		inputAlerts[4].Fingerprint(): {"prod"},
		inputAlerts[5].Fingerprint(): {"kafka", "prod"},
	}, receivers)
}

func TestGroupsWithLimits(t *testing.T) {
	confData := `receivers:
- name: 'kafka'
- name: 'prod'
- name: 'testing'

route:
  group_by: ['alertname']
  group_wait: 10ms
  group_interval: 10ms
  receiver: 'prod'
  routes:
  - match:
      env: 'testing'
    receiver: 'testing'
    group_by: ['alertname', 'service']
  - match:
      env: 'prod'
    receiver: 'prod'
    group_by: ['alertname', 'service', 'cluster']
    continue: true
  - match:
      kafka: 'yes'
    receiver: 'kafka'
    group_by: ['alertname', 'service', 'cluster']`
	conf, err := config.Load(confData)
	if err != nil {
		t.Fatal(err)
	}

	logger := log.NewNopLogger()
	route := NewRoute(conf.Route, nil)
	marker := types.NewMarker(prometheus.NewRegistry())
	alerts, err := mem.NewAlerts(context.Background(), marker, time.Hour, nil, logger, nil)
	if err != nil {
		t.Fatal(err)
	}
	defer alerts.Close()

	timeout := func(d time.Duration) time.Duration { return time.Duration(0) }
	recorder := &recordStage{alerts: make(map[string]map[model.Fingerprint]*types.Alert)}
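	// The group limit below matches the number of groups created by the input
	// alerts, so a later alert with a new group key should be rejected and
	// counted by the aggrGroupLimitReached metric.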
	lim := limits{groups: 6}
	m := NewDispatcherMetrics(true, prometheus.NewRegistry())
	dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, lim, logger, m)
	go dispatcher.Run()
	defer dispatcher.Stop()

	// Create alerts. The dispatcher will automatically create the groups.
	inputAlerts := []*types.Alert{
		// Matches the parent route.
		newAlert(model.LabelSet{"alertname": "OtherAlert", "cluster": "cc", "service": "dd"}),
		// Matches the first sub-route.
		newAlert(model.LabelSet{"env": "testing", "alertname": "TestingAlert", "service": "api", "instance": "inst1"}),
		// Matches the second sub-route.
		newAlert(model.LabelSet{"env": "prod", "alertname": "HighErrorRate", "cluster": "aa", "service": "api", "instance": "inst1"}),
		newAlert(model.LabelSet{"env": "prod", "alertname": "HighErrorRate", "cluster": "aa", "service": "api", "instance": "inst2"}),
		// Matches the second sub-route.
		newAlert(model.LabelSet{"env": "prod", "alertname": "HighErrorRate", "cluster": "bb", "service": "api", "instance": "inst1"}),
		// Matches the second and third sub-route.
		newAlert(model.LabelSet{"env": "prod", "alertname": "HighLatency", "cluster": "bb", "service": "db", "kafka": "yes", "instance": "inst3"}),
	}
	err = alerts.Put(inputAlerts...)
	if err != nil {
		t.Fatal(err)
	}

	// Let alerts get processed.
	for i := 0; len(recorder.Alerts()) != 7 && i < 10; i++ {
		time.Sleep(200 * time.Millisecond)
	}
	require.Equal(t, 7, len(recorder.Alerts()))

	routeFilter := func(*Route) bool { return true }
	alertFilter := func(*types.Alert, time.Time) bool { return true }

	alertGroups, _ := dispatcher.Groups(routeFilter, alertFilter)
	require.Len(t, alertGroups, 6)

	require.Equal(t, 0.0, testutil.ToFloat64(m.aggrGroupLimitReached))

	// Try to store a new alert. This time, we will hit the limit on the number of groups.
	err = alerts.Put(newAlert(model.LabelSet{"env": "prod", "alertname": "NewAlert", "cluster": "new-cluster", "service": "db"}))
	if err != nil {
		t.Fatal(err)
	}

	// Let the alert get processed.
	for i := 0; testutil.ToFloat64(m.aggrGroupLimitReached) == 0 && i < 10; i++ {
		time.Sleep(200 * time.Millisecond)
	}
	require.Equal(t, 1.0, testutil.ToFloat64(m.aggrGroupLimitReached))

	// Verify there are still only 6 groups.
	alertGroups, _ = dispatcher.Groups(routeFilter, alertFilter)
	require.Len(t, alertGroups, 6)
}

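// recordStage is a notify.Stage implementation that records every alert it
// receives, keyed by group key and fingerprint.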
type recordStage struct {
	mtx    sync.RWMutex
	alerts map[string]map[model.Fingerprint]*types.Alert
}

func (r *recordStage) Alerts() []*types.Alert {
	r.mtx.RLock()
	defer r.mtx.RUnlock()
	alerts := make([]*types.Alert, 0)
	for k := range r.alerts {
		for _, a := range r.alerts[k] {
			alerts = append(alerts, a)
		}
	}
	return alerts
}

func (r *recordStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
	r.mtx.Lock()
	defer r.mtx.Unlock()
	gk, ok := notify.GroupKey(ctx)
	if !ok {
		panic("GroupKey not present!")
	}
	if _, ok := r.alerts[gk]; !ok {
		r.alerts[gk] = make(map[model.Fingerprint]*types.Alert)
	}
	for _, a := range alerts {
		r.alerts[gk][a.Fingerprint()] = a
	}
	return ctx, nil, nil
}

var (
	// Set the start time in the past to trigger a flush immediately.
	t0 = time.Now().Add(-time.Minute)
	// Set the end time in the future to avoid deleting the alert.
	t1 = t0.Add(2 * time.Minute)
)

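// newAlert returns a firing alert with the given labels, using the t0/t1
// timestamps above so the dispatcher flushes it immediately.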
func newAlert(labels model.LabelSet) *types.Alert {
	return &types.Alert{
		Alert: model.Alert{
			Labels:       labels,
			Annotations:  model.LabelSet{"foo": "bar"},
			StartsAt:     t0,
			EndsAt:       t1,
			GeneratorURL: "http://example.com/prometheus",
		},
		UpdatedAt: t0,
		Timeout:   false,
	}
}

func TestDispatcherRace(t *testing.T) {
	logger := log.NewNopLogger()
	marker := types.NewMarker(prometheus.NewRegistry())
	alerts, err := mem.NewAlerts(context.Background(), marker, time.Hour, nil, logger, nil)
	if err != nil {
		t.Fatal(err)
	}
	defer alerts.Close()

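	// Start the dispatcher and stop it right away; the test passes as long as
	// this does not panic or trip the race detector.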
	timeout := func(d time.Duration) time.Duration { return time.Duration(0) }
	dispatcher := NewDispatcher(alerts, nil, nil, marker, timeout, nil, logger, NewDispatcherMetrics(false, prometheus.NewRegistry()))
	go dispatcher.Run()
	dispatcher.Stop()
}

func TestDispatcherRaceOnFirstAlertNotDeliveredWhenGroupWaitIsZero(t *testing.T) {
	const numAlerts = 5000

	logger := log.NewNopLogger()
	marker := types.NewMarker(prometheus.NewRegistry())
	alerts, err := mem.NewAlerts(context.Background(), marker, time.Hour, nil, logger, nil)
	if err != nil {
		t.Fatal(err)
	}
	defer alerts.Close()

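	// With group_wait set to zero every new group should flush immediately; the
	// long group and repeat intervals ensure no further flushes happen during
	// the test.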
	route := &Route{
		RouteOpts: RouteOpts{
			Receiver:       "default",
			GroupBy:        map[model.LabelName]struct{}{"alertname": {}},
			GroupWait:      0,
			GroupInterval:  1 * time.Hour, // Should never hit in this test.
			RepeatInterval: 1 * time.Hour, // Should never hit in this test.
		},
	}

	timeout := func(d time.Duration) time.Duration { return d }
	recorder := &recordStage{alerts: make(map[string]map[model.Fingerprint]*types.Alert)}
	dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, nil, logger, NewDispatcherMetrics(false, prometheus.NewRegistry()))
	go dispatcher.Run()
	defer dispatcher.Stop()

	// Push all alerts.
	for i := 0; i < numAlerts; i++ {
		alert := newAlert(model.LabelSet{"alertname": model.LabelValue(fmt.Sprintf("Alert_%d", i))})
		require.NoError(t, alerts.Put(alert))
	}

	// Wait until the alerts have been notified or the waiting timeout expires.
	for deadline := time.Now().Add(5 * time.Second); time.Now().Before(deadline); {
		if len(recorder.Alerts()) >= numAlerts {
			break
		}

		// Throttle.
		time.Sleep(10 * time.Millisecond)
	}

	// We expect all alerts to be notified immediately, since they all belong to different groups.
	require.Equal(t, numAlerts, len(recorder.Alerts()))
}

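// limits implements the Limits interface accepted by NewDispatcher for these
// tests; only the maximum number of aggregation groups is configurable.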
type limits struct {
	groups int
}

func (l limits) MaxNumberOfAggregationGroups() int {
	return l.groups
}
