...

Source file src/github.com/prometheus/alertmanager/dispatch/dispatch.go

Documentation: github.com/prometheus/alertmanager/dispatch

     1  // Copyright 2018 Prometheus Team
     2  // Licensed under the Apache License, Version 2.0 (the "License");
     3  // you may not use this file except in compliance with the License.
     4  // You may obtain a copy of the License at
     5  //
     6  // http://www.apache.org/licenses/LICENSE-2.0
     7  //
     8  // Unless required by applicable law or agreed to in writing, software
     9  // distributed under the License is distributed on an "AS IS" BASIS,
    10  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    11  // See the License for the specific language governing permissions and
    12  // limitations under the License.
    13  
    14  package dispatch
    15  
    16  import (
    17  	"context"
    18  	"fmt"
    19  	"sort"
    20  	"sync"
    21  	"time"
    22  
    23  	"github.com/go-kit/log"
    24  	"github.com/go-kit/log/level"
    25  	"github.com/prometheus/client_golang/prometheus"
    26  	"github.com/prometheus/common/model"
    27  
    28  	"github.com/prometheus/alertmanager/notify"
    29  	"github.com/prometheus/alertmanager/provider"
    30  	"github.com/prometheus/alertmanager/store"
    31  	"github.com/prometheus/alertmanager/types"
    32  )
    33  
// DispatcherMetrics represents metrics associated to a dispatcher.
type DispatcherMetrics struct {
	// aggrGroups reports the number of active aggregation groups.
	aggrGroups            prometheus.Gauge
	// processingDuration summarizes the latency of processing alerts.
	processingDuration    prometheus.Summary
	// aggrGroupLimitReached counts the times a new aggregation group could
	// not be created because the configured limit was hit.
	aggrGroupLimitReached prometheus.Counter
}
    40  
    41  // NewDispatcherMetrics returns a new registered DispatchMetrics.
    42  func NewDispatcherMetrics(registerLimitMetrics bool, r prometheus.Registerer) *DispatcherMetrics {
    43  	m := DispatcherMetrics{
    44  		aggrGroups: prometheus.NewGauge(
    45  			prometheus.GaugeOpts{
    46  				Name: "alertmanager_dispatcher_aggregation_groups",
    47  				Help: "Number of active aggregation groups",
    48  			},
    49  		),
    50  		processingDuration: prometheus.NewSummary(
    51  			prometheus.SummaryOpts{
    52  				Name: "alertmanager_dispatcher_alert_processing_duration_seconds",
    53  				Help: "Summary of latencies for the processing of alerts.",
    54  			},
    55  		),
    56  		aggrGroupLimitReached: prometheus.NewCounter(
    57  			prometheus.CounterOpts{
    58  				Name: "alertmanager_dispatcher_aggregation_group_limit_reached_total",
    59  				Help: "Number of times when dispatcher failed to create new aggregation group due to limit.",
    60  			},
    61  		),
    62  	}
    63  
    64  	if r != nil {
    65  		r.MustRegister(m.aggrGroups, m.processingDuration)
    66  		if registerLimitMetrics {
    67  			r.MustRegister(m.aggrGroupLimitReached)
    68  		}
    69  	}
    70  
    71  	return &m
    72  }
    73  
// Dispatcher sorts incoming alerts into aggregation groups and
// assigns the correct notifiers to each.
type Dispatcher struct {
	// route is the routing tree incoming alerts are matched against.
	route   *Route
	// alerts is the provider the dispatcher subscribes to in Run.
	alerts  provider.Alerts
	// stage is executed to send notifications for a group's alerts.
	stage   notify.Stage
	// metrics holds the dispatcher's instrumentation.
	metrics *DispatcherMetrics
	// limits caps the number of aggregation groups that may exist.
	limits  Limits

	// timeout is handed to every aggregation group, which uses it to derive
	// the notification timeout from the group interval.
	timeout func(time.Duration) time.Duration

	// mtx guards aggrGroupsPerRoute, aggrGroupsNum, ctx and cancel.
	mtx                sync.RWMutex
	// aggrGroupsPerRoute maps a route to its aggregation groups, keyed by the
	// fingerprint of the group labels.
	aggrGroupsPerRoute map[*Route]map[model.Fingerprint]*aggrGroup
	// aggrGroupsNum is the total number of aggregation groups across routes.
	aggrGroupsNum      int

	// done is closed once Run has returned.
	done   chan struct{}
	// ctx is created by Run; it is the parent context of all aggregation groups.
	ctx    context.Context
	// cancel cancels ctx. It is nil before Run and after Stop.
	cancel func()

	logger log.Logger
}
    95  
