package etcdserver

import (
	"encoding/json"
	"expvar"
	"fmt"
	"log"
	"sort"
	"sync"
	"time"

	pb "go.etcd.io/etcd/api/v3/etcdserverpb"
	"go.etcd.io/etcd/client/pkg/v3/logutil"
	"go.etcd.io/etcd/client/pkg/v3/types"
	"go.etcd.io/etcd/pkg/v3/contention"
	"go.etcd.io/etcd/pkg/v3/pbutil"
	"go.etcd.io/etcd/raft/v3"
	"go.etcd.io/etcd/raft/v3/raftpb"
	"go.etcd.io/etcd/server/v3/config"
	"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
	"go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp"
	"go.etcd.io/etcd/server/v3/wal"
	"go.etcd.io/etcd/server/v3/wal/walpb"
	"go.uber.org/zap"
)

const (
	// The max throughput of etcd will not exceed 100MB/s (100K * 1KB value).
	// Assuming the RTT is around 10ms, 1MB max size is large enough.
	maxSizePerMsg = 1 * 1024 * 1024
	// Never overflow the rafthttp buffer, which is 4096.
	// TODO: a better const?
	maxInflightMsgs = 4096 / 8
)

var (
	// protects raftStatus
	raftStatusMu sync.Mutex
	// indirection for expvar func interface;
	// expvar panics when publishing a duplicate name and does not support
	// removing a registered name, so only register a func that calls
	// raftStatus and swap raftStatus as needed.
	raftStatus func() raft.Status
)

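// init publishes the current raft status under the "raft.status" expvar key
// so it can be inspected at runtime.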
func init() {
	expvar.Publish("raft.status", expvar.Func(func() interface{} {
		raftStatusMu.Lock()
		defer raftStatusMu.Unlock()
		if raftStatus == nil {
			return nil
		}
		return raftStatus()
	}))
}

// apply contains entries and a snapshot to be applied. Once an apply is
// consumed, the entries will be persisted to raft storage concurrently; the
// application must read notifyc before assuming the raft messages are stable.
type apply struct {
	entries  []raftpb.Entry
	snapshot raftpb.Snapshot
	// notifyc synchronizes etcd server applies with the raft node
	notifyc chan struct{}
}

type raftNode struct {
	lg *zap.Logger

	tickMu *sync.Mutex
	raftNodeConfig

	// a chan to send/receive snapshot
	msgSnapC chan raftpb.Message

	// a chan to send out apply
	applyc chan apply

	// a chan to send out readState
	readStateC chan raft.ReadState

	// utility
	ticker *time.Ticker
	// contention detector for raft heartbeat message
	td *contention.TimeoutDetector

	stopped chan struct{}
	done    chan struct{}
}

type raftNodeConfig struct {
	lg *zap.Logger

	// to check if msg receiver is removed from cluster
	isIDRemoved func(id uint64) bool
	raft.Node
	raftStorage *raft.MemoryStorage
	storage     Storage
	heartbeat   time.Duration // for logical clock tick

	// transport specifies the transport to send and receive msgs to members.
	// Sending messages MUST NOT block. It is okay to drop messages, since
	// clients should timeout and reissue their messages.
	// If transport is nil, server will panic.
	transport rafthttp.Transporter
}

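// newRaftNode wires a raftNodeConfig into a raftNode: it sets up the raft
// logger, the heartbeat ticker, the heartbeat contention detector, and the
// channels used to hand read states, snapshot messages, and apply batches
// to the etcd server.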
func newRaftNode(cfg raftNodeConfig) *raftNode {
	var lg raft.Logger
	if cfg.lg != nil {
		lg = NewRaftLoggerZap(cfg.lg)
	} else {
		lcfg := logutil.DefaultZapLoggerConfig
		var err error
		lg, err = NewRaftLogger(&lcfg)
		if err != nil {
			log.Fatalf("cannot create raft logger %v", err)
		}
	}
	raft.SetLogger(lg)
	r := &raftNode{
		lg:             cfg.lg,
		tickMu:         new(sync.Mutex),
		raftNodeConfig: cfg,
		// set up contention detectors for raft heartbeat message.
		// expect to send a heartbeat within 2 heartbeat intervals.
		td:         contention.NewTimeoutDetector(2 * cfg.heartbeat),
		readStateC: make(chan raft.ReadState, 1),
		msgSnapC:   make(chan raftpb.Message, maxInFlightMsgSnap),
		applyc:     make(chan apply),
		stopped:    make(chan struct{}),
		done:       make(chan struct{}),
	}
	if r.heartbeat == 0 {
		r.ticker = &time.Ticker{}
	} else {
		r.ticker = time.NewTicker(r.heartbeat)
	}
	return r
}

// raft.Node does not have locks in Raft package
func (r *raftNode) tick() {
	r.tickMu.Lock()
	r.Tick()
	r.tickMu.Unlock()
}

// start prepares and starts raftNode in a new goroutine. It is no longer safe
// to modify the fields after it has been started.
func (r *raftNode) start(rh *raftReadyHandler) {
	internalTimeout := time.Second

	go func() {
		defer r.onStop()
		islead := false

		for {
			select {
			case <-r.ticker.C:
				r.tick()
			case rd := <-r.Ready():
				if rd.SoftState != nil {
					newLeader := rd.SoftState.Lead != raft.None && rh.getLead() != rd.SoftState.Lead
					if newLeader {
						leaderChanges.Inc()
					}

					if rd.SoftState.Lead == raft.None {
						hasLeader.Set(0)
					} else {
						hasLeader.Set(1)
					}

					rh.updateLead(rd.SoftState.Lead)
					islead = rd.RaftState == raft.StateLeader
					if islead {
						isLeader.Set(1)
					} else {
						isLeader.Set(0)
					}
					rh.updateLeadership(newLeader)
					r.td.Reset()
				}

				if len(rd.ReadStates) != 0 {
					select {
					case r.readStateC <- rd.ReadStates[len(rd.ReadStates)-1]:
					case <-time.After(internalTimeout):
						r.lg.Warn("timed out sending read state", zap.Duration("timeout", internalTimeout))
					case <-r.stopped:
						return
					}
				}

				notifyc := make(chan struct{}, 1)
				ap := apply{
					entries:  rd.CommittedEntries,
					snapshot: rd.Snapshot,
					notifyc:  notifyc,
				}

				updateCommittedIndex(&ap, rh)

				waitWALSync := shouldWaitWALSync(rd)
				if waitWALSync {
					// the unstable entries overlap with the committed entries,
					// so persist the hard state and entries to the WAL before
					// handing the batch to the apply goroutine.
					if err := r.storage.Save(rd.HardState, rd.Entries); err != nil {
						r.lg.Fatal("failed to save Raft hard state and entries", zap.Error(err))
					}
				}

				select {
				case r.applyc <- ap:
				case <-r.stopped:
					return
				}

				// the leader can write to its disk in parallel with replicating to the
				// followers and them writing to their disks.
				// For more details, check raft thesis 10.2.1.
				if islead {
					r.transport.Send(r.processMessages(rd.Messages))
				}

				// Must save the snapshot file and WAL snapshot entry before saving any other
				// entries or hardstate to ensure that recovery after a snapshot restore is possible.
				if !raft.IsEmptySnap(rd.Snapshot) {
					if err := r.storage.SaveSnap(rd.Snapshot); err != nil {
						r.lg.Fatal("failed to save Raft snapshot", zap.Error(err))
					}
				}

				if !waitWALSync {
					if err := r.storage.Save(rd.HardState, rd.Entries); err != nil {
						r.lg.Fatal("failed to save Raft hard state and entries", zap.Error(err))
					}
				}
				if !raft.IsEmptyHardState(rd.HardState) {
					proposalsCommitted.Set(float64(rd.HardState.Commit))
				}

				if !raft.IsEmptySnap(rd.Snapshot) {
					// Force WAL to fsync its hard state before Release() releases
					// old data from the WAL; otherwise the raft log could appear
					// corrupted or truncated after a restart.
					if err := r.storage.Sync(); err != nil {
						r.lg.Fatal("failed to sync Raft snapshot", zap.Error(err))
					}

					// etcdserver now claims the snapshot has been persisted onto the disk
					notifyc <- struct{}{}

					r.raftStorage.ApplySnapshot(rd.Snapshot)
					r.lg.Info("applied incoming Raft snapshot", zap.Uint64("snapshot-index", rd.Snapshot.Metadata.Index))

					if err := r.storage.Release(rd.Snapshot); err != nil {
						r.lg.Fatal("failed to release Raft wal", zap.Error(err))
					}
				}

				r.raftStorage.Append(rd.Entries)

				if !islead {
					// finish processing incoming messages before we signal notifyc chan
					msgs := r.processMessages(rd.Messages)

					// now unblocks 'applyAll' that waits on Raft log disk writes before triggering snapshots
					notifyc <- struct{}{}

					// Candidate or follower needs to wait for all pending configuration
					// changes to be applied before sending messages.
					// Otherwise we might incorrectly count votes (e.g. votes from removed members).
					// Also a slow machine's follower raft-layer could proceed to become the leader
					// on its own single-node cluster, before the apply-layer applies the config change.
					// We simply wait for ALL pending entries to be applied for now.
					// We might improve this later on if it causes unnecessary long blocking issues.
					waitApply := false
					for _, ent := range rd.CommittedEntries {
						if ent.Type == raftpb.EntryConfChange {
							waitApply = true
							break
						}
					}
					if waitApply {
						// blocks until 'applyAll' calls 'applyWait.Trigger'
						// to be in sync with scheduled config-change job
						// (assume notifyc has cap of 1)
						select {
						case notifyc <- struct{}{}:
						case <-r.stopped:
							return
						}
					}

					r.transport.Send(msgs)
				} else {
					// leader already processed 'MsgSnap' and signaled
					notifyc <- struct{}{}
				}

				r.Advance()
			case <-r.stopped:
				return
			}
		}
	}()
}

// shouldWaitWALSync returns true if the Raft entries must be fsynced to the
// WAL before the committed entries are handed to the apply goroutine.
//
// For a cluster with only one member, raft may hand etcdserver both unstable
// entries and committed entries in the same Ready, and the two sets may
// overlap. etcd responds to the client once the apply workflow finishes, but
// it normally saves WAL entries in parallel with applying the committed
// entries, so a crash right after responding could lose data that was already
// acknowledged. This can only happen in a single-member cluster.
//
// For clusters with multiple members this cannot happen: data is replicated
// to a majority before it is committed and applied, so an acknowledged write
// has already been persisted on a quorum, and raft never sends identical
// unstable and committed entries to etcdserver in the same Ready.
func shouldWaitWALSync(rd raft.Ready) bool {
	if len(rd.CommittedEntries) == 0 || len(rd.Entries) == 0 {
		return false
	}

	// Check if there is overlap between unstable and committed entries,
	// assuming that their index and term are only incrementing.
	lastCommittedEntry := rd.CommittedEntries[len(rd.CommittedEntries)-1]
	firstUnstableEntry := rd.Entries[0]
	return lastCommittedEntry.Term > firstUnstableEntry.Term ||
		(lastCommittedEntry.Term == firstUnstableEntry.Term && lastCommittedEntry.Index >= firstUnstableEntry.Index)
}

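// updateCommittedIndex reports the highest index covered by this apply batch,
// taken from the last committed entry or the snapshot, whichever is newer,
// back to the raftReadyHandler.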
func updateCommittedIndex(ap *apply, rh *raftReadyHandler) {
	var ci uint64
	if len(ap.entries) != 0 {
		ci = ap.entries[len(ap.entries)-1].Index
	}
	if ap.snapshot.Metadata.Index > ci {
		ci = ap.snapshot.Metadata.Index
	}
	if ci != 0 {
		rh.updateCommittedIndex(ci)
	}
}

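// processMessages filters outgoing Raft messages before they are handed to the
// transport: messages to removed members are dropped, only the latest
// MsgAppResp is kept, MsgSnap is redirected to the snapshot channel, and slow
// heartbeats are logged and counted.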
func (r *raftNode) processMessages(ms []raftpb.Message) []raftpb.Message {
	sentAppResp := false
	for i := len(ms) - 1; i >= 0; i-- {
		if r.isIDRemoved(ms[i].To) {
			ms[i].To = 0
		}

		if ms[i].Type == raftpb.MsgAppResp {
			if sentAppResp {
				ms[i].To = 0
			} else {
				sentAppResp = true
			}
		}

		if ms[i].Type == raftpb.MsgSnap {
			// There are two separate data stores: the store for v2, and the KV for v3.
			// The msgSnap only contains the most recent snapshot of the store without KV.
			// So we need to redirect the msgSnap to the etcd server main loop for merging
			// in the current store snapshot and KV snapshot.
			select {
			case r.msgSnapC <- ms[i]:
			default:
				// drop msgSnap if the inflight chan is full.
			}
			ms[i].To = 0
		}
		if ms[i].Type == raftpb.MsgHeartbeat {
			ok, exceed := r.td.Observe(ms[i].To)
			if !ok {
				// TODO: limit request rate.
				r.lg.Warn(
					"leader failed to send out heartbeat on time; took too long, leader is overloaded likely from slow disk",
					zap.String("to", fmt.Sprintf("%x", ms[i].To)),
					zap.Duration("heartbeat-interval", r.heartbeat),
					zap.Duration("expected-duration", 2*r.heartbeat),
					zap.Duration("exceeded-duration", exceed),
				)
				heartbeatSendFailures.Inc()
			}
		}
	}
	return ms
}

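// apply returns the channel on which batches of committed entries and
// snapshots are delivered to the etcd server's apply loop.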
func (r *raftNode) apply() chan apply {
	return r.applyc
}

func (r *raftNode) stop() {
	r.stopped <- struct{}{}
	<-r.done
}

func (r *raftNode) onStop() {
	r.Stop()
	r.ticker.Stop()
	r.transport.Stop()
	if err := r.storage.Close(); err != nil {
		r.lg.Panic("failed to close Raft storage", zap.Error(err))
	}
	close(r.done)
}

// for testing
func (r *raftNode) pauseSending() {
	p := r.transport.(rafthttp.Pausable)
	p.Pause()
}

func (r *raftNode) resumeSending() {
	p := r.transport.(rafthttp.Pausable)
	p.Resume()
}

// advanceTicks advances ticks of Raft node.
// This can be used for fast-forwarding election
// ticks in multi data-center deployments, thus
// speeding up election process.
func (r *raftNode) advanceTicks(ticks int) {
	for i := 0; i < ticks; i++ {
		r.tick()
	}
}

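// startNode creates a fresh WAL tagged with the member metadata, builds the
// initial peer list from the cluster membership, and starts a raft.Node for a
// member that has no existing WAL.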
func startNode(cfg config.ServerConfig, cl *membership.RaftCluster, ids []types.ID) (id types.ID, n raft.Node, s *raft.MemoryStorage, w *wal.WAL) {
	var err error
	member := cl.MemberByName(cfg.Name)
	metadata := pbutil.MustMarshal(
		&pb.Metadata{
			NodeID:    uint64(member.ID),
			ClusterID: uint64(cl.ID()),
		},
	)
	if w, err = wal.Create(cfg.Logger, cfg.WALDir(), metadata); err != nil {
		cfg.Logger.Panic("failed to create WAL", zap.Error(err))
	}
	if cfg.UnsafeNoFsync {
		w.SetUnsafeNoFsync()
	}
	peers := make([]raft.Peer, len(ids))
	for i, id := range ids {
		var ctx []byte
		ctx, err = json.Marshal((*cl).Member(id))
		if err != nil {
			cfg.Logger.Panic("failed to marshal member", zap.Error(err))
		}
		peers[i] = raft.Peer{ID: uint64(id), Context: ctx}
	}
	id = member.ID
	cfg.Logger.Info(
		"starting local member",
		zap.String("local-member-id", id.String()),
		zap.String("cluster-id", cl.ID().String()),
	)
	s = raft.NewMemoryStorage()
	c := &raft.Config{
		ID:              uint64(id),
		ElectionTick:    cfg.ElectionTicks,
		HeartbeatTick:   1,
		Storage:         s,
		MaxSizePerMsg:   maxSizePerMsg,
		MaxInflightMsgs: maxInflightMsgs,
		CheckQuorum:     true,
		PreVote:         cfg.PreVote,
		Logger:          NewRaftLoggerZap(cfg.Logger.Named("raft")),
	}
	if len(peers) == 0 {
		n = raft.RestartNode(c)
	} else {
		n = raft.StartNode(c, peers)
	}
	raftStatusMu.Lock()
	raftStatus = n.Status
	raftStatusMu.Unlock()
	return id, n, s, w
}

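// restartNode restores raft state for an existing member: it replays the WAL
// (and optional snapshot) into a fresh MemoryStorage and restarts the
// raft.Node with the recovered hard state and entries.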
func restartNode(cfg config.ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *membership.RaftCluster, raft.Node, *raft.MemoryStorage, *wal.WAL) {
	var walsnap walpb.Snapshot
	if snapshot != nil {
		walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
	}
	w, id, cid, st, ents := readWAL(cfg.Logger, cfg.WALDir(), walsnap, cfg.UnsafeNoFsync)

	cfg.Logger.Info(
		"restarting local member",
		zap.String("cluster-id", cid.String()),
		zap.String("local-member-id", id.String()),
		zap.Uint64("commit-index", st.Commit),
	)
	cl := membership.NewCluster(cfg.Logger)
	cl.SetID(id, cid)
	s := raft.NewMemoryStorage()
	if snapshot != nil {
		s.ApplySnapshot(*snapshot)
	}
	s.SetHardState(st)
	s.Append(ents)
	c := &raft.Config{
		ID:              uint64(id),
		ElectionTick:    cfg.ElectionTicks,
		HeartbeatTick:   1,
		Storage:         s,
		MaxSizePerMsg:   maxSizePerMsg,
		MaxInflightMsgs: maxInflightMsgs,
		CheckQuorum:     true,
		PreVote:         cfg.PreVote,
		Logger:          NewRaftLoggerZap(cfg.Logger.Named("raft")),
	}

	n := raft.RestartNode(c)
	raftStatusMu.Lock()
	raftStatus = n.Status
	raftStatusMu.Unlock()
	return id, cl, n, s, w
}

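// restartAsStandaloneNode restarts the member as a single-node cluster (the
// force-new-cluster path): it discards uncommitted WAL entries and appends
// synthetic configuration-change entries that remove every other member.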
func restartAsStandaloneNode(cfg config.ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *membership.RaftCluster, raft.Node, *raft.MemoryStorage, *wal.WAL) {
	var walsnap walpb.Snapshot
	if snapshot != nil {
		walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
	}
	w, id, cid, st, ents := readWAL(cfg.Logger, cfg.WALDir(), walsnap, cfg.UnsafeNoFsync)

	// discard the previously uncommitted entries
	for i, ent := range ents {
		if ent.Index > st.Commit {
			cfg.Logger.Info(
				"discarding uncommitted WAL entries",
				zap.Uint64("entry-index", ent.Index),
				zap.Uint64("commit-index-from-wal", st.Commit),
				zap.Int("number-of-discarded-entries", len(ents)-i),
			)
			ents = ents[:i]
			break
		}
	}

	// force append the configuration change entries
	toAppEnts := createConfigChangeEnts(
		cfg.Logger,
		getIDs(cfg.Logger, snapshot, ents),
		uint64(id),
		st.Term,
		st.Commit,
	)
	ents = append(ents, toAppEnts...)

	// force commit newly appended entries
	err := w.Save(raftpb.HardState{}, toAppEnts)
	if err != nil {
		cfg.Logger.Fatal("failed to save hard state and entries", zap.Error(err))
	}
	if len(ents) != 0 {
		st.Commit = ents[len(ents)-1].Index
	}

	cfg.Logger.Info(
		"forcing restart member",
		zap.String("cluster-id", cid.String()),
		zap.String("local-member-id", id.String()),
		zap.Uint64("commit-index", st.Commit),
	)

	cl := membership.NewCluster(cfg.Logger)
	cl.SetID(id, cid)
	s := raft.NewMemoryStorage()
	if snapshot != nil {
		s.ApplySnapshot(*snapshot)
	}
	s.SetHardState(st)
	s.Append(ents)
	c := &raft.Config{
		ID:              uint64(id),
		ElectionTick:    cfg.ElectionTicks,
		HeartbeatTick:   1,
		Storage:         s,
		MaxSizePerMsg:   maxSizePerMsg,
		MaxInflightMsgs: maxInflightMsgs,
		CheckQuorum:     true,
		PreVote:         cfg.PreVote,
		Logger:          NewRaftLoggerZap(cfg.Logger.Named("raft")),
	}

	n := raft.RestartNode(c)
	// guard the shared raftStatus with its mutex, consistent with startNode and restartNode
	raftStatusMu.Lock()
	raftStatus = n.Status
	raftStatusMu.Unlock()
	return id, cl, n, s, w
}

// getIDs returns an ordered set of IDs included in the given snapshot and
// the entries. The given snapshot/entries can contain three kinds of
// ID-related entry:
//   - ConfChangeAddNode, in which case the contained ID will be added into the set.
//   - ConfChangeRemoveNode, in which case the contained ID will be removed from the set.
//   - ConfChangeAddLearnerNode, in which case the contained ID will be added into the set.
func getIDs(lg *zap.Logger, snap *raftpb.Snapshot, ents []raftpb.Entry) []uint64 {
	ids := make(map[uint64]bool)
	if snap != nil {
		for _, id := range snap.Metadata.ConfState.Voters {
			ids[id] = true
		}
	}
	for _, e := range ents {
		if e.Type != raftpb.EntryConfChange {
			continue
		}
		var cc raftpb.ConfChange
		pbutil.MustUnmarshal(&cc, e.Data)
		switch cc.Type {
		case raftpb.ConfChangeAddLearnerNode:
			ids[cc.NodeID] = true
		case raftpb.ConfChangeAddNode:
			ids[cc.NodeID] = true
		case raftpb.ConfChangeRemoveNode:
			delete(ids, cc.NodeID)
		case raftpb.ConfChangeUpdateNode:
			// do nothing
		default:
			lg.Panic("unknown ConfChange Type", zap.String("type", cc.Type.String()))
		}
	}
	sids := make(types.Uint64Slice, 0, len(ids))
	for id := range ids {
		sids = append(sids, id)
	}
	sort.Sort(sids)
	return []uint64(sids)
}

// createConfigChangeEnts creates a series of Raft entries (i.e.
// EntryConfChange) to remove the set of given IDs from the cluster. The ID
// `self` is _not_ removed, even if present in the set.
// If `self` is not inside the given ids, it creates a Raft entry to add a
// default member with the given `self`.
func createConfigChangeEnts(lg *zap.Logger, ids []uint64, self uint64, term, index uint64) []raftpb.Entry {
	found := false
	for _, id := range ids {
		if id == self {
			found = true
		}
	}

	var ents []raftpb.Entry
	next := index + 1

	// NB: always add self first, then remove other nodes. Raft will panic if the
	// set of voters ever becomes empty.
	if !found {
		m := membership.Member{
			ID:             types.ID(self),
			RaftAttributes: membership.RaftAttributes{PeerURLs: []string{"http://localhost:2380"}},
		}
		ctx, err := json.Marshal(m)
		if err != nil {
			lg.Panic("failed to marshal member", zap.Error(err))
		}
		cc := &raftpb.ConfChange{
			Type:    raftpb.ConfChangeAddNode,
			NodeID:  self,
			Context: ctx,
		}
		e := raftpb.Entry{
			Type:  raftpb.EntryConfChange,
			Data:  pbutil.MustMarshal(cc),
			Term:  term,
			Index: next,
		}
		ents = append(ents, e)
		next++
	}

	for _, id := range ids {
		if id == self {
			continue
		}
		cc := &raftpb.ConfChange{
			Type:   raftpb.ConfChangeRemoveNode,
			NodeID: id,
		}
		e := raftpb.Entry{
			Type:  raftpb.EntryConfChange,
			Data:  pbutil.MustMarshal(cc),
			Term:  term,
			Index: next,
		}
		ents = append(ents, e)
		next++
	}

	return ents
}