1
2
3
4
5
6
7
8
9
10
11
12
13
14 package inhibit
15
16 import (
17 "context"
18 "sync"
19 "time"
20
21 "github.com/go-kit/log"
22 "github.com/go-kit/log/level"
23 "github.com/oklog/run"
24 "github.com/prometheus/common/model"
25
26 "github.com/prometheus/alertmanager/config"
27 "github.com/prometheus/alertmanager/pkg/labels"
28 "github.com/prometheus/alertmanager/provider"
29 "github.com/prometheus/alertmanager/store"
30 "github.com/prometheus/alertmanager/types"
31 )
32
33
34
35
36 type Inhibitor struct {
37 alerts provider.Alerts
38 rules []*InhibitRule
39 marker types.Marker
40 logger log.Logger
41
42 mtx sync.RWMutex
43 cancel func()
44 }
45
46
47 func NewInhibitor(ap provider.Alerts, rs []*config.InhibitRule, mk types.Marker, logger log.Logger) *Inhibitor {
48 ih := &Inhibitor{
49 alerts: ap,
50 marker: mk,
51 logger: logger,
52 }
53 for _, cr := range rs {
54 r := NewInhibitRule(cr)
55 ih.rules = append(ih.rules, r)
56 }
57 return ih
58 }
59
60 func (ih *Inhibitor) run(ctx context.Context) {
61 it := ih.alerts.Subscribe()
62 defer it.Close()
63
64 for {
65 select {
66 case <-ctx.Done():
67 return
68 case a := <-it.Next():
69 if err := it.Err(); err != nil {
70 level.Error(ih.logger).Log("msg", "Error iterating alerts", "err", err)
71 continue
72 }
73
74 for _, r := range ih.rules {
75 if r.SourceMatchers.Matches(a.Labels) {
76 if err := r.scache.Set(a); err != nil {
77 level.Error(ih.logger).Log("msg", "error on set alert", "err", err)
78 }
79 }
80 }
81 }
82 }
83 }
84
85
86 func (ih *Inhibitor) Run() {
87 var (
88 g run.Group
89 ctx context.Context
90 )
91
92 ih.mtx.Lock()
93 ctx, ih.cancel = context.WithCancel(context.Background())
94 ih.mtx.Unlock()
95 runCtx, runCancel := context.WithCancel(ctx)
96
97 for _, rule := range ih.rules {
98 go rule.scache.Run(runCtx, 15*time.Minute)
99 }
100
101 g.Add(func() error {
102 ih.run(runCtx)
103 return nil
104 }, func(err error) {
105 runCancel()
106 })
107
108 if err := g.Run(); err != nil {
109 level.Warn(ih.logger).Log("msg", "error running inhibitor", "err", err)
110 }
111 }
112
113
114 func (ih *Inhibitor) Stop() {
115 if ih == nil {
116 return
117 }
118
119 ih.mtx.RLock()
120 defer ih.mtx.RUnlock()
121 if ih.cancel != nil {
122 ih.cancel()
123 }
124 }
125
126
127
128 func (ih *Inhibitor) Mutes(lset model.LabelSet) bool {
129 fp := lset.Fingerprint()
130
131 for _, r := range ih.rules {
132 if !r.TargetMatchers.Matches(lset) {
133
134 continue
135 }
136
137
138 if inhibitedByFP, eq := r.hasEqual(lset, r.SourceMatchers.Matches(lset)); eq {
139 ih.marker.SetInhibited(fp, inhibitedByFP.String())
140 return true
141 }
142 }
143 ih.marker.SetInhibited(fp)
144
145 return false
146 }
147
148
149
150
151
152
153 type InhibitRule struct {
154
155
156 SourceMatchers labels.Matchers
157
158
159 TargetMatchers labels.Matchers
160
161
162 Equal map[model.LabelName]struct{}
163
164
165 scache *store.Alerts
166 }
167
168
169 func NewInhibitRule(cr *config.InhibitRule) *InhibitRule {
170 var (
171 sourcem labels.Matchers
172 targetm labels.Matchers
173 )
174
175 for ln, lv := range cr.SourceMatch {
176 matcher, err := labels.NewMatcher(labels.MatchEqual, ln, lv)
177 if err != nil {
178
179 panic(err)
180 }
181 sourcem = append(sourcem, matcher)
182 }
183
184 for ln, lv := range cr.SourceMatchRE {
185 matcher, err := labels.NewMatcher(labels.MatchRegexp, ln, lv.String())
186 if err != nil {
187
188 panic(err)
189 }
190 sourcem = append(sourcem, matcher)
191 }
192
193 sourcem = append(sourcem, cr.SourceMatchers...)
194
195
196 for ln, lv := range cr.TargetMatch {
197 matcher, err := labels.NewMatcher(labels.MatchEqual, ln, lv)
198 if err != nil {
199
200 panic(err)
201 }
202 targetm = append(targetm, matcher)
203 }
204
205 for ln, lv := range cr.TargetMatchRE {
206 matcher, err := labels.NewMatcher(labels.MatchRegexp, ln, lv.String())
207 if err != nil {
208
209 panic(err)
210 }
211 targetm = append(targetm, matcher)
212 }
213
214 targetm = append(targetm, cr.TargetMatchers...)
215
216 equal := map[model.LabelName]struct{}{}
217 for _, ln := range cr.Equal {
218 equal[ln] = struct{}{}
219 }
220
221 return &InhibitRule{
222 SourceMatchers: sourcem,
223 TargetMatchers: targetm,
224 Equal: equal,
225 scache: store.NewAlerts(),
226 }
227 }
228
229
230
231
232
233 func (r *InhibitRule) hasEqual(lset model.LabelSet, excludeTwoSidedMatch bool) (model.Fingerprint, bool) {
234 Outer:
235 for _, a := range r.scache.List() {
236
237 if a.Resolved() {
238 continue
239 }
240 for n := range r.Equal {
241 if a.Labels[n] != lset[n] {
242 continue Outer
243 }
244 }
245 if excludeTwoSidedMatch && r.TargetMatchers.Matches(a.Labels) {
246 continue Outer
247 }
248 return a.Fingerprint(), true
249 }
250 return model.Fingerprint(0), false
251 }
252
View as plain text