// Limits describes limits used by Dispatcher.
type Limits interface {
	// MaxNumberOfAggregationGroups returns the maximum number of aggregation
	// groups that the dispatcher can have. A value of 0 or less means unlimited.
	// If the dispatcher hits this limit, it will not create additional groups,
	// but will log an error instead.
	MaxNumberOfAggregationGroups() int
}
   103  
   104  // NewDispatcher returns a new Dispatcher.
   105  func NewDispatcher(
   106  	ap provider.Alerts,
   107  	r *Route,
   108  	s notify.Stage,
   109  	mk types.Marker,
   110  	to func(time.Duration) time.Duration,
   111  	lim Limits,
   112  	l log.Logger,
   113  	m *DispatcherMetrics,
   114  ) *Dispatcher {
   115  	if lim == nil {
   116  		lim = nilLimits{}
   117  	}
   118  
   119  	disp := &Dispatcher{
   120  		alerts:  ap,
   121  		stage:   s,
   122  		route:   r,
   123  		timeout: to,
   124  		logger:  log.With(l, "component", "dispatcher"),
   125  		metrics: m,
   126  		limits:  lim,
   127  	}
   128  	return disp
   129  }
   130  
   131  // Run starts dispatching alerts incoming via the updates channel.
   132  func (d *Dispatcher) Run() {
   133  	d.done = make(chan struct{})
   134  
   135  	d.mtx.Lock()
   136  	d.aggrGroupsPerRoute = map[*Route]map[model.Fingerprint]*aggrGroup{}
   137  	d.aggrGroupsNum = 0
   138  	d.metrics.aggrGroups.Set(0)
   139  	d.ctx, d.cancel = context.WithCancel(context.Background())
   140  	d.mtx.Unlock()
   141  
   142  	d.run(d.alerts.Subscribe())
   143  	close(d.done)
   144  }
   145  
// run is the dispatcher's main loop. It consumes alerts from it, routes each
// one into the aggregation groups of every matching route, and every 30
// seconds stops and removes aggregation groups that have become empty. It
// returns when the iterator's channel is closed or when d.ctx is canceled;
// the iterator is closed on return.
func (d *Dispatcher) run(it provider.AlertIterator) {
	cleanup := time.NewTicker(30 * time.Second)
	defer cleanup.Stop()

	defer it.Close()

	for {
		select {
		case alert, ok := <-it.Next():
			if !ok {
				// Iterator exhausted for some reason.
				if err := it.Err(); err != nil {
					level.Error(d.logger).Log("msg", "Error on alert update", "err", err)
				}
				return
			}

			level.Debug(d.logger).Log("msg", "Received alert", "alert", alert)

			// Log errors but keep trying.
			if err := it.Err(); err != nil {
				level.Error(d.logger).Log("msg", "Error on alert update", "err", err)
				continue
			}

			// Time the routing and insertion of this alert across all
			// matching routes.
			now := time.Now()
			for _, r := range d.route.Match(alert.Labels) {
				d.processAlert(alert, r)
			}
			d.metrics.processingDuration.Observe(time.Since(now).Seconds())

		case <-cleanup.C:
			// Periodic garbage collection of empty aggregation groups.
			d.mtx.Lock()

			for _, groups := range d.aggrGroupsPerRoute {
				for _, ag := range groups {
					if ag.empty() {
						// stop blocks until the group's run loop has exited.
						ag.stop()
						delete(groups, ag.fingerprint())
						d.aggrGroupsNum--
						d.metrics.aggrGroups.Dec()
					}
				}
			}

			d.mtx.Unlock()

		case <-d.ctx.Done():
			return
		}
	}
}
   198  
