1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package raft
16
17 import (
18 "bytes"
19 "errors"
20 "fmt"
21 "math"
22 "math/rand"
23 "sort"
24 "strings"
25 "sync"
26 "time"
27
28 "go.etcd.io/etcd/raft/v3/confchange"
29 "go.etcd.io/etcd/raft/v3/quorum"
30 pb "go.etcd.io/etcd/raft/v3/raftpb"
31 "go.etcd.io/etcd/raft/v3/tracker"
32 )
33
34
const (
	// None is the zero node ID. It is never a valid node ID (validate rejects
	// it) and is used to mean "no node": no known leader, no vote cast.
	None uint64 = 0

	// noLimit is the largest uint64 and is used where a size limit should be
	// effectively disabled.
	noLimit = math.MaxUint64
)
37
38
39 const (
40 StateFollower StateType = iota
41 StateCandidate
42 StateLeader
43 StatePreCandidate
44 numStates
45 )
46
// ReadOnlyOption selects how a leader answers read-only (ReadIndex) requests.
type ReadOnlyOption int

const (
	// ReadOnlySafe answers a read request only after a quorum of voters has
	// acknowledged a heartbeat carrying the request's context (see the
	// MsgHeartbeatResp handling in stepLeader).
	ReadOnlySafe ReadOnlyOption = iota
	// ReadOnlyLeaseBased skips that heartbeat round; validate requires
	// CheckQuorum to be enabled with it, presumably because correctness then
	// rests on the leader lease.
	ReadOnlyLeaseBased
)
60
61
62 const (
63
64
65 campaignPreElection CampaignType = "CampaignPreElection"
66
67
68 campaignElection CampaignType = "CampaignElection"
69
70 campaignTransfer CampaignType = "CampaignTransfer"
71 )
72
73
74
// ErrProposalDropped is returned when a proposal is not accepted. Examples
// are when there is no known leader, when forwarding is disabled, or when a
// leadership transfer is in progress.
var ErrProposalDropped = errors.New("raft proposal dropped")
76
77
78
79
// lockedRand wraps a *rand.Rand with a mutex so it can be shared between
// goroutines; a rand.Rand created with rand.New is not safe for concurrent
// use on its own.
type lockedRand struct {
	mu   sync.Mutex
	rand *rand.Rand
}

// Intn returns a pseudo-random int in [0, n) drawn from the wrapped source
// while holding r.mu. Like rand.Rand.Intn it panics if n <= 0. The mutex is
// released with defer, so such a panic cannot leave it locked and deadlock
// every later caller.
func (r *lockedRand) Intn(n int) int {
	r.mu.Lock()
	defer r.mu.Unlock()
	return r.rand.Intn(n)
}
91
92 var globalRand = &lockedRand{
93 rand: rand.New(rand.NewSource(time.Now().UnixNano())),
94 }
95
96
97
98
// CampaignType represents the kind of campaign a node runs. It is a string
// so that it can be sent as a vote request's Context (campaign does this
// for campaignTransfer).
type CampaignType string

// StateType represents the role of a node in a cluster.
type StateType uint64

// stmap holds the human-readable names of the StateType values, indexed by
// value.
var stmap = [...]string{
	"StateFollower",
	"StateCandidate",
	"StateLeader",
	"StatePreCandidate",
}

