1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package raft
16
17 import (
18 "bytes"
19 "fmt"
20 "math"
21 "math/rand"
22 "reflect"
23 "strings"
24 "testing"
25
26 pb "go.etcd.io/etcd/raft/v3/raftpb"
27 "go.etcd.io/etcd/raft/v3/tracker"
28 )
29
30
31 func nextEnts(r *raft, s *MemoryStorage) (ents []pb.Entry) {
32
33 s.Append(r.raftLog.unstableEntries())
34 r.raftLog.stableTo(r.raftLog.lastIndex(), r.raftLog.lastTerm())
35
36 ents = r.raftLog.nextEnts()
37 r.raftLog.appliedTo(r.raftLog.committed)
38 return ents
39 }
40
41 func mustAppendEntry(r *raft, ents ...pb.Entry) {
42 if !r.appendEntry(ents...) {
43 panic("entry unexpectedly dropped")
44 }
45 }
46
47 type stateMachine interface {
48 Step(m pb.Message) error
49 readMessages() []pb.Message
50 }
51
52 func (r *raft) readMessages() []pb.Message {
53 msgs := r.msgs
54 r.msgs = make([]pb.Message, 0)
55
56 return msgs
57 }
58
59 func TestProgressLeader(t *testing.T) {
60 r := newTestRaft(1, 5, 1, newTestMemoryStorage(withPeers(1, 2)))
61 r.becomeCandidate()
62 r.becomeLeader()
63 r.prs.Progress[2].BecomeReplicate()
64
65
66 propMsg := pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("foo")}}}
67 for i := 0; i < 5; i++ {
68 if pr := r.prs.Progress[r.id]; pr.State != tracker.StateReplicate || pr.Match != uint64(i+1) || pr.Next != pr.Match+1 {
69 t.Errorf("unexpected progress %v", pr)
70 }
71 if err := r.Step(propMsg); err != nil {
72 t.Fatalf("proposal resulted in error: %v", err)
73 }
74 }
75 }
76
77
78 func TestProgressResumeByHeartbeatResp(t *testing.T) {
79 r := newTestRaft(1, 5, 1, newTestMemoryStorage(withPeers(1, 2)))
80 r.becomeCandidate()
81 r.becomeLeader()
82
83 r.prs.Progress[2].ProbeSent = true
84
85 r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat})
86 if !r.prs.Progress[2].ProbeSent {
87 t.Errorf("paused = %v, want true", r.prs.Progress[2].ProbeSent)
88 }
89
90 r.prs.Progress[2].BecomeReplicate()
91 r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeatResp})
92 if r.prs.Progress[2].ProbeSent {
93 t.Errorf("paused = %v, want false", r.prs.Progress[2].ProbeSent)
94 }
95 }
96
97 func TestProgressPaused(t *testing.T) {
98 r := newTestRaft(1, 5, 1, newTestMemoryStorage(withPeers(1, 2)))
99 r.becomeCandidate()
100 r.becomeLeader()
101 r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
102 r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
103 r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
104
105 ms := r.readMessages()
106 if len(ms) != 1 {
107 t.Errorf("len(ms) = %d, want 1", len(ms))
108 }
109 }
110
111 func TestProgressFlowControl(t *testing.T) {
112 cfg := newTestConfig(1, 5, 1, newTestMemoryStorage(withPeers(1, 2)))
113 cfg.MaxInflightMsgs = 3
114 cfg.MaxSizePerMsg = 2048
115 r := newRaft(cfg)
116 r.becomeCandidate()
117 r.becomeLeader()
118
119
120 r.readMessages()
121
122
123 r.prs.Progress[2].BecomeProbe()
124 blob := []byte(strings.Repeat("a", 1000))
125 for i := 0; i < 10; i++ {
126 r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: blob}}})
127 }
128
129 ms := r.readMessages()
130
131
132
133 if len(ms) != 1 || ms[0].Type != pb.MsgApp {
134 t.Fatalf("expected 1 MsgApp, got %v", ms)
135 }
136 if len(ms[0].Entries) != 2 {
137 t.Fatalf("expected 2 entries, got %d", len(ms[0].Entries))
138 }
139 if len(ms[0].Entries[0].Data) != 0 || len(ms[0].Entries[1].Data) != 1000 {
140 t.Fatalf("unexpected entry sizes: %v", ms[0].Entries)
141 }
142
143
144
145 r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: ms[0].Entries[1].Index})
146 ms = r.readMessages()
147 if len(ms) != 3 {
148 t.Fatalf("expected 3 messages, got %d", len(ms))
149 }
150 for i, m := range ms {
151 if m.Type != pb.MsgApp {
152 t.Errorf("%d: expected MsgApp, got %s", i, m.Type)
153 }
154 if len(m.Entries) != 2 {
155 t.Errorf("%d: expected 2 entries, got %d", i, len(m.Entries))
156 }
157 }
158
159
160
161 r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: ms[2].Entries[1].Index})
162 ms = r.readMessages()
163 if len(ms) != 2 {
164 t.Fatalf("expected 2 messages, got %d", len(ms))
165 }
166 for i, m := range ms {
167 if m.Type != pb.MsgApp {
168 t.Errorf("%d: expected MsgApp, got %s", i, m.Type)
169 }
170 }
171 if len(ms[0].Entries) != 2 {
172 t.Errorf("%d: expected 2 entries, got %d", 0, len(ms[0].Entries))
173 }
174 if len(ms[1].Entries) != 1 {
175 t.Errorf("%d: expected 1 entry, got %d", 1, len(ms[1].Entries))
176 }
177 }
178
179 func TestUncommittedEntryLimit(t *testing.T) {
180
181
182
183
184
185 const maxEntries = 1024
186 testEntry := pb.Entry{Data: []byte("testdata")}
187 maxEntrySize := maxEntries * PayloadSize(testEntry)
188
189 if n := PayloadSize(pb.Entry{Data: nil}); n != 0 {
190 t.Fatal("entry with no Data must have zero payload size")
191 }
192
193 cfg := newTestConfig(1, 5, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
194 cfg.MaxUncommittedEntriesSize = uint64(maxEntrySize)
195 cfg.MaxInflightMsgs = 2 * 1024
196 r := newRaft(cfg)
197 r.becomeCandidate()
198 r.becomeLeader()
199 if n := r.uncommittedSize; n != 0 {
200 t.Fatalf("expected zero uncommitted size, got %d bytes", n)
201 }
202
203
204 const numFollowers = 2
205 r.prs.Progress[2].BecomeReplicate()
206 r.prs.Progress[3].BecomeReplicate()
207 r.uncommittedSize = 0
208
209
210 propMsg := pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{testEntry}}
211 propEnts := make([]pb.Entry, maxEntries)
212 for i := 0; i < maxEntries; i++ {
213 if err := r.Step(propMsg); err != nil {
214 t.Fatalf("proposal resulted in error: %v", err)
215 }
216 propEnts[i] = testEntry
217 }
218
219
220 if err := r.Step(propMsg); err != ErrProposalDropped {
221 t.Fatalf("proposal not dropped: %v", err)
222 }
223
224
225
226 ms := r.readMessages()
227 if e := maxEntries * numFollowers; len(ms) != e {
228 t.Fatalf("expected %d messages, got %d", e, len(ms))
229 }
230 r.reduceUncommittedSize(propEnts)
231 if r.uncommittedSize != 0 {
232 t.Fatalf("committed everything, but still tracking %d", r.uncommittedSize)
233 }
234
235
236
237 propEnts = make([]pb.Entry, 2*maxEntries)
238 for i := range propEnts {
239 propEnts[i] = testEntry
240 }
241 propMsgLarge := pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: propEnts}
242 if err := r.Step(propMsgLarge); err != nil {
243 t.Fatalf("proposal resulted in error: %v", err)
244 }
245
246
247 if err := r.Step(propMsg); err != ErrProposalDropped {
248 t.Fatalf("proposal not dropped: %v", err)
249 }
250
251
252
253
254 if err := r.Step(
255 pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}},
256 ); err != nil {
257 t.Fatal(err)
258 }
259
260
261
262 ms = r.readMessages()
263 if e := 2 * numFollowers; len(ms) != e {
264 t.Fatalf("expected %d messages, got %d", e, len(ms))
265 }
266 r.reduceUncommittedSize(propEnts)
267 if n := r.uncommittedSize; n != 0 {
268 t.Fatalf("expected zero uncommitted size, got %d", n)
269 }
270 }
271
272 func TestLeaderElection(t *testing.T) {
273 testLeaderElection(t, false)
274 }
275
276 func TestLeaderElectionPreVote(t *testing.T) {
277 testLeaderElection(t, true)
278 }
279
280 func testLeaderElection(t *testing.T, preVote bool) {
281 var cfg func(*Config)
282 candState := StateCandidate
283 candTerm := uint64(1)
284 if preVote {
285 cfg = preVoteConfig
286
287
288
289 candState = StatePreCandidate
290 candTerm = 0
291 }
292 tests := []struct {
293 *network
294 state StateType
295 expTerm uint64
296 }{
297 {newNetworkWithConfig(cfg, nil, nil, nil), StateLeader, 1},
298 {newNetworkWithConfig(cfg, nil, nil, nopStepper), StateLeader, 1},
299 {newNetworkWithConfig(cfg, nil, nopStepper, nopStepper), candState, candTerm},
300 {newNetworkWithConfig(cfg, nil, nopStepper, nopStepper, nil), candState, candTerm},
301 {newNetworkWithConfig(cfg, nil, nopStepper, nopStepper, nil, nil), StateLeader, 1},
302
303
304
305 {newNetworkWithConfig(cfg,
306 nil, entsWithConfig(cfg, 1), entsWithConfig(cfg, 1), entsWithConfig(cfg, 1, 1), nil),
307 StateFollower, 1},
308 }
309
310 for i, tt := range tests {
311 tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
312 sm := tt.network.peers[1].(*raft)
313 if sm.state != tt.state {
314 t.Errorf("#%d: state = %s, want %s", i, sm.state, tt.state)
315 }
316 if g := sm.Term; g != tt.expTerm {
317 t.Errorf("#%d: term = %d, want %d", i, g, tt.expTerm)
318 }
319 }
320 }
321
322
323
324 func TestLearnerElectionTimeout(t *testing.T) {
325 n1 := newTestLearnerRaft(1, 10, 1, newTestMemoryStorage(withPeers(1), withLearners(2)))
326 n2 := newTestLearnerRaft(2, 10, 1, newTestMemoryStorage(withPeers(1), withLearners(2)))
327
328 n1.becomeFollower(1, None)
329 n2.becomeFollower(1, None)
330
331
332 setRandomizedElectionTimeout(n2, n2.electionTimeout)
333 for i := 0; i < n2.electionTimeout; i++ {
334 n2.tick()
335 }
336
337 if n2.state != StateFollower {
338 t.Errorf("peer 2 state: %s, want %s", n2.state, StateFollower)
339 }
340 }
341
342
343
344 func TestLearnerPromotion(t *testing.T) {
345 n1 := newTestLearnerRaft(1, 10, 1, newTestMemoryStorage(withPeers(1), withLearners(2)))
346 n2 := newTestLearnerRaft(2, 10, 1, newTestMemoryStorage(withPeers(1), withLearners(2)))
347
348 n1.becomeFollower(1, None)
349 n2.becomeFollower(1, None)
350
351 nt := newNetwork(n1, n2)
352
353 if n1.state == StateLeader {
354 t.Error("peer 1 state is leader, want not", n1.state)
355 }
356
357
358 setRandomizedElectionTimeout(n1, n1.electionTimeout)
359 for i := 0; i < n1.electionTimeout; i++ {
360 n1.tick()
361 }
362
363 if n1.state != StateLeader {
364 t.Errorf("peer 1 state: %s, want %s", n1.state, StateLeader)
365 }
366 if n2.state != StateFollower {
367 t.Errorf("peer 2 state: %s, want %s", n2.state, StateFollower)
368 }
369
370 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgBeat})
371
372 n1.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddNode}.AsV2())
373 n2.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddNode}.AsV2())
374 if n2.isLearner {
375 t.Error("peer 2 is learner, want not")
376 }
377
378
379 setRandomizedElectionTimeout(n2, n2.electionTimeout)
380 for i := 0; i < n2.electionTimeout; i++ {
381 n2.tick()
382 }
383
384 nt.send(pb.Message{From: 2, To: 2, Type: pb.MsgBeat})
385
386 if n1.state != StateFollower {
387 t.Errorf("peer 1 state: %s, want %s", n1.state, StateFollower)
388 }
389 if n2.state != StateLeader {
390 t.Errorf("peer 2 state: %s, want %s", n2.state, StateLeader)
391 }
392 }
393
394
395
396 func TestLearnerCanVote(t *testing.T) {
397 n2 := newTestLearnerRaft(2, 10, 1, newTestMemoryStorage(withPeers(1), withLearners(2)))
398
399 n2.becomeFollower(1, None)
400
401 n2.Step(pb.Message{From: 1, To: 2, Term: 2, Type: pb.MsgVote, LogTerm: 11, Index: 11})
402
403 if len(n2.msgs) != 1 {
404 t.Fatalf("expected exactly one message, not %+v", n2.msgs)
405 }
406 msg := n2.msgs[0]
407 if msg.Type != pb.MsgVoteResp && !msg.Reject {
408 t.Fatal("expected learner to not reject vote")
409 }
410 }
411
412 func TestLeaderCycle(t *testing.T) {
413 testLeaderCycle(t, false)
414 }
415
416 func TestLeaderCyclePreVote(t *testing.T) {
417 testLeaderCycle(t, true)
418 }
419
420
421
422
423
424 func testLeaderCycle(t *testing.T, preVote bool) {
425 var cfg func(*Config)
426 if preVote {
427 cfg = preVoteConfig
428 }
429 n := newNetworkWithConfig(cfg, nil, nil, nil)
430 for campaignerID := uint64(1); campaignerID <= 3; campaignerID++ {
431 n.send(pb.Message{From: campaignerID, To: campaignerID, Type: pb.MsgHup})
432
433 for _, peer := range n.peers {
434 sm := peer.(*raft)
435 if sm.id == campaignerID && sm.state != StateLeader {
436 t.Errorf("preVote=%v: campaigning node %d state = %v, want StateLeader",
437 preVote, sm.id, sm.state)
438 } else if sm.id != campaignerID && sm.state != StateFollower {
439 t.Errorf("preVote=%v: after campaign of node %d, "+
440 "node %d had state = %v, want StateFollower",
441 preVote, campaignerID, sm.id, sm.state)
442 }
443 }
444 }
445 }
446
447
448
449
450
451 func TestLeaderElectionOverwriteNewerLogs(t *testing.T) {
452 testLeaderElectionOverwriteNewerLogs(t, false)
453 }
454
455 func TestLeaderElectionOverwriteNewerLogsPreVote(t *testing.T) {
456 testLeaderElectionOverwriteNewerLogs(t, true)
457 }
458
459 func testLeaderElectionOverwriteNewerLogs(t *testing.T, preVote bool) {
460 var cfg func(*Config)
461 if preVote {
462 cfg = preVoteConfig
463 }
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478 n := newNetworkWithConfig(cfg,
479 entsWithConfig(cfg, 1),
480 entsWithConfig(cfg, 1),
481 entsWithConfig(cfg, 2),
482 votedWithConfig(cfg, 3, 2),
483 votedWithConfig(cfg, 3, 2))
484
485
486
487
488 n.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
489 sm1 := n.peers[1].(*raft)
490 if sm1.state != StateFollower {
491 t.Errorf("state = %s, want StateFollower", sm1.state)
492 }
493 if sm1.Term != 2 {
494 t.Errorf("term = %d, want 2", sm1.Term)
495 }
496
497
498 n.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
499 if sm1.state != StateLeader {
500 t.Errorf("state = %s, want StateLeader", sm1.state)
501 }
502 if sm1.Term != 3 {
503 t.Errorf("term = %d, want 3", sm1.Term)
504 }
505
506
507
508 for i := range n.peers {
509 sm := n.peers[i].(*raft)
510 entries := sm.raftLog.allEntries()
511 if len(entries) != 2 {
512 t.Fatalf("node %d: len(entries) == %d, want 2", i, len(entries))
513 }
514 if entries[0].Term != 1 {
515 t.Errorf("node %d: term at index 1 == %d, want 1", i, entries[0].Term)
516 }
517 if entries[1].Term != 3 {
518 t.Errorf("node %d: term at index 2 == %d, want 3", i, entries[1].Term)
519 }
520 }
521 }
522
523 func TestVoteFromAnyState(t *testing.T) {
524 testVoteFromAnyState(t, pb.MsgVote)
525 }
526
527 func TestPreVoteFromAnyState(t *testing.T) {
528 testVoteFromAnyState(t, pb.MsgPreVote)
529 }
530
531 func testVoteFromAnyState(t *testing.T, vt pb.MessageType) {
532 for st := StateType(0); st < numStates; st++ {
533 r := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
534 r.Term = 1
535
536 switch st {
537 case StateFollower:
538 r.becomeFollower(r.Term, 3)
539 case StatePreCandidate:
540 r.becomePreCandidate()
541 case StateCandidate:
542 r.becomeCandidate()
543 case StateLeader:
544 r.becomeCandidate()
545 r.becomeLeader()
546 }
547
548
549
550 origTerm := r.Term
551 newTerm := r.Term + 1
552
553 msg := pb.Message{
554 From: 2,
555 To: 1,
556 Type: vt,
557 Term: newTerm,
558 LogTerm: newTerm,
559 Index: 42,
560 }
561 if err := r.Step(msg); err != nil {
562 t.Errorf("%s,%s: Step failed: %s", vt, st, err)
563 }
564 if len(r.msgs) != 1 {
565 t.Errorf("%s,%s: %d response messages, want 1: %+v", vt, st, len(r.msgs), r.msgs)
566 } else {
567 resp := r.msgs[0]
568 if resp.Type != voteRespMsgType(vt) {
569 t.Errorf("%s,%s: response message is %s, want %s",
570 vt, st, resp.Type, voteRespMsgType(vt))
571 }
572 if resp.Reject {
573 t.Errorf("%s,%s: unexpected rejection", vt, st)
574 }
575 }
576
577
578 if vt == pb.MsgVote {
579 if r.state != StateFollower {
580 t.Errorf("%s,%s: state %s, want %s", vt, st, r.state, StateFollower)
581 }
582 if r.Term != newTerm {
583 t.Errorf("%s,%s: term %d, want %d", vt, st, r.Term, newTerm)
584 }
585 if r.Vote != 2 {
586 t.Errorf("%s,%s: vote %d, want 2", vt, st, r.Vote)
587 }
588 } else {
589
590 if r.state != st {
591 t.Errorf("%s,%s: state %s, want %s", vt, st, r.state, st)
592 }
593 if r.Term != origTerm {
594 t.Errorf("%s,%s: term %d, want %d", vt, st, r.Term, origTerm)
595 }
596
597
598 if r.Vote != None && r.Vote != 1 {
599 t.Errorf("%s,%s: vote %d, want %d or 1", vt, st, r.Vote, None)
600 }
601 }
602 }
603 }
604
605 func TestLogReplication(t *testing.T) {
606 tests := []struct {
607 *network
608 msgs []pb.Message
609 wcommitted uint64
610 }{
611 {
612 newNetwork(nil, nil, nil),
613 []pb.Message{
614 {From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}},
615 },
616 2,
617 },
618 {
619 newNetwork(nil, nil, nil),
620 []pb.Message{
621 {From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}},
622 {From: 1, To: 2, Type: pb.MsgHup},
623 {From: 1, To: 2, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}},
624 },
625 4,
626 },
627 }
628
629 for i, tt := range tests {
630 tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
631
632 for _, m := range tt.msgs {
633 tt.send(m)
634 }
635
636 for j, x := range tt.network.peers {
637 sm := x.(*raft)
638
639 if sm.raftLog.committed != tt.wcommitted {
640 t.Errorf("#%d.%d: committed = %d, want %d", i, j, sm.raftLog.committed, tt.wcommitted)
641 }
642
643 ents := []pb.Entry{}
644 for _, e := range nextEnts(sm, tt.network.storage[j]) {
645 if e.Data != nil {
646 ents = append(ents, e)
647 }
648 }
649 props := []pb.Message{}
650 for _, m := range tt.msgs {
651 if m.Type == pb.MsgProp {
652 props = append(props, m)
653 }
654 }
655 for k, m := range props {
656 if !bytes.Equal(ents[k].Data, m.Entries[0].Data) {
657 t.Errorf("#%d.%d: data = %d, want %d", i, j, ents[k].Data, m.Entries[0].Data)
658 }
659 }
660 }
661 }
662 }
663
664
665 func TestLearnerLogReplication(t *testing.T) {
666 n1 := newTestLearnerRaft(1, 10, 1, newTestMemoryStorage(withPeers(1), withLearners(2)))
667 n2 := newTestLearnerRaft(2, 10, 1, newTestMemoryStorage(withPeers(1), withLearners(2)))
668
669 nt := newNetwork(n1, n2)
670
671 n1.becomeFollower(1, None)
672 n2.becomeFollower(1, None)
673
674 setRandomizedElectionTimeout(n1, n1.electionTimeout)
675 for i := 0; i < n1.electionTimeout; i++ {
676 n1.tick()
677 }
678
679 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgBeat})
680
681
682 if n1.state != StateLeader {
683 t.Errorf("peer 1 state: %s, want %s", n1.state, StateLeader)
684 }
685 if !n2.isLearner {
686 t.Error("peer 2 state: not learner, want yes")
687 }
688
689 nextCommitted := n1.raftLog.committed + 1
690 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
691 if n1.raftLog.committed != nextCommitted {
692 t.Errorf("peer 1 wants committed to %d, but still %d", nextCommitted, n1.raftLog.committed)
693 }
694
695 if n1.raftLog.committed != n2.raftLog.committed {
696 t.Errorf("peer 2 wants committed to %d, but still %d", n1.raftLog.committed, n2.raftLog.committed)
697 }
698
699 match := n1.prs.Progress[2].Match
700 if match != n2.raftLog.committed {
701 t.Errorf("progress 2 of leader 1 wants match %d, but got %d", n2.raftLog.committed, match)
702 }
703 }
704
705 func TestSingleNodeCommit(t *testing.T) {
706 tt := newNetwork(nil)
707 tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
708 tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
709 tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
710
711 sm := tt.peers[1].(*raft)
712 if sm.raftLog.committed != 3 {
713 t.Errorf("committed = %d, want %d", sm.raftLog.committed, 3)
714 }
715 }
716
717
718
719
720 func TestCannotCommitWithoutNewTermEntry(t *testing.T) {
721 tt := newNetwork(nil, nil, nil, nil, nil)
722 tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
723
724
725 tt.cut(1, 3)
726 tt.cut(1, 4)
727 tt.cut(1, 5)
728
729 tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
730 tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
731
732 sm := tt.peers[1].(*raft)
733 if sm.raftLog.committed != 1 {
734 t.Errorf("committed = %d, want %d", sm.raftLog.committed, 1)
735 }
736
737
738 tt.recover()
739
740 tt.ignore(pb.MsgApp)
741
742
743 tt.send(pb.Message{From: 2, To: 2, Type: pb.MsgHup})
744
745
746 sm = tt.peers[2].(*raft)
747 if sm.raftLog.committed != 1 {
748 t.Errorf("committed = %d, want %d", sm.raftLog.committed, 1)
749 }
750
751 tt.recover()
752
753 tt.send(pb.Message{From: 2, To: 2, Type: pb.MsgBeat})
754
755 tt.send(pb.Message{From: 2, To: 2, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
756
757 if sm.raftLog.committed != 5 {
758 t.Errorf("committed = %d, want %d", sm.raftLog.committed, 5)
759 }
760 }
761
762
763
764 func TestCommitWithoutNewTermEntry(t *testing.T) {
765 tt := newNetwork(nil, nil, nil, nil, nil)
766 tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
767
768
769 tt.cut(1, 3)
770 tt.cut(1, 4)
771 tt.cut(1, 5)
772
773 tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
774 tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
775
776 sm := tt.peers[1].(*raft)
777 if sm.raftLog.committed != 1 {
778 t.Errorf("committed = %d, want %d", sm.raftLog.committed, 1)
779 }
780
781
782 tt.recover()
783
784
785
786
787 tt.send(pb.Message{From: 2, To: 2, Type: pb.MsgHup})
788
789 if sm.raftLog.committed != 4 {
790 t.Errorf("committed = %d, want %d", sm.raftLog.committed, 4)
791 }
792 }
793
794 func TestDuelingCandidates(t *testing.T) {
795 a := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
796 b := newTestRaft(2, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
797 c := newTestRaft(3, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
798
799 nt := newNetwork(a, b, c)
800 nt.cut(1, 3)
801
802 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
803 nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup})
804
805
806 sm := nt.peers[1].(*raft)
807 if sm.state != StateLeader {
808 t.Errorf("state = %s, want %s", sm.state, StateLeader)
809 }
810
811
812 sm = nt.peers[3].(*raft)
813 if sm.state != StateCandidate {
814 t.Errorf("state = %s, want %s", sm.state, StateCandidate)
815 }
816
817 nt.recover()
818
819
820
821
822 nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup})
823
824 wlog := &raftLog{
825 storage: &MemoryStorage{ents: []pb.Entry{{}, {Data: nil, Term: 1, Index: 1}}},
826 committed: 1,
827 unstable: unstable{offset: 2},
828 }
829 tests := []struct {
830 sm *raft
831 state StateType
832 term uint64
833 raftLog *raftLog
834 }{
835 {a, StateFollower, 2, wlog},
836 {b, StateFollower, 2, wlog},
837 {c, StateFollower, 2, newLog(NewMemoryStorage(), raftLogger)},
838 }
839
840 for i, tt := range tests {
841 if g := tt.sm.state; g != tt.state {
842 t.Errorf("#%d: state = %s, want %s", i, g, tt.state)
843 }
844 if g := tt.sm.Term; g != tt.term {
845 t.Errorf("#%d: term = %d, want %d", i, g, tt.term)
846 }
847 base := ltoa(tt.raftLog)
848 if sm, ok := nt.peers[1+uint64(i)].(*raft); ok {
849 l := ltoa(sm.raftLog)
850 if g := diffu(base, l); g != "" {
851 t.Errorf("#%d: diff:\n%s", i, g)
852 }
853 } else {
854 t.Logf("#%d: empty log", i)
855 }
856 }
857 }
858
859 func TestDuelingPreCandidates(t *testing.T) {
860 cfgA := newTestConfig(1, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
861 cfgB := newTestConfig(2, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
862 cfgC := newTestConfig(3, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
863 cfgA.PreVote = true
864 cfgB.PreVote = true
865 cfgC.PreVote = true
866 a := newRaft(cfgA)
867 b := newRaft(cfgB)
868 c := newRaft(cfgC)
869
870 nt := newNetwork(a, b, c)
871 nt.cut(1, 3)
872
873 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
874 nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup})
875
876
877 sm := nt.peers[1].(*raft)
878 if sm.state != StateLeader {
879 t.Errorf("state = %s, want %s", sm.state, StateLeader)
880 }
881
882
883 sm = nt.peers[3].(*raft)
884 if sm.state != StateFollower {
885 t.Errorf("state = %s, want %s", sm.state, StateFollower)
886 }
887
888 nt.recover()
889
890
891
892 nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup})
893
894 wlog := &raftLog{
895 storage: &MemoryStorage{ents: []pb.Entry{{}, {Data: nil, Term: 1, Index: 1}}},
896 committed: 1,
897 unstable: unstable{offset: 2},
898 }
899 tests := []struct {
900 sm *raft
901 state StateType
902 term uint64
903 raftLog *raftLog
904 }{
905 {a, StateLeader, 1, wlog},
906 {b, StateFollower, 1, wlog},
907 {c, StateFollower, 1, newLog(NewMemoryStorage(), raftLogger)},
908 }
909
910 for i, tt := range tests {
911 if g := tt.sm.state; g != tt.state {
912 t.Errorf("#%d: state = %s, want %s", i, g, tt.state)
913 }
914 if g := tt.sm.Term; g != tt.term {
915 t.Errorf("#%d: term = %d, want %d", i, g, tt.term)
916 }
917 base := ltoa(tt.raftLog)
918 if sm, ok := nt.peers[1+uint64(i)].(*raft); ok {
919 l := ltoa(sm.raftLog)
920 if g := diffu(base, l); g != "" {
921 t.Errorf("#%d: diff:\n%s", i, g)
922 }
923 } else {
924 t.Logf("#%d: empty log", i)
925 }
926 }
927 }
928
929 func TestCandidateConcede(t *testing.T) {
930 tt := newNetwork(nil, nil, nil)
931 tt.isolate(1)
932
933 tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
934 tt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup})
935
936
937 tt.recover()
938
939 tt.send(pb.Message{From: 3, To: 3, Type: pb.MsgBeat})
940
941 data := []byte("force follower")
942
943 tt.send(pb.Message{From: 3, To: 3, Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}})
944
945 tt.send(pb.Message{From: 3, To: 3, Type: pb.MsgBeat})
946
947 a := tt.peers[1].(*raft)
948 if g := a.state; g != StateFollower {
949 t.Errorf("state = %s, want %s", g, StateFollower)
950 }
951 if g := a.Term; g != 1 {
952 t.Errorf("term = %d, want %d", g, 1)
953 }
954 wantLog := ltoa(&raftLog{
955 storage: &MemoryStorage{
956 ents: []pb.Entry{{}, {Data: nil, Term: 1, Index: 1}, {Term: 1, Index: 2, Data: data}},
957 },
958 unstable: unstable{offset: 3},
959 committed: 2,
960 })
961 for i, p := range tt.peers {
962 if sm, ok := p.(*raft); ok {
963 l := ltoa(sm.raftLog)
964 if g := diffu(wantLog, l); g != "" {
965 t.Errorf("#%d: diff:\n%s", i, g)
966 }
967 } else {
968 t.Logf("#%d: empty log", i)
969 }
970 }
971 }
972
973 func TestSingleNodeCandidate(t *testing.T) {
974 tt := newNetwork(nil)
975 tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
976
977 sm := tt.peers[1].(*raft)
978 if sm.state != StateLeader {
979 t.Errorf("state = %d, want %d", sm.state, StateLeader)
980 }
981 }
982
983 func TestSingleNodePreCandidate(t *testing.T) {
984 tt := newNetworkWithConfig(preVoteConfig, nil)
985 tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
986
987 sm := tt.peers[1].(*raft)
988 if sm.state != StateLeader {
989 t.Errorf("state = %d, want %d", sm.state, StateLeader)
990 }
991 }
992
993 func TestOldMessages(t *testing.T) {
994 tt := newNetwork(nil, nil, nil)
995
996 tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
997 tt.send(pb.Message{From: 2, To: 2, Type: pb.MsgHup})
998 tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
999
1000 tt.send(pb.Message{From: 2, To: 1, Type: pb.MsgApp, Term: 2, Entries: []pb.Entry{{Index: 3, Term: 2}}})
1001
1002 tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
1003
1004 ilog := &raftLog{
1005 storage: &MemoryStorage{
1006 ents: []pb.Entry{
1007 {}, {Data: nil, Term: 1, Index: 1},
1008 {Data: nil, Term: 2, Index: 2}, {Data: nil, Term: 3, Index: 3},
1009 {Data: []byte("somedata"), Term: 3, Index: 4},
1010 },
1011 },
1012 unstable: unstable{offset: 5},
1013 committed: 4,
1014 }
1015 base := ltoa(ilog)
1016 for i, p := range tt.peers {
1017 if sm, ok := p.(*raft); ok {
1018 l := ltoa(sm.raftLog)
1019 if g := diffu(base, l); g != "" {
1020 t.Errorf("#%d: diff:\n%s", i, g)
1021 }
1022 } else {
1023 t.Logf("#%d: empty log", i)
1024 }
1025 }
1026 }
1027
1028
1029
1030 func TestProposal(t *testing.T) {
1031 tests := []struct {
1032 *network
1033 success bool
1034 }{
1035 {newNetwork(nil, nil, nil), true},
1036 {newNetwork(nil, nil, nopStepper), true},
1037 {newNetwork(nil, nopStepper, nopStepper), false},
1038 {newNetwork(nil, nopStepper, nopStepper, nil), false},
1039 {newNetwork(nil, nopStepper, nopStepper, nil, nil), true},
1040 }
1041
1042 for j, tt := range tests {
1043 send := func(m pb.Message) {
1044 defer func() {
1045
1046 if !tt.success {
1047 e := recover()
1048 if e != nil {
1049 t.Logf("#%d: err: %s", j, e)
1050 }
1051 }
1052 }()
1053 tt.send(m)
1054 }
1055
1056 data := []byte("somedata")
1057
1058
1059 send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
1060 send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}})
1061
1062 wantLog := newLog(NewMemoryStorage(), raftLogger)
1063 if tt.success {
1064 wantLog = &raftLog{
1065 storage: &MemoryStorage{
1066 ents: []pb.Entry{{}, {Data: nil, Term: 1, Index: 1}, {Term: 1, Index: 2, Data: data}},
1067 },
1068 unstable: unstable{offset: 3},
1069 committed: 2}
1070 }
1071 base := ltoa(wantLog)
1072 for i, p := range tt.peers {
1073 if sm, ok := p.(*raft); ok {
1074 l := ltoa(sm.raftLog)
1075 if g := diffu(base, l); g != "" {
1076 t.Errorf("#%d: peer %d diff:\n%s", j, i, g)
1077 }
1078 } else {
1079 t.Logf("#%d: peer %d empty log", j, i)
1080 }
1081 }
1082 sm := tt.network.peers[1].(*raft)
1083 if g := sm.Term; g != 1 {
1084 t.Errorf("#%d: term = %d, want %d", j, g, 1)
1085 }
1086 }
1087 }
1088
1089 func TestProposalByProxy(t *testing.T) {
1090 data := []byte("somedata")
1091 tests := []*network{
1092 newNetwork(nil, nil, nil),
1093 newNetwork(nil, nil, nopStepper),
1094 }
1095
1096 for j, tt := range tests {
1097
1098 tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
1099
1100
1101 tt.send(pb.Message{From: 2, To: 2, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
1102
1103 wantLog := &raftLog{
1104 storage: &MemoryStorage{
1105 ents: []pb.Entry{{}, {Data: nil, Term: 1, Index: 1}, {Term: 1, Data: data, Index: 2}},
1106 },
1107 unstable: unstable{offset: 3},
1108 committed: 2}
1109 base := ltoa(wantLog)
1110 for i, p := range tt.peers {
1111 if sm, ok := p.(*raft); ok {
1112 l := ltoa(sm.raftLog)
1113 if g := diffu(base, l); g != "" {
1114 t.Errorf("#%d: peer %d diff:\n%s", j, i, g)
1115 }
1116 } else {
1117 t.Logf("#%d: peer %d empty log", j, i)
1118 }
1119 }
1120 sm := tt.peers[1].(*raft)
1121 if g := sm.Term; g != 1 {
1122 t.Errorf("#%d: term = %d, want %d", j, g, 1)
1123 }
1124 }
1125 }
1126
1127 func TestCommit(t *testing.T) {
1128 tests := []struct {
1129 matches []uint64
1130 logs []pb.Entry
1131 smTerm uint64
1132 w uint64
1133 }{
1134
1135 {[]uint64{1}, []pb.Entry{{Index: 1, Term: 1}}, 1, 1},
1136 {[]uint64{1}, []pb.Entry{{Index: 1, Term: 1}}, 2, 0},
1137 {[]uint64{2}, []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}}, 2, 2},
1138 {[]uint64{1}, []pb.Entry{{Index: 1, Term: 2}}, 2, 1},
1139
1140
1141 {[]uint64{2, 1, 1}, []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}}, 1, 1},
1142 {[]uint64{2, 1, 1}, []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 1}}, 2, 0},
1143 {[]uint64{2, 1, 2}, []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}}, 2, 2},
1144 {[]uint64{2, 1, 2}, []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 1}}, 2, 0},
1145
1146
1147 {[]uint64{2, 1, 1, 1}, []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}}, 1, 1},
1148 {[]uint64{2, 1, 1, 1}, []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 1}}, 2, 0},
1149 {[]uint64{2, 1, 1, 2}, []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}}, 1, 1},
1150 {[]uint64{2, 1, 1, 2}, []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 1}}, 2, 0},
1151 {[]uint64{2, 1, 2, 2}, []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}}, 2, 2},
1152 {[]uint64{2, 1, 2, 2}, []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 1}}, 2, 0},
1153 }
1154
1155 for i, tt := range tests {
1156 storage := newTestMemoryStorage(withPeers(1))
1157 storage.Append(tt.logs)
1158 storage.hardState = pb.HardState{Term: tt.smTerm}
1159
1160 sm := newTestRaft(1, 10, 2, storage)
1161 for j := 0; j < len(tt.matches); j++ {
1162 id := uint64(j) + 1
1163 if id > 1 {
1164 sm.applyConfChange(pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: id}.AsV2())
1165 }
1166 pr := sm.prs.Progress[id]
1167 pr.Match, pr.Next = tt.matches[j], tt.matches[j]+1
1168 }
1169 sm.maybeCommit()
1170 if g := sm.raftLog.committed; g != tt.w {
1171 t.Errorf("#%d: committed = %d, want %d", i, g, tt.w)
1172 }
1173 }
1174 }
1175
1176 func TestPastElectionTimeout(t *testing.T) {
1177 tests := []struct {
1178 elapse int
1179 wprobability float64
1180 round bool
1181 }{
1182 {5, 0, false},
1183 {10, 0.1, true},
1184 {13, 0.4, true},
1185 {15, 0.6, true},
1186 {18, 0.9, true},
1187 {20, 1, false},
1188 }
1189
1190 for i, tt := range tests {
1191 sm := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1)))
1192 sm.electionElapsed = tt.elapse
1193 c := 0
1194 for j := 0; j < 10000; j++ {
1195 sm.resetRandomizedElectionTimeout()
1196 if sm.pastElectionTimeout() {
1197 c++
1198 }
1199 }
1200 got := float64(c) / 10000.0
1201 if tt.round {
1202 got = math.Floor(got*10+0.5) / 10.0
1203 }
1204 if got != tt.wprobability {
1205 t.Errorf("#%d: probability = %v, want %v", i, got, tt.wprobability)
1206 }
1207 }
1208 }
1209
1210
1211
1212 func TestStepIgnoreOldTermMsg(t *testing.T) {
1213 called := false
1214 fakeStep := func(r *raft, m pb.Message) error {
1215 called = true
1216 return nil
1217 }
1218 sm := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1)))
1219 sm.step = fakeStep
1220 sm.Term = 2
1221 sm.Step(pb.Message{Type: pb.MsgApp, Term: sm.Term - 1})
1222 if called {
1223 t.Errorf("stepFunc called = %v , want %v", called, false)
1224 }
1225 }
1226
1227
1228
1229
1230
1231
1232 func TestHandleMsgApp(t *testing.T) {
1233 tests := []struct {
1234 m pb.Message
1235 wIndex uint64
1236 wCommit uint64
1237 wReject bool
1238 }{
1239
1240 {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 3, Index: 2, Commit: 3}, 2, 0, true},
1241 {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 3, Index: 3, Commit: 3}, 2, 0, true},
1242
1243
1244 {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 1, Index: 1, Commit: 1}, 2, 1, false},
1245 {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 0, Index: 0, Commit: 1, Entries: []pb.Entry{{Index: 1, Term: 2}}}, 1, 1, false},
1246 {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 2, Index: 2, Commit: 3, Entries: []pb.Entry{{Index: 3, Term: 2}, {Index: 4, Term: 2}}}, 4, 3, false},
1247 {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 2, Index: 2, Commit: 4, Entries: []pb.Entry{{Index: 3, Term: 2}}}, 3, 3, false},
1248 {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 1, Index: 1, Commit: 4, Entries: []pb.Entry{{Index: 2, Term: 2}}}, 2, 2, false},
1249
1250
1251 {pb.Message{Type: pb.MsgApp, Term: 1, LogTerm: 1, Index: 1, Commit: 3}, 2, 1, false},
1252 {pb.Message{Type: pb.MsgApp, Term: 1, LogTerm: 1, Index: 1, Commit: 3, Entries: []pb.Entry{{Index: 2, Term: 2}}}, 2, 2, false},
1253 {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 2, Index: 2, Commit: 3}, 2, 2, false},
1254 {pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 2, Index: 2, Commit: 4}, 2, 2, false},
1255 }
1256
1257 for i, tt := range tests {
1258 storage := newTestMemoryStorage(withPeers(1))
1259 storage.Append([]pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}})
1260 sm := newTestRaft(1, 10, 1, storage)
1261 sm.becomeFollower(2, None)
1262
1263 sm.handleAppendEntries(tt.m)
1264 if sm.raftLog.lastIndex() != tt.wIndex {
1265 t.Errorf("#%d: lastIndex = %d, want %d", i, sm.raftLog.lastIndex(), tt.wIndex)
1266 }
1267 if sm.raftLog.committed != tt.wCommit {
1268 t.Errorf("#%d: committed = %d, want %d", i, sm.raftLog.committed, tt.wCommit)
1269 }
1270 m := sm.readMessages()
1271 if len(m) != 1 {
1272 t.Fatalf("#%d: msg = nil, want 1", i)
1273 }
1274 if m[0].Reject != tt.wReject {
1275 t.Errorf("#%d: reject = %v, want %v", i, m[0].Reject, tt.wReject)
1276 }
1277 }
1278 }
1279
1280
1281 func TestHandleHeartbeat(t *testing.T) {
1282 commit := uint64(2)
1283 tests := []struct {
1284 m pb.Message
1285 wCommit uint64
1286 }{
1287 {pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeat, Term: 2, Commit: commit + 1}, commit + 1},
1288 {pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeat, Term: 2, Commit: commit - 1}, commit},
1289 }
1290
1291 for i, tt := range tests {
1292 storage := newTestMemoryStorage(withPeers(1, 2))
1293 storage.Append([]pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}, {Index: 3, Term: 3}})
1294 sm := newTestRaft(1, 5, 1, storage)
1295 sm.becomeFollower(2, 2)
1296 sm.raftLog.commitTo(commit)
1297 sm.handleHeartbeat(tt.m)
1298 if sm.raftLog.committed != tt.wCommit {
1299 t.Errorf("#%d: committed = %d, want %d", i, sm.raftLog.committed, tt.wCommit)
1300 }
1301 m := sm.readMessages()
1302 if len(m) != 1 {
1303 t.Fatalf("#%d: msg = nil, want 1", i)
1304 }
1305 if m[0].Type != pb.MsgHeartbeatResp {
1306 t.Errorf("#%d: type = %v, want MsgHeartbeatResp", i, m[0].Type)
1307 }
1308 }
1309 }
1310
1311
1312 func TestHandleHeartbeatResp(t *testing.T) {
1313 storage := newTestMemoryStorage(withPeers(1, 2))
1314 storage.Append([]pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}, {Index: 3, Term: 3}})
1315 sm := newTestRaft(1, 5, 1, storage)
1316 sm.becomeCandidate()
1317 sm.becomeLeader()
1318 sm.raftLog.commitTo(sm.raftLog.lastIndex())
1319
1320
1321 sm.Step(pb.Message{From: 2, Type: pb.MsgHeartbeatResp})
1322 msgs := sm.readMessages()
1323 if len(msgs) != 1 {
1324 t.Fatalf("len(msgs) = %d, want 1", len(msgs))
1325 }
1326 if msgs[0].Type != pb.MsgApp {
1327 t.Errorf("type = %v, want MsgApp", msgs[0].Type)
1328 }
1329
1330
1331 sm.Step(pb.Message{From: 2, Type: pb.MsgHeartbeatResp})
1332 msgs = sm.readMessages()
1333 if len(msgs) != 1 {
1334 t.Fatalf("len(msgs) = %d, want 1", len(msgs))
1335 }
1336 if msgs[0].Type != pb.MsgApp {
1337 t.Errorf("type = %v, want MsgApp", msgs[0].Type)
1338 }
1339
1340
1341 sm.Step(pb.Message{
1342 From: 2,
1343 Type: pb.MsgAppResp,
1344 Index: msgs[0].Index + uint64(len(msgs[0].Entries)),
1345 })
1346
1347 sm.readMessages()
1348
1349 sm.Step(pb.Message{From: 2, Type: pb.MsgHeartbeatResp})
1350 msgs = sm.readMessages()
1351 if len(msgs) != 0 {
1352 t.Fatalf("len(msgs) = %d, want 0: %+v", len(msgs), msgs)
1353 }
1354 }
1355
1356
1357
1358
1359 func TestRaftFreesReadOnlyMem(t *testing.T) {
1360 sm := newTestRaft(1, 5, 1, newTestMemoryStorage(withPeers(1, 2)))
1361 sm.becomeCandidate()
1362 sm.becomeLeader()
1363 sm.raftLog.commitTo(sm.raftLog.lastIndex())
1364
1365 ctx := []byte("ctx")
1366
1367
1368
1369 sm.Step(pb.Message{From: 2, Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: ctx}}})
1370 msgs := sm.readMessages()
1371 if len(msgs) != 1 {
1372 t.Fatalf("len(msgs) = %d, want 1", len(msgs))
1373 }
1374 if msgs[0].Type != pb.MsgHeartbeat {
1375 t.Fatalf("type = %v, want MsgHeartbeat", msgs[0].Type)
1376 }
1377 if !bytes.Equal(msgs[0].Context, ctx) {
1378 t.Fatalf("Context = %v, want %v", msgs[0].Context, ctx)
1379 }
1380 if len(sm.readOnly.readIndexQueue) != 1 {
1381 t.Fatalf("len(readIndexQueue) = %v, want 1", len(sm.readOnly.readIndexQueue))
1382 }
1383 if len(sm.readOnly.pendingReadIndex) != 1 {
1384 t.Fatalf("len(pendingReadIndex) = %v, want 1", len(sm.readOnly.pendingReadIndex))
1385 }
1386 if _, ok := sm.readOnly.pendingReadIndex[string(ctx)]; !ok {
1387 t.Fatalf("can't find context %v in pendingReadIndex ", ctx)
1388 }
1389
1390
1391
1392
1393 sm.Step(pb.Message{From: 2, Type: pb.MsgHeartbeatResp, Context: ctx})
1394 if len(sm.readOnly.readIndexQueue) != 0 {
1395 t.Fatalf("len(readIndexQueue) = %v, want 0", len(sm.readOnly.readIndexQueue))
1396 }
1397 if len(sm.readOnly.pendingReadIndex) != 0 {
1398 t.Fatalf("len(pendingReadIndex) = %v, want 0", len(sm.readOnly.pendingReadIndex))
1399 }
1400 if _, ok := sm.readOnly.pendingReadIndex[string(ctx)]; ok {
1401 t.Fatalf("found context %v in pendingReadIndex, want none", ctx)
1402 }
1403 }
1404
1405
1406
1407 func TestMsgAppRespWaitReset(t *testing.T) {
1408 sm := newTestRaft(1, 5, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
1409 sm.becomeCandidate()
1410 sm.becomeLeader()
1411
1412
1413
1414 sm.bcastAppend()
1415 sm.readMessages()
1416
1417
1418 sm.Step(pb.Message{
1419 From: 2,
1420 Type: pb.MsgAppResp,
1421 Index: 1,
1422 })
1423 if sm.raftLog.committed != 1 {
1424 t.Fatalf("expected committed to be 1, got %d", sm.raftLog.committed)
1425 }
1426
1427 sm.readMessages()
1428
1429
1430 sm.Step(pb.Message{
1431 From: 1,
1432 Type: pb.MsgProp,
1433 Entries: []pb.Entry{{}},
1434 })
1435
1436
1437
1438 msgs := sm.readMessages()
1439 if len(msgs) != 1 {
1440 t.Fatalf("expected 1 message, got %d: %+v", len(msgs), msgs)
1441 }
1442 if msgs[0].Type != pb.MsgApp || msgs[0].To != 2 {
1443 t.Errorf("expected MsgApp to node 2, got %v to %d", msgs[0].Type, msgs[0].To)
1444 }
1445 if len(msgs[0].Entries) != 1 || msgs[0].Entries[0].Index != 2 {
1446 t.Errorf("expected to send entry 2, but got %v", msgs[0].Entries)
1447 }
1448
1449
1450 sm.Step(pb.Message{
1451 From: 3,
1452 Type: pb.MsgAppResp,
1453 Index: 1,
1454 })
1455 msgs = sm.readMessages()
1456 if len(msgs) != 1 {
1457 t.Fatalf("expected 1 message, got %d: %+v", len(msgs), msgs)
1458 }
1459 if msgs[0].Type != pb.MsgApp || msgs[0].To != 3 {
1460 t.Errorf("expected MsgApp to node 3, got %v to %d", msgs[0].Type, msgs[0].To)
1461 }
1462 if len(msgs[0].Entries) != 1 || msgs[0].Entries[0].Index != 2 {
1463 t.Errorf("expected to send entry 2, but got %v", msgs[0].Entries)
1464 }
1465 }
1466
1467 func TestRecvMsgVote(t *testing.T) {
1468 testRecvMsgVote(t, pb.MsgVote)
1469 }
1470
1471 func TestRecvMsgPreVote(t *testing.T) {
1472 testRecvMsgVote(t, pb.MsgPreVote)
1473 }
1474
1475 func testRecvMsgVote(t *testing.T, msgType pb.MessageType) {
1476 tests := []struct {
1477 state StateType
1478 index, logTerm uint64
1479 voteFor uint64
1480 wreject bool
1481 }{
1482 {StateFollower, 0, 0, None, true},
1483 {StateFollower, 0, 1, None, true},
1484 {StateFollower, 0, 2, None, true},
1485 {StateFollower, 0, 3, None, false},
1486
1487 {StateFollower, 1, 0, None, true},
1488 {StateFollower, 1, 1, None, true},
1489 {StateFollower, 1, 2, None, true},
1490 {StateFollower, 1, 3, None, false},
1491
1492 {StateFollower, 2, 0, None, true},
1493 {StateFollower, 2, 1, None, true},
1494 {StateFollower, 2, 2, None, false},
1495 {StateFollower, 2, 3, None, false},
1496
1497 {StateFollower, 3, 0, None, true},
1498 {StateFollower, 3, 1, None, true},
1499 {StateFollower, 3, 2, None, false},
1500 {StateFollower, 3, 3, None, false},
1501
1502 {StateFollower, 3, 2, 2, false},
1503 {StateFollower, 3, 2, 1, true},
1504
1505 {StateLeader, 3, 3, 1, true},
1506 {StatePreCandidate, 3, 3, 1, true},
1507 {StateCandidate, 3, 3, 1, true},
1508 }
1509
1510 max := func(a, b uint64) uint64 {
1511 if a > b {
1512 return a
1513 }
1514 return b
1515 }
1516
1517 for i, tt := range tests {
1518 sm := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1)))
1519 sm.state = tt.state
1520 switch tt.state {
1521 case StateFollower:
1522 sm.step = stepFollower
1523 case StateCandidate, StatePreCandidate:
1524 sm.step = stepCandidate
1525 case StateLeader:
1526 sm.step = stepLeader
1527 }
1528 sm.Vote = tt.voteFor
1529 sm.raftLog = &raftLog{
1530 storage: &MemoryStorage{ents: []pb.Entry{{}, {Index: 1, Term: 2}, {Index: 2, Term: 2}}},
1531 unstable: unstable{offset: 3},
1532 }
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542 term := max(sm.raftLog.lastTerm(), tt.logTerm)
1543 sm.Term = term
1544 sm.Step(pb.Message{Type: msgType, Term: term, From: 2, Index: tt.index, LogTerm: tt.logTerm})
1545
1546 msgs := sm.readMessages()
1547 if g := len(msgs); g != 1 {
1548 t.Fatalf("#%d: len(msgs) = %d, want 1", i, g)
1549 continue
1550 }
1551 if g := msgs[0].Type; g != voteRespMsgType(msgType) {
1552 t.Errorf("#%d, m.Type = %v, want %v", i, g, voteRespMsgType(msgType))
1553 }
1554 if g := msgs[0].Reject; g != tt.wreject {
1555 t.Errorf("#%d, m.Reject = %v, want %v", i, g, tt.wreject)
1556 }
1557 }
1558 }
1559
1560 func TestStateTransition(t *testing.T) {
1561 tests := []struct {
1562 from StateType
1563 to StateType
1564 wallow bool
1565 wterm uint64
1566 wlead uint64
1567 }{
1568 {StateFollower, StateFollower, true, 1, None},
1569 {StateFollower, StatePreCandidate, true, 0, None},
1570 {StateFollower, StateCandidate, true, 1, None},
1571 {StateFollower, StateLeader, false, 0, None},
1572
1573 {StatePreCandidate, StateFollower, true, 0, None},
1574 {StatePreCandidate, StatePreCandidate, true, 0, None},
1575 {StatePreCandidate, StateCandidate, true, 1, None},
1576 {StatePreCandidate, StateLeader, true, 0, 1},
1577
1578 {StateCandidate, StateFollower, true, 0, None},
1579 {StateCandidate, StatePreCandidate, true, 0, None},
1580 {StateCandidate, StateCandidate, true, 1, None},
1581 {StateCandidate, StateLeader, true, 0, 1},
1582
1583 {StateLeader, StateFollower, true, 1, None},
1584 {StateLeader, StatePreCandidate, false, 0, None},
1585 {StateLeader, StateCandidate, false, 1, None},
1586 {StateLeader, StateLeader, true, 0, 1},
1587 }
1588
1589 for i, tt := range tests {
1590 func() {
1591 defer func() {
1592 if r := recover(); r != nil {
1593 if tt.wallow {
1594 t.Errorf("%d: allow = %v, want %v", i, false, true)
1595 }
1596 }
1597 }()
1598
1599 sm := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1)))
1600 sm.state = tt.from
1601
1602 switch tt.to {
1603 case StateFollower:
1604 sm.becomeFollower(tt.wterm, tt.wlead)
1605 case StatePreCandidate:
1606 sm.becomePreCandidate()
1607 case StateCandidate:
1608 sm.becomeCandidate()
1609 case StateLeader:
1610 sm.becomeLeader()
1611 }
1612
1613 if sm.Term != tt.wterm {
1614 t.Errorf("%d: term = %d, want %d", i, sm.Term, tt.wterm)
1615 }
1616 if sm.lead != tt.wlead {
1617 t.Errorf("%d: lead = %d, want %d", i, sm.lead, tt.wlead)
1618 }
1619 }()
1620 }
1621 }
1622
1623 func TestAllServerStepdown(t *testing.T) {
1624 tests := []struct {
1625 state StateType
1626
1627 wstate StateType
1628 wterm uint64
1629 windex uint64
1630 }{
1631 {StateFollower, StateFollower, 3, 0},
1632 {StatePreCandidate, StateFollower, 3, 0},
1633 {StateCandidate, StateFollower, 3, 0},
1634 {StateLeader, StateFollower, 3, 1},
1635 }
1636
1637 tmsgTypes := [...]pb.MessageType{pb.MsgVote, pb.MsgApp}
1638 tterm := uint64(3)
1639
1640 for i, tt := range tests {
1641 sm := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
1642 switch tt.state {
1643 case StateFollower:
1644 sm.becomeFollower(1, None)
1645 case StatePreCandidate:
1646 sm.becomePreCandidate()
1647 case StateCandidate:
1648 sm.becomeCandidate()
1649 case StateLeader:
1650 sm.becomeCandidate()
1651 sm.becomeLeader()
1652 }
1653
1654 for j, msgType := range tmsgTypes {
1655 sm.Step(pb.Message{From: 2, Type: msgType, Term: tterm, LogTerm: tterm})
1656
1657 if sm.state != tt.wstate {
1658 t.Errorf("#%d.%d state = %v , want %v", i, j, sm.state, tt.wstate)
1659 }
1660 if sm.Term != tt.wterm {
1661 t.Errorf("#%d.%d term = %v , want %v", i, j, sm.Term, tt.wterm)
1662 }
1663 if sm.raftLog.lastIndex() != tt.windex {
1664 t.Errorf("#%d.%d index = %v , want %v", i, j, sm.raftLog.lastIndex(), tt.windex)
1665 }
1666 if uint64(len(sm.raftLog.allEntries())) != tt.windex {
1667 t.Errorf("#%d.%d len(ents) = %v , want %v", i, j, len(sm.raftLog.allEntries()), tt.windex)
1668 }
1669 wlead := uint64(2)
1670 if msgType == pb.MsgVote {
1671 wlead = None
1672 }
1673 if sm.lead != wlead {
1674 t.Errorf("#%d, sm.lead = %d, want %d", i, sm.lead, None)
1675 }
1676 }
1677 }
1678 }
1679
1680 func TestCandidateResetTermMsgHeartbeat(t *testing.T) {
1681 testCandidateResetTerm(t, pb.MsgHeartbeat)
1682 }
1683
1684 func TestCandidateResetTermMsgApp(t *testing.T) {
1685 testCandidateResetTerm(t, pb.MsgApp)
1686 }
1687
1688
1689
1690
1691 func testCandidateResetTerm(t *testing.T, mt pb.MessageType) {
1692 a := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
1693 b := newTestRaft(2, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
1694 c := newTestRaft(3, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
1695
1696 nt := newNetwork(a, b, c)
1697
1698 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
1699 if a.state != StateLeader {
1700 t.Errorf("state = %s, want %s", a.state, StateLeader)
1701 }
1702 if b.state != StateFollower {
1703 t.Errorf("state = %s, want %s", b.state, StateFollower)
1704 }
1705 if c.state != StateFollower {
1706 t.Errorf("state = %s, want %s", c.state, StateFollower)
1707 }
1708
1709
1710 nt.isolate(3)
1711
1712 nt.send(pb.Message{From: 2, To: 2, Type: pb.MsgHup})
1713 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
1714
1715 if a.state != StateLeader {
1716 t.Errorf("state = %s, want %s", a.state, StateLeader)
1717 }
1718 if b.state != StateFollower {
1719 t.Errorf("state = %s, want %s", b.state, StateFollower)
1720 }
1721
1722
1723 c.resetRandomizedElectionTimeout()
1724 for i := 0; i < c.randomizedElectionTimeout; i++ {
1725 c.tick()
1726 }
1727
1728 if c.state != StateCandidate {
1729 t.Errorf("state = %s, want %s", c.state, StateCandidate)
1730 }
1731
1732 nt.recover()
1733
1734
1735
1736 nt.send(pb.Message{From: 1, To: 3, Term: a.Term, Type: mt})
1737
1738 if c.state != StateFollower {
1739 t.Errorf("state = %s, want %s", c.state, StateFollower)
1740 }
1741
1742
1743 if a.Term != c.Term {
1744 t.Errorf("follower term expected same term as leader's %d, got %d", a.Term, c.Term)
1745 }
1746 }
1747
1748 func TestLeaderStepdownWhenQuorumActive(t *testing.T) {
1749 sm := newTestRaft(1, 5, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
1750
1751 sm.checkQuorum = true
1752
1753 sm.becomeCandidate()
1754 sm.becomeLeader()
1755
1756 for i := 0; i < sm.electionTimeout+1; i++ {
1757 sm.Step(pb.Message{From: 2, Type: pb.MsgHeartbeatResp, Term: sm.Term})
1758 sm.tick()
1759 }
1760
1761 if sm.state != StateLeader {
1762 t.Errorf("state = %v, want %v", sm.state, StateLeader)
1763 }
1764 }
1765
1766 func TestLeaderStepdownWhenQuorumLost(t *testing.T) {
1767 sm := newTestRaft(1, 5, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
1768
1769 sm.checkQuorum = true
1770
1771 sm.becomeCandidate()
1772 sm.becomeLeader()
1773
1774 for i := 0; i < sm.electionTimeout+1; i++ {
1775 sm.tick()
1776 }
1777
1778 if sm.state != StateFollower {
1779 t.Errorf("state = %v, want %v", sm.state, StateFollower)
1780 }
1781 }
1782
1783 func TestLeaderSupersedingWithCheckQuorum(t *testing.T) {
1784 a := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
1785 b := newTestRaft(2, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
1786 c := newTestRaft(3, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
1787
1788 a.checkQuorum = true
1789 b.checkQuorum = true
1790 c.checkQuorum = true
1791
1792 nt := newNetwork(a, b, c)
1793 setRandomizedElectionTimeout(b, b.electionTimeout+1)
1794
1795 for i := 0; i < b.electionTimeout; i++ {
1796 b.tick()
1797 }
1798 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
1799
1800 if a.state != StateLeader {
1801 t.Errorf("state = %s, want %s", a.state, StateLeader)
1802 }
1803
1804 if c.state != StateFollower {
1805 t.Errorf("state = %s, want %s", c.state, StateFollower)
1806 }
1807
1808 nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup})
1809
1810
1811 if c.state != StateCandidate {
1812 t.Errorf("state = %s, want %s", c.state, StateCandidate)
1813 }
1814
1815
1816 for i := 0; i < b.electionTimeout; i++ {
1817 b.tick()
1818 }
1819 nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup})
1820
1821 if c.state != StateLeader {
1822 t.Errorf("state = %s, want %s", c.state, StateLeader)
1823 }
1824 }
1825
1826 func TestLeaderElectionWithCheckQuorum(t *testing.T) {
1827 a := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
1828 b := newTestRaft(2, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
1829 c := newTestRaft(3, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
1830
1831 a.checkQuorum = true
1832 b.checkQuorum = true
1833 c.checkQuorum = true
1834
1835 nt := newNetwork(a, b, c)
1836 setRandomizedElectionTimeout(a, a.electionTimeout+1)
1837 setRandomizedElectionTimeout(b, b.electionTimeout+2)
1838
1839
1840
1841 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
1842
1843 if a.state != StateLeader {
1844 t.Errorf("state = %s, want %s", a.state, StateLeader)
1845 }
1846
1847 if c.state != StateFollower {
1848 t.Errorf("state = %s, want %s", c.state, StateFollower)
1849 }
1850
1851
1852
1853 setRandomizedElectionTimeout(a, a.electionTimeout+1)
1854 setRandomizedElectionTimeout(b, b.electionTimeout+2)
1855 for i := 0; i < a.electionTimeout; i++ {
1856 a.tick()
1857 }
1858 for i := 0; i < b.electionTimeout; i++ {
1859 b.tick()
1860 }
1861 nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup})
1862
1863 if a.state != StateFollower {
1864 t.Errorf("state = %s, want %s", a.state, StateFollower)
1865 }
1866
1867 if c.state != StateLeader {
1868 t.Errorf("state = %s, want %s", c.state, StateLeader)
1869 }
1870 }
1871
1872
1873
1874
1875 func TestFreeStuckCandidateWithCheckQuorum(t *testing.T) {
1876 a := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
1877 b := newTestRaft(2, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
1878 c := newTestRaft(3, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
1879
1880 a.checkQuorum = true
1881 b.checkQuorum = true
1882 c.checkQuorum = true
1883
1884 nt := newNetwork(a, b, c)
1885 setRandomizedElectionTimeout(b, b.electionTimeout+1)
1886
1887 for i := 0; i < b.electionTimeout; i++ {
1888 b.tick()
1889 }
1890 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
1891
1892 nt.isolate(1)
1893 nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup})
1894
1895 if b.state != StateFollower {
1896 t.Errorf("state = %s, want %s", b.state, StateFollower)
1897 }
1898
1899 if c.state != StateCandidate {
1900 t.Errorf("state = %s, want %s", c.state, StateCandidate)
1901 }
1902
1903 if c.Term != b.Term+1 {
1904 t.Errorf("term = %d, want %d", c.Term, b.Term+1)
1905 }
1906
1907
1908 nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup})
1909
1910 if b.state != StateFollower {
1911 t.Errorf("state = %s, want %s", b.state, StateFollower)
1912 }
1913
1914 if c.state != StateCandidate {
1915 t.Errorf("state = %s, want %s", c.state, StateCandidate)
1916 }
1917
1918 if c.Term != b.Term+2 {
1919 t.Errorf("term = %d, want %d", c.Term, b.Term+2)
1920 }
1921
1922 nt.recover()
1923 nt.send(pb.Message{From: 1, To: 3, Type: pb.MsgHeartbeat, Term: a.Term})
1924
1925
1926 if a.state != StateFollower {
1927 t.Errorf("state = %s, want %s", a.state, StateFollower)
1928 }
1929
1930 if c.Term != a.Term {
1931 t.Errorf("term = %d, want %d", c.Term, a.Term)
1932 }
1933
1934
1935 nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup})
1936
1937 if c.state != StateLeader {
1938 t.Errorf("peer 3 state: %s, want %s", c.state, StateLeader)
1939 }
1940 }
1941
1942 func TestNonPromotableVoterWithCheckQuorum(t *testing.T) {
1943 a := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2)))
1944 b := newTestRaft(2, 10, 1, newTestMemoryStorage(withPeers(1)))
1945
1946 a.checkQuorum = true
1947 b.checkQuorum = true
1948
1949 nt := newNetwork(a, b)
1950 setRandomizedElectionTimeout(b, b.electionTimeout+1)
1951
1952 b.applyConfChange(pb.ConfChange{Type: pb.ConfChangeRemoveNode, NodeID: 2}.AsV2())
1953
1954 if b.promotable() {
1955 t.Fatalf("promotable = %v, want false", b.promotable())
1956 }
1957
1958 for i := 0; i < b.electionTimeout; i++ {
1959 b.tick()
1960 }
1961 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
1962
1963 if a.state != StateLeader {
1964 t.Errorf("state = %s, want %s", a.state, StateLeader)
1965 }
1966
1967 if b.state != StateFollower {
1968 t.Errorf("state = %s, want %s", b.state, StateFollower)
1969 }
1970
1971 if b.lead != 1 {
1972 t.Errorf("lead = %d, want 1", b.lead)
1973 }
1974 }
1975
1976
1977
1978
1979
1980
1981 func TestDisruptiveFollower(t *testing.T) {
1982 n1 := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
1983 n2 := newTestRaft(2, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
1984 n3 := newTestRaft(3, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
1985
1986 n1.checkQuorum = true
1987 n2.checkQuorum = true
1988 n3.checkQuorum = true
1989
1990 n1.becomeFollower(1, None)
1991 n2.becomeFollower(1, None)
1992 n3.becomeFollower(1, None)
1993
1994 nt := newNetwork(n1, n2, n3)
1995
1996 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
1997
1998
1999
2000
2001
2002 if n1.state != StateLeader {
2003 t.Fatalf("node 1 state: %s, want %s", n1.state, StateLeader)
2004 }
2005 if n2.state != StateFollower {
2006 t.Fatalf("node 2 state: %s, want %s", n2.state, StateFollower)
2007 }
2008 if n3.state != StateFollower {
2009 t.Fatalf("node 3 state: %s, want %s", n3.state, StateFollower)
2010 }
2011
2012
2013
2014
2015
2016 setRandomizedElectionTimeout(n3, n3.electionTimeout+2)
2017 for i := 0; i < n3.randomizedElectionTimeout-1; i++ {
2018 n3.tick()
2019 }
2020
2021
2022
2023
2024
2025
2026 n3.tick()
2027
2028
2029
2030
2031
2032
2033
2034
2035 if n1.state != StateLeader {
2036 t.Fatalf("node 1 state: %s, want %s", n1.state, StateLeader)
2037 }
2038 if n2.state != StateFollower {
2039 t.Fatalf("node 2 state: %s, want %s", n2.state, StateFollower)
2040 }
2041 if n3.state != StateCandidate {
2042 t.Fatalf("node 3 state: %s, want %s", n3.state, StateCandidate)
2043 }
2044
2045
2046
2047
2048 if n1.Term != 2 {
2049 t.Fatalf("node 1 term: %d, want %d", n1.Term, 2)
2050 }
2051 if n2.Term != 2 {
2052 t.Fatalf("node 2 term: %d, want %d", n2.Term, 2)
2053 }
2054 if n3.Term != 3 {
2055 t.Fatalf("node 3 term: %d, want %d", n3.Term, 3)
2056 }
2057
2058
2059
2060
2061
2062 nt.send(pb.Message{From: 1, To: 3, Term: n1.Term, Type: pb.MsgHeartbeat})
2063
2064
2065
2066
2067
2068
2069
2070
2071
2072
2073 if n1.state != StateFollower {
2074 t.Fatalf("node 1 state: %s, want %s", n1.state, StateFollower)
2075 }
2076 if n2.state != StateFollower {
2077 t.Fatalf("node 2 state: %s, want %s", n2.state, StateFollower)
2078 }
2079 if n3.state != StateCandidate {
2080 t.Fatalf("node 3 state: %s, want %s", n3.state, StateCandidate)
2081 }
2082
2083
2084
2085
2086 if n1.Term != 3 {
2087 t.Fatalf("node 1 term: %d, want %d", n1.Term, 3)
2088 }
2089 if n2.Term != 2 {
2090 t.Fatalf("node 2 term: %d, want %d", n2.Term, 2)
2091 }
2092 if n3.Term != 3 {
2093 t.Fatalf("node 3 term: %d, want %d", n3.Term, 3)
2094 }
2095 }
2096
2097
2098
2099
2100
2101
2102 func TestDisruptiveFollowerPreVote(t *testing.T) {
2103 n1 := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
2104 n2 := newTestRaft(2, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
2105 n3 := newTestRaft(3, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
2106
2107 n1.checkQuorum = true
2108 n2.checkQuorum = true
2109 n3.checkQuorum = true
2110
2111 n1.becomeFollower(1, None)
2112 n2.becomeFollower(1, None)
2113 n3.becomeFollower(1, None)
2114
2115 nt := newNetwork(n1, n2, n3)
2116
2117 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
2118
2119
2120
2121
2122
2123 if n1.state != StateLeader {
2124 t.Fatalf("node 1 state: %s, want %s", n1.state, StateLeader)
2125 }
2126 if n2.state != StateFollower {
2127 t.Fatalf("node 2 state: %s, want %s", n2.state, StateFollower)
2128 }
2129 if n3.state != StateFollower {
2130 t.Fatalf("node 3 state: %s, want %s", n3.state, StateFollower)
2131 }
2132
2133 nt.isolate(3)
2134 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
2135 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
2136 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
2137 n1.preVote = true
2138 n2.preVote = true
2139 n3.preVote = true
2140 nt.recover()
2141 nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup})
2142
2143
2144
2145
2146
2147 if n1.state != StateLeader {
2148 t.Fatalf("node 1 state: %s, want %s", n1.state, StateLeader)
2149 }
2150 if n2.state != StateFollower {
2151 t.Fatalf("node 2 state: %s, want %s", n2.state, StateFollower)
2152 }
2153 if n3.state != StatePreCandidate {
2154 t.Fatalf("node 3 state: %s, want %s", n3.state, StatePreCandidate)
2155 }
2156
2157
2158
2159
2160 if n1.Term != 2 {
2161 t.Fatalf("node 1 term: %d, want %d", n1.Term, 2)
2162 }
2163 if n2.Term != 2 {
2164 t.Fatalf("node 2 term: %d, want %d", n2.Term, 2)
2165 }
2166 if n3.Term != 2 {
2167 t.Fatalf("node 2 term: %d, want %d", n3.Term, 2)
2168 }
2169
2170
2171 nt.send(pb.Message{From: 1, To: 3, Term: n1.Term, Type: pb.MsgHeartbeat})
2172 if n1.state != StateLeader {
2173 t.Fatalf("node 1 state: %s, want %s", n1.state, StateLeader)
2174 }
2175 }
2176
2177 func TestReadOnlyOptionSafe(t *testing.T) {
2178 a := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
2179 b := newTestRaft(2, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
2180 c := newTestRaft(3, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
2181
2182 nt := newNetwork(a, b, c)
2183 setRandomizedElectionTimeout(b, b.electionTimeout+1)
2184
2185 for i := 0; i < b.electionTimeout; i++ {
2186 b.tick()
2187 }
2188 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
2189
2190 if a.state != StateLeader {
2191 t.Fatalf("state = %s, want %s", a.state, StateLeader)
2192 }
2193
2194 tests := []struct {
2195 sm *raft
2196 proposals int
2197 wri uint64
2198 wctx []byte
2199 }{
2200 {a, 10, 11, []byte("ctx1")},
2201 {b, 10, 21, []byte("ctx2")},
2202 {c, 10, 31, []byte("ctx3")},
2203 {a, 10, 41, []byte("ctx4")},
2204 {b, 10, 51, []byte("ctx5")},
2205 {c, 10, 61, []byte("ctx6")},
2206 }
2207
2208 for i, tt := range tests {
2209 for j := 0; j < tt.proposals; j++ {
2210 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
2211 }
2212
2213 nt.send(pb.Message{From: tt.sm.id, To: tt.sm.id, Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: tt.wctx}}})
2214
2215 r := tt.sm
2216 if len(r.readStates) == 0 {
2217 t.Errorf("#%d: len(readStates) = 0, want non-zero", i)
2218 }
2219 rs := r.readStates[0]
2220 if rs.Index != tt.wri {
2221 t.Errorf("#%d: readIndex = %d, want %d", i, rs.Index, tt.wri)
2222 }
2223
2224 if !bytes.Equal(rs.RequestCtx, tt.wctx) {
2225 t.Errorf("#%d: requestCtx = %v, want %v", i, rs.RequestCtx, tt.wctx)
2226 }
2227 r.readStates = nil
2228 }
2229 }
2230
2231 func TestReadOnlyWithLearner(t *testing.T) {
2232 a := newTestLearnerRaft(1, 10, 1, newTestMemoryStorage(withPeers(1), withLearners(2)))
2233 b := newTestLearnerRaft(2, 10, 1, newTestMemoryStorage(withPeers(1), withLearners(2)))
2234
2235 nt := newNetwork(a, b)
2236 setRandomizedElectionTimeout(b, b.electionTimeout+1)
2237
2238 for i := 0; i < b.electionTimeout; i++ {
2239 b.tick()
2240 }
2241 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
2242
2243 if a.state != StateLeader {
2244 t.Fatalf("state = %s, want %s", a.state, StateLeader)
2245 }
2246
2247 tests := []struct {
2248 sm *raft
2249 proposals int
2250 wri uint64
2251 wctx []byte
2252 }{
2253 {a, 10, 11, []byte("ctx1")},
2254 {b, 10, 21, []byte("ctx2")},
2255 {a, 10, 31, []byte("ctx3")},
2256 {b, 10, 41, []byte("ctx4")},
2257 }
2258
2259 for i, tt := range tests {
2260 for j := 0; j < tt.proposals; j++ {
2261 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
2262 }
2263
2264 nt.send(pb.Message{From: tt.sm.id, To: tt.sm.id, Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: tt.wctx}}})
2265
2266 r := tt.sm
2267 if len(r.readStates) == 0 {
2268 t.Fatalf("#%d: len(readStates) = 0, want non-zero", i)
2269 }
2270 rs := r.readStates[0]
2271 if rs.Index != tt.wri {
2272 t.Errorf("#%d: readIndex = %d, want %d", i, rs.Index, tt.wri)
2273 }
2274
2275 if !bytes.Equal(rs.RequestCtx, tt.wctx) {
2276 t.Errorf("#%d: requestCtx = %v, want %v", i, rs.RequestCtx, tt.wctx)
2277 }
2278 r.readStates = nil
2279 }
2280 }
2281
2282 func TestReadOnlyOptionLease(t *testing.T) {
2283 a := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
2284 b := newTestRaft(2, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
2285 c := newTestRaft(3, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
2286 a.readOnly.option = ReadOnlyLeaseBased
2287 b.readOnly.option = ReadOnlyLeaseBased
2288 c.readOnly.option = ReadOnlyLeaseBased
2289 a.checkQuorum = true
2290 b.checkQuorum = true
2291 c.checkQuorum = true
2292
2293 nt := newNetwork(a, b, c)
2294 setRandomizedElectionTimeout(b, b.electionTimeout+1)
2295
2296 for i := 0; i < b.electionTimeout; i++ {
2297 b.tick()
2298 }
2299 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
2300
2301 if a.state != StateLeader {
2302 t.Fatalf("state = %s, want %s", a.state, StateLeader)
2303 }
2304
2305 tests := []struct {
2306 sm *raft
2307 proposals int
2308 wri uint64
2309 wctx []byte
2310 }{
2311 {a, 10, 11, []byte("ctx1")},
2312 {b, 10, 21, []byte("ctx2")},
2313 {c, 10, 31, []byte("ctx3")},
2314 {a, 10, 41, []byte("ctx4")},
2315 {b, 10, 51, []byte("ctx5")},
2316 {c, 10, 61, []byte("ctx6")},
2317 }
2318
2319 for i, tt := range tests {
2320 for j := 0; j < tt.proposals; j++ {
2321 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
2322 }
2323
2324 nt.send(pb.Message{From: tt.sm.id, To: tt.sm.id, Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: tt.wctx}}})
2325
2326 r := tt.sm
2327 rs := r.readStates[0]
2328 if rs.Index != tt.wri {
2329 t.Errorf("#%d: readIndex = %d, want %d", i, rs.Index, tt.wri)
2330 }
2331
2332 if !bytes.Equal(rs.RequestCtx, tt.wctx) {
2333 t.Errorf("#%d: requestCtx = %v, want %v", i, rs.RequestCtx, tt.wctx)
2334 }
2335 r.readStates = nil
2336 }
2337 }
2338
2339
2340
2341 func TestReadOnlyForNewLeader(t *testing.T) {
2342 nodeConfigs := []struct {
2343 id uint64
2344 committed uint64
2345 applied uint64
2346 compactIndex uint64
2347 }{
2348 {1, 1, 1, 0},
2349 {2, 2, 2, 2},
2350 {3, 2, 2, 2},
2351 }
2352 peers := make([]stateMachine, 0)
2353 for _, c := range nodeConfigs {
2354 storage := newTestMemoryStorage(withPeers(1, 2, 3))
2355 storage.Append([]pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 1}})
2356 storage.SetHardState(pb.HardState{Term: 1, Commit: c.committed})
2357 if c.compactIndex != 0 {
2358 storage.Compact(c.compactIndex)
2359 }
2360 cfg := newTestConfig(c.id, 10, 1, storage)
2361 cfg.Applied = c.applied
2362 raft := newRaft(cfg)
2363 peers = append(peers, raft)
2364 }
2365 nt := newNetwork(peers...)
2366
2367
2368 nt.ignore(pb.MsgApp)
2369
2370 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
2371
2372 sm := nt.peers[1].(*raft)
2373 if sm.state != StateLeader {
2374 t.Fatalf("state = %s, want %s", sm.state, StateLeader)
2375 }
2376
2377
2378 var windex uint64 = 4
2379 wctx := []byte("ctx")
2380 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: wctx}}})
2381 if len(sm.readStates) != 0 {
2382 t.Fatalf("len(readStates) = %d, want zero", len(sm.readStates))
2383 }
2384
2385 nt.recover()
2386
2387
2388 for i := 0; i < sm.heartbeatTimeout; i++ {
2389 sm.tick()
2390 }
2391 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
2392 if sm.raftLog.committed != 4 {
2393 t.Fatalf("committed = %d, want 4", sm.raftLog.committed)
2394 }
2395 lastLogTerm := sm.raftLog.zeroTermOnErrCompacted(sm.raftLog.term(sm.raftLog.committed))
2396 if lastLogTerm != sm.Term {
2397 t.Fatalf("last log term = %d, want %d", lastLogTerm, sm.Term)
2398 }
2399
2400
2401 if len(sm.readStates) != 1 {
2402 t.Fatalf("len(readStates) = %d, want 1", len(sm.readStates))
2403 }
2404 rs := sm.readStates[0]
2405 if rs.Index != windex {
2406 t.Fatalf("readIndex = %d, want %d", rs.Index, windex)
2407 }
2408 if !bytes.Equal(rs.RequestCtx, wctx) {
2409 t.Fatalf("requestCtx = %v, want %v", rs.RequestCtx, wctx)
2410 }
2411
2412
2413 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: wctx}}})
2414 if len(sm.readStates) != 2 {
2415 t.Fatalf("len(readStates) = %d, want 2", len(sm.readStates))
2416 }
2417 rs = sm.readStates[1]
2418 if rs.Index != windex {
2419 t.Fatalf("readIndex = %d, want %d", rs.Index, windex)
2420 }
2421 if !bytes.Equal(rs.RequestCtx, wctx) {
2422 t.Fatalf("requestCtx = %v, want %v", rs.RequestCtx, wctx)
2423 }
2424 }
2425
2426 func TestLeaderAppResp(t *testing.T) {
2427
2428 tests := []struct {
2429 index uint64
2430 reject bool
2431
2432 wmatch uint64
2433 wnext uint64
2434
2435 wmsgNum int
2436 windex uint64
2437 wcommitted uint64
2438 }{
2439 {3, true, 0, 3, 0, 0, 0},
2440 {2, true, 0, 2, 1, 1, 0},
2441 {2, false, 2, 4, 2, 2, 2},
2442 {0, false, 0, 3, 0, 0, 0},
2443 }
2444
2445 for i, tt := range tests {
2446
2447
2448 sm := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
2449 sm.raftLog = &raftLog{
2450 storage: &MemoryStorage{ents: []pb.Entry{{}, {Index: 1, Term: 0}, {Index: 2, Term: 1}}},
2451 unstable: unstable{offset: 3},
2452 }
2453 sm.becomeCandidate()
2454 sm.becomeLeader()
2455 sm.readMessages()
2456 sm.Step(pb.Message{From: 2, Type: pb.MsgAppResp, Index: tt.index, Term: sm.Term, Reject: tt.reject, RejectHint: tt.index})
2457
2458 p := sm.prs.Progress[2]
2459 if p.Match != tt.wmatch {
2460 t.Errorf("#%d match = %d, want %d", i, p.Match, tt.wmatch)
2461 }
2462 if p.Next != tt.wnext {
2463 t.Errorf("#%d next = %d, want %d", i, p.Next, tt.wnext)
2464 }
2465
2466 msgs := sm.readMessages()
2467
2468 if len(msgs) != tt.wmsgNum {
2469 t.Errorf("#%d msgNum = %d, want %d", i, len(msgs), tt.wmsgNum)
2470 }
2471 for j, msg := range msgs {
2472 if msg.Index != tt.windex {
2473 t.Errorf("#%d.%d index = %d, want %d", i, j, msg.Index, tt.windex)
2474 }
2475 if msg.Commit != tt.wcommitted {
2476 t.Errorf("#%d.%d commit = %d, want %d", i, j, msg.Commit, tt.wcommitted)
2477 }
2478 }
2479 }
2480 }
2481
2482
2483
2484 func TestBcastBeat(t *testing.T) {
2485 offset := uint64(1000)
2486
2487 s := pb.Snapshot{
2488 Metadata: pb.SnapshotMetadata{
2489 Index: offset,
2490 Term: 1,
2491 ConfState: pb.ConfState{Voters: []uint64{1, 2, 3}},
2492 },
2493 }
2494 storage := NewMemoryStorage()
2495 storage.ApplySnapshot(s)
2496 sm := newTestRaft(1, 10, 1, storage)
2497 sm.Term = 1
2498
2499 sm.becomeCandidate()
2500 sm.becomeLeader()
2501 for i := 0; i < 10; i++ {
2502 mustAppendEntry(sm, pb.Entry{Index: uint64(i) + 1})
2503 }
2504
2505 sm.prs.Progress[2].Match, sm.prs.Progress[2].Next = 5, 6
2506
2507 sm.prs.Progress[3].Match, sm.prs.Progress[3].Next = sm.raftLog.lastIndex(), sm.raftLog.lastIndex()+1
2508
2509 sm.Step(pb.Message{Type: pb.MsgBeat})
2510 msgs := sm.readMessages()
2511 if len(msgs) != 2 {
2512 t.Fatalf("len(msgs) = %v, want 2", len(msgs))
2513 }
2514 wantCommitMap := map[uint64]uint64{
2515 2: min(sm.raftLog.committed, sm.prs.Progress[2].Match),
2516 3: min(sm.raftLog.committed, sm.prs.Progress[3].Match),
2517 }
2518 for i, m := range msgs {
2519 if m.Type != pb.MsgHeartbeat {
2520 t.Fatalf("#%d: type = %v, want = %v", i, m.Type, pb.MsgHeartbeat)
2521 }
2522 if m.Index != 0 {
2523 t.Fatalf("#%d: prevIndex = %d, want %d", i, m.Index, 0)
2524 }
2525 if m.LogTerm != 0 {
2526 t.Fatalf("#%d: prevTerm = %d, want %d", i, m.LogTerm, 0)
2527 }
2528 if wantCommitMap[m.To] == 0 {
2529 t.Fatalf("#%d: unexpected to %d", i, m.To)
2530 } else {
2531 if m.Commit != wantCommitMap[m.To] {
2532 t.Fatalf("#%d: commit = %d, want %d", i, m.Commit, wantCommitMap[m.To])
2533 }
2534 delete(wantCommitMap, m.To)
2535 }
2536 if len(m.Entries) != 0 {
2537 t.Fatalf("#%d: len(entries) = %d, want 0", i, len(m.Entries))
2538 }
2539 }
2540 }
2541
2542
2543 func TestRecvMsgBeat(t *testing.T) {
2544 tests := []struct {
2545 state StateType
2546 wMsg int
2547 }{
2548 {StateLeader, 2},
2549
2550 {StateCandidate, 0},
2551 {StateFollower, 0},
2552 }
2553
2554 for i, tt := range tests {
2555 sm := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
2556 sm.raftLog = &raftLog{storage: &MemoryStorage{ents: []pb.Entry{{}, {Index: 1, Term: 0}, {Index: 2, Term: 1}}}}
2557 sm.Term = 1
2558 sm.state = tt.state
2559 switch tt.state {
2560 case StateFollower:
2561 sm.step = stepFollower
2562 case StateCandidate:
2563 sm.step = stepCandidate
2564 case StateLeader:
2565 sm.step = stepLeader
2566 }
2567 sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat})
2568
2569 msgs := sm.readMessages()
2570 if len(msgs) != tt.wMsg {
2571 t.Errorf("%d: len(msgs) = %d, want %d", i, len(msgs), tt.wMsg)
2572 }
2573 for _, m := range msgs {
2574 if m.Type != pb.MsgHeartbeat {
2575 t.Errorf("%d: msg.type = %v, want %v", i, m.Type, pb.MsgHeartbeat)
2576 }
2577 }
2578 }
2579 }
2580
2581 func TestLeaderIncreaseNext(t *testing.T) {
2582 previousEnts := []pb.Entry{{Term: 1, Index: 1}, {Term: 1, Index: 2}, {Term: 1, Index: 3}}
2583 tests := []struct {
2584
2585 state tracker.StateType
2586 next uint64
2587
2588 wnext uint64
2589 }{
2590
2591
2592 {tracker.StateReplicate, 2, uint64(len(previousEnts) + 1 + 1 + 1)},
2593
2594 {tracker.StateProbe, 2, 2},
2595 }
2596
2597 for i, tt := range tests {
2598 sm := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2)))
2599 sm.raftLog.append(previousEnts...)
2600 sm.becomeCandidate()
2601 sm.becomeLeader()
2602 sm.prs.Progress[2].State = tt.state
2603 sm.prs.Progress[2].Next = tt.next
2604 sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
2605
2606 p := sm.prs.Progress[2]
2607 if p.Next != tt.wnext {
2608 t.Errorf("#%d next = %d, want %d", i, p.Next, tt.wnext)
2609 }
2610 }
2611 }
2612
2613 func TestSendAppendForProgressProbe(t *testing.T) {
2614 r := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2)))
2615 r.becomeCandidate()
2616 r.becomeLeader()
2617 r.readMessages()
2618 r.prs.Progress[2].BecomeProbe()
2619
2620
2621 for i := 0; i < 3; i++ {
2622 if i == 0 {
2623
2624
2625
2626 mustAppendEntry(r, pb.Entry{Data: []byte("somedata")})
2627 r.sendAppend(2)
2628 msg := r.readMessages()
2629 if len(msg) != 1 {
2630 t.Errorf("len(msg) = %d, want %d", len(msg), 1)
2631 }
2632 if msg[0].Index != 0 {
2633 t.Errorf("index = %d, want %d", msg[0].Index, 0)
2634 }
2635 }
2636
2637 if !r.prs.Progress[2].ProbeSent {
2638 t.Errorf("paused = %v, want true", r.prs.Progress[2].ProbeSent)
2639 }
2640 for j := 0; j < 10; j++ {
2641 mustAppendEntry(r, pb.Entry{Data: []byte("somedata")})
2642 r.sendAppend(2)
2643 if l := len(r.readMessages()); l != 0 {
2644 t.Errorf("len(msg) = %d, want %d", l, 0)
2645 }
2646 }
2647
2648
2649 for j := 0; j < r.heartbeatTimeout; j++ {
2650 r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat})
2651 }
2652 if !r.prs.Progress[2].ProbeSent {
2653 t.Errorf("paused = %v, want true", r.prs.Progress[2].ProbeSent)
2654 }
2655
2656
2657 msg := r.readMessages()
2658 if len(msg) != 1 {
2659 t.Errorf("len(msg) = %d, want %d", len(msg), 1)
2660 }
2661 if msg[0].Type != pb.MsgHeartbeat {
2662 t.Errorf("type = %v, want %v", msg[0].Type, pb.MsgHeartbeat)
2663 }
2664 }
2665
2666
2667 r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeatResp})
2668 msg := r.readMessages()
2669 if len(msg) != 1 {
2670 t.Errorf("len(msg) = %d, want %d", len(msg), 1)
2671 }
2672 if msg[0].Index != 0 {
2673 t.Errorf("index = %d, want %d", msg[0].Index, 0)
2674 }
2675 if !r.prs.Progress[2].ProbeSent {
2676 t.Errorf("paused = %v, want true", r.prs.Progress[2].ProbeSent)
2677 }
2678 }
2679
2680 func TestSendAppendForProgressReplicate(t *testing.T) {
2681 r := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2)))
2682 r.becomeCandidate()
2683 r.becomeLeader()
2684 r.readMessages()
2685 r.prs.Progress[2].BecomeReplicate()
2686
2687 for i := 0; i < 10; i++ {
2688 mustAppendEntry(r, pb.Entry{Data: []byte("somedata")})
2689 r.sendAppend(2)
2690 msgs := r.readMessages()
2691 if len(msgs) != 1 {
2692 t.Errorf("len(msg) = %d, want %d", len(msgs), 1)
2693 }
2694 }
2695 }
2696
2697 func TestSendAppendForProgressSnapshot(t *testing.T) {
2698 r := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2)))
2699 r.becomeCandidate()
2700 r.becomeLeader()
2701 r.readMessages()
2702 r.prs.Progress[2].BecomeSnapshot(10)
2703
2704 for i := 0; i < 10; i++ {
2705 mustAppendEntry(r, pb.Entry{Data: []byte("somedata")})
2706 r.sendAppend(2)
2707 msgs := r.readMessages()
2708 if len(msgs) != 0 {
2709 t.Errorf("len(msg) = %d, want %d", len(msgs), 0)
2710 }
2711 }
2712 }
2713
2714 func TestRecvMsgUnreachable(t *testing.T) {
2715 previousEnts := []pb.Entry{{Term: 1, Index: 1}, {Term: 1, Index: 2}, {Term: 1, Index: 3}}
2716 s := newTestMemoryStorage(withPeers(1, 2))
2717 s.Append(previousEnts)
2718 r := newTestRaft(1, 10, 1, s)
2719 r.becomeCandidate()
2720 r.becomeLeader()
2721 r.readMessages()
2722
2723 r.prs.Progress[2].Match = 3
2724 r.prs.Progress[2].BecomeReplicate()
2725 r.prs.Progress[2].OptimisticUpdate(5)
2726
2727 r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgUnreachable})
2728
2729 if r.prs.Progress[2].State != tracker.StateProbe {
2730 t.Errorf("state = %s, want %s", r.prs.Progress[2].State, tracker.StateProbe)
2731 }
2732 if wnext := r.prs.Progress[2].Match + 1; r.prs.Progress[2].Next != wnext {
2733 t.Errorf("next = %d, want %d", r.prs.Progress[2].Next, wnext)
2734 }
2735 }
2736
2737 func TestRestore(t *testing.T) {
2738 s := pb.Snapshot{
2739 Metadata: pb.SnapshotMetadata{
2740 Index: 11,
2741 Term: 11,
2742 ConfState: pb.ConfState{Voters: []uint64{1, 2, 3}},
2743 },
2744 }
2745
2746 storage := newTestMemoryStorage(withPeers(1, 2))
2747 sm := newTestRaft(1, 10, 1, storage)
2748 if ok := sm.restore(s); !ok {
2749 t.Fatal("restore fail, want succeed")
2750 }
2751
2752 if sm.raftLog.lastIndex() != s.Metadata.Index {
2753 t.Errorf("log.lastIndex = %d, want %d", sm.raftLog.lastIndex(), s.Metadata.Index)
2754 }
2755 if mustTerm(sm.raftLog.term(s.Metadata.Index)) != s.Metadata.Term {
2756 t.Errorf("log.lastTerm = %d, want %d", mustTerm(sm.raftLog.term(s.Metadata.Index)), s.Metadata.Term)
2757 }
2758 sg := sm.prs.VoterNodes()
2759 if !reflect.DeepEqual(sg, s.Metadata.ConfState.Voters) {
2760 t.Errorf("sm.Voters = %+v, want %+v", sg, s.Metadata.ConfState.Voters)
2761 }
2762
2763 if ok := sm.restore(s); ok {
2764 t.Fatal("restore succeed, want fail")
2765 }
2766
2767 for i := 0; i < sm.randomizedElectionTimeout; i++ {
2768 sm.tick()
2769 }
2770 if sm.state != StateFollower {
2771 t.Errorf("state = %d, want %d", sm.state, StateFollower)
2772 }
2773 }
2774
2775
2776 func TestRestoreWithLearner(t *testing.T) {
2777 s := pb.Snapshot{
2778 Metadata: pb.SnapshotMetadata{
2779 Index: 11,
2780 Term: 11,
2781 ConfState: pb.ConfState{Voters: []uint64{1, 2}, Learners: []uint64{3}},
2782 },
2783 }
2784
2785 storage := newTestMemoryStorage(withPeers(1, 2), withLearners(3))
2786 sm := newTestLearnerRaft(3, 8, 2, storage)
2787 if ok := sm.restore(s); !ok {
2788 t.Error("restore fail, want succeed")
2789 }
2790
2791 if sm.raftLog.lastIndex() != s.Metadata.Index {
2792 t.Errorf("log.lastIndex = %d, want %d", sm.raftLog.lastIndex(), s.Metadata.Index)
2793 }
2794 if mustTerm(sm.raftLog.term(s.Metadata.Index)) != s.Metadata.Term {
2795 t.Errorf("log.lastTerm = %d, want %d", mustTerm(sm.raftLog.term(s.Metadata.Index)), s.Metadata.Term)
2796 }
2797 sg := sm.prs.VoterNodes()
2798 if len(sg) != len(s.Metadata.ConfState.Voters) {
2799 t.Errorf("sm.Voters = %+v, length not equal with %+v", sg, s.Metadata.ConfState.Voters)
2800 }
2801 lns := sm.prs.LearnerNodes()
2802 if len(lns) != len(s.Metadata.ConfState.Learners) {
2803 t.Errorf("sm.LearnerNodes = %+v, length not equal with %+v", sg, s.Metadata.ConfState.Learners)
2804 }
2805 for _, n := range s.Metadata.ConfState.Voters {
2806 if sm.prs.Progress[n].IsLearner {
2807 t.Errorf("sm.Node %x isLearner = %s, want %t", n, sm.prs.Progress[n], false)
2808 }
2809 }
2810 for _, n := range s.Metadata.ConfState.Learners {
2811 if !sm.prs.Progress[n].IsLearner {
2812 t.Errorf("sm.Node %x isLearner = %s, want %t", n, sm.prs.Progress[n], true)
2813 }
2814 }
2815
2816 if ok := sm.restore(s); ok {
2817 t.Error("restore succeed, want fail")
2818 }
2819 }
2820
2821
2822 func TestRestoreWithVotersOutgoing(t *testing.T) {
2823 s := pb.Snapshot{
2824 Metadata: pb.SnapshotMetadata{
2825 Index: 11,
2826 Term: 11,
2827 ConfState: pb.ConfState{Voters: []uint64{2, 3, 4}, VotersOutgoing: []uint64{1, 2, 3}},
2828 },
2829 }
2830
2831 storage := newTestMemoryStorage(withPeers(1, 2))
2832 sm := newTestRaft(1, 10, 1, storage)
2833 if ok := sm.restore(s); !ok {
2834 t.Fatal("restore fail, want succeed")
2835 }
2836
2837 if sm.raftLog.lastIndex() != s.Metadata.Index {
2838 t.Errorf("log.lastIndex = %d, want %d", sm.raftLog.lastIndex(), s.Metadata.Index)
2839 }
2840 if mustTerm(sm.raftLog.term(s.Metadata.Index)) != s.Metadata.Term {
2841 t.Errorf("log.lastTerm = %d, want %d", mustTerm(sm.raftLog.term(s.Metadata.Index)), s.Metadata.Term)
2842 }
2843 sg := sm.prs.VoterNodes()
2844 if !reflect.DeepEqual(sg, []uint64{1, 2, 3, 4}) {
2845 t.Errorf("sm.Voters = %+v, want %+v", sg, s.Metadata.ConfState.Voters)
2846 }
2847
2848 if ok := sm.restore(s); ok {
2849 t.Fatal("restore succeed, want fail")
2850 }
2851
2852 for i := 0; i < sm.randomizedElectionTimeout; i++ {
2853 sm.tick()
2854 }
2855 if sm.state != StateFollower {
2856 t.Errorf("state = %d, want %d", sm.state, StateFollower)
2857 }
2858 }
2859
2860
2861
2862
2863
2864
2865
2866
2867
2868 func TestRestoreVoterToLearner(t *testing.T) {
2869 s := pb.Snapshot{
2870 Metadata: pb.SnapshotMetadata{
2871 Index: 11,
2872 Term: 11,
2873 ConfState: pb.ConfState{Voters: []uint64{1, 2}, Learners: []uint64{3}},
2874 },
2875 }
2876
2877 storage := newTestMemoryStorage(withPeers(1, 2, 3))
2878 sm := newTestRaft(3, 10, 1, storage)
2879
2880 if sm.isLearner {
2881 t.Errorf("%x is learner, want not", sm.id)
2882 }
2883 if ok := sm.restore(s); !ok {
2884 t.Error("restore failed unexpectedly")
2885 }
2886 }
2887
2888
2889
2890 func TestRestoreLearnerPromotion(t *testing.T) {
2891 s := pb.Snapshot{
2892 Metadata: pb.SnapshotMetadata{
2893 Index: 11,
2894 Term: 11,
2895 ConfState: pb.ConfState{Voters: []uint64{1, 2, 3}},
2896 },
2897 }
2898
2899 storage := newTestMemoryStorage(withPeers(1, 2), withLearners(3))
2900 sm := newTestLearnerRaft(3, 10, 1, storage)
2901
2902 if !sm.isLearner {
2903 t.Errorf("%x is not learner, want yes", sm.id)
2904 }
2905
2906 if ok := sm.restore(s); !ok {
2907 t.Error("restore fail, want succeed")
2908 }
2909
2910 if sm.isLearner {
2911 t.Errorf("%x is learner, want not", sm.id)
2912 }
2913 }
2914
2915
2916 func TestLearnerReceiveSnapshot(t *testing.T) {
2917
2918 s := pb.Snapshot{
2919 Metadata: pb.SnapshotMetadata{
2920 Index: 11,
2921 Term: 11,
2922 ConfState: pb.ConfState{Voters: []uint64{1}, Learners: []uint64{2}},
2923 },
2924 }
2925
2926 store := newTestMemoryStorage(withPeers(1), withLearners(2))
2927 n1 := newTestLearnerRaft(1, 10, 1, store)
2928 n2 := newTestLearnerRaft(2, 10, 1, newTestMemoryStorage(withPeers(1), withLearners(2)))
2929
2930 n1.restore(s)
2931 ready := newReady(n1, &SoftState{}, pb.HardState{})
2932 store.ApplySnapshot(ready.Snapshot)
2933 n1.advance(ready)
2934
2935
2936 n1.raftLog.appliedTo(n1.raftLog.committed)
2937
2938 nt := newNetwork(n1, n2)
2939
2940 setRandomizedElectionTimeout(n1, n1.electionTimeout)
2941 for i := 0; i < n1.electionTimeout; i++ {
2942 n1.tick()
2943 }
2944
2945 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgBeat})
2946
2947 if n2.raftLog.committed != n1.raftLog.committed {
2948 t.Errorf("peer 2 must commit to %d, but %d", n1.raftLog.committed, n2.raftLog.committed)
2949 }
2950 }
2951
2952 func TestRestoreIgnoreSnapshot(t *testing.T) {
2953 previousEnts := []pb.Entry{{Term: 1, Index: 1}, {Term: 1, Index: 2}, {Term: 1, Index: 3}}
2954 commit := uint64(1)
2955 storage := newTestMemoryStorage(withPeers(1, 2))
2956 sm := newTestRaft(1, 10, 1, storage)
2957 sm.raftLog.append(previousEnts...)
2958 sm.raftLog.commitTo(commit)
2959
2960 s := pb.Snapshot{
2961 Metadata: pb.SnapshotMetadata{
2962 Index: commit,
2963 Term: 1,
2964 ConfState: pb.ConfState{Voters: []uint64{1, 2}},
2965 },
2966 }
2967
2968
2969 if ok := sm.restore(s); ok {
2970 t.Errorf("restore = %t, want %t", ok, false)
2971 }
2972 if sm.raftLog.committed != commit {
2973 t.Errorf("commit = %d, want %d", sm.raftLog.committed, commit)
2974 }
2975
2976
2977 s.Metadata.Index = commit + 1
2978 if ok := sm.restore(s); ok {
2979 t.Errorf("restore = %t, want %t", ok, false)
2980 }
2981 if sm.raftLog.committed != commit+1 {
2982 t.Errorf("commit = %d, want %d", sm.raftLog.committed, commit+1)
2983 }
2984 }
2985
2986 func TestProvideSnap(t *testing.T) {
2987
2988 s := pb.Snapshot{
2989 Metadata: pb.SnapshotMetadata{
2990 Index: 11,
2991 Term: 11,
2992 ConfState: pb.ConfState{Voters: []uint64{1, 2}},
2993 },
2994 }
2995 storage := newTestMemoryStorage(withPeers(1))
2996 sm := newTestRaft(1, 10, 1, storage)
2997 sm.restore(s)
2998
2999 sm.becomeCandidate()
3000 sm.becomeLeader()
3001
3002
3003 sm.prs.Progress[2].Next = sm.raftLog.firstIndex()
3004 sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: sm.prs.Progress[2].Next - 1, Reject: true})
3005
3006 msgs := sm.readMessages()
3007 if len(msgs) != 1 {
3008 t.Fatalf("len(msgs) = %d, want 1", len(msgs))
3009 }
3010 m := msgs[0]
3011 if m.Type != pb.MsgSnap {
3012 t.Errorf("m.Type = %v, want %v", m.Type, pb.MsgSnap)
3013 }
3014 }
3015
3016 func TestIgnoreProvidingSnap(t *testing.T) {
3017
3018 s := pb.Snapshot{
3019 Metadata: pb.SnapshotMetadata{
3020 Index: 11,
3021 Term: 11,
3022 ConfState: pb.ConfState{Voters: []uint64{1, 2}},
3023 },
3024 }
3025 storage := newTestMemoryStorage(withPeers(1))
3026 sm := newTestRaft(1, 10, 1, storage)
3027 sm.restore(s)
3028
3029 sm.becomeCandidate()
3030 sm.becomeLeader()
3031
3032
3033
3034 sm.prs.Progress[2].Next = sm.raftLog.firstIndex() - 1
3035 sm.prs.Progress[2].RecentActive = false
3036
3037 sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
3038
3039 msgs := sm.readMessages()
3040 if len(msgs) != 0 {
3041 t.Errorf("len(msgs) = %d, want 0", len(msgs))
3042 }
3043 }
3044
3045 func TestRestoreFromSnapMsg(t *testing.T) {
3046 s := pb.Snapshot{
3047 Metadata: pb.SnapshotMetadata{
3048 Index: 11,
3049 Term: 11,
3050 ConfState: pb.ConfState{Voters: []uint64{1, 2}},
3051 },
3052 }
3053 m := pb.Message{Type: pb.MsgSnap, From: 1, Term: 2, Snapshot: s}
3054
3055 sm := newTestRaft(2, 10, 1, newTestMemoryStorage(withPeers(1, 2)))
3056 sm.Step(m)
3057
3058 if sm.lead != uint64(1) {
3059 t.Errorf("sm.lead = %d, want 1", sm.lead)
3060 }
3061
3062
3063 }
3064
3065 func TestSlowNodeRestore(t *testing.T) {
3066 nt := newNetwork(nil, nil, nil)
3067 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
3068
3069 nt.isolate(3)
3070 for j := 0; j <= 100; j++ {
3071 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
3072 }
3073 lead := nt.peers[1].(*raft)
3074 nextEnts(lead, nt.storage[1])
3075 nt.storage[1].CreateSnapshot(lead.raftLog.applied, &pb.ConfState{Voters: lead.prs.VoterNodes()}, nil)
3076 nt.storage[1].Compact(lead.raftLog.applied)
3077
3078 nt.recover()
3079
3080
3081 for {
3082 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgBeat})
3083 if lead.prs.Progress[3].RecentActive {
3084 break
3085 }
3086 }
3087
3088
3089 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
3090
3091 follower := nt.peers[3].(*raft)
3092
3093
3094 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
3095 if follower.raftLog.committed != lead.raftLog.committed {
3096 t.Errorf("follower.committed = %d, want %d", follower.raftLog.committed, lead.raftLog.committed)
3097 }
3098 }
3099
3100
3101
3102 func TestStepConfig(t *testing.T) {
3103
3104 r := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2)))
3105 r.becomeCandidate()
3106 r.becomeLeader()
3107 index := r.raftLog.lastIndex()
3108 r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange}}})
3109 if g := r.raftLog.lastIndex(); g != index+1 {
3110 t.Errorf("index = %d, want %d", g, index+1)
3111 }
3112 if r.pendingConfIndex != index+1 {
3113 t.Errorf("pendingConfIndex = %d, want %d", r.pendingConfIndex, index+1)
3114 }
3115 }
3116
3117
3118
3119
3120 func TestStepIgnoreConfig(t *testing.T) {
3121
3122 r := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2)))
3123 r.becomeCandidate()
3124 r.becomeLeader()
3125 r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange}}})
3126 index := r.raftLog.lastIndex()
3127 pendingConfIndex := r.pendingConfIndex
3128 r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange}}})
3129 wents := []pb.Entry{{Type: pb.EntryNormal, Term: 1, Index: 3, Data: nil}}
3130 ents, err := r.raftLog.entries(index+1, noLimit)
3131 if err != nil {
3132 t.Fatalf("unexpected error %v", err)
3133 }
3134 if !reflect.DeepEqual(ents, wents) {
3135 t.Errorf("ents = %+v, want %+v", ents, wents)
3136 }
3137 if r.pendingConfIndex != pendingConfIndex {
3138 t.Errorf("pendingConfIndex = %d, want %d", r.pendingConfIndex, pendingConfIndex)
3139 }
3140 }
3141
3142
3143
3144 func TestNewLeaderPendingConfig(t *testing.T) {
3145 tests := []struct {
3146 addEntry bool
3147 wpendingIndex uint64
3148 }{
3149 {false, 0},
3150 {true, 1},
3151 }
3152 for i, tt := range tests {
3153 r := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2)))
3154 if tt.addEntry {
3155 mustAppendEntry(r, pb.Entry{Type: pb.EntryNormal})
3156 }
3157 r.becomeCandidate()
3158 r.becomeLeader()
3159 if r.pendingConfIndex != tt.wpendingIndex {
3160 t.Errorf("#%d: pendingConfIndex = %d, want %d",
3161 i, r.pendingConfIndex, tt.wpendingIndex)
3162 }
3163 }
3164 }
3165
3166
3167 func TestAddNode(t *testing.T) {
3168 r := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1)))
3169 r.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddNode}.AsV2())
3170 nodes := r.prs.VoterNodes()
3171 wnodes := []uint64{1, 2}
3172 if !reflect.DeepEqual(nodes, wnodes) {
3173 t.Errorf("nodes = %v, want %v", nodes, wnodes)
3174 }
3175 }
3176
3177
3178 func TestAddLearner(t *testing.T) {
3179 r := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1)))
3180
3181 r.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddLearnerNode}.AsV2())
3182 if r.isLearner {
3183 t.Fatal("expected 1 to be voter")
3184 }
3185 nodes := r.prs.LearnerNodes()
3186 wnodes := []uint64{2}
3187 if !reflect.DeepEqual(nodes, wnodes) {
3188 t.Errorf("nodes = %v, want %v", nodes, wnodes)
3189 }
3190 if !r.prs.Progress[2].IsLearner {
3191 t.Fatal("expected 2 to be learner")
3192 }
3193
3194
3195 r.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddNode}.AsV2())
3196 if r.prs.Progress[2].IsLearner {
3197 t.Fatal("expected 2 to be voter")
3198 }
3199
3200
3201 r.applyConfChange(pb.ConfChange{NodeID: 1, Type: pb.ConfChangeAddLearnerNode}.AsV2())
3202 if !r.prs.Progress[1].IsLearner {
3203 t.Fatal("expected 1 to be learner")
3204 }
3205 if !r.isLearner {
3206 t.Fatal("expected 1 to be learner")
3207 }
3208
3209
3210 r.applyConfChange(pb.ConfChange{NodeID: 1, Type: pb.ConfChangeAddNode}.AsV2())
3211 if r.prs.Progress[1].IsLearner {
3212 t.Fatal("expected 1 to be voter")
3213 }
3214 if r.isLearner {
3215 t.Fatal("expected 1 to be voter")
3216 }
3217 }
3218
3219
3220
3221 func TestAddNodeCheckQuorum(t *testing.T) {
3222 r := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1)))
3223 r.checkQuorum = true
3224
3225 r.becomeCandidate()
3226 r.becomeLeader()
3227
3228 for i := 0; i < r.electionTimeout-1; i++ {
3229 r.tick()
3230 }
3231
3232 r.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddNode}.AsV2())
3233
3234
3235 r.tick()
3236
3237
3238 if r.state != StateLeader {
3239 t.Errorf("state = %v, want %v", r.state, StateLeader)
3240 }
3241
3242
3243
3244 for i := 0; i < r.electionTimeout; i++ {
3245 r.tick()
3246 }
3247
3248 if r.state != StateFollower {
3249 t.Errorf("state = %v, want %v", r.state, StateFollower)
3250 }
3251 }
3252
3253
3254
3255 func TestRemoveNode(t *testing.T) {
3256 r := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2)))
3257 r.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeRemoveNode}.AsV2())
3258 w := []uint64{1}
3259 if g := r.prs.VoterNodes(); !reflect.DeepEqual(g, w) {
3260 t.Errorf("nodes = %v, want %v", g, w)
3261 }
3262
3263
3264 defer func() {
3265 if r := recover(); r == nil {
3266 t.Error("did not panic")
3267 }
3268 }()
3269 r.applyConfChange(pb.ConfChange{NodeID: 1, Type: pb.ConfChangeRemoveNode}.AsV2())
3270 }
3271
3272
3273
3274 func TestRemoveLearner(t *testing.T) {
3275 r := newTestLearnerRaft(1, 10, 1, newTestMemoryStorage(withPeers(1), withLearners(2)))
3276 r.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeRemoveNode}.AsV2())
3277 w := []uint64{1}
3278 if g := r.prs.VoterNodes(); !reflect.DeepEqual(g, w) {
3279 t.Errorf("nodes = %v, want %v", g, w)
3280 }
3281
3282 w = nil
3283 if g := r.prs.LearnerNodes(); !reflect.DeepEqual(g, w) {
3284 t.Errorf("nodes = %v, want %v", g, w)
3285 }
3286
3287
3288 defer func() {
3289 if r := recover(); r == nil {
3290 t.Error("did not panic")
3291 }
3292 }()
3293 r.applyConfChange(pb.ConfChange{NodeID: 1, Type: pb.ConfChangeRemoveNode}.AsV2())
3294 }
3295
3296 func TestPromotable(t *testing.T) {
3297 id := uint64(1)
3298 tests := []struct {
3299 peers []uint64
3300 wp bool
3301 }{
3302 {[]uint64{1}, true},
3303 {[]uint64{1, 2, 3}, true},
3304 {[]uint64{}, false},
3305 {[]uint64{2, 3}, false},
3306 }
3307 for i, tt := range tests {
3308 r := newTestRaft(id, 5, 1, newTestMemoryStorage(withPeers(tt.peers...)))
3309 if g := r.promotable(); g != tt.wp {
3310 t.Errorf("#%d: promotable = %v, want %v", i, g, tt.wp)
3311 }
3312 }
3313 }
3314
3315 func TestRaftNodes(t *testing.T) {
3316 tests := []struct {
3317 ids []uint64
3318 wids []uint64
3319 }{
3320 {
3321 []uint64{1, 2, 3},
3322 []uint64{1, 2, 3},
3323 },
3324 {
3325 []uint64{3, 2, 1},
3326 []uint64{1, 2, 3},
3327 },
3328 }
3329 for i, tt := range tests {
3330 r := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(tt.ids...)))
3331 if !reflect.DeepEqual(r.prs.VoterNodes(), tt.wids) {
3332 t.Errorf("#%d: nodes = %+v, want %+v", i, r.prs.VoterNodes(), tt.wids)
3333 }
3334 }
3335 }
3336
3337 func TestCampaignWhileLeader(t *testing.T) {
3338 testCampaignWhileLeader(t, false)
3339 }
3340
3341 func TestPreCampaignWhileLeader(t *testing.T) {
3342 testCampaignWhileLeader(t, true)
3343 }
3344
3345 func testCampaignWhileLeader(t *testing.T, preVote bool) {
3346 cfg := newTestConfig(1, 5, 1, newTestMemoryStorage(withPeers(1)))
3347 cfg.PreVote = preVote
3348 r := newRaft(cfg)
3349 if r.state != StateFollower {
3350 t.Errorf("expected new node to be follower but got %s", r.state)
3351 }
3352
3353
3354 r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
3355 if r.state != StateLeader {
3356 t.Errorf("expected single-node election to become leader but got %s", r.state)
3357 }
3358 term := r.Term
3359 r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
3360 if r.state != StateLeader {
3361 t.Errorf("expected to remain leader but got %s", r.state)
3362 }
3363 if r.Term != term {
3364 t.Errorf("expected to remain in term %v but got %v", term, r.Term)
3365 }
3366 }
3367
3368
3369
3370 func TestCommitAfterRemoveNode(t *testing.T) {
3371
3372 s := newTestMemoryStorage(withPeers(1, 2))
3373 r := newTestRaft(1, 5, 1, s)
3374 r.becomeCandidate()
3375 r.becomeLeader()
3376
3377
3378 cc := pb.ConfChange{
3379 Type: pb.ConfChangeRemoveNode,
3380 NodeID: 2,
3381 }
3382 ccData, err := cc.Marshal()
3383 if err != nil {
3384 t.Fatal(err)
3385 }
3386 r.Step(pb.Message{
3387 Type: pb.MsgProp,
3388 Entries: []pb.Entry{
3389 {Type: pb.EntryConfChange, Data: ccData},
3390 },
3391 })
3392
3393 if ents := nextEnts(r, s); len(ents) > 0 {
3394 t.Fatalf("unexpected committed entries: %v", ents)
3395 }
3396 ccIndex := r.raftLog.lastIndex()
3397
3398
3399 r.Step(pb.Message{
3400 Type: pb.MsgProp,
3401 Entries: []pb.Entry{
3402 {Type: pb.EntryNormal, Data: []byte("hello")},
3403 },
3404 })
3405
3406
3407 r.Step(pb.Message{
3408 Type: pb.MsgAppResp,
3409 From: 2,
3410 Index: ccIndex,
3411 })
3412 ents := nextEnts(r, s)
3413 if len(ents) != 2 {
3414 t.Fatalf("expected two committed entries, got %v", ents)
3415 }
3416 if ents[0].Type != pb.EntryNormal || ents[0].Data != nil {
3417 t.Fatalf("expected ents[0] to be empty, but got %v", ents[0])
3418 }
3419 if ents[1].Type != pb.EntryConfChange {
3420 t.Fatalf("expected ents[1] to be EntryConfChange, got %v", ents[1])
3421 }
3422
3423
3424
3425 r.applyConfChange(cc.AsV2())
3426 ents = nextEnts(r, s)
3427 if len(ents) != 1 || ents[0].Type != pb.EntryNormal ||
3428 string(ents[0].Data) != "hello" {
3429 t.Fatalf("expected one committed EntryNormal, got %v", ents)
3430 }
3431 }
3432
3433
3434
3435 func TestLeaderTransferToUpToDateNode(t *testing.T) {
3436 nt := newNetwork(nil, nil, nil)
3437 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
3438
3439 lead := nt.peers[1].(*raft)
3440
3441 if lead.lead != 1 {
3442 t.Fatalf("after election leader is %x, want 1", lead.lead)
3443 }
3444
3445
3446 nt.send(pb.Message{From: 2, To: 1, Type: pb.MsgTransferLeader})
3447
3448 checkLeaderTransferState(t, lead, StateFollower, 2)
3449
3450
3451 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
3452
3453 nt.send(pb.Message{From: 1, To: 2, Type: pb.MsgTransferLeader})
3454
3455 checkLeaderTransferState(t, lead, StateLeader, 1)
3456 }
3457
3458
3459
3460
3461
3462
3463 func TestLeaderTransferToUpToDateNodeFromFollower(t *testing.T) {
3464 nt := newNetwork(nil, nil, nil)
3465 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
3466
3467 lead := nt.peers[1].(*raft)
3468
3469 if lead.lead != 1 {
3470 t.Fatalf("after election leader is %x, want 1", lead.lead)
3471 }
3472
3473
3474 nt.send(pb.Message{From: 2, To: 2, Type: pb.MsgTransferLeader})
3475
3476 checkLeaderTransferState(t, lead, StateFollower, 2)
3477
3478
3479 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
3480
3481 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgTransferLeader})
3482
3483 checkLeaderTransferState(t, lead, StateLeader, 1)
3484 }
3485
3486
3487
3488 func TestLeaderTransferWithCheckQuorum(t *testing.T) {
3489 nt := newNetwork(nil, nil, nil)
3490 for i := 1; i < 4; i++ {
3491 r := nt.peers[uint64(i)].(*raft)
3492 r.checkQuorum = true
3493 setRandomizedElectionTimeout(r, r.electionTimeout+i)
3494 }
3495
3496
3497 f := nt.peers[2].(*raft)
3498 for i := 0; i < f.electionTimeout; i++ {
3499 f.tick()
3500 }
3501
3502 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
3503
3504 lead := nt.peers[1].(*raft)
3505
3506 if lead.lead != 1 {
3507 t.Fatalf("after election leader is %x, want 1", lead.lead)
3508 }
3509
3510
3511 nt.send(pb.Message{From: 2, To: 1, Type: pb.MsgTransferLeader})
3512
3513 checkLeaderTransferState(t, lead, StateFollower, 2)
3514
3515
3516 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
3517
3518 nt.send(pb.Message{From: 1, To: 2, Type: pb.MsgTransferLeader})
3519
3520 checkLeaderTransferState(t, lead, StateLeader, 1)
3521 }
3522
3523 func TestLeaderTransferToSlowFollower(t *testing.T) {
3524 defaultLogger.EnableDebug()
3525 nt := newNetwork(nil, nil, nil)
3526 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
3527
3528 nt.isolate(3)
3529 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
3530
3531 nt.recover()
3532 lead := nt.peers[1].(*raft)
3533 if lead.prs.Progress[3].Match != 1 {
3534 t.Fatalf("node 1 has match %x for node 3, want %x", lead.prs.Progress[3].Match, 1)
3535 }
3536
3537
3538 nt.send(pb.Message{From: 3, To: 1, Type: pb.MsgTransferLeader})
3539
3540 checkLeaderTransferState(t, lead, StateFollower, 3)
3541 }
3542
3543 func TestLeaderTransferAfterSnapshot(t *testing.T) {
3544 nt := newNetwork(nil, nil, nil)
3545 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
3546
3547 nt.isolate(3)
3548
3549 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
3550 lead := nt.peers[1].(*raft)
3551 nextEnts(lead, nt.storage[1])
3552 nt.storage[1].CreateSnapshot(lead.raftLog.applied, &pb.ConfState{Voters: lead.prs.VoterNodes()}, nil)
3553 nt.storage[1].Compact(lead.raftLog.applied)
3554
3555 nt.recover()
3556 if lead.prs.Progress[3].Match != 1 {
3557 t.Fatalf("node 1 has match %x for node 3, want %x", lead.prs.Progress[3].Match, 1)
3558 }
3559
3560 filtered := pb.Message{}
3561
3562 nt.msgHook = func(m pb.Message) bool {
3563 if m.Type != pb.MsgAppResp || m.From != 3 || m.Reject {
3564 return true
3565 }
3566 filtered = m
3567 return false
3568 }
3569
3570 nt.send(pb.Message{From: 3, To: 1, Type: pb.MsgTransferLeader})
3571 if lead.state != StateLeader {
3572 t.Fatalf("node 1 should still be leader as snapshot is not applied, got %x", lead.state)
3573 }
3574 if reflect.DeepEqual(filtered, pb.Message{}) {
3575 t.Fatalf("Follower should report snapshot progress automatically.")
3576 }
3577
3578
3579 follower := nt.peers[3].(*raft)
3580 ready := newReady(follower, &SoftState{}, pb.HardState{})
3581 nt.storage[3].ApplySnapshot(ready.Snapshot)
3582 follower.advance(ready)
3583 nt.msgHook = nil
3584 nt.send(filtered)
3585
3586 checkLeaderTransferState(t, lead, StateFollower, 3)
3587 }
3588
3589 func TestLeaderTransferToSelf(t *testing.T) {
3590 nt := newNetwork(nil, nil, nil)
3591 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
3592
3593 lead := nt.peers[1].(*raft)
3594
3595
3596 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgTransferLeader})
3597 checkLeaderTransferState(t, lead, StateLeader, 1)
3598 }
3599
3600 func TestLeaderTransferToNonExistingNode(t *testing.T) {
3601 nt := newNetwork(nil, nil, nil)
3602 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
3603
3604 lead := nt.peers[1].(*raft)
3605
3606 nt.send(pb.Message{From: 4, To: 1, Type: pb.MsgTransferLeader})
3607 checkLeaderTransferState(t, lead, StateLeader, 1)
3608 }
3609
3610 func TestLeaderTransferTimeout(t *testing.T) {
3611 nt := newNetwork(nil, nil, nil)
3612 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
3613
3614 nt.isolate(3)
3615
3616 lead := nt.peers[1].(*raft)
3617
3618
3619 nt.send(pb.Message{From: 3, To: 1, Type: pb.MsgTransferLeader})
3620 if lead.leadTransferee != 3 {
3621 t.Fatalf("wait transferring, leadTransferee = %v, want %v", lead.leadTransferee, 3)
3622 }
3623 for i := 0; i < lead.heartbeatTimeout; i++ {
3624 lead.tick()
3625 }
3626 if lead.leadTransferee != 3 {
3627 t.Fatalf("wait transferring, leadTransferee = %v, want %v", lead.leadTransferee, 3)
3628 }
3629
3630 for i := 0; i < lead.electionTimeout-lead.heartbeatTimeout; i++ {
3631 lead.tick()
3632 }
3633
3634 checkLeaderTransferState(t, lead, StateLeader, 1)
3635 }
3636
3637 func TestLeaderTransferIgnoreProposal(t *testing.T) {
3638 nt := newNetwork(nil, nil, nil)
3639 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
3640
3641 nt.isolate(3)
3642
3643 lead := nt.peers[1].(*raft)
3644
3645
3646 nt.send(pb.Message{From: 3, To: 1, Type: pb.MsgTransferLeader})
3647 if lead.leadTransferee != 3 {
3648 t.Fatalf("wait transferring, leadTransferee = %v, want %v", lead.leadTransferee, 3)
3649 }
3650
3651 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
3652 err := lead.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
3653 if err != ErrProposalDropped {
3654 t.Fatalf("should return drop proposal error while transferring")
3655 }
3656
3657 if lead.prs.Progress[1].Match != 1 {
3658 t.Fatalf("node 1 has match %x, want %x", lead.prs.Progress[1].Match, 1)
3659 }
3660 }
3661
3662 func TestLeaderTransferReceiveHigherTermVote(t *testing.T) {
3663 nt := newNetwork(nil, nil, nil)
3664 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
3665
3666 nt.isolate(3)
3667
3668 lead := nt.peers[1].(*raft)
3669
3670
3671 nt.send(pb.Message{From: 3, To: 1, Type: pb.MsgTransferLeader})
3672 if lead.leadTransferee != 3 {
3673 t.Fatalf("wait transferring, leadTransferee = %v, want %v", lead.leadTransferee, 3)
3674 }
3675
3676 nt.send(pb.Message{From: 2, To: 2, Type: pb.MsgHup, Index: 1, Term: 2})
3677
3678 checkLeaderTransferState(t, lead, StateFollower, 2)
3679 }
3680
3681 func TestLeaderTransferRemoveNode(t *testing.T) {
3682 nt := newNetwork(nil, nil, nil)
3683 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
3684
3685 nt.ignore(pb.MsgTimeoutNow)
3686
3687 lead := nt.peers[1].(*raft)
3688
3689
3690 nt.send(pb.Message{From: 3, To: 1, Type: pb.MsgTransferLeader})
3691 if lead.leadTransferee != 3 {
3692 t.Fatalf("wait transferring, leadTransferee = %v, want %v", lead.leadTransferee, 3)
3693 }
3694
3695 lead.applyConfChange(pb.ConfChange{NodeID: 3, Type: pb.ConfChangeRemoveNode}.AsV2())
3696
3697 checkLeaderTransferState(t, lead, StateLeader, 1)
3698 }
3699
3700 func TestLeaderTransferDemoteNode(t *testing.T) {
3701 nt := newNetwork(nil, nil, nil)
3702 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
3703
3704 nt.ignore(pb.MsgTimeoutNow)
3705
3706 lead := nt.peers[1].(*raft)
3707
3708
3709 nt.send(pb.Message{From: 3, To: 1, Type: pb.MsgTransferLeader})
3710 if lead.leadTransferee != 3 {
3711 t.Fatalf("wait transferring, leadTransferee = %v, want %v", lead.leadTransferee, 3)
3712 }
3713
3714 lead.applyConfChange(pb.ConfChangeV2{
3715 Changes: []pb.ConfChangeSingle{
3716 {
3717 Type: pb.ConfChangeRemoveNode,
3718 NodeID: 3,
3719 },
3720 {
3721 Type: pb.ConfChangeAddLearnerNode,
3722 NodeID: 3,
3723 },
3724 },
3725 })
3726
3727
3728 lead.applyConfChange(pb.ConfChangeV2{})
3729 checkLeaderTransferState(t, lead, StateLeader, 1)
3730 }
3731
3732
3733 func TestLeaderTransferBack(t *testing.T) {
3734 nt := newNetwork(nil, nil, nil)
3735 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
3736
3737 nt.isolate(3)
3738
3739 lead := nt.peers[1].(*raft)
3740
3741 nt.send(pb.Message{From: 3, To: 1, Type: pb.MsgTransferLeader})
3742 if lead.leadTransferee != 3 {
3743 t.Fatalf("wait transferring, leadTransferee = %v, want %v", lead.leadTransferee, 3)
3744 }
3745
3746
3747 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgTransferLeader})
3748
3749 checkLeaderTransferState(t, lead, StateLeader, 1)
3750 }
3751
3752
3753
3754 func TestLeaderTransferSecondTransferToAnotherNode(t *testing.T) {
3755 nt := newNetwork(nil, nil, nil)
3756 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
3757
3758 nt.isolate(3)
3759
3760 lead := nt.peers[1].(*raft)
3761
3762 nt.send(pb.Message{From: 3, To: 1, Type: pb.MsgTransferLeader})
3763 if lead.leadTransferee != 3 {
3764 t.Fatalf("wait transferring, leadTransferee = %v, want %v", lead.leadTransferee, 3)
3765 }
3766
3767
3768 nt.send(pb.Message{From: 2, To: 1, Type: pb.MsgTransferLeader})
3769
3770 checkLeaderTransferState(t, lead, StateFollower, 2)
3771 }
3772
3773
3774
3775 func TestLeaderTransferSecondTransferToSameNode(t *testing.T) {
3776 nt := newNetwork(nil, nil, nil)
3777 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
3778
3779 nt.isolate(3)
3780
3781 lead := nt.peers[1].(*raft)
3782
3783 nt.send(pb.Message{From: 3, To: 1, Type: pb.MsgTransferLeader})
3784 if lead.leadTransferee != 3 {
3785 t.Fatalf("wait transferring, leadTransferee = %v, want %v", lead.leadTransferee, 3)
3786 }
3787
3788 for i := 0; i < lead.heartbeatTimeout; i++ {
3789 lead.tick()
3790 }
3791
3792 nt.send(pb.Message{From: 3, To: 1, Type: pb.MsgTransferLeader})
3793
3794 for i := 0; i < lead.electionTimeout-lead.heartbeatTimeout; i++ {
3795 lead.tick()
3796 }
3797
3798 checkLeaderTransferState(t, lead, StateLeader, 1)
3799 }
3800
3801 func checkLeaderTransferState(t *testing.T, r *raft, state StateType, lead uint64) {
3802 if r.state != state || r.lead != lead {
3803 t.Fatalf("after transferring, node has state %v lead %v, want state %v lead %v", r.state, r.lead, state, lead)
3804 }
3805 if r.leadTransferee != None {
3806 t.Fatalf("after transferring, node has leadTransferee %v, want leadTransferee %v", r.leadTransferee, None)
3807 }
3808 }
3809
3810
3811
3812
3813
3814 func TestTransferNonMember(t *testing.T) {
3815 r := newTestRaft(1, 5, 1, newTestMemoryStorage(withPeers(2, 3, 4)))
3816 r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgTimeoutNow})
3817
3818 r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgVoteResp})
3819 r.Step(pb.Message{From: 3, To: 1, Type: pb.MsgVoteResp})
3820 if r.state != StateFollower {
3821 t.Fatalf("state is %s, want StateFollower", r.state)
3822 }
3823 }
3824
3825
3826
3827
3828
3829
3830 func TestNodeWithSmallerTermCanCompleteElection(t *testing.T) {
3831 n1 := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
3832 n2 := newTestRaft(2, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
3833 n3 := newTestRaft(3, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
3834
3835 n1.becomeFollower(1, None)
3836 n2.becomeFollower(1, None)
3837 n3.becomeFollower(1, None)
3838
3839 n1.preVote = true
3840 n2.preVote = true
3841 n3.preVote = true
3842
3843
3844 nt := newNetwork(n1, n2, n3)
3845 nt.cut(1, 3)
3846 nt.cut(2, 3)
3847
3848 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
3849
3850 sm := nt.peers[1].(*raft)
3851 if sm.state != StateLeader {
3852 t.Errorf("peer 1 state: %s, want %s", sm.state, StateLeader)
3853 }
3854
3855 sm = nt.peers[2].(*raft)
3856 if sm.state != StateFollower {
3857 t.Errorf("peer 2 state: %s, want %s", sm.state, StateFollower)
3858 }
3859
3860 nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup})
3861 sm = nt.peers[3].(*raft)
3862 if sm.state != StatePreCandidate {
3863 t.Errorf("peer 3 state: %s, want %s", sm.state, StatePreCandidate)
3864 }
3865
3866 nt.send(pb.Message{From: 2, To: 2, Type: pb.MsgHup})
3867
3868
3869
3870
3871
3872 sm = nt.peers[1].(*raft)
3873 if sm.Term != 3 {
3874 t.Errorf("peer 1 term: %d, want %d", sm.Term, 3)
3875 }
3876
3877 sm = nt.peers[2].(*raft)
3878 if sm.Term != 3 {
3879 t.Errorf("peer 2 term: %d, want %d", sm.Term, 3)
3880 }
3881
3882 sm = nt.peers[3].(*raft)
3883 if sm.Term != 1 {
3884 t.Errorf("peer 3 term: %d, want %d", sm.Term, 1)
3885 }
3886
3887
3888
3889
3890
3891 sm = nt.peers[1].(*raft)
3892 if sm.state != StateFollower {
3893 t.Errorf("peer 1 state: %s, want %s", sm.state, StateFollower)
3894 }
3895 sm = nt.peers[2].(*raft)
3896 if sm.state != StateLeader {
3897 t.Errorf("peer 2 state: %s, want %s", sm.state, StateLeader)
3898 }
3899 sm = nt.peers[3].(*raft)
3900 if sm.state != StatePreCandidate {
3901 t.Errorf("peer 3 state: %s, want %s", sm.state, StatePreCandidate)
3902 }
3903
3904 sm.logger.Infof("going to bring back peer 3 and kill peer 2")
3905
3906
3907 nt.recover()
3908 nt.cut(2, 1)
3909 nt.cut(2, 3)
3910
3911
3912 nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup})
3913 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
3914
3915
3916 sma := nt.peers[1].(*raft)
3917 smb := nt.peers[3].(*raft)
3918 if sma.state != StateLeader && smb.state != StateLeader {
3919 t.Errorf("no leader")
3920 }
3921 }
3922
3923
3924
3925 func TestPreVoteWithSplitVote(t *testing.T) {
3926 n1 := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
3927 n2 := newTestRaft(2, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
3928 n3 := newTestRaft(3, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
3929
3930 n1.becomeFollower(1, None)
3931 n2.becomeFollower(1, None)
3932 n3.becomeFollower(1, None)
3933
3934 n1.preVote = true
3935 n2.preVote = true
3936 n3.preVote = true
3937
3938 nt := newNetwork(n1, n2, n3)
3939 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
3940
3941
3942 nt.isolate(1)
3943 nt.send([]pb.Message{
3944 {From: 2, To: 2, Type: pb.MsgHup},
3945 {From: 3, To: 3, Type: pb.MsgHup},
3946 }...)
3947
3948
3949
3950
3951 sm := nt.peers[2].(*raft)
3952 if sm.Term != 3 {
3953 t.Errorf("peer 2 term: %d, want %d", sm.Term, 3)
3954 }
3955 sm = nt.peers[3].(*raft)
3956 if sm.Term != 3 {
3957 t.Errorf("peer 3 term: %d, want %d", sm.Term, 3)
3958 }
3959
3960
3961
3962
3963 sm = nt.peers[2].(*raft)
3964 if sm.state != StateCandidate {
3965 t.Errorf("peer 2 state: %s, want %s", sm.state, StateCandidate)
3966 }
3967 sm = nt.peers[3].(*raft)
3968 if sm.state != StateCandidate {
3969 t.Errorf("peer 3 state: %s, want %s", sm.state, StateCandidate)
3970 }
3971
3972
3973 nt.send(pb.Message{From: 2, To: 2, Type: pb.MsgHup})
3974
3975
3976
3977
3978 sm = nt.peers[2].(*raft)
3979 if sm.Term != 4 {
3980 t.Errorf("peer 2 term: %d, want %d", sm.Term, 4)
3981 }
3982 sm = nt.peers[3].(*raft)
3983 if sm.Term != 4 {
3984 t.Errorf("peer 3 term: %d, want %d", sm.Term, 4)
3985 }
3986
3987
3988
3989
3990 sm = nt.peers[2].(*raft)
3991 if sm.state != StateLeader {
3992 t.Errorf("peer 2 state: %s, want %s", sm.state, StateLeader)
3993 }
3994 sm = nt.peers[3].(*raft)
3995 if sm.state != StateFollower {
3996 t.Errorf("peer 3 state: %s, want %s", sm.state, StateFollower)
3997 }
3998 }
3999
4000
4001
4002 func TestPreVoteWithCheckQuorum(t *testing.T) {
4003 n1 := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
4004 n2 := newTestRaft(2, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
4005 n3 := newTestRaft(3, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
4006
4007 n1.becomeFollower(1, None)
4008 n2.becomeFollower(1, None)
4009 n3.becomeFollower(1, None)
4010
4011 n1.preVote = true
4012 n2.preVote = true
4013 n3.preVote = true
4014
4015 n1.checkQuorum = true
4016 n2.checkQuorum = true
4017 n3.checkQuorum = true
4018
4019 nt := newNetwork(n1, n2, n3)
4020 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
4021
4022
4023 nt.isolate(1)
4024
4025
4026 sm := nt.peers[1].(*raft)
4027 if sm.state != StateLeader {
4028 t.Fatalf("peer 1 state: %s, want %s", sm.state, StateLeader)
4029 }
4030 sm = nt.peers[2].(*raft)
4031 if sm.state != StateFollower {
4032 t.Fatalf("peer 2 state: %s, want %s", sm.state, StateFollower)
4033 }
4034 sm = nt.peers[3].(*raft)
4035 if sm.state != StateFollower {
4036 t.Fatalf("peer 3 state: %s, want %s", sm.state, StateFollower)
4037 }
4038
4039
4040 nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup})
4041 nt.send(pb.Message{From: 2, To: 2, Type: pb.MsgHup})
4042
4043
4044 if n2.state != StateLeader && n3.state != StateFollower {
4045 t.Errorf("no leader")
4046 }
4047 }
4048
4049
4050
4051 func TestLearnerCampaign(t *testing.T) {
4052 n1 := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1)))
4053 n1.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddLearnerNode}.AsV2())
4054 n2 := newTestRaft(2, 10, 1, newTestMemoryStorage(withPeers(1)))
4055 n2.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddLearnerNode}.AsV2())
4056 nt := newNetwork(n1, n2)
4057 nt.send(pb.Message{From: 2, To: 2, Type: pb.MsgHup})
4058
4059 if !n2.isLearner {
4060 t.Fatalf("failed to make n2 a learner")
4061 }
4062
4063 if n2.state != StateFollower {
4064 t.Fatalf("n2 campaigned despite being learner")
4065 }
4066
4067 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
4068 if n1.state != StateLeader || n1.lead != 1 {
4069 t.Fatalf("n1 did not become leader")
4070 }
4071
4072
4073
4074
4075
4076 nt.send(pb.Message{From: 1, To: 2, Type: pb.MsgTimeoutNow})
4077
4078 if n2.state != StateFollower {
4079 t.Fatalf("n2 accepted leadership transfer despite being learner")
4080 }
4081 }
4082
4083
4084
4085
4086
4087 func newPreVoteMigrationCluster(t *testing.T) *network {
4088 n1 := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
4089 n2 := newTestRaft(2, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
4090 n3 := newTestRaft(3, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
4091
4092 n1.becomeFollower(1, None)
4093 n2.becomeFollower(1, None)
4094 n3.becomeFollower(1, None)
4095
4096 n1.preVote = true
4097 n2.preVote = true
4098
4099
4100
4101
4102 nt := newNetwork(n1, n2, n3)
4103 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
4104
4105
4106 nt.isolate(3)
4107 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
4108 nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup})
4109 nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup})
4110
4111
4112
4113
4114
4115 if n1.state != StateLeader {
4116 t.Fatalf("node 1 state: %s, want %s", n1.state, StateLeader)
4117 }
4118 if n2.state != StateFollower {
4119 t.Fatalf("node 2 state: %s, want %s", n2.state, StateFollower)
4120 }
4121 if n3.state != StateCandidate {
4122 t.Fatalf("node 3 state: %s, want %s", n3.state, StateCandidate)
4123 }
4124
4125
4126
4127
4128
4129 if n1.Term != 2 {
4130 t.Fatalf("node 1 term: %d, want %d", n1.Term, 2)
4131 }
4132 if n2.Term != 2 {
4133 t.Fatalf("node 2 term: %d, want %d", n2.Term, 2)
4134 }
4135 if n3.Term != 4 {
4136 t.Fatalf("node 3 term: %d, want %d", n3.Term, 4)
4137 }
4138
4139
4140 n3.preVote = true
4141 nt.recover()
4142
4143 return nt
4144 }
4145
4146 func TestPreVoteMigrationCanCompleteElection(t *testing.T) {
4147 nt := newPreVoteMigrationCluster(t)
4148
4149
4150
4151
4152 n2 := nt.peers[2].(*raft)
4153 n3 := nt.peers[3].(*raft)
4154
4155
4156 nt.isolate(1)
4157
4158
4159 nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup})
4160 nt.send(pb.Message{From: 2, To: 2, Type: pb.MsgHup})
4161
4162
4163
4164
4165 if n2.state != StateFollower {
4166 t.Errorf("node 2 state: %s, want %s", n2.state, StateFollower)
4167 }
4168 if n3.state != StatePreCandidate {
4169 t.Errorf("node 3 state: %s, want %s", n3.state, StatePreCandidate)
4170 }
4171
4172 nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup})
4173 nt.send(pb.Message{From: 2, To: 2, Type: pb.MsgHup})
4174
4175
4176 if n2.state != StateLeader && n3.state != StateFollower {
4177 t.Errorf("no leader")
4178 }
4179 }
4180
4181 func TestPreVoteMigrationWithFreeStuckPreCandidate(t *testing.T) {
4182 nt := newPreVoteMigrationCluster(t)
4183
4184
4185
4186
4187 n1 := nt.peers[1].(*raft)
4188 n2 := nt.peers[2].(*raft)
4189 n3 := nt.peers[3].(*raft)
4190
4191 nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup})
4192
4193 if n1.state != StateLeader {
4194 t.Errorf("node 1 state: %s, want %s", n1.state, StateLeader)
4195 }
4196 if n2.state != StateFollower {
4197 t.Errorf("node 2 state: %s, want %s", n2.state, StateFollower)
4198 }
4199 if n3.state != StatePreCandidate {
4200 t.Errorf("node 3 state: %s, want %s", n3.state, StatePreCandidate)
4201 }
4202
4203
4204 nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup})
4205
4206 if n1.state != StateLeader {
4207 t.Errorf("node 1 state: %s, want %s", n1.state, StateLeader)
4208 }
4209 if n2.state != StateFollower {
4210 t.Errorf("node 2 state: %s, want %s", n2.state, StateFollower)
4211 }
4212 if n3.state != StatePreCandidate {
4213 t.Errorf("node 3 state: %s, want %s", n3.state, StatePreCandidate)
4214 }
4215
4216 nt.send(pb.Message{From: 1, To: 3, Type: pb.MsgHeartbeat, Term: n1.Term})
4217
4218
4219 if n1.state != StateFollower {
4220 t.Errorf("state = %s, want %s", n1.state, StateFollower)
4221 }
4222 if n3.Term != n1.Term {
4223 t.Errorf("term = %d, want %d", n3.Term, n1.Term)
4224 }
4225 }
4226
4227 func testConfChangeCheckBeforeCampaign(t *testing.T, v2 bool) {
4228 nt := newNetwork(nil, nil, nil)
4229 n1 := nt.peers[1].(*raft)
4230 n2 := nt.peers[2].(*raft)
4231 nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
4232 if n1.state != StateLeader {
4233 t.Errorf("node 1 state: %s, want %s", n1.state, StateLeader)
4234 }
4235
4236
4237 cc := pb.ConfChange{
4238 Type: pb.ConfChangeRemoveNode,
4239 NodeID: 2,
4240 }
4241 var ccData []byte
4242 var err error
4243 var ty pb.EntryType
4244 if v2 {
4245 ccv2 := cc.AsV2()
4246 ccData, err = ccv2.Marshal()
4247 ty = pb.EntryConfChangeV2
4248 } else {
4249 ccData, err = cc.Marshal()
4250 ty = pb.EntryConfChange
4251 }
4252 if err != nil {
4253 t.Fatal(err)
4254 }
4255 nt.send(pb.Message{
4256 From: 1,
4257 To: 1,
4258 Type: pb.MsgProp,
4259 Entries: []pb.Entry{
4260 {Type: ty, Data: ccData},
4261 },
4262 })
4263
4264
4265 for i := 0; i < n2.randomizedElectionTimeout; i++ {
4266 n2.tick()
4267 }
4268
4269 if n2.state != StateFollower {
4270 t.Errorf("node 2 state: %s, want %s", n2.state, StateFollower)
4271 }
4272
4273
4274 nt.send(pb.Message{From: 2, To: 1, Type: pb.MsgTransferLeader})
4275 if n1.state != StateLeader {
4276 t.Errorf("node 1 state: %s, want %s", n1.state, StateLeader)
4277 }
4278
4279 if n2.state != StateFollower {
4280 t.Errorf("node 2 state: %s, want %s", n2.state, StateFollower)
4281 }
4282
4283 for i := 0; i < n1.electionTimeout; i++ {
4284 n1.tick()
4285 }
4286
4287
4288 nextEnts(n2, nt.storage[2])
4289
4290
4291 nt.send(pb.Message{From: 2, To: 1, Type: pb.MsgTransferLeader})
4292 if n1.state != StateFollower {
4293 t.Errorf("node 1 state: %s, want %s", n1.state, StateFollower)
4294 }
4295 if n2.state != StateLeader {
4296 t.Errorf("node 2 state: %s, want %s", n2.state, StateLeader)
4297 }
4298
4299 nextEnts(n1, nt.storage[1])
4300
4301 for i := 0; i < n1.randomizedElectionTimeout; i++ {
4302 n1.tick()
4303 }
4304 if n1.state != StateCandidate {
4305 t.Errorf("node 1 state: %s, want %s", n1.state, StateCandidate)
4306 }
4307 }
4308
4309
4310 func TestConfChangeCheckBeforeCampaign(t *testing.T) {
4311 testConfChangeCheckBeforeCampaign(t, false)
4312 }
4313
4314
4315 func TestConfChangeV2CheckBeforeCampaign(t *testing.T) {
4316 testConfChangeCheckBeforeCampaign(t, true)
4317 }
4318
4319 func TestFastLogRejection(t *testing.T) {
4320 tests := []struct {
4321 leaderLog []pb.Entry
4322 followerLog []pb.Entry
4323 rejectHintTerm uint64
4324 rejectHintIndex uint64
4325 nextAppendTerm uint64
4326 nextAppendIndex uint64
4327 }{
4328
4329
4330
4331 {
4332 leaderLog: []pb.Entry{
4333 {Term: 1, Index: 1},
4334 {Term: 2, Index: 2},
4335 {Term: 2, Index: 3},
4336 {Term: 4, Index: 4},
4337 {Term: 4, Index: 5},
4338 {Term: 4, Index: 6},
4339 {Term: 4, Index: 7},
4340 },
4341 followerLog: []pb.Entry{
4342 {Term: 1, Index: 1},
4343 {Term: 2, Index: 2},
4344 {Term: 2, Index: 3},
4345 {Term: 3, Index: 4},
4346 {Term: 3, Index: 5},
4347 {Term: 3, Index: 6},
4348 {Term: 3, Index: 7},
4349 {Term: 3, Index: 8},
4350 {Term: 3, Index: 9},
4351 {Term: 3, Index: 10},
4352 {Term: 3, Index: 11},
4353 },
4354 rejectHintTerm: 3,
4355 rejectHintIndex: 7,
4356 nextAppendTerm: 2,
4357 nextAppendIndex: 3,
4358 },
4359
4360
4361
4362 {
4363 leaderLog: []pb.Entry{
4364 {Term: 1, Index: 1},
4365 {Term: 2, Index: 2},
4366 {Term: 2, Index: 3},
4367 {Term: 3, Index: 4},
4368 {Term: 4, Index: 5},
4369 {Term: 4, Index: 6},
4370 {Term: 4, Index: 7},
4371 {Term: 5, Index: 8},
4372 },
4373 followerLog: []pb.Entry{
4374 {Term: 1, Index: 1},
4375 {Term: 2, Index: 2},
4376 {Term: 2, Index: 3},
4377 {Term: 3, Index: 4},
4378 {Term: 3, Index: 5},
4379 {Term: 3, Index: 6},
4380 {Term: 3, Index: 7},
4381 {Term: 3, Index: 8},
4382 {Term: 3, Index: 9},
4383 {Term: 3, Index: 10},
4384 {Term: 3, Index: 11},
4385 },
4386 rejectHintTerm: 3,
4387 rejectHintIndex: 8,
4388 nextAppendTerm: 3,
4389 nextAppendIndex: 4,
4390 },
4391
4392
4393
4394 {
4395 leaderLog: []pb.Entry{
4396 {Term: 1, Index: 1},
4397 {Term: 1, Index: 2},
4398 {Term: 1, Index: 3},
4399 {Term: 1, Index: 4},
4400 },
4401 followerLog: []pb.Entry{
4402 {Term: 1, Index: 1},
4403 {Term: 2, Index: 2},
4404 {Term: 2, Index: 3},
4405 {Term: 4, Index: 4},
4406 },
4407 rejectHintTerm: 1,
4408 rejectHintIndex: 1,
4409 nextAppendTerm: 1,
4410 nextAppendIndex: 1,
4411 },
4412
4413
4414
4415
4416 {
4417 leaderLog: []pb.Entry{
4418 {Term: 1, Index: 1},
4419 {Term: 1, Index: 2},
4420 {Term: 1, Index: 3},
4421 {Term: 1, Index: 4},
4422 {Term: 1, Index: 5},
4423 {Term: 1, Index: 6},
4424 },
4425 followerLog: []pb.Entry{
4426 {Term: 1, Index: 1},
4427 {Term: 2, Index: 2},
4428 {Term: 2, Index: 3},
4429 {Term: 4, Index: 4},
4430 },
4431 rejectHintTerm: 1,
4432 rejectHintIndex: 1,
4433 nextAppendTerm: 1,
4434 nextAppendIndex: 1,
4435 },
4436
4437
4438
4439
4440 {
4441 leaderLog: []pb.Entry{
4442 {Term: 1, Index: 1},
4443 {Term: 1, Index: 2},
4444 {Term: 1, Index: 3},
4445 {Term: 1, Index: 4},
4446 },
4447 followerLog: []pb.Entry{
4448 {Term: 1, Index: 1},
4449 {Term: 2, Index: 2},
4450 {Term: 2, Index: 3},
4451 {Term: 4, Index: 4},
4452 {Term: 4, Index: 5},
4453 {Term: 4, Index: 6},
4454 },
4455 rejectHintTerm: 1,
4456 rejectHintIndex: 1,
4457 nextAppendTerm: 1,
4458 nextAppendIndex: 1,
4459 },
4460
4461
4462
4463 {
4464 leaderLog: []pb.Entry{
4465 {Term: 1, Index: 1},
4466 {Term: 1, Index: 2},
4467 {Term: 1, Index: 3},
4468 {Term: 4, Index: 4},
4469 {Term: 5, Index: 5},
4470 },
4471 followerLog: []pb.Entry{
4472 {Term: 1, Index: 1},
4473 {Term: 1, Index: 2},
4474 {Term: 1, Index: 3},
4475 {Term: 4, Index: 4},
4476 },
4477 rejectHintTerm: 4,
4478 rejectHintIndex: 4,
4479 nextAppendTerm: 4,
4480 nextAppendIndex: 4,
4481 },
4482
4483 {
4484 leaderLog: []pb.Entry{
4485 {Term: 2, Index: 1},
4486 {Term: 5, Index: 2},
4487 {Term: 5, Index: 3},
4488 {Term: 5, Index: 4},
4489 {Term: 5, Index: 5},
4490 {Term: 5, Index: 6},
4491 {Term: 5, Index: 7},
4492 {Term: 5, Index: 8},
4493 {Term: 5, Index: 9},
4494 },
4495 followerLog: []pb.Entry{
4496 {Term: 2, Index: 1},
4497 {Term: 4, Index: 2},
4498 {Term: 4, Index: 3},
4499 {Term: 4, Index: 4},
4500 {Term: 4, Index: 5},
4501 {Term: 4, Index: 6},
4502 },
4503 rejectHintTerm: 4,
4504 rejectHintIndex: 6,
4505 nextAppendTerm: 2,
4506 nextAppendIndex: 1,
4507 },
4508
4509 {
4510 leaderLog: []pb.Entry{
4511 {Term: 2, Index: 1},
4512 {Term: 2, Index: 2},
4513 {Term: 2, Index: 3},
4514 {Term: 2, Index: 4},
4515 {Term: 2, Index: 5},
4516 },
4517 followerLog: []pb.Entry{
4518 {Term: 2, Index: 1},
4519 {Term: 4, Index: 2},
4520 {Term: 4, Index: 3},
4521 {Term: 4, Index: 4},
4522 {Term: 4, Index: 5},
4523 {Term: 4, Index: 6},
4524 {Term: 4, Index: 7},
4525 {Term: 4, Index: 8},
4526 },
4527 nextAppendTerm: 2,
4528 nextAppendIndex: 1,
4529 rejectHintTerm: 2,
4530 rejectHintIndex: 1,
4531 },
4532 }
4533
4534 for i, test := range tests {
4535 t.Run("", func(t *testing.T) {
4536 s1 := NewMemoryStorage()
4537 s1.snapshot.Metadata.ConfState = pb.ConfState{Voters: []uint64{1, 2, 3}}
4538 s1.Append(test.leaderLog)
4539 s2 := NewMemoryStorage()
4540 s2.snapshot.Metadata.ConfState = pb.ConfState{Voters: []uint64{1, 2, 3}}
4541 s2.Append(test.followerLog)
4542
4543 n1 := newTestRaft(1, 10, 1, s1)
4544 n2 := newTestRaft(2, 10, 1, s2)
4545
4546 n1.becomeCandidate()
4547 n1.becomeLeader()
4548
4549 n2.Step(pb.Message{From: 1, To: 1, Type: pb.MsgHeartbeat})
4550
4551 msgs := n2.readMessages()
4552 if len(msgs) != 1 {
4553 t.Errorf("can't read 1 message from peer 2")
4554 }
4555 if msgs[0].Type != pb.MsgHeartbeatResp {
4556 t.Errorf("can't read heartbeat response from peer 2")
4557 }
4558 if n1.Step(msgs[0]) != nil {
4559 t.Errorf("peer 1 step heartbeat response fail")
4560 }
4561
4562 msgs = n1.readMessages()
4563 if len(msgs) != 1 {
4564 t.Errorf("can't read 1 message from peer 1")
4565 }
4566 if msgs[0].Type != pb.MsgApp {
4567 t.Errorf("can't read append from peer 1")
4568 }
4569
4570 if n2.Step(msgs[0]) != nil {
4571 t.Errorf("peer 2 step append fail")
4572 }
4573 msgs = n2.readMessages()
4574 if len(msgs) != 1 {
4575 t.Errorf("can't read 1 message from peer 2")
4576 }
4577 if msgs[0].Type != pb.MsgAppResp {
4578 t.Errorf("can't read append response from peer 2")
4579 }
4580 if !msgs[0].Reject {
4581 t.Errorf("expected rejected append response from peer 2")
4582 }
4583 if msgs[0].LogTerm != test.rejectHintTerm {
4584 t.Fatalf("#%d expected hint log term = %d, but got %d", i, test.rejectHintTerm, msgs[0].LogTerm)
4585 }
4586 if msgs[0].RejectHint != test.rejectHintIndex {
4587 t.Fatalf("#%d expected hint index = %d, but got %d", i, test.rejectHintIndex, msgs[0].RejectHint)
4588 }
4589
4590 if n1.Step(msgs[0]) != nil {
4591 t.Errorf("peer 1 step append fail")
4592 }
4593 msgs = n1.readMessages()
4594 if msgs[0].LogTerm != test.nextAppendTerm {
4595 t.Fatalf("#%d expected log term = %d, but got %d", i, test.nextAppendTerm, msgs[0].LogTerm)
4596 }
4597 if msgs[0].Index != test.nextAppendIndex {
4598 t.Fatalf("#%d expected index = %d, but got %d", i, test.nextAppendIndex, msgs[0].Index)
4599 }
4600 })
4601 }
4602 }
4603
4604 func entsWithConfig(configFunc func(*Config), terms ...uint64) *raft {
4605 storage := NewMemoryStorage()
4606 for i, term := range terms {
4607 storage.Append([]pb.Entry{{Index: uint64(i + 1), Term: term}})
4608 }
4609 cfg := newTestConfig(1, 5, 1, storage)
4610 if configFunc != nil {
4611 configFunc(cfg)
4612 }
4613 sm := newRaft(cfg)
4614 sm.reset(terms[len(terms)-1])
4615 return sm
4616 }
4617
4618
4619
4620
4621 func votedWithConfig(configFunc func(*Config), vote, term uint64) *raft {
4622 storage := NewMemoryStorage()
4623 storage.SetHardState(pb.HardState{Vote: vote, Term: term})
4624 cfg := newTestConfig(1, 5, 1, storage)
4625 if configFunc != nil {
4626 configFunc(cfg)
4627 }
4628 sm := newRaft(cfg)
4629 sm.reset(term)
4630 return sm
4631 }
4632
4633 type network struct {
4634 peers map[uint64]stateMachine
4635 storage map[uint64]*MemoryStorage
4636 dropm map[connem]float64
4637 ignorem map[pb.MessageType]bool
4638
4639
4640
4641 msgHook func(pb.Message) bool
4642 }
4643
4644
4645
4646
4647
4648 func newNetwork(peers ...stateMachine) *network {
4649 return newNetworkWithConfig(nil, peers...)
4650 }
4651
4652
4653
4654 func newNetworkWithConfig(configFunc func(*Config), peers ...stateMachine) *network {
4655 size := len(peers)
4656 peerAddrs := idsBySize(size)
4657
4658 npeers := make(map[uint64]stateMachine, size)
4659 nstorage := make(map[uint64]*MemoryStorage, size)
4660
4661 for j, p := range peers {
4662 id := peerAddrs[j]
4663 switch v := p.(type) {
4664 case nil:
4665 nstorage[id] = newTestMemoryStorage(withPeers(peerAddrs...))
4666 cfg := newTestConfig(id, 10, 1, nstorage[id])
4667 if configFunc != nil {
4668 configFunc(cfg)
4669 }
4670 sm := newRaft(cfg)
4671 npeers[id] = sm
4672 case *raft:
4673
4674 learners := make(map[uint64]bool, len(v.prs.Learners))
4675 for i := range v.prs.Learners {
4676 learners[i] = true
4677 }
4678 v.id = id
4679 v.prs = tracker.MakeProgressTracker(v.prs.MaxInflight)
4680 if len(learners) > 0 {
4681 v.prs.Learners = map[uint64]struct{}{}
4682 }
4683 for i := 0; i < size; i++ {
4684 pr := &tracker.Progress{}
4685 if _, ok := learners[peerAddrs[i]]; ok {
4686 pr.IsLearner = true
4687 v.prs.Learners[peerAddrs[i]] = struct{}{}
4688 } else {
4689 v.prs.Voters[0][peerAddrs[i]] = struct{}{}
4690 }
4691 v.prs.Progress[peerAddrs[i]] = pr
4692 }
4693 v.reset(v.Term)
4694 npeers[id] = v
4695 case *blackHole:
4696 npeers[id] = v
4697 default:
4698 panic(fmt.Sprintf("unexpected state machine type: %T", p))
4699 }
4700 }
4701 return &network{
4702 peers: npeers,
4703 storage: nstorage,
4704 dropm: make(map[connem]float64),
4705 ignorem: make(map[pb.MessageType]bool),
4706 }
4707 }
4708
4709 func preVoteConfig(c *Config) {
4710 c.PreVote = true
4711 }
4712
4713 func (nw *network) send(msgs ...pb.Message) {
4714 for len(msgs) > 0 {
4715 m := msgs[0]
4716 p := nw.peers[m.To]
4717 p.Step(m)
4718 msgs = append(msgs[1:], nw.filter(p.readMessages())...)
4719 }
4720 }
4721
4722 func (nw *network) drop(from, to uint64, perc float64) {
4723 nw.dropm[connem{from, to}] = perc
4724 }
4725
4726 func (nw *network) cut(one, other uint64) {
4727 nw.drop(one, other, 2.0)
4728 nw.drop(other, one, 2.0)
4729 }
4730
4731 func (nw *network) isolate(id uint64) {
4732 for i := 0; i < len(nw.peers); i++ {
4733 nid := uint64(i) + 1
4734 if nid != id {
4735 nw.drop(id, nid, 1.0)
4736 nw.drop(nid, id, 1.0)
4737 }
4738 }
4739 }
4740
4741 func (nw *network) ignore(t pb.MessageType) {
4742 nw.ignorem[t] = true
4743 }
4744
4745 func (nw *network) recover() {
4746 nw.dropm = make(map[connem]float64)
4747 nw.ignorem = make(map[pb.MessageType]bool)
4748 }
4749
4750 func (nw *network) filter(msgs []pb.Message) []pb.Message {
4751 mm := []pb.Message{}
4752 for _, m := range msgs {
4753 if nw.ignorem[m.Type] {
4754 continue
4755 }
4756 switch m.Type {
4757 case pb.MsgHup:
4758
4759 panic("unexpected msgHup")
4760 default:
4761 perc := nw.dropm[connem{m.From, m.To}]
4762 if n := rand.Float64(); n < perc {
4763 continue
4764 }
4765 }
4766 if nw.msgHook != nil {
4767 if !nw.msgHook(m) {
4768 continue
4769 }
4770 }
4771 mm = append(mm, m)
4772 }
4773 return mm
4774 }
4775
// connem identifies a directed connection between two nodes. It is the key
// type of network.dropm.
type connem struct {
	from uint64 // ID of the sending node
	to   uint64 // ID of the receiving node
}
4779
4780 type blackHole struct{}
4781
4782 func (blackHole) Step(pb.Message) error { return nil }
4783 func (blackHole) readMessages() []pb.Message { return nil }
4784
4785 var nopStepper = &blackHole{}
4786
// idsBySize returns the consecutive IDs 1, 2, ..., size as a slice of length
// size. For size 0 the result is empty but not nil.
func idsBySize(size int) []uint64 {
	ids := make([]uint64, size)
	for pos := range ids {
		ids[pos] = uint64(pos) + 1
	}
	return ids
}
4794
4795
4796
4797
4798 func setRandomizedElectionTimeout(r *raft, v int) {
4799 r.randomizedElectionTimeout = v
4800 }
4801
4802 func newTestConfig(id uint64, election, heartbeat int, storage Storage) *Config {
4803 return &Config{
4804 ID: id,
4805 ElectionTick: election,
4806 HeartbeatTick: heartbeat,
4807 Storage: storage,
4808 MaxSizePerMsg: noLimit,
4809 MaxInflightMsgs: 256,
4810 }
4811 }
4812
4813 type testMemoryStorageOptions func(*MemoryStorage)
4814
4815 func withPeers(peers ...uint64) testMemoryStorageOptions {
4816 return func(ms *MemoryStorage) {
4817 ms.snapshot.Metadata.ConfState.Voters = peers
4818 }
4819 }
4820
4821 func withLearners(learners ...uint64) testMemoryStorageOptions {
4822 return func(ms *MemoryStorage) {
4823 ms.snapshot.Metadata.ConfState.Learners = learners
4824 }
4825 }
4826
4827 func newTestMemoryStorage(opts ...testMemoryStorageOptions) *MemoryStorage {
4828 ms := NewMemoryStorage()
4829 for _, o := range opts {
4830 o(ms)
4831 }
4832 return ms
4833 }
4834
4835 func newTestRaft(id uint64, election, heartbeat int, storage Storage) *raft {
4836 return newRaft(newTestConfig(id, election, heartbeat, storage))
4837 }
4838
4839 func newTestLearnerRaft(id uint64, election, heartbeat int, storage Storage) *raft {
4840 cfg := newTestConfig(id, election, heartbeat, storage)
4841 return newRaft(cfg)
4842 }
4843
4844
4845
4846 func newTestRawNode(id uint64, election, heartbeat int, storage Storage) *RawNode {
4847 cfg := newTestConfig(id, election, heartbeat, storage)
4848 rn, err := NewRawNode(cfg)
4849 if err != nil {
4850 panic(err)
4851 }
4852 return rn
4853 }
4854
View as plain text