// AlertGroup represents how alerts exist within an aggrGroup.
type AlertGroup struct {
	// Alerts are the alerts of the group.
	Alerts   types.AlertSlice
	// Labels are the labels shared by the group (the aggregation group's labels).
	Labels   model.LabelSet
	// Receiver is the name of the receiver of the route the group belongs to.
	Receiver string
}
   205  
   206  type AlertGroups []*AlertGroup
   207  
   208  func (ag AlertGroups) Swap(i, j int) { ag[i], ag[j] = ag[j], ag[i] }
   209  func (ag AlertGroups) Less(i, j int) bool {
   210  	if ag[i].Labels.Equal(ag[j].Labels) {
   211  		return ag[i].Receiver < ag[j].Receiver
   212  	}
   213  	return ag[i].Labels.Before(ag[j].Labels)
   214  }
   215  func (ag AlertGroups) Len() int { return len(ag) }
   216  
   217  // Groups returns a slice of AlertGroups from the dispatcher's internal state.
   218  func (d *Dispatcher) Groups(routeFilter func(*Route) bool, alertFilter func(*types.Alert, time.Time) bool) (AlertGroups, map[model.Fingerprint][]string) {
   219  	groups := AlertGroups{}
   220  
   221  	d.mtx.RLock()
   222  	defer d.mtx.RUnlock()
   223  
   224  	// Keep a list of receivers for an alert to prevent checking each alert
   225  	// again against all routes. The alert has already matched against this
   226  	// route on ingestion.
   227  	receivers := map[model.Fingerprint][]string{}
   228  
   229  	now := time.Now()
   230  	for route, ags := range d.aggrGroupsPerRoute {
   231  		if !routeFilter(route) {
   232  			continue
   233  		}
   234  
   235  		for _, ag := range ags {
   236  			receiver := route.RouteOpts.Receiver
   237  			alertGroup := &AlertGroup{
   238  				Labels:   ag.labels,
   239  				Receiver: receiver,
   240  			}
   241  
   242  			alerts := ag.alerts.List()
   243  			filteredAlerts := make([]*types.Alert, 0, len(alerts))
   244  			for _, a := range alerts {
   245  				if !alertFilter(a, now) {
   246  					continue
   247  				}
   248  
   249  				fp := a.Fingerprint()
   250  				if r, ok := receivers[fp]; ok {
   251  					// Receivers slice already exists. Add
   252  					// the current receiver to the slice.
   253  					receivers[fp] = append(r, receiver)
   254  				} else {
   255  					// First time we've seen this alert fingerprint.
   256  					// Initialize a new receivers slice.
   257  					receivers[fp] = []string{receiver}
   258  				}
   259  
   260  				filteredAlerts = append(filteredAlerts, a)
   261  			}
   262  			if len(filteredAlerts) == 0 {
   263  				continue
   264  			}
   265  			alertGroup.Alerts = filteredAlerts
   266  
   267  			groups = append(groups, alertGroup)
   268  		}
   269  	}
   270  	sort.Sort(groups)
   271  	for i := range groups {
   272  		sort.Sort(groups[i].Alerts)
   273  	}
   274  	for i := range receivers {
   275  		sort.Strings(receivers[i])
   276  	}
   277  
   278  	return groups, receivers
   279  }
   280  
   281  // Stop the dispatcher.
   282  func (d *Dispatcher) Stop() {
   283  	if d == nil {
   284  		return
   285  	}
   286  	d.mtx.Lock()
   287  	if d.cancel == nil {
   288  		d.mtx.Unlock()
   289  		return
   290  	}
   291  	d.cancel()
   292  	d.cancel = nil
   293  	d.mtx.Unlock()
   294  
   295  	<-d.done
   296  }
   297  
// notifyFunc is a function that performs notification for the given
// alerts. It receives the context of the flush it belongs to.
// Returns false iff notifying failed.
type notifyFunc func(context.Context, ...*types.Alert) bool
   302  
   303  // processAlert determines in which aggregation group the alert falls
   304  // and inserts it.
   305  func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) {
   306  	groupLabels := getGroupLabels(alert, route)
   307  
   308  	fp := groupLabels.Fingerprint()
   309  
   310  	d.mtx.Lock()
   311  	defer d.mtx.Unlock()
   312  
   313  	routeGroups, ok := d.aggrGroupsPerRoute[route]
   314  	if !ok {
   315  		routeGroups = map[model.Fingerprint]*aggrGroup{}
   316  		d.aggrGroupsPerRoute[route] = routeGroups
   317  	}
   318  
   319  	ag, ok := routeGroups[fp]
   320  	if ok {
   321  		ag.insert(alert)
   322  		return
   323  	}
   324  
   325  	// If the group does not exist, create it. But check the limit first.
   326  	if limit := d.limits.MaxNumberOfAggregationGroups(); limit > 0 && d.aggrGroupsNum >= limit {
   327  		d.metrics.aggrGroupLimitReached.Inc()
   328  		level.Error(d.logger).Log("msg", "Too many aggregation groups, cannot create new group for alert", "groups", d.aggrGroupsNum, "limit", limit, "alert", alert.Name())
   329  		return
   330  	}
   331  
   332  	ag = newAggrGroup(d.ctx, groupLabels, route, d.timeout, d.logger)
   333  	routeGroups[fp] = ag
   334  	d.aggrGroupsNum++
   335  	d.metrics.aggrGroups.Inc()
   336  
   337  	// Insert the 1st alert in the group before starting the group's run()
   338  	// function, to make sure that when the run() will be executed the 1st
   339  	// alert is already there.
   340  	ag.insert(alert)
   341  
   342  	go ag.run(func(ctx context.Context, alerts ...*types.Alert) bool {
   343  		_, _, err := d.stage.Exec(ctx, d.logger, alerts...)
   344  		if err != nil {
   345  			lvl := level.Error(d.logger)
   346  			if ctx.Err() == context.Canceled {
   347  				// It is expected for the context to be canceled on
   348  				// configuration reload or shutdown. In this case, the
   349  				// message should only be logged at the debug level.
   350  				lvl = level.Debug(d.logger)
   351  			}
   352  			lvl.Log("msg", "Notify for alerts failed", "num_alerts", len(alerts), "err", err)
   353  		}
   354  		return err == nil
   355  	})
   356  }
   357  
   358  func getGroupLabels(alert *types.Alert, route *Route) model.LabelSet {
   359  	groupLabels := model.LabelSet{}
   360  	for ln, lv := range alert.Labels {
   361  		if _, ok := route.RouteOpts.GroupBy[ln]; ok || route.RouteOpts.GroupByAll {
   362  			groupLabels[ln] = lv
   363  		}
   364  	}
   365  
   366  	return groupLabels
   367  }
   368  