// String returns the name of st. A value outside the known range yields
// "StateType(<n>)" instead of panicking with an index-out-of-range error,
// so formatting a corrupted or unknown state stays safe.
func (st StateType) String() string {
	if uint64(st) < uint64(len(stmap)) {
		return stmap[st]
	}
	return fmt.Sprintf("StateType(%d)", uint64(st))
}
114
115
// Config contains the parameters used to start a raft node. newRaft calls
// validate, which rejects invalid settings and fills in defaults for some
// zero-valued fields.
type Config struct {
	// ID is the identity of the local raft node. It must not be None (0).
	ID uint64

	// ElectionTick is the election timeout, measured in ticks. It must be
	// greater than HeartbeatTick. A leader uses it as the period of its
	// quorum check (tickHeartbeat). With CheckQuorum, a node that heard from
	// a leader within this many ticks ignores higher-term vote requests
	// (Step).
	ElectionTick int

	// HeartbeatTick is the number of ticks between heartbeats triggered by a
	// leader. It must be greater than 0.
	HeartbeatTick int

	// Storage is the storage for raft. newRaft reads the initial HardState
	// and ConfState from it via InitialState. It must not be nil.
	Storage Storage

	// Applied is the last applied index. If greater than 0, newRaft marks
	// the log as applied up to this index on start-up.
	Applied uint64

	// MaxSizePerMsg limits the entries fetched for a single append message
	// (it is passed to raftLog.entries in maybeSendAppend).
	// NOTE(review): presumably measured in bytes; confirm against raftLog.
	MaxSizePerMsg uint64

	// MaxCommittedSizePerReady limits the committed entries handed out per
	// Ready (it is passed to newLogWithSize). If 0, validate sets it to
	// MaxSizePerMsg.
	MaxCommittedSizePerReady uint64

	// MaxUncommittedEntriesSize limits the total size of uncommitted entries
	// a leader accepts. appendEntry drops proposals that would exceed it. If
	// 0, validate sets it to noLimit.
	MaxUncommittedEntriesSize uint64

	// MaxInflightMsgs limits the number of in-flight append messages tracked
	// per peer (it is passed to tracker.MakeProgressTracker). It must be
	// greater than 0.
	MaxInflightMsgs int

	// CheckQuorum makes a leader step down if a quorum of voters has not
	// been recently active during an election timeout. It also makes a node
	// ignore higher-term vote requests while it believes a leader's lease
	// still holds.
	CheckQuorum bool

	// PreVote makes MsgHup start a pre-election instead of an election.
	// Pre-votes are requested for Term+1 without changing the local term.
	PreVote bool

	// ReadOnlyOption selects how read-only requests are served.
	// ReadOnlyLeaseBased requires CheckQuorum to be true.
	ReadOnlyOption ReadOnlyOption

	// Logger is used for raft logging. If nil, validate uses getLogger().
	Logger Logger

	// DisableProposalForwarding, if true, makes followers drop proposals
	// (returning ErrProposalDropped) instead of forwarding them to the
	// leader.
	DisableProposalForwarding bool
}
200
201 func (c *Config) validate() error {
202 if c.ID == None {
203 return errors.New("cannot use none as id")
204 }
205
206 if c.HeartbeatTick <= 0 {
207 return errors.New("heartbeat tick must be greater than 0")
208 }
209
210 if c.ElectionTick <= c.HeartbeatTick {
211 return errors.New("election tick must be greater than heartbeat tick")
212 }
213
214 if c.Storage == nil {
215 return errors.New("storage cannot be nil")
216 }
217
218 if c.MaxUncommittedEntriesSize == 0 {
219 c.MaxUncommittedEntriesSize = noLimit
220 }
221
222
223
224 if c.MaxCommittedSizePerReady == 0 {
225 c.MaxCommittedSizePerReady = c.MaxSizePerMsg
226 }
227
228 if c.MaxInflightMsgs <= 0 {
229 return errors.New("max inflight messages must be greater than 0")
230 }
231
232 if c.Logger == nil {
233 c.Logger = getLogger()
234 }
235
236 if c.ReadOnlyOption == ReadOnlyLeaseBased && !c.CheckQuorum {
237 return errors.New("CheckQuorum must be enabled when ReadOnlyOption is ReadOnlyLeaseBased")
238 }
239
240 return nil
241 }
242
// raft is the state machine of a single raft node. It holds the node's term,
// vote, log, role-specific behavior (state, tick and step) and queue of
// outgoing messages.
type raft struct {
	// id is the local node's ID.
	id uint64

	// Term is the current term. Vote is the node voted for in that term, or
	// None. reset clears Vote when the term changes. Both are reported by
	// hardState.
	Term uint64
	Vote uint64

	// readStates collects read-index results delivered to this node (see the
	// MsgReadIndexResp case in stepFollower).
	readStates []ReadState

	// the log
	raftLog *raftLog

	// maxMsgSize caps the entries fetched per append message.
	// maxUncommittedSize is the configured cap on uncommitted entry size.
	maxMsgSize         uint64
	maxUncommittedSize uint64

	// prs tracks the configuration and each peer's replication progress.
	prs tracker.ProgressTracker

	// state is the node's current role.
	state StateType

	// isLearner is initialized to false in newRaft. NOTE(review): presumably
	// updated by switchToConfig when the local node is a learner; not shown
	// here.
	isLearner bool

	// msgs is the queue of outgoing messages; send appends to it.
	msgs []pb.Message

	// lead is the ID of the known leader, or None.
	lead uint64

	// leadTransferee is the target of an in-progress leadership transfer, or
	// None. A leader drops proposals while it is set.
	leadTransferee uint64

	// pendingConfIndex is the index of the latest conf change that may not
	// have been applied yet (becomeLeader conservatively sets it to the last
	// log index). A leader refuses new conf changes while it exceeds the
	// applied index.
	pendingConfIndex uint64

	// uncommittedSize tracks the size of entries appended but not yet
	// accounted as committed. It is maintained by increaseUncommittedSize and
	// reduceUncommittedSize (not shown here) and zeroed by reset.
	uncommittedSize uint64

	// readOnly tracks pending read-only requests.
	readOnly *readOnly

	// electionElapsed counts ticks since the election timer was last reset.
	// A leader resets it every electionTimeout ticks (tickHeartbeat). Other
	// roles reset it on hearing from the leader or when granting a vote.
	electionElapsed int

	// heartbeatElapsed counts ticks since the leader last triggered a
	// heartbeat. Only the leader uses it.
	heartbeatElapsed int

	checkQuorum bool
	preVote     bool

	heartbeatTimeout int
	electionTimeout  int

	// randomizedElectionTimeout is set by resetRandomizedElectionTimeout
	// (not shown here). NOTE(review): presumably a random value in
	// [electionTimeout, 2*electionTimeout); confirm.
	randomizedElectionTimeout int
	disableProposalForwarding bool

	// tick and step implement the current role; the become* methods set them.
	tick func()
	step stepFunc

	logger Logger

	// pendingReadIndexMessages holds MsgReadIndex requests received before
	// this leader committed an entry in its own term. They are released once
	// the commit index advances (see stepLeader).
	pendingReadIndexMessages []pb.Message
}
317
318 func newRaft(c *Config) *raft {
319 if err := c.validate(); err != nil {
320 panic(err.Error())
321 }
322 raftlog := newLogWithSize(c.Storage, c.Logger, c.MaxCommittedSizePerReady)
323 hs, cs, err := c.Storage.InitialState()
324 if err != nil {
325 panic(err)
326 }
327
328 r := &raft{
329 id: c.ID,
330 lead: None,
331 isLearner: false,
332 raftLog: raftlog,
333 maxMsgSize: c.MaxSizePerMsg,
334 maxUncommittedSize: c.MaxUncommittedEntriesSize,
335 prs: tracker.MakeProgressTracker(c.MaxInflightMsgs),
336 electionTimeout: c.ElectionTick,
337 heartbeatTimeout: c.HeartbeatTick,
338 logger: c.Logger,
339 checkQuorum: c.CheckQuorum,
340 preVote: c.PreVote,
341 readOnly: newReadOnly(c.ReadOnlyOption),
342 disableProposalForwarding: c.DisableProposalForwarding,
343 }
344
345 cfg, prs, err := confchange.Restore(confchange.Changer{
346 Tracker: r.prs,
347 LastIndex: raftlog.lastIndex(),
348 }, cs)
349 if err != nil {
350 panic(err)
351 }
352 assertConfStatesEquivalent(r.logger, cs, r.switchToConfig(cfg, prs))
353
354 if !IsEmptyHardState(hs) {
355 r.loadState(hs)
356 }
357 if c.Applied > 0 {
358 raftlog.appliedTo(c.Applied)
359 }
360 r.becomeFollower(r.Term, None)
361
362 var nodesStrs []string
363 for _, n := range r.prs.VoterNodes() {
364 nodesStrs = append(nodesStrs, fmt.Sprintf("%x", n))
365 }
366
367 r.logger.Infof("newRaft %x [peers: [%s], term: %d, commit: %d, applied: %d, lastindex: %d, lastterm: %d]",
368 r.id, strings.Join(nodesStrs, ","), r.Term, r.raftLog.committed, r.raftLog.applied, r.raftLog.lastIndex(), r.raftLog.lastTerm())
369 return r
370 }
371
372 func (r *raft) hasLeader() bool { return r.lead != None }
373
374 func (r *raft) softState() *SoftState { return &SoftState{Lead: r.lead, RaftState: r.state} }
375
376 func (r *raft) hardState() pb.HardState {
377 return pb.HardState{
378 Term: r.Term,
379 Vote: r.Vote,
380 Commit: r.raftLog.committed,
381 }
382 }
383
384
385
386 func (r *raft) send(m pb.Message) {
387 if m.From == None {
388 m.From = r.id
389 }
390 if m.Type == pb.MsgVote || m.Type == pb.MsgVoteResp || m.Type == pb.MsgPreVote || m.Type == pb.MsgPreVoteResp {
391 if m.Term == 0 {
392
393
394
395
396
397
398
399
400
401
402
403
404 panic(fmt.Sprintf("term should be set when sending %s", m.Type))
405 }
406 } else {
407 if m.Term != 0 {
408 panic(fmt.Sprintf("term should not be set when sending %s (was %d)", m.Type, m.Term))
409 }
410
411
412
413
414 if m.Type != pb.MsgProp && m.Type != pb.MsgReadIndex {
415 m.Term = r.Term
416 }
417 }
418 r.msgs = append(r.msgs, m)
419 }
420
421
422
423 func (r *raft) sendAppend(to uint64) {
424 r.maybeSendAppend(to, true)
425 }
426
427
428
429
430
431
432 func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
433 pr := r.prs.Progress[to]
434 if pr.IsPaused() {
435 return false
436 }
437 m := pb.Message{}
438 m.To = to
439
440 term, errt := r.raftLog.term(pr.Next - 1)
441 ents, erre := r.raftLog.entries(pr.Next, r.maxMsgSize)
442 if len(ents) == 0 && !sendIfEmpty {
443 return false
444 }
445
446 if errt != nil || erre != nil {
447 if !pr.RecentActive {
448 r.logger.Debugf("ignore sending snapshot to %x since it is not recently active", to)
449 return false
450 }
451
452 m.Type = pb.MsgSnap
453 snapshot, err := r.raftLog.snapshot()
454 if err != nil {
455 if err == ErrSnapshotTemporarilyUnavailable {
456 r.logger.Debugf("%x failed to send snapshot to %x because snapshot is temporarily unavailable", r.id, to)
457 return false
458 }
459 panic(err)
460 }
461 if IsEmptySnap(snapshot) {
462 panic("need non-empty snapshot")
463 }
464 m.Snapshot = snapshot
465 sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term
466 r.logger.Debugf("%x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]",
467 r.id, r.raftLog.firstIndex(), r.raftLog.committed, sindex, sterm, to, pr)
468 pr.BecomeSnapshot(sindex)
469 r.logger.Debugf("%x paused sending replication messages to %x [%s]", r.id, to, pr)
470 } else {
471 m.Type = pb.MsgApp
472 m.Index = pr.Next - 1
473 m.LogTerm = term
474 m.Entries = ents
475 m.Commit = r.raftLog.committed
476 if n := len(m.Entries); n != 0 {
477 switch pr.State {
478
479 case tracker.StateReplicate:
480 last := m.Entries[n-1].Index
481 pr.OptimisticUpdate(last)
482 pr.Inflights.Add(last)
483 case tracker.StateProbe:
484 pr.ProbeSent = true
485 default:
486 r.logger.Panicf("%x is sending append in unhandled state %s", r.id, pr.State)
487 }
488 }
489 }
490 r.send(m)
491 return true
492 }
493
494
495 func (r *raft) sendHeartbeat(to uint64, ctx []byte) {
496
497
498
499
500
501
502 commit := min(r.prs.Progress[to].Match, r.raftLog.committed)
503 m := pb.Message{
504 To: to,
505 Type: pb.MsgHeartbeat,
506 Commit: commit,
507 Context: ctx,
508 }
509
510 r.send(m)
511 }
512
513
514
515 func (r *raft) bcastAppend() {
516 r.prs.Visit(func(id uint64, _ *tracker.Progress) {
517 if id == r.id {
518 return
519 }
520 r.sendAppend(id)
521 })
522 }
523
524
525 func (r *raft) bcastHeartbeat() {
526 lastCtx := r.readOnly.lastPendingRequestCtx()
527 if len(lastCtx) == 0 {
528 r.bcastHeartbeatWithCtx(nil)
529 } else {
530 r.bcastHeartbeatWithCtx([]byte(lastCtx))
531 }
532 }
533
534 func (r *raft) bcastHeartbeatWithCtx(ctx []byte) {
535 r.prs.Visit(func(id uint64, _ *tracker.Progress) {
536 if id == r.id {
537 return
538 }
539 r.sendHeartbeat(id, ctx)
540 })
541 }
542
543 func (r *raft) advance(rd Ready) {
544 r.reduceUncommittedSize(rd.CommittedEntries)
545
546
547
548
549
550 if newApplied := rd.appliedCursor(); newApplied > 0 {
551 oldApplied := r.raftLog.applied
552 r.raftLog.appliedTo(newApplied)
553
554 if r.prs.Config.AutoLeave && oldApplied <= r.pendingConfIndex && newApplied >= r.pendingConfIndex && r.state == StateLeader {
555
556
557
558
559
560 ent := pb.Entry{
561 Type: pb.EntryConfChangeV2,
562 Data: nil,
563 }
564
565 if !r.appendEntry(ent) {
566 panic("refused un-refusable auto-leaving ConfChangeV2")
567 }
568 r.pendingConfIndex = r.raftLog.lastIndex()
569 r.logger.Infof("initiating automatic transition out of joint configuration %s", r.prs.Config)
570 }
571 }
572
573 if len(rd.Entries) > 0 {
574 e := rd.Entries[len(rd.Entries)-1]
575 r.raftLog.stableTo(e.Index, e.Term)
576 }
577 if !IsEmptySnap(rd.Snapshot) {
578 r.raftLog.stableSnapTo(rd.Snapshot.Metadata.Index)
579 }
580 }
581
582
583
584
585 func (r *raft) maybeCommit() bool {
586 mci := r.prs.Committed()
587 return r.raftLog.maybeCommit(mci, r.Term)
588 }
589
590 func (r *raft) reset(term uint64) {
591 if r.Term != term {
592 r.Term = term
593 r.Vote = None
594 }
595 r.lead = None
596
597 r.electionElapsed = 0
598 r.heartbeatElapsed = 0
599 r.resetRandomizedElectionTimeout()
600
601 r.abortLeaderTransfer()
602
603 r.prs.ResetVotes()
604 r.prs.Visit(func(id uint64, pr *tracker.Progress) {
605 *pr = tracker.Progress{
606 Match: 0,
607 Next: r.raftLog.lastIndex() + 1,
608 Inflights: tracker.NewInflights(r.prs.MaxInflight),
609 IsLearner: pr.IsLearner,
610 }
611 if id == r.id {
612 pr.Match = r.raftLog.lastIndex()
613 }
614 })
615
616 r.pendingConfIndex = 0
617 r.uncommittedSize = 0
618 r.readOnly = newReadOnly(r.readOnly.option)
619 }
620
621 func (r *raft) appendEntry(es ...pb.Entry) (accepted bool) {
622 li := r.raftLog.lastIndex()
623 for i := range es {
624 es[i].Term = r.Term
625 es[i].Index = li + 1 + uint64(i)
626 }
627
628 if !r.increaseUncommittedSize(es) {
629 r.logger.Debugf(
630 "%x appending new entries to log would exceed uncommitted entry size limit; dropping proposal",
631 r.id,
632 )
633
634 return false
635 }
636
637 li = r.raftLog.append(es...)
638 r.prs.Progress[r.id].MaybeUpdate(li)
639
640 r.maybeCommit()
641 return true
642 }
643
644
645 func (r *raft) tickElection() {
646 r.electionElapsed++
647
648 if r.promotable() && r.pastElectionTimeout() {
649 r.electionElapsed = 0
650 r.Step(pb.Message{From: r.id, Type: pb.MsgHup})
651 }
652 }
653
654
655 func (r *raft) tickHeartbeat() {
656 r.heartbeatElapsed++
657 r.electionElapsed++
658
659 if r.electionElapsed >= r.electionTimeout {
660 r.electionElapsed = 0
661 if r.checkQuorum {
662 r.Step(pb.Message{From: r.id, Type: pb.MsgCheckQuorum})
663 }
664
665 if r.state == StateLeader && r.leadTransferee != None {
666 r.abortLeaderTransfer()
667 }
668 }
669
670 if r.state != StateLeader {
671 return
672 }
673
674 if r.heartbeatElapsed >= r.heartbeatTimeout {
675 r.heartbeatElapsed = 0
676 r.Step(pb.Message{From: r.id, Type: pb.MsgBeat})
677 }
678 }
679
680 func (r *raft) becomeFollower(term uint64, lead uint64) {
681 r.step = stepFollower
682 r.reset(term)
683 r.tick = r.tickElection
684 r.lead = lead
685 r.state = StateFollower
686 r.logger.Infof("%x became follower at term %d", r.id, r.Term)
687 }
688
689 func (r *raft) becomeCandidate() {
690
691 if r.state == StateLeader {
692 panic("invalid transition [leader -> candidate]")
693 }
694 r.step = stepCandidate
695 r.reset(r.Term + 1)
696 r.tick = r.tickElection
697 r.Vote = r.id
698 r.state = StateCandidate
699 r.logger.Infof("%x became candidate at term %d", r.id, r.Term)
700 }
701
702 func (r *raft) becomePreCandidate() {
703
704 if r.state == StateLeader {
705 panic("invalid transition [leader -> pre-candidate]")
706 }
707
708
709
710 r.step = stepCandidate
711 r.prs.ResetVotes()
712 r.tick = r.tickElection
713 r.lead = None
714 r.state = StatePreCandidate
715 r.logger.Infof("%x became pre-candidate at term %d", r.id, r.Term)
716 }
717
718 func (r *raft) becomeLeader() {
719
720 if r.state == StateFollower {
721 panic("invalid transition [follower -> leader]")
722 }
723 r.step = stepLeader
724 r.reset(r.Term)
725 r.tick = r.tickHeartbeat
726 r.lead = r.id
727 r.state = StateLeader
728
729
730
731
732 r.prs.Progress[r.id].BecomeReplicate()
733
734
735
736
737
738
739 r.pendingConfIndex = r.raftLog.lastIndex()
740
741 emptyEnt := pb.Entry{Data: nil}
742 if !r.appendEntry(emptyEnt) {
743
744 r.logger.Panic("empty entry was dropped")
745 }
746
747
748
749
750 r.reduceUncommittedSize([]pb.Entry{emptyEnt})
751 r.logger.Infof("%x became leader at term %d", r.id, r.Term)
752 }
753
754 func (r *raft) hup(t CampaignType) {
755 if r.state == StateLeader {
756 r.logger.Debugf("%x ignoring MsgHup because already leader", r.id)
757 return
758 }
759
760 if !r.promotable() {
761 r.logger.Warningf("%x is unpromotable and can not campaign", r.id)
762 return
763 }
764 ents, err := r.raftLog.slice(r.raftLog.applied+1, r.raftLog.committed+1, noLimit)
765 if err != nil {
766 r.logger.Panicf("unexpected error getting unapplied entries (%v)", err)
767 }
768 if n := numOfPendingConf(ents); n != 0 && r.raftLog.committed > r.raftLog.applied {
769 r.logger.Warningf("%x cannot campaign at term %d since there are still %d pending configuration changes to apply", r.id, r.Term, n)
770 return
771 }
772
773 r.logger.Infof("%x is starting a new election at term %d", r.id, r.Term)
774 r.campaign(t)
775 }
776
777
778
779 func (r *raft) campaign(t CampaignType) {
780 if !r.promotable() {
781
782
783 r.logger.Warningf("%x is unpromotable; campaign() should have been called", r.id)
784 }
785 var term uint64
786 var voteMsg pb.MessageType
787 if t == campaignPreElection {
788 r.becomePreCandidate()
789 voteMsg = pb.MsgPreVote
790
791 term = r.Term + 1
792 } else {
793 r.becomeCandidate()
794 voteMsg = pb.MsgVote
795 term = r.Term
796 }
797 if _, _, res := r.poll(r.id, voteRespMsgType(voteMsg), true); res == quorum.VoteWon {
798
799
800 if t == campaignPreElection {
801 r.campaign(campaignElection)
802 } else {
803 r.becomeLeader()
804 }
805 return
806 }
807 var ids []uint64
808 {
809 idMap := r.prs.Voters.IDs()
810 ids = make([]uint64, 0, len(idMap))
811 for id := range idMap {
812 ids = append(ids, id)
813 }
814 sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] })
815 }
816 for _, id := range ids {
817 if id == r.id {
818 continue
819 }
820 r.logger.Infof("%x [logterm: %d, index: %d] sent %s request to %x at term %d",
821 r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), voteMsg, id, r.Term)
822
823 var ctx []byte
824 if t == campaignTransfer {
825 ctx = []byte(t)
826 }
827 r.send(pb.Message{Term: term, To: id, Type: voteMsg, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Context: ctx})
828 }
829 }
830
831 func (r *raft) poll(id uint64, t pb.MessageType, v bool) (granted int, rejected int, result quorum.VoteResult) {
832 if v {
833 r.logger.Infof("%x received %s from %x at term %d", r.id, t, id, r.Term)
834 } else {
835 r.logger.Infof("%x received %s rejection from %x at term %d", r.id, t, id, r.Term)
836 }
837 r.prs.RecordVote(id, v)
838 return r.prs.TallyVotes()
839 }
840
// Step is the entry point for every message r processes. This covers local
// messages (Term 0, e.g. MsgHup, MsgBeat, MsgProp) and messages from peers.
//
// Step first reconciles m.Term with r.Term. This may make r step down to
// follower, or ignore or answer a stale message. It then handles MsgHup and
// vote requests itself and passes every other message to the role-specific
// r.step. It returns r.step's error, if any.
func (r *raft) Step(m pb.Message) error {
	// Handle the message term, which may result in our stepping down to a follower.
	switch {
	case m.Term == 0:
		// local message
	case m.Term > r.Term:
		if m.Type == pb.MsgVote || m.Type == pb.MsgPreVote {
			// A leadership-transfer campaign tags its requests with
			// campaignTransfer; such requests bypass the lease check.
			force := bytes.Equal(m.Context, []byte(campaignTransfer))
			// With CheckQuorum, having heard from a leader within the last
			// electionTimeout ticks means its lease is considered valid.
			inLease := r.checkQuorum && r.lead != None && r.electionElapsed < r.electionTimeout
			if !force && inLease {
				// Ignore the request entirely: don't adopt its term and
				// don't vote.
				r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] ignored %s from %x [logterm: %d, index: %d] at term %d: lease is not expired (remaining ticks: %d)",
					r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term, r.electionTimeout-r.electionElapsed)
				return nil
			}
		}
		switch {
		case m.Type == pb.MsgPreVote:
			// Never change our term in response to a PreVote.
		case m.Type == pb.MsgPreVoteResp && !m.Reject:
			// Pre-votes are requested for a future term (see campaign), and
			// a granted response echoes that term, so it must not be adopted
			// here. A rejected response carries the responder's own term
			// and falls to the default case.
		default:
			r.logger.Infof("%x [term: %d] received a %s message with higher term from %x [term: %d]",
				r.id, r.Term, m.Type, m.From, m.Term)
			// In this file these three types are only sent by a leader, so
			// only they identify the sender as the leader of the new term.
			if m.Type == pb.MsgApp || m.Type == pb.MsgHeartbeat || m.Type == pb.MsgSnap {
				r.becomeFollower(m.Term, m.From)
			} else {
				r.becomeFollower(m.Term, None)
			}
		}

	case m.Term < r.Term:
		if (r.checkQuorum || r.preVote) && (m.Type == pb.MsgHeartbeat || m.Type == pb.MsgApp) {
			// A stale leader is still sending us traffic. Reply with a
			// MsgAppResp: send stamps it with our (higher) term, so the stale
			// leader steps down when it processes the reply.
			r.send(pb.Message{To: m.From, Type: pb.MsgAppResp})
		} else if m.Type == pb.MsgPreVote {
			// Reject the stale pre-vote and include our term so the
			// pre-candidate learns it is behind.
			r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] rejected %s from %x [logterm: %d, index: %d] at term %d",
				r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)
			r.send(pb.Message{To: m.From, Term: r.Term, Type: pb.MsgPreVoteResp, Reject: true})
		} else {
			// ignore other cases
			r.logger.Infof("%x [term: %d] ignored a %s message with lower term from %x [term: %d]",
				r.id, r.Term, m.Type, m.From, m.Term)
		}
		return nil
	}

	switch m.Type {
	case pb.MsgHup:
		if r.preVote {
			r.hup(campaignPreElection)
		} else {
			r.hup(campaignElection)
		}

	case pb.MsgVote, pb.MsgPreVote:
		// We can vote if this is a repeat of a vote we've already cast...
		canVote := r.Vote == m.From ||
			// ...we haven't voted and we don't think there's a leader yet in this term...
			(r.Vote == None && r.lead == None) ||
			// ...or this is a PreVote for a future term...
			(m.Type == pb.MsgPreVote && m.Term > r.Term)
		// ...and we believe the candidate's log is at least as up to date as ours.
		if canVote && r.raftLog.isUpToDate(m.Index, m.LogTerm) {
			r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] cast %s for %x [logterm: %d, index: %d] at term %d",
				r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)
			// Grants echo the request's term rather than r.Term. For a
			// PreVote our own term may be lower than the requested one, and
			// the pre-candidate expects its requested term back (see the
			// MsgPreVoteResp case above).
			r.send(pb.Message{To: m.From, Term: m.Term, Type: voteRespMsgType(m.Type)})
			if m.Type == pb.MsgVote {
				// Only real votes are recorded (pre-votes are not binding),
				// and granting one restarts our election timer.
				r.electionElapsed = 0
				r.Vote = m.From
			}
		} else {
			r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] rejected %s from %x [logterm: %d, index: %d] at term %d",
				r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)
			r.send(pb.Message{To: m.From, Term: r.Term, Type: voteRespMsgType(m.Type), Reject: true})
		}

	default:
		// Everything else is role-specific.
		err := r.step(r, m)
		if err != nil {
			return err
		}
	}
	return nil
}
982
983 type stepFunc func(r *raft, m pb.Message) error
984
// stepLeader is the step function installed by becomeLeader.
//
// Message types that do not concern a particular peer (MsgBeat,
// MsgCheckQuorum, MsgProp, MsgReadIndex) are handled first. Every other type
// needs the sender's Progress and is ignored if m.From is not tracked. It
// returns ErrProposalDropped when a proposal is not accepted.
func stepLeader(r *raft, m pb.Message) error {
	// These message types do not require any progress for m.From.
	switch m.Type {
	case pb.MsgBeat:
		r.bcastHeartbeat()
		return nil
	case pb.MsgCheckQuorum:
		// The leader should always see itself as active. As a precaution,
		// handle the case in which the leader is no longer in the
		// configuration (for example if it just removed itself).
		if pr := r.prs.Progress[r.id]; pr != nil {
			pr.RecentActive = true
		}
		if !r.prs.QuorumActive() {
			r.logger.Warningf("%x stepped down to follower since quorum is not active", r.id)
			r.becomeFollower(r.Term, None)
		}
		// Mark everyone (but ourselves) as inactive in preparation for the
		// next CheckQuorum. MsgAppResp and MsgHeartbeatResp set it again.
		r.prs.Visit(func(id uint64, pr *tracker.Progress) {
			if id != r.id {
				pr.RecentActive = false
			}
		})
		return nil
	case pb.MsgProp:
		if len(m.Entries) == 0 {
			r.logger.Panicf("%x stepped empty MsgProp", r.id)
		}
		if r.prs.Progress[r.id] == nil {
			// This node is not part of the current configuration (e.g. the
			// leader removed itself), so it must not accept new proposals.
			return ErrProposalDropped
		}
		if r.leadTransferee != None {
			// Proposals are refused while leadership is being handed off.
			r.logger.Debugf("%x [term %d] transfer leadership to %x is in progress; dropping proposal", r.id, r.Term, r.leadTransferee)
			return ErrProposalDropped
		}

		for i := range m.Entries {
			e := &m.Entries[i]
			// Decode conf change entries (v1 or v2) so they can be vetted.
			var cc pb.ConfChangeI
			if e.Type == pb.EntryConfChange {
				var ccc pb.ConfChange
				if err := ccc.Unmarshal(e.Data); err != nil {
					panic(err)
				}
				cc = ccc
			} else if e.Type == pb.EntryConfChangeV2 {
				var ccc pb.ConfChangeV2
				if err := ccc.Unmarshal(e.Data); err != nil {
					panic(err)
				}
				cc = ccc
			}
			if cc != nil {
				// A conf change is refused if an earlier one may still be
				// unapplied, or if it does not fit the joint-consensus state.
				// While joint (Voters[1] non-empty), only an empty "leave
				// joint" change is allowed. An empty change is only allowed
				// while joint.
				alreadyPending := r.pendingConfIndex > r.raftLog.applied
				alreadyJoint := len(r.prs.Config.Voters[1]) > 0
				wantsLeaveJoint := len(cc.AsV2().Changes) == 0

				var refused string
				if alreadyPending {
					refused = fmt.Sprintf("possible unapplied conf change at index %d (applied to %d)", r.pendingConfIndex, r.raftLog.applied)
				} else if alreadyJoint && !wantsLeaveJoint {
					refused = "must transition out of joint config first"
				} else if !alreadyJoint && wantsLeaveJoint {
					refused = "not in joint state; refusing empty conf change"
				}

				if refused != "" {
					// Replace the refused change with an empty normal entry
					// instead of removing it, so the rest of the batch is
					// still appended.
					r.logger.Infof("%x ignoring conf change %v at config %s: %s", r.id, cc, r.prs.Config, refused)
					m.Entries[i] = pb.Entry{Type: pb.EntryNormal}
				} else {
					// Remember the index this entry will occupy once appended.
					r.pendingConfIndex = r.raftLog.lastIndex() + uint64(i) + 1
				}
			}
		}

		if !r.appendEntry(m.Entries...) {
			return ErrProposalDropped
		}
		r.bcastAppend()
		return nil
	case pb.MsgReadIndex:
		// Only one voting member (the leader) in the cluster: answer with the
		// current commit index right away.
		if r.prs.IsSingleton() {
			if resp := r.responseToReadIndexReq(m, r.raftLog.committed); resp.To != None {
				r.send(resp)
			}
			return nil
		}

		// Postpone read-only requests until this leader has committed an
		// entry in its own term. They are released from the MsgAppResp path
		// once the commit index advances.
		if !r.committedEntryInCurrentTerm() {
			r.pendingReadIndexMessages = append(r.pendingReadIndexMessages, m)
			return nil
		}

		sendMsgReadIndexResponse(r, m)

		return nil
	}

	// All other message types require a progress for m.From (pr).
	pr := r.prs.Progress[m.From]
	if pr == nil {
		r.logger.Debugf("%x no progress available for %x", r.id, m.From)
		return nil
	}
	switch m.Type {
	case pb.MsgAppResp:
		// Any response shows the peer is alive (see MsgCheckQuorum).
		pr.RecentActive = true

		if m.Reject {
			// The follower rejected the append at m.Index. RejectHint and
			// LogTerm carry its hint (see handleAppendEntries): an index in
			// the follower's log and the term it has there. When a term is
			// given, findConflictByTerm picks the next index to probe from
			// our own log. This presumably lets the leader skip a whole run
			// of non-matching entries instead of backing off one at a time.
			r.logger.Debugf("%x received MsgAppResp(rejected, hint: (index %d, term %d)) from %x for index %d",
				r.id, m.RejectHint, m.LogTerm, m.From, m.Index)
			nextProbeIdx := m.RejectHint
			if m.LogTerm > 0 {
				nextProbeIdx = r.raftLog.findConflictByTerm(m.RejectHint, m.LogTerm)
			}
			if pr.MaybeDecrTo(m.Index, nextProbeIdx) {
				r.logger.Debugf("%x decreased progress of %x to [%s]", r.id, m.From, pr)
				// Stop pipelining and probe until the match point is found.
				if pr.State == tracker.StateReplicate {
					pr.BecomeProbe()
				}
				r.sendAppend(m.From)
			}
		} else {
			oldPaused := pr.IsPaused()
			if pr.MaybeUpdate(m.Index) {
				switch {
				case pr.State == tracker.StateProbe:
					// The probe found the match point; start pipelining.
					pr.BecomeReplicate()
				case pr.State == tracker.StateSnapshot && pr.Match >= pr.PendingSnapshot:
					// The follower has caught up to (or past) the pending
					// snapshot index, so it no longer needs the snapshot.
					r.logger.Debugf("%x recovered from needing snapshot, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
					// Go back to replicating via the probing state, which
					// takes the snapshot into account. Otherwise the switch
					// to replicating would only happen with the next round of
					// appends, which may not come for a while.
					pr.BecomeProbe()
					pr.BecomeReplicate()
				case pr.State == tracker.StateReplicate:
					// Acknowledged appends no longer count as in flight.
					pr.Inflights.FreeLE(m.Index)
				}

				if r.maybeCommit() {
					// The committed index has progressed for the term, so it
					// is safe to respond to pending read index requests.
					releasePendingReadIndexMessages(r)
					r.bcastAppend()
				} else if oldPaused {
					// The peer was paused before this update; send it what
					// it may have been waiting for.
					r.sendAppend(m.From)
				}
				// Keep sending new entries as long as maybeSendAppend
				// succeeds (it stops when there is nothing new or the peer
				// is paused).
				for r.maybeSendAppend(m.From, false) {
				}
				// Finish a pending leadership transfer as soon as the
				// transferee's log is fully caught up.
				if m.From == r.leadTransferee && pr.Match == r.raftLog.lastIndex() {
					r.logger.Infof("%x sent MsgTimeoutNow to %x after received MsgAppResp", r.id, m.From)
					r.sendTimeoutNow(m.From)
				}
			}
		}
	case pb.MsgHeartbeatResp:
		pr.RecentActive = true
		// A heartbeat response ends any outstanding probe.
		pr.ProbeSent = false

		// Free one slot for the full inflights window to allow progress.
		if pr.State == tracker.StateReplicate && pr.Inflights.Full() {
			pr.Inflights.FreeFirstOne()
		}
		if pr.Match < r.raftLog.lastIndex() {
			r.sendAppend(m.From)
		}

		// The rest only applies to heartbeats carrying a read-index context
		// under ReadOnlySafe.
		if r.readOnly.option != ReadOnlySafe || len(m.Context) == 0 {
			return nil
		}

		// Wait until a quorum of voters has acknowledged this context.
		if r.prs.Voters.VoteResult(r.readOnly.recvAck(m.From, m.Context)) != quorum.VoteWon {
			return nil
		}

		// Answer every read request that this quorum has now confirmed.
		rss := r.readOnly.advance(m)
		for _, rs := range rss {
			if resp := r.responseToReadIndexReq(rs.req, rs.index); resp.To != None {
				r.send(resp)
			}
		}
	case pb.MsgSnapStatus:
		if pr.State != tracker.StateSnapshot {
			return nil
		}
		if !m.Reject {
			pr.BecomeProbe()
			r.logger.Debugf("%x snapshot succeeded, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
		} else {
			// PendingSnapshot is cleared before BecomeProbe. NOTE(review):
			// presumably so that probing does not start from the index of a
			// snapshot that was never applied.
			pr.PendingSnapshot = 0
			pr.BecomeProbe()
			r.logger.Debugf("%x snapshot failed, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
		}
		// Mark a probe as outstanding. After a success this waits for the
		// follower's MsgAppResp. After a failure it waits until the next
		// MsgHeartbeatResp clears ProbeSent.
		pr.ProbeSent = true
	case pb.MsgUnreachable:
		// While pipelining, an unreachable peer has likely lost an in-flight
		// MsgApp, so fall back to probing.
		if pr.State == tracker.StateReplicate {
			pr.BecomeProbe()
		}
		r.logger.Debugf("%x failed to send message to %x because it is unreachable [%s]", r.id, m.From, pr)
	case pb.MsgTransferLeader:
		// m.From is the node that should become the new leader.
		if pr.IsLearner {
			// NOTE(review): this logs r.id, although it is m.From that is
			// the learner.
			r.logger.Debugf("%x is learner. Ignored transferring leadership", r.id)
			return nil
		}
		leadTransferee := m.From
		lastLeadTransferee := r.leadTransferee
		if lastLeadTransferee != None {
			if lastLeadTransferee == leadTransferee {
				r.logger.Infof("%x [term %d] transfer leadership to %x is in progress, ignores request to same node %x",
					r.id, r.Term, leadTransferee, leadTransferee)
				return nil
			}
			r.abortLeaderTransfer()
			r.logger.Infof("%x [term %d] abort previous transferring leadership to %x", r.id, r.Term, lastLeadTransferee)
		}
		if leadTransferee == r.id {
			r.logger.Debugf("%x is already leader. Ignored transferring leadership to self", r.id)
			return nil
		}
		// Transfer leadership to the third party.
		r.logger.Infof("%x [term %d] starts to transfer leadership to %x", r.id, r.Term, leadTransferee)
		// tickHeartbeat aborts a transfer still pending after one
		// electionTimeout, so restart the timer now.
		r.electionElapsed = 0
		r.leadTransferee = leadTransferee
		if pr.Match == r.raftLog.lastIndex() {
			r.sendTimeoutNow(leadTransferee)
			r.logger.Infof("%x sends MsgTimeoutNow to %x immediately as %x already has up-to-date log", r.id, leadTransferee, leadTransferee)
		} else {
			// Catch the transferee up first; the MsgAppResp path sends
			// MsgTimeoutNow once its log matches ours.
			r.sendAppend(leadTransferee)
		}
	}
	return nil
}
1367
1368
1369
1370 func stepCandidate(r *raft, m pb.Message) error {
1371
1372
1373
1374 var myVoteRespType pb.MessageType
1375 if r.state == StatePreCandidate {
1376 myVoteRespType = pb.MsgPreVoteResp
1377 } else {
1378 myVoteRespType = pb.MsgVoteResp
1379 }
1380 switch m.Type {
1381 case pb.MsgProp:
1382 r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)
1383 return ErrProposalDropped
1384 case pb.MsgApp:
1385 r.becomeFollower(m.Term, m.From)
1386 r.handleAppendEntries(m)
1387 case pb.MsgHeartbeat:
1388 r.becomeFollower(m.Term, m.From)
1389 r.handleHeartbeat(m)
1390 case pb.MsgSnap:
1391 r.becomeFollower(m.Term, m.From)
1392 r.handleSnapshot(m)
1393 case myVoteRespType:
1394 gr, rj, res := r.poll(m.From, m.Type, !m.Reject)
1395 r.logger.Infof("%x has received %d %s votes and %d vote rejections", r.id, gr, m.Type, rj)
1396 switch res {
1397 case quorum.VoteWon:
1398 if r.state == StatePreCandidate {
1399 r.campaign(campaignElection)
1400 } else {
1401 r.becomeLeader()
1402 r.bcastAppend()
1403 }
1404 case quorum.VoteLost:
1405
1406
1407 r.becomeFollower(r.Term, None)
1408 }
1409 case pb.MsgTimeoutNow:
1410 r.logger.Debugf("%x [term %d state %v] ignored MsgTimeoutNow from %x", r.id, r.Term, r.state, m.From)
1411 }
1412 return nil
1413 }
1414
1415 func stepFollower(r *raft, m pb.Message) error {
1416 switch m.Type {
1417 case pb.MsgProp:
1418 if r.lead == None {
1419 r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)
1420 return ErrProposalDropped
1421 } else if r.disableProposalForwarding {
1422 r.logger.Infof("%x not forwarding to leader %x at term %d; dropping proposal", r.id, r.lead, r.Term)
1423 return ErrProposalDropped
1424 }
1425 m.To = r.lead
1426 r.send(m)
1427 case pb.MsgApp:
1428 r.electionElapsed = 0
1429 r.lead = m.From
1430 r.handleAppendEntries(m)
1431 case pb.MsgHeartbeat:
1432 r.electionElapsed = 0
1433 r.lead = m.From
1434 r.handleHeartbeat(m)
1435 case pb.MsgSnap:
1436 r.electionElapsed = 0
1437 r.lead = m.From
1438 r.handleSnapshot(m)
1439 case pb.MsgTransferLeader:
1440 if r.lead == None {
1441 r.logger.Infof("%x no leader at term %d; dropping leader transfer msg", r.id, r.Term)
1442 return nil
1443 }
1444 m.To = r.lead
1445 r.send(m)
1446 case pb.MsgTimeoutNow:
1447 r.logger.Infof("%x [term %d] received MsgTimeoutNow from %x and starts an election to get leadership.", r.id, r.Term, m.From)
1448
1449
1450
1451 r.hup(campaignTransfer)
1452 case pb.MsgReadIndex:
1453 if r.lead == None {
1454 r.logger.Infof("%x no leader at term %d; dropping index reading msg", r.id, r.Term)
1455 return nil
1456 }
1457 m.To = r.lead
1458 r.send(m)
1459 case pb.MsgReadIndexResp:
1460 if len(m.Entries) != 1 {
1461 r.logger.Errorf("%x invalid format of MsgReadIndexResp from %x, entries count: %d", r.id, m.From, len(m.Entries))
1462 return nil
1463 }
1464 r.readStates = append(r.readStates, ReadState{Index: m.Index, RequestCtx: m.Entries[0].Data})
1465 }
1466 return nil
1467 }
1468
1469 func (r *raft) handleAppendEntries(m pb.Message) {
1470 if m.Index < r.raftLog.committed {
1471 r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
1472 return
1473 }
1474
1475 if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok {
1476 r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
1477 } else {
1478 r.logger.Debugf("%x [logterm: %d, index: %d] rejected MsgApp [logterm: %d, index: %d] from %x",
1479 r.id, r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(m.Index)), m.Index, m.LogTerm, m.Index, m.From)
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490 hintIndex := min(m.Index, r.raftLog.lastIndex())
1491 hintIndex = r.raftLog.findConflictByTerm(hintIndex, m.LogTerm)
1492 hintTerm, err := r.raftLog.term(hintIndex)
1493 if err != nil {
1494 panic(fmt.Sprintf("term(%d) must be valid, but got %v", hintIndex, err))
1495 }
1496 r.send(pb.Message{
1497 To: m.From,
1498 Type: pb.MsgAppResp,
1499 Index: m.Index,
1500 Reject: true,
1501 RejectHint: hintIndex,
1502 LogTerm: hintTerm,
1503 })
1504 }
1505 }
1506
1507 func (r *raft) handleHeartbeat(m pb.Message) {
1508 r.raftLog.commitTo(m.Commit)
1509 r.send(pb.Message{To: m.From, Type: pb.MsgHeartbeatResp, Context: m.Context})
1510 }
1511
1512 func (r *raft) handleSnapshot(m pb.Message) {
1513 sindex, sterm := m.Snapshot.Metadata.Index, m.Snapshot.Metadata.Term
1514 if r.restore(m.Snapshot) {
1515 r.logger.Infof("%x [commit: %d] restored snapshot [index: %d, term: %d]",
1516 r.id, r.raftLog.committed, sindex, sterm)
1517 r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.lastIndex()})
1518 } else {
1519 r.logger.Infof("%x [commit: %d] ignored snapshot [index: %d, term: %d]",
1520 r.id, r.raftLog.committed, sindex, sterm)
1521 r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
1522 }
1523 }
1524
1525
1526
1527
// restore tries to reset the state of this raft instance to the given
// snapshot and reports whether the snapshot was installed.
//
// It returns false, without installing the snapshot, when:
//   - the snapshot's index is not beyond the local commit index;
//   - this node is not a follower (it then also steps down into the next term
//     with no known leader, as a defensive measure);
//   - this node's ID is not listed in the snapshot's ConfState (Voters,
//     Learners or VotersOutgoing);
//   - the local log already has an entry matching the snapshot's index and
//     term, in which case only the commit index is fast-forwarded.
//
// Otherwise it replaces the log with the snapshot, rebuilds the progress
// tracker from the snapshot's ConfState (panicking if that fails), switches to
// the resulting configuration, and returns true.
func (r *raft) restore(s pb.Snapshot) bool {
	// Nothing to do if the snapshot is not ahead of what is already committed.
	if s.Metadata.Index <= r.raftLog.committed {
		return false
	}
	if r.state != StateFollower {
		// Defense-in-depth: only a follower is expected to restore a snapshot.
		// Instead of applying it, warn loudly and move into a new term as a
		// follower with no known leader.
		//
		// NOTE(review): the message says "as leader", but this branch is also
		// taken in StateCandidate and StatePreCandidate.
		r.logger.Warningf("%x attempted to restore snapshot as leader; should never happen", r.id)
		r.becomeFollower(r.Term+1, None)
		return false
	}

	// More defense-in-depth: refuse a snapshot whose configuration does not
	// include this node. The code below looks up r.prs.Progress[r.id] after
	// switching configuration and dereferences the result.
	found := false
	cs := s.Metadata.ConfState

	for _, set := range [][]uint64{
		cs.Voters,
		cs.Learners,
		cs.VotersOutgoing,
		// LearnersNext is deliberately not checked.
		// NOTE(review): presumably because a peer in LearnersNext must also be
		// in VotersOutgoing — confirm against the confchange rules.
	} {
		for _, id := range set {
			if id == r.id {
				found = true
				break
			}
		}
		if found {
			break
		}
	}
	if !found {
		r.logger.Warningf(
			"%x attempted to restore snapshot but it is not in the ConfState %v; should never happen",
			r.id, cs,
		)
		return false
	}

	// If the log already holds the snapshot's last entry (same index and term),
	// keep the log as-is: just move the commit index forward and report that
	// the snapshot itself was not applied.
	if r.raftLog.matchTerm(s.Metadata.Index, s.Metadata.Term) {
		r.logger.Infof("%x [commit: %d, lastindex: %d, lastterm: %d] fast-forwarded commit to snapshot [index: %d, term: %d]",
			r.id, r.raftLog.committed, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term)
		r.raftLog.commitTo(s.Metadata.Index)
		return false
	}

	// Replace the log with the snapshot. This must happen before the tracker
	// is rebuilt below, because LastIndex is read from the restored log.
	r.raftLog.restore(s)

	// Rebuild the progress tracker from scratch, keeping the in-flight limit,
	// and re-add the peers listed in the snapshot's ConfState.
	r.prs = tracker.MakeProgressTracker(r.prs.MaxInflight)
	cfg, prs, err := confchange.Restore(confchange.Changer{
		Tracker:   r.prs,
		LastIndex: r.raftLog.lastIndex(),
	}, cs)

	if err != nil {
		// The snapshot's configuration could not be reconstructed; this is
		// treated as fatal.
		panic(fmt.Sprintf("unable to restore config %+v: %s", cs, err))
	}

	// Install the new configuration. The ConfState it yields is compared with
	// the snapshot's (NOTE(review): presumably assertConfStatesEquivalent fails
	// loudly on a mismatch — confirm).
	assertConfStatesEquivalent(r.logger, cs, r.switchToConfig(cfg, prs))

	// Update this node's own progress entry with Next-1.
	// NOTE(review): its necessity for a follower is unclear; verify.
	pr := r.prs.Progress[r.id]
	pr.MaybeUpdate(pr.Next - 1)

	r.logger.Infof("%x [commit: %d, lastindex: %d, lastterm: %d] restored snapshot [index: %d, term: %d]",
		r.id, r.raftLog.committed, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term)
	return true
}
1609
1610
1611
1612 func (r *raft) promotable() bool {
1613 pr := r.prs.Progress[r.id]
1614 return pr != nil && !pr.IsLearner && !r.raftLog.hasPendingSnapshot()
1615 }
1616
1617 func (r *raft) applyConfChange(cc pb.ConfChangeV2) pb.ConfState {
1618 cfg, prs, err := func() (tracker.Config, tracker.ProgressMap, error) {
1619 changer := confchange.Changer{
1620 Tracker: r.prs,
1621 LastIndex: r.raftLog.lastIndex(),
1622 }
1623 if cc.LeaveJoint() {
1624 return changer.LeaveJoint()
1625 } else if autoLeave, ok := cc.EnterJoint(); ok {
1626 return changer.EnterJoint(autoLeave, cc.Changes...)
1627 }
1628 return changer.Simple(cc.Changes...)
1629 }()
1630
1631 if err != nil {
1632
1633 panic(err)
1634 }
1635
1636 return r.switchToConfig(cfg, prs)
1637 }
1638
1639
1640
1641
1642
1643
1644
1645 func (r *raft) switchToConfig(cfg tracker.Config, prs tracker.ProgressMap) pb.ConfState {
1646 r.prs.Config = cfg
1647 r.prs.Progress = prs
1648
1649 r.logger.Infof("%x switched to configuration %s", r.id, r.prs.Config)
1650 cs := r.prs.ConfState()
1651 pr, ok := r.prs.Progress[r.id]
1652
1653
1654
1655 r.isLearner = ok && pr.IsLearner
1656
1657 if (!ok || r.isLearner) && r.state == StateLeader {
1658
1659
1660
1661
1662
1663
1664
1665
1666
1667 return cs
1668 }
1669
1670
1671
1672 if r.state != StateLeader || len(cs.Voters) == 0 {
1673 return cs
1674 }
1675
1676 if r.maybeCommit() {
1677
1678
1679 r.bcastAppend()
1680 } else {
1681
1682
1683
1684 r.prs.Visit(func(id uint64, pr *tracker.Progress) {
1685 r.maybeSendAppend(id, false )
1686 })
1687 }
1688
1689 if _, tOK := r.prs.Config.Voters.IDs()[r.leadTransferee]; !tOK && r.leadTransferee != 0 {
1690 r.abortLeaderTransfer()
1691 }
1692
1693 return cs
1694 }
1695
1696 func (r *raft) loadState(state pb.HardState) {
1697 if state.Commit < r.raftLog.committed || state.Commit > r.raftLog.lastIndex() {
1698 r.logger.Panicf("%x state.commit %d is out of range [%d, %d]", r.id, state.Commit, r.raftLog.committed, r.raftLog.lastIndex())
1699 }
1700 r.raftLog.committed = state.Commit
1701 r.Term = state.Term
1702 r.Vote = state.Vote
1703 }
1704
1705
1706
1707
1708 func (r *raft) pastElectionTimeout() bool {
1709 return r.electionElapsed >= r.randomizedElectionTimeout
1710 }
1711
1712 func (r *raft) resetRandomizedElectionTimeout() {
1713 r.randomizedElectionTimeout = r.electionTimeout + globalRand.Intn(r.electionTimeout)
1714 }
1715
1716 func (r *raft) sendTimeoutNow(to uint64) {
1717 r.send(pb.Message{To: to, Type: pb.MsgTimeoutNow})
1718 }
1719
1720 func (r *raft) abortLeaderTransfer() {
1721 r.leadTransferee = None
1722 }
1723
1724
1725 func (r *raft) committedEntryInCurrentTerm() bool {
1726 return r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(r.raftLog.committed)) == r.Term
1727 }
1728
1729
1730
1731 func (r *raft) responseToReadIndexReq(req pb.Message, readIndex uint64) pb.Message {
1732 if req.From == None || req.From == r.id {
1733 r.readStates = append(r.readStates, ReadState{
1734 Index: readIndex,
1735 RequestCtx: req.Entries[0].Data,
1736 })
1737 return pb.Message{}
1738 }
1739 return pb.Message{
1740 Type: pb.MsgReadIndexResp,
1741 To: req.From,
1742 Index: readIndex,
1743 Entries: req.Entries,
1744 }
1745 }
1746
1747
1748
1749
1750
1751
1752
1753
1754
1755 func (r *raft) increaseUncommittedSize(ents []pb.Entry) bool {
1756 var s uint64
1757 for _, e := range ents {
1758 s += uint64(PayloadSize(e))
1759 }
1760
1761 if r.uncommittedSize > 0 && s > 0 && r.uncommittedSize+s > r.maxUncommittedSize {
1762
1763
1764
1765
1766
1767
1768
1769 return false
1770 }
1771 r.uncommittedSize += s
1772 return true
1773 }
1774
1775
1776
1777 func (r *raft) reduceUncommittedSize(ents []pb.Entry) {
1778 if r.uncommittedSize == 0 {
1779
1780 return
1781 }
1782
1783 var s uint64
1784 for _, e := range ents {
1785 s += uint64(PayloadSize(e))
1786 }
1787 if s > r.uncommittedSize {
1788
1789
1790
1791 r.uncommittedSize = 0
1792 } else {
1793 r.uncommittedSize -= s
1794 }
1795 }
1796
1797 func numOfPendingConf(ents []pb.Entry) int {
1798 n := 0
1799 for i := range ents {
1800 if ents[i].Type == pb.EntryConfChange || ents[i].Type == pb.EntryConfChangeV2 {
1801 n++
1802 }
1803 }
1804 return n
1805 }
1806
1807 func releasePendingReadIndexMessages(r *raft) {
1808 if !r.committedEntryInCurrentTerm() {
1809 r.logger.Error("pending MsgReadIndex should be released only after first commit in current term")
1810 return
1811 }
1812
1813 msgs := r.pendingReadIndexMessages
1814 r.pendingReadIndexMessages = nil
1815
1816 for _, m := range msgs {
1817 sendMsgReadIndexResponse(r, m)
1818 }
1819 }
1820
1821 func sendMsgReadIndexResponse(r *raft, m pb.Message) {
1822
1823
1824
1825 switch r.readOnly.option {
1826
1827 case ReadOnlySafe:
1828 r.readOnly.addRequest(r.raftLog.committed, m)
1829
1830 r.readOnly.recvAck(r.id, m.Entries[0].Data)
1831 r.bcastHeartbeatWithCtx(m.Entries[0].Data)
1832 case ReadOnlyLeaseBased:
1833 if resp := r.responseToReadIndexReq(m, r.raftLog.committed); resp.To != None {
1834 r.send(resp)
1835 }
1836 }
1837 }
1838
View as plain text