...

Source file src/github.com/prometheus/alertmanager/inhibit/inhibit.go

Documentation: github.com/prometheus/alertmanager/inhibit

     1  // Copyright 2015 Prometheus Team
     2  // Licensed under the Apache License, Version 2.0 (the "License");
     3  // you may not use this file except in compliance with the License.
     4  // You may obtain a copy of the License at
     5  //
     6  // http://www.apache.org/licenses/LICENSE-2.0
     7  //
     8  // Unless required by applicable law or agreed to in writing, software
     9  // distributed under the License is distributed on an "AS IS" BASIS,
    10  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    11  // See the License for the specific language governing permissions and
    12  // limitations under the License.
    13  
    14  package inhibit
    15  
    16  import (
    17  	"context"
    18  	"sync"
    19  	"time"
    20  
    21  	"github.com/go-kit/log"
    22  	"github.com/go-kit/log/level"
    23  	"github.com/oklog/run"
    24  	"github.com/prometheus/common/model"
    25  
    26  	"github.com/prometheus/alertmanager/config"
    27  	"github.com/prometheus/alertmanager/pkg/labels"
    28  	"github.com/prometheus/alertmanager/provider"
    29  	"github.com/prometheus/alertmanager/store"
    30  	"github.com/prometheus/alertmanager/types"
    31  )
    32  
    33  // An Inhibitor determines whether a given label set is muted based on the
    34  // currently active alerts and a set of inhibition rules. It implements the
    35  // Muter interface.
    36  type Inhibitor struct {
    37  	alerts provider.Alerts
    38  	rules  []*InhibitRule
    39  	marker types.Marker
    40  	logger log.Logger
    41  
    42  	mtx    sync.RWMutex
    43  	cancel func()
    44  }
    45  
    46  // NewInhibitor returns a new Inhibitor.
    47  func NewInhibitor(ap provider.Alerts, rs []*config.InhibitRule, mk types.Marker, logger log.Logger) *Inhibitor {
    48  	ih := &Inhibitor{
    49  		alerts: ap,
    50  		marker: mk,
    51  		logger: logger,
    52  	}
    53  	for _, cr := range rs {
    54  		r := NewInhibitRule(cr)
    55  		ih.rules = append(ih.rules, r)
    56  	}
    57  	return ih
    58  }
    59  
    60  func (ih *Inhibitor) run(ctx context.Context) {
    61  	it := ih.alerts.Subscribe()
    62  	defer it.Close()
    63  
    64  	for {
    65  		select {
    66  		case <-ctx.Done():
    67  			return
    68  		case a := <-it.Next():
    69  			if err := it.Err(); err != nil {
    70  				level.Error(ih.logger).Log("msg", "Error iterating alerts", "err", err)
    71  				continue
    72  			}
    73  			// Update the inhibition rules' cache.
    74  			for _, r := range ih.rules {
    75  				if r.SourceMatchers.Matches(a.Labels) {
    76  					if err := r.scache.Set(a); err != nil {
    77  						level.Error(ih.logger).Log("msg", "error on set alert", "err", err)
    78  					}
    79  				}
    80  			}
    81  		}
    82  	}
    83  }
    84  
    85  // Run the Inhibitor's background processing.
    86  func (ih *Inhibitor) Run() {
    87  	var (
    88  		g   run.Group
    89  		ctx context.Context
    90  	)
    91  
    92  	ih.mtx.Lock()
    93  	ctx, ih.cancel = context.WithCancel(context.Background())
    94  	ih.mtx.Unlock()
    95  	runCtx, runCancel := context.WithCancel(ctx)
    96  
    97  	for _, rule := range ih.rules {
    98  		go rule.scache.Run(runCtx, 15*time.Minute)
    99  	}
   100  
   101  	g.Add(func() error {
   102  		ih.run(runCtx)
   103  		return nil
   104  	}, func(err error) {
   105  		runCancel()
   106  	})
   107  
   108  	if err := g.Run(); err != nil {
   109  		level.Warn(ih.logger).Log("msg", "error running inhibitor", "err", err)
   110  	}
   111  }
   112  
   113  // Stop the Inhibitor's background processing.
   114  func (ih *Inhibitor) Stop() {
   115  	if ih == nil {
   116  		return
   117  	}
   118  
   119  	ih.mtx.RLock()
   120  	defer ih.mtx.RUnlock()
   121  	if ih.cancel != nil {
   122  		ih.cancel()
   123  	}
   124  }
   125  
   126  // Mutes returns true iff the given label set is muted. It implements the Muter
   127  // interface.
   128  func (ih *Inhibitor) Mutes(lset model.LabelSet) bool {
   129  	fp := lset.Fingerprint()
   130  
   131  	for _, r := range ih.rules {
   132  		if !r.TargetMatchers.Matches(lset) {
   133  			// If target side of rule doesn't match, we don't need to look any further.
   134  			continue
   135  		}
   136  		// If we are here, the target side matches. If the source side matches, too, we
   137  		// need to exclude inhibiting alerts for which the same is true.
   138  		if inhibitedByFP, eq := r.hasEqual(lset, r.SourceMatchers.Matches(lset)); eq {
   139  			ih.marker.SetInhibited(fp, inhibitedByFP.String())
   140  			return true
   141  		}
   142  	}
   143  	ih.marker.SetInhibited(fp)
   144  
   145  	return false
   146  }
   147  
   148  // An InhibitRule specifies that a class of (source) alerts should inhibit
   149  // notifications for another class of (target) alerts if all specified matching
   150  // labels are equal between the two alerts. This may be used to inhibit alerts
   151  // from sending notifications if their meaning is logically a subset of a
   152  // higher-level alert.
   153  type InhibitRule struct {
   154  	// The set of Filters which define the group of source alerts (which inhibit
   155  	// the target alerts).
   156  	SourceMatchers labels.Matchers
   157  	// The set of Filters which define the group of target alerts (which are
   158  	// inhibited by the source alerts).
   159  	TargetMatchers labels.Matchers
   160  	// A set of label names whose label values need to be identical in source and
   161  	// target alerts in order for the inhibition to take effect.
   162  	Equal map[model.LabelName]struct{}
   163  
   164  	// Cache of alerts matching source labels.
   165  	scache *store.Alerts
   166  }
   167  
   168  // NewInhibitRule returns a new InhibitRule based on a configuration definition.
   169  func NewInhibitRule(cr *config.InhibitRule) *InhibitRule {
   170  	var (
   171  		sourcem labels.Matchers
   172  		targetm labels.Matchers
   173  	)
   174  	// cr.SourceMatch will be deprecated. This for loop appends regex matchers.
   175  	for ln, lv := range cr.SourceMatch {
   176  		matcher, err := labels.NewMatcher(labels.MatchEqual, ln, lv)
   177  		if err != nil {
   178  			// This error must not happen because the config already validates the yaml.
   179  			panic(err)
   180  		}
   181  		sourcem = append(sourcem, matcher)
   182  	}
   183  	// cr.SourceMatchRE will be deprecated. This for loop appends regex matchers.
   184  	for ln, lv := range cr.SourceMatchRE {
   185  		matcher, err := labels.NewMatcher(labels.MatchRegexp, ln, lv.String())
   186  		if err != nil {
   187  			// This error must not happen because the config already validates the yaml.
   188  			panic(err)
   189  		}
   190  		sourcem = append(sourcem, matcher)
   191  	}
   192  	// We append the new-style matchers. This can be simplified once the deprecated matcher syntax is removed.
   193  	sourcem = append(sourcem, cr.SourceMatchers...)
   194  
   195  	// cr.TargetMatch will be deprecated. This for loop appends regex matchers.
   196  	for ln, lv := range cr.TargetMatch {
   197  		matcher, err := labels.NewMatcher(labels.MatchEqual, ln, lv)
   198  		if err != nil {
   199  			// This error must not happen because the config already validates the yaml.
   200  			panic(err)
   201  		}
   202  		targetm = append(targetm, matcher)
   203  	}
   204  	// cr.TargetMatchRE will be deprecated. This for loop appends regex matchers.
   205  	for ln, lv := range cr.TargetMatchRE {
   206  		matcher, err := labels.NewMatcher(labels.MatchRegexp, ln, lv.String())
   207  		if err != nil {
   208  			// This error must not happen because the config already validates the yaml.
   209  			panic(err)
   210  		}
   211  		targetm = append(targetm, matcher)
   212  	}
   213  	// We append the new-style matchers. This can be simplified once the deprecated matcher syntax is removed.
   214  	targetm = append(targetm, cr.TargetMatchers...)
   215  
   216  	equal := map[model.LabelName]struct{}{}
   217  	for _, ln := range cr.Equal {
   218  		equal[ln] = struct{}{}
   219  	}
   220  
   221  	return &InhibitRule{
   222  		SourceMatchers: sourcem,
   223  		TargetMatchers: targetm,
   224  		Equal:          equal,
   225  		scache:         store.NewAlerts(),
   226  	}
   227  }
   228  
   229  // hasEqual checks whether the source cache contains alerts matching the equal
   230  // labels for the given label set. If so, the fingerprint of one of those alerts
   231  // is returned. If excludeTwoSidedMatch is true, alerts that match both the
   232  // source and the target side of the rule are disregarded.
   233  func (r *InhibitRule) hasEqual(lset model.LabelSet, excludeTwoSidedMatch bool) (model.Fingerprint, bool) {
   234  Outer:
   235  	for _, a := range r.scache.List() {
   236  		// The cache might be stale and contain resolved alerts.
   237  		if a.Resolved() {
   238  			continue
   239  		}
   240  		for n := range r.Equal {
   241  			if a.Labels[n] != lset[n] {
   242  				continue Outer
   243  			}
   244  		}
   245  		if excludeTwoSidedMatch && r.TargetMatchers.Matches(a.Labels) {
   246  			continue Outer
   247  		}
   248  		return a.Fingerprint(), true
   249  	}
   250  	return model.Fingerprint(0), false
   251  }
   252  

View as plain text