1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package rafthttp
16
17 import (
18 "context"
19 "fmt"
20 "io"
21 "io/ioutil"
22 "net/http"
23 "path"
24 "strings"
25 "sync"
26 "time"
27
28 "go.etcd.io/etcd/api/v3/version"
29 "go.etcd.io/etcd/client/pkg/v3/transport"
30 "go.etcd.io/etcd/client/pkg/v3/types"
31 "go.etcd.io/etcd/pkg/v3/httputil"
32 "go.etcd.io/etcd/raft/v3/raftpb"
33 stats "go.etcd.io/etcd/server/v3/etcdserver/api/v2stats"
34
35 "github.com/coreos/go-semver/semver"
36 "go.uber.org/zap"
37 "golang.org/x/time/rate"
38 )
39
const (
	// streamTypeMessage identifies the general message stream; endpoint()
	// maps it to the "message" path under RaftStreamPrefix.
	streamTypeMessage streamType = "message"
	// streamTypeMsgAppV2 identifies the MsgApp v2 stream; endpoint() maps it
	// to the "msgapp" path under RaftStreamPrefix.
	streamTypeMsgAppV2 streamType = "msgappv2"

	// streamBufSize is the capacity of a streamWriter's buffered message
	// channel. Half of this value is also the number of batched messages
	// after which the writer forces a flush.
	streamBufSize = 4096
)
46
var (
	// errUnsupportedStreamType is returned by dial when the remote peer runs
	// an older version that does not list the requested stream type.
	errUnsupportedStreamType = fmt.Errorf("unsupported stream type")

	// supportedStream lists the stream types available in each release.
	// Keys are "major.minor.0" strings, i.e. what semver.Version.String()
	// yields for a version carrying only Major and Minor (see
	// checkStreamSupport). A version missing from the map supports nothing.
	supportedStream = map[string][]streamType{
		"2.0.0": {},
		"2.1.0": {streamTypeMsgAppV2, streamTypeMessage},
		"2.2.0": {streamTypeMsgAppV2, streamTypeMessage},
		"2.3.0": {streamTypeMsgAppV2, streamTypeMessage},
		"3.0.0": {streamTypeMsgAppV2, streamTypeMessage},
		"3.1.0": {streamTypeMsgAppV2, streamTypeMessage},
		"3.2.0": {streamTypeMsgAppV2, streamTypeMessage},
		"3.3.0": {streamTypeMsgAppV2, streamTypeMessage},
		"3.4.0": {streamTypeMsgAppV2, streamTypeMessage},
		"3.5.0": {streamTypeMsgAppV2, streamTypeMessage},
	}
)
64
// streamType names a kind of peer-to-peer stream. It selects the URL
// endpoint to dial and the encoder/decoder used on the connection.
type streamType string
66
67 func (t streamType) endpoint(lg *zap.Logger) string {
68 switch t {
69 case streamTypeMsgAppV2:
70 return path.Join(RaftStreamPrefix, "msgapp")
71 case streamTypeMessage:
72 return path.Join(RaftStreamPrefix, "message")
73 default:
74 if lg != nil {
75 lg.Panic("unhandled stream type", zap.String("stream-type", t.String()))
76 }
77 return ""
78 }
79 }
80
81 func (t streamType) String() string {
82 switch t {
83 case streamTypeMsgAppV2:
84 return "stream MsgApp v2"
85 case streamTypeMessage:
86 return "stream Message"
87 default:
88 return "unknown stream"
89 }
90 }
91
var (
	// linkHeartbeatMessage is the keep-alive the stream writer encodes onto
	// an attached connection on every heartbeat tick. It is a MsgHeartbeat
	// with From and To left at zero, which is exactly what
	// isLinkHeartbeatMessage matches on the receiving side.
	// NOTE(review): this presumes raft itself never emits a heartbeat with
	// both From and To unset — confirm against the raft package.
	linkHeartbeatMessage = raftpb.Message{Type: raftpb.MsgHeartbeat}
)
98
99 func isLinkHeartbeatMessage(m *raftpb.Message) bool {
100 return m.Type == raftpb.MsgHeartbeat && m.From == 0 && m.To == 0
101 }
102
// outgoingConn bundles everything a streamWriter needs to take over an
// outbound stream: the stream type (which selects the encoder), the writer
// to encode into, the flusher used to push buffered data out, and the closer
// used to tear the connection down.
type outgoingConn struct {
	t streamType
	io.Writer
	http.Flusher
	io.Closer

	// localID and peerID are logged when the writer installs the encoder
	// for this connection.
	localID types.ID
	peerID  types.ID
}
112
113
// streamWriter encodes raft messages onto whichever outgoingConn is currently
// attached to it. All writing happens in the goroutine running run().
type streamWriter struct {
	lg *zap.Logger // optional; every use is guarded by a nil check

	localID types.ID
	peerID  types.ID

	status *peerStatus          // activated on attach, deactivated on write/heartbeat failure
	fs     *stats.FollowerStats // handed to the MsgApp v2 encoder
	r      Raft                 // notified via ReportUnreachable when messages cannot be delivered

	mu      sync.Mutex // guards closer, working and msgc
	closer  io.Closer  // closes the currently attached connection
	working bool       // true while a connection is attached and usable

	msgc  chan raftpb.Message // buffered queue of outgoing messages; replaced whenever the connection is closed
	connc chan *outgoingConn  // unbuffered hand-off of newly attached connections to run()
	stopc chan struct{}       // closed by stop() to ask run() to exit
	done  chan struct{}       // closed by run() once it has exited
}
133
134
135
136 func startStreamWriter(lg *zap.Logger, local, id types.ID, status *peerStatus, fs *stats.FollowerStats, r Raft) *streamWriter {
137 w := &streamWriter{
138 lg: lg,
139
140 localID: local,
141 peerID: id,
142
143 status: status,
144 fs: fs,
145 r: r,
146 msgc: make(chan raftpb.Message, streamBufSize),
147 connc: make(chan *outgoingConn),
148 stopc: make(chan struct{}),
149 done: make(chan struct{}),
150 }
151 go w.run()
152 return w
153 }
154
// run is the writer's event loop. It is started by startStreamWriter and
// returns only after stopc is closed, at which point it closes done.
//
// The loop multiplexes four events:
//   - a heartbeat tick: encode linkHeartbeatMessage and flush;
//   - an outgoing message: encode it, flushing when the queue is drained or
//     enough messages have been batched;
//   - a newly attached connection: drop the previous one (if any) and switch
//     the encoder/flusher over to the new one;
//   - a stop request: close the current connection and exit.
func (cw *streamWriter) run() {
	var (
		// msgc and heartbeatc stay nil while no connection is attached. A
		// receive from a nil channel never becomes ready, so the two write
		// cases below are effectively disabled until connc delivers a
		// connection.
		msgc       chan raftpb.Message
		heartbeatc <-chan time.Time
		t          streamType
		enc        encoder
		flusher    http.Flusher
		batched    int
	)
	// Heartbeats are sent three times per read-timeout interval.
	tickc := time.NewTicker(ConnReadTimeout / 3)
	defer tickc.Stop()
	// unflushed counts bytes encoded since the last flush; it feeds the
	// sentBytes metric when a flush happens.
	unflushed := 0

	if cw.lg != nil {
		cw.lg.Info(
			"started stream writer with remote peer",
			zap.String("local-member-id", cw.localID.String()),
			zap.String("remote-peer-id", cw.peerID.String()),
		)
	}

	for {
		select {
		case <-heartbeatc:
			err := enc.encode(&linkHeartbeatMessage)
			unflushed += linkHeartbeatMessage.Size()
			if err == nil {
				// A heartbeat is always flushed immediately, which also
				// pushes out any messages batched so far.
				flusher.Flush()
				batched = 0
				sentBytes.WithLabelValues(cw.peerID.String()).Add(float64(unflushed))
				unflushed = 0
				continue
			}

			cw.status.deactivate(failureType{source: t.String(), action: "heartbeat"}, err.Error())

			sentFailures.WithLabelValues(cw.peerID.String()).Inc()
			cw.close()
			if cw.lg != nil {
				cw.lg.Warn(
					"lost TCP streaming connection with remote peer",
					zap.String("stream-writer-type", t.String()),
					zap.String("local-member-id", cw.localID.String()),
					zap.String("remote-peer-id", cw.peerID.String()),
				)
			}
			// Disable both write cases until a new connection is attached.
			heartbeatc, msgc = nil, nil

		case m := <-msgc:
			err := enc.encode(&m)
			if err == nil {
				unflushed += m.Size()

				// Flush when the queue is empty (nothing more to batch) or
				// when more than half a buffer's worth of messages has been
				// batched without a flush.
				if len(msgc) == 0 || batched > streamBufSize/2 {
					flusher.Flush()
					sentBytes.WithLabelValues(cw.peerID.String()).Add(float64(unflushed))
					unflushed = 0
					batched = 0
				} else {
					batched++
				}

				continue
			}

			cw.status.deactivate(failureType{source: t.String(), action: "write"}, err.Error())
			cw.close()
			if cw.lg != nil {
				cw.lg.Warn(
					"lost TCP streaming connection with remote peer",
					zap.String("stream-writer-type", t.String()),
					zap.String("local-member-id", cw.localID.String()),
					zap.String("remote-peer-id", cw.peerID.String()),
				)
			}
			// Disable both write cases until a new connection is attached.
			heartbeatc, msgc = nil, nil
			// The message that failed to encode is lost; tell raft.
			cw.r.ReportUnreachable(m.To)
			sentFailures.WithLabelValues(cw.peerID.String()).Inc()

		case conn := <-cw.connc:
			cw.mu.Lock()
			// Tear down the previous connection, if one was still attached.
			// closeUnlocked also replaces cw.msgc with a fresh channel.
			closed := cw.closeUnlocked()
			t = conn.t
			switch conn.t {
			case streamTypeMsgAppV2:
				enc = newMsgAppV2Encoder(conn.Writer, cw.fs)
			case streamTypeMessage:
				enc = &messageEncoder{w: conn.Writer}
			default:
				if cw.lg != nil {
					cw.lg.Panic("unhandled stream type", zap.String("stream-type", t.String()))
				}
			}
			if cw.lg != nil {
				cw.lg.Info(
					"set message encoder",
					zap.String("from", conn.localID.String()),
					zap.String("to", conn.peerID.String()),
					zap.String("stream-type", t.String()),
				)
			}
			flusher = conn.Flusher
			unflushed = 0
			cw.status.activate()
			cw.closer = conn.Closer
			cw.working = true
			cw.mu.Unlock()

			if closed {
				if cw.lg != nil {
					cw.lg.Warn(
						"closed TCP streaming connection with remote peer",
						zap.String("stream-writer-type", t.String()),
						zap.String("local-member-id", cw.localID.String()),
						zap.String("remote-peer-id", cw.peerID.String()),
					)
				}
			}
			if cw.lg != nil {
				cw.lg.Info(
					"established TCP streaming connection with remote peer",
					zap.String("stream-writer-type", t.String()),
					zap.String("local-member-id", cw.localID.String()),
					zap.String("remote-peer-id", cw.peerID.String()),
				)
			}
			// Re-enable the write cases. cw.msgc is re-read here because it
			// may have been replaced by closeUnlocked above.
			heartbeatc, msgc = tickc.C, cw.msgc

		case <-cw.stopc:
			if cw.close() {
				if cw.lg != nil {
					cw.lg.Warn(
						"closed TCP streaming connection with remote peer",
						zap.String("stream-writer-type", t.String()),
						zap.String("remote-peer-id", cw.peerID.String()),
					)
				}
			}
			if cw.lg != nil {
				cw.lg.Info(
					"stopped TCP streaming connection with remote peer",
					zap.String("stream-writer-type", t.String()),
					zap.String("remote-peer-id", cw.peerID.String()),
				)
			}
			// Signal stop() and attach() that the loop has exited.
			close(cw.done)
			return
		}
	}
}
305
306 func (cw *streamWriter) writec() (chan<- raftpb.Message, bool) {
307 cw.mu.Lock()
308 defer cw.mu.Unlock()
309 return cw.msgc, cw.working
310 }
311
312 func (cw *streamWriter) close() bool {
313 cw.mu.Lock()
314 defer cw.mu.Unlock()
315 return cw.closeUnlocked()
316 }
317
318 func (cw *streamWriter) closeUnlocked() bool {
319 if !cw.working {
320 return false
321 }
322 if err := cw.closer.Close(); err != nil {
323 if cw.lg != nil {
324 cw.lg.Warn(
325 "failed to close connection with remote peer",
326 zap.String("remote-peer-id", cw.peerID.String()),
327 zap.Error(err),
328 )
329 }
330 }
331 if len(cw.msgc) > 0 {
332 cw.r.ReportUnreachable(uint64(cw.peerID))
333 }
334 cw.msgc = make(chan raftpb.Message, streamBufSize)
335 cw.working = false
336 return true
337 }
338
339 func (cw *streamWriter) attach(conn *outgoingConn) bool {
340 select {
341 case cw.connc <- conn:
342 return true
343 case <-cw.done:
344 return false
345 }
346 }
347
// stop asks the run loop to exit by closing stopc, then blocks until the
// loop has closed done. Because it closes stopc, it must be called at most
// once per writer.
func (cw *streamWriter) stop() {
	close(cw.stopc)
	<-cw.done
}
352
353
354
// streamReader repeatedly dials the remote peer's stream endpoint and decodes
// raft messages from the response body, forwarding them to recvc or propc.
// The work happens in the goroutine launched by start().
type streamReader struct {
	lg *zap.Logger // optional; every use is guarded by a nil check

	peerID types.ID
	typ    streamType // stream type to dial and decode

	tr     *Transport
	picker *urlPicker              // supplies the peer URL to dial and records unreachable ones
	status *peerStatus             // activated on successful dial, deactivated on dial/read failures
	recvc  chan<- raftpb.Message   // destination for every message except MsgProp
	propc  chan<- raftpb.Message   // destination for MsgProp messages

	rl *rate.Limiter // waited on between connection attempts in run()

	errorc chan<- error // receives critical errors; defaults to tr.ErrorC in start()

	mu     sync.Mutex // guards paused and closer
	paused bool       // while true, decoded messages are discarded
	closer io.Closer  // body of the stream currently being decoded, nil if none

	ctx    context.Context    // cancelled by stop(); created in start() when nil
	cancel context.CancelFunc // cancels ctx
	done   chan struct{}      // closed by run() once it has exited
}
379
380 func (cr *streamReader) start() {
381 cr.done = make(chan struct{})
382 if cr.errorc == nil {
383 cr.errorc = cr.tr.ErrorC
384 }
385 if cr.ctx == nil {
386 cr.ctx, cr.cancel = context.WithCancel(context.Background())
387 }
388 go cr.run()
389 }
390
391 func (cr *streamReader) run() {
392 t := cr.typ
393
394 if cr.lg != nil {
395 cr.lg.Info(
396 "started stream reader with remote peer",
397 zap.String("stream-reader-type", t.String()),
398 zap.String("local-member-id", cr.tr.ID.String()),
399 zap.String("remote-peer-id", cr.peerID.String()),
400 )
401 }
402
403 for {
404 rc, err := cr.dial(t)
405 if err != nil {
406 if err != errUnsupportedStreamType {
407 cr.status.deactivate(failureType{source: t.String(), action: "dial"}, err.Error())
408 }
409 } else {
410 cr.status.activate()
411 if cr.lg != nil {
412 cr.lg.Info(
413 "established TCP streaming connection with remote peer",
414 zap.String("stream-reader-type", cr.typ.String()),
415 zap.String("local-member-id", cr.tr.ID.String()),
416 zap.String("remote-peer-id", cr.peerID.String()),
417 )
418 }
419 err = cr.decodeLoop(rc, t)
420 if cr.lg != nil {
421 cr.lg.Warn(
422 "lost TCP streaming connection with remote peer",
423 zap.String("stream-reader-type", cr.typ.String()),
424 zap.String("local-member-id", cr.tr.ID.String()),
425 zap.String("remote-peer-id", cr.peerID.String()),
426 zap.Error(err),
427 )
428 }
429 switch {
430
431 case err == io.EOF:
432
433 case transport.IsClosedConnError(err):
434 default:
435 cr.status.deactivate(failureType{source: t.String(), action: "read"}, err.Error())
436 }
437 }
438
439 err = cr.rl.Wait(cr.ctx)
440 if cr.ctx.Err() != nil {
441 if cr.lg != nil {
442 cr.lg.Info(
443 "stopped stream reader with remote peer",
444 zap.String("stream-reader-type", t.String()),
445 zap.String("local-member-id", cr.tr.ID.String()),
446 zap.String("remote-peer-id", cr.peerID.String()),
447 )
448 }
449 close(cr.done)
450 return
451 }
452 if err != nil {
453 if cr.lg != nil {
454 cr.lg.Warn(
455 "rate limit on stream reader with remote peer",
456 zap.String("stream-reader-type", t.String()),
457 zap.String("local-member-id", cr.tr.ID.String()),
458 zap.String("remote-peer-id", cr.peerID.String()),
459 zap.Error(err),
460 )
461 }
462 }
463 }
464 }
465
466 func (cr *streamReader) decodeLoop(rc io.ReadCloser, t streamType) error {
467 var dec decoder
468 cr.mu.Lock()
469 switch t {
470 case streamTypeMsgAppV2:
471 dec = newMsgAppV2Decoder(rc, cr.tr.ID, cr.peerID)
472 case streamTypeMessage:
473 dec = &messageDecoder{r: rc}
474 default:
475 if cr.lg != nil {
476 cr.lg.Panic("unknown stream type", zap.String("type", t.String()))
477 }
478 }
479 select {
480 case <-cr.ctx.Done():
481 cr.mu.Unlock()
482 if err := rc.Close(); err != nil {
483 return err
484 }
485 return io.EOF
486 default:
487 cr.closer = rc
488 }
489 cr.mu.Unlock()
490
491
492 for {
493 m, err := dec.decode()
494 if err != nil {
495 cr.mu.Lock()
496 cr.close()
497 cr.mu.Unlock()
498 return err
499 }
500
501
502
503 receivedBytes.WithLabelValues(types.ID(m.From).String()).Add(float64(m.Size()))
504
505 cr.mu.Lock()
506 paused := cr.paused
507 cr.mu.Unlock()
508
509 if paused {
510 continue
511 }
512
513 if isLinkHeartbeatMessage(&m) {
514
515
516
517 continue
518 }
519
520 recvc := cr.recvc
521 if m.Type == raftpb.MsgProp {
522 recvc = cr.propc
523 }
524
525 select {
526 case recvc <- m:
527 default:
528 if cr.status.isActive() {
529 if cr.lg != nil {
530 cr.lg.Warn(
531 "dropped internal Raft message since receiving buffer is full (overloaded network)",
532 zap.String("message-type", m.Type.String()),
533 zap.String("local-member-id", cr.tr.ID.String()),
534 zap.String("from", types.ID(m.From).String()),
535 zap.String("remote-peer-id", types.ID(m.To).String()),
536 zap.Bool("remote-peer-active", cr.status.isActive()),
537 )
538 }
539 } else {
540 if cr.lg != nil {
541 cr.lg.Warn(
542 "dropped Raft message since receiving buffer is full (overloaded network)",
543 zap.String("message-type", m.Type.String()),
544 zap.String("local-member-id", cr.tr.ID.String()),
545 zap.String("from", types.ID(m.From).String()),
546 zap.String("remote-peer-id", types.ID(m.To).String()),
547 zap.Bool("remote-peer-active", cr.status.isActive()),
548 )
549 }
550 }
551 recvFailures.WithLabelValues(types.ID(m.From).String()).Inc()
552 }
553 }
554 }
555
// stop shuts the reader down and blocks until its run loop has exited.
// Under the mutex it cancels the reader's context and closes the stream
// currently being decoded (if any). It then waits for done.
func (cr *streamReader) stop() {
	cr.mu.Lock()
	cr.cancel()
	cr.close()
	cr.mu.Unlock()
	// Wait outside the lock: decodeLoop takes cr.mu while unwinding.
	<-cr.done
}
563
564 func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) {
565 u := cr.picker.pick()
566 uu := u
567 uu.Path = path.Join(t.endpoint(cr.lg), cr.tr.ID.String())
568
569 if cr.lg != nil {
570 cr.lg.Debug(
571 "dial stream reader",
572 zap.String("from", cr.tr.ID.String()),
573 zap.String("to", cr.peerID.String()),
574 zap.String("address", uu.String()),
575 )
576 }
577 req, err := http.NewRequest("GET", uu.String(), nil)
578 if err != nil {
579 cr.picker.unreachable(u)
580 return nil, fmt.Errorf("failed to make http request to %v (%v)", u, err)
581 }
582 req.Header.Set("X-Server-From", cr.tr.ID.String())
583 req.Header.Set("X-Server-Version", version.Version)
584 req.Header.Set("X-Min-Cluster-Version", version.MinClusterVersion)
585 req.Header.Set("X-Etcd-Cluster-ID", cr.tr.ClusterID.String())
586 req.Header.Set("X-Raft-To", cr.peerID.String())
587
588 setPeerURLsHeader(req, cr.tr.URLs)
589
590 req = req.WithContext(cr.ctx)
591
592 cr.mu.Lock()
593 select {
594 case <-cr.ctx.Done():
595 cr.mu.Unlock()
596 return nil, fmt.Errorf("stream reader is stopped")
597 default:
598 }
599 cr.mu.Unlock()
600
601 resp, err := cr.tr.streamRt.RoundTrip(req)
602 if err != nil {
603 cr.picker.unreachable(u)
604 return nil, err
605 }
606
607 rv := serverVersion(resp.Header)
608 lv := semver.Must(semver.NewVersion(version.Version))
609 if compareMajorMinorVersion(rv, lv) == -1 && !checkStreamSupport(rv, t) {
610 httputil.GracefulClose(resp)
611 cr.picker.unreachable(u)
612 return nil, errUnsupportedStreamType
613 }
614
615 switch resp.StatusCode {
616 case http.StatusGone:
617 httputil.GracefulClose(resp)
618 cr.picker.unreachable(u)
619 reportCriticalError(errMemberRemoved, cr.errorc)
620 return nil, errMemberRemoved
621
622 case http.StatusOK:
623 return resp.Body, nil
624
625 case http.StatusNotFound:
626 httputil.GracefulClose(resp)
627 cr.picker.unreachable(u)
628 return nil, fmt.Errorf("peer %s failed to find local node %s", cr.peerID, cr.tr.ID)
629
630 case http.StatusPreconditionFailed:
631 b, err := ioutil.ReadAll(resp.Body)
632 if err != nil {
633 cr.picker.unreachable(u)
634 return nil, err
635 }
636 httputil.GracefulClose(resp)
637 cr.picker.unreachable(u)
638
639 switch strings.TrimSuffix(string(b), "\n") {
640 case errIncompatibleVersion.Error():
641 if cr.lg != nil {
642 cr.lg.Warn(
643 "request sent was ignored by remote peer due to server version incompatibility",
644 zap.String("local-member-id", cr.tr.ID.String()),
645 zap.String("remote-peer-id", cr.peerID.String()),
646 zap.Error(errIncompatibleVersion),
647 )
648 }
649 return nil, errIncompatibleVersion
650
651 case ErrClusterIDMismatch.Error():
652 if cr.lg != nil {
653 cr.lg.Warn(
654 "request sent was ignored by remote peer due to cluster ID mismatch",
655 zap.String("remote-peer-id", cr.peerID.String()),
656 zap.String("remote-peer-cluster-id", resp.Header.Get("X-Etcd-Cluster-ID")),
657 zap.String("local-member-id", cr.tr.ID.String()),
658 zap.String("local-member-cluster-id", cr.tr.ClusterID.String()),
659 zap.Error(ErrClusterIDMismatch),
660 )
661 }
662 return nil, ErrClusterIDMismatch
663
664 default:
665 return nil, fmt.Errorf("unhandled error %q when precondition failed", string(b))
666 }
667
668 default:
669 httputil.GracefulClose(resp)
670 cr.picker.unreachable(u)
671 return nil, fmt.Errorf("unhandled http status %d", resp.StatusCode)
672 }
673 }
674
675 func (cr *streamReader) close() {
676 if cr.closer != nil {
677 if err := cr.closer.Close(); err != nil {
678 if cr.lg != nil {
679 cr.lg.Warn(
680 "failed to close remote peer connection",
681 zap.String("local-member-id", cr.tr.ID.String()),
682 zap.String("remote-peer-id", cr.peerID.String()),
683 zap.Error(err),
684 )
685 }
686 }
687 }
688 cr.closer = nil
689 }
690
691 func (cr *streamReader) pause() {
692 cr.mu.Lock()
693 defer cr.mu.Unlock()
694 cr.paused = true
695 }
696
697 func (cr *streamReader) resume() {
698 cr.mu.Lock()
699 defer cr.mu.Unlock()
700 cr.paused = false
701 }
702
703
704
705 func checkStreamSupport(v *semver.Version, t streamType) bool {
706 nv := &semver.Version{Major: v.Major, Minor: v.Minor}
707 for _, s := range supportedStream[nv.String()] {
708 if s == t {
709 return true
710 }
711 }
712 return false
713 }
714
View as plain text