1
2
3
4
5
6
7
8
9
10
11
12
13
14 package mem
15
16 import (
17 "context"
18 "sync"
19 "time"
20
21 "github.com/go-kit/log"
22 "github.com/go-kit/log/level"
23 "github.com/prometheus/client_golang/prometheus"
24 "github.com/prometheus/common/model"
25
26 "github.com/prometheus/alertmanager/provider"
27 "github.com/prometheus/alertmanager/store"
28 "github.com/prometheus/alertmanager/types"
29 )
30
const (
	// alertChannelLength is the buffer size used for the alert channels
	// handed to iterators. Subscribe uses it as a lower bound; GetPending
	// uses it as the exact capacity.
	alertChannelLength = 200
)
32
33
34 type Alerts struct {
35 cancel context.CancelFunc
36
37 alerts *store.Alerts
38 marker types.Marker
39
40 mtx sync.Mutex
41 listeners map[int]listeningAlerts
42 next int
43
44 callback AlertStoreCallback
45
46 logger log.Logger
47 }
48
49 type AlertStoreCallback interface {
50
51
52
53
54 PreStore(alert *types.Alert, existing bool) error
55
56
57 PostStore(alert *types.Alert, existing bool)
58
59
60 PostDelete(alert *types.Alert)
61 }
62
63 type listeningAlerts struct {
64 alerts chan *types.Alert
65 done chan struct{}
66 }
67
68 func (a *Alerts) registerMetrics(r prometheus.Registerer) {
69 newMemAlertByStatus := func(s types.AlertState) prometheus.GaugeFunc {
70 return prometheus.NewGaugeFunc(
71 prometheus.GaugeOpts{
72 Name: "alertmanager_alerts",
73 Help: "How many alerts by state.",
74 ConstLabels: prometheus.Labels{"state": string(s)},
75 },
76 func() float64 {
77 return float64(a.count(s))
78 },
79 )
80 }
81
82 r.MustRegister(newMemAlertByStatus(types.AlertStateActive))
83 r.MustRegister(newMemAlertByStatus(types.AlertStateSuppressed))
84 r.MustRegister(newMemAlertByStatus(types.AlertStateUnprocessed))
85 }
86
87
88 func NewAlerts(ctx context.Context, m types.Marker, intervalGC time.Duration, alertCallback AlertStoreCallback, l log.Logger, r prometheus.Registerer) (*Alerts, error) {
89 if alertCallback == nil {
90 alertCallback = noopCallback{}
91 }
92
93 ctx, cancel := context.WithCancel(ctx)
94 a := &Alerts{
95 marker: m,
96 alerts: store.NewAlerts(),
97 cancel: cancel,
98 listeners: map[int]listeningAlerts{},
99 next: 0,
100 logger: log.With(l, "component", "provider"),
101 callback: alertCallback,
102 }
103 a.alerts.SetGCCallback(func(alerts []*types.Alert) {
104 for _, alert := range alerts {
105
106
107
108 m.Delete(alert.Fingerprint())
109 a.callback.PostDelete(alert)
110 }
111
112 a.mtx.Lock()
113 for i, l := range a.listeners {
114 select {
115 case <-l.done:
116 delete(a.listeners, i)
117 close(l.alerts)
118 default:
119
120 }
121 }
122 a.mtx.Unlock()
123 })
124
125 if r != nil {
126 a.registerMetrics(r)
127 }
128
129 go a.alerts.Run(ctx, intervalGC)
130
131 return a, nil
132 }
133
134
135 func (a *Alerts) Close() {
136 if a.cancel != nil {
137 a.cancel()
138 }
139 }
140
// max returns the larger of a and b.
func max(a, b int) int {
	if b >= a {
		return b
	}
	return a
}
147
148
149
150
151 func (a *Alerts) Subscribe() provider.AlertIterator {
152 a.mtx.Lock()
153 defer a.mtx.Unlock()
154
155 var (
156 done = make(chan struct{})
157 alerts = a.alerts.List()
158 ch = make(chan *types.Alert, max(len(alerts), alertChannelLength))
159 )
160
161 for _, a := range alerts {
162 ch <- a
163 }
164
165 a.listeners[a.next] = listeningAlerts{alerts: ch, done: done}
166 a.next++
167
168 return provider.NewAlertIterator(ch, done, nil)
169 }
170
171
172
173 func (a *Alerts) GetPending() provider.AlertIterator {
174 var (
175 ch = make(chan *types.Alert, alertChannelLength)
176 done = make(chan struct{})
177 )
178
179 go func() {
180 defer close(ch)
181
182 for _, a := range a.alerts.List() {
183 select {
184 case ch <- a:
185 case <-done:
186 return
187 }
188 }
189 }()
190
191 return provider.NewAlertIterator(ch, done, nil)
192 }
193
194
195 func (a *Alerts) Get(fp model.Fingerprint) (*types.Alert, error) {
196 return a.alerts.Get(fp)
197 }
198
199
200 func (a *Alerts) Put(alerts ...*types.Alert) error {
201 for _, alert := range alerts {
202 fp := alert.Fingerprint()
203
204 existing := false
205
206
207
208 if old, err := a.alerts.Get(fp); err == nil {
209 existing = true
210
211
212 if (alert.EndsAt.After(old.StartsAt) && alert.EndsAt.Before(old.EndsAt)) ||
213 (alert.StartsAt.After(old.StartsAt) && alert.StartsAt.Before(old.EndsAt)) {
214 alert = old.Merge(alert)
215 }
216 }
217
218 if err := a.callback.PreStore(alert, existing); err != nil {
219 level.Error(a.logger).Log("msg", "pre-store callback returned error on set alert", "err", err)
220 continue
221 }
222
223 if err := a.alerts.Set(alert); err != nil {
224 level.Error(a.logger).Log("msg", "error on set alert", "err", err)
225 continue
226 }
227
228 a.callback.PostStore(alert, existing)
229
230 a.mtx.Lock()
231 for _, l := range a.listeners {
232 select {
233 case l.alerts <- alert:
234 case <-l.done:
235 }
236 }
237 a.mtx.Unlock()
238 }
239
240 return nil
241 }
242
243
244 func (a *Alerts) count(state types.AlertState) int {
245 var count int
246 for _, alert := range a.alerts.List() {
247 if alert.Resolved() {
248 continue
249 }
250
251 status := a.marker.Status(alert.Fingerprint())
252 if status.State != state {
253 continue
254 }
255
256 count++
257 }
258
259 return count
260 }
261
262 type noopCallback struct{}
263
264 func (n noopCallback) PreStore(_ *types.Alert, _ bool) error { return nil }
265 func (n noopCallback) PostStore(_ *types.Alert, _ bool) {}
266 func (n noopCallback) PostDelete(_ *types.Alert) {}
267
View as plain text