...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package rafthttp
16
17 import (
18 "go.etcd.io/etcd/client/pkg/v3/types"
19 "go.etcd.io/etcd/raft/v3/raftpb"
20
21 "go.uber.org/zap"
22 )
23
24 type remote struct {
25 lg *zap.Logger
26 localID types.ID
27 id types.ID
28 status *peerStatus
29 pipeline *pipeline
30 }
31
32 func startRemote(tr *Transport, urls types.URLs, id types.ID) *remote {
33 picker := newURLPicker(urls)
34 status := newPeerStatus(tr.Logger, tr.ID, id)
35 pipeline := &pipeline{
36 peerID: id,
37 tr: tr,
38 picker: picker,
39 status: status,
40 raft: tr.Raft,
41 errorc: tr.ErrorC,
42 }
43 pipeline.start()
44
45 return &remote{
46 lg: tr.Logger,
47 localID: tr.ID,
48 id: id,
49 status: status,
50 pipeline: pipeline,
51 }
52 }
53
54 func (g *remote) send(m raftpb.Message) {
55 select {
56 case g.pipeline.msgc <- m:
57 default:
58 if g.status.isActive() {
59 if g.lg != nil {
60 g.lg.Warn(
61 "dropped internal Raft message since sending buffer is full (overloaded network)",
62 zap.String("message-type", m.Type.String()),
63 zap.String("local-member-id", g.localID.String()),
64 zap.String("from", types.ID(m.From).String()),
65 zap.String("remote-peer-id", g.id.String()),
66 zap.Bool("remote-peer-active", g.status.isActive()),
67 )
68 }
69 } else {
70 if g.lg != nil {
71 g.lg.Warn(
72 "dropped Raft message since sending buffer is full (overloaded network)",
73 zap.String("message-type", m.Type.String()),
74 zap.String("local-member-id", g.localID.String()),
75 zap.String("from", types.ID(m.From).String()),
76 zap.String("remote-peer-id", g.id.String()),
77 zap.Bool("remote-peer-active", g.status.isActive()),
78 )
79 }
80 }
81 sentFailures.WithLabelValues(types.ID(m.To).String()).Inc()
82 }
83 }
84
85 func (g *remote) stop() {
86 g.pipeline.stop()
87 }
88
89 func (g *remote) Pause() {
90 g.stop()
91 }
92
93 func (g *remote) Resume() {
94 g.pipeline.start()
95 }
96
View as plain text