1
2
3
4
5
6
7
8
9
10
11
12
13
14 package cluster
15
16 import (
17 "sync"
18 "time"
19
20 "github.com/go-kit/log"
21 "github.com/go-kit/log/level"
22 "github.com/gogo/protobuf/proto"
23 "github.com/hashicorp/memberlist"
24 "github.com/prometheus/client_golang/prometheus"
25
26 "github.com/prometheus/alertmanager/cluster/clusterpb"
27 )
28
29
30
31 type Channel struct {
32 key string
33 send func([]byte)
34 peers func() []*memberlist.Node
35 sendOversize func(*memberlist.Node, []byte) error
36
37 msgc chan []byte
38 logger log.Logger
39
40 oversizeGossipMessageFailureTotal prometheus.Counter
41 oversizeGossipMessageDroppedTotal prometheus.Counter
42 oversizeGossipMessageSentTotal prometheus.Counter
43 oversizeGossipDuration prometheus.Histogram
44 }
45
46
47
48 func NewChannel(
49 key string,
50 send func([]byte),
51 peers func() []*memberlist.Node,
52 sendOversize func(*memberlist.Node, []byte) error,
53 logger log.Logger,
54 stopc chan struct{},
55 reg prometheus.Registerer,
56 ) *Channel {
57 oversizeGossipMessageFailureTotal := prometheus.NewCounter(prometheus.CounterOpts{
58 Name: "alertmanager_oversized_gossip_message_failure_total",
59 Help: "Number of oversized gossip message sends that failed.",
60 ConstLabels: prometheus.Labels{"key": key},
61 })
62 oversizeGossipMessageSentTotal := prometheus.NewCounter(prometheus.CounterOpts{
63 Name: "alertmanager_oversized_gossip_message_sent_total",
64 Help: "Number of oversized gossip message sent.",
65 ConstLabels: prometheus.Labels{"key": key},
66 })
67 oversizeGossipMessageDroppedTotal := prometheus.NewCounter(prometheus.CounterOpts{
68 Name: "alertmanager_oversized_gossip_message_dropped_total",
69 Help: "Number of oversized gossip messages that were dropped due to a full message queue.",
70 ConstLabels: prometheus.Labels{"key": key},
71 })
72 oversizeGossipDuration := prometheus.NewHistogram(prometheus.HistogramOpts{
73 Name: "alertmanager_oversize_gossip_message_duration_seconds",
74 Help: "Duration of oversized gossip message requests.",
75 ConstLabels: prometheus.Labels{"key": key},
76 })
77
78 reg.MustRegister(oversizeGossipDuration, oversizeGossipMessageFailureTotal, oversizeGossipMessageDroppedTotal, oversizeGossipMessageSentTotal)
79
80 c := &Channel{
81 key: key,
82 send: send,
83 peers: peers,
84 logger: logger,
85 msgc: make(chan []byte, 200),
86 sendOversize: sendOversize,
87 oversizeGossipMessageFailureTotal: oversizeGossipMessageFailureTotal,
88 oversizeGossipMessageDroppedTotal: oversizeGossipMessageDroppedTotal,
89 oversizeGossipMessageSentTotal: oversizeGossipMessageSentTotal,
90 oversizeGossipDuration: oversizeGossipDuration,
91 }
92
93 go c.handleOverSizedMessages(stopc)
94
95 return c
96 }
97
98
99
100 func (c *Channel) handleOverSizedMessages(stopc chan struct{}) {
101 var wg sync.WaitGroup
102 for {
103 select {
104 case b := <-c.msgc:
105 for _, n := range c.peers() {
106 wg.Add(1)
107 go func(n *memberlist.Node) {
108 defer wg.Done()
109 c.oversizeGossipMessageSentTotal.Inc()
110 start := time.Now()
111 if err := c.sendOversize(n, b); err != nil {
112 level.Debug(c.logger).Log("msg", "failed to send reliable", "key", c.key, "node", n, "err", err)
113 c.oversizeGossipMessageFailureTotal.Inc()
114 return
115 }
116 c.oversizeGossipDuration.Observe(time.Since(start).Seconds())
117 }(n)
118 }
119
120 wg.Wait()
121 case <-stopc:
122 return
123 }
124 }
125 }
126
127
128 func (c *Channel) Broadcast(b []byte) {
129 b, err := proto.Marshal(&clusterpb.Part{Key: c.key, Data: b})
130 if err != nil {
131 return
132 }
133
134 if OversizedMessage(b) {
135 select {
136 case c.msgc <- b:
137 default:
138 level.Debug(c.logger).Log("msg", "oversized gossip channel full")
139 c.oversizeGossipMessageDroppedTotal.Inc()
140 }
141 } else {
142 c.send(b)
143 }
144 }
145
146
147
148 func OversizedMessage(b []byte) bool {
149 return len(b) > MaxGossipPacketSize/2
150 }
151
View as plain text