1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package rafthttp
16
17 import (
18 "context"
19 "net/http"
20 "sync"
21 "time"
22
23 "go.etcd.io/etcd/client/pkg/v3/transport"
24 "go.etcd.io/etcd/client/pkg/v3/types"
25 "go.etcd.io/etcd/raft/v3"
26 "go.etcd.io/etcd/raft/v3/raftpb"
27 "go.etcd.io/etcd/server/v3/etcdserver/api/snap"
28 stats "go.etcd.io/etcd/server/v3/etcdserver/api/v2stats"
29
30 "github.com/xiang90/probing"
31 "go.uber.org/zap"
32 "golang.org/x/time/rate"
33 )
34
// Raft is the consensus-side dependency of Transport. In this file the value
// stored in Transport.Raft is only handed to the pipeline, stream and snapshot
// HTTP handlers built by Transport.Handler; how those handlers call it is not
// visible here.
type Raft interface {
	// Process hands a received raft message to the state machine.
	// NOTE(review): presumably invoked by the HTTP handlers for every decoded
	// inbound message; confirm against the handler implementations.
	Process(ctx context.Context, m raftpb.Message) error
	// IsIDRemoved reports whether the member with the given id has been
	// removed. NOTE(review): semantics inferred from the name; confirm.
	IsIDRemoved(id uint64) bool
	// ReportUnreachable notifies raft that the member with the given id could
	// not be reached. NOTE(review): inferred from the name; confirm.
	ReportUnreachable(id uint64)
	// ReportSnapshot notifies raft about the outcome (status) of a snapshot
	// sent to the member with the given id. NOTE(review): inferred; confirm.
	ReportSnapshot(id uint64, status raft.SnapshotStatus)
}
41
// Transporter is the API for exchanging raft messages with other members.
// The behavior notes on each method describe the Transport implementation
// found in this file.
type Transporter interface {
	// Start prepares the transporter for use. Transport.Start creates the
	// round trippers, the peer/remote maps and the probers that the other
	// methods rely on, so it has to run before peers are added.
	Start() error

	// Handler returns the HTTP handler that serves requests coming from
	// other members. Transport mounts its pipeline, stream, snapshot and
	// probing handlers under RaftPrefix, RaftStreamPrefix, RaftSnapshotPrefix
	// and ProbingPrefix respectively.
	Handler() http.Handler

	// Send routes every message to the peer or remote identified by its To
	// field. Transport skips messages whose To is 0 and ignores (with a debug
	// log, when a logger is set) messages addressed to an unknown ID. When an
	// ID is registered both as peer and as remote, the peer is used.
	Send(m []raftpb.Message)

	// SendSnapshot sends the snapshot message to the peer named by m.To.
	// Transport closes the message with errMemberNotFound if no such peer is
	// registered.
	SendSnapshot(m snap.Message)

	// AddRemote registers a remote with the given id and URLs. Transport
	// ignores the call when it has been stopped or when the id is already
	// known as a peer or as a remote.
	AddRemote(id types.ID, urls []string)

	// AddPeer registers a peer with the given id and URLs. Transport ignores
	// the call when the id is already a peer and panics when called after
	// Stop.
	AddPeer(id types.ID, urls []string)

	// RemovePeer stops and removes the peer with the given id.
	RemovePeer(id types.ID)

	// RemoveAllPeers stops and removes every registered peer.
	RemoveAllPeers()

	// UpdatePeer replaces the URLs of the peer with the given id. Transport
	// ignores the call when the peer is unknown.
	UpdatePeer(id types.ID, urls []string)

	// ActiveSince returns the time reported by the peer's activeSince for the
	// given id. Transport returns the zero time.Time when the peer is unknown.
	ActiveSince(id types.ID) time.Time

	// ActivePeers returns the number of peers whose activeSince time is
	// non-zero.
	ActivePeers() int

	// Stop stops all remotes and peers and releases the transporter's
	// resources.
	Stop()
}
90
91
92
93
94
95
96
// Transport is the Transporter implementation in this file. Callers fill in
// the exported fields and then call Start, which initializes the unexported
// state, before using any other method.
type Transport struct {
	// Logger is optional; every use in this file is guarded by a nil check.
	Logger *zap.Logger

	// DialTimeout is passed to both round tripper constructors in Start.
	DialTimeout time.Duration
	// DialRetryFrequency defaults to rate.Every(100 * time.Millisecond) when
	// left at zero (see Start).
	// NOTE(review): presumably limits how often a failed dial is retried;
	// the consumer is not visible in this file.
	DialRetryFrequency rate.Limit

	// TLSInfo is passed to both round tripper constructors in Start.
	TLSInfo transport.TLSInfo

	// ID is the local member ID; it is logged as "local-member-id" and given
	// to the stream handler.
	ID types.ID
	// URLs is not referenced in this file.
	// NOTE(review): presumably the local peer URLs; confirm.
	URLs types.URLs
	// ClusterID is handed to the pipeline, stream and snapshot handlers.
	ClusterID types.ID
	// Raft is handed to the pipeline, stream and snapshot handlers.
	Raft Raft
	// Snapshotter is handed to the snapshot handler.
	Snapshotter *snap.Snapshotter
	// ServerStats records the size of every MsgApp sent to a peer (see Send).
	ServerStats *stats.ServerStats
	// LeaderStats supplies the per-follower stats given to a new peer in
	// AddPeer; removePeer deletes the follower entry again.
	LeaderStats *stats.LeaderStats
	// ErrorC is not referenced in this file.
	// NOTE(review): presumably used by peers to report critical errors;
	// confirm against the peer implementation.
	ErrorC chan error

	// streamRt and pipelineRt are created in Start; Stop closes their idle
	// connections when they are *http.Transport values.
	streamRt   http.RoundTripper
	pipelineRt http.RoundTripper

	// mu guards remotes and peers.
	mu sync.RWMutex
	// remotes and peers are allocated in Start and set to nil by Stop.
	remotes map[types.ID]*remote
	peers   map[types.ID]Peer

	// pipelineProber and streamProber are built in Start on top of
	// pipelineRt and streamRt; peers are added to and removed from both.
	pipelineProber probing.Prober
	streamProber   probing.Prober
}
132
133 func (t *Transport) Start() error {
134 var err error
135 t.streamRt, err = newStreamRoundTripper(t.TLSInfo, t.DialTimeout)
136 if err != nil {
137 return err
138 }
139 t.pipelineRt, err = NewRoundTripper(t.TLSInfo, t.DialTimeout)
140 if err != nil {
141 return err
142 }
143 t.remotes = make(map[types.ID]*remote)
144 t.peers = make(map[types.ID]Peer)
145 t.pipelineProber = probing.NewProber(t.pipelineRt)
146 t.streamProber = probing.NewProber(t.streamRt)
147
148
149
150
151 if t.DialRetryFrequency == 0 {
152 t.DialRetryFrequency = rate.Every(100 * time.Millisecond)
153 }
154 return nil
155 }
156
157 func (t *Transport) Handler() http.Handler {
158 pipelineHandler := newPipelineHandler(t, t.Raft, t.ClusterID)
159 streamHandler := newStreamHandler(t, t, t.Raft, t.ID, t.ClusterID)
160 snapHandler := newSnapshotHandler(t, t.Raft, t.Snapshotter, t.ClusterID)
161 mux := http.NewServeMux()
162 mux.Handle(RaftPrefix, pipelineHandler)
163 mux.Handle(RaftStreamPrefix+"/", streamHandler)
164 mux.Handle(RaftSnapshotPrefix, snapHandler)
165 mux.Handle(ProbingPrefix, probing.NewHandler())
166 return mux
167 }
168
169 func (t *Transport) Get(id types.ID) Peer {
170 t.mu.RLock()
171 defer t.mu.RUnlock()
172 return t.peers[id]
173 }
174
175 func (t *Transport) Send(msgs []raftpb.Message) {
176 for _, m := range msgs {
177 if m.To == 0 {
178
179 continue
180 }
181 to := types.ID(m.To)
182
183 t.mu.RLock()
184 p, pok := t.peers[to]
185 g, rok := t.remotes[to]
186 t.mu.RUnlock()
187
188 if pok {
189 if m.Type == raftpb.MsgApp {
190 t.ServerStats.SendAppendReq(m.Size())
191 }
192 p.send(m)
193 continue
194 }
195
196 if rok {
197 g.send(m)
198 continue
199 }
200
201 if t.Logger != nil {
202 t.Logger.Debug(
203 "ignored message send request; unknown remote peer target",
204 zap.String("type", m.Type.String()),
205 zap.String("unknown-target-peer-id", to.String()),
206 )
207 }
208 }
209 }
210
211 func (t *Transport) Stop() {
212 t.mu.Lock()
213 defer t.mu.Unlock()
214 for _, r := range t.remotes {
215 r.stop()
216 }
217 for _, p := range t.peers {
218 p.stop()
219 }
220 t.pipelineProber.RemoveAll()
221 t.streamProber.RemoveAll()
222 if tr, ok := t.streamRt.(*http.Transport); ok {
223 tr.CloseIdleConnections()
224 }
225 if tr, ok := t.pipelineRt.(*http.Transport); ok {
226 tr.CloseIdleConnections()
227 }
228 t.peers = nil
229 t.remotes = nil
230 }
231
232
233 func (t *Transport) CutPeer(id types.ID) {
234 t.mu.RLock()
235 p, pok := t.peers[id]
236 g, gok := t.remotes[id]
237 t.mu.RUnlock()
238
239 if pok {
240 p.(Pausable).Pause()
241 }
242 if gok {
243 g.Pause()
244 }
245 }
246
247
248 func (t *Transport) MendPeer(id types.ID) {
249 t.mu.RLock()
250 p, pok := t.peers[id]
251 g, gok := t.remotes[id]
252 t.mu.RUnlock()
253
254 if pok {
255 p.(Pausable).Resume()
256 }
257 if gok {
258 g.Resume()
259 }
260 }
261
262 func (t *Transport) AddRemote(id types.ID, us []string) {
263 t.mu.Lock()
264 defer t.mu.Unlock()
265 if t.remotes == nil {
266
267
268
269 return
270 }
271 if _, ok := t.peers[id]; ok {
272 return
273 }
274 if _, ok := t.remotes[id]; ok {
275 return
276 }
277 urls, err := types.NewURLs(us)
278 if err != nil {
279 if t.Logger != nil {
280 t.Logger.Panic("failed NewURLs", zap.Strings("urls", us), zap.Error(err))
281 }
282 }
283 t.remotes[id] = startRemote(t, urls, id)
284
285 if t.Logger != nil {
286 t.Logger.Info(
287 "added new remote peer",
288 zap.String("local-member-id", t.ID.String()),
289 zap.String("remote-peer-id", id.String()),
290 zap.Strings("remote-peer-urls", us),
291 )
292 }
293 }
294
295 func (t *Transport) AddPeer(id types.ID, us []string) {
296 t.mu.Lock()
297 defer t.mu.Unlock()
298
299 if t.peers == nil {
300 panic("transport stopped")
301 }
302 if _, ok := t.peers[id]; ok {
303 return
304 }
305 urls, err := types.NewURLs(us)
306 if err != nil {
307 if t.Logger != nil {
308 t.Logger.Panic("failed NewURLs", zap.Strings("urls", us), zap.Error(err))
309 }
310 }
311 fs := t.LeaderStats.Follower(id.String())
312 t.peers[id] = startPeer(t, urls, id, fs)
313 addPeerToProber(t.Logger, t.pipelineProber, id.String(), us, RoundTripperNameSnapshot, rttSec)
314 addPeerToProber(t.Logger, t.streamProber, id.String(), us, RoundTripperNameRaftMessage, rttSec)
315
316 if t.Logger != nil {
317 t.Logger.Info(
318 "added remote peer",
319 zap.String("local-member-id", t.ID.String()),
320 zap.String("remote-peer-id", id.String()),
321 zap.Strings("remote-peer-urls", us),
322 )
323 }
324 }
325
326 func (t *Transport) RemovePeer(id types.ID) {
327 t.mu.Lock()
328 defer t.mu.Unlock()
329 t.removePeer(id)
330 }
331
332 func (t *Transport) RemoveAllPeers() {
333 t.mu.Lock()
334 defer t.mu.Unlock()
335 for id := range t.peers {
336 t.removePeer(id)
337 }
338 }
339
340
341 func (t *Transport) removePeer(id types.ID) {
342 if peer, ok := t.peers[id]; ok {
343 peer.stop()
344 } else {
345 if t.Logger != nil {
346 t.Logger.Panic("unexpected removal of unknown remote peer", zap.String("remote-peer-id", id.String()))
347 }
348 }
349 delete(t.peers, id)
350 delete(t.LeaderStats.Followers, id.String())
351 t.pipelineProber.Remove(id.String())
352 t.streamProber.Remove(id.String())
353
354 if t.Logger != nil {
355 t.Logger.Info(
356 "removed remote peer",
357 zap.String("local-member-id", t.ID.String()),
358 zap.String("removed-remote-peer-id", id.String()),
359 )
360 }
361 }
362
363 func (t *Transport) UpdatePeer(id types.ID, us []string) {
364 t.mu.Lock()
365 defer t.mu.Unlock()
366
367 if _, ok := t.peers[id]; !ok {
368 return
369 }
370 urls, err := types.NewURLs(us)
371 if err != nil {
372 if t.Logger != nil {
373 t.Logger.Panic("failed NewURLs", zap.Strings("urls", us), zap.Error(err))
374 }
375 }
376 t.peers[id].update(urls)
377
378 t.pipelineProber.Remove(id.String())
379 addPeerToProber(t.Logger, t.pipelineProber, id.String(), us, RoundTripperNameSnapshot, rttSec)
380 t.streamProber.Remove(id.String())
381 addPeerToProber(t.Logger, t.streamProber, id.String(), us, RoundTripperNameRaftMessage, rttSec)
382
383 if t.Logger != nil {
384 t.Logger.Info(
385 "updated remote peer",
386 zap.String("local-member-id", t.ID.String()),
387 zap.String("updated-remote-peer-id", id.String()),
388 zap.Strings("updated-remote-peer-urls", us),
389 )
390 }
391 }
392
393 func (t *Transport) ActiveSince(id types.ID) time.Time {
394 t.mu.RLock()
395 defer t.mu.RUnlock()
396 if p, ok := t.peers[id]; ok {
397 return p.activeSince()
398 }
399 return time.Time{}
400 }
401
402 func (t *Transport) SendSnapshot(m snap.Message) {
403 t.mu.Lock()
404 defer t.mu.Unlock()
405 p := t.peers[types.ID(m.To)]
406 if p == nil {
407 m.CloseWithError(errMemberNotFound)
408 return
409 }
410 p.sendSnap(m)
411 }
412
413
// Pausable is implemented by components whose activity can be suspended and
// later resumed. In this file, Transport implements it and applies it to its
// peers through unchecked type assertions (CutPeer, MendPeer, Pause, Resume).
// NOTE(review): looks like a fault-injection/testing hook; confirm.
type Pausable interface {
	// Pause suspends the component.
	Pause()
	// Resume undoes a previous Pause.
	Resume()
}
418
419 func (t *Transport) Pause() {
420 t.mu.RLock()
421 defer t.mu.RUnlock()
422 for _, p := range t.peers {
423 p.(Pausable).Pause()
424 }
425 }
426
427 func (t *Transport) Resume() {
428 t.mu.RLock()
429 defer t.mu.RUnlock()
430 for _, p := range t.peers {
431 p.(Pausable).Resume()
432 }
433 }
434
435
436
437
438 func (t *Transport) ActivePeers() (cnt int) {
439 t.mu.RLock()
440 defer t.mu.RUnlock()
441 for _, p := range t.peers {
442 if !p.activeSince().IsZero() {
443 cnt++
444 }
445 }
446 return cnt
447 }
448
View as plain text