1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package rafthttp
16
17 import (
18 "context"
19 "errors"
20 "fmt"
21 "io/ioutil"
22 "net/http"
23 "path"
24 "strings"
25 "time"
26
27 "go.etcd.io/etcd/api/v3/version"
28 "go.etcd.io/etcd/client/pkg/v3/types"
29 pioutil "go.etcd.io/etcd/pkg/v3/ioutil"
30 "go.etcd.io/etcd/raft/v3/raftpb"
31 "go.etcd.io/etcd/server/v3/etcdserver/api/snap"
32
33 humanize "github.com/dustin/go-humanize"
34 "go.uber.org/zap"
35 )
36
const (
	// connReadLimitByte is the limit passed to the limited buffer reader
	// that pipelineHandler.ServeHTTP wraps around the request body
	// (64 KiB).
	connReadLimitByte = 64 << 10

	// snapshotLimitByte is the limit passed to messageDecoder.decodeLimit
	// when snapshotHandler.ServeHTTP decodes the leading Raft message of a
	// snapshot request (1 TiB).
	snapshotLimitByte = 1 << 40
)
49
// URL path prefixes. ProbingPrefix, RaftStreamPrefix and RaftSnapshotPrefix
// are all sub-paths of RaftPrefix.
var (
	RaftPrefix         = "/raft"
	ProbingPrefix      = path.Join(RaftPrefix, "probing")
	RaftStreamPrefix   = path.Join(RaftPrefix, "stream")
	RaftSnapshotPrefix = path.Join(RaftPrefix, "snapshot")
)

