...

Source file src/go.etcd.io/etcd/raft/v3/raft_paper_test.go

Documentation: go.etcd.io/etcd/raft/v3

     1  // Copyright 2015 The etcd Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  /*
    16  This file contains tests which verify that the scenarios described
    17  in the raft paper (https://raft.github.io/raft.pdf) are
    18  handled by the raft implementation correctly. Each test focuses on
    19  several sentences written in the paper. This could help us to prevent
    20  most implementation bugs.
    21  
Each test is composed of three parts: init, test and check.
The init part sets up the initial state in a simple and understandable way.
The test part uses the Step function to generate the scenario. The check
part verifies the outgoing messages and the resulting state.
    26  */
    27  package raft
    28  
    29  import (
    30  	"fmt"
    31  	"reflect"
    32  	"sort"
    33  	"testing"
    34  
    35  	pb "go.etcd.io/etcd/raft/v3/raftpb"
    36  )
    37  
    38  func TestFollowerUpdateTermFromMessage(t *testing.T) {
    39  	testUpdateTermFromMessage(t, StateFollower)
    40  }
    41  func TestCandidateUpdateTermFromMessage(t *testing.T) {
    42  	testUpdateTermFromMessage(t, StateCandidate)
    43  }
    44  func TestLeaderUpdateTermFromMessage(t *testing.T) {
    45  	testUpdateTermFromMessage(t, StateLeader)
    46  }
    47  
    48  // testUpdateTermFromMessage tests that if one server’s current term is
    49  // smaller than the other’s, then it updates its current term to the larger
    50  // value. If a candidate or leader discovers that its term is out of date,
    51  // it immediately reverts to follower state.
    52  // Reference: section 5.1
    53  func testUpdateTermFromMessage(t *testing.T, state StateType) {
    54  	r := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
    55  	switch state {
    56  	case StateFollower:
    57  		r.becomeFollower(1, 2)
    58  	case StateCandidate:
    59  		r.becomeCandidate()
    60  	case StateLeader:
    61  		r.becomeCandidate()
    62  		r.becomeLeader()
    63  	}
    64  
    65  	r.Step(pb.Message{Type: pb.MsgApp, Term: 2})
    66  
    67  	if r.Term != 2 {
    68  		t.Errorf("term = %d, want %d", r.Term, 2)
    69  	}
    70  	if r.state != StateFollower {
    71  		t.Errorf("state = %v, want %v", r.state, StateFollower)
    72  	}
    73  }
    74  
    75  // TestRejectStaleTermMessage tests that if a server receives a request with
    76  // a stale term number, it rejects the request.
    77  // Our implementation ignores the request instead.
    78  // Reference: section 5.1
    79  func TestRejectStaleTermMessage(t *testing.T) {
    80  	called := false
    81  	fakeStep := func(r *raft, m pb.Message) error {
    82  		called = true
    83  		return nil
    84  	}
    85  	r := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
    86  	r.step = fakeStep
    87  	r.loadState(pb.HardState{Term: 2})
    88  
    89  	r.Step(pb.Message{Type: pb.MsgApp, Term: r.Term - 1})
    90  
    91  	if called {
    92  		t.Errorf("stepFunc called = %v, want %v", called, false)
    93  	}
    94  }
    95  
    96  // TestStartAsFollower tests that when servers start up, they begin as followers.
    97  // Reference: section 5.2
    98  func TestStartAsFollower(t *testing.T) {
    99  	r := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
   100  	if r.state != StateFollower {
   101  		t.Errorf("state = %s, want %s", r.state, StateFollower)
   102  	}
   103  }
   104  
   105  // TestLeaderBcastBeat tests that if the leader receives a heartbeat tick,
   106  // it will send a MsgHeartbeat with m.Index = 0, m.LogTerm=0 and empty entries
   107  // as heartbeat to all followers.
   108  // Reference: section 5.2
   109  func TestLeaderBcastBeat(t *testing.T) {
   110  	// heartbeat interval
   111  	hi := 1
   112  	r := newTestRaft(1, 10, hi, newTestMemoryStorage(withPeers(1, 2, 3)))
   113  	r.becomeCandidate()
   114  	r.becomeLeader()
   115  	for i := 0; i < 10; i++ {
   116  		mustAppendEntry(r, pb.Entry{Index: uint64(i) + 1})
   117  	}
   118  
   119  	for i := 0; i < hi; i++ {
   120  		r.tick()
   121  	}
   122  
   123  	msgs := r.readMessages()
   124  	sort.Sort(messageSlice(msgs))
   125  	wmsgs := []pb.Message{
   126  		{From: 1, To: 2, Term: 1, Type: pb.MsgHeartbeat},
   127  		{From: 1, To: 3, Term: 1, Type: pb.MsgHeartbeat},
   128  	}
   129  	if !reflect.DeepEqual(msgs, wmsgs) {
   130  		t.Errorf("msgs = %v, want %v", msgs, wmsgs)
   131  	}
   132  }
   133  
   134  func TestFollowerStartElection(t *testing.T) {
   135  	testNonleaderStartElection(t, StateFollower)
   136  }
   137  func TestCandidateStartNewElection(t *testing.T) {
   138  	testNonleaderStartElection(t, StateCandidate)
   139  }
   140  
   141  // testNonleaderStartElection tests that if a follower receives no communication
   142  // over election timeout, it begins an election to choose a new leader. It
   143  // increments its current term and transitions to candidate state. It then
   144  // votes for itself and issues RequestVote RPCs in parallel to each of the
   145  // other servers in the cluster.
   146  // Reference: section 5.2
   147  // Also if a candidate fails to obtain a majority, it will time out and
   148  // start a new election by incrementing its term and initiating another
   149  // round of RequestVote RPCs.
   150  // Reference: section 5.2
   151  func testNonleaderStartElection(t *testing.T, state StateType) {
   152  	// election timeout
   153  	et := 10
   154  	r := newTestRaft(1, et, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
   155  	switch state {
   156  	case StateFollower:
   157  		r.becomeFollower(1, 2)
   158  	case StateCandidate:
   159  		r.becomeCandidate()
   160  	}
   161  
   162  	for i := 1; i < 2*et; i++ {
   163  		r.tick()
   164  	}
   165  
   166  	if r.Term != 2 {
   167  		t.Errorf("term = %d, want 2", r.Term)
   168  	}
   169  	if r.state != StateCandidate {
   170  		t.Errorf("state = %s, want %s", r.state, StateCandidate)
   171  	}
   172  	if !r.prs.Votes[r.id] {
   173  		t.Errorf("vote for self = false, want true")
   174  	}
   175  	msgs := r.readMessages()
   176  	sort.Sort(messageSlice(msgs))
   177  	wmsgs := []pb.Message{
   178  		{From: 1, To: 2, Term: 2, Type: pb.MsgVote},
   179  		{From: 1, To: 3, Term: 2, Type: pb.MsgVote},
   180  	}
   181  	if !reflect.DeepEqual(msgs, wmsgs) {
   182  		t.Errorf("msgs = %v, want %v", msgs, wmsgs)
   183  	}
   184  }
   185  
   186  // TestLeaderElectionInOneRoundRPC tests all cases that may happen in
   187  // leader election during one round of RequestVote RPC:
   188  // a) it wins the election
   189  // b) it loses the election
   190  // c) it is unclear about the result
   191  // Reference: section 5.2
   192  func TestLeaderElectionInOneRoundRPC(t *testing.T) {
   193  	tests := []struct {
   194  		size  int
   195  		votes map[uint64]bool
   196  		state StateType
   197  	}{
   198  		// win the election when receiving votes from a majority of the servers
   199  		{1, map[uint64]bool{}, StateLeader},
   200  		{3, map[uint64]bool{2: true, 3: true}, StateLeader},
   201  		{3, map[uint64]bool{2: true}, StateLeader},
   202  		{5, map[uint64]bool{2: true, 3: true, 4: true, 5: true}, StateLeader},
   203  		{5, map[uint64]bool{2: true, 3: true, 4: true}, StateLeader},
   204  		{5, map[uint64]bool{2: true, 3: true}, StateLeader},
   205  
   206  		// return to follower state if it receives vote denial from a majority
   207  		{3, map[uint64]bool{2: false, 3: false}, StateFollower},
   208  		{5, map[uint64]bool{2: false, 3: false, 4: false, 5: false}, StateFollower},
   209  		{5, map[uint64]bool{2: true, 3: false, 4: false, 5: false}, StateFollower},
   210  
   211  		// stay in candidate if it does not obtain the majority
   212  		{3, map[uint64]bool{}, StateCandidate},
   213  		{5, map[uint64]bool{2: true}, StateCandidate},
   214  		{5, map[uint64]bool{2: false, 3: false}, StateCandidate},
   215  		{5, map[uint64]bool{}, StateCandidate},
   216  	}
   217  	for i, tt := range tests {
   218  		r := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(idsBySize(tt.size)...)))
   219  
   220  		r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
   221  		for id, vote := range tt.votes {
   222  			r.Step(pb.Message{From: id, To: 1, Term: r.Term, Type: pb.MsgVoteResp, Reject: !vote})
   223  		}
   224  
   225  		if r.state != tt.state {
   226  			t.Errorf("#%d: state = %s, want %s", i, r.state, tt.state)
   227  		}
   228  		if g := r.Term; g != 1 {
   229  			t.Errorf("#%d: term = %d, want %d", i, g, 1)
   230  		}
   231  	}
   232  }
   233  
   234  // TestFollowerVote tests that each follower will vote for at most one
   235  // candidate in a given term, on a first-come-first-served basis.
   236  // Reference: section 5.2
   237  func TestFollowerVote(t *testing.T) {
   238  	tests := []struct {
   239  		vote    uint64
   240  		nvote   uint64
   241  		wreject bool
   242  	}{
   243  		{None, 1, false},
   244  		{None, 2, false},
   245  		{1, 1, false},
   246  		{2, 2, false},
   247  		{1, 2, true},
   248  		{2, 1, true},
   249  	}
   250  	for i, tt := range tests {
   251  		r := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
   252  		r.loadState(pb.HardState{Term: 1, Vote: tt.vote})
   253  
   254  		r.Step(pb.Message{From: tt.nvote, To: 1, Term: 1, Type: pb.MsgVote})
   255  
   256  		msgs := r.readMessages()
   257  		wmsgs := []pb.Message{
   258  			{From: 1, To: tt.nvote, Term: 1, Type: pb.MsgVoteResp, Reject: tt.wreject},
   259  		}
   260  		if !reflect.DeepEqual(msgs, wmsgs) {
   261  			t.Errorf("#%d: msgs = %v, want %v", i, msgs, wmsgs)
   262  		}
   263  	}
   264  }
   265  
   266  // TestCandidateFallback tests that while waiting for votes,
   267  // if a candidate receives an AppendEntries RPC from another server claiming
   268  // to be leader whose term is at least as large as the candidate's current term,
   269  // it recognizes the leader as legitimate and returns to follower state.
   270  // Reference: section 5.2
   271  func TestCandidateFallback(t *testing.T) {
   272  	tests := []pb.Message{
   273  		{From: 2, To: 1, Term: 1, Type: pb.MsgApp},
   274  		{From: 2, To: 1, Term: 2, Type: pb.MsgApp},
   275  	}
   276  	for i, tt := range tests {
   277  		r := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
   278  		r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
   279  		if r.state != StateCandidate {
   280  			t.Fatalf("unexpected state = %s, want %s", r.state, StateCandidate)
   281  		}
   282  
   283  		r.Step(tt)
   284  
   285  		if g := r.state; g != StateFollower {
   286  			t.Errorf("#%d: state = %s, want %s", i, g, StateFollower)
   287  		}
   288  		if g := r.Term; g != tt.Term {
   289  			t.Errorf("#%d: term = %d, want %d", i, g, tt.Term)
   290  		}
   291  	}
   292  }
   293  
   294  func TestFollowerElectionTimeoutRandomized(t *testing.T) {
   295  	SetLogger(discardLogger)
   296  	defer SetLogger(defaultLogger)
   297  	testNonleaderElectionTimeoutRandomized(t, StateFollower)
   298  }
   299  func TestCandidateElectionTimeoutRandomized(t *testing.T) {
   300  	SetLogger(discardLogger)
   301  	defer SetLogger(defaultLogger)
   302  	testNonleaderElectionTimeoutRandomized(t, StateCandidate)
   303  }
   304  
   305  // testNonleaderElectionTimeoutRandomized tests that election timeout for
   306  // follower or candidate is randomized.
   307  // Reference: section 5.2
   308  func testNonleaderElectionTimeoutRandomized(t *testing.T, state StateType) {
   309  	et := 10
   310  	r := newTestRaft(1, et, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
   311  	timeouts := make(map[int]bool)
   312  	for round := 0; round < 50*et; round++ {
   313  		switch state {
   314  		case StateFollower:
   315  			r.becomeFollower(r.Term+1, 2)
   316  		case StateCandidate:
   317  			r.becomeCandidate()
   318  		}
   319  
   320  		time := 0
   321  		for len(r.readMessages()) == 0 {
   322  			r.tick()
   323  			time++
   324  		}
   325  		timeouts[time] = true
   326  	}
   327  
   328  	for d := et + 1; d < 2*et; d++ {
   329  		if !timeouts[d] {
   330  			t.Errorf("timeout in %d ticks should happen", d)
   331  		}
   332  	}
   333  }
   334  
   335  func TestFollowersElectionTimeoutNonconflict(t *testing.T) {
   336  	SetLogger(discardLogger)
   337  	defer SetLogger(defaultLogger)
   338  	testNonleadersElectionTimeoutNonconflict(t, StateFollower)
   339  }
   340  func TestCandidatesElectionTimeoutNonconflict(t *testing.T) {
   341  	SetLogger(discardLogger)
   342  	defer SetLogger(defaultLogger)
   343  	testNonleadersElectionTimeoutNonconflict(t, StateCandidate)
   344  }
   345  
   346  // testNonleadersElectionTimeoutNonconflict tests that in most cases only a
   347  // single server(follower or candidate) will time out, which reduces the
   348  // likelihood of split vote in the new election.
   349  // Reference: section 5.2
   350  func testNonleadersElectionTimeoutNonconflict(t *testing.T, state StateType) {
   351  	et := 10
   352  	size := 5
   353  	rs := make([]*raft, size)
   354  	ids := idsBySize(size)
   355  	for k := range rs {
   356  		rs[k] = newTestRaft(ids[k], et, 1, newTestMemoryStorage(withPeers(ids...)))
   357  	}
   358  	conflicts := 0
   359  	for round := 0; round < 1000; round++ {
   360  		for _, r := range rs {
   361  			switch state {
   362  			case StateFollower:
   363  				r.becomeFollower(r.Term+1, None)
   364  			case StateCandidate:
   365  				r.becomeCandidate()
   366  			}
   367  		}
   368  
   369  		timeoutNum := 0
   370  		for timeoutNum == 0 {
   371  			for _, r := range rs {
   372  				r.tick()
   373  				if len(r.readMessages()) > 0 {
   374  					timeoutNum++
   375  				}
   376  			}
   377  		}
   378  		// several rafts time out at the same tick
   379  		if timeoutNum > 1 {
   380  			conflicts++
   381  		}
   382  	}
   383  
   384  	if g := float64(conflicts) / 1000; g > 0.3 {
   385  		t.Errorf("probability of conflicts = %v, want <= 0.3", g)
   386  	}
   387  }
   388  
   389  // TestLeaderStartReplication tests that when receiving client proposals,
   390  // the leader appends the proposal to its log as a new entry, then issues
   391  // AppendEntries RPCs in parallel to each of the other servers to replicate
   392  // the entry. Also, when sending an AppendEntries RPC, the leader includes
   393  // the index and term of the entry in its log that immediately precedes
   394  // the new entries.
   395  // Also, it writes the new entry into stable storage.
   396  // Reference: section 5.3
   397  func TestLeaderStartReplication(t *testing.T) {
   398  	s := newTestMemoryStorage(withPeers(1, 2, 3))
   399  	r := newTestRaft(1, 10, 1, s)
   400  	r.becomeCandidate()
   401  	r.becomeLeader()
   402  	commitNoopEntry(r, s)
   403  	li := r.raftLog.lastIndex()
   404  
   405  	ents := []pb.Entry{{Data: []byte("some data")}}
   406  	r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: ents})
   407  
   408  	if g := r.raftLog.lastIndex(); g != li+1 {
   409  		t.Errorf("lastIndex = %d, want %d", g, li+1)
   410  	}
   411  	if g := r.raftLog.committed; g != li {
   412  		t.Errorf("committed = %d, want %d", g, li)
   413  	}
   414  	msgs := r.readMessages()
   415  	sort.Sort(messageSlice(msgs))
   416  	wents := []pb.Entry{{Index: li + 1, Term: 1, Data: []byte("some data")}}
   417  	wmsgs := []pb.Message{
   418  		{From: 1, To: 2, Term: 1, Type: pb.MsgApp, Index: li, LogTerm: 1, Entries: wents, Commit: li},
   419  		{From: 1, To: 3, Term: 1, Type: pb.MsgApp, Index: li, LogTerm: 1, Entries: wents, Commit: li},
   420  	}
   421  	if !reflect.DeepEqual(msgs, wmsgs) {
   422  		t.Errorf("msgs = %+v, want %+v", msgs, wmsgs)
   423  	}
   424  	if g := r.raftLog.unstableEntries(); !reflect.DeepEqual(g, wents) {
   425  		t.Errorf("ents = %+v, want %+v", g, wents)
   426  	}
   427  }
   428  
   429  // TestLeaderCommitEntry tests that when the entry has been safely replicated,
   430  // the leader gives out the applied entries, which can be applied to its state
   431  // machine.
   432  // Also, the leader keeps track of the highest index it knows to be committed,
   433  // and it includes that index in future AppendEntries RPCs so that the other
   434  // servers eventually find out.
   435  // Reference: section 5.3
   436  func TestLeaderCommitEntry(t *testing.T) {
   437  	s := newTestMemoryStorage(withPeers(1, 2, 3))
   438  	r := newTestRaft(1, 10, 1, s)
   439  	r.becomeCandidate()
   440  	r.becomeLeader()
   441  	commitNoopEntry(r, s)
   442  	li := r.raftLog.lastIndex()
   443  	r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
   444  
   445  	for _, m := range r.readMessages() {
   446  		r.Step(acceptAndReply(m))
   447  	}
   448  
   449  	if g := r.raftLog.committed; g != li+1 {
   450  		t.Errorf("committed = %d, want %d", g, li+1)
   451  	}
   452  	wents := []pb.Entry{{Index: li + 1, Term: 1, Data: []byte("some data")}}
   453  	if g := r.raftLog.nextEnts(); !reflect.DeepEqual(g, wents) {
   454  		t.Errorf("nextEnts = %+v, want %+v", g, wents)
   455  	}
   456  	msgs := r.readMessages()
   457  	sort.Sort(messageSlice(msgs))
   458  	for i, m := range msgs {
   459  		if w := uint64(i + 2); m.To != w {
   460  			t.Errorf("to = %x, want %x", m.To, w)
   461  		}
   462  		if m.Type != pb.MsgApp {
   463  			t.Errorf("type = %v, want %v", m.Type, pb.MsgApp)
   464  		}
   465  		if m.Commit != li+1 {
   466  			t.Errorf("commit = %d, want %d", m.Commit, li+1)
   467  		}
   468  	}
   469  }
   470  
   471  // TestLeaderAcknowledgeCommit tests that a log entry is committed once the
   472  // leader that created the entry has replicated it on a majority of the servers.
   473  // Reference: section 5.3
   474  func TestLeaderAcknowledgeCommit(t *testing.T) {
   475  	tests := []struct {
   476  		size      int
   477  		acceptors map[uint64]bool
   478  		wack      bool
   479  	}{
   480  		{1, nil, true},
   481  		{3, nil, false},
   482  		{3, map[uint64]bool{2: true}, true},
   483  		{3, map[uint64]bool{2: true, 3: true}, true},
   484  		{5, nil, false},
   485  		{5, map[uint64]bool{2: true}, false},
   486  		{5, map[uint64]bool{2: true, 3: true}, true},
   487  		{5, map[uint64]bool{2: true, 3: true, 4: true}, true},
   488  		{5, map[uint64]bool{2: true, 3: true, 4: true, 5: true}, true},
   489  	}
   490  	for i, tt := range tests {
   491  		s := newTestMemoryStorage(withPeers(idsBySize(tt.size)...))
   492  		r := newTestRaft(1, 10, 1, s)
   493  		r.becomeCandidate()
   494  		r.becomeLeader()
   495  		commitNoopEntry(r, s)
   496  		li := r.raftLog.lastIndex()
   497  		r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
   498  
   499  		for _, m := range r.readMessages() {
   500  			if tt.acceptors[m.To] {
   501  				r.Step(acceptAndReply(m))
   502  			}
   503  		}
   504  
   505  		if g := r.raftLog.committed > li; g != tt.wack {
   506  			t.Errorf("#%d: ack commit = %v, want %v", i, g, tt.wack)
   507  		}
   508  	}
   509  }
   510  
   511  // TestLeaderCommitPrecedingEntries tests that when leader commits a log entry,
   512  // it also commits all preceding entries in the leader’s log, including
   513  // entries created by previous leaders.
   514  // Also, it applies the entry to its local state machine (in log order).
   515  // Reference: section 5.3
   516  func TestLeaderCommitPrecedingEntries(t *testing.T) {
   517  	tests := [][]pb.Entry{
   518  		{},
   519  		{{Term: 2, Index: 1}},
   520  		{{Term: 1, Index: 1}, {Term: 2, Index: 2}},
   521  		{{Term: 1, Index: 1}},
   522  	}
   523  	for i, tt := range tests {
   524  		storage := newTestMemoryStorage(withPeers(1, 2, 3))
   525  		storage.Append(tt)
   526  		r := newTestRaft(1, 10, 1, storage)
   527  		r.loadState(pb.HardState{Term: 2})
   528  		r.becomeCandidate()
   529  		r.becomeLeader()
   530  		r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
   531  
   532  		for _, m := range r.readMessages() {
   533  			r.Step(acceptAndReply(m))
   534  		}
   535  
   536  		li := uint64(len(tt))
   537  		wents := append(tt, pb.Entry{Term: 3, Index: li + 1}, pb.Entry{Term: 3, Index: li + 2, Data: []byte("some data")})
   538  		if g := r.raftLog.nextEnts(); !reflect.DeepEqual(g, wents) {
   539  			t.Errorf("#%d: ents = %+v, want %+v", i, g, wents)
   540  		}
   541  	}
   542  }
   543  
   544  // TestFollowerCommitEntry tests that once a follower learns that a log entry
   545  // is committed, it applies the entry to its local state machine (in log order).
   546  // Reference: section 5.3
   547  func TestFollowerCommitEntry(t *testing.T) {
   548  	tests := []struct {
   549  		ents   []pb.Entry
   550  		commit uint64
   551  	}{
   552  		{
   553  			[]pb.Entry{
   554  				{Term: 1, Index: 1, Data: []byte("some data")},
   555  			},
   556  			1,
   557  		},
   558  		{
   559  			[]pb.Entry{
   560  				{Term: 1, Index: 1, Data: []byte("some data")},
   561  				{Term: 1, Index: 2, Data: []byte("some data2")},
   562  			},
   563  			2,
   564  		},
   565  		{
   566  			[]pb.Entry{
   567  				{Term: 1, Index: 1, Data: []byte("some data2")},
   568  				{Term: 1, Index: 2, Data: []byte("some data")},
   569  			},
   570  			2,
   571  		},
   572  		{
   573  			[]pb.Entry{
   574  				{Term: 1, Index: 1, Data: []byte("some data")},
   575  				{Term: 1, Index: 2, Data: []byte("some data2")},
   576  			},
   577  			1,
   578  		},
   579  	}
   580  	for i, tt := range tests {
   581  		r := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
   582  		r.becomeFollower(1, 2)
   583  
   584  		r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgApp, Term: 1, Entries: tt.ents, Commit: tt.commit})
   585  
   586  		if g := r.raftLog.committed; g != tt.commit {
   587  			t.Errorf("#%d: committed = %d, want %d", i, g, tt.commit)
   588  		}
   589  		wents := tt.ents[:int(tt.commit)]
   590  		if g := r.raftLog.nextEnts(); !reflect.DeepEqual(g, wents) {
   591  			t.Errorf("#%d: nextEnts = %v, want %v", i, g, wents)
   592  		}
   593  	}
   594  }
   595  
   596  // TestFollowerCheckMsgApp tests that if the follower does not find an
   597  // entry in its log with the same index and term as the one in AppendEntries RPC,
   598  // then it refuses the new entries. Otherwise it replies that it accepts the
   599  // append entries.
   600  // Reference: section 5.3
   601  func TestFollowerCheckMsgApp(t *testing.T) {
   602  	ents := []pb.Entry{{Term: 1, Index: 1}, {Term: 2, Index: 2}}
   603  	tests := []struct {
   604  		term        uint64
   605  		index       uint64
   606  		windex      uint64
   607  		wreject     bool
   608  		wrejectHint uint64
   609  		wlogterm    uint64
   610  	}{
   611  		// match with committed entries
   612  		{0, 0, 1, false, 0, 0},
   613  		{ents[0].Term, ents[0].Index, 1, false, 0, 0},
   614  		// match with uncommitted entries
   615  		{ents[1].Term, ents[1].Index, 2, false, 0, 0},
   616  
   617  		// unmatch with existing entry
   618  		{ents[0].Term, ents[1].Index, ents[1].Index, true, 1, 1},
   619  		// unexisting entry
   620  		{ents[1].Term + 1, ents[1].Index + 1, ents[1].Index + 1, true, 2, 2},
   621  	}
   622  	for i, tt := range tests {
   623  		storage := newTestMemoryStorage(withPeers(1, 2, 3))
   624  		storage.Append(ents)
   625  		r := newTestRaft(1, 10, 1, storage)
   626  		r.loadState(pb.HardState{Commit: 1})
   627  		r.becomeFollower(2, 2)
   628  
   629  		r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgApp, Term: 2, LogTerm: tt.term, Index: tt.index})
   630  
   631  		msgs := r.readMessages()
   632  		wmsgs := []pb.Message{
   633  			{From: 1, To: 2, Type: pb.MsgAppResp, Term: 2, Index: tt.windex, Reject: tt.wreject, RejectHint: tt.wrejectHint, LogTerm: tt.wlogterm},
   634  		}
   635  		if !reflect.DeepEqual(msgs, wmsgs) {
   636  			t.Errorf("#%d: msgs = %+v, want %+v", i, msgs, wmsgs)
   637  		}
   638  	}
   639  }
   640  
   641  // TestFollowerAppendEntries tests that when AppendEntries RPC is valid,
   642  // the follower will delete the existing conflict entry and all that follow it,
   643  // and append any new entries not already in the log.
   644  // Also, it writes the new entry into stable storage.
   645  // Reference: section 5.3
   646  func TestFollowerAppendEntries(t *testing.T) {
   647  	tests := []struct {
   648  		index, term uint64
   649  		ents        []pb.Entry
   650  		wents       []pb.Entry
   651  		wunstable   []pb.Entry
   652  	}{
   653  		{
   654  			2, 2,
   655  			[]pb.Entry{{Term: 3, Index: 3}},
   656  			[]pb.Entry{{Term: 1, Index: 1}, {Term: 2, Index: 2}, {Term: 3, Index: 3}},
   657  			[]pb.Entry{{Term: 3, Index: 3}},
   658  		},
   659  		{
   660  			1, 1,
   661  			[]pb.Entry{{Term: 3, Index: 2}, {Term: 4, Index: 3}},
   662  			[]pb.Entry{{Term: 1, Index: 1}, {Term: 3, Index: 2}, {Term: 4, Index: 3}},
   663  			[]pb.Entry{{Term: 3, Index: 2}, {Term: 4, Index: 3}},
   664  		},
   665  		{
   666  			0, 0,
   667  			[]pb.Entry{{Term: 1, Index: 1}},
   668  			[]pb.Entry{{Term: 1, Index: 1}, {Term: 2, Index: 2}},
   669  			nil,
   670  		},
   671  		{
   672  			0, 0,
   673  			[]pb.Entry{{Term: 3, Index: 1}},
   674  			[]pb.Entry{{Term: 3, Index: 1}},
   675  			[]pb.Entry{{Term: 3, Index: 1}},
   676  		},
   677  	}
   678  	for i, tt := range tests {
   679  		storage := newTestMemoryStorage(withPeers(1, 2, 3))
   680  		storage.Append([]pb.Entry{{Term: 1, Index: 1}, {Term: 2, Index: 2}})
   681  		r := newTestRaft(1, 10, 1, storage)
   682  		r.becomeFollower(2, 2)
   683  
   684  		r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgApp, Term: 2, LogTerm: tt.term, Index: tt.index, Entries: tt.ents})
   685  
   686  		if g := r.raftLog.allEntries(); !reflect.DeepEqual(g, tt.wents) {
   687  			t.Errorf("#%d: ents = %+v, want %+v", i, g, tt.wents)
   688  		}
   689  		if g := r.raftLog.unstableEntries(); !reflect.DeepEqual(g, tt.wunstable) {
   690  			t.Errorf("#%d: unstableEnts = %+v, want %+v", i, g, tt.wunstable)
   691  		}
   692  	}
   693  }
   694  
   695  // TestLeaderSyncFollowerLog tests that the leader could bring a follower's log
   696  // into consistency with its own.
   697  // Reference: section 5.3, figure 7
   698  func TestLeaderSyncFollowerLog(t *testing.T) {
   699  	ents := []pb.Entry{
   700  		{},
   701  		{Term: 1, Index: 1}, {Term: 1, Index: 2}, {Term: 1, Index: 3},
   702  		{Term: 4, Index: 4}, {Term: 4, Index: 5},
   703  		{Term: 5, Index: 6}, {Term: 5, Index: 7},
   704  		{Term: 6, Index: 8}, {Term: 6, Index: 9}, {Term: 6, Index: 10},
   705  	}
   706  	term := uint64(8)
   707  	tests := [][]pb.Entry{
   708  		{
   709  			{},
   710  			{Term: 1, Index: 1}, {Term: 1, Index: 2}, {Term: 1, Index: 3},
   711  			{Term: 4, Index: 4}, {Term: 4, Index: 5},
   712  			{Term: 5, Index: 6}, {Term: 5, Index: 7},
   713  			{Term: 6, Index: 8}, {Term: 6, Index: 9},
   714  		},
   715  		{
   716  			{},
   717  			{Term: 1, Index: 1}, {Term: 1, Index: 2}, {Term: 1, Index: 3},
   718  			{Term: 4, Index: 4},
   719  		},
   720  		{
   721  			{},
   722  			{Term: 1, Index: 1}, {Term: 1, Index: 2}, {Term: 1, Index: 3},
   723  			{Term: 4, Index: 4}, {Term: 4, Index: 5},
   724  			{Term: 5, Index: 6}, {Term: 5, Index: 7},
   725  			{Term: 6, Index: 8}, {Term: 6, Index: 9}, {Term: 6, Index: 10}, {Term: 6, Index: 11},
   726  		},
   727  		{
   728  			{},
   729  			{Term: 1, Index: 1}, {Term: 1, Index: 2}, {Term: 1, Index: 3},
   730  			{Term: 4, Index: 4}, {Term: 4, Index: 5},
   731  			{Term: 5, Index: 6}, {Term: 5, Index: 7},
   732  			{Term: 6, Index: 8}, {Term: 6, Index: 9}, {Term: 6, Index: 10},
   733  			{Term: 7, Index: 11}, {Term: 7, Index: 12},
   734  		},
   735  		{
   736  			{},
   737  			{Term: 1, Index: 1}, {Term: 1, Index: 2}, {Term: 1, Index: 3},
   738  			{Term: 4, Index: 4}, {Term: 4, Index: 5}, {Term: 4, Index: 6}, {Term: 4, Index: 7},
   739  		},
   740  		{
   741  			{},
   742  			{Term: 1, Index: 1}, {Term: 1, Index: 2}, {Term: 1, Index: 3},
   743  			{Term: 2, Index: 4}, {Term: 2, Index: 5}, {Term: 2, Index: 6},
   744  			{Term: 3, Index: 7}, {Term: 3, Index: 8}, {Term: 3, Index: 9}, {Term: 3, Index: 10}, {Term: 3, Index: 11},
   745  		},
   746  	}
   747  	for i, tt := range tests {
   748  		leadStorage := newTestMemoryStorage(withPeers(1, 2, 3))
   749  		leadStorage.Append(ents)
   750  		lead := newTestRaft(1, 10, 1, leadStorage)
   751  		lead.loadState(pb.HardState{Commit: lead.raftLog.lastIndex(), Term: term})
   752  		followerStorage := newTestMemoryStorage(withPeers(1, 2, 3))
   753  		followerStorage.Append(tt)
   754  		follower := newTestRaft(2, 10, 1, followerStorage)
   755  		follower.loadState(pb.HardState{Term: term - 1})
   756  		// It is necessary to have a three-node cluster.
   757  		// The second may have more up-to-date log than the first one, so the
   758  		// first node needs the vote from the third node to become the leader.
   759  		n := newNetwork(lead, follower, nopStepper)
   760  		n.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
   761  		// The election occurs in the term after the one we loaded with
   762  		// lead.loadState above.
   763  		n.send(pb.Message{From: 3, To: 1, Type: pb.MsgVoteResp, Term: term + 1})
   764  
   765  		n.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
   766  
   767  		if g := diffu(ltoa(lead.raftLog), ltoa(follower.raftLog)); g != "" {
   768  			t.Errorf("#%d: log diff:\n%s", i, g)
   769  		}
   770  	}
   771  }
   772  
   773  // TestVoteRequest tests that the vote request includes information about the candidate’s log
   774  // and are sent to all of the other nodes.
   775  // Reference: section 5.4.1
   776  func TestVoteRequest(t *testing.T) {
   777  	tests := []struct {
   778  		ents  []pb.Entry
   779  		wterm uint64
   780  	}{
   781  		{[]pb.Entry{{Term: 1, Index: 1}}, 2},
   782  		{[]pb.Entry{{Term: 1, Index: 1}, {Term: 2, Index: 2}}, 3},
   783  	}
   784  	for j, tt := range tests {
   785  		r := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
   786  		r.Step(pb.Message{
   787  			From: 2, To: 1, Type: pb.MsgApp, Term: tt.wterm - 1, LogTerm: 0, Index: 0, Entries: tt.ents,
   788  		})
   789  		r.readMessages()
   790  
   791  		for i := 1; i < r.electionTimeout*2; i++ {
   792  			r.tickElection()
   793  		}
   794  
   795  		msgs := r.readMessages()
   796  		sort.Sort(messageSlice(msgs))
   797  		if len(msgs) != 2 {
   798  			t.Fatalf("#%d: len(msg) = %d, want %d", j, len(msgs), 2)
   799  		}
   800  		for i, m := range msgs {
   801  			if m.Type != pb.MsgVote {
   802  				t.Errorf("#%d: msgType = %d, want %d", i, m.Type, pb.MsgVote)
   803  			}
   804  			if m.To != uint64(i+2) {
   805  				t.Errorf("#%d: to = %d, want %d", i, m.To, i+2)
   806  			}
   807  			if m.Term != tt.wterm {
   808  				t.Errorf("#%d: term = %d, want %d", i, m.Term, tt.wterm)
   809  			}
   810  			windex, wlogterm := tt.ents[len(tt.ents)-1].Index, tt.ents[len(tt.ents)-1].Term
   811  			if m.Index != windex {
   812  				t.Errorf("#%d: index = %d, want %d", i, m.Index, windex)
   813  			}
   814  			if m.LogTerm != wlogterm {
   815  				t.Errorf("#%d: logterm = %d, want %d", i, m.LogTerm, wlogterm)
   816  			}
   817  		}
   818  	}
   819  }
   820  
   821  // TestVoter tests the voter denies its vote if its own log is more up-to-date
   822  // than that of the candidate.
   823  // Reference: section 5.4.1
   824  func TestVoter(t *testing.T) {
   825  	tests := []struct {
   826  		ents    []pb.Entry
   827  		logterm uint64
   828  		index   uint64
   829  
   830  		wreject bool
   831  	}{
   832  		// same logterm
   833  		{[]pb.Entry{{Term: 1, Index: 1}}, 1, 1, false},
   834  		{[]pb.Entry{{Term: 1, Index: 1}}, 1, 2, false},
   835  		{[]pb.Entry{{Term: 1, Index: 1}, {Term: 1, Index: 2}}, 1, 1, true},
   836  		// candidate higher logterm
   837  		{[]pb.Entry{{Term: 1, Index: 1}}, 2, 1, false},
   838  		{[]pb.Entry{{Term: 1, Index: 1}}, 2, 2, false},
   839  		{[]pb.Entry{{Term: 1, Index: 1}, {Term: 1, Index: 2}}, 2, 1, false},
   840  		// voter higher logterm
   841  		{[]pb.Entry{{Term: 2, Index: 1}}, 1, 1, true},
   842  		{[]pb.Entry{{Term: 2, Index: 1}}, 1, 2, true},
   843  		{[]pb.Entry{{Term: 2, Index: 1}, {Term: 1, Index: 2}}, 1, 1, true},
   844  	}
   845  	for i, tt := range tests {
   846  		storage := newTestMemoryStorage(withPeers(1, 2))
   847  		storage.Append(tt.ents)
   848  		r := newTestRaft(1, 10, 1, storage)
   849  
   850  		r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgVote, Term: 3, LogTerm: tt.logterm, Index: tt.index})
   851  
   852  		msgs := r.readMessages()
   853  		if len(msgs) != 1 {
   854  			t.Fatalf("#%d: len(msg) = %d, want %d", i, len(msgs), 1)
   855  		}
   856  		m := msgs[0]
   857  		if m.Type != pb.MsgVoteResp {
   858  			t.Errorf("#%d: msgType = %d, want %d", i, m.Type, pb.MsgVoteResp)
   859  		}
   860  		if m.Reject != tt.wreject {
   861  			t.Errorf("#%d: reject = %t, want %t", i, m.Reject, tt.wreject)
   862  		}
   863  	}
   864  }
   865  
   866  // TestLeaderOnlyCommitsLogFromCurrentTerm tests that only log entries from the leader’s
   867  // current term are committed by counting replicas.
   868  // Reference: section 5.4.2
   869  func TestLeaderOnlyCommitsLogFromCurrentTerm(t *testing.T) {
   870  	ents := []pb.Entry{{Term: 1, Index: 1}, {Term: 2, Index: 2}}
   871  	tests := []struct {
   872  		index   uint64
   873  		wcommit uint64
   874  	}{
   875  		// do not commit log entries in previous terms
   876  		{1, 0},
   877  		{2, 0},
   878  		// commit log in current term
   879  		{3, 3},
   880  	}
   881  	for i, tt := range tests {
   882  		storage := newTestMemoryStorage(withPeers(1, 2))
   883  		storage.Append(ents)
   884  		r := newTestRaft(1, 10, 1, storage)
   885  		r.loadState(pb.HardState{Term: 2})
   886  		// become leader at term 3
   887  		r.becomeCandidate()
   888  		r.becomeLeader()
   889  		r.readMessages()
   890  		// propose a entry to current term
   891  		r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
   892  
   893  		r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Term: r.Term, Index: tt.index})
   894  		if r.raftLog.committed != tt.wcommit {
   895  			t.Errorf("#%d: commit = %d, want %d", i, r.raftLog.committed, tt.wcommit)
   896  		}
   897  	}
   898  }
   899  
   900  type messageSlice []pb.Message
   901  
   902  func (s messageSlice) Len() int           { return len(s) }
   903  func (s messageSlice) Less(i, j int) bool { return fmt.Sprint(s[i]) < fmt.Sprint(s[j]) }
   904  func (s messageSlice) Swap(i, j int)      { s[i], s[j] = s[j], s[i] }
   905  
   906  func commitNoopEntry(r *raft, s *MemoryStorage) {
   907  	if r.state != StateLeader {
   908  		panic("it should only be used when it is the leader")
   909  	}
   910  	r.bcastAppend()
   911  	// simulate the response of MsgApp
   912  	msgs := r.readMessages()
   913  	for _, m := range msgs {
   914  		if m.Type != pb.MsgApp || len(m.Entries) != 1 || m.Entries[0].Data != nil {
   915  			panic("not a message to append noop entry")
   916  		}
   917  		r.Step(acceptAndReply(m))
   918  	}
   919  	// ignore further messages to refresh followers' commit index
   920  	r.readMessages()
   921  	s.Append(r.raftLog.unstableEntries())
   922  	r.raftLog.appliedTo(r.raftLog.committed)
   923  	r.raftLog.stableTo(r.raftLog.lastIndex(), r.raftLog.lastTerm())
   924  }
   925  
   926  func acceptAndReply(m pb.Message) pb.Message {
   927  	if m.Type != pb.MsgApp {
   928  		panic("type should be MsgApp")
   929  	}
   930  	return pb.Message{
   931  		From:  m.To,
   932  		To:    m.From,
   933  		Term:  m.Term,
   934  		Type:  pb.MsgAppResp,
   935  		Index: m.Index + uint64(len(m.Entries)),
   936  	}
   937  }
   938  

View as plain text