1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
27 package raft
28
29 import (
30 "fmt"
31 "reflect"
32 "sort"
33 "testing"
34
35 pb "go.etcd.io/etcd/raft/v3/raftpb"
36 )
37
38 func TestFollowerUpdateTermFromMessage(t *testing.T) {
39 testUpdateTermFromMessage(t, StateFollower)
40 }
41 func TestCandidateUpdateTermFromMessage(t *testing.T) {
42 testUpdateTermFromMessage(t, StateCandidate)
43 }
44 func TestLeaderUpdateTermFromMessage(t *testing.T) {
45 testUpdateTermFromMessage(t, StateLeader)
46 }
47
48
49
50
51
52
53 func testUpdateTermFromMessage(t *testing.T, state StateType) {
54 r := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
55 switch state {
56 case StateFollower:
57 r.becomeFollower(1, 2)
58 case StateCandidate:
59 r.becomeCandidate()
60 case StateLeader:
61 r.becomeCandidate()
62 r.becomeLeader()
63 }
64
65 r.Step(pb.Message{Type: pb.MsgApp, Term: 2})
66
67 if r.Term != 2 {
68 t.Errorf("term = %d, want %d", r.Term, 2)
69 }
70 if r.state != StateFollower {
71 t.Errorf("state = %v, want %v", r.state, StateFollower)
72 }
73 }
74
75
76
77
78
79 func TestRejectStaleTermMessage(t *testing.T) {
80 called := false
81 fakeStep := func(r *raft, m pb.Message) error {
82 called = true
83 return nil
84 }
85 r := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
86 r.step = fakeStep
87 r.loadState(pb.HardState{Term: 2})
88
89 r.Step(pb.Message{Type: pb.MsgApp, Term: r.Term - 1})
90
91 if called {
92 t.Errorf("stepFunc called = %v, want %v", called, false)
93 }
94 }
95
96
97
98 func TestStartAsFollower(t *testing.T) {
99 r := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
100 if r.state != StateFollower {
101 t.Errorf("state = %s, want %s", r.state, StateFollower)
102 }
103 }
104
105
106
107
108
109 func TestLeaderBcastBeat(t *testing.T) {
110
111 hi := 1
112 r := newTestRaft(1, 10, hi, newTestMemoryStorage(withPeers(1, 2, 3)))
113 r.becomeCandidate()
114 r.becomeLeader()
115 for i := 0; i < 10; i++ {
116 mustAppendEntry(r, pb.Entry{Index: uint64(i) + 1})
117 }
118
119 for i := 0; i < hi; i++ {
120 r.tick()
121 }
122
123 msgs := r.readMessages()
124 sort.Sort(messageSlice(msgs))
125 wmsgs := []pb.Message{
126 {From: 1, To: 2, Term: 1, Type: pb.MsgHeartbeat},
127 {From: 1, To: 3, Term: 1, Type: pb.MsgHeartbeat},
128 }
129 if !reflect.DeepEqual(msgs, wmsgs) {
130 t.Errorf("msgs = %v, want %v", msgs, wmsgs)
131 }
132 }
133
134 func TestFollowerStartElection(t *testing.T) {
135 testNonleaderStartElection(t, StateFollower)
136 }
137 func TestCandidateStartNewElection(t *testing.T) {
138 testNonleaderStartElection(t, StateCandidate)
139 }
140
141
142
143
144
145
146
147
148
149
150
151 func testNonleaderStartElection(t *testing.T, state StateType) {
152
153 et := 10
154 r := newTestRaft(1, et, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
155 switch state {
156 case StateFollower:
157 r.becomeFollower(1, 2)
158 case StateCandidate:
159 r.becomeCandidate()
160 }
161
162 for i := 1; i < 2*et; i++ {
163 r.tick()
164 }
165
166 if r.Term != 2 {
167 t.Errorf("term = %d, want 2", r.Term)
168 }
169 if r.state != StateCandidate {
170 t.Errorf("state = %s, want %s", r.state, StateCandidate)
171 }
172 if !r.prs.Votes[r.id] {
173 t.Errorf("vote for self = false, want true")
174 }
175 msgs := r.readMessages()
176 sort.Sort(messageSlice(msgs))
177 wmsgs := []pb.Message{
178 {From: 1, To: 2, Term: 2, Type: pb.MsgVote},
179 {From: 1, To: 3, Term: 2, Type: pb.MsgVote},
180 }
181 if !reflect.DeepEqual(msgs, wmsgs) {
182 t.Errorf("msgs = %v, want %v", msgs, wmsgs)
183 }
184 }
185
186
187
188
189
190
191
192 func TestLeaderElectionInOneRoundRPC(t *testing.T) {
193 tests := []struct {
194 size int
195 votes map[uint64]bool
196 state StateType
197 }{
198
199 {1, map[uint64]bool{}, StateLeader},
200 {3, map[uint64]bool{2: true, 3: true}, StateLeader},
201 {3, map[uint64]bool{2: true}, StateLeader},
202 {5, map[uint64]bool{2: true, 3: true, 4: true, 5: true}, StateLeader},
203 {5, map[uint64]bool{2: true, 3: true, 4: true}, StateLeader},
204 {5, map[uint64]bool{2: true, 3: true}, StateLeader},
205
206
207 {3, map[uint64]bool{2: false, 3: false}, StateFollower},
208 {5, map[uint64]bool{2: false, 3: false, 4: false, 5: false}, StateFollower},
209 {5, map[uint64]bool{2: true, 3: false, 4: false, 5: false}, StateFollower},
210
211
212 {3, map[uint64]bool{}, StateCandidate},
213 {5, map[uint64]bool{2: true}, StateCandidate},
214 {5, map[uint64]bool{2: false, 3: false}, StateCandidate},
215 {5, map[uint64]bool{}, StateCandidate},
216 }
217 for i, tt := range tests {
218 r := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(idsBySize(tt.size)...)))
219
220 r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
221 for id, vote := range tt.votes {
222 r.Step(pb.Message{From: id, To: 1, Term: r.Term, Type: pb.MsgVoteResp, Reject: !vote})
223 }
224
225 if r.state != tt.state {
226 t.Errorf("#%d: state = %s, want %s", i, r.state, tt.state)
227 }
228 if g := r.Term; g != 1 {
229 t.Errorf("#%d: term = %d, want %d", i, g, 1)
230 }
231 }
232 }
233
234
235
236
237 func TestFollowerVote(t *testing.T) {
238 tests := []struct {
239 vote uint64
240 nvote uint64
241 wreject bool
242 }{
243 {None, 1, false},
244 {None, 2, false},
245 {1, 1, false},
246 {2, 2, false},
247 {1, 2, true},
248 {2, 1, true},
249 }
250 for i, tt := range tests {
251 r := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
252 r.loadState(pb.HardState{Term: 1, Vote: tt.vote})
253
254 r.Step(pb.Message{From: tt.nvote, To: 1, Term: 1, Type: pb.MsgVote})
255
256 msgs := r.readMessages()
257 wmsgs := []pb.Message{
258 {From: 1, To: tt.nvote, Term: 1, Type: pb.MsgVoteResp, Reject: tt.wreject},
259 }
260 if !reflect.DeepEqual(msgs, wmsgs) {
261 t.Errorf("#%d: msgs = %v, want %v", i, msgs, wmsgs)
262 }
263 }
264 }
265
266
267
268
269
270
271 func TestCandidateFallback(t *testing.T) {
272 tests := []pb.Message{
273 {From: 2, To: 1, Term: 1, Type: pb.MsgApp},
274 {From: 2, To: 1, Term: 2, Type: pb.MsgApp},
275 }
276 for i, tt := range tests {
277 r := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
278 r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
279 if r.state != StateCandidate {
280 t.Fatalf("unexpected state = %s, want %s", r.state, StateCandidate)
281 }
282
283 r.Step(tt)
284
285 if g := r.state; g != StateFollower {
286 t.Errorf("#%d: state = %s, want %s", i, g, StateFollower)
287 }
288 if g := r.Term; g != tt.Term {
289 t.Errorf("#%d: term = %d, want %d", i, g, tt.Term)
290 }
291 }
292 }
293
294 func TestFollowerElectionTimeoutRandomized(t *testing.T) {
295 SetLogger(discardLogger)
296 defer SetLogger(defaultLogger)
297 testNonleaderElectionTimeoutRandomized(t, StateFollower)
298 }
299 func TestCandidateElectionTimeoutRandomized(t *testing.T) {
300 SetLogger(discardLogger)
301 defer SetLogger(defaultLogger)
302 testNonleaderElectionTimeoutRandomized(t, StateCandidate)
303 }
304
305
306
307
308 func testNonleaderElectionTimeoutRandomized(t *testing.T, state StateType) {
309 et := 10
310 r := newTestRaft(1, et, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
311 timeouts := make(map[int]bool)
312 for round := 0; round < 50*et; round++ {
313 switch state {
314 case StateFollower:
315 r.becomeFollower(r.Term+1, 2)
316 case StateCandidate:
317 r.becomeCandidate()
318 }
319
320 time := 0
321 for len(r.readMessages()) == 0 {
322 r.tick()
323 time++
324 }
325 timeouts[time] = true
326 }
327
328 for d := et + 1; d < 2*et; d++ {
329 if !timeouts[d] {
330 t.Errorf("timeout in %d ticks should happen", d)
331 }
332 }
333 }
334
335 func TestFollowersElectionTimeoutNonconflict(t *testing.T) {
336 SetLogger(discardLogger)
337 defer SetLogger(defaultLogger)
338 testNonleadersElectionTimeoutNonconflict(t, StateFollower)
339 }
340 func TestCandidatesElectionTimeoutNonconflict(t *testing.T) {
341 SetLogger(discardLogger)
342 defer SetLogger(defaultLogger)
343 testNonleadersElectionTimeoutNonconflict(t, StateCandidate)
344 }
345
346
347
348
349
350 func testNonleadersElectionTimeoutNonconflict(t *testing.T, state StateType) {
351 et := 10
352 size := 5
353 rs := make([]*raft, size)
354 ids := idsBySize(size)
355 for k := range rs {
356 rs[k] = newTestRaft(ids[k], et, 1, newTestMemoryStorage(withPeers(ids...)))
357 }
358 conflicts := 0
359 for round := 0; round < 1000; round++ {
360 for _, r := range rs {
361 switch state {
362 case StateFollower:
363 r.becomeFollower(r.Term+1, None)
364 case StateCandidate:
365 r.becomeCandidate()
366 }
367 }
368
369 timeoutNum := 0
370 for timeoutNum == 0 {
371 for _, r := range rs {
372 r.tick()
373 if len(r.readMessages()) > 0 {
374 timeoutNum++
375 }
376 }
377 }
378
379 if timeoutNum > 1 {
380 conflicts++
381 }
382 }
383
384 if g := float64(conflicts) / 1000; g > 0.3 {
385 t.Errorf("probability of conflicts = %v, want <= 0.3", g)
386 }
387 }
388
389
390
391
392
393
394
395
396
397 func TestLeaderStartReplication(t *testing.T) {
398 s := newTestMemoryStorage(withPeers(1, 2, 3))
399 r := newTestRaft(1, 10, 1, s)
400 r.becomeCandidate()
401 r.becomeLeader()
402 commitNoopEntry(r, s)
403 li := r.raftLog.lastIndex()
404
405 ents := []pb.Entry{{Data: []byte("some data")}}
406 r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: ents})
407
408 if g := r.raftLog.lastIndex(); g != li+1 {
409 t.Errorf("lastIndex = %d, want %d", g, li+1)
410 }
411 if g := r.raftLog.committed; g != li {
412 t.Errorf("committed = %d, want %d", g, li)
413 }
414 msgs := r.readMessages()
415 sort.Sort(messageSlice(msgs))
416 wents := []pb.Entry{{Index: li + 1, Term: 1, Data: []byte("some data")}}
417 wmsgs := []pb.Message{
418 {From: 1, To: 2, Term: 1, Type: pb.MsgApp, Index: li, LogTerm: 1, Entries: wents, Commit: li},
419 {From: 1, To: 3, Term: 1, Type: pb.MsgApp, Index: li, LogTerm: 1, Entries: wents, Commit: li},
420 }
421 if !reflect.DeepEqual(msgs, wmsgs) {
422 t.Errorf("msgs = %+v, want %+v", msgs, wmsgs)
423 }
424 if g := r.raftLog.unstableEntries(); !reflect.DeepEqual(g, wents) {
425 t.Errorf("ents = %+v, want %+v", g, wents)
426 }
427 }
428
429
430
431
432
433
434
435
436 func TestLeaderCommitEntry(t *testing.T) {
437 s := newTestMemoryStorage(withPeers(1, 2, 3))
438 r := newTestRaft(1, 10, 1, s)
439 r.becomeCandidate()
440 r.becomeLeader()
441 commitNoopEntry(r, s)
442 li := r.raftLog.lastIndex()
443 r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
444
445 for _, m := range r.readMessages() {
446 r.Step(acceptAndReply(m))
447 }
448
449 if g := r.raftLog.committed; g != li+1 {
450 t.Errorf("committed = %d, want %d", g, li+1)
451 }
452 wents := []pb.Entry{{Index: li + 1, Term: 1, Data: []byte("some data")}}
453 if g := r.raftLog.nextEnts(); !reflect.DeepEqual(g, wents) {
454 t.Errorf("nextEnts = %+v, want %+v", g, wents)
455 }
456 msgs := r.readMessages()
457 sort.Sort(messageSlice(msgs))
458 for i, m := range msgs {
459 if w := uint64(i + 2); m.To != w {
460 t.Errorf("to = %x, want %x", m.To, w)
461 }
462 if m.Type != pb.MsgApp {
463 t.Errorf("type = %v, want %v", m.Type, pb.MsgApp)
464 }
465 if m.Commit != li+1 {
466 t.Errorf("commit = %d, want %d", m.Commit, li+1)
467 }
468 }
469 }
470
471
472
473
474 func TestLeaderAcknowledgeCommit(t *testing.T) {
475 tests := []struct {
476 size int
477 acceptors map[uint64]bool
478 wack bool
479 }{
480 {1, nil, true},
481 {3, nil, false},
482 {3, map[uint64]bool{2: true}, true},
483 {3, map[uint64]bool{2: true, 3: true}, true},
484 {5, nil, false},
485 {5, map[uint64]bool{2: true}, false},
486 {5, map[uint64]bool{2: true, 3: true}, true},
487 {5, map[uint64]bool{2: true, 3: true, 4: true}, true},
488 {5, map[uint64]bool{2: true, 3: true, 4: true, 5: true}, true},
489 }
490 for i, tt := range tests {
491 s := newTestMemoryStorage(withPeers(idsBySize(tt.size)...))
492 r := newTestRaft(1, 10, 1, s)
493 r.becomeCandidate()
494 r.becomeLeader()
495 commitNoopEntry(r, s)
496 li := r.raftLog.lastIndex()
497 r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
498
499 for _, m := range r.readMessages() {
500 if tt.acceptors[m.To] {
501 r.Step(acceptAndReply(m))
502 }
503 }
504
505 if g := r.raftLog.committed > li; g != tt.wack {
506 t.Errorf("#%d: ack commit = %v, want %v", i, g, tt.wack)
507 }
508 }
509 }
510
511
512
513
514
515
516 func TestLeaderCommitPrecedingEntries(t *testing.T) {
517 tests := [][]pb.Entry{
518 {},
519 {{Term: 2, Index: 1}},
520 {{Term: 1, Index: 1}, {Term: 2, Index: 2}},
521 {{Term: 1, Index: 1}},
522 }
523 for i, tt := range tests {
524 storage := newTestMemoryStorage(withPeers(1, 2, 3))
525 storage.Append(tt)
526 r := newTestRaft(1, 10, 1, storage)
527 r.loadState(pb.HardState{Term: 2})
528 r.becomeCandidate()
529 r.becomeLeader()
530 r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
531
532 for _, m := range r.readMessages() {
533 r.Step(acceptAndReply(m))
534 }
535
536 li := uint64(len(tt))
537 wents := append(tt, pb.Entry{Term: 3, Index: li + 1}, pb.Entry{Term: 3, Index: li + 2, Data: []byte("some data")})
538 if g := r.raftLog.nextEnts(); !reflect.DeepEqual(g, wents) {
539 t.Errorf("#%d: ents = %+v, want %+v", i, g, wents)
540 }
541 }
542 }
543
544
545
546
547 func TestFollowerCommitEntry(t *testing.T) {
548 tests := []struct {
549 ents []pb.Entry
550 commit uint64
551 }{
552 {
553 []pb.Entry{
554 {Term: 1, Index: 1, Data: []byte("some data")},
555 },
556 1,
557 },
558 {
559 []pb.Entry{
560 {Term: 1, Index: 1, Data: []byte("some data")},
561 {Term: 1, Index: 2, Data: []byte("some data2")},
562 },
563 2,
564 },
565 {
566 []pb.Entry{
567 {Term: 1, Index: 1, Data: []byte("some data2")},
568 {Term: 1, Index: 2, Data: []byte("some data")},
569 },
570 2,
571 },
572 {
573 []pb.Entry{
574 {Term: 1, Index: 1, Data: []byte("some data")},
575 {Term: 1, Index: 2, Data: []byte("some data2")},
576 },
577 1,
578 },
579 }
580 for i, tt := range tests {
581 r := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
582 r.becomeFollower(1, 2)
583
584 r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgApp, Term: 1, Entries: tt.ents, Commit: tt.commit})
585
586 if g := r.raftLog.committed; g != tt.commit {
587 t.Errorf("#%d: committed = %d, want %d", i, g, tt.commit)
588 }
589 wents := tt.ents[:int(tt.commit)]
590 if g := r.raftLog.nextEnts(); !reflect.DeepEqual(g, wents) {
591 t.Errorf("#%d: nextEnts = %v, want %v", i, g, wents)
592 }
593 }
594 }
595
596
597
598
599
600
601 func TestFollowerCheckMsgApp(t *testing.T) {
602 ents := []pb.Entry{{Term: 1, Index: 1}, {Term: 2, Index: 2}}
603 tests := []struct {
604 term uint64
605 index uint64
606 windex uint64
607 wreject bool
608 wrejectHint uint64
609 wlogterm uint64
610 }{
611
612 {0, 0, 1, false, 0, 0},
613 {ents[0].Term, ents[0].Index, 1, false, 0, 0},
614
615 {ents[1].Term, ents[1].Index, 2, false, 0, 0},
616
617
618 {ents[0].Term, ents[1].Index, ents[1].Index, true, 1, 1},
619
620 {ents[1].Term + 1, ents[1].Index + 1, ents[1].Index + 1, true, 2, 2},
621 }
622 for i, tt := range tests {
623 storage := newTestMemoryStorage(withPeers(1, 2, 3))
624 storage.Append(ents)
625 r := newTestRaft(1, 10, 1, storage)
626 r.loadState(pb.HardState{Commit: 1})
627 r.becomeFollower(2, 2)
628
629 r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgApp, Term: 2, LogTerm: tt.term, Index: tt.index})
630
631 msgs := r.readMessages()
632 wmsgs := []pb.Message{
633 {From: 1, To: 2, Type: pb.MsgAppResp, Term: 2, Index: tt.windex, Reject: tt.wreject, RejectHint: tt.wrejectHint, LogTerm: tt.wlogterm},
634 }
635 if !reflect.DeepEqual(msgs, wmsgs) {
636 t.Errorf("#%d: msgs = %+v, want %+v", i, msgs, wmsgs)
637 }
638 }
639 }
640
641
642
643
644
645
646 func TestFollowerAppendEntries(t *testing.T) {
647 tests := []struct {
648 index, term uint64
649 ents []pb.Entry
650 wents []pb.Entry
651 wunstable []pb.Entry
652 }{
653 {
654 2, 2,
655 []pb.Entry{{Term: 3, Index: 3}},
656 []pb.Entry{{Term: 1, Index: 1}, {Term: 2, Index: 2}, {Term: 3, Index: 3}},
657 []pb.Entry{{Term: 3, Index: 3}},
658 },
659 {
660 1, 1,
661 []pb.Entry{{Term: 3, Index: 2}, {Term: 4, Index: 3}},
662 []pb.Entry{{Term: 1, Index: 1}, {Term: 3, Index: 2}, {Term: 4, Index: 3}},
663 []pb.Entry{{Term: 3, Index: 2}, {Term: 4, Index: 3}},
664 },
665 {
666 0, 0,
667 []pb.Entry{{Term: 1, Index: 1}},
668 []pb.Entry{{Term: 1, Index: 1}, {Term: 2, Index: 2}},
669 nil,
670 },
671 {
672 0, 0,
673 []pb.Entry{{Term: 3, Index: 1}},
674 []pb.Entry{{Term: 3, Index: 1}},
675 []pb.Entry{{Term: 3, Index: 1}},
676 },
677 }
678 for i, tt := range tests {
679 storage := newTestMemoryStorage(withPeers(1, 2, 3))
680 storage.Append([]pb.Entry{{Term: 1, Index: 1}, {Term: 2, Index: 2}})
681 r := newTestRaft(1, 10, 1, storage)
682 r.becomeFollower(2, 2)
683
684 r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgApp, Term: 2, LogTerm: tt.term, Index: tt.index, Entries: tt.ents})
685
686 if g := r.raftLog.allEntries(); !reflect.DeepEqual(g, tt.wents) {
687 t.Errorf("#%d: ents = %+v, want %+v", i, g, tt.wents)
688 }
689 if g := r.raftLog.unstableEntries(); !reflect.DeepEqual(g, tt.wunstable) {
690 t.Errorf("#%d: unstableEnts = %+v, want %+v", i, g, tt.wunstable)
691 }
692 }
693 }
694
695
696
697
698 func TestLeaderSyncFollowerLog(t *testing.T) {
699 ents := []pb.Entry{
700 {},
701 {Term: 1, Index: 1}, {Term: 1, Index: 2}, {Term: 1, Index: 3},
702 {Term: 4, Index: 4}, {Term: 4, Index: 5},
703 {Term: 5, Index: 6}, {Term: 5, Index: 7},
704 {Term: 6, Index: 8}, {Term: 6, Index: 9}, {Term: 6, Index: 10},
705 }
706 term := uint64(8)
707 tests := [][]pb.Entry{
708 {
709 {},
710 {Term: 1, Index: 1}, {Term: 1, Index: 2}, {Term: 1, Index: 3},
711 {Term: 4, Index: 4}, {Term: 4, Index: 5},
712 {Term: 5, Index: 6}, {Term: 5, Index: 7},
713 {Term: 6, Index: 8}, {Term: 6, Index: 9},
714 },
715 {
716 {},
717 {Term: 1, Index: 1}, {Term: 1, Index: 2}, {Term: 1, Index: 3},
718 {Term: 4, Index: 4},
719 },
720 {
721 {},
722 {Term: 1, Index: 1}, {Term: 1, Index: 2}, {Term: 1, Index: 3},
723 {Term: 4, Index: 4}, {Term: 4, Index: 5},
724 {Term: 5, Index: 6}, {Term: 5, Index: 7},
725 {Term: 6, Index: 8}, {Term: 6, Index: 9}, {Term: 6, Index: 10}, {Term: 6, Index: 11},
726 },
727 {
728 {},
729 {Term: 1, Index: 1}, {Term: 1, Index: 2}, {Term: 1, Index: 3},
730 {Term: 4, Index: 4}, {Term: 4, Index: 5},
731 {Term: 5, Index: 6}, {Term: 5, Index: 7},
732 {Term: 6, Index: 8}, {Term: 6, Index: 9}, {Term: 6, Index: 10},
733 {Term: 7, Index: 11}, {Term: 7, Index: 12},
734 },
735 {
736 {},
737 {Term: 1, Index: 1}, {Term: 1, Index: 2}, {Term: 1, Index: 3},
738 {Term: 4, Index: 4}, {Term: 4, Index: 5}, {Term: 4, Index: 6}, {Term: 4, Index: 7},
739 },
740 {
741 {},
742 {Term: 1, Index: 1}, {Term: 1, Index: 2}, {Term: 1, Index: 3},
743 {Term: 2, Index: 4}, {Term: 2, Index: 5}, {Term: 2, Index: 6},
744 {Term: 3, Index: 7}, {Term: 3, Index: 8}, {Term: 3, Index: 9}, {Term: 3, Index: 10}, {Term: 3, Index: 11},
745 },
746 }
747 for i, tt := range tests {
748 leadStorage := newTestMemoryStorage(withPeers(1, 2, 3))
749 leadStorage.Append(ents)
750 lead := newTestRaft(1, 10, 1, leadStorage)
751 lead.loadState(pb.HardState{Commit: lead.raftLog.lastIndex(), Term: term})
752 followerStorage := newTestMemoryStorage(withPeers(1, 2, 3))
753 followerStorage.Append(tt)
754 follower := newTestRaft(2, 10, 1, followerStorage)
755 follower.loadState(pb.HardState{Term: term - 1})
756
757
758
759 n := newNetwork(lead, follower, nopStepper)
760 n.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
761
762
763 n.send(pb.Message{From: 3, To: 1, Type: pb.MsgVoteResp, Term: term + 1})
764
765 n.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
766
767 if g := diffu(ltoa(lead.raftLog), ltoa(follower.raftLog)); g != "" {
768 t.Errorf("#%d: log diff:\n%s", i, g)
769 }
770 }
771 }
772
773
774
775
776 func TestVoteRequest(t *testing.T) {
777 tests := []struct {
778 ents []pb.Entry
779 wterm uint64
780 }{
781 {[]pb.Entry{{Term: 1, Index: 1}}, 2},
782 {[]pb.Entry{{Term: 1, Index: 1}, {Term: 2, Index: 2}}, 3},
783 }
784 for j, tt := range tests {
785 r := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
786 r.Step(pb.Message{
787 From: 2, To: 1, Type: pb.MsgApp, Term: tt.wterm - 1, LogTerm: 0, Index: 0, Entries: tt.ents,
788 })
789 r.readMessages()
790
791 for i := 1; i < r.electionTimeout*2; i++ {
792 r.tickElection()
793 }
794
795 msgs := r.readMessages()
796 sort.Sort(messageSlice(msgs))
797 if len(msgs) != 2 {
798 t.Fatalf("#%d: len(msg) = %d, want %d", j, len(msgs), 2)
799 }
800 for i, m := range msgs {
801 if m.Type != pb.MsgVote {
802 t.Errorf("#%d: msgType = %d, want %d", i, m.Type, pb.MsgVote)
803 }
804 if m.To != uint64(i+2) {
805 t.Errorf("#%d: to = %d, want %d", i, m.To, i+2)
806 }
807 if m.Term != tt.wterm {
808 t.Errorf("#%d: term = %d, want %d", i, m.Term, tt.wterm)
809 }
810 windex, wlogterm := tt.ents[len(tt.ents)-1].Index, tt.ents[len(tt.ents)-1].Term
811 if m.Index != windex {
812 t.Errorf("#%d: index = %d, want %d", i, m.Index, windex)
813 }
814 if m.LogTerm != wlogterm {
815 t.Errorf("#%d: logterm = %d, want %d", i, m.LogTerm, wlogterm)
816 }
817 }
818 }
819 }
820
821
822
823
824 func TestVoter(t *testing.T) {
825 tests := []struct {
826 ents []pb.Entry
827 logterm uint64
828 index uint64
829
830 wreject bool
831 }{
832
833 {[]pb.Entry{{Term: 1, Index: 1}}, 1, 1, false},
834 {[]pb.Entry{{Term: 1, Index: 1}}, 1, 2, false},
835 {[]pb.Entry{{Term: 1, Index: 1}, {Term: 1, Index: 2}}, 1, 1, true},
836
837 {[]pb.Entry{{Term: 1, Index: 1}}, 2, 1, false},
838 {[]pb.Entry{{Term: 1, Index: 1}}, 2, 2, false},
839 {[]pb.Entry{{Term: 1, Index: 1}, {Term: 1, Index: 2}}, 2, 1, false},
840
841 {[]pb.Entry{{Term: 2, Index: 1}}, 1, 1, true},
842 {[]pb.Entry{{Term: 2, Index: 1}}, 1, 2, true},
843 {[]pb.Entry{{Term: 2, Index: 1}, {Term: 1, Index: 2}}, 1, 1, true},
844 }
845 for i, tt := range tests {
846 storage := newTestMemoryStorage(withPeers(1, 2))
847 storage.Append(tt.ents)
848 r := newTestRaft(1, 10, 1, storage)
849
850 r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgVote, Term: 3, LogTerm: tt.logterm, Index: tt.index})
851
852 msgs := r.readMessages()
853 if len(msgs) != 1 {
854 t.Fatalf("#%d: len(msg) = %d, want %d", i, len(msgs), 1)
855 }
856 m := msgs[0]
857 if m.Type != pb.MsgVoteResp {
858 t.Errorf("#%d: msgType = %d, want %d", i, m.Type, pb.MsgVoteResp)
859 }
860 if m.Reject != tt.wreject {
861 t.Errorf("#%d: reject = %t, want %t", i, m.Reject, tt.wreject)
862 }
863 }
864 }
865
866
867
868
869 func TestLeaderOnlyCommitsLogFromCurrentTerm(t *testing.T) {
870 ents := []pb.Entry{{Term: 1, Index: 1}, {Term: 2, Index: 2}}
871 tests := []struct {
872 index uint64
873 wcommit uint64
874 }{
875
876 {1, 0},
877 {2, 0},
878
879 {3, 3},
880 }
881 for i, tt := range tests {
882 storage := newTestMemoryStorage(withPeers(1, 2))
883 storage.Append(ents)
884 r := newTestRaft(1, 10, 1, storage)
885 r.loadState(pb.HardState{Term: 2})
886
887 r.becomeCandidate()
888 r.becomeLeader()
889 r.readMessages()
890
891 r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
892
893 r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Term: r.Term, Index: tt.index})
894 if r.raftLog.committed != tt.wcommit {
895 t.Errorf("#%d: commit = %d, want %d", i, r.raftLog.committed, tt.wcommit)
896 }
897 }
898 }
899
900 type messageSlice []pb.Message
901
902 func (s messageSlice) Len() int { return len(s) }
903 func (s messageSlice) Less(i, j int) bool { return fmt.Sprint(s[i]) < fmt.Sprint(s[j]) }
904 func (s messageSlice) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
905
906 func commitNoopEntry(r *raft, s *MemoryStorage) {
907 if r.state != StateLeader {
908 panic("it should only be used when it is the leader")
909 }
910 r.bcastAppend()
911
912 msgs := r.readMessages()
913 for _, m := range msgs {
914 if m.Type != pb.MsgApp || len(m.Entries) != 1 || m.Entries[0].Data != nil {
915 panic("not a message to append noop entry")
916 }
917 r.Step(acceptAndReply(m))
918 }
919
920 r.readMessages()
921 s.Append(r.raftLog.unstableEntries())
922 r.raftLog.appliedTo(r.raftLog.committed)
923 r.raftLog.stableTo(r.raftLog.lastIndex(), r.raftLog.lastTerm())
924 }
925
926 func acceptAndReply(m pb.Message) pb.Message {
927 if m.Type != pb.MsgApp {
928 panic("type should be MsgApp")
929 }
930 return pb.Message{
931 From: m.To,
932 To: m.From,
933 Term: m.Term,
934 Type: pb.MsgAppResp,
935 Index: m.Index + uint64(len(m.Entries)),
936 }
937 }
938
View as plain text