// aggrGroup aggregates alert fingerprints into groups to which a
// common set of routing options applies.
// It emits notifications in the specified intervals.
type aggrGroup struct {
	// labels are the group labels shared by all alerts in the group.
	labels   model.LabelSet
	// opts are the routing options of the route the group belongs to.
	opts     *RouteOpts
	logger   log.Logger
	// routeKey is the key of the owning route; it is part of GroupKey.
	routeKey string

	// alerts stores the alerts currently held by the group.
	alerts  *store.Alerts
	// ctx is canceled by cancel; it ends the run loop and is the parent of
	// every flush context.
	ctx     context.Context
	cancel  func()
	// done is closed when run returns.
	done    chan struct{}
	// next fires when the next flush is due.
	next    *time.Timer
	// timeout maps the group interval to the timeout of a single flush.
	timeout func(time.Duration) time.Duration

	// mtx guards hasFlushed and resets of next.
	mtx        sync.RWMutex
	// hasFlushed is set once the first flush has been triggered.
	hasFlushed bool
}
   388  
   389  // newAggrGroup returns a new aggregation group.
   390  func newAggrGroup(ctx context.Context, labels model.LabelSet, r *Route, to func(time.Duration) time.Duration, logger log.Logger) *aggrGroup {
   391  	if to == nil {
   392  		to = func(d time.Duration) time.Duration { return d }
   393  	}
   394  	ag := &aggrGroup{
   395  		labels:   labels,
   396  		routeKey: r.Key(),
   397  		opts:     &r.RouteOpts,
   398  		timeout:  to,
   399  		alerts:   store.NewAlerts(),
   400  		done:     make(chan struct{}),
   401  	}
   402  	ag.ctx, ag.cancel = context.WithCancel(ctx)
   403  
   404  	ag.logger = log.With(logger, "aggrGroup", ag)
   405  
   406  	// Set an initial one-time wait before flushing
   407  	// the first batch of notifications.
   408  	ag.next = time.NewTimer(ag.opts.GroupWait)
   409  
   410  	return ag
   411  }
   412  
// fingerprint returns the fingerprint of the group's labels.
func (ag *aggrGroup) fingerprint() model.Fingerprint {
	return ag.labels.Fingerprint()
}
   416  
   417  func (ag *aggrGroup) GroupKey() string {
   418  	return fmt.Sprintf("%s:%s", ag.routeKey, ag.labels)
   419  }
   420  
// String returns the group key, so the group can be used directly as a
// value in log output.
func (ag *aggrGroup) String() string {
	return ag.GroupKey()
}
   424  
