...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package rafthttp
16
17 import (
18 "bytes"
19 "context"
20 "errors"
21 "io/ioutil"
22 "runtime"
23 "sync"
24 "time"
25
26 "go.etcd.io/etcd/client/pkg/v3/types"
27 "go.etcd.io/etcd/pkg/v3/pbutil"
28 "go.etcd.io/etcd/raft/v3"
29 "go.etcd.io/etcd/raft/v3/raftpb"
30 stats "go.etcd.io/etcd/server/v3/etcdserver/api/v2stats"
31
32 "go.uber.org/zap"
33 )
34
const (
	// connPerPipeline is the number of handle goroutines that start launches
	// for one pipeline; it therefore bounds how many posts run concurrently.
	connPerPipeline = 4

	// pipelineBufSize is the capacity of the buffered msgc channel that start
	// allocates to queue outgoing messages for the handle goroutines.
	pipelineBufSize = 64
)
43
// errStopped is a sentinel error whose message is "stopped". It is not
// referenced in the visible part of this file.
// NOTE(review): presumably returned once a component has been stopped —
// confirm against its users elsewhere in the package.
var errStopped = errors.New("stopped")
45
// pipeline posts raft messages to one remote peer over HTTP. start launches
// a fixed number of handle goroutines that drain msgc, and stop shuts them
// down again.
type pipeline struct {
	// peerID identifies the remote peer; it is used in log fields and passed
	// to checkPostResponse.
	peerID types.ID

	// tr supplies the logger, the local member and cluster identity, the
	// local URLs and the round tripper (pipelineRt) used by post.
	tr *Transport
	// picker chooses the URL to post to and is told when that URL turned out
	// to be unreachable.
	picker *urlPicker
	// status is activated after a successful post and deactivated after a
	// failed one.
	status *peerStatus
	// raft is told about unreachable peers and about the outcome of
	// snapshot messages.
	raft Raft
	// errorc is handed to reportCriticalError when a post fails with
	// errMemberRemoved.
	errorc chan error

	// followerStats, when non-nil, records the success or failure of MsgApp
	// sends.
	followerStats *stats.FollowerStats

	// msgc queues messages for the handle goroutines; it is allocated by
	// start with capacity pipelineBufSize.
	msgc chan raftpb.Message

	// wg tracks the handle goroutines so that stop can wait for them.
	wg sync.WaitGroup
	// stopc is created by start and closed by stop; closing it makes the
	// handle goroutines return and cancels in-flight posts.
	stopc chan struct{}
}
62
63 func (p *pipeline) start() {
64 p.stopc = make(chan struct{})
65 p.msgc = make(chan raftpb.Message, pipelineBufSize)
66 p.wg.Add(connPerPipeline)
67 for i := 0; i < connPerPipeline; i++ {
68 go p.handle()
69 }
70
71 if p.tr != nil && p.tr.Logger != nil {
72 p.tr.Logger.Info(
73 "started HTTP pipelining with remote peer",
74 zap.String("local-member-id", p.tr.ID.String()),
75 zap.String("remote-peer-id", p.peerID.String()),
76 )
77 }
78 }
79
80 func (p *pipeline) stop() {
81 close(p.stopc)
82 p.wg.Wait()
83
84 if p.tr != nil && p.tr.Logger != nil {
85 p.tr.Logger.Info(
86 "stopped HTTP pipelining with remote peer",
87 zap.String("local-member-id", p.tr.ID.String()),
88 zap.String("remote-peer-id", p.peerID.String()),
89 )
90 }
91 }
92
93 func (p *pipeline) handle() {
94 defer p.wg.Done()
95
96 for {
97 select {
98 case m := <-p.msgc:
99 start := time.Now()
100 err := p.post(pbutil.MustMarshal(&m))
101 end := time.Now()
102
103 if err != nil {
104 p.status.deactivate(failureType{source: pipelineMsg, action: "write"}, err.Error())
105
106 if m.Type == raftpb.MsgApp && p.followerStats != nil {
107 p.followerStats.Fail()
108 }
109 p.raft.ReportUnreachable(m.To)
110 if isMsgSnap(m) {
111 p.raft.ReportSnapshot(m.To, raft.SnapshotFailure)
112 }
113 sentFailures.WithLabelValues(types.ID(m.To).String()).Inc()
114 continue
115 }
116
117 p.status.activate()
118 if m.Type == raftpb.MsgApp && p.followerStats != nil {
119 p.followerStats.Succ(end.Sub(start))
120 }
121 if isMsgSnap(m) {
122 p.raft.ReportSnapshot(m.To, raft.SnapshotFinish)
123 }
124 sentBytes.WithLabelValues(types.ID(m.To).String()).Add(float64(m.Size()))
125 case <-p.stopc:
126 return
127 }
128 }
129 }
130
131
132
133 func (p *pipeline) post(data []byte) (err error) {
134 u := p.picker.pick()
135 req := createPostRequest(p.tr.Logger, u, RaftPrefix, bytes.NewBuffer(data), "application/protobuf", p.tr.URLs, p.tr.ID, p.tr.ClusterID)
136
137 done := make(chan struct{}, 1)
138 ctx, cancel := context.WithCancel(context.Background())
139 req = req.WithContext(ctx)
140 go func() {
141 select {
142 case <-done:
143 cancel()
144 case <-p.stopc:
145 waitSchedule()
146 cancel()
147 }
148 }()
149
150 resp, err := p.tr.pipelineRt.RoundTrip(req)
151 done <- struct{}{}
152 if err != nil {
153 p.picker.unreachable(u)
154 return err
155 }
156 defer resp.Body.Close()
157 b, err := ioutil.ReadAll(resp.Body)
158 if err != nil {
159 p.picker.unreachable(u)
160 return err
161 }
162
163 err = checkPostResponse(p.tr.Logger, resp, b, req, p.peerID)
164 if err != nil {
165 p.picker.unreachable(u)
166
167
168 if err == errMemberRemoved {
169 reportCriticalError(err, p.errorc)
170 }
171 return err
172 }
173
174 return nil
175 }
176
177
// waitSchedule yields the processor through runtime.Gosched so that other
// goroutines can run before the caller continues.
func waitSchedule() {
	runtime.Gosched()
}
179
View as plain text