1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package rafthttp
16
17 import (
18 "context"
19 "sync"
20 "time"
21
22 "go.etcd.io/etcd/client/pkg/v3/types"
23 "go.etcd.io/etcd/raft/v3"
24 "go.etcd.io/etcd/raft/v3/raftpb"
25 "go.etcd.io/etcd/server/v3/etcdserver/api/snap"
26 stats "go.etcd.io/etcd/server/v3/etcdserver/api/v2stats"
27
28 "go.uber.org/zap"
29 "golang.org/x/time/rate"
30 )
31
// Connection I/O timeouts.
const (
	// DefaultConnReadTimeout and DefaultConnWriteTimeout are the values the
	// package-level ConnReadTimeout and ConnWriteTimeout variables start with.
	DefaultConnReadTimeout  = 5 * time.Second
	DefaultConnWriteTimeout = 5 * time.Second
)

// Channel capacities used when a peer is started.
const (
	// recvBufSize is the capacity of the peer's recvc channel.
	recvBufSize = 4096

	// maxPendingProposals is the capacity of the peer's propc channel.
	maxPendingProposals = 4096
)

// Names identifying the path a message is sent on. peer.pick returns one of
// streamAppV2, streamMsg or pipelineMsg alongside the chosen channel.
const (
	streamAppV2 = "streamMsgAppV2"
	streamMsg   = "streamMsg"
	pipelineMsg = "pipeline"
	// NOTE(review): sendSnap is not referenced in this file; presumably it
	// labels the snapshot sending path elsewhere in the package.
	sendSnap = "sendMsgSnap"
)
57
58 var (
59 ConnReadTimeout = DefaultConnReadTimeout
60 ConnWriteTimeout = DefaultConnWriteTimeout
61 )
62
63 type Peer interface {
64
65
66
67
68 send(m raftpb.Message)
69
70
71
72 sendSnap(m snap.Message)
73
74
75 update(urls types.URLs)
76
77
78
79
80
81 attachOutgoingConn(conn *outgoingConn)
82
83
84 activeSince() time.Time
85
86
87 stop()
88 }
89
90
91
92
93
94
95
96
97
98
99
100
101 type peer struct {
102 lg *zap.Logger
103
104 localID types.ID
105
106 id types.ID
107
108 r Raft
109
110 status *peerStatus
111
112 picker *urlPicker
113
114 msgAppV2Writer *streamWriter
115 writer *streamWriter
116 pipeline *pipeline
117 snapSender *snapshotSender
118 msgAppV2Reader *streamReader
119 msgAppReader *streamReader
120
121 recvc chan raftpb.Message
122 propc chan raftpb.Message
123
124 mu sync.Mutex
125 paused bool
126
127 cancel context.CancelFunc
128 stopc chan struct{}
129 }
130
131 func startPeer(t *Transport, urls types.URLs, peerID types.ID, fs *stats.FollowerStats) *peer {
132 if t.Logger != nil {
133 t.Logger.Info("starting remote peer", zap.String("remote-peer-id", peerID.String()))
134 }
135 defer func() {
136 if t.Logger != nil {
137 t.Logger.Info("started remote peer", zap.String("remote-peer-id", peerID.String()))
138 }
139 }()
140
141 status := newPeerStatus(t.Logger, t.ID, peerID)
142 picker := newURLPicker(urls)
143 errorc := t.ErrorC
144 r := t.Raft
145 pipeline := &pipeline{
146 peerID: peerID,
147 tr: t,
148 picker: picker,
149 status: status,
150 followerStats: fs,
151 raft: r,
152 errorc: errorc,
153 }
154 pipeline.start()
155
156 p := &peer{
157 lg: t.Logger,
158 localID: t.ID,
159 id: peerID,
160 r: r,
161 status: status,
162 picker: picker,
163 msgAppV2Writer: startStreamWriter(t.Logger, t.ID, peerID, status, fs, r),
164 writer: startStreamWriter(t.Logger, t.ID, peerID, status, fs, r),
165 pipeline: pipeline,
166 snapSender: newSnapshotSender(t, picker, peerID, status),
167 recvc: make(chan raftpb.Message, recvBufSize),
168 propc: make(chan raftpb.Message, maxPendingProposals),
169 stopc: make(chan struct{}),
170 }
171
172 ctx, cancel := context.WithCancel(context.Background())
173 p.cancel = cancel
174 go func() {
175 for {
176 select {
177 case mm := <-p.recvc:
178 if err := r.Process(ctx, mm); err != nil {
179 if t.Logger != nil {
180 t.Logger.Warn("failed to process Raft message", zap.Error(err))
181 }
182 }
183 case <-p.stopc:
184 return
185 }
186 }
187 }()
188
189
190
191
192 go func() {
193 for {
194 select {
195 case mm := <-p.propc:
196 if err := r.Process(ctx, mm); err != nil {
197 if t.Logger != nil {
198 t.Logger.Warn("failed to process Raft message", zap.Error(err))
199 }
200 }
201 case <-p.stopc:
202 return
203 }
204 }
205 }()
206
207 p.msgAppV2Reader = &streamReader{
208 lg: t.Logger,
209 peerID: peerID,
210 typ: streamTypeMsgAppV2,
211 tr: t,
212 picker: picker,
213 status: status,
214 recvc: p.recvc,
215 propc: p.propc,
216 rl: rate.NewLimiter(t.DialRetryFrequency, 1),
217 }
218 p.msgAppReader = &streamReader{
219 lg: t.Logger,
220 peerID: peerID,
221 typ: streamTypeMessage,
222 tr: t,
223 picker: picker,
224 status: status,
225 recvc: p.recvc,
226 propc: p.propc,
227 rl: rate.NewLimiter(t.DialRetryFrequency, 1),
228 }
229
230 p.msgAppV2Reader.start()
231 p.msgAppReader.start()
232
233 return p
234 }
235
236 func (p *peer) send(m raftpb.Message) {
237 p.mu.Lock()
238 paused := p.paused
239 p.mu.Unlock()
240
241 if paused {
242 return
243 }
244
245 writec, name := p.pick(m)
246 select {
247 case writec <- m:
248 default:
249 p.r.ReportUnreachable(m.To)
250 if isMsgSnap(m) {
251 p.r.ReportSnapshot(m.To, raft.SnapshotFailure)
252 }
253 if p.status.isActive() {
254 if p.lg != nil {
255 p.lg.Warn(
256 "dropped internal Raft message since sending buffer is full (overloaded network)",
257 zap.String("message-type", m.Type.String()),
258 zap.String("local-member-id", p.localID.String()),
259 zap.String("from", types.ID(m.From).String()),
260 zap.String("remote-peer-id", p.id.String()),
261 zap.String("remote-peer-name", name),
262 zap.Bool("remote-peer-active", p.status.isActive()),
263 )
264 }
265 } else {
266 if p.lg != nil {
267 p.lg.Warn(
268 "dropped internal Raft message since sending buffer is full (overloaded network)",
269 zap.String("message-type", m.Type.String()),
270 zap.String("local-member-id", p.localID.String()),
271 zap.String("from", types.ID(m.From).String()),
272 zap.String("remote-peer-id", p.id.String()),
273 zap.String("remote-peer-name", name),
274 zap.Bool("remote-peer-active", p.status.isActive()),
275 )
276 }
277 }
278 sentFailures.WithLabelValues(types.ID(m.To).String()).Inc()
279 }
280 }
281
282 func (p *peer) sendSnap(m snap.Message) {
283 go p.snapSender.send(m)
284 }
285
286 func (p *peer) update(urls types.URLs) {
287 p.picker.update(urls)
288 }
289
290 func (p *peer) attachOutgoingConn(conn *outgoingConn) {
291 var ok bool
292 switch conn.t {
293 case streamTypeMsgAppV2:
294 ok = p.msgAppV2Writer.attach(conn)
295 case streamTypeMessage:
296 ok = p.writer.attach(conn)
297 default:
298 if p.lg != nil {
299 p.lg.Panic("unknown stream type", zap.String("type", conn.t.String()))
300 }
301 }
302 if !ok {
303 conn.Close()
304 }
305 }
306
307 func (p *peer) activeSince() time.Time { return p.status.activeSince() }
308
309
310
311 func (p *peer) Pause() {
312 p.mu.Lock()
313 defer p.mu.Unlock()
314 p.paused = true
315 p.msgAppReader.pause()
316 p.msgAppV2Reader.pause()
317 }
318
319
320 func (p *peer) Resume() {
321 p.mu.Lock()
322 defer p.mu.Unlock()
323 p.paused = false
324 p.msgAppReader.resume()
325 p.msgAppV2Reader.resume()
326 }
327
328 func (p *peer) stop() {
329 if p.lg != nil {
330 p.lg.Info("stopping remote peer", zap.String("remote-peer-id", p.id.String()))
331 }
332
333 defer func() {
334 if p.lg != nil {
335 p.lg.Info("stopped remote peer", zap.String("remote-peer-id", p.id.String()))
336 }
337 }()
338
339 close(p.stopc)
340 p.cancel()
341 p.msgAppV2Writer.stop()
342 p.writer.stop()
343 p.pipeline.stop()
344 p.snapSender.stop()
345 p.msgAppV2Reader.stop()
346 p.msgAppReader.stop()
347 }
348
349
350
351 func (p *peer) pick(m raftpb.Message) (writec chan<- raftpb.Message, picked string) {
352 var ok bool
353
354
355 if isMsgSnap(m) {
356 return p.pipeline.msgc, pipelineMsg
357 } else if writec, ok = p.msgAppV2Writer.writec(); ok && isMsgApp(m) {
358 return writec, streamAppV2
359 } else if writec, ok = p.writer.writec(); ok {
360 return writec, streamMsg
361 }
362 return p.pipeline.msgc, pipelineMsg
363 }
364
365 func isMsgApp(m raftpb.Message) bool { return m.Type == raftpb.MsgApp }
366
367 func isMsgSnap(m raftpb.Message) bool { return m.Type == raftpb.MsgSnap }
368
View as plain text