// run is the group's flush loop. Every time the group's timer fires it
// builds a notification context, re-arms the timer with the group interval
// and flushes the group's alerts through nf. It returns when the group's
// context is canceled, closing ag.done and stopping the timer on the way out.
func (ag *aggrGroup) run(nf notifyFunc) {
	defer close(ag.done)
	defer ag.next.Stop()

	for {
		select {
		case now := <-ag.next.C:
			// Give the notifications time until the next flush to
			// finish before terminating them.
			ctx, cancel := context.WithTimeout(ag.ctx, ag.timeout(ag.opts.GroupInterval))

			// The now time we retrieve from the timer is the only reliable
			// point of time reference for the subsequent notification pipeline.
			// Calculating the current time directly is prone to flaky behavior,
			// which usually only becomes apparent in tests.
			ctx = notify.WithNow(ctx, now)

			// Populate context with information needed along the pipeline.
			ctx = notify.WithGroupKey(ctx, ag.GroupKey())
			ctx = notify.WithGroupLabels(ctx, ag.labels)
			ctx = notify.WithReceiverName(ctx, ag.opts.Receiver)
			ctx = notify.WithRepeatInterval(ctx, ag.opts.RepeatInterval)
			ctx = notify.WithMuteTimeIntervals(ctx, ag.opts.MuteTimeIntervals)
			ctx = notify.WithActiveTimeIntervals(ctx, ag.opts.ActiveTimeIntervals)

			// Wait the configured interval before calling flush again.
			// hasFlushed is set under the same lock insert() takes, so
			// later inserts no longer force an immediate flush.
			ag.mtx.Lock()
			ag.next.Reset(ag.opts.GroupInterval)
			ag.hasFlushed = true
			ag.mtx.Unlock()

			ag.flush(func(alerts ...*types.Alert) bool {
				return nf(ctx, alerts...)
			})

			// Release the resources of this flush's context.
			cancel()

		case <-ag.ctx.Done():
			return
		}
	}
}
   467  
// stop cancels the group's context and blocks until run() has returned.
// NOTE(review): run() is the only closer of ag.done, so this appears to block
// forever if run() was never started — confirm callers always start it.
func (ag *aggrGroup) stop() {
	// Calling cancel will terminate all in-process notifications
	// and the run() loop.
	ag.cancel()
	<-ag.done
}
   474  
   475  // insert inserts the alert into the aggregation group.
   476  func (ag *aggrGroup) insert(alert *types.Alert) {
   477  	if err := ag.alerts.Set(alert); err != nil {
   478  		level.Error(ag.logger).Log("msg", "error on set alert", "err", err)
   479  	}
   480  
   481  	// Immediately trigger a flush if the wait duration for this
   482  	// alert is already over.
   483  	ag.mtx.Lock()
   484  	defer ag.mtx.Unlock()
   485  	if !ag.hasFlushed && alert.StartsAt.Add(ag.opts.GroupWait).Before(time.Now()) {
   486  		ag.next.Reset(0)
   487  	}
   488  }
   489  
// empty reports whether the group's alert store is empty.
func (ag *aggrGroup) empty() bool {
	return ag.alerts.Empty()
}
   493  
   494  // flush sends notifications for all new alerts.
   495  func (ag *aggrGroup) flush(notify func(...*types.Alert) bool) {
   496  	if ag.empty() {
   497  		return
   498  	}
   499  
   500  	var (
   501  		alerts      = ag.alerts.List()
   502  		alertsSlice = make(types.AlertSlice, 0, len(alerts))
   503  		now         = time.Now()
   504  	)
   505  	for _, alert := range alerts {
   506  		a := *alert
   507  		// Ensure that alerts don't resolve as time move forwards.
   508  		if !a.ResolvedAt(now) {
   509  			a.EndsAt = time.Time{}
   510  		}
   511  		alertsSlice = append(alertsSlice, &a)
   512  	}
   513  	sort.Stable(alertsSlice)
   514  
   515  	level.Debug(ag.logger).Log("msg", "flushing", "alerts", fmt.Sprintf("%v", alertsSlice))
   516  
   517  	if notify(alertsSlice...) {
   518  		for _, a := range alertsSlice {
   519  			// Only delete if the fingerprint has not been inserted
   520  			// again since we notified about it.
   521  			fp := a.Fingerprint()
   522  			got, err := ag.alerts.Get(fp)
   523  			if err != nil {
   524  				// This should never happen.
   525  				level.Error(ag.logger).Log("msg", "failed to get alert", "err", err, "alert", a.String())
   526  				continue
   527  			}
   528  			if a.Resolved() && got.UpdatedAt == a.UpdatedAt {
   529  				if err := ag.alerts.Delete(fp); err != nil {
   530  					level.Error(ag.logger).Log("msg", "error on delete alert", "err", err, "alert", a.String())
   531  				}
   532  			}
   533  		}
   534  	}
   535  }
   536  
   537  type nilLimits struct{}
   538  
   539  func (n nilLimits) MaxNumberOfAggregationGroups() int { return 0 }
   540  

View as plain text