// Sentinel errors returned by checkClusterCompatibilityFromHeader.
var (
	// errIncompatibleVersion is returned when the version compatibility
	// check against the remote peer fails.
	errIncompatibleVersion = errors.New("incompatible version")
	// ErrClusterIDMismatch is returned when the request's
	// X-Etcd-Cluster-ID header differs from the local cluster ID.
	ErrClusterIDMismatch = errors.New("cluster ID mismatch")
)
59
// peerGetter looks up a Peer by member ID.
type peerGetter interface {
	// Get returns the Peer registered for id. streamHandler.ServeHTTP
	// treats a nil result as "sender not found".
	Get(id types.ID) Peer
}
63
// writerToResponse is implemented by errors that can render themselves as
// an HTTP response. The handlers in this file check errors returned from
// Raft.Process for this interface and, when it is implemented, let the
// error write the response itself.
type writerToResponse interface {
	// WriteTo writes the error's representation to w.
	WriteTo(w http.ResponseWriter)
}
67
// pipelineHandler is the http.Handler that accepts a single Raft message
// per POST request and hands it to the Raft state machine.
type pipelineHandler struct {
	lg      *zap.Logger // never nil after newPipelineHandler
	localID types.ID    // ID of the local member (Transport.ID)
	tr      Transporter // passed to addRemoteFromRequest for each request
	r       Raft        // receives the decoded messages via Process
	cid     types.ID    // local cluster ID, echoed in X-Etcd-Cluster-ID
}
75
76
77
78
79
80
81 func newPipelineHandler(t *Transport, r Raft, cid types.ID) http.Handler {
82 h := &pipelineHandler{
83 lg: t.Logger,
84 localID: t.ID,
85 tr: t,
86 r: r,
87 cid: cid,
88 }
89 if h.lg == nil {
90 h.lg = zap.NewNop()
91 }
92 return h
93 }
94
95 func (h *pipelineHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
96 if r.Method != "POST" {
97 w.Header().Set("Allow", "POST")
98 http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
99 return
100 }
101
102 w.Header().Set("X-Etcd-Cluster-ID", h.cid.String())
103
104 if err := checkClusterCompatibilityFromHeader(h.lg, h.localID, r.Header, h.cid); err != nil {
105 http.Error(w, err.Error(), http.StatusPreconditionFailed)
106 return
107 }
108
109 addRemoteFromRequest(h.tr, r)
110
111
112
113 limitedr := pioutil.NewLimitedBufferReader(r.Body, connReadLimitByte)
114 b, err := ioutil.ReadAll(limitedr)
115 if err != nil {
116 h.lg.Warn(
117 "failed to read Raft message",
118 zap.String("local-member-id", h.localID.String()),
119 zap.Error(err),
120 )
121 http.Error(w, "error reading raft message", http.StatusBadRequest)
122 recvFailures.WithLabelValues(r.RemoteAddr).Inc()
123 return
124 }
125
126 var m raftpb.Message
127 if err := m.Unmarshal(b); err != nil {
128 h.lg.Warn(
129 "failed to unmarshal Raft message",
130 zap.String("local-member-id", h.localID.String()),
131 zap.Error(err),
132 )
133 http.Error(w, "error unmarshalling raft message", http.StatusBadRequest)
134 recvFailures.WithLabelValues(r.RemoteAddr).Inc()
135 return
136 }
137
138 receivedBytes.WithLabelValues(types.ID(m.From).String()).Add(float64(len(b)))
139
140 if err := h.r.Process(context.TODO(), m); err != nil {
141 switch v := err.(type) {
142 case writerToResponse:
143 v.WriteTo(w)
144 default:
145 h.lg.Warn(
146 "failed to process Raft message",
147 zap.String("local-member-id", h.localID.String()),
148 zap.Error(err),
149 )
150 http.Error(w, "error processing raft message", http.StatusInternalServerError)
151 w.(http.Flusher).Flush()
152
153 panic(err)
154 }
155 return
156 }
157
158
159
160 w.WriteHeader(http.StatusNoContent)
161 }
162
// snapshotHandler is the http.Handler that receives a snapshot Raft message
// followed by the database snapshot in the same POST request body.
type snapshotHandler struct {
	lg          *zap.Logger       // never nil after newSnapshotHandler
	tr          Transporter       // passed to addRemoteFromRequest for each request
	r           Raft              // receives the snapshot message via Process
	snapshotter *snap.Snapshotter // persists the incoming database via SaveDBFrom

	localID types.ID // ID of the local member (Transport.ID)
	cid     types.ID // local cluster ID, echoed in X-Etcd-Cluster-ID
}
172
173 func newSnapshotHandler(t *Transport, r Raft, snapshotter *snap.Snapshotter, cid types.ID) http.Handler {
174 h := &snapshotHandler{
175 lg: t.Logger,
176 tr: t,
177 r: r,
178 snapshotter: snapshotter,
179 localID: t.ID,
180 cid: cid,
181 }
182 if h.lg == nil {
183 h.lg = zap.NewNop()
184 }
185 return h
186 }
187
// unknownSnapshotSender is the label value used for the
// snapshotReceiveFailures metric when a snapshot request is rejected before
// the sender ID has been decoded from the message.
const unknownSnapshotSender = "UNKNOWN_SNAPSHOT_SENDER"
189
190
191
192
193
194
195
196
197
198
199 func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
200 start := time.Now()
201
202 if r.Method != "POST" {
203 w.Header().Set("Allow", "POST")
204 http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
205 snapshotReceiveFailures.WithLabelValues(unknownSnapshotSender).Inc()
206 return
207 }
208
209 w.Header().Set("X-Etcd-Cluster-ID", h.cid.String())
210
211 if err := checkClusterCompatibilityFromHeader(h.lg, h.localID, r.Header, h.cid); err != nil {
212 http.Error(w, err.Error(), http.StatusPreconditionFailed)
213 snapshotReceiveFailures.WithLabelValues(unknownSnapshotSender).Inc()
214 return
215 }
216
217 addRemoteFromRequest(h.tr, r)
218
219 dec := &messageDecoder{r: r.Body}
220
221 m, err := dec.decodeLimit(snapshotLimitByte)
222 from := types.ID(m.From).String()
223 if err != nil {
224 msg := fmt.Sprintf("failed to decode raft message (%v)", err)
225 h.lg.Warn(
226 "failed to decode Raft message",
227 zap.String("local-member-id", h.localID.String()),
228 zap.String("remote-snapshot-sender-id", from),
229 zap.Error(err),
230 )
231 http.Error(w, msg, http.StatusBadRequest)
232 recvFailures.WithLabelValues(r.RemoteAddr).Inc()
233 snapshotReceiveFailures.WithLabelValues(from).Inc()
234 return
235 }
236
237 msgSize := m.Size()
238 receivedBytes.WithLabelValues(from).Add(float64(msgSize))
239
240 if m.Type != raftpb.MsgSnap {
241 h.lg.Warn(
242 "unexpected Raft message type",
243 zap.String("local-member-id", h.localID.String()),
244 zap.String("remote-snapshot-sender-id", from),
245 zap.String("message-type", m.Type.String()),
246 )
247 http.Error(w, "wrong raft message type", http.StatusBadRequest)
248 snapshotReceiveFailures.WithLabelValues(from).Inc()
249 return
250 }
251
252 snapshotReceiveInflights.WithLabelValues(from).Inc()
253 defer func() {
254 snapshotReceiveInflights.WithLabelValues(from).Dec()
255 }()
256
257 h.lg.Info(
258 "receiving database snapshot",
259 zap.String("local-member-id", h.localID.String()),
260 zap.String("remote-snapshot-sender-id", from),
261 zap.Uint64("incoming-snapshot-index", m.Snapshot.Metadata.Index),
262 zap.Int("incoming-snapshot-message-size-bytes", msgSize),
263 zap.String("incoming-snapshot-message-size", humanize.Bytes(uint64(msgSize))),
264 )
265
266
267
268 n, err := h.snapshotter.SaveDBFrom(r.Body, m.Snapshot.Metadata.Index)
269 if err != nil {
270 msg := fmt.Sprintf("failed to save KV snapshot (%v)", err)
271 h.lg.Warn(
272 "failed to save incoming database snapshot",
273 zap.String("local-member-id", h.localID.String()),
274 zap.String("remote-snapshot-sender-id", from),
275 zap.Uint64("incoming-snapshot-index", m.Snapshot.Metadata.Index),
276 zap.Error(err),
277 )
278 http.Error(w, msg, http.StatusInternalServerError)
279 snapshotReceiveFailures.WithLabelValues(from).Inc()
280 return
281 }
282
283 receivedBytes.WithLabelValues(from).Add(float64(n))
284
285 downloadTook := time.Since(start)
286 h.lg.Info(
287 "received and saved database snapshot",
288 zap.String("local-member-id", h.localID.String()),
289 zap.String("remote-snapshot-sender-id", from),
290 zap.Uint64("incoming-snapshot-index", m.Snapshot.Metadata.Index),
291 zap.Int64("incoming-snapshot-size-bytes", n),
292 zap.String("incoming-snapshot-size", humanize.Bytes(uint64(n))),
293 zap.String("download-took", downloadTook.String()),
294 )
295
296 if err := h.r.Process(context.TODO(), m); err != nil {
297 switch v := err.(type) {
298
299
300 case writerToResponse:
301 v.WriteTo(w)
302 default:
303 msg := fmt.Sprintf("failed to process raft message (%v)", err)
304 h.lg.Warn(
305 "failed to process Raft message",
306 zap.String("local-member-id", h.localID.String()),
307 zap.String("remote-snapshot-sender-id", from),
308 zap.Error(err),
309 )
310 http.Error(w, msg, http.StatusInternalServerError)
311 snapshotReceiveFailures.WithLabelValues(from).Inc()
312 }
313 return
314 }
315
316
317
318 w.WriteHeader(http.StatusNoContent)
319
320 snapshotReceive.WithLabelValues(from).Inc()
321 snapshotReceiveSeconds.WithLabelValues(from).Observe(time.Since(start).Seconds())
322 }
323
// streamHandler is the http.Handler that accepts long-lived GET requests
// from peers and attaches the resulting connection to the matching Peer.
type streamHandler struct {
	lg         *zap.Logger // never nil after newStreamHandler
	tr         *Transport  // supplies the local member ID and AddRemote
	peerGetter peerGetter  // resolves the requesting member to a Peer
	r          Raft        // consulted via IsIDRemoved to reject removed members
	id         types.ID    // expected value of the request's X-Raft-To header
	cid        types.ID    // local cluster ID, echoed in X-Etcd-Cluster-ID
}
332
333 func newStreamHandler(t *Transport, pg peerGetter, r Raft, id, cid types.ID) http.Handler {
334 h := &streamHandler{
335 lg: t.Logger,
336 tr: t,
337 peerGetter: pg,
338 r: r,
339 id: id,
340 cid: cid,
341 }
342 if h.lg == nil {
343 h.lg = zap.NewNop()
344 }
345 return h
346 }
347
348 func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
349 if r.Method != "GET" {
350 w.Header().Set("Allow", "GET")
351 http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
352 return
353 }
354
355 w.Header().Set("X-Server-Version", version.Version)
356 w.Header().Set("X-Etcd-Cluster-ID", h.cid.String())
357
358 if err := checkClusterCompatibilityFromHeader(h.lg, h.tr.ID, r.Header, h.cid); err != nil {
359 http.Error(w, err.Error(), http.StatusPreconditionFailed)
360 return
361 }
362
363 var t streamType
364 switch path.Dir(r.URL.Path) {
365 case streamTypeMsgAppV2.endpoint(h.lg):
366 t = streamTypeMsgAppV2
367 case streamTypeMessage.endpoint(h.lg):
368 t = streamTypeMessage
369 default:
370 h.lg.Debug(
371 "ignored unexpected streaming request path",
372 zap.String("local-member-id", h.tr.ID.String()),
373 zap.String("remote-peer-id-stream-handler", h.id.String()),
374 zap.String("path", r.URL.Path),
375 )
376 http.Error(w, "invalid path", http.StatusNotFound)
377 return
378 }
379
380 fromStr := path.Base(r.URL.Path)
381 from, err := types.IDFromString(fromStr)
382 if err != nil {
383 h.lg.Warn(
384 "failed to parse path into ID",
385 zap.String("local-member-id", h.tr.ID.String()),
386 zap.String("remote-peer-id-stream-handler", h.id.String()),
387 zap.String("path", fromStr),
388 zap.Error(err),
389 )
390 http.Error(w, "invalid from", http.StatusNotFound)
391 return
392 }
393 if h.r.IsIDRemoved(uint64(from)) {
394 h.lg.Warn(
395 "rejected stream from remote peer because it was removed",
396 zap.String("local-member-id", h.tr.ID.String()),
397 zap.String("remote-peer-id-stream-handler", h.id.String()),
398 zap.String("remote-peer-id-from", from.String()),
399 )
400 http.Error(w, "removed member", http.StatusGone)
401 return
402 }
403 p := h.peerGetter.Get(from)
404 if p == nil {
405
406
407
408
409
410 if urls := r.Header.Get("X-PeerURLs"); urls != "" {
411 h.tr.AddRemote(from, strings.Split(urls, ","))
412 }
413 h.lg.Warn(
414 "failed to find remote peer in cluster",
415 zap.String("local-member-id", h.tr.ID.String()),
416 zap.String("remote-peer-id-stream-handler", h.id.String()),
417 zap.String("remote-peer-id-from", from.String()),
418 zap.String("cluster-id", h.cid.String()),
419 )
420 http.Error(w, "error sender not found", http.StatusNotFound)
421 return
422 }
423
424 wto := h.id.String()
425 if gto := r.Header.Get("X-Raft-To"); gto != wto {
426 h.lg.Warn(
427 "ignored streaming request; ID mismatch",
428 zap.String("local-member-id", h.tr.ID.String()),
429 zap.String("remote-peer-id-stream-handler", h.id.String()),
430 zap.String("remote-peer-id-header", gto),
431 zap.String("remote-peer-id-from", from.String()),
432 zap.String("cluster-id", h.cid.String()),
433 )
434 http.Error(w, "to field mismatch", http.StatusPreconditionFailed)
435 return
436 }
437
438 w.WriteHeader(http.StatusOK)
439 w.(http.Flusher).Flush()
440
441 c := newCloseNotifier()
442 conn := &outgoingConn{
443 t: t,
444 Writer: w,
445 Flusher: w.(http.Flusher),
446 Closer: c,
447 localID: h.tr.ID,
448 peerID: from,
449 }
450 p.attachOutgoingConn(conn)
451 <-c.closeNotify()
452 }
453
454
455
456
457
458
459 func checkClusterCompatibilityFromHeader(lg *zap.Logger, localID types.ID, header http.Header, cid types.ID) error {
460 remoteName := header.Get("X-Server-From")
461
462 remoteServer := serverVersion(header)
463 remoteVs := ""
464 if remoteServer != nil {
465 remoteVs = remoteServer.String()
466 }
467
468 remoteMinClusterVer := minClusterVersion(header)
469 remoteMinClusterVs := ""
470 if remoteMinClusterVer != nil {
471 remoteMinClusterVs = remoteMinClusterVer.String()
472 }
473
474 localServer, localMinCluster, err := checkVersionCompatibility(remoteName, remoteServer, remoteMinClusterVer)
475
476 localVs := ""
477 if localServer != nil {
478 localVs = localServer.String()
479 }
480 localMinClusterVs := ""
481 if localMinCluster != nil {
482 localMinClusterVs = localMinCluster.String()
483 }
484
485 if err != nil {
486 lg.Warn(
487 "failed to check version compatibility",
488 zap.String("local-member-id", localID.String()),
489 zap.String("local-member-cluster-id", cid.String()),
490 zap.String("local-member-server-version", localVs),
491 zap.String("local-member-server-minimum-cluster-version", localMinClusterVs),
492 zap.String("remote-peer-server-name", remoteName),
493 zap.String("remote-peer-server-version", remoteVs),
494 zap.String("remote-peer-server-minimum-cluster-version", remoteMinClusterVs),
495 zap.Error(err),
496 )
497 return errIncompatibleVersion
498 }
499 if gcid := header.Get("X-Etcd-Cluster-ID"); gcid != cid.String() {
500 lg.Warn(
501 "request cluster ID mismatch",
502 zap.String("local-member-id", localID.String()),
503 zap.String("local-member-cluster-id", cid.String()),
504 zap.String("local-member-server-version", localVs),
505 zap.String("local-member-server-minimum-cluster-version", localMinClusterVs),
506 zap.String("remote-peer-server-name", remoteName),
507 zap.String("remote-peer-server-version", remoteVs),
508 zap.String("remote-peer-server-minimum-cluster-version", remoteMinClusterVs),
509 zap.String("remote-peer-cluster-id", gcid),
510 )
511 return ErrClusterIDMismatch
512 }
513 return nil
514 }
515
// closeNotifier signals closure through a channel: Close closes the channel
// and closeNotify exposes it for receiving.
type closeNotifier struct {
	done chan struct{}
}

// newCloseNotifier returns a closeNotifier whose channel is open.
func newCloseNotifier() *closeNotifier {
	cn := new(closeNotifier)
	cn.done = make(chan struct{})
	return cn
}

// Close closes the notification channel, releasing every receiver on
// closeNotify. It always returns nil. It must be called at most once: a
// second call would close an already-closed channel and panic.
func (cn *closeNotifier) Close() error {
	close(cn.done)
	return nil
}

// closeNotify returns a receive-only channel that is closed when Close is
// called.
func (cn *closeNotifier) closeNotify() <-chan struct{} {
	return cn.done
}
532
View as plain text