package cluster

import (
	"time"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/gogo/protobuf/proto"
	"github.com/hashicorp/memberlist"
	"github.com/prometheus/client_golang/prometheus"

	"github.com/prometheus/alertmanager/cluster/clusterpb"
)

const (
	// Maximum number of messages to be held in the broadcast queue.
	maxQueueSize = 4096
	fullState    = "full_state"
	update       = "update"
)

// delegate implements memberlist.Delegate and memberlist.EventDelegate and
// broadcasts its peer's state in the cluster.
type delegate struct {
	*Peer

	logger log.Logger
	bcast  *memberlist.TransmitLimitedQueue

	messagesReceived     *prometheus.CounterVec
	messagesReceivedSize *prometheus.CounterVec
	messagesSent         *prometheus.CounterVec
	messagesSentSize     *prometheus.CounterVec
	messagesPruned       prometheus.Counter
	nodeAlive            *prometheus.CounterVec
	nodePingDuration     *prometheus.HistogramVec
}
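
// newDelegate wires up a delegate for the given peer: it creates the
// broadcast queue, registers the cluster gossip metrics with reg, and starts
// the background queue-pruning loop.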
func newDelegate(l log.Logger, reg prometheus.Registerer, p *Peer, retransmit int) *delegate {
	bcast := &memberlist.TransmitLimitedQueue{
		NumNodes:       p.ClusterSize,
		RetransmitMult: retransmit,
	}
	messagesReceived := prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "alertmanager_cluster_messages_received_total",
		Help: "Total number of cluster messages received.",
	}, []string{"msg_type"})
	messagesReceivedSize := prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "alertmanager_cluster_messages_received_size_total",
		Help: "Total size of cluster messages received.",
	}, []string{"msg_type"})
	messagesSent := prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "alertmanager_cluster_messages_sent_total",
		Help: "Total number of cluster messages sent.",
	}, []string{"msg_type"})
	messagesSentSize := prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "alertmanager_cluster_messages_sent_size_total",
		Help: "Total size of cluster messages sent.",
	}, []string{"msg_type"})
	messagesPruned := prometheus.NewCounter(prometheus.CounterOpts{
		Name: "alertmanager_cluster_messages_pruned_total",
		Help: "Total number of cluster messages pruned.",
	})
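
	// The member count, peer position, health score, and queue depth are
	// exposed as GaugeFuncs, so their values are read live at scrape time.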
	gossipClusterMembers := prometheus.NewGaugeFunc(prometheus.GaugeOpts{
		Name: "alertmanager_cluster_members",
		Help: "Number indicating current number of members in cluster.",
	}, func() float64 {
		return float64(p.ClusterSize())
	})
	peerPosition := prometheus.NewGaugeFunc(prometheus.GaugeOpts{
		Name: "alertmanager_peer_position",
		Help: "Position the Alertmanager instance believes it's in. The position determines a peer's behavior in the cluster.",
	}, func() float64 {
		return float64(p.Position())
	})
	healthScore := prometheus.NewGaugeFunc(prometheus.GaugeOpts{
		Name: "alertmanager_cluster_health_score",
		Help: "Health score of the cluster. Lower values are better and zero means 'totally healthy'.",
	}, func() float64 {
		return float64(p.mlist.GetHealthScore())
	})
	messagesQueued := prometheus.NewGaugeFunc(prometheus.GaugeOpts{
		Name: "alertmanager_cluster_messages_queued",
		Help: "Number of cluster messages which are queued.",
	}, func() float64 {
		return float64(bcast.NumQueued())
	})
	nodeAlive := prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "alertmanager_cluster_alive_messages_total",
		Help: "Total number of received alive messages.",
	}, []string{"peer"},
	)
	nodePingDuration := prometheus.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "alertmanager_cluster_pings_seconds",
		Help:    "Histogram of latencies for ping messages.",
		Buckets: []float64{.005, .01, .025, .05, .1, .25, .5},
	}, []string{"peer"},
	)
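
	// Initialize the per-message-type series up front so that the counters
	// are exported with a zero value before the first message is sent or
	// received.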
	messagesReceived.WithLabelValues(fullState)
	messagesReceivedSize.WithLabelValues(fullState)
	messagesReceived.WithLabelValues(update)
	messagesReceivedSize.WithLabelValues(update)
	messagesSent.WithLabelValues(fullState)
	messagesSentSize.WithLabelValues(fullState)
	messagesSent.WithLabelValues(update)
	messagesSentSize.WithLabelValues(update)

	reg.MustRegister(messagesReceived, messagesReceivedSize, messagesSent, messagesSentSize,
		gossipClusterMembers, peerPosition, healthScore, messagesQueued, messagesPruned,
		nodeAlive, nodePingDuration,
	)

	d := &delegate{
		logger:               l,
		Peer:                 p,
		bcast:                bcast,
		messagesReceived:     messagesReceived,
		messagesReceivedSize: messagesReceivedSize,
		messagesSent:         messagesSent,
		messagesSentSize:     messagesSentSize,
		messagesPruned:       messagesPruned,
		nodeAlive:            nodeAlive,
		nodePingDuration:     nodePingDuration,
	}

	go d.handleQueueDepth()

	return d
}

// NodeMeta retrieves meta-data about the current node when broadcasting an alive message.
func (d *delegate) NodeMeta(limit int) []byte {
	return []byte{}
}

// NotifyMsg is the callback invoked when a user-level gossip message is received.
func (d *delegate) NotifyMsg(b []byte) {
	d.messagesReceived.WithLabelValues(update).Inc()
	d.messagesReceivedSize.WithLabelValues(update).Add(float64(len(b)))

	var p clusterpb.Part
	if err := proto.Unmarshal(b, &p); err != nil {
		level.Warn(d.logger).Log("msg", "decode broadcast", "err", err)
		return
	}

	d.mtx.RLock()
	s, ok := d.states[p.Key]
	d.mtx.RUnlock()

	if !ok {
		return
	}
	if err := s.Merge(p.Data); err != nil {
		level.Warn(d.logger).Log("msg", "merge broadcast", "err", err, "key", p.Key)
		return
	}
}

// GetBroadcasts is called when user data messages can be broadcast.
func (d *delegate) GetBroadcasts(overhead, limit int) [][]byte {
	msgs := d.bcast.GetBroadcasts(overhead, limit)
	d.messagesSent.WithLabelValues(update).Add(float64(len(msgs)))
	for _, m := range msgs {
		d.messagesSentSize.WithLabelValues(update).Add(float64(len(m)))
	}
	return msgs
}

// LocalState is called when gossip fetches local state.
func (d *delegate) LocalState(_ bool) []byte {
	d.mtx.RLock()
	defer d.mtx.RUnlock()
	all := &clusterpb.FullState{
		Parts: make([]clusterpb.Part, 0, len(d.states)),
	}

	for key, s := range d.states {
		b, err := s.MarshalBinary()
		if err != nil {
			level.Warn(d.logger).Log("msg", "encode local state", "err", err, "key", key)
			return nil
		}
		all.Parts = append(all.Parts, clusterpb.Part{Key: key, Data: b})
	}
	b, err := proto.Marshal(all)
	if err != nil {
		level.Warn(d.logger).Log("msg", "encode local state", "err", err)
		return nil
	}
	d.messagesSent.WithLabelValues(fullState).Inc()
	d.messagesSentSize.WithLabelValues(fullState).Add(float64(len(b)))
	return b
}
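
// MergeRemoteState is invoked after a remote peer's full state (a push/pull
// sync) has been received; each part with a known key is merged into the
// local state, unknown keys are skipped with a warning.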
func (d *delegate) MergeRemoteState(buf []byte, _ bool) {
	d.messagesReceived.WithLabelValues(fullState).Inc()
	d.messagesReceivedSize.WithLabelValues(fullState).Add(float64(len(buf)))

	var fs clusterpb.FullState
	if err := proto.Unmarshal(buf, &fs); err != nil {
		level.Warn(d.logger).Log("msg", "merge remote state", "err", err)
		return
	}
	d.mtx.RLock()
	defer d.mtx.RUnlock()
	for _, p := range fs.Parts {
		s, ok := d.states[p.Key]
		if !ok {
			level.Warn(d.logger).Log("received", "unknown state key", "len", len(buf), "key", p.Key)
			continue
		}
		if err := s.Merge(p.Data); err != nil {
			level.Warn(d.logger).Log("msg", "merge remote state", "err", err, "key", p.Key)
			return
		}
	}
}

// NotifyJoin is called if a peer joins the cluster.
func (d *delegate) NotifyJoin(n *memberlist.Node) {
	level.Debug(d.logger).Log("received", "NotifyJoin", "node", n.Name, "addr", n.Address())
	d.Peer.peerJoin(n)
}

// NotifyLeave is called if a peer leaves the cluster.
func (d *delegate) NotifyLeave(n *memberlist.Node) {
	level.Debug(d.logger).Log("received", "NotifyLeave", "node", n.Name, "addr", n.Address())
	d.Peer.peerLeave(n)
}

// NotifyUpdate is called if a cluster peer gets updated.
func (d *delegate) NotifyUpdate(n *memberlist.Node) {
	level.Debug(d.logger).Log("received", "NotifyUpdate", "node", n.Name, "addr", n.Address())
	d.Peer.peerUpdate(n)
}

// NotifyAlive implements the memberlist.AliveDelegate interface.
func (d *delegate) NotifyAlive(peer *memberlist.Node) error {
	d.nodeAlive.WithLabelValues(peer.Name).Inc()
	return nil
}

// AckPayload implements the memberlist.PingDelegate interface.
func (d *delegate) AckPayload() []byte {
	return []byte{}
}

// NotifyPingComplete implements the memberlist.PingDelegate interface.
func (d *delegate) NotifyPingComplete(peer *memberlist.Node, rtt time.Duration, payload []byte) {
	d.nodePingDuration.WithLabelValues(peer.Name).Observe(rtt.Seconds())
}

// handleQueueDepth ensures that the queue doesn't grow unbounded by pruning
// older messages at a regular interval.
func (d *delegate) handleQueueDepth() {
	for {
		select {
		case <-d.stopc:
			return
		case <-time.After(15 * time.Minute):
			n := d.bcast.NumQueued()
			if n > maxQueueSize {
				level.Warn(d.logger).Log("msg", "dropping messages because too many are queued", "current", n, "limit", maxQueueSize)
				d.bcast.Prune(maxQueueSize)
				d.messagesPruned.Add(float64(n - maxQueueSize))
			}
		}
	}
}