...

Source file src/github.com/prometheus/alertmanager/notify/notify.go

Documentation: github.com/prometheus/alertmanager/notify

     1  // Copyright 2015 Prometheus Team
     2  // Licensed under the Apache License, Version 2.0 (the "License");
     3  // you may not use this file except in compliance with the License.
     4  // You may obtain a copy of the License at
     5  //
     6  // http://www.apache.org/licenses/LICENSE-2.0
     7  //
     8  // Unless required by applicable law or agreed to in writing, software
     9  // distributed under the License is distributed on an "AS IS" BASIS,
    10  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    11  // See the License for the specific language governing permissions and
    12  // limitations under the License.
    13  
    14  package notify
    15  
    16  import (
    17  	"context"
    18  	"fmt"
    19  	"sort"
    20  	"sync"
    21  	"time"
    22  
    23  	"github.com/cenkalti/backoff/v4"
    24  	"github.com/cespare/xxhash/v2"
    25  	"github.com/go-kit/log"
    26  	"github.com/go-kit/log/level"
    27  	"github.com/pkg/errors"
    28  	"github.com/prometheus/client_golang/prometheus"
    29  	"github.com/prometheus/common/model"
    30  
    31  	"github.com/prometheus/alertmanager/inhibit"
    32  	"github.com/prometheus/alertmanager/nflog"
    33  	"github.com/prometheus/alertmanager/nflog/nflogpb"
    34  	"github.com/prometheus/alertmanager/silence"
    35  	"github.com/prometheus/alertmanager/timeinterval"
    36  	"github.com/prometheus/alertmanager/types"
    37  )
    38  
    39  // ResolvedSender returns true if resolved notifications should be sent.
    40  type ResolvedSender interface {
    41  	SendResolved() bool
    42  }
    43  
    44  // Peer represents the cluster node from where we are the sending the notification.
    45  type Peer interface {
    46  	// WaitReady waits until the node silences and notifications have settled before attempting to send a notification.
    47  	WaitReady(context.Context) error
    48  }
    49  
    50  // MinTimeout is the minimum timeout that is set for the context of a call
    51  // to a notification pipeline.
    52  const MinTimeout = 10 * time.Second
    53  
    54  // Notifier notifies about alerts under constraints of the given context. It
    55  // returns an error if unsuccessful and a flag whether the error is
    56  // recoverable. This information is useful for a retry logic.
    57  type Notifier interface {
    58  	Notify(context.Context, ...*types.Alert) (bool, error)
    59  }
    60  
    61  // Integration wraps a notifier and its configuration to be uniquely identified
    62  // by name and index from its origin in the configuration.
    63  type Integration struct {
    64  	notifier Notifier
    65  	rs       ResolvedSender
    66  	name     string
    67  	idx      int
    68  }
    69  
    70  // NewIntegration returns a new integration.
    71  func NewIntegration(notifier Notifier, rs ResolvedSender, name string, idx int) Integration {
    72  	return Integration{
    73  		notifier: notifier,
    74  		rs:       rs,
    75  		name:     name,
    76  		idx:      idx,
    77  	}
    78  }
    79  
    80  // Notify implements the Notifier interface.
    81  func (i *Integration) Notify(ctx context.Context, alerts ...*types.Alert) (bool, error) {
    82  	return i.notifier.Notify(ctx, alerts...)
    83  }
    84  
    85  // SendResolved implements the ResolvedSender interface.
    86  func (i *Integration) SendResolved() bool {
    87  	return i.rs.SendResolved()
    88  }
    89  
    90  // Name returns the name of the integration.
    91  func (i *Integration) Name() string {
    92  	return i.name
    93  }
    94  
    95  // Index returns the index of the integration.
    96  func (i *Integration) Index() int {
    97  	return i.idx
    98  }
    99  
   100  // String implements the Stringer interface.
   101  func (i *Integration) String() string {
   102  	return fmt.Sprintf("%s[%d]", i.name, i.idx)
   103  }
   104  
   105  // notifyKey defines a custom type with which a context is populated to
   106  // avoid accidental collisions.
   107  type notifyKey int
   108  
   109  const (
   110  	keyReceiverName notifyKey = iota
   111  	keyRepeatInterval
   112  	keyGroupLabels
   113  	keyGroupKey
   114  	keyFiringAlerts
   115  	keyResolvedAlerts
   116  	keyNow
   117  	keyMuteTimeIntervals
   118  	keyActiveTimeIntervals
   119  )
   120  
   121  // WithReceiverName populates a context with a receiver name.
   122  func WithReceiverName(ctx context.Context, rcv string) context.Context {
   123  	return context.WithValue(ctx, keyReceiverName, rcv)
   124  }
   125  
   126  // WithGroupKey populates a context with a group key.
   127  func WithGroupKey(ctx context.Context, s string) context.Context {
   128  	return context.WithValue(ctx, keyGroupKey, s)
   129  }
   130  
   131  // WithFiringAlerts populates a context with a slice of firing alerts.
   132  func WithFiringAlerts(ctx context.Context, alerts []uint64) context.Context {
   133  	return context.WithValue(ctx, keyFiringAlerts, alerts)
   134  }
   135  
   136  // WithResolvedAlerts populates a context with a slice of resolved alerts.
   137  func WithResolvedAlerts(ctx context.Context, alerts []uint64) context.Context {
   138  	return context.WithValue(ctx, keyResolvedAlerts, alerts)
   139  }
   140  
   141  // WithGroupLabels populates a context with grouping labels.
   142  func WithGroupLabels(ctx context.Context, lset model.LabelSet) context.Context {
   143  	return context.WithValue(ctx, keyGroupLabels, lset)
   144  }
   145  
   146  // WithNow populates a context with a now timestamp.
   147  func WithNow(ctx context.Context, t time.Time) context.Context {
   148  	return context.WithValue(ctx, keyNow, t)
   149  }
   150  
   151  // WithRepeatInterval populates a context with a repeat interval.
   152  func WithRepeatInterval(ctx context.Context, t time.Duration) context.Context {
   153  	return context.WithValue(ctx, keyRepeatInterval, t)
   154  }
   155  
   156  // WithMuteTimeIntervals populates a context with a slice of mute time names.
   157  func WithMuteTimeIntervals(ctx context.Context, mt []string) context.Context {
   158  	return context.WithValue(ctx, keyMuteTimeIntervals, mt)
   159  }
   160  
   161  func WithActiveTimeIntervals(ctx context.Context, at []string) context.Context {
   162  	return context.WithValue(ctx, keyActiveTimeIntervals, at)
   163  }
   164  
   165  // RepeatInterval extracts a repeat interval from the context. Iff none exists, the
   166  // second argument is false.
   167  func RepeatInterval(ctx context.Context) (time.Duration, bool) {
   168  	v, ok := ctx.Value(keyRepeatInterval).(time.Duration)
   169  	return v, ok
   170  }
   171  
   172  // ReceiverName extracts a receiver name from the context. Iff none exists, the
   173  // second argument is false.
   174  func ReceiverName(ctx context.Context) (string, bool) {
   175  	v, ok := ctx.Value(keyReceiverName).(string)
   176  	return v, ok
   177  }
   178  
   179  // GroupKey extracts a group key from the context. Iff none exists, the
   180  // second argument is false.
   181  func GroupKey(ctx context.Context) (string, bool) {
   182  	v, ok := ctx.Value(keyGroupKey).(string)
   183  	return v, ok
   184  }
   185  
   186  // GroupLabels extracts grouping label set from the context. Iff none exists, the
   187  // second argument is false.
   188  func GroupLabels(ctx context.Context) (model.LabelSet, bool) {
   189  	v, ok := ctx.Value(keyGroupLabels).(model.LabelSet)
   190  	return v, ok
   191  }
   192  
   193  // Now extracts a now timestamp from the context. Iff none exists, the
   194  // second argument is false.
   195  func Now(ctx context.Context) (time.Time, bool) {
   196  	v, ok := ctx.Value(keyNow).(time.Time)
   197  	return v, ok
   198  }
   199  
   200  // FiringAlerts extracts a slice of firing alerts from the context.
   201  // Iff none exists, the second argument is false.
   202  func FiringAlerts(ctx context.Context) ([]uint64, bool) {
   203  	v, ok := ctx.Value(keyFiringAlerts).([]uint64)
   204  	return v, ok
   205  }
   206  
   207  // ResolvedAlerts extracts a slice of firing alerts from the context.
   208  // Iff none exists, the second argument is false.
   209  func ResolvedAlerts(ctx context.Context) ([]uint64, bool) {
   210  	v, ok := ctx.Value(keyResolvedAlerts).([]uint64)
   211  	return v, ok
   212  }
   213  
   214  // MuteTimeIntervalNames extracts a slice of mute time names from the context. If and only if none exists, the
   215  // second argument is false.
   216  func MuteTimeIntervalNames(ctx context.Context) ([]string, bool) {
   217  	v, ok := ctx.Value(keyMuteTimeIntervals).([]string)
   218  	return v, ok
   219  }
   220  
   221  // ActiveTimeIntervalNames extracts a slice of active time names from the context. If none exists, the
   222  // second argument is false.
   223  func ActiveTimeIntervalNames(ctx context.Context) ([]string, bool) {
   224  	v, ok := ctx.Value(keyActiveTimeIntervals).([]string)
   225  	return v, ok
   226  }
   227  
   228  // A Stage processes alerts under the constraints of the given context.
   229  type Stage interface {
   230  	Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error)
   231  }
   232  
   233  // StageFunc wraps a function to represent a Stage.
   234  type StageFunc func(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error)
   235  
   236  // Exec implements Stage interface.
   237  func (f StageFunc) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
   238  	return f(ctx, l, alerts...)
   239  }
   240  
   241  type NotificationLog interface {
   242  	Log(r *nflogpb.Receiver, gkey string, firingAlerts, resolvedAlerts []uint64, expiry time.Duration) error
   243  	Query(params ...nflog.QueryParam) ([]*nflogpb.Entry, error)
   244  }
   245  
   246  type Metrics struct {
   247  	numNotifications                   *prometheus.CounterVec
   248  	numTotalFailedNotifications        *prometheus.CounterVec
   249  	numNotificationRequestsTotal       *prometheus.CounterVec
   250  	numNotificationRequestsFailedTotal *prometheus.CounterVec
   251  	notificationLatencySeconds         *prometheus.HistogramVec
   252  }
   253  
   254  func NewMetrics(r prometheus.Registerer) *Metrics {
   255  	m := &Metrics{
   256  		numNotifications: prometheus.NewCounterVec(prometheus.CounterOpts{
   257  			Namespace: "alertmanager",
   258  			Name:      "notifications_total",
   259  			Help:      "The total number of attempted notifications.",
   260  		}, []string{"integration"}),
   261  		numTotalFailedNotifications: prometheus.NewCounterVec(prometheus.CounterOpts{
   262  			Namespace: "alertmanager",
   263  			Name:      "notifications_failed_total",
   264  			Help:      "The total number of failed notifications.",
   265  		}, []string{"integration"}),
   266  		numNotificationRequestsTotal: prometheus.NewCounterVec(prometheus.CounterOpts{
   267  			Namespace: "alertmanager",
   268  			Name:      "notification_requests_total",
   269  			Help:      "The total number of attempted notification requests.",
   270  		}, []string{"integration"}),
   271  		numNotificationRequestsFailedTotal: prometheus.NewCounterVec(prometheus.CounterOpts{
   272  			Namespace: "alertmanager",
   273  			Name:      "notification_requests_failed_total",
   274  			Help:      "The total number of failed notification requests.",
   275  		}, []string{"integration"}),
   276  		notificationLatencySeconds: prometheus.NewHistogramVec(prometheus.HistogramOpts{
   277  			Namespace: "alertmanager",
   278  			Name:      "notification_latency_seconds",
   279  			Help:      "The latency of notifications in seconds.",
   280  			Buckets:   []float64{1, 5, 10, 15, 20},
   281  		}, []string{"integration"}),
   282  	}
   283  	for _, integration := range []string{
   284  		"email",
   285  		"pagerduty",
   286  		"wechat",
   287  		"pushover",
   288  		"slack",
   289  		"opsgenie",
   290  		"webhook",
   291  		"victorops",
   292  		"sns",
   293  		"telegram",
   294  	} {
   295  		m.numNotifications.WithLabelValues(integration)
   296  		m.numTotalFailedNotifications.WithLabelValues(integration)
   297  		m.numNotificationRequestsTotal.WithLabelValues(integration)
   298  		m.numNotificationRequestsFailedTotal.WithLabelValues(integration)
   299  		m.notificationLatencySeconds.WithLabelValues(integration)
   300  	}
   301  	r.MustRegister(
   302  		m.numNotifications, m.numTotalFailedNotifications,
   303  		m.numNotificationRequestsTotal, m.numNotificationRequestsFailedTotal,
   304  		m.notificationLatencySeconds,
   305  	)
   306  	return m
   307  }
   308  
   309  type PipelineBuilder struct {
   310  	metrics *Metrics
   311  }
   312  
   313  func NewPipelineBuilder(r prometheus.Registerer) *PipelineBuilder {
   314  	return &PipelineBuilder{
   315  		metrics: NewMetrics(r),
   316  	}
   317  }
   318  
   319  // New returns a map of receivers to Stages.
   320  func (pb *PipelineBuilder) New(
   321  	receivers map[string][]Integration,
   322  	wait func() time.Duration,
   323  	inhibitor *inhibit.Inhibitor,
   324  	silencer *silence.Silencer,
   325  	times map[string][]timeinterval.TimeInterval,
   326  	notificationLog NotificationLog,
   327  	peer Peer,
   328  ) RoutingStage {
   329  	rs := make(RoutingStage, len(receivers))
   330  
   331  	ms := NewGossipSettleStage(peer)
   332  	is := NewMuteStage(inhibitor)
   333  	tas := NewTimeActiveStage(times)
   334  	tms := NewTimeMuteStage(times)
   335  	ss := NewMuteStage(silencer)
   336  
   337  	for name := range receivers {
   338  		st := createReceiverStage(name, receivers[name], wait, notificationLog, pb.metrics)
   339  		rs[name] = MultiStage{ms, is, tas, tms, ss, st}
   340  	}
   341  	return rs
   342  }
   343  
   344  // createReceiverStage creates a pipeline of stages for a receiver.
   345  func createReceiverStage(
   346  	name string,
   347  	integrations []Integration,
   348  	wait func() time.Duration,
   349  	notificationLog NotificationLog,
   350  	metrics *Metrics,
   351  ) Stage {
   352  	var fs FanoutStage
   353  	for i := range integrations {
   354  		recv := &nflogpb.Receiver{
   355  			GroupName:   name,
   356  			Integration: integrations[i].Name(),
   357  			Idx:         uint32(integrations[i].Index()),
   358  		}
   359  		var s MultiStage
   360  		s = append(s, NewWaitStage(wait))
   361  		s = append(s, NewDedupStage(&integrations[i], notificationLog, recv))
   362  		s = append(s, NewRetryStage(integrations[i], name, metrics))
   363  		s = append(s, NewSetNotifiesStage(notificationLog, recv))
   364  
   365  		fs = append(fs, s)
   366  	}
   367  	return fs
   368  }
   369  
   370  // RoutingStage executes the inner stages based on the receiver specified in
   371  // the context.
   372  type RoutingStage map[string]Stage
   373  
   374  // Exec implements the Stage interface.
   375  func (rs RoutingStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
   376  	receiver, ok := ReceiverName(ctx)
   377  	if !ok {
   378  		return ctx, nil, errors.New("receiver missing")
   379  	}
   380  
   381  	s, ok := rs[receiver]
   382  	if !ok {
   383  		return ctx, nil, errors.New("stage for receiver missing")
   384  	}
   385  
   386  	return s.Exec(ctx, l, alerts...)
   387  }
   388  
   389  // A MultiStage executes a series of stages sequentially.
   390  type MultiStage []Stage
   391  
   392  // Exec implements the Stage interface.
   393  func (ms MultiStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
   394  	var err error
   395  	for _, s := range ms {
   396  		if len(alerts) == 0 {
   397  			return ctx, nil, nil
   398  		}
   399  
   400  		ctx, alerts, err = s.Exec(ctx, l, alerts...)
   401  		if err != nil {
   402  			return ctx, nil, err
   403  		}
   404  	}
   405  	return ctx, alerts, nil
   406  }
   407  
   408  // FanoutStage executes its stages concurrently
   409  type FanoutStage []Stage
   410  
   411  // Exec attempts to execute all stages concurrently and discards the results.
   412  // It returns its input alerts and a types.MultiError if one or more stages fail.
   413  func (fs FanoutStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
   414  	var (
   415  		wg sync.WaitGroup
   416  		me types.MultiError
   417  	)
   418  	wg.Add(len(fs))
   419  
   420  	for _, s := range fs {
   421  		go func(s Stage) {
   422  			if _, _, err := s.Exec(ctx, l, alerts...); err != nil {
   423  				me.Add(err)
   424  			}
   425  			wg.Done()
   426  		}(s)
   427  	}
   428  	wg.Wait()
   429  
   430  	if me.Len() > 0 {
   431  		return ctx, alerts, &me
   432  	}
   433  	return ctx, alerts, nil
   434  }
   435  
   436  // GossipSettleStage waits until the Gossip has settled to forward alerts.
   437  type GossipSettleStage struct {
   438  	peer Peer
   439  }
   440  
   441  // NewGossipSettleStage returns a new GossipSettleStage.
   442  func NewGossipSettleStage(p Peer) *GossipSettleStage {
   443  	return &GossipSettleStage{peer: p}
   444  }
   445  
   446  func (n *GossipSettleStage) Exec(ctx context.Context, _ log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
   447  	if n.peer != nil {
   448  		if err := n.peer.WaitReady(ctx); err != nil {
   449  			return ctx, nil, err
   450  		}
   451  	}
   452  	return ctx, alerts, nil
   453  }
   454  
   455  // MuteStage filters alerts through a Muter.
   456  type MuteStage struct {
   457  	muter types.Muter
   458  }
   459  
   460  // NewMuteStage return a new MuteStage.
   461  func NewMuteStage(m types.Muter) *MuteStage {
   462  	return &MuteStage{muter: m}
   463  }
   464  
   465  // Exec implements the Stage interface.
   466  func (n *MuteStage) Exec(ctx context.Context, _ log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
   467  	var filtered []*types.Alert
   468  	for _, a := range alerts {
   469  		// TODO(fabxc): increment total alerts counter.
   470  		// Do not send the alert if muted.
   471  		if !n.muter.Mutes(a.Labels) {
   472  			filtered = append(filtered, a)
   473  		}
   474  		// TODO(fabxc): increment muted alerts counter if muted.
   475  	}
   476  	return ctx, filtered, nil
   477  }
   478  
   479  // WaitStage waits for a certain amount of time before continuing or until the
   480  // context is done.
   481  type WaitStage struct {
   482  	wait func() time.Duration
   483  }
   484  
   485  // NewWaitStage returns a new WaitStage.
   486  func NewWaitStage(wait func() time.Duration) *WaitStage {
   487  	return &WaitStage{
   488  		wait: wait,
   489  	}
   490  }
   491  
   492  // Exec implements the Stage interface.
   493  func (ws *WaitStage) Exec(ctx context.Context, _ log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
   494  	select {
   495  	case <-time.After(ws.wait()):
   496  	case <-ctx.Done():
   497  		return ctx, nil, ctx.Err()
   498  	}
   499  	return ctx, alerts, nil
   500  }
   501  
   502  // DedupStage filters alerts.
   503  // Filtering happens based on a notification log.
   504  type DedupStage struct {
   505  	rs    ResolvedSender
   506  	nflog NotificationLog
   507  	recv  *nflogpb.Receiver
   508  
   509  	now  func() time.Time
   510  	hash func(*types.Alert) uint64
   511  }
   512  
   513  // NewDedupStage wraps a DedupStage that runs against the given notification log.
   514  func NewDedupStage(rs ResolvedSender, l NotificationLog, recv *nflogpb.Receiver) *DedupStage {
   515  	return &DedupStage{
   516  		rs:    rs,
   517  		nflog: l,
   518  		recv:  recv,
   519  		now:   utcNow,
   520  		hash:  hashAlert,
   521  	}
   522  }
   523  
   524  func utcNow() time.Time {
   525  	return time.Now().UTC()
   526  }
   527  
   528  // Wrap a slice in a struct so we can store a pointer in sync.Pool
   529  type hashBuffer struct {
   530  	buf []byte
   531  }
   532  
   533  var hashBuffers = sync.Pool{
   534  	New: func() interface{} { return &hashBuffer{buf: make([]byte, 0, 1024)} },
   535  }
   536  
   537  func hashAlert(a *types.Alert) uint64 {
   538  	const sep = '\xff'
   539  
   540  	hb := hashBuffers.Get().(*hashBuffer)
   541  	defer hashBuffers.Put(hb)
   542  	b := hb.buf[:0]
   543  
   544  	names := make(model.LabelNames, 0, len(a.Labels))
   545  
   546  	for ln := range a.Labels {
   547  		names = append(names, ln)
   548  	}
   549  	sort.Sort(names)
   550  
   551  	for _, ln := range names {
   552  		b = append(b, string(ln)...)
   553  		b = append(b, sep)
   554  		b = append(b, string(a.Labels[ln])...)
   555  		b = append(b, sep)
   556  	}
   557  
   558  	hash := xxhash.Sum64(b)
   559  
   560  	return hash
   561  }
   562  
   563  func (n *DedupStage) needsUpdate(entry *nflogpb.Entry, firing, resolved map[uint64]struct{}, repeat time.Duration) bool {
   564  	// If we haven't notified about the alert group before, notify right away
   565  	// unless we only have resolved alerts.
   566  	if entry == nil {
   567  		return len(firing) > 0
   568  	}
   569  
   570  	if !entry.IsFiringSubset(firing) {
   571  		return true
   572  	}
   573  
   574  	// Notify about all alerts being resolved.
   575  	// This is done irrespective of the send_resolved flag to make sure that
   576  	// the firing alerts are cleared from the notification log.
   577  	if len(firing) == 0 {
   578  		// If the current alert group and last notification contain no firing
   579  		// alert, it means that some alerts have been fired and resolved during the
   580  		// last interval. In this case, there is no need to notify the receiver
   581  		// since it doesn't know about them.
   582  		return len(entry.FiringAlerts) > 0
   583  	}
   584  
   585  	if n.rs.SendResolved() && !entry.IsResolvedSubset(resolved) {
   586  		return true
   587  	}
   588  
   589  	// Nothing changed, only notify if the repeat interval has passed.
   590  	return entry.Timestamp.Before(n.now().Add(-repeat))
   591  }
   592  
   593  // Exec implements the Stage interface.
   594  func (n *DedupStage) Exec(ctx context.Context, _ log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
   595  	gkey, ok := GroupKey(ctx)
   596  	if !ok {
   597  		return ctx, nil, errors.New("group key missing")
   598  	}
   599  
   600  	repeatInterval, ok := RepeatInterval(ctx)
   601  	if !ok {
   602  		return ctx, nil, errors.New("repeat interval missing")
   603  	}
   604  
   605  	firingSet := map[uint64]struct{}{}
   606  	resolvedSet := map[uint64]struct{}{}
   607  	firing := []uint64{}
   608  	resolved := []uint64{}
   609  
   610  	var hash uint64
   611  	for _, a := range alerts {
   612  		hash = n.hash(a)
   613  		if a.Resolved() {
   614  			resolved = append(resolved, hash)
   615  			resolvedSet[hash] = struct{}{}
   616  		} else {
   617  			firing = append(firing, hash)
   618  			firingSet[hash] = struct{}{}
   619  		}
   620  	}
   621  
   622  	ctx = WithFiringAlerts(ctx, firing)
   623  	ctx = WithResolvedAlerts(ctx, resolved)
   624  
   625  	entries, err := n.nflog.Query(nflog.QGroupKey(gkey), nflog.QReceiver(n.recv))
   626  	if err != nil && err != nflog.ErrNotFound {
   627  		return ctx, nil, err
   628  	}
   629  
   630  	var entry *nflogpb.Entry
   631  	switch len(entries) {
   632  	case 0:
   633  	case 1:
   634  		entry = entries[0]
   635  	default:
   636  		return ctx, nil, errors.Errorf("unexpected entry result size %d", len(entries))
   637  	}
   638  
   639  	if n.needsUpdate(entry, firingSet, resolvedSet, repeatInterval) {
   640  		return ctx, alerts, nil
   641  	}
   642  	return ctx, nil, nil
   643  }
   644  
   645  // RetryStage notifies via passed integration with exponential backoff until it
   646  // succeeds. It aborts if the context is canceled or timed out.
   647  type RetryStage struct {
   648  	integration Integration
   649  	groupName   string
   650  	metrics     *Metrics
   651  }
   652  
   653  // NewRetryStage returns a new instance of a RetryStage.
   654  func NewRetryStage(i Integration, groupName string, metrics *Metrics) *RetryStage {
   655  	return &RetryStage{
   656  		integration: i,
   657  		groupName:   groupName,
   658  		metrics:     metrics,
   659  	}
   660  }
   661  
   662  func (r RetryStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
   663  	r.metrics.numNotifications.WithLabelValues(r.integration.Name()).Inc()
   664  	ctx, alerts, err := r.exec(ctx, l, alerts...)
   665  	if err != nil {
   666  		r.metrics.numTotalFailedNotifications.WithLabelValues(r.integration.Name()).Inc()
   667  	}
   668  	return ctx, alerts, err
   669  }
   670  
   671  func (r RetryStage) exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
   672  	var sent []*types.Alert
   673  
   674  	// If we shouldn't send notifications for resolved alerts, but there are only
   675  	// resolved alerts, report them all as successfully notified (we still want the
   676  	// notification log to log them for the next run of DedupStage).
   677  	if !r.integration.SendResolved() {
   678  		firing, ok := FiringAlerts(ctx)
   679  		if !ok {
   680  			return ctx, nil, errors.New("firing alerts missing")
   681  		}
   682  		if len(firing) == 0 {
   683  			return ctx, alerts, nil
   684  		}
   685  		for _, a := range alerts {
   686  			if a.Status() != model.AlertResolved {
   687  				sent = append(sent, a)
   688  			}
   689  		}
   690  	} else {
   691  		sent = alerts
   692  	}
   693  
   694  	b := backoff.NewExponentialBackOff()
   695  	b.MaxElapsedTime = 0 // Always retry.
   696  
   697  	tick := backoff.NewTicker(b)
   698  	defer tick.Stop()
   699  
   700  	var (
   701  		i    = 0
   702  		iErr error
   703  	)
   704  	l = log.With(l, "receiver", r.groupName, "integration", r.integration.String())
   705  
   706  	for {
   707  		i++
   708  		// Always check the context first to not notify again.
   709  		select {
   710  		case <-ctx.Done():
   711  			if iErr == nil {
   712  				iErr = ctx.Err()
   713  			}
   714  
   715  			return ctx, nil, errors.Wrapf(iErr, "%s/%s: notify retry canceled after %d attempts", r.groupName, r.integration.String(), i)
   716  		default:
   717  		}
   718  
   719  		select {
   720  		case <-tick.C:
   721  			now := time.Now()
   722  			retry, err := r.integration.Notify(ctx, sent...)
   723  			r.metrics.notificationLatencySeconds.WithLabelValues(r.integration.Name()).Observe(time.Since(now).Seconds())
   724  			r.metrics.numNotificationRequestsTotal.WithLabelValues(r.integration.Name()).Inc()
   725  			if err != nil {
   726  				r.metrics.numNotificationRequestsFailedTotal.WithLabelValues(r.integration.Name()).Inc()
   727  				if !retry {
   728  					return ctx, alerts, errors.Wrapf(err, "%s/%s: notify retry canceled due to unrecoverable error after %d attempts", r.groupName, r.integration.String(), i)
   729  				}
   730  				if ctx.Err() == nil && (iErr == nil || err.Error() != iErr.Error()) {
   731  					// Log the error if the context isn't done and the error isn't the same as before.
   732  					level.Warn(l).Log("msg", "Notify attempt failed, will retry later", "attempts", i, "err", err)
   733  				}
   734  
   735  				// Save this error to be able to return the last seen error by an
   736  				// integration upon context timeout.
   737  				iErr = err
   738  			} else {
   739  				lvl := level.Debug(l)
   740  				if i > 1 {
   741  					lvl = level.Info(l)
   742  				}
   743  				lvl.Log("msg", "Notify success", "attempts", i)
   744  				return ctx, alerts, nil
   745  			}
   746  		case <-ctx.Done():
   747  			if iErr == nil {
   748  				iErr = ctx.Err()
   749  			}
   750  
   751  			return ctx, nil, errors.Wrapf(iErr, "%s/%s: notify retry canceled after %d attempts", r.groupName, r.integration.String(), i)
   752  		}
   753  	}
   754  }
   755  
   756  // SetNotifiesStage sets the notification information about passed alerts. The
   757  // passed alerts should have already been sent to the receivers.
   758  type SetNotifiesStage struct {
   759  	nflog NotificationLog
   760  	recv  *nflogpb.Receiver
   761  }
   762  
   763  // NewSetNotifiesStage returns a new instance of a SetNotifiesStage.
   764  func NewSetNotifiesStage(l NotificationLog, recv *nflogpb.Receiver) *SetNotifiesStage {
   765  	return &SetNotifiesStage{
   766  		nflog: l,
   767  		recv:  recv,
   768  	}
   769  }
   770  
   771  // Exec implements the Stage interface.
   772  func (n SetNotifiesStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
   773  	gkey, ok := GroupKey(ctx)
   774  	if !ok {
   775  		return ctx, nil, errors.New("group key missing")
   776  	}
   777  
   778  	firing, ok := FiringAlerts(ctx)
   779  	if !ok {
   780  		return ctx, nil, errors.New("firing alerts missing")
   781  	}
   782  
   783  	resolved, ok := ResolvedAlerts(ctx)
   784  	if !ok {
   785  		return ctx, nil, errors.New("resolved alerts missing")
   786  	}
   787  
   788  	repeat, ok := RepeatInterval(ctx)
   789  	if !ok {
   790  		return ctx, nil, errors.New("repeat interval missing")
   791  	}
   792  	expiry := 2 * repeat
   793  
   794  	return ctx, alerts, n.nflog.Log(n.recv, gkey, firing, resolved, expiry)
   795  }
   796  
   797  type timeStage struct {
   798  	Times map[string][]timeinterval.TimeInterval
   799  }
   800  
   801  type TimeMuteStage timeStage
   802  
   803  func NewTimeMuteStage(ti map[string][]timeinterval.TimeInterval) *TimeMuteStage {
   804  	return &TimeMuteStage{ti}
   805  }
   806  
   807  // Exec implements the stage interface for TimeMuteStage.
   808  // TimeMuteStage is responsible for muting alerts whose route is not in an active time.
   809  func (tms TimeMuteStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
   810  	muteTimeIntervalNames, ok := MuteTimeIntervalNames(ctx)
   811  	if !ok {
   812  		return ctx, alerts, nil
   813  	}
   814  	now, ok := Now(ctx)
   815  	if !ok {
   816  		return ctx, alerts, errors.New("missing now timestamp")
   817  	}
   818  
   819  	muted, err := inTimeIntervals(now, tms.Times, muteTimeIntervalNames)
   820  	if err != nil {
   821  		return ctx, alerts, err
   822  	}
   823  
   824  	// If the current time is inside a mute time, all alerts are removed from the pipeline.
   825  	if muted {
   826  		level.Debug(l).Log("msg", "Notifications not sent, route is within mute time")
   827  		return ctx, nil, nil
   828  	}
   829  	return ctx, alerts, nil
   830  }
   831  
   832  type TimeActiveStage timeStage
   833  
   834  func NewTimeActiveStage(ti map[string][]timeinterval.TimeInterval) *TimeActiveStage {
   835  	return &TimeActiveStage{ti}
   836  }
   837  
   838  // Exec implements the stage interface for TimeActiveStage.
   839  // TimeActiveStage is responsible for muting alerts whose route is not in an active time.
   840  func (tas TimeActiveStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
   841  	activeTimeIntervalNames, ok := ActiveTimeIntervalNames(ctx)
   842  	if !ok {
   843  		return ctx, alerts, nil
   844  	}
   845  
   846  	// if we don't have active time intervals at all it is always active.
   847  	if len(activeTimeIntervalNames) == 0 {
   848  		return ctx, alerts, nil
   849  	}
   850  
   851  	now, ok := Now(ctx)
   852  	if !ok {
   853  		return ctx, alerts, errors.New("missing now timestamp")
   854  	}
   855  
   856  	active, err := inTimeIntervals(now, tas.Times, activeTimeIntervalNames)
   857  	if err != nil {
   858  		return ctx, alerts, err
   859  	}
   860  
   861  	// If the current time is not inside an active time, all alerts are removed from the pipeline
   862  	if !active {
   863  		level.Debug(l).Log("msg", "Notifications not sent, route is not within active time")
   864  		return ctx, nil, nil
   865  	}
   866  
   867  	return ctx, alerts, nil
   868  }
   869  
   870  // inTimeIntervals returns true if the current time is contained in one of the given time intervals.
   871  func inTimeIntervals(now time.Time, intervals map[string][]timeinterval.TimeInterval, intervalNames []string) (bool, error) {
   872  	for _, name := range intervalNames {
   873  		interval, ok := intervals[name]
   874  		if !ok {
   875  			return false, errors.Errorf("time interval %s doesn't exist in config", name)
   876  		}
   877  		for _, ti := range interval {
   878  			if ti.ContainsTime(now.UTC()) {
   879  				return true, nil
   880  			}
   881  		}
   882  	}
   883  	return false, nil
   884  }
   885  

View as plain text