...

Source file src/go.etcd.io/etcd/raft/v3/raft_test.go

Documentation: go.etcd.io/etcd/raft/v3

     1  // Copyright 2015 The etcd Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package raft
    16  
    17  import (
    18  	"bytes"
    19  	"fmt"
    20  	"math"
    21  	"math/rand"
    22  	"reflect"
    23  	"strings"
    24  	"testing"
    25  
    26  	pb "go.etcd.io/etcd/raft/v3/raftpb"
    27  	"go.etcd.io/etcd/raft/v3/tracker"
    28  )
    29  
    30  // nextEnts returns the appliable entries and updates the applied index
    31  func nextEnts(r *raft, s *MemoryStorage) (ents []pb.Entry) {
    32  	// Transfer all unstable entries to "stable" storage.
    33  	s.Append(r.raftLog.unstableEntries())
    34  	r.raftLog.stableTo(r.raftLog.lastIndex(), r.raftLog.lastTerm())
    35  
    36  	ents = r.raftLog.nextEnts()
    37  	r.raftLog.appliedTo(r.raftLog.committed)
    38  	return ents
    39  }
    40  
    41  func mustAppendEntry(r *raft, ents ...pb.Entry) {
    42  	if !r.appendEntry(ents...) {
    43  		panic("entry unexpectedly dropped")
    44  	}
    45  }
    46  
    47  type stateMachine interface {
    48  	Step(m pb.Message) error
    49  	readMessages() []pb.Message
    50  }
    51  
    52  func (r *raft) readMessages() []pb.Message {
    53  	msgs := r.msgs
    54  	r.msgs = make([]pb.Message, 0)
    55  
    56  	return msgs
    57  }
    58  
    59  func TestProgressLeader(t *testing.T) {
    60  	r := newTestRaft(1, 5, 1, newTestMemoryStorage(withPeers(1, 2)))
    61  	r.becomeCandidate()
    62  	r.becomeLeader()
    63  	r.prs.Progress[2].BecomeReplicate()
    64  
    65  	// Send proposals to r1. The first 5 entries should be appended to the log.
    66  	propMsg := pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("foo")}}}
    67  	for i := 0; i < 5; i++ {
    68  		if pr := r.prs.Progress[r.id]; pr.State != tracker.StateReplicate || pr.Match != uint64(i+1) || pr.Next != pr.Match+1 {
    69  			t.Errorf("unexpected progress %v", pr)
    70  		}
    71  		if err := r.Step(propMsg); err != nil {
    72  			t.Fatalf("proposal resulted in error: %v", err)
    73  		}
    74  	}
    75  }
    76  
    77  // TestProgressResumeByHeartbeatResp ensures raft.heartbeat reset progress.paused by heartbeat response.
    78  func TestProgressResumeByHeartbeatResp(t *testing.T) {
    79  	r := newTestRaft(1, 5, 1, newTestMemoryStorage(withPeers(1, 2)))
    80  	r.becomeCandidate()
    81  	r.becomeLeader()
    82  
    83  	r.prs.Progress[2].ProbeSent = true
    84  
    85  	r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat})
    86  	if !r.prs.Progress[2].ProbeSent {
    87  		t.Errorf("paused = %v, want true", r.prs.Progress[2].ProbeSent)
    88  	}
    89  
    90  	r.prs.Progress[2].BecomeReplicate()
    91  	r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeatResp})
    92  	if r.prs.Progress[2].ProbeSent {
    93  		t.Errorf("paused = %v, want false", r.prs.Progress[2].ProbeSent)
    94  	}
    95  }
    96  
    97  func TestProgressPaused(t *testing.T) {
    98  	r := newTestRaft(1, 5, 1, newTestMemoryStorage(withPeers(1, 2)))
    99  	r.becomeCandidate()
   100  	r.becomeLeader()
   101  	r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
   102  	r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
   103  	r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
   104  
   105  	ms := r.readMessages()
   106  	if len(ms) != 1 {
   107  		t.Errorf("len(ms) = %d, want 1", len(ms))
   108  	}
   109  }
   110  
   111  func TestProgressFlowControl(t *testing.T) {
   112  	cfg := newTestConfig(1, 5, 1, newTestMemoryStorage(withPeers(1, 2)))
   113  	cfg.MaxInflightMsgs = 3
   114  	cfg.MaxSizePerMsg = 2048
   115  	r := newRaft(cfg)
   116  	r.becomeCandidate()
   117  	r.becomeLeader()
   118  
   119  	// Throw away all the messages relating to the initial election.
   120  	r.readMessages()
   121  
   122  	// While node 2 is in probe state, propose a bunch of entries.
   123  	r.prs.Progress[2].BecomeProbe()
   124  	blob := []byte(strings.Repeat("a", 1000))
   125  	for i := 0; i < 10; i++ {
   126  		r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: blob}}})
   127  	}
   128  
   129  	ms := r.readMessages()
   130  	// First append has two entries: the empty entry to confirm the
   131  	// election, and the first proposal (only one proposal gets sent
   132  	// because we're in probe state).
   133  	if len(ms) != 1 || ms[0].Type != pb.MsgApp {
   134  		t.Fatalf("expected 1 MsgApp, got %v", ms)
   135  	}
   136  	if len(ms[0].Entries) != 2 {
   137  		t.Fatalf("expected 2 entries, got %d", len(ms[0].Entries))
   138  	}
   139  	if len(ms[0].Entries[0].Data) != 0 || len(ms[0].Entries[1].Data) != 1000 {
   140  		t.Fatalf("unexpected entry sizes: %v", ms[0].Entries)
   141  	}
   142  
   143  	// When this append is acked, we change to replicate state and can
   144  	// send multiple messages at once.
   145  	r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: ms[0].Entries[1].Index})
   146  	ms = r.readMessages()
   147  	if len(ms) != 3 {
   148  		t.Fatalf("expected 3 messages, got %d", len(ms))
   149  	}
   150  	for i, m := range ms {
   151  		if m.Type != pb.MsgApp {
   152  			t.Errorf("%d: expected MsgApp, got %s", i, m.Type)
   153  		}
   154  		if len(m.Entries) != 2 {
   155  			t.Errorf("%d: expected 2 entries, got %d", i, len(m.Entries))
   156  		}
   157  	}
   158  
   159  	// Ack all three of those messages together and get the last two
   160  	// messages (containing three entries).
   161  	r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: ms[2].Entries[1].Index})
   162  	ms = r.readMessages()
   163  	if len(ms) != 2 {
   164  		t.Fatalf("expected 2 messages, got %d", len(ms))
   165  	}
   166  	for i, m := range ms {
   167  		if m.Type != pb.MsgApp {
   168  			t.Errorf("%d: expected MsgApp, got %s", i, m.Type)
   169  		}
   170  	}
   171  	if len(ms[0].Entries) != 2 {
   172  		t.Errorf("%d: expected 2 entries, got %d", 0, len(ms[0].Entries))
   173  	}
   174  	if len(ms[1].Entries) != 1 {
   175  		t.Errorf("%d: expected 1 entry, got %d", 1, len(ms[1].Entries))
   176  	}
   177  }
   178  
   179  func TestUncommittedEntryLimit(t *testing.T) {
   180  	// Use a relatively large number of entries here to prevent regression of a
   181  	// bug which computed the size before it was fixed. This test would fail
   182  	// with the bug, either because we'd get dropped proposals earlier than we
   183  	// expect them, or because the final tally ends up nonzero. (At the time of
   184  	// writing, the former).
   185  	const maxEntries = 1024
   186  	testEntry := pb.Entry{Data: []byte("testdata")}
   187  	maxEntrySize := maxEntries * PayloadSize(testEntry)
   188  
   189  	if n := PayloadSize(pb.Entry{Data: nil}); n != 0 {
   190  		t.Fatal("entry with no Data must have zero payload size")
   191  	}
   192  
   193  	cfg := newTestConfig(1, 5, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
   194  	cfg.MaxUncommittedEntriesSize = uint64(maxEntrySize)
   195  	cfg.MaxInflightMsgs = 2 * 1024 // avoid interference
   196  	r := newRaft(cfg)
   197  	r.becomeCandidate()
   198  	r.becomeLeader()
   199  	if n := r.uncommittedSize; n != 0 {
   200  		t.Fatalf("expected zero uncommitted size, got %d bytes", n)
   201  	}
   202  
   203  	// Set the two followers to the replicate state. Commit to tail of log.
   204  	const numFollowers = 2
   205  	r.prs.Progress[2].BecomeReplicate()
   206  	r.prs.Progress[3].BecomeReplicate()
   207  	r.uncommittedSize = 0
   208  
   209  	// Send proposals to r1. The first 5 entries should be appended to the log.
   210  	propMsg := pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{testEntry}}
   211  	propEnts := make([]pb.Entry, maxEntries)
   212  	for i := 0; i < maxEntries; i++ {
   213  		if err := r.Step(propMsg); err != nil {
   214  			t.Fatalf("proposal resulted in error: %v", err)
   215  		}
   216  		propEnts[i] = testEntry
   217  	}
   218  
   219  	// Send one more proposal to r1. It should be rejected.
   220  	if err := r.Step(propMsg); err != ErrProposalDropped {
   221  		t.Fatalf("proposal not dropped: %v", err)
   222  	}
   223  
   224  	// Read messages and reduce the uncommitted size as if we had committed
   225  	// these entries.
   226  	ms := r.readMessages()
   227  	if e := maxEntries * numFollowers; len(ms) != e {
   228  		t.Fatalf("expected %d messages, got %d", e, len(ms))
   229  	}
   230  	r.reduceUncommittedSize(propEnts)
   231  	if r.uncommittedSize != 0 {
   232  		t.Fatalf("committed everything, but still tracking %d", r.uncommittedSize)
   233  	}
   234  
   235  	// Send a single large proposal to r1. Should be accepted even though it
   236  	// pushes us above the limit because we were beneath it before the proposal.
   237  	propEnts = make([]pb.Entry, 2*maxEntries)
   238  	for i := range propEnts {
   239  		propEnts[i] = testEntry
   240  	}
   241  	propMsgLarge := pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: propEnts}
   242  	if err := r.Step(propMsgLarge); err != nil {
   243  		t.Fatalf("proposal resulted in error: %v", err)
   244  	}
   245  
   246  	// Send one more proposal to r1. It should be rejected, again.
   247  	if err := r.Step(propMsg); err != ErrProposalDropped {
   248  		t.Fatalf("proposal not dropped: %v", err)
   249  	}
   250  
   251  	// But we can always append an entry with no Data. This is used both for the
   252  	// leader's first empty entry and for auto-transitioning out of joint config
   253  	// states.
   254  	if err := r.Step(
   255  		pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}},
   256  	); err != nil {
   257  		t.Fatal(err)
   258  	}
   259  
   260  	// Read messages and reduce the uncommitted size as if we had committed
   261  	// these entries.
   262  	ms = r.readMessages()
   263  	if e := 2 * numFollowers; len(ms) != e {
   264  		t.Fatalf("expected %d messages, got %d", e, len(ms))
   265  	}
   266  	r.reduceUncommittedSize(propEnts)
   267  	if n := r.uncommittedSize; n != 0 {
   268  		t.Fatalf("expected zero uncommitted size, got %d", n)
   269  	}
   270  }
   271  
   272  func TestLeaderElection(t *testing.T) {
   273  	testLeaderElection(t, false)
   274  }
   275  
   276  func TestLeaderElectionPreVote(t *testing.T) {
   277  	testLeaderElection(t, true)
   278  }
   279  
   280  func testLeaderElection(t *testing.T, preVote bool) {
   281  	var cfg func(*Config)
   282  	candState := StateCandidate
   283  	candTerm := uint64(1)
   284  	if preVote {
   285  		cfg = preVoteConfig
   286  		// In pre-vote mode, an election that fails to complete
   287  		// leaves the node in pre-candidate state without advancing
   288  		// the term.
   289  		candState = StatePreCandidate
   290  		candTerm = 0
   291  	}
   292  	tests := []struct {
   293  		*network
   294  		state   StateType
   295  		expTerm uint64
   296  	}{
   297  		{newNetworkWithConfig(cfg, nil, nil, nil), StateLeader, 1},
   298  		{newNetworkWithConfig(cfg, nil, nil, nopStepper), StateLeader, 1},
   299  		{newNetworkWithConfig(cfg, nil, nopStepper, nopStepper), candState, candTerm},
   300  		{newNetworkWithConfig(cfg, nil, nopStepper, nopStepper, nil), candState, candTerm},
   301  		{newNetworkWithConfig(cfg, nil, nopStepper, nopStepper, nil, nil), StateLeader, 1},
   302  
   303  		// three logs further along than 0, but in the same term so rejections
   304  		// are returned instead of the votes being ignored.
   305  		{newNetworkWithConfig(cfg,
   306  			nil, entsWithConfig(cfg, 1), entsWithConfig(cfg, 1), entsWithConfig(cfg, 1, 1), nil),
   307  			StateFollower, 1},
   308  	}
   309  
   310  	for i, tt := range tests {
   311  		tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
   312  		sm := tt.network.peers[1].(*raft)
   313  		if sm.state != tt.state {
   314  			t.Errorf("#%d: state = %s, want %s", i, sm.state, tt.state)
   315  		}
   316  		if g := sm.Term; g != tt.expTerm {
   317  			t.Errorf("#%d: term = %d, want %d", i, g, tt.expTerm)
   318  		}
   319  	}
   320  }
   321  
   322  // TestLearnerElectionTimeout verfies that the leader should not start election even
   323  // when times out.
   324  func TestLearnerElectionTimeout(t *testing.T) {
   325  	n1 := newTestLearnerRaft(1, 10, 1, newTestMemoryStorage(withPeers(1), withLearners(2)))
   326  	n2 := newTestLearnerRaft(2, 10, 1, newTestMemoryStorage(withPeers(1), withLearners(2)))
   327  
   328  	n1.becomeFollower(1, None)
   329  	n2.becomeFollower(1, None)
   330  
   331  	// n2 is learner. Learner should not start election even when times out.
   332  	setRandomizedElectionTimeout(n2, n2.electionTimeout)
   333  	for i := 0; i < n2.electionTimeout; i++ {
   334  		n2.tick()
   335  	}
   336  
   337  	if n2.state != StateFollower {
   338  		t.Errorf("peer 2 state: %s, want %s", n2.state, StateFollower)
   339  	}
   340  }
   341  
   342  // TestLearnerPromotion verifies that the learner should not election until
   343  // it is promoted to a normal peer.
   344  func TestLearnerPromotion(t *testing.T) {
   345  	n1 := newTestLearnerRaft(1, 10, 1, newTestMemoryStorage(withPeers(1), withLearners(2)))
   346  	n2 := newTestLearnerRaft(2, 10, 1, newTestMemoryStorage(withPeers(1), withLearners(2)))
   347  
   348  	n1.becomeFollower(1, None)
   349  	n2.becomeFollower(1, None)
   350  
   351  	nt := newNetwork(n1, n2)
   352  
   353  	if n1.state == StateLeader {
   354  		t.Error("peer 1 state is leader, want not", n1.state)
   355  	}
   356  
   357  	// n1 should become leader
   358  	setRandomizedElectionTimeout(n1, n1.electionTimeout)
   359  	for i := 0; i < n1.electionTimeout; i++ {
   360  		n1.tick()
   361  	}
   362  
   363  	if n1.state != StateLeader {
   364  		t.Errorf("peer 1 state: %s, want %s", n1.state, StateLeader)
   365  	}
   366  	if n2.state != StateFollower {
   367  		t.Errorf("peer 2 state: %s, want %s", n2.state, StateFollower)
   368  	}
   369  
   370  	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgBeat})
   371  
   372  	n1.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddNode}.AsV2())
   373  	n2.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddNode}.AsV2())
   374  	if n2.isLearner {
   375  		t.Error("peer 2 is learner, want not")
   376  	}
   377  
   378  	// n2 start election, should become leader
   379  	setRandomizedElectionTimeout(n2, n2.electionTimeout)
   380  	for i := 0; i < n2.electionTimeout; i++ {
   381  		n2.tick()
   382  	}
   383  
   384  	nt.send(pb.Message{From: 2, To: 2, Type: pb.MsgBeat})
   385  
   386  	if n1.state != StateFollower {
   387  		t.Errorf("peer 1 state: %s, want %s", n1.state, StateFollower)
   388  	}
   389  	if n2.state != StateLeader {
   390  		t.Errorf("peer 2 state: %s, want %s", n2.state, StateLeader)
   391  	}
   392  }
   393  
   394  // TestLearnerCanVote checks that a learner can vote when it receives a valid Vote request.
   395  // See (*raft).Step for why this is necessary and correct behavior.
   396  func TestLearnerCanVote(t *testing.T) {
   397  	n2 := newTestLearnerRaft(2, 10, 1, newTestMemoryStorage(withPeers(1), withLearners(2)))
   398  
   399  	n2.becomeFollower(1, None)
   400  
   401  	n2.Step(pb.Message{From: 1, To: 2, Term: 2, Type: pb.MsgVote, LogTerm: 11, Index: 11})
   402  
   403  	if len(n2.msgs) != 1 {
   404  		t.Fatalf("expected exactly one message, not %+v", n2.msgs)
   405  	}
   406  	msg := n2.msgs[0]
   407  	if msg.Type != pb.MsgVoteResp && !msg.Reject {
   408  		t.Fatal("expected learner to not reject vote")
   409  	}
   410  }
   411  
   412  func TestLeaderCycle(t *testing.T) {
   413  	testLeaderCycle(t, false)
   414  }
   415  
   416  func TestLeaderCyclePreVote(t *testing.T) {
   417  	testLeaderCycle(t, true)
   418  }
   419  
   420  // testLeaderCycle verifies that each node in a cluster can campaign
   421  // and be elected in turn. This ensures that elections (including
   422  // pre-vote) work when not starting from a clean slate (as they do in
   423  // TestLeaderElection)
   424  func testLeaderCycle(t *testing.T, preVote bool) {
   425  	var cfg func(*Config)
   426  	if preVote {
   427  		cfg = preVoteConfig
   428  	}
   429  	n := newNetworkWithConfig(cfg, nil, nil, nil)
   430  	for campaignerID := uint64(1); campaignerID <= 3; campaignerID++ {
   431  		n.send(pb.Message{From: campaignerID, To: campaignerID, Type: pb.MsgHup})
   432  
   433  		for _, peer := range n.peers {
   434  			sm := peer.(*raft)
   435  			if sm.id == campaignerID && sm.state != StateLeader {
   436  				t.Errorf("preVote=%v: campaigning node %d state = %v, want StateLeader",
   437  					preVote, sm.id, sm.state)
   438  			} else if sm.id != campaignerID && sm.state != StateFollower {
   439  				t.Errorf("preVote=%v: after campaign of node %d, "+
   440  					"node %d had state = %v, want StateFollower",
   441  					preVote, campaignerID, sm.id, sm.state)
   442  			}
   443  		}
   444  	}
   445  }
   446  
   447  // TestLeaderElectionOverwriteNewerLogs tests a scenario in which a
   448  // newly-elected leader does *not* have the newest (i.e. highest term)
   449  // log entries, and must overwrite higher-term log entries with
   450  // lower-term ones.
   451  func TestLeaderElectionOverwriteNewerLogs(t *testing.T) {
   452  	testLeaderElectionOverwriteNewerLogs(t, false)
   453  }
   454  
   455  func TestLeaderElectionOverwriteNewerLogsPreVote(t *testing.T) {
   456  	testLeaderElectionOverwriteNewerLogs(t, true)
   457  }
   458  
   459  func testLeaderElectionOverwriteNewerLogs(t *testing.T, preVote bool) {
   460  	var cfg func(*Config)
   461  	if preVote {
   462  		cfg = preVoteConfig
   463  	}
   464  	// This network represents the results of the following sequence of
   465  	// events:
   466  	// - Node 1 won the election in term 1.
   467  	// - Node 1 replicated a log entry to node 2 but died before sending
   468  	//   it to other nodes.
   469  	// - Node 3 won the second election in term 2.
   470  	// - Node 3 wrote an entry to its logs but died without sending it
   471  	//   to any other nodes.
   472  	//
   473  	// At this point, nodes 1, 2, and 3 all have uncommitted entries in
   474  	// their logs and could win an election at term 3. The winner's log
   475  	// entry overwrites the losers'. (TestLeaderSyncFollowerLog tests
   476  	// the case where older log entries are overwritten, so this test
   477  	// focuses on the case where the newer entries are lost).
   478  	n := newNetworkWithConfig(cfg,
   479  		entsWithConfig(cfg, 1),     // Node 1: Won first election
   480  		entsWithConfig(cfg, 1),     // Node 2: Got logs from node 1
   481  		entsWithConfig(cfg, 2),     // Node 3: Won second election
   482  		votedWithConfig(cfg, 3, 2), // Node 4: Voted but didn't get logs
   483  		votedWithConfig(cfg, 3, 2)) // Node 5: Voted but didn't get logs
   484  
   485  	// Node 1 campaigns. The election fails because a quorum of nodes
   486  	// know about the election that already happened at term 2. Node 1's
   487  	// term is pushed ahead to 2.
   488  	n.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
   489  	sm1 := n.peers[1].(*raft)
   490  	if sm1.state != StateFollower {
   491  		t.Errorf("state = %s, want StateFollower", sm1.state)
   492  	}
   493  	if sm1.Term != 2 {
   494  		t.Errorf("term = %d, want 2", sm1.Term)
   495  	}
   496  
   497  	// Node 1 campaigns again with a higher term. This time it succeeds.
   498  	n.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
   499  	if sm1.state != StateLeader {
   500  		t.Errorf("state = %s, want StateLeader", sm1.state)
   501  	}
   502  	if sm1.Term != 3 {
   503  		t.Errorf("term = %d, want 3", sm1.Term)
   504  	}
   505  
   506  	// Now all nodes agree on a log entry with term 1 at index 1 (and
   507  	// term 3 at index 2).
   508  	for i := range n.peers {
   509  		sm := n.peers[i].(*raft)
   510  		entries := sm.raftLog.allEntries()
   511  		if len(entries) != 2 {
   512  			t.Fatalf("node %d: len(entries) == %d, want 2", i, len(entries))
   513  		}
   514  		if entries[0].Term != 1 {
   515  			t.Errorf("node %d: term at index 1 == %d, want 1", i, entries[0].Term)
   516  		}
   517  		if entries[1].Term != 3 {
   518  			t.Errorf("node %d: term at index 2 == %d, want 3", i, entries[1].Term)
   519  		}
   520  	}
   521  }
   522  
   523  func TestVoteFromAnyState(t *testing.T) {
   524  	testVoteFromAnyState(t, pb.MsgVote)
   525  }
   526  
   527  func TestPreVoteFromAnyState(t *testing.T) {
   528  	testVoteFromAnyState(t, pb.MsgPreVote)
   529  }
   530  
   531  func testVoteFromAnyState(t *testing.T, vt pb.MessageType) {
   532  	for st := StateType(0); st < numStates; st++ {
   533  		r := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
   534  		r.Term = 1
   535  
   536  		switch st {
   537  		case StateFollower:
   538  			r.becomeFollower(r.Term, 3)
   539  		case StatePreCandidate:
   540  			r.becomePreCandidate()
   541  		case StateCandidate:
   542  			r.becomeCandidate()
   543  		case StateLeader:
   544  			r.becomeCandidate()
   545  			r.becomeLeader()
   546  		}
   547  
   548  		// Note that setting our state above may have advanced r.Term
   549  		// past its initial value.
   550  		origTerm := r.Term
   551  		newTerm := r.Term + 1
   552  
   553  		msg := pb.Message{
   554  			From:    2,
   555  			To:      1,
   556  			Type:    vt,
   557  			Term:    newTerm,
   558  			LogTerm: newTerm,
   559  			Index:   42,
   560  		}
   561  		if err := r.Step(msg); err != nil {
   562  			t.Errorf("%s,%s: Step failed: %s", vt, st, err)
   563  		}
   564  		if len(r.msgs) != 1 {
   565  			t.Errorf("%s,%s: %d response messages, want 1: %+v", vt, st, len(r.msgs), r.msgs)
   566  		} else {
   567  			resp := r.msgs[0]
   568  			if resp.Type != voteRespMsgType(vt) {
   569  				t.Errorf("%s,%s: response message is %s, want %s",
   570  					vt, st, resp.Type, voteRespMsgType(vt))
   571  			}
   572  			if resp.Reject {
   573  				t.Errorf("%s,%s: unexpected rejection", vt, st)
   574  			}
   575  		}
   576  
   577  		// If this was a real vote, we reset our state and term.
   578  		if vt == pb.MsgVote {
   579  			if r.state != StateFollower {
   580  				t.Errorf("%s,%s: state %s, want %s", vt, st, r.state, StateFollower)
   581  			}
   582  			if r.Term != newTerm {
   583  				t.Errorf("%s,%s: term %d, want %d", vt, st, r.Term, newTerm)
   584  			}
   585  			if r.Vote != 2 {
   586  				t.Errorf("%s,%s: vote %d, want 2", vt, st, r.Vote)
   587  			}
   588  		} else {
   589  			// In a prevote, nothing changes.
   590  			if r.state != st {
   591  				t.Errorf("%s,%s: state %s, want %s", vt, st, r.state, st)
   592  			}
   593  			if r.Term != origTerm {
   594  				t.Errorf("%s,%s: term %d, want %d", vt, st, r.Term, origTerm)
   595  			}
   596  			// if st == StateFollower or StatePreCandidate, r hasn't voted yet.
   597  			// In StateCandidate or StateLeader, it's voted for itself.
   598  			if r.Vote != None && r.Vote != 1 {
   599  				t.Errorf("%s,%s: vote %d, want %d or 1", vt, st, r.Vote, None)
   600  			}
   601  		}
   602  	}
   603  }
   604  
   605  func TestLogReplication(t *testing.T) {
   606  	tests := []struct {
   607  		*network
   608  		msgs       []pb.Message
   609  		wcommitted uint64
   610  	}{
   611  		{
   612  			newNetwork(nil, nil, nil),
   613  			[]pb.Message{
   614  				{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}},
   615  			},
   616  			2,
   617  		},
   618  		{
   619  			newNetwork(nil, nil, nil),
   620  			[]pb.Message{
   621  				{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}},
   622  				{From: 1, To: 2, Type: pb.MsgHup},
   623  				{From: 1, To: 2, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}},
   624  			},
   625  			4,
   626  		},
   627  	}
   628  
   629  	for i, tt := range tests {
   630  		tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
   631  
   632  		for _, m := range tt.msgs {
   633  			tt.send(m)
   634  		}
   635  
   636  		for j, x := range tt.network.peers {
   637  			sm := x.(*raft)
   638  
   639  			if sm.raftLog.committed != tt.wcommitted {
   640  				t.Errorf("#%d.%d: committed = %d, want %d", i, j, sm.raftLog.committed, tt.wcommitted)
   641  			}
   642  
   643  			ents := []pb.Entry{}
   644  			for _, e := range nextEnts(sm, tt.network.storage[j]) {
   645  				if e.Data != nil {
   646  					ents = append(ents, e)
   647  				}
   648  			}
   649  			props := []pb.Message{}
   650  			for _, m := range tt.msgs {
   651  				if m.Type == pb.MsgProp {
   652  					props = append(props, m)
   653  				}
   654  			}
   655  			for k, m := range props {
   656  				if !bytes.Equal(ents[k].Data, m.Entries[0].Data) {
   657  					t.Errorf("#%d.%d: data = %d, want %d", i, j, ents[k].Data, m.Entries[0].Data)
   658  				}
   659  			}
   660  		}
   661  	}
   662  }
   663  
   664  // TestLearnerLogReplication tests that a learner can receive entries from the leader.
   665  func TestLearnerLogReplication(t *testing.T) {
   666  	n1 := newTestLearnerRaft(1, 10, 1, newTestMemoryStorage(withPeers(1), withLearners(2)))
   667  	n2 := newTestLearnerRaft(2, 10, 1, newTestMemoryStorage(withPeers(1), withLearners(2)))
   668  
   669  	nt := newNetwork(n1, n2)
   670  
   671  	n1.becomeFollower(1, None)
   672  	n2.becomeFollower(1, None)
   673  
   674  	setRandomizedElectionTimeout(n1, n1.electionTimeout)
   675  	for i := 0; i < n1.electionTimeout; i++ {
   676  		n1.tick()
   677  	}
   678  
   679  	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgBeat})
   680  
   681  	// n1 is leader and n2 is learner
   682  	if n1.state != StateLeader {
   683  		t.Errorf("peer 1 state: %s, want %s", n1.state, StateLeader)
   684  	}
   685  	if !n2.isLearner {
   686  		t.Error("peer 2 state: not learner, want yes")
   687  	}
   688  
   689  	nextCommitted := n1.raftLog.committed + 1
   690  	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
   691  	if n1.raftLog.committed != nextCommitted {
   692  		t.Errorf("peer 1 wants committed to %d, but still %d", nextCommitted, n1.raftLog.committed)
   693  	}
   694  
   695  	if n1.raftLog.committed != n2.raftLog.committed {
   696  		t.Errorf("peer 2 wants committed to %d, but still %d", n1.raftLog.committed, n2.raftLog.committed)
   697  	}
   698  
   699  	match := n1.prs.Progress[2].Match
   700  	if match != n2.raftLog.committed {
   701  		t.Errorf("progress 2 of leader 1 wants match %d, but got %d", n2.raftLog.committed, match)
   702  	}
   703  }
   704  
   705  func TestSingleNodeCommit(t *testing.T) {
   706  	tt := newNetwork(nil)
   707  	tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
   708  	tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
   709  	tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
   710  
   711  	sm := tt.peers[1].(*raft)
   712  	if sm.raftLog.committed != 3 {
   713  		t.Errorf("committed = %d, want %d", sm.raftLog.committed, 3)
   714  	}
   715  }
   716  
   717  // TestCannotCommitWithoutNewTermEntry tests the entries cannot be committed
   718  // when leader changes, no new proposal comes in and ChangeTerm proposal is
   719  // filtered.
   720  func TestCannotCommitWithoutNewTermEntry(t *testing.T) {
   721  	tt := newNetwork(nil, nil, nil, nil, nil)
   722  	tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
   723  
   724  	// 0 cannot reach 2,3,4
   725  	tt.cut(1, 3)
   726  	tt.cut(1, 4)
   727  	tt.cut(1, 5)
   728  
   729  	tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
   730  	tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
   731  
   732  	sm := tt.peers[1].(*raft)
   733  	if sm.raftLog.committed != 1 {
   734  		t.Errorf("committed = %d, want %d", sm.raftLog.committed, 1)
   735  	}
   736  
   737  	// network recovery
   738  	tt.recover()
   739  	// avoid committing ChangeTerm proposal
   740  	tt.ignore(pb.MsgApp)
   741  
   742  	// elect 2 as the new leader with term 2
   743  	tt.send(pb.Message{From: 2, To: 2, Type: pb.MsgHup})
   744  
   745  	// no log entries from previous term should be committed
   746  	sm = tt.peers[2].(*raft)
   747  	if sm.raftLog.committed != 1 {
   748  		t.Errorf("committed = %d, want %d", sm.raftLog.committed, 1)
   749  	}
   750  
   751  	tt.recover()
   752  	// send heartbeat; reset wait
   753  	tt.send(pb.Message{From: 2, To: 2, Type: pb.MsgBeat})
   754  	// append an entry at current term
   755  	tt.send(pb.Message{From: 2, To: 2, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
   756  	// expect the committed to be advanced
   757  	if sm.raftLog.committed != 5 {
   758  		t.Errorf("committed = %d, want %d", sm.raftLog.committed, 5)
   759  	}
   760  }
   761  
   762  // TestCommitWithoutNewTermEntry tests the entries could be committed
   763  // when leader changes, no new proposal comes in.
   764  func TestCommitWithoutNewTermEntry(t *testing.T) {
   765  	tt := newNetwork(nil, nil, nil, nil, nil)
   766  	tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
   767  
   768  	// 0 cannot reach 2,3,4
   769  	tt.cut(1, 3)
   770  	tt.cut(1, 4)
   771  	tt.cut(1, 5)
   772  
   773  	tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
   774  	tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
   775  
   776  	sm := tt.peers[1].(*raft)
   777  	if sm.raftLog.committed != 1 {
   778  		t.Errorf("committed = %d, want %d", sm.raftLog.committed, 1)
   779  	}
   780  
   781  	// network recovery
   782  	tt.recover()
   783  
   784  	// elect 2 as the new leader with term 2
   785  	// after append a ChangeTerm entry from the current term, all entries
   786  	// should be committed
   787  	tt.send(pb.Message{From: 2, To: 2, Type: pb.MsgHup})
   788  
   789  	if sm.raftLog.committed != 4 {
   790  		t.Errorf("committed = %d, want %d", sm.raftLog.committed, 4)
   791  	}
   792  }
   793  
   794  func TestDuelingCandidates(t *testing.T) {
   795  	a := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
   796  	b := newTestRaft(2, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
   797  	c := newTestRaft(3, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
   798  
   799  	nt := newNetwork(a, b, c)
   800  	nt.cut(1, 3)
   801  
   802  	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
   803  	nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup})
   804  
   805  	// 1 becomes leader since it receives votes from 1 and 2
   806  	sm := nt.peers[1].(*raft)
   807  	if sm.state != StateLeader {
   808  		t.Errorf("state = %s, want %s", sm.state, StateLeader)
   809  	}
   810  
   811  	// 3 stays as candidate since it receives a vote from 3 and a rejection from 2
   812  	sm = nt.peers[3].(*raft)
   813  	if sm.state != StateCandidate {
   814  		t.Errorf("state = %s, want %s", sm.state, StateCandidate)
   815  	}
   816  
   817  	nt.recover()
   818  
   819  	// candidate 3 now increases its term and tries to vote again
   820  	// we expect it to disrupt the leader 1 since it has a higher term
   821  	// 3 will be follower again since both 1 and 2 rejects its vote request since 3 does not have a long enough log
   822  	nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup})
   823  
   824  	wlog := &raftLog{
   825  		storage:   &MemoryStorage{ents: []pb.Entry{{}, {Data: nil, Term: 1, Index: 1}}},
   826  		committed: 1,
   827  		unstable:  unstable{offset: 2},
   828  	}
   829  	tests := []struct {
   830  		sm      *raft
   831  		state   StateType
   832  		term    uint64
   833  		raftLog *raftLog
   834  	}{
   835  		{a, StateFollower, 2, wlog},
   836  		{b, StateFollower, 2, wlog},
   837  		{c, StateFollower, 2, newLog(NewMemoryStorage(), raftLogger)},
   838  	}
   839  
   840  	for i, tt := range tests {
   841  		if g := tt.sm.state; g != tt.state {
   842  			t.Errorf("#%d: state = %s, want %s", i, g, tt.state)
   843  		}
   844  		if g := tt.sm.Term; g != tt.term {
   845  			t.Errorf("#%d: term = %d, want %d", i, g, tt.term)
   846  		}
   847  		base := ltoa(tt.raftLog)
   848  		if sm, ok := nt.peers[1+uint64(i)].(*raft); ok {
   849  			l := ltoa(sm.raftLog)
   850  			if g := diffu(base, l); g != "" {
   851  				t.Errorf("#%d: diff:\n%s", i, g)
   852  			}
   853  		} else {
   854  			t.Logf("#%d: empty log", i)
   855  		}
   856  	}
   857  }
   858  
   859  func TestDuelingPreCandidates(t *testing.T) {
   860  	cfgA := newTestConfig(1, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
   861  	cfgB := newTestConfig(2, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
   862  	cfgC := newTestConfig(3, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
   863  	cfgA.PreVote = true
   864  	cfgB.PreVote = true
   865  	cfgC.PreVote = true
   866  	a := newRaft(cfgA)
   867  	b := newRaft(cfgB)
   868  	c := newRaft(cfgC)
   869  
   870  	nt := newNetwork(a, b, c)
   871  	nt.cut(1, 3)
   872  
   873  	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
   874  	nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup})
   875  
   876  	// 1 becomes leader since it receives votes from 1 and 2
   877  	sm := nt.peers[1].(*raft)
   878  	if sm.state != StateLeader {
   879  		t.Errorf("state = %s, want %s", sm.state, StateLeader)
   880  	}
   881  
   882  	// 3 campaigns then reverts to follower when its PreVote is rejected
   883  	sm = nt.peers[3].(*raft)
   884  	if sm.state != StateFollower {
   885  		t.Errorf("state = %s, want %s", sm.state, StateFollower)
   886  	}
   887  
   888  	nt.recover()
   889  
   890  	// Candidate 3 now increases its term and tries to vote again.
   891  	// With PreVote, it does not disrupt the leader.
   892  	nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup})
   893  
   894  	wlog := &raftLog{
   895  		storage:   &MemoryStorage{ents: []pb.Entry{{}, {Data: nil, Term: 1, Index: 1}}},
   896  		committed: 1,
   897  		unstable:  unstable{offset: 2},
   898  	}
   899  	tests := []struct {
   900  		sm      *raft
   901  		state   StateType
   902  		term    uint64
   903  		raftLog *raftLog
   904  	}{
   905  		{a, StateLeader, 1, wlog},
   906  		{b, StateFollower, 1, wlog},
   907  		{c, StateFollower, 1, newLog(NewMemoryStorage(), raftLogger)},
   908  	}
   909  
   910  	for i, tt := range tests {
   911  		if g := tt.sm.state; g != tt.state {
   912  			t.Errorf("#%d: state = %s, want %s", i, g, tt.state)
   913  		}
   914  		if g := tt.sm.Term; g != tt.term {
   915  			t.Errorf("#%d: term = %d, want %d", i, g, tt.term)
   916  		}
   917  		base := ltoa(tt.raftLog)
   918  		if sm, ok := nt.peers[1+uint64(i)].(*raft); ok {
   919  			l := ltoa(sm.raftLog)
   920  			if g := diffu(base, l); g != "" {
   921  				t.Errorf("#%d: diff:\n%s", i, g)
   922  			}
   923  		} else {
   924  			t.Logf("#%d: empty log", i)
   925  		}
   926  	}
   927  }
   928  
   929  func TestCandidateConcede(t *testing.T) {
   930  	tt := newNetwork(nil, nil, nil)
   931  	tt.isolate(1)
   932  
   933  	tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
   934  	tt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup})
   935  
   936  	// heal the partition
   937  	tt.recover()
   938  	// send heartbeat; reset wait
   939  	tt.send(pb.Message{From: 3, To: 3, Type: pb.MsgBeat})
   940  
   941  	data := []byte("force follower")
   942  	// send a proposal to 3 to flush out a MsgApp to 1
   943  	tt.send(pb.Message{From: 3, To: 3, Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}})
   944  	// send heartbeat; flush out commit
   945  	tt.send(pb.Message{From: 3, To: 3, Type: pb.MsgBeat})
   946  
   947  	a := tt.peers[1].(*raft)
   948  	if g := a.state; g != StateFollower {
   949  		t.Errorf("state = %s, want %s", g, StateFollower)
   950  	}
   951  	if g := a.Term; g != 1 {
   952  		t.Errorf("term = %d, want %d", g, 1)
   953  	}
   954  	wantLog := ltoa(&raftLog{
   955  		storage: &MemoryStorage{
   956  			ents: []pb.Entry{{}, {Data: nil, Term: 1, Index: 1}, {Term: 1, Index: 2, Data: data}},
   957  		},
   958  		unstable:  unstable{offset: 3},
   959  		committed: 2,
   960  	})
   961  	for i, p := range tt.peers {
   962  		if sm, ok := p.(*raft); ok {
   963  			l := ltoa(sm.raftLog)
   964  			if g := diffu(wantLog, l); g != "" {
   965  				t.Errorf("#%d: diff:\n%s", i, g)
   966  			}
   967  		} else {
   968  			t.Logf("#%d: empty log", i)
   969  		}
   970  	}
   971  }
   972  
   973  func TestSingleNodeCandidate(t *testing.T) {
   974  	tt := newNetwork(nil)
   975  	tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
   976  
   977  	sm := tt.peers[1].(*raft)
   978  	if sm.state != StateLeader {
   979  		t.Errorf("state = %d, want %d", sm.state, StateLeader)
   980  	}
   981  }
   982  
   983  func TestSingleNodePreCandidate(t *testing.T) {
   984  	tt := newNetworkWithConfig(preVoteConfig, nil)
   985  	tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
   986  
   987  	sm := tt.peers[1].(*raft)
   988  	if sm.state != StateLeader {
   989  		t.Errorf("state = %d, want %d", sm.state, StateLeader)
   990  	}
   991  }
   992  
   993  func TestOldMessages(t *testing.T) {
   994  	tt := newNetwork(nil, nil, nil)
   995  	// make 0 leader @ term 3
   996  	tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
   997  	tt.send(pb.Message{From: 2, To: 2, Type: pb.MsgHup})
   998  	tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
   999  	// pretend we're an old leader trying to make progress; this entry is expected to be ignored.
  1000  	tt.send(pb.Message{From: 2, To: 1, Type: pb.MsgApp, Term: 2, Entries: []pb.Entry{{Index: 3, Term: 2}}})
  1001  	// commit a new entry
  1002  	tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
  1003  
  1004  	ilog := &raftLog{
  1005  		storage: &MemoryStorage{
  1006  			ents: []pb.Entry{
  1007  				{}, {Data: nil, Term: 1, Index: 1},
  1008  				{Data: nil, Term: 2, Index: 2}, {Data: nil, Term: 3, Index: 3},
  1009  				{Data: []byte("somedata"), Term: 3, Index: 4},
  1010  			},
  1011  		},
  1012  		unstable:  unstable{offset: 5},
  1013  		committed: 4,
  1014  	}
  1015  	base := ltoa(ilog)
  1016  	for i, p := range tt.peers {
  1017  		if sm, ok := p.(*raft); ok {
  1018  			l := ltoa(sm.raftLog)
  1019  			if g := diffu(base, l); g != "" {
  1020  				t.Errorf("#%d: diff:\n%s", i, g)
  1021  			}
  1022  		} else {
  1023  			t.Logf("#%d: empty log", i)
  1024  		}
  1025  	}
  1026  }
  1027  
  1028  // TestOldMessagesReply - optimization - reply with new term.
  1029  
  1030  func TestProposal(t *testing.T) {
  1031  	tests := []struct {
  1032  		*network
  1033  		success bool
  1034  	}{
  1035  		{newNetwork(nil, nil, nil), true},
  1036  		{newNetwork(nil, nil, nopStepper), true},
  1037  		{newNetwork(nil, nopStepper, nopStepper), false},
  1038  		{newNetwork(nil, nopStepper, nopStepper, nil), false},
  1039  		{newNetwork(nil, nopStepper, nopStepper, nil, nil), true},
  1040  	}
  1041  
  1042  	for j, tt := range tests {
  1043  		send := func(m pb.Message) {
  1044  			defer func() {
  1045  				// only recover if we expect it to panic (success==false)
  1046  				if !tt.success {
  1047  					e := recover()
  1048  					if e != nil {
  1049  						t.Logf("#%d: err: %s", j, e)
  1050  					}
  1051  				}
  1052  			}()
  1053  			tt.send(m)
  1054  		}
  1055  
  1056  		data := []byte("somedata")
  1057  
  1058  		// promote 1 to become leader
  1059  		send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  1060  		send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}})
  1061  
  1062  		wantLog := newLog(NewMemoryStorage(), raftLogger)
  1063  		if tt.success {
  1064  			wantLog = &raftLog{
  1065  				storage: &MemoryStorage{
  1066  					ents: []pb.Entry{{}, {Data: nil, Term: 1, Index: 1}, {Term: 1, Index: 2, Data: data}},
  1067  				},
  1068  				unstable:  unstable{offset: 3},
  1069  				committed: 2}
  1070  		}
  1071  		base := ltoa(wantLog)
  1072  		for i, p := range tt.peers {
  1073  			if sm, ok := p.(*raft); ok {
  1074  				l := ltoa(sm.raftLog)
  1075  				if g := diffu(base, l); g != "" {
  1076  					t.Errorf("#%d: peer %d diff:\n%s", j, i, g)
  1077  				}
  1078  			} else {
  1079  				t.Logf("#%d: peer %d empty log", j, i)
  1080  			}
  1081  		}
  1082  		sm := tt.network.peers[1].(*raft)
  1083  		if g := sm.Term; g != 1 {
  1084  			t.Errorf("#%d: term = %d, want %d", j, g, 1)
  1085  		}
  1086  	}
  1087  }
  1088  
  1089  func TestProposalByProxy(t *testing.T) {
  1090  	data := []byte("somedata")
  1091  	tests := []*network{
  1092  		newNetwork(nil, nil, nil),
  1093  		newNetwork(nil, nil, nopStepper),
  1094  	}
  1095  
  1096  	for j, tt := range tests {
  1097  		// promote 0 the leader
  1098  		tt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  1099  
  1100  		// propose via follower
  1101  		tt.send(pb.Message{From: 2, To: 2, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
  1102  
  1103  		wantLog := &raftLog{
  1104  			storage: &MemoryStorage{
  1105  				ents: []pb.Entry{{}, {Data: nil, Term: 1, Index: 1}, {Term: 1, Data: data, Index: 2}},
  1106  			},
  1107  			unstable:  unstable{offset: 3},
  1108  			committed: 2}
  1109  		base := ltoa(wantLog)
  1110  		for i, p := range tt.peers {
  1111  			if sm, ok := p.(*raft); ok {
  1112  				l := ltoa(sm.raftLog)
  1113  				if g := diffu(base, l); g != "" {
  1114  					t.Errorf("#%d: peer %d diff:\n%s", j, i, g)
  1115  				}
  1116  			} else {
  1117  				t.Logf("#%d: peer %d empty log", j, i)
  1118  			}
  1119  		}
  1120  		sm := tt.peers[1].(*raft)
  1121  		if g := sm.Term; g != 1 {
  1122  			t.Errorf("#%d: term = %d, want %d", j, g, 1)
  1123  		}
  1124  	}
  1125  }
  1126  
  1127  func TestCommit(t *testing.T) {
  1128  	tests := []struct {
  1129  		matches []uint64
  1130  		logs    []pb.Entry
  1131  		smTerm  uint64
  1132  		w       uint64
  1133  	}{
  1134  		// single
  1135  		{[]uint64{1}, []pb.Entry{{Index: 1, Term: 1}}, 1, 1},
  1136  		{[]uint64{1}, []pb.Entry{{Index: 1, Term: 1}}, 2, 0},
  1137  		{[]uint64{2}, []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}}, 2, 2},
  1138  		{[]uint64{1}, []pb.Entry{{Index: 1, Term: 2}}, 2, 1},
  1139  
  1140  		// odd
  1141  		{[]uint64{2, 1, 1}, []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}}, 1, 1},
  1142  		{[]uint64{2, 1, 1}, []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 1}}, 2, 0},
  1143  		{[]uint64{2, 1, 2}, []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}}, 2, 2},
  1144  		{[]uint64{2, 1, 2}, []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 1}}, 2, 0},
  1145  
  1146  		// even
  1147  		{[]uint64{2, 1, 1, 1}, []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}}, 1, 1},
  1148  		{[]uint64{2, 1, 1, 1}, []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 1}}, 2, 0},
  1149  		{[]uint64{2, 1, 1, 2}, []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}}, 1, 1},
  1150  		{[]uint64{2, 1, 1, 2}, []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 1}}, 2, 0},
  1151  		{[]uint64{2, 1, 2, 2}, []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}}, 2, 2},
  1152  		{[]uint64{2, 1, 2, 2}, []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 1}}, 2, 0},
  1153  	}
  1154  
  1155  	for i, tt := range tests {
  1156  		storage := newTestMemoryStorage(withPeers(1))
  1157  		storage.Append(tt.logs)
  1158  		storage.hardState = pb.HardState{Term: tt.smTerm}
  1159  
  1160  		sm := newTestRaft(1, 10, 2, storage)
  1161  		for j := 0; j < len(tt.matches); j++ {
  1162  			id := uint64(j) + 1
  1163  			if id > 1 {
  1164  				sm.applyConfChange(pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: id}.AsV2())
  1165  			}
  1166  			pr := sm.prs.Progress[id]
  1167  			pr.Match, pr.Next = tt.matches[j], tt.matches[j]+1
  1168  		}
  1169  		sm.maybeCommit()
  1170  		if g := sm.raftLog.committed; g != tt.w {
  1171  			t.Errorf("#%d: committed = %d, want %d", i, g, tt.w)
  1172  		}
  1173  	}
  1174  }
  1175  
  1176  func TestPastElectionTimeout(t *testing.T) {
  1177  	tests := []struct {
  1178  		elapse       int
  1179  		wprobability float64
  1180  		round        bool
  1181  	}{
  1182  		{5, 0, false},
  1183  		{10, 0.1, true},
  1184  		{13, 0.4, true},
  1185  		{15, 0.6, true},
  1186  		{18, 0.9, true},
  1187  		{20, 1, false},
  1188  	}
  1189  
  1190  	for i, tt := range tests {
  1191  		sm := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1)))
  1192  		sm.electionElapsed = tt.elapse
  1193  		c := 0
  1194  		for j := 0; j < 10000; j++ {
  1195  			sm.resetRandomizedElectionTimeout()
  1196  			if sm.pastElectionTimeout() {
  1197  				c++
  1198  			}
  1199  		}
  1200  		got := float64(c) / 10000.0
  1201  		if tt.round {
  1202  			got = math.Floor(got*10+0.5) / 10.0
  1203  		}
  1204  		if got != tt.wprobability {
  1205  			t.Errorf("#%d: probability = %v, want %v", i, got, tt.wprobability)
  1206  		}
  1207  	}
  1208  }
  1209  
  1210  // TestStepIgnoreOldTermMsg to ensure that the Step function ignores the message
  1211  // from old term and does not pass it to the actual stepX function.
  1212  func TestStepIgnoreOldTermMsg(t *testing.T) {
  1213  	called := false
  1214  	fakeStep := func(r *raft, m pb.Message) error {
  1215  		called = true
  1216  		return nil
  1217  	}
  1218  	sm := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1)))
  1219  	sm.step = fakeStep
  1220  	sm.Term = 2
  1221  	sm.Step(pb.Message{Type: pb.MsgApp, Term: sm.Term - 1})
  1222  	if called {
  1223  		t.Errorf("stepFunc called = %v , want %v", called, false)
  1224  	}
  1225  }
  1226  
  1227  // TestHandleMsgApp ensures:
  1228  //  1. Reply false if log doesn’t contain an entry at prevLogIndex whose term matches prevLogTerm.
  1229  //  2. If an existing entry conflicts with a new one (same index but different terms),
  1230  //     delete the existing entry and all that follow it; append any new entries not already in the log.
  1231  //  3. If leaderCommit > commitIndex, set commitIndex = min(leaderCommit, index of last new entry).
  1232  func TestHandleMsgApp(t *testing.T) {
  1233  	tests := []struct {
  1234  		m       pb.Message
  1235  		wIndex  uint64
  1236  		wCommit uint64
  1237  		wReject bool
  1238  	}{
  1239  		// Ensure 1
  1240  		{pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 3, Index: 2, Commit: 3}, 2, 0, true}, // previous log mismatch
  1241  		{pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 3, Index: 3, Commit: 3}, 2, 0, true}, // previous log non-exist
  1242  
  1243  		// Ensure 2
  1244  		{pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 1, Index: 1, Commit: 1}, 2, 1, false},
  1245  		{pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 0, Index: 0, Commit: 1, Entries: []pb.Entry{{Index: 1, Term: 2}}}, 1, 1, false},
  1246  		{pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 2, Index: 2, Commit: 3, Entries: []pb.Entry{{Index: 3, Term: 2}, {Index: 4, Term: 2}}}, 4, 3, false},
  1247  		{pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 2, Index: 2, Commit: 4, Entries: []pb.Entry{{Index: 3, Term: 2}}}, 3, 3, false},
  1248  		{pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 1, Index: 1, Commit: 4, Entries: []pb.Entry{{Index: 2, Term: 2}}}, 2, 2, false},
  1249  
  1250  		// Ensure 3
  1251  		{pb.Message{Type: pb.MsgApp, Term: 1, LogTerm: 1, Index: 1, Commit: 3}, 2, 1, false},                                           // match entry 1, commit up to last new entry 1
  1252  		{pb.Message{Type: pb.MsgApp, Term: 1, LogTerm: 1, Index: 1, Commit: 3, Entries: []pb.Entry{{Index: 2, Term: 2}}}, 2, 2, false}, // match entry 1, commit up to last new entry 2
  1253  		{pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 2, Index: 2, Commit: 3}, 2, 2, false},                                           // match entry 2, commit up to last new entry 2
  1254  		{pb.Message{Type: pb.MsgApp, Term: 2, LogTerm: 2, Index: 2, Commit: 4}, 2, 2, false},                                           // commit up to log.last()
  1255  	}
  1256  
  1257  	for i, tt := range tests {
  1258  		storage := newTestMemoryStorage(withPeers(1))
  1259  		storage.Append([]pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}})
  1260  		sm := newTestRaft(1, 10, 1, storage)
  1261  		sm.becomeFollower(2, None)
  1262  
  1263  		sm.handleAppendEntries(tt.m)
  1264  		if sm.raftLog.lastIndex() != tt.wIndex {
  1265  			t.Errorf("#%d: lastIndex = %d, want %d", i, sm.raftLog.lastIndex(), tt.wIndex)
  1266  		}
  1267  		if sm.raftLog.committed != tt.wCommit {
  1268  			t.Errorf("#%d: committed = %d, want %d", i, sm.raftLog.committed, tt.wCommit)
  1269  		}
  1270  		m := sm.readMessages()
  1271  		if len(m) != 1 {
  1272  			t.Fatalf("#%d: msg = nil, want 1", i)
  1273  		}
  1274  		if m[0].Reject != tt.wReject {
  1275  			t.Errorf("#%d: reject = %v, want %v", i, m[0].Reject, tt.wReject)
  1276  		}
  1277  	}
  1278  }
  1279  
  1280  // TestHandleHeartbeat ensures that the follower commits to the commit in the message.
  1281  func TestHandleHeartbeat(t *testing.T) {
  1282  	commit := uint64(2)
  1283  	tests := []struct {
  1284  		m       pb.Message
  1285  		wCommit uint64
  1286  	}{
  1287  		{pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeat, Term: 2, Commit: commit + 1}, commit + 1},
  1288  		{pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeat, Term: 2, Commit: commit - 1}, commit}, // do not decrease commit
  1289  	}
  1290  
  1291  	for i, tt := range tests {
  1292  		storage := newTestMemoryStorage(withPeers(1, 2))
  1293  		storage.Append([]pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}, {Index: 3, Term: 3}})
  1294  		sm := newTestRaft(1, 5, 1, storage)
  1295  		sm.becomeFollower(2, 2)
  1296  		sm.raftLog.commitTo(commit)
  1297  		sm.handleHeartbeat(tt.m)
  1298  		if sm.raftLog.committed != tt.wCommit {
  1299  			t.Errorf("#%d: committed = %d, want %d", i, sm.raftLog.committed, tt.wCommit)
  1300  		}
  1301  		m := sm.readMessages()
  1302  		if len(m) != 1 {
  1303  			t.Fatalf("#%d: msg = nil, want 1", i)
  1304  		}
  1305  		if m[0].Type != pb.MsgHeartbeatResp {
  1306  			t.Errorf("#%d: type = %v, want MsgHeartbeatResp", i, m[0].Type)
  1307  		}
  1308  	}
  1309  }
  1310  
  1311  // TestHandleHeartbeatResp ensures that we re-send log entries when we get a heartbeat response.
  1312  func TestHandleHeartbeatResp(t *testing.T) {
  1313  	storage := newTestMemoryStorage(withPeers(1, 2))
  1314  	storage.Append([]pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}, {Index: 3, Term: 3}})
  1315  	sm := newTestRaft(1, 5, 1, storage)
  1316  	sm.becomeCandidate()
  1317  	sm.becomeLeader()
  1318  	sm.raftLog.commitTo(sm.raftLog.lastIndex())
  1319  
  1320  	// A heartbeat response from a node that is behind; re-send MsgApp
  1321  	sm.Step(pb.Message{From: 2, Type: pb.MsgHeartbeatResp})
  1322  	msgs := sm.readMessages()
  1323  	if len(msgs) != 1 {
  1324  		t.Fatalf("len(msgs) = %d, want 1", len(msgs))
  1325  	}
  1326  	if msgs[0].Type != pb.MsgApp {
  1327  		t.Errorf("type = %v, want MsgApp", msgs[0].Type)
  1328  	}
  1329  
  1330  	// A second heartbeat response generates another MsgApp re-send
  1331  	sm.Step(pb.Message{From: 2, Type: pb.MsgHeartbeatResp})
  1332  	msgs = sm.readMessages()
  1333  	if len(msgs) != 1 {
  1334  		t.Fatalf("len(msgs) = %d, want 1", len(msgs))
  1335  	}
  1336  	if msgs[0].Type != pb.MsgApp {
  1337  		t.Errorf("type = %v, want MsgApp", msgs[0].Type)
  1338  	}
  1339  
  1340  	// Once we have an MsgAppResp, heartbeats no longer send MsgApp.
  1341  	sm.Step(pb.Message{
  1342  		From:  2,
  1343  		Type:  pb.MsgAppResp,
  1344  		Index: msgs[0].Index + uint64(len(msgs[0].Entries)),
  1345  	})
  1346  	// Consume the message sent in response to MsgAppResp
  1347  	sm.readMessages()
  1348  
  1349  	sm.Step(pb.Message{From: 2, Type: pb.MsgHeartbeatResp})
  1350  	msgs = sm.readMessages()
  1351  	if len(msgs) != 0 {
  1352  		t.Fatalf("len(msgs) = %d, want 0: %+v", len(msgs), msgs)
  1353  	}
  1354  }
  1355  
  1356  // TestRaftFreesReadOnlyMem ensures raft will free read request from
  1357  // readOnly readIndexQueue and pendingReadIndex map.
  1358  // related issue: https://github.com/etcd-io/etcd/issues/7571
  1359  func TestRaftFreesReadOnlyMem(t *testing.T) {
  1360  	sm := newTestRaft(1, 5, 1, newTestMemoryStorage(withPeers(1, 2)))
  1361  	sm.becomeCandidate()
  1362  	sm.becomeLeader()
  1363  	sm.raftLog.commitTo(sm.raftLog.lastIndex())
  1364  
  1365  	ctx := []byte("ctx")
  1366  
  1367  	// leader starts linearizable read request.
  1368  	// more info: raft dissertation 6.4, step 2.
  1369  	sm.Step(pb.Message{From: 2, Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: ctx}}})
  1370  	msgs := sm.readMessages()
  1371  	if len(msgs) != 1 {
  1372  		t.Fatalf("len(msgs) = %d, want 1", len(msgs))
  1373  	}
  1374  	if msgs[0].Type != pb.MsgHeartbeat {
  1375  		t.Fatalf("type = %v, want MsgHeartbeat", msgs[0].Type)
  1376  	}
  1377  	if !bytes.Equal(msgs[0].Context, ctx) {
  1378  		t.Fatalf("Context = %v, want %v", msgs[0].Context, ctx)
  1379  	}
  1380  	if len(sm.readOnly.readIndexQueue) != 1 {
  1381  		t.Fatalf("len(readIndexQueue) = %v, want 1", len(sm.readOnly.readIndexQueue))
  1382  	}
  1383  	if len(sm.readOnly.pendingReadIndex) != 1 {
  1384  		t.Fatalf("len(pendingReadIndex) = %v, want 1", len(sm.readOnly.pendingReadIndex))
  1385  	}
  1386  	if _, ok := sm.readOnly.pendingReadIndex[string(ctx)]; !ok {
  1387  		t.Fatalf("can't find context %v in pendingReadIndex ", ctx)
  1388  	}
  1389  
  1390  	// heartbeat responses from majority of followers (1 in this case)
  1391  	// acknowledge the authority of the leader.
  1392  	// more info: raft dissertation 6.4, step 3.
  1393  	sm.Step(pb.Message{From: 2, Type: pb.MsgHeartbeatResp, Context: ctx})
  1394  	if len(sm.readOnly.readIndexQueue) != 0 {
  1395  		t.Fatalf("len(readIndexQueue) = %v, want 0", len(sm.readOnly.readIndexQueue))
  1396  	}
  1397  	if len(sm.readOnly.pendingReadIndex) != 0 {
  1398  		t.Fatalf("len(pendingReadIndex) = %v, want 0", len(sm.readOnly.pendingReadIndex))
  1399  	}
  1400  	if _, ok := sm.readOnly.pendingReadIndex[string(ctx)]; ok {
  1401  		t.Fatalf("found context %v in pendingReadIndex, want none", ctx)
  1402  	}
  1403  }
  1404  
  1405  // TestMsgAppRespWaitReset verifies the resume behavior of a leader
  1406  // MsgAppResp.
  1407  func TestMsgAppRespWaitReset(t *testing.T) {
  1408  	sm := newTestRaft(1, 5, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
  1409  	sm.becomeCandidate()
  1410  	sm.becomeLeader()
  1411  
  1412  	// The new leader has just emitted a new Term 4 entry; consume those messages
  1413  	// from the outgoing queue.
  1414  	sm.bcastAppend()
  1415  	sm.readMessages()
  1416  
  1417  	// Node 2 acks the first entry, making it committed.
  1418  	sm.Step(pb.Message{
  1419  		From:  2,
  1420  		Type:  pb.MsgAppResp,
  1421  		Index: 1,
  1422  	})
  1423  	if sm.raftLog.committed != 1 {
  1424  		t.Fatalf("expected committed to be 1, got %d", sm.raftLog.committed)
  1425  	}
  1426  	// Also consume the MsgApp messages that update Commit on the followers.
  1427  	sm.readMessages()
  1428  
  1429  	// A new command is now proposed on node 1.
  1430  	sm.Step(pb.Message{
  1431  		From:    1,
  1432  		Type:    pb.MsgProp,
  1433  		Entries: []pb.Entry{{}},
  1434  	})
  1435  
  1436  	// The command is broadcast to all nodes not in the wait state.
  1437  	// Node 2 left the wait state due to its MsgAppResp, but node 3 is still waiting.
  1438  	msgs := sm.readMessages()
  1439  	if len(msgs) != 1 {
  1440  		t.Fatalf("expected 1 message, got %d: %+v", len(msgs), msgs)
  1441  	}
  1442  	if msgs[0].Type != pb.MsgApp || msgs[0].To != 2 {
  1443  		t.Errorf("expected MsgApp to node 2, got %v to %d", msgs[0].Type, msgs[0].To)
  1444  	}
  1445  	if len(msgs[0].Entries) != 1 || msgs[0].Entries[0].Index != 2 {
  1446  		t.Errorf("expected to send entry 2, but got %v", msgs[0].Entries)
  1447  	}
  1448  
  1449  	// Now Node 3 acks the first entry. This releases the wait and entry 2 is sent.
  1450  	sm.Step(pb.Message{
  1451  		From:  3,
  1452  		Type:  pb.MsgAppResp,
  1453  		Index: 1,
  1454  	})
  1455  	msgs = sm.readMessages()
  1456  	if len(msgs) != 1 {
  1457  		t.Fatalf("expected 1 message, got %d: %+v", len(msgs), msgs)
  1458  	}
  1459  	if msgs[0].Type != pb.MsgApp || msgs[0].To != 3 {
  1460  		t.Errorf("expected MsgApp to node 3, got %v to %d", msgs[0].Type, msgs[0].To)
  1461  	}
  1462  	if len(msgs[0].Entries) != 1 || msgs[0].Entries[0].Index != 2 {
  1463  		t.Errorf("expected to send entry 2, but got %v", msgs[0].Entries)
  1464  	}
  1465  }
  1466  
// TestRecvMsgVote runs the shared testRecvMsgVote cases with pb.MsgVote.
func TestRecvMsgVote(t *testing.T) {
	testRecvMsgVote(t, pb.MsgVote)
}
  1470  
// TestRecvMsgPreVote runs the shared testRecvMsgVote cases with pb.MsgPreVote.
func TestRecvMsgPreVote(t *testing.T) {
	testRecvMsgVote(t, pb.MsgPreVote)
}
  1474  
  1475  func testRecvMsgVote(t *testing.T, msgType pb.MessageType) {
  1476  	tests := []struct {
  1477  		state          StateType
  1478  		index, logTerm uint64
  1479  		voteFor        uint64
  1480  		wreject        bool
  1481  	}{
  1482  		{StateFollower, 0, 0, None, true},
  1483  		{StateFollower, 0, 1, None, true},
  1484  		{StateFollower, 0, 2, None, true},
  1485  		{StateFollower, 0, 3, None, false},
  1486  
  1487  		{StateFollower, 1, 0, None, true},
  1488  		{StateFollower, 1, 1, None, true},
  1489  		{StateFollower, 1, 2, None, true},
  1490  		{StateFollower, 1, 3, None, false},
  1491  
  1492  		{StateFollower, 2, 0, None, true},
  1493  		{StateFollower, 2, 1, None, true},
  1494  		{StateFollower, 2, 2, None, false},
  1495  		{StateFollower, 2, 3, None, false},
  1496  
  1497  		{StateFollower, 3, 0, None, true},
  1498  		{StateFollower, 3, 1, None, true},
  1499  		{StateFollower, 3, 2, None, false},
  1500  		{StateFollower, 3, 3, None, false},
  1501  
  1502  		{StateFollower, 3, 2, 2, false},
  1503  		{StateFollower, 3, 2, 1, true},
  1504  
  1505  		{StateLeader, 3, 3, 1, true},
  1506  		{StatePreCandidate, 3, 3, 1, true},
  1507  		{StateCandidate, 3, 3, 1, true},
  1508  	}
  1509  
  1510  	max := func(a, b uint64) uint64 {
  1511  		if a > b {
  1512  			return a
  1513  		}
  1514  		return b
  1515  	}
  1516  
  1517  	for i, tt := range tests {
  1518  		sm := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1)))
  1519  		sm.state = tt.state
  1520  		switch tt.state {
  1521  		case StateFollower:
  1522  			sm.step = stepFollower
  1523  		case StateCandidate, StatePreCandidate:
  1524  			sm.step = stepCandidate
  1525  		case StateLeader:
  1526  			sm.step = stepLeader
  1527  		}
  1528  		sm.Vote = tt.voteFor
  1529  		sm.raftLog = &raftLog{
  1530  			storage:  &MemoryStorage{ents: []pb.Entry{{}, {Index: 1, Term: 2}, {Index: 2, Term: 2}}},
  1531  			unstable: unstable{offset: 3},
  1532  		}
  1533  
  1534  		// raft.Term is greater than or equal to raft.raftLog.lastTerm. In this
  1535  		// test we're only testing MsgVote responses when the campaigning node
  1536  		// has a different raft log compared to the recipient node.
  1537  		// Additionally we're verifying behaviour when the recipient node has
  1538  		// already given out its vote for its current term. We're not testing
  1539  		// what the recipient node does when receiving a message with a
  1540  		// different term number, so we simply initialize both term numbers to
  1541  		// be the same.
  1542  		term := max(sm.raftLog.lastTerm(), tt.logTerm)
  1543  		sm.Term = term
  1544  		sm.Step(pb.Message{Type: msgType, Term: term, From: 2, Index: tt.index, LogTerm: tt.logTerm})
  1545  
  1546  		msgs := sm.readMessages()
  1547  		if g := len(msgs); g != 1 {
  1548  			t.Fatalf("#%d: len(msgs) = %d, want 1", i, g)
  1549  			continue
  1550  		}
  1551  		if g := msgs[0].Type; g != voteRespMsgType(msgType) {
  1552  			t.Errorf("#%d, m.Type = %v, want %v", i, g, voteRespMsgType(msgType))
  1553  		}
  1554  		if g := msgs[0].Reject; g != tt.wreject {
  1555  			t.Errorf("#%d, m.Reject = %v, want %v", i, g, tt.wreject)
  1556  		}
  1557  	}
  1558  }
  1559  
  1560  func TestStateTransition(t *testing.T) {
  1561  	tests := []struct {
  1562  		from   StateType
  1563  		to     StateType
  1564  		wallow bool
  1565  		wterm  uint64
  1566  		wlead  uint64
  1567  	}{
  1568  		{StateFollower, StateFollower, true, 1, None},
  1569  		{StateFollower, StatePreCandidate, true, 0, None},
  1570  		{StateFollower, StateCandidate, true, 1, None},
  1571  		{StateFollower, StateLeader, false, 0, None},
  1572  
  1573  		{StatePreCandidate, StateFollower, true, 0, None},
  1574  		{StatePreCandidate, StatePreCandidate, true, 0, None},
  1575  		{StatePreCandidate, StateCandidate, true, 1, None},
  1576  		{StatePreCandidate, StateLeader, true, 0, 1},
  1577  
  1578  		{StateCandidate, StateFollower, true, 0, None},
  1579  		{StateCandidate, StatePreCandidate, true, 0, None},
  1580  		{StateCandidate, StateCandidate, true, 1, None},
  1581  		{StateCandidate, StateLeader, true, 0, 1},
  1582  
  1583  		{StateLeader, StateFollower, true, 1, None},
  1584  		{StateLeader, StatePreCandidate, false, 0, None},
  1585  		{StateLeader, StateCandidate, false, 1, None},
  1586  		{StateLeader, StateLeader, true, 0, 1},
  1587  	}
  1588  
  1589  	for i, tt := range tests {
  1590  		func() {
  1591  			defer func() {
  1592  				if r := recover(); r != nil {
  1593  					if tt.wallow {
  1594  						t.Errorf("%d: allow = %v, want %v", i, false, true)
  1595  					}
  1596  				}
  1597  			}()
  1598  
  1599  			sm := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1)))
  1600  			sm.state = tt.from
  1601  
  1602  			switch tt.to {
  1603  			case StateFollower:
  1604  				sm.becomeFollower(tt.wterm, tt.wlead)
  1605  			case StatePreCandidate:
  1606  				sm.becomePreCandidate()
  1607  			case StateCandidate:
  1608  				sm.becomeCandidate()
  1609  			case StateLeader:
  1610  				sm.becomeLeader()
  1611  			}
  1612  
  1613  			if sm.Term != tt.wterm {
  1614  				t.Errorf("%d: term = %d, want %d", i, sm.Term, tt.wterm)
  1615  			}
  1616  			if sm.lead != tt.wlead {
  1617  				t.Errorf("%d: lead = %d, want %d", i, sm.lead, tt.wlead)
  1618  			}
  1619  		}()
  1620  	}
  1621  }
  1622  
  1623  func TestAllServerStepdown(t *testing.T) {
  1624  	tests := []struct {
  1625  		state StateType
  1626  
  1627  		wstate StateType
  1628  		wterm  uint64
  1629  		windex uint64
  1630  	}{
  1631  		{StateFollower, StateFollower, 3, 0},
  1632  		{StatePreCandidate, StateFollower, 3, 0},
  1633  		{StateCandidate, StateFollower, 3, 0},
  1634  		{StateLeader, StateFollower, 3, 1},
  1635  	}
  1636  
  1637  	tmsgTypes := [...]pb.MessageType{pb.MsgVote, pb.MsgApp}
  1638  	tterm := uint64(3)
  1639  
  1640  	for i, tt := range tests {
  1641  		sm := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
  1642  		switch tt.state {
  1643  		case StateFollower:
  1644  			sm.becomeFollower(1, None)
  1645  		case StatePreCandidate:
  1646  			sm.becomePreCandidate()
  1647  		case StateCandidate:
  1648  			sm.becomeCandidate()
  1649  		case StateLeader:
  1650  			sm.becomeCandidate()
  1651  			sm.becomeLeader()
  1652  		}
  1653  
  1654  		for j, msgType := range tmsgTypes {
  1655  			sm.Step(pb.Message{From: 2, Type: msgType, Term: tterm, LogTerm: tterm})
  1656  
  1657  			if sm.state != tt.wstate {
  1658  				t.Errorf("#%d.%d state = %v , want %v", i, j, sm.state, tt.wstate)
  1659  			}
  1660  			if sm.Term != tt.wterm {
  1661  				t.Errorf("#%d.%d term = %v , want %v", i, j, sm.Term, tt.wterm)
  1662  			}
  1663  			if sm.raftLog.lastIndex() != tt.windex {
  1664  				t.Errorf("#%d.%d index = %v , want %v", i, j, sm.raftLog.lastIndex(), tt.windex)
  1665  			}
  1666  			if uint64(len(sm.raftLog.allEntries())) != tt.windex {
  1667  				t.Errorf("#%d.%d len(ents) = %v , want %v", i, j, len(sm.raftLog.allEntries()), tt.windex)
  1668  			}
  1669  			wlead := uint64(2)
  1670  			if msgType == pb.MsgVote {
  1671  				wlead = None
  1672  			}
  1673  			if sm.lead != wlead {
  1674  				t.Errorf("#%d, sm.lead = %d, want %d", i, sm.lead, None)
  1675  			}
  1676  		}
  1677  	}
  1678  }
  1679  
// TestCandidateResetTermMsgHeartbeat runs testCandidateResetTerm with
// pb.MsgHeartbeat as the message type passed to it.
func TestCandidateResetTermMsgHeartbeat(t *testing.T) {
	testCandidateResetTerm(t, pb.MsgHeartbeat)
}
  1683  
// TestCandidateResetTermMsgApp runs testCandidateResetTerm with pb.MsgApp as
// the message type passed to it.
func TestCandidateResetTermMsgApp(t *testing.T) {
	testCandidateResetTerm(t, pb.MsgApp)
}
  1687  
  1688  // testCandidateResetTerm tests when a candidate receives a
  1689  // MsgHeartbeat or MsgApp from leader, "Step" resets the term
  1690  // with leader's and reverts back to follower.
  1691  func testCandidateResetTerm(t *testing.T, mt pb.MessageType) {
  1692  	a := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
  1693  	b := newTestRaft(2, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
  1694  	c := newTestRaft(3, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
  1695  
  1696  	nt := newNetwork(a, b, c)
  1697  
  1698  	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  1699  	if a.state != StateLeader {
  1700  		t.Errorf("state = %s, want %s", a.state, StateLeader)
  1701  	}
  1702  	if b.state != StateFollower {
  1703  		t.Errorf("state = %s, want %s", b.state, StateFollower)
  1704  	}
  1705  	if c.state != StateFollower {
  1706  		t.Errorf("state = %s, want %s", c.state, StateFollower)
  1707  	}
  1708  
  1709  	// isolate 3 and increase term in rest
  1710  	nt.isolate(3)
  1711  
  1712  	nt.send(pb.Message{From: 2, To: 2, Type: pb.MsgHup})
  1713  	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  1714  
  1715  	if a.state != StateLeader {
  1716  		t.Errorf("state = %s, want %s", a.state, StateLeader)
  1717  	}
  1718  	if b.state != StateFollower {
  1719  		t.Errorf("state = %s, want %s", b.state, StateFollower)
  1720  	}
  1721  
  1722  	// trigger campaign in isolated c
  1723  	c.resetRandomizedElectionTimeout()
  1724  	for i := 0; i < c.randomizedElectionTimeout; i++ {
  1725  		c.tick()
  1726  	}
  1727  
  1728  	if c.state != StateCandidate {
  1729  		t.Errorf("state = %s, want %s", c.state, StateCandidate)
  1730  	}
  1731  
  1732  	nt.recover()
  1733  
  1734  	// leader sends to isolated candidate
  1735  	// and expects candidate to revert to follower
  1736  	nt.send(pb.Message{From: 1, To: 3, Term: a.Term, Type: mt})
  1737  
  1738  	if c.state != StateFollower {
  1739  		t.Errorf("state = %s, want %s", c.state, StateFollower)
  1740  	}
  1741  
  1742  	// follower c term is reset with leader's
  1743  	if a.Term != c.Term {
  1744  		t.Errorf("follower term expected same term as leader's %d, got %d", a.Term, c.Term)
  1745  	}
  1746  }
  1747  
  1748  func TestLeaderStepdownWhenQuorumActive(t *testing.T) {
  1749  	sm := newTestRaft(1, 5, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
  1750  
  1751  	sm.checkQuorum = true
  1752  
  1753  	sm.becomeCandidate()
  1754  	sm.becomeLeader()
  1755  
  1756  	for i := 0; i < sm.electionTimeout+1; i++ {
  1757  		sm.Step(pb.Message{From: 2, Type: pb.MsgHeartbeatResp, Term: sm.Term})
  1758  		sm.tick()
  1759  	}
  1760  
  1761  	if sm.state != StateLeader {
  1762  		t.Errorf("state = %v, want %v", sm.state, StateLeader)
  1763  	}
  1764  }
  1765  
  1766  func TestLeaderStepdownWhenQuorumLost(t *testing.T) {
  1767  	sm := newTestRaft(1, 5, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
  1768  
  1769  	sm.checkQuorum = true
  1770  
  1771  	sm.becomeCandidate()
  1772  	sm.becomeLeader()
  1773  
  1774  	for i := 0; i < sm.electionTimeout+1; i++ {
  1775  		sm.tick()
  1776  	}
  1777  
  1778  	if sm.state != StateFollower {
  1779  		t.Errorf("state = %v, want %v", sm.state, StateFollower)
  1780  	}
  1781  }
  1782  
  1783  func TestLeaderSupersedingWithCheckQuorum(t *testing.T) {
  1784  	a := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
  1785  	b := newTestRaft(2, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
  1786  	c := newTestRaft(3, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
  1787  
  1788  	a.checkQuorum = true
  1789  	b.checkQuorum = true
  1790  	c.checkQuorum = true
  1791  
  1792  	nt := newNetwork(a, b, c)
  1793  	setRandomizedElectionTimeout(b, b.electionTimeout+1)
  1794  
  1795  	for i := 0; i < b.electionTimeout; i++ {
  1796  		b.tick()
  1797  	}
  1798  	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  1799  
  1800  	if a.state != StateLeader {
  1801  		t.Errorf("state = %s, want %s", a.state, StateLeader)
  1802  	}
  1803  
  1804  	if c.state != StateFollower {
  1805  		t.Errorf("state = %s, want %s", c.state, StateFollower)
  1806  	}
  1807  
  1808  	nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup})
  1809  
  1810  	// Peer b rejected c's vote since its electionElapsed had not reached to electionTimeout
  1811  	if c.state != StateCandidate {
  1812  		t.Errorf("state = %s, want %s", c.state, StateCandidate)
  1813  	}
  1814  
  1815  	// Letting b's electionElapsed reach to electionTimeout
  1816  	for i := 0; i < b.electionTimeout; i++ {
  1817  		b.tick()
  1818  	}
  1819  	nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup})
  1820  
  1821  	if c.state != StateLeader {
  1822  		t.Errorf("state = %s, want %s", c.state, StateLeader)
  1823  	}
  1824  }
  1825  
  1826  func TestLeaderElectionWithCheckQuorum(t *testing.T) {
  1827  	a := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
  1828  	b := newTestRaft(2, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
  1829  	c := newTestRaft(3, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
  1830  
  1831  	a.checkQuorum = true
  1832  	b.checkQuorum = true
  1833  	c.checkQuorum = true
  1834  
  1835  	nt := newNetwork(a, b, c)
  1836  	setRandomizedElectionTimeout(a, a.electionTimeout+1)
  1837  	setRandomizedElectionTimeout(b, b.electionTimeout+2)
  1838  
  1839  	// Immediately after creation, votes are cast regardless of the
  1840  	// election timeout.
  1841  	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  1842  
  1843  	if a.state != StateLeader {
  1844  		t.Errorf("state = %s, want %s", a.state, StateLeader)
  1845  	}
  1846  
  1847  	if c.state != StateFollower {
  1848  		t.Errorf("state = %s, want %s", c.state, StateFollower)
  1849  	}
  1850  
  1851  	// need to reset randomizedElectionTimeout larger than electionTimeout again,
  1852  	// because the value might be reset to electionTimeout since the last state changes
  1853  	setRandomizedElectionTimeout(a, a.electionTimeout+1)
  1854  	setRandomizedElectionTimeout(b, b.electionTimeout+2)
  1855  	for i := 0; i < a.electionTimeout; i++ {
  1856  		a.tick()
  1857  	}
  1858  	for i := 0; i < b.electionTimeout; i++ {
  1859  		b.tick()
  1860  	}
  1861  	nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup})
  1862  
  1863  	if a.state != StateFollower {
  1864  		t.Errorf("state = %s, want %s", a.state, StateFollower)
  1865  	}
  1866  
  1867  	if c.state != StateLeader {
  1868  		t.Errorf("state = %s, want %s", c.state, StateLeader)
  1869  	}
  1870  }
  1871  
  1872  // TestFreeStuckCandidateWithCheckQuorum ensures that a candidate with a higher term
  1873  // can disrupt the leader even if the leader still "officially" holds the lease, The
  1874  // leader is expected to step down and adopt the candidate's term
  1875  func TestFreeStuckCandidateWithCheckQuorum(t *testing.T) {
  1876  	a := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
  1877  	b := newTestRaft(2, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
  1878  	c := newTestRaft(3, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
  1879  
  1880  	a.checkQuorum = true
  1881  	b.checkQuorum = true
  1882  	c.checkQuorum = true
  1883  
  1884  	nt := newNetwork(a, b, c)
  1885  	setRandomizedElectionTimeout(b, b.electionTimeout+1)
  1886  
  1887  	for i := 0; i < b.electionTimeout; i++ {
  1888  		b.tick()
  1889  	}
  1890  	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  1891  
  1892  	nt.isolate(1)
  1893  	nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup})
  1894  
  1895  	if b.state != StateFollower {
  1896  		t.Errorf("state = %s, want %s", b.state, StateFollower)
  1897  	}
  1898  
  1899  	if c.state != StateCandidate {
  1900  		t.Errorf("state = %s, want %s", c.state, StateCandidate)
  1901  	}
  1902  
  1903  	if c.Term != b.Term+1 {
  1904  		t.Errorf("term = %d, want %d", c.Term, b.Term+1)
  1905  	}
  1906  
  1907  	// Vote again for safety
  1908  	nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup})
  1909  
  1910  	if b.state != StateFollower {
  1911  		t.Errorf("state = %s, want %s", b.state, StateFollower)
  1912  	}
  1913  
  1914  	if c.state != StateCandidate {
  1915  		t.Errorf("state = %s, want %s", c.state, StateCandidate)
  1916  	}
  1917  
  1918  	if c.Term != b.Term+2 {
  1919  		t.Errorf("term = %d, want %d", c.Term, b.Term+2)
  1920  	}
  1921  
  1922  	nt.recover()
  1923  	nt.send(pb.Message{From: 1, To: 3, Type: pb.MsgHeartbeat, Term: a.Term})
  1924  
  1925  	// Disrupt the leader so that the stuck peer is freed
  1926  	if a.state != StateFollower {
  1927  		t.Errorf("state = %s, want %s", a.state, StateFollower)
  1928  	}
  1929  
  1930  	if c.Term != a.Term {
  1931  		t.Errorf("term = %d, want %d", c.Term, a.Term)
  1932  	}
  1933  
  1934  	// Vote again, should become leader this time
  1935  	nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup})
  1936  
  1937  	if c.state != StateLeader {
  1938  		t.Errorf("peer 3 state: %s, want %s", c.state, StateLeader)
  1939  	}
  1940  }
  1941  
  1942  func TestNonPromotableVoterWithCheckQuorum(t *testing.T) {
  1943  	a := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2)))
  1944  	b := newTestRaft(2, 10, 1, newTestMemoryStorage(withPeers(1)))
  1945  
  1946  	a.checkQuorum = true
  1947  	b.checkQuorum = true
  1948  
  1949  	nt := newNetwork(a, b)
  1950  	setRandomizedElectionTimeout(b, b.electionTimeout+1)
  1951  	// Need to remove 2 again to make it a non-promotable node since newNetwork overwritten some internal states
  1952  	b.applyConfChange(pb.ConfChange{Type: pb.ConfChangeRemoveNode, NodeID: 2}.AsV2())
  1953  
  1954  	if b.promotable() {
  1955  		t.Fatalf("promotable = %v, want false", b.promotable())
  1956  	}
  1957  
  1958  	for i := 0; i < b.electionTimeout; i++ {
  1959  		b.tick()
  1960  	}
  1961  	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  1962  
  1963  	if a.state != StateLeader {
  1964  		t.Errorf("state = %s, want %s", a.state, StateLeader)
  1965  	}
  1966  
  1967  	if b.state != StateFollower {
  1968  		t.Errorf("state = %s, want %s", b.state, StateFollower)
  1969  	}
  1970  
  1971  	if b.lead != 1 {
  1972  		t.Errorf("lead = %d, want 1", b.lead)
  1973  	}
  1974  }
  1975  
// TestDisruptiveFollower tests isolated follower,
// with slow network incoming from leader, election times out
// to become a candidate with an increased term. Then, the
// candidate's response to late leader heartbeat forces the leader
// to step down.
func TestDisruptiveFollower(t *testing.T) {
	n1 := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
	n2 := newTestRaft(2, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
	n3 := newTestRaft(3, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))

	n1.checkQuorum = true
	n2.checkQuorum = true
	n3.checkQuorum = true

	// all three nodes start as followers at term 1 with no known leader
	n1.becomeFollower(1, None)
	n2.becomeFollower(1, None)
	n3.becomeFollower(1, None)

	nt := newNetwork(n1, n2, n3)

	// n1 campaigns
	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})

	// check state
	// n1.state == StateLeader
	// n2.state == StateFollower
	// n3.state == StateFollower
	if n1.state != StateLeader {
		t.Fatalf("node 1 state: %s, want %s", n1.state, StateLeader)
	}
	if n2.state != StateFollower {
		t.Fatalf("node 2 state: %s, want %s", n2.state, StateFollower)
	}
	if n3.state != StateFollower {
		t.Fatalf("node 3 state: %s, want %s", n3.state, StateFollower)
	}

	// etcd server "advanceTicksForElection" on restart;
	// this is to expedite campaign trigger when given larger
	// election timeouts (e.g. multi-datacenter deploy)
	// Or leader messages are being delayed while ticks elapse
	setRandomizedElectionTimeout(n3, n3.electionTimeout+2)
	for i := 0; i < n3.randomizedElectionTimeout-1; i++ {
		n3.tick()
	}

	// ideally, before last election tick elapses,
	// the follower n3 receives "pb.MsgApp" or "pb.MsgHeartbeat"
	// from leader n1, and then resets its "electionElapsed"
	// however, last tick may elapse before receiving any
	// messages from leader, thus triggering campaign
	n3.tick()

	// n1 is still leader yet
	// while its heartbeat to candidate n3 is being delayed

	// check state
	// n1.state == StateLeader
	// n2.state == StateFollower
	// n3.state == StateCandidate
	if n1.state != StateLeader {
		t.Fatalf("node 1 state: %s, want %s", n1.state, StateLeader)
	}
	if n2.state != StateFollower {
		t.Fatalf("node 2 state: %s, want %s", n2.state, StateFollower)
	}
	if n3.state != StateCandidate {
		t.Fatalf("node 3 state: %s, want %s", n3.state, StateCandidate)
	}
	// check term
	// n1.Term == 2
	// n2.Term == 2
	// n3.Term == 3
	if n1.Term != 2 {
		t.Fatalf("node 1 term: %d, want %d", n1.Term, 2)
	}
	if n2.Term != 2 {
		t.Fatalf("node 2 term: %d, want %d", n2.Term, 2)
	}
	if n3.Term != 3 {
		t.Fatalf("node 3 term: %d, want %d", n3.Term, 3)
	}

	// while outgoing vote requests are still queued in n3,
	// leader heartbeat finally arrives at candidate n3
	// however, due to delayed network from leader, leader
	// heartbeat was sent with lower term than candidate's
	nt.send(pb.Message{From: 1, To: 3, Term: n1.Term, Type: pb.MsgHeartbeat})

	// then candidate n3 responds with "pb.MsgAppResp" of higher term
	// and leader steps down from a message with higher term
	// this is to disrupt the current leader, so that candidate
	// with higher term can be freed with following election

	// check state
	// n1.state == StateFollower
	// n2.state == StateFollower
	// n3.state == StateCandidate
	if n1.state != StateFollower {
		t.Fatalf("node 1 state: %s, want %s", n1.state, StateFollower)
	}
	if n2.state != StateFollower {
		t.Fatalf("node 2 state: %s, want %s", n2.state, StateFollower)
	}
	if n3.state != StateCandidate {
		t.Fatalf("node 3 state: %s, want %s", n3.state, StateCandidate)
	}
	// check term
	// n1.Term == 3
	// n2.Term == 2
	// n3.Term == 3
	if n1.Term != 3 {
		t.Fatalf("node 1 term: %d, want %d", n1.Term, 3)
	}
	if n2.Term != 2 {
		t.Fatalf("node 2 term: %d, want %d", n2.Term, 2)
	}
	if n3.Term != 3 {
		t.Fatalf("node 3 term: %d, want %d", n3.Term, 3)
	}
}
  2096  
  2097  // TestDisruptiveFollowerPreVote tests isolated follower,
  2098  // with slow network incoming from leader, election times out
  2099  // to become a pre-candidate with less log than current leader.
  2100  // Then pre-vote phase prevents this isolated node from forcing
  2101  // current leader to step down, thus less disruptions.
  2102  func TestDisruptiveFollowerPreVote(t *testing.T) {
  2103  	n1 := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
  2104  	n2 := newTestRaft(2, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
  2105  	n3 := newTestRaft(3, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
  2106  
  2107  	n1.checkQuorum = true
  2108  	n2.checkQuorum = true
  2109  	n3.checkQuorum = true
  2110  
  2111  	n1.becomeFollower(1, None)
  2112  	n2.becomeFollower(1, None)
  2113  	n3.becomeFollower(1, None)
  2114  
  2115  	nt := newNetwork(n1, n2, n3)
  2116  
  2117  	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  2118  
  2119  	// check state
  2120  	// n1.state == StateLeader
  2121  	// n2.state == StateFollower
  2122  	// n3.state == StateFollower
  2123  	if n1.state != StateLeader {
  2124  		t.Fatalf("node 1 state: %s, want %s", n1.state, StateLeader)
  2125  	}
  2126  	if n2.state != StateFollower {
  2127  		t.Fatalf("node 2 state: %s, want %s", n2.state, StateFollower)
  2128  	}
  2129  	if n3.state != StateFollower {
  2130  		t.Fatalf("node 3 state: %s, want %s", n3.state, StateFollower)
  2131  	}
  2132  
  2133  	nt.isolate(3)
  2134  	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
  2135  	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
  2136  	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
  2137  	n1.preVote = true
  2138  	n2.preVote = true
  2139  	n3.preVote = true
  2140  	nt.recover()
  2141  	nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup})
  2142  
  2143  	// check state
  2144  	// n1.state == StateLeader
  2145  	// n2.state == StateFollower
  2146  	// n3.state == StatePreCandidate
  2147  	if n1.state != StateLeader {
  2148  		t.Fatalf("node 1 state: %s, want %s", n1.state, StateLeader)
  2149  	}
  2150  	if n2.state != StateFollower {
  2151  		t.Fatalf("node 2 state: %s, want %s", n2.state, StateFollower)
  2152  	}
  2153  	if n3.state != StatePreCandidate {
  2154  		t.Fatalf("node 3 state: %s, want %s", n3.state, StatePreCandidate)
  2155  	}
  2156  	// check term
  2157  	// n1.Term == 2
  2158  	// n2.Term == 2
  2159  	// n3.Term == 2
  2160  	if n1.Term != 2 {
  2161  		t.Fatalf("node 1 term: %d, want %d", n1.Term, 2)
  2162  	}
  2163  	if n2.Term != 2 {
  2164  		t.Fatalf("node 2 term: %d, want %d", n2.Term, 2)
  2165  	}
  2166  	if n3.Term != 2 {
  2167  		t.Fatalf("node 2 term: %d, want %d", n3.Term, 2)
  2168  	}
  2169  
  2170  	// delayed leader heartbeat does not force current leader to step down
  2171  	nt.send(pb.Message{From: 1, To: 3, Term: n1.Term, Type: pb.MsgHeartbeat})
  2172  	if n1.state != StateLeader {
  2173  		t.Fatalf("node 1 state: %s, want %s", n1.state, StateLeader)
  2174  	}
  2175  }
  2176  
  2177  func TestReadOnlyOptionSafe(t *testing.T) {
  2178  	a := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
  2179  	b := newTestRaft(2, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
  2180  	c := newTestRaft(3, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
  2181  
  2182  	nt := newNetwork(a, b, c)
  2183  	setRandomizedElectionTimeout(b, b.electionTimeout+1)
  2184  
  2185  	for i := 0; i < b.electionTimeout; i++ {
  2186  		b.tick()
  2187  	}
  2188  	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  2189  
  2190  	if a.state != StateLeader {
  2191  		t.Fatalf("state = %s, want %s", a.state, StateLeader)
  2192  	}
  2193  
  2194  	tests := []struct {
  2195  		sm        *raft
  2196  		proposals int
  2197  		wri       uint64
  2198  		wctx      []byte
  2199  	}{
  2200  		{a, 10, 11, []byte("ctx1")},
  2201  		{b, 10, 21, []byte("ctx2")},
  2202  		{c, 10, 31, []byte("ctx3")},
  2203  		{a, 10, 41, []byte("ctx4")},
  2204  		{b, 10, 51, []byte("ctx5")},
  2205  		{c, 10, 61, []byte("ctx6")},
  2206  	}
  2207  
  2208  	for i, tt := range tests {
  2209  		for j := 0; j < tt.proposals; j++ {
  2210  			nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
  2211  		}
  2212  
  2213  		nt.send(pb.Message{From: tt.sm.id, To: tt.sm.id, Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: tt.wctx}}})
  2214  
  2215  		r := tt.sm
  2216  		if len(r.readStates) == 0 {
  2217  			t.Errorf("#%d: len(readStates) = 0, want non-zero", i)
  2218  		}
  2219  		rs := r.readStates[0]
  2220  		if rs.Index != tt.wri {
  2221  			t.Errorf("#%d: readIndex = %d, want %d", i, rs.Index, tt.wri)
  2222  		}
  2223  
  2224  		if !bytes.Equal(rs.RequestCtx, tt.wctx) {
  2225  			t.Errorf("#%d: requestCtx = %v, want %v", i, rs.RequestCtx, tt.wctx)
  2226  		}
  2227  		r.readStates = nil
  2228  	}
  2229  }
  2230  
  2231  func TestReadOnlyWithLearner(t *testing.T) {
  2232  	a := newTestLearnerRaft(1, 10, 1, newTestMemoryStorage(withPeers(1), withLearners(2)))
  2233  	b := newTestLearnerRaft(2, 10, 1, newTestMemoryStorage(withPeers(1), withLearners(2)))
  2234  
  2235  	nt := newNetwork(a, b)
  2236  	setRandomizedElectionTimeout(b, b.electionTimeout+1)
  2237  
  2238  	for i := 0; i < b.electionTimeout; i++ {
  2239  		b.tick()
  2240  	}
  2241  	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  2242  
  2243  	if a.state != StateLeader {
  2244  		t.Fatalf("state = %s, want %s", a.state, StateLeader)
  2245  	}
  2246  
  2247  	tests := []struct {
  2248  		sm        *raft
  2249  		proposals int
  2250  		wri       uint64
  2251  		wctx      []byte
  2252  	}{
  2253  		{a, 10, 11, []byte("ctx1")},
  2254  		{b, 10, 21, []byte("ctx2")},
  2255  		{a, 10, 31, []byte("ctx3")},
  2256  		{b, 10, 41, []byte("ctx4")},
  2257  	}
  2258  
  2259  	for i, tt := range tests {
  2260  		for j := 0; j < tt.proposals; j++ {
  2261  			nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
  2262  		}
  2263  
  2264  		nt.send(pb.Message{From: tt.sm.id, To: tt.sm.id, Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: tt.wctx}}})
  2265  
  2266  		r := tt.sm
  2267  		if len(r.readStates) == 0 {
  2268  			t.Fatalf("#%d: len(readStates) = 0, want non-zero", i)
  2269  		}
  2270  		rs := r.readStates[0]
  2271  		if rs.Index != tt.wri {
  2272  			t.Errorf("#%d: readIndex = %d, want %d", i, rs.Index, tt.wri)
  2273  		}
  2274  
  2275  		if !bytes.Equal(rs.RequestCtx, tt.wctx) {
  2276  			t.Errorf("#%d: requestCtx = %v, want %v", i, rs.RequestCtx, tt.wctx)
  2277  		}
  2278  		r.readStates = nil
  2279  	}
  2280  }
  2281  
  2282  func TestReadOnlyOptionLease(t *testing.T) {
  2283  	a := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
  2284  	b := newTestRaft(2, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
  2285  	c := newTestRaft(3, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
  2286  	a.readOnly.option = ReadOnlyLeaseBased
  2287  	b.readOnly.option = ReadOnlyLeaseBased
  2288  	c.readOnly.option = ReadOnlyLeaseBased
  2289  	a.checkQuorum = true
  2290  	b.checkQuorum = true
  2291  	c.checkQuorum = true
  2292  
  2293  	nt := newNetwork(a, b, c)
  2294  	setRandomizedElectionTimeout(b, b.electionTimeout+1)
  2295  
  2296  	for i := 0; i < b.electionTimeout; i++ {
  2297  		b.tick()
  2298  	}
  2299  	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  2300  
  2301  	if a.state != StateLeader {
  2302  		t.Fatalf("state = %s, want %s", a.state, StateLeader)
  2303  	}
  2304  
  2305  	tests := []struct {
  2306  		sm        *raft
  2307  		proposals int
  2308  		wri       uint64
  2309  		wctx      []byte
  2310  	}{
  2311  		{a, 10, 11, []byte("ctx1")},
  2312  		{b, 10, 21, []byte("ctx2")},
  2313  		{c, 10, 31, []byte("ctx3")},
  2314  		{a, 10, 41, []byte("ctx4")},
  2315  		{b, 10, 51, []byte("ctx5")},
  2316  		{c, 10, 61, []byte("ctx6")},
  2317  	}
  2318  
  2319  	for i, tt := range tests {
  2320  		for j := 0; j < tt.proposals; j++ {
  2321  			nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
  2322  		}
  2323  
  2324  		nt.send(pb.Message{From: tt.sm.id, To: tt.sm.id, Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: tt.wctx}}})
  2325  
  2326  		r := tt.sm
  2327  		rs := r.readStates[0]
  2328  		if rs.Index != tt.wri {
  2329  			t.Errorf("#%d: readIndex = %d, want %d", i, rs.Index, tt.wri)
  2330  		}
  2331  
  2332  		if !bytes.Equal(rs.RequestCtx, tt.wctx) {
  2333  			t.Errorf("#%d: requestCtx = %v, want %v", i, rs.RequestCtx, tt.wctx)
  2334  		}
  2335  		r.readStates = nil
  2336  	}
  2337  }
  2338  
  2339  // TestReadOnlyForNewLeader ensures that a leader only accepts MsgReadIndex message
  2340  // when it commits at least one log entry at it term.
  2341  func TestReadOnlyForNewLeader(t *testing.T) {
  2342  	nodeConfigs := []struct {
  2343  		id           uint64
  2344  		committed    uint64
  2345  		applied      uint64
  2346  		compactIndex uint64
  2347  	}{
  2348  		{1, 1, 1, 0},
  2349  		{2, 2, 2, 2},
  2350  		{3, 2, 2, 2},
  2351  	}
  2352  	peers := make([]stateMachine, 0)
  2353  	for _, c := range nodeConfigs {
  2354  		storage := newTestMemoryStorage(withPeers(1, 2, 3))
  2355  		storage.Append([]pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 1}})
  2356  		storage.SetHardState(pb.HardState{Term: 1, Commit: c.committed})
  2357  		if c.compactIndex != 0 {
  2358  			storage.Compact(c.compactIndex)
  2359  		}
  2360  		cfg := newTestConfig(c.id, 10, 1, storage)
  2361  		cfg.Applied = c.applied
  2362  		raft := newRaft(cfg)
  2363  		peers = append(peers, raft)
  2364  	}
  2365  	nt := newNetwork(peers...)
  2366  
  2367  	// Drop MsgApp to forbid peer a to commit any log entry at its term after it becomes leader.
  2368  	nt.ignore(pb.MsgApp)
  2369  	// Force peer a to become leader.
  2370  	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  2371  
  2372  	sm := nt.peers[1].(*raft)
  2373  	if sm.state != StateLeader {
  2374  		t.Fatalf("state = %s, want %s", sm.state, StateLeader)
  2375  	}
  2376  
  2377  	// Ensure peer a drops read only request.
  2378  	var windex uint64 = 4
  2379  	wctx := []byte("ctx")
  2380  	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: wctx}}})
  2381  	if len(sm.readStates) != 0 {
  2382  		t.Fatalf("len(readStates) = %d, want zero", len(sm.readStates))
  2383  	}
  2384  
  2385  	nt.recover()
  2386  
  2387  	// Force peer a to commit a log entry at its term
  2388  	for i := 0; i < sm.heartbeatTimeout; i++ {
  2389  		sm.tick()
  2390  	}
  2391  	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
  2392  	if sm.raftLog.committed != 4 {
  2393  		t.Fatalf("committed = %d, want 4", sm.raftLog.committed)
  2394  	}
  2395  	lastLogTerm := sm.raftLog.zeroTermOnErrCompacted(sm.raftLog.term(sm.raftLog.committed))
  2396  	if lastLogTerm != sm.Term {
  2397  		t.Fatalf("last log term = %d, want %d", lastLogTerm, sm.Term)
  2398  	}
  2399  
  2400  	// Ensure peer a processed postponed read only request after it committed an entry at its term.
  2401  	if len(sm.readStates) != 1 {
  2402  		t.Fatalf("len(readStates) = %d, want 1", len(sm.readStates))
  2403  	}
  2404  	rs := sm.readStates[0]
  2405  	if rs.Index != windex {
  2406  		t.Fatalf("readIndex = %d, want %d", rs.Index, windex)
  2407  	}
  2408  	if !bytes.Equal(rs.RequestCtx, wctx) {
  2409  		t.Fatalf("requestCtx = %v, want %v", rs.RequestCtx, wctx)
  2410  	}
  2411  
  2412  	// Ensure peer a accepts read only request after it committed an entry at its term.
  2413  	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: wctx}}})
  2414  	if len(sm.readStates) != 2 {
  2415  		t.Fatalf("len(readStates) = %d, want 2", len(sm.readStates))
  2416  	}
  2417  	rs = sm.readStates[1]
  2418  	if rs.Index != windex {
  2419  		t.Fatalf("readIndex = %d, want %d", rs.Index, windex)
  2420  	}
  2421  	if !bytes.Equal(rs.RequestCtx, wctx) {
  2422  		t.Fatalf("requestCtx = %v, want %v", rs.RequestCtx, wctx)
  2423  	}
  2424  }
  2425  
  2426  func TestLeaderAppResp(t *testing.T) {
  2427  	// initial progress: match = 0; next = 3
  2428  	tests := []struct {
  2429  		index  uint64
  2430  		reject bool
  2431  		// progress
  2432  		wmatch uint64
  2433  		wnext  uint64
  2434  		// message
  2435  		wmsgNum    int
  2436  		windex     uint64
  2437  		wcommitted uint64
  2438  	}{
  2439  		{3, true, 0, 3, 0, 0, 0},  // stale resp; no replies
  2440  		{2, true, 0, 2, 1, 1, 0},  // denied resp; leader does not commit; decrease next and send probing msg
  2441  		{2, false, 2, 4, 2, 2, 2}, // accept resp; leader commits; broadcast with commit index
  2442  		{0, false, 0, 3, 0, 0, 0}, // ignore heartbeat replies
  2443  	}
  2444  
  2445  	for i, tt := range tests {
  2446  		// sm term is 1 after it becomes the leader.
  2447  		// thus the last log term must be 1 to be committed.
  2448  		sm := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
  2449  		sm.raftLog = &raftLog{
  2450  			storage:  &MemoryStorage{ents: []pb.Entry{{}, {Index: 1, Term: 0}, {Index: 2, Term: 1}}},
  2451  			unstable: unstable{offset: 3},
  2452  		}
  2453  		sm.becomeCandidate()
  2454  		sm.becomeLeader()
  2455  		sm.readMessages()
  2456  		sm.Step(pb.Message{From: 2, Type: pb.MsgAppResp, Index: tt.index, Term: sm.Term, Reject: tt.reject, RejectHint: tt.index})
  2457  
  2458  		p := sm.prs.Progress[2]
  2459  		if p.Match != tt.wmatch {
  2460  			t.Errorf("#%d match = %d, want %d", i, p.Match, tt.wmatch)
  2461  		}
  2462  		if p.Next != tt.wnext {
  2463  			t.Errorf("#%d next = %d, want %d", i, p.Next, tt.wnext)
  2464  		}
  2465  
  2466  		msgs := sm.readMessages()
  2467  
  2468  		if len(msgs) != tt.wmsgNum {
  2469  			t.Errorf("#%d msgNum = %d, want %d", i, len(msgs), tt.wmsgNum)
  2470  		}
  2471  		for j, msg := range msgs {
  2472  			if msg.Index != tt.windex {
  2473  				t.Errorf("#%d.%d index = %d, want %d", i, j, msg.Index, tt.windex)
  2474  			}
  2475  			if msg.Commit != tt.wcommitted {
  2476  				t.Errorf("#%d.%d commit = %d, want %d", i, j, msg.Commit, tt.wcommitted)
  2477  			}
  2478  		}
  2479  	}
  2480  }
  2481  
  2482  // TestBcastBeat is when the leader receives a heartbeat tick, it should
  2483  // send a MsgHeartbeat with m.Index = 0, m.LogTerm=0 and empty entries.
  2484  func TestBcastBeat(t *testing.T) {
  2485  	offset := uint64(1000)
  2486  	// make a state machine with log.offset = 1000
  2487  	s := pb.Snapshot{
  2488  		Metadata: pb.SnapshotMetadata{
  2489  			Index:     offset,
  2490  			Term:      1,
  2491  			ConfState: pb.ConfState{Voters: []uint64{1, 2, 3}},
  2492  		},
  2493  	}
  2494  	storage := NewMemoryStorage()
  2495  	storage.ApplySnapshot(s)
  2496  	sm := newTestRaft(1, 10, 1, storage)
  2497  	sm.Term = 1
  2498  
  2499  	sm.becomeCandidate()
  2500  	sm.becomeLeader()
  2501  	for i := 0; i < 10; i++ {
  2502  		mustAppendEntry(sm, pb.Entry{Index: uint64(i) + 1})
  2503  	}
  2504  	// slow follower
  2505  	sm.prs.Progress[2].Match, sm.prs.Progress[2].Next = 5, 6
  2506  	// normal follower
  2507  	sm.prs.Progress[3].Match, sm.prs.Progress[3].Next = sm.raftLog.lastIndex(), sm.raftLog.lastIndex()+1
  2508  
  2509  	sm.Step(pb.Message{Type: pb.MsgBeat})
  2510  	msgs := sm.readMessages()
  2511  	if len(msgs) != 2 {
  2512  		t.Fatalf("len(msgs) = %v, want 2", len(msgs))
  2513  	}
  2514  	wantCommitMap := map[uint64]uint64{
  2515  		2: min(sm.raftLog.committed, sm.prs.Progress[2].Match),
  2516  		3: min(sm.raftLog.committed, sm.prs.Progress[3].Match),
  2517  	}
  2518  	for i, m := range msgs {
  2519  		if m.Type != pb.MsgHeartbeat {
  2520  			t.Fatalf("#%d: type = %v, want = %v", i, m.Type, pb.MsgHeartbeat)
  2521  		}
  2522  		if m.Index != 0 {
  2523  			t.Fatalf("#%d: prevIndex = %d, want %d", i, m.Index, 0)
  2524  		}
  2525  		if m.LogTerm != 0 {
  2526  			t.Fatalf("#%d: prevTerm = %d, want %d", i, m.LogTerm, 0)
  2527  		}
  2528  		if wantCommitMap[m.To] == 0 {
  2529  			t.Fatalf("#%d: unexpected to %d", i, m.To)
  2530  		} else {
  2531  			if m.Commit != wantCommitMap[m.To] {
  2532  				t.Fatalf("#%d: commit = %d, want %d", i, m.Commit, wantCommitMap[m.To])
  2533  			}
  2534  			delete(wantCommitMap, m.To)
  2535  		}
  2536  		if len(m.Entries) != 0 {
  2537  			t.Fatalf("#%d: len(entries) = %d, want 0", i, len(m.Entries))
  2538  		}
  2539  	}
  2540  }
  2541  
  2542  // TestRecvMsgBeat tests the output of the state machine when receiving MsgBeat
  2543  func TestRecvMsgBeat(t *testing.T) {
  2544  	tests := []struct {
  2545  		state StateType
  2546  		wMsg  int
  2547  	}{
  2548  		{StateLeader, 2},
  2549  		// candidate and follower should ignore MsgBeat
  2550  		{StateCandidate, 0},
  2551  		{StateFollower, 0},
  2552  	}
  2553  
  2554  	for i, tt := range tests {
  2555  		sm := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
  2556  		sm.raftLog = &raftLog{storage: &MemoryStorage{ents: []pb.Entry{{}, {Index: 1, Term: 0}, {Index: 2, Term: 1}}}}
  2557  		sm.Term = 1
  2558  		sm.state = tt.state
  2559  		switch tt.state {
  2560  		case StateFollower:
  2561  			sm.step = stepFollower
  2562  		case StateCandidate:
  2563  			sm.step = stepCandidate
  2564  		case StateLeader:
  2565  			sm.step = stepLeader
  2566  		}
  2567  		sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat})
  2568  
  2569  		msgs := sm.readMessages()
  2570  		if len(msgs) != tt.wMsg {
  2571  			t.Errorf("%d: len(msgs) = %d, want %d", i, len(msgs), tt.wMsg)
  2572  		}
  2573  		for _, m := range msgs {
  2574  			if m.Type != pb.MsgHeartbeat {
  2575  				t.Errorf("%d: msg.type = %v, want %v", i, m.Type, pb.MsgHeartbeat)
  2576  			}
  2577  		}
  2578  	}
  2579  }
  2580  
  2581  func TestLeaderIncreaseNext(t *testing.T) {
  2582  	previousEnts := []pb.Entry{{Term: 1, Index: 1}, {Term: 1, Index: 2}, {Term: 1, Index: 3}}
  2583  	tests := []struct {
  2584  		// progress
  2585  		state tracker.StateType
  2586  		next  uint64
  2587  
  2588  		wnext uint64
  2589  	}{
  2590  		// state replicate, optimistically increase next
  2591  		// previous entries + noop entry + propose + 1
  2592  		{tracker.StateReplicate, 2, uint64(len(previousEnts) + 1 + 1 + 1)},
  2593  		// state probe, not optimistically increase next
  2594  		{tracker.StateProbe, 2, 2},
  2595  	}
  2596  
  2597  	for i, tt := range tests {
  2598  		sm := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2)))
  2599  		sm.raftLog.append(previousEnts...)
  2600  		sm.becomeCandidate()
  2601  		sm.becomeLeader()
  2602  		sm.prs.Progress[2].State = tt.state
  2603  		sm.prs.Progress[2].Next = tt.next
  2604  		sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
  2605  
  2606  		p := sm.prs.Progress[2]
  2607  		if p.Next != tt.wnext {
  2608  			t.Errorf("#%d next = %d, want %d", i, p.Next, tt.wnext)
  2609  		}
  2610  	}
  2611  }
  2612  
  2613  func TestSendAppendForProgressProbe(t *testing.T) {
  2614  	r := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2)))
  2615  	r.becomeCandidate()
  2616  	r.becomeLeader()
  2617  	r.readMessages()
  2618  	r.prs.Progress[2].BecomeProbe()
  2619  
  2620  	// each round is a heartbeat
  2621  	for i := 0; i < 3; i++ {
  2622  		if i == 0 {
  2623  			// we expect that raft will only send out one msgAPP on the first
  2624  			// loop. After that, the follower is paused until a heartbeat response is
  2625  			// received.
  2626  			mustAppendEntry(r, pb.Entry{Data: []byte("somedata")})
  2627  			r.sendAppend(2)
  2628  			msg := r.readMessages()
  2629  			if len(msg) != 1 {
  2630  				t.Errorf("len(msg) = %d, want %d", len(msg), 1)
  2631  			}
  2632  			if msg[0].Index != 0 {
  2633  				t.Errorf("index = %d, want %d", msg[0].Index, 0)
  2634  			}
  2635  		}
  2636  
  2637  		if !r.prs.Progress[2].ProbeSent {
  2638  			t.Errorf("paused = %v, want true", r.prs.Progress[2].ProbeSent)
  2639  		}
  2640  		for j := 0; j < 10; j++ {
  2641  			mustAppendEntry(r, pb.Entry{Data: []byte("somedata")})
  2642  			r.sendAppend(2)
  2643  			if l := len(r.readMessages()); l != 0 {
  2644  				t.Errorf("len(msg) = %d, want %d", l, 0)
  2645  			}
  2646  		}
  2647  
  2648  		// do a heartbeat
  2649  		for j := 0; j < r.heartbeatTimeout; j++ {
  2650  			r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat})
  2651  		}
  2652  		if !r.prs.Progress[2].ProbeSent {
  2653  			t.Errorf("paused = %v, want true", r.prs.Progress[2].ProbeSent)
  2654  		}
  2655  
  2656  		// consume the heartbeat
  2657  		msg := r.readMessages()
  2658  		if len(msg) != 1 {
  2659  			t.Errorf("len(msg) = %d, want %d", len(msg), 1)
  2660  		}
  2661  		if msg[0].Type != pb.MsgHeartbeat {
  2662  			t.Errorf("type = %v, want %v", msg[0].Type, pb.MsgHeartbeat)
  2663  		}
  2664  	}
  2665  
  2666  	// a heartbeat response will allow another message to be sent
  2667  	r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeatResp})
  2668  	msg := r.readMessages()
  2669  	if len(msg) != 1 {
  2670  		t.Errorf("len(msg) = %d, want %d", len(msg), 1)
  2671  	}
  2672  	if msg[0].Index != 0 {
  2673  		t.Errorf("index = %d, want %d", msg[0].Index, 0)
  2674  	}
  2675  	if !r.prs.Progress[2].ProbeSent {
  2676  		t.Errorf("paused = %v, want true", r.prs.Progress[2].ProbeSent)
  2677  	}
  2678  }
  2679  
  2680  func TestSendAppendForProgressReplicate(t *testing.T) {
  2681  	r := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2)))
  2682  	r.becomeCandidate()
  2683  	r.becomeLeader()
  2684  	r.readMessages()
  2685  	r.prs.Progress[2].BecomeReplicate()
  2686  
  2687  	for i := 0; i < 10; i++ {
  2688  		mustAppendEntry(r, pb.Entry{Data: []byte("somedata")})
  2689  		r.sendAppend(2)
  2690  		msgs := r.readMessages()
  2691  		if len(msgs) != 1 {
  2692  			t.Errorf("len(msg) = %d, want %d", len(msgs), 1)
  2693  		}
  2694  	}
  2695  }
  2696  
  2697  func TestSendAppendForProgressSnapshot(t *testing.T) {
  2698  	r := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2)))
  2699  	r.becomeCandidate()
  2700  	r.becomeLeader()
  2701  	r.readMessages()
  2702  	r.prs.Progress[2].BecomeSnapshot(10)
  2703  
  2704  	for i := 0; i < 10; i++ {
  2705  		mustAppendEntry(r, pb.Entry{Data: []byte("somedata")})
  2706  		r.sendAppend(2)
  2707  		msgs := r.readMessages()
  2708  		if len(msgs) != 0 {
  2709  			t.Errorf("len(msg) = %d, want %d", len(msgs), 0)
  2710  		}
  2711  	}
  2712  }
  2713  
  2714  func TestRecvMsgUnreachable(t *testing.T) {
  2715  	previousEnts := []pb.Entry{{Term: 1, Index: 1}, {Term: 1, Index: 2}, {Term: 1, Index: 3}}
  2716  	s := newTestMemoryStorage(withPeers(1, 2))
  2717  	s.Append(previousEnts)
  2718  	r := newTestRaft(1, 10, 1, s)
  2719  	r.becomeCandidate()
  2720  	r.becomeLeader()
  2721  	r.readMessages()
  2722  	// set node 2 to state replicate
  2723  	r.prs.Progress[2].Match = 3
  2724  	r.prs.Progress[2].BecomeReplicate()
  2725  	r.prs.Progress[2].OptimisticUpdate(5)
  2726  
  2727  	r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgUnreachable})
  2728  
  2729  	if r.prs.Progress[2].State != tracker.StateProbe {
  2730  		t.Errorf("state = %s, want %s", r.prs.Progress[2].State, tracker.StateProbe)
  2731  	}
  2732  	if wnext := r.prs.Progress[2].Match + 1; r.prs.Progress[2].Next != wnext {
  2733  		t.Errorf("next = %d, want %d", r.prs.Progress[2].Next, wnext)
  2734  	}
  2735  }
  2736  
  2737  func TestRestore(t *testing.T) {
  2738  	s := pb.Snapshot{
  2739  		Metadata: pb.SnapshotMetadata{
  2740  			Index:     11, // magic number
  2741  			Term:      11, // magic number
  2742  			ConfState: pb.ConfState{Voters: []uint64{1, 2, 3}},
  2743  		},
  2744  	}
  2745  
  2746  	storage := newTestMemoryStorage(withPeers(1, 2))
  2747  	sm := newTestRaft(1, 10, 1, storage)
  2748  	if ok := sm.restore(s); !ok {
  2749  		t.Fatal("restore fail, want succeed")
  2750  	}
  2751  
  2752  	if sm.raftLog.lastIndex() != s.Metadata.Index {
  2753  		t.Errorf("log.lastIndex = %d, want %d", sm.raftLog.lastIndex(), s.Metadata.Index)
  2754  	}
  2755  	if mustTerm(sm.raftLog.term(s.Metadata.Index)) != s.Metadata.Term {
  2756  		t.Errorf("log.lastTerm = %d, want %d", mustTerm(sm.raftLog.term(s.Metadata.Index)), s.Metadata.Term)
  2757  	}
  2758  	sg := sm.prs.VoterNodes()
  2759  	if !reflect.DeepEqual(sg, s.Metadata.ConfState.Voters) {
  2760  		t.Errorf("sm.Voters = %+v, want %+v", sg, s.Metadata.ConfState.Voters)
  2761  	}
  2762  
  2763  	if ok := sm.restore(s); ok {
  2764  		t.Fatal("restore succeed, want fail")
  2765  	}
  2766  	// It should not campaign before actually applying data.
  2767  	for i := 0; i < sm.randomizedElectionTimeout; i++ {
  2768  		sm.tick()
  2769  	}
  2770  	if sm.state != StateFollower {
  2771  		t.Errorf("state = %d, want %d", sm.state, StateFollower)
  2772  	}
  2773  }
  2774  
  2775  // TestRestoreWithLearner restores a snapshot which contains learners.
  2776  func TestRestoreWithLearner(t *testing.T) {
  2777  	s := pb.Snapshot{
  2778  		Metadata: pb.SnapshotMetadata{
  2779  			Index:     11, // magic number
  2780  			Term:      11, // magic number
  2781  			ConfState: pb.ConfState{Voters: []uint64{1, 2}, Learners: []uint64{3}},
  2782  		},
  2783  	}
  2784  
  2785  	storage := newTestMemoryStorage(withPeers(1, 2), withLearners(3))
  2786  	sm := newTestLearnerRaft(3, 8, 2, storage)
  2787  	if ok := sm.restore(s); !ok {
  2788  		t.Error("restore fail, want succeed")
  2789  	}
  2790  
  2791  	if sm.raftLog.lastIndex() != s.Metadata.Index {
  2792  		t.Errorf("log.lastIndex = %d, want %d", sm.raftLog.lastIndex(), s.Metadata.Index)
  2793  	}
  2794  	if mustTerm(sm.raftLog.term(s.Metadata.Index)) != s.Metadata.Term {
  2795  		t.Errorf("log.lastTerm = %d, want %d", mustTerm(sm.raftLog.term(s.Metadata.Index)), s.Metadata.Term)
  2796  	}
  2797  	sg := sm.prs.VoterNodes()
  2798  	if len(sg) != len(s.Metadata.ConfState.Voters) {
  2799  		t.Errorf("sm.Voters = %+v, length not equal with %+v", sg, s.Metadata.ConfState.Voters)
  2800  	}
  2801  	lns := sm.prs.LearnerNodes()
  2802  	if len(lns) != len(s.Metadata.ConfState.Learners) {
  2803  		t.Errorf("sm.LearnerNodes = %+v, length not equal with %+v", sg, s.Metadata.ConfState.Learners)
  2804  	}
  2805  	for _, n := range s.Metadata.ConfState.Voters {
  2806  		if sm.prs.Progress[n].IsLearner {
  2807  			t.Errorf("sm.Node %x isLearner = %s, want %t", n, sm.prs.Progress[n], false)
  2808  		}
  2809  	}
  2810  	for _, n := range s.Metadata.ConfState.Learners {
  2811  		if !sm.prs.Progress[n].IsLearner {
  2812  			t.Errorf("sm.Node %x isLearner = %s, want %t", n, sm.prs.Progress[n], true)
  2813  		}
  2814  	}
  2815  
  2816  	if ok := sm.restore(s); ok {
  2817  		t.Error("restore succeed, want fail")
  2818  	}
  2819  }
  2820  
  2821  // TestRestoreWithVotersOutgoing tests if outgoing voter can receive and apply snapshot correctly.
  2822  func TestRestoreWithVotersOutgoing(t *testing.T) {
  2823  	s := pb.Snapshot{
  2824  		Metadata: pb.SnapshotMetadata{
  2825  			Index:     11, // magic number
  2826  			Term:      11, // magic number
  2827  			ConfState: pb.ConfState{Voters: []uint64{2, 3, 4}, VotersOutgoing: []uint64{1, 2, 3}},
  2828  		},
  2829  	}
  2830  
  2831  	storage := newTestMemoryStorage(withPeers(1, 2))
  2832  	sm := newTestRaft(1, 10, 1, storage)
  2833  	if ok := sm.restore(s); !ok {
  2834  		t.Fatal("restore fail, want succeed")
  2835  	}
  2836  
  2837  	if sm.raftLog.lastIndex() != s.Metadata.Index {
  2838  		t.Errorf("log.lastIndex = %d, want %d", sm.raftLog.lastIndex(), s.Metadata.Index)
  2839  	}
  2840  	if mustTerm(sm.raftLog.term(s.Metadata.Index)) != s.Metadata.Term {
  2841  		t.Errorf("log.lastTerm = %d, want %d", mustTerm(sm.raftLog.term(s.Metadata.Index)), s.Metadata.Term)
  2842  	}
  2843  	sg := sm.prs.VoterNodes()
  2844  	if !reflect.DeepEqual(sg, []uint64{1, 2, 3, 4}) {
  2845  		t.Errorf("sm.Voters = %+v, want %+v", sg, s.Metadata.ConfState.Voters)
  2846  	}
  2847  
  2848  	if ok := sm.restore(s); ok {
  2849  		t.Fatal("restore succeed, want fail")
  2850  	}
  2851  	// It should not campaign before actually applying data.
  2852  	for i := 0; i < sm.randomizedElectionTimeout; i++ {
  2853  		sm.tick()
  2854  	}
  2855  	if sm.state != StateFollower {
  2856  		t.Errorf("state = %d, want %d", sm.state, StateFollower)
  2857  	}
  2858  }
  2859  
  2860  // TestRestoreVoterToLearner verifies that a normal peer can be downgraded to a
  2861  // learner through a snapshot. At the time of writing, we don't allow
  2862  // configuration changes to do this directly, but note that the snapshot may
  2863  // compress multiple changes to the configuration into one: the voter could have
  2864  // been removed, then readded as a learner and the snapshot reflects both
  2865  // changes. In that case, a voter receives a snapshot telling it that it is now
  2866  // a learner. In fact, the node has to accept that snapshot, or it is
  2867  // permanently cut off from the Raft log.
  2868  func TestRestoreVoterToLearner(t *testing.T) {
  2869  	s := pb.Snapshot{
  2870  		Metadata: pb.SnapshotMetadata{
  2871  			Index:     11, // magic number
  2872  			Term:      11, // magic number
  2873  			ConfState: pb.ConfState{Voters: []uint64{1, 2}, Learners: []uint64{3}},
  2874  		},
  2875  	}
  2876  
  2877  	storage := newTestMemoryStorage(withPeers(1, 2, 3))
  2878  	sm := newTestRaft(3, 10, 1, storage)
  2879  
  2880  	if sm.isLearner {
  2881  		t.Errorf("%x is learner, want not", sm.id)
  2882  	}
  2883  	if ok := sm.restore(s); !ok {
  2884  		t.Error("restore failed unexpectedly")
  2885  	}
  2886  }
  2887  
  2888  // TestRestoreLearnerPromotion checks that a learner can become to a follower after
  2889  // restoring snapshot.
  2890  func TestRestoreLearnerPromotion(t *testing.T) {
  2891  	s := pb.Snapshot{
  2892  		Metadata: pb.SnapshotMetadata{
  2893  			Index:     11, // magic number
  2894  			Term:      11, // magic number
  2895  			ConfState: pb.ConfState{Voters: []uint64{1, 2, 3}},
  2896  		},
  2897  	}
  2898  
  2899  	storage := newTestMemoryStorage(withPeers(1, 2), withLearners(3))
  2900  	sm := newTestLearnerRaft(3, 10, 1, storage)
  2901  
  2902  	if !sm.isLearner {
  2903  		t.Errorf("%x is not learner, want yes", sm.id)
  2904  	}
  2905  
  2906  	if ok := sm.restore(s); !ok {
  2907  		t.Error("restore fail, want succeed")
  2908  	}
  2909  
  2910  	if sm.isLearner {
  2911  		t.Errorf("%x is learner, want not", sm.id)
  2912  	}
  2913  }
  2914  
  2915  // TestLearnerReceiveSnapshot tests that a learner can receive a snpahost from leader
  2916  func TestLearnerReceiveSnapshot(t *testing.T) {
  2917  	// restore the state machine from a snapshot so it has a compacted log and a snapshot
  2918  	s := pb.Snapshot{
  2919  		Metadata: pb.SnapshotMetadata{
  2920  			Index:     11, // magic number
  2921  			Term:      11, // magic number
  2922  			ConfState: pb.ConfState{Voters: []uint64{1}, Learners: []uint64{2}},
  2923  		},
  2924  	}
  2925  
  2926  	store := newTestMemoryStorage(withPeers(1), withLearners(2))
  2927  	n1 := newTestLearnerRaft(1, 10, 1, store)
  2928  	n2 := newTestLearnerRaft(2, 10, 1, newTestMemoryStorage(withPeers(1), withLearners(2)))
  2929  
  2930  	n1.restore(s)
  2931  	ready := newReady(n1, &SoftState{}, pb.HardState{})
  2932  	store.ApplySnapshot(ready.Snapshot)
  2933  	n1.advance(ready)
  2934  
  2935  	// Force set n1 appplied index.
  2936  	n1.raftLog.appliedTo(n1.raftLog.committed)
  2937  
  2938  	nt := newNetwork(n1, n2)
  2939  
  2940  	setRandomizedElectionTimeout(n1, n1.electionTimeout)
  2941  	for i := 0; i < n1.electionTimeout; i++ {
  2942  		n1.tick()
  2943  	}
  2944  
  2945  	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgBeat})
  2946  
  2947  	if n2.raftLog.committed != n1.raftLog.committed {
  2948  		t.Errorf("peer 2 must commit to %d, but %d", n1.raftLog.committed, n2.raftLog.committed)
  2949  	}
  2950  }
  2951  
  2952  func TestRestoreIgnoreSnapshot(t *testing.T) {
  2953  	previousEnts := []pb.Entry{{Term: 1, Index: 1}, {Term: 1, Index: 2}, {Term: 1, Index: 3}}
  2954  	commit := uint64(1)
  2955  	storage := newTestMemoryStorage(withPeers(1, 2))
  2956  	sm := newTestRaft(1, 10, 1, storage)
  2957  	sm.raftLog.append(previousEnts...)
  2958  	sm.raftLog.commitTo(commit)
  2959  
  2960  	s := pb.Snapshot{
  2961  		Metadata: pb.SnapshotMetadata{
  2962  			Index:     commit,
  2963  			Term:      1,
  2964  			ConfState: pb.ConfState{Voters: []uint64{1, 2}},
  2965  		},
  2966  	}
  2967  
  2968  	// ignore snapshot
  2969  	if ok := sm.restore(s); ok {
  2970  		t.Errorf("restore = %t, want %t", ok, false)
  2971  	}
  2972  	if sm.raftLog.committed != commit {
  2973  		t.Errorf("commit = %d, want %d", sm.raftLog.committed, commit)
  2974  	}
  2975  
  2976  	// ignore snapshot and fast forward commit
  2977  	s.Metadata.Index = commit + 1
  2978  	if ok := sm.restore(s); ok {
  2979  		t.Errorf("restore = %t, want %t", ok, false)
  2980  	}
  2981  	if sm.raftLog.committed != commit+1 {
  2982  		t.Errorf("commit = %d, want %d", sm.raftLog.committed, commit+1)
  2983  	}
  2984  }
  2985  
  2986  func TestProvideSnap(t *testing.T) {
  2987  	// restore the state machine from a snapshot so it has a compacted log and a snapshot
  2988  	s := pb.Snapshot{
  2989  		Metadata: pb.SnapshotMetadata{
  2990  			Index:     11, // magic number
  2991  			Term:      11, // magic number
  2992  			ConfState: pb.ConfState{Voters: []uint64{1, 2}},
  2993  		},
  2994  	}
  2995  	storage := newTestMemoryStorage(withPeers(1))
  2996  	sm := newTestRaft(1, 10, 1, storage)
  2997  	sm.restore(s)
  2998  
  2999  	sm.becomeCandidate()
  3000  	sm.becomeLeader()
  3001  
  3002  	// force set the next of node 2, so that node 2 needs a snapshot
  3003  	sm.prs.Progress[2].Next = sm.raftLog.firstIndex()
  3004  	sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: sm.prs.Progress[2].Next - 1, Reject: true})
  3005  
  3006  	msgs := sm.readMessages()
  3007  	if len(msgs) != 1 {
  3008  		t.Fatalf("len(msgs) = %d, want 1", len(msgs))
  3009  	}
  3010  	m := msgs[0]
  3011  	if m.Type != pb.MsgSnap {
  3012  		t.Errorf("m.Type = %v, want %v", m.Type, pb.MsgSnap)
  3013  	}
  3014  }
  3015  
  3016  func TestIgnoreProvidingSnap(t *testing.T) {
  3017  	// restore the state machine from a snapshot so it has a compacted log and a snapshot
  3018  	s := pb.Snapshot{
  3019  		Metadata: pb.SnapshotMetadata{
  3020  			Index:     11, // magic number
  3021  			Term:      11, // magic number
  3022  			ConfState: pb.ConfState{Voters: []uint64{1, 2}},
  3023  		},
  3024  	}
  3025  	storage := newTestMemoryStorage(withPeers(1))
  3026  	sm := newTestRaft(1, 10, 1, storage)
  3027  	sm.restore(s)
  3028  
  3029  	sm.becomeCandidate()
  3030  	sm.becomeLeader()
  3031  
  3032  	// force set the next of node 2, so that node 2 needs a snapshot
  3033  	// change node 2 to be inactive, expect node 1 ignore sending snapshot to 2
  3034  	sm.prs.Progress[2].Next = sm.raftLog.firstIndex() - 1
  3035  	sm.prs.Progress[2].RecentActive = false
  3036  
  3037  	sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
  3038  
  3039  	msgs := sm.readMessages()
  3040  	if len(msgs) != 0 {
  3041  		t.Errorf("len(msgs) = %d, want 0", len(msgs))
  3042  	}
  3043  }
  3044  
  3045  func TestRestoreFromSnapMsg(t *testing.T) {
  3046  	s := pb.Snapshot{
  3047  		Metadata: pb.SnapshotMetadata{
  3048  			Index:     11, // magic number
  3049  			Term:      11, // magic number
  3050  			ConfState: pb.ConfState{Voters: []uint64{1, 2}},
  3051  		},
  3052  	}
  3053  	m := pb.Message{Type: pb.MsgSnap, From: 1, Term: 2, Snapshot: s}
  3054  
  3055  	sm := newTestRaft(2, 10, 1, newTestMemoryStorage(withPeers(1, 2)))
  3056  	sm.Step(m)
  3057  
  3058  	if sm.lead != uint64(1) {
  3059  		t.Errorf("sm.lead = %d, want 1", sm.lead)
  3060  	}
  3061  
  3062  	// TODO(bdarnell): what should this test?
  3063  }
  3064  
  3065  func TestSlowNodeRestore(t *testing.T) {
  3066  	nt := newNetwork(nil, nil, nil)
  3067  	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  3068  
  3069  	nt.isolate(3)
  3070  	for j := 0; j <= 100; j++ {
  3071  		nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
  3072  	}
  3073  	lead := nt.peers[1].(*raft)
  3074  	nextEnts(lead, nt.storage[1])
  3075  	nt.storage[1].CreateSnapshot(lead.raftLog.applied, &pb.ConfState{Voters: lead.prs.VoterNodes()}, nil)
  3076  	nt.storage[1].Compact(lead.raftLog.applied)
  3077  
  3078  	nt.recover()
  3079  	// send heartbeats so that the leader can learn everyone is active.
  3080  	// node 3 will only be considered as active when node 1 receives a reply from it.
  3081  	for {
  3082  		nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgBeat})
  3083  		if lead.prs.Progress[3].RecentActive {
  3084  			break
  3085  		}
  3086  	}
  3087  
  3088  	// trigger a snapshot
  3089  	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
  3090  
  3091  	follower := nt.peers[3].(*raft)
  3092  
  3093  	// trigger a commit
  3094  	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
  3095  	if follower.raftLog.committed != lead.raftLog.committed {
  3096  		t.Errorf("follower.committed = %d, want %d", follower.raftLog.committed, lead.raftLog.committed)
  3097  	}
  3098  }
  3099  
  3100  // TestStepConfig tests that when raft step msgProp in EntryConfChange type,
  3101  // it appends the entry to log and sets pendingConf to be true.
  3102  func TestStepConfig(t *testing.T) {
  3103  	// a raft that cannot make progress
  3104  	r := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2)))
  3105  	r.becomeCandidate()
  3106  	r.becomeLeader()
  3107  	index := r.raftLog.lastIndex()
  3108  	r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange}}})
  3109  	if g := r.raftLog.lastIndex(); g != index+1 {
  3110  		t.Errorf("index = %d, want %d", g, index+1)
  3111  	}
  3112  	if r.pendingConfIndex != index+1 {
  3113  		t.Errorf("pendingConfIndex = %d, want %d", r.pendingConfIndex, index+1)
  3114  	}
  3115  }
  3116  
  3117  // TestStepIgnoreConfig tests that if raft step the second msgProp in
  3118  // EntryConfChange type when the first one is uncommitted, the node will set
  3119  // the proposal to noop and keep its original state.
  3120  func TestStepIgnoreConfig(t *testing.T) {
  3121  	// a raft that cannot make progress
  3122  	r := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2)))
  3123  	r.becomeCandidate()
  3124  	r.becomeLeader()
  3125  	r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange}}})
  3126  	index := r.raftLog.lastIndex()
  3127  	pendingConfIndex := r.pendingConfIndex
  3128  	r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange}}})
  3129  	wents := []pb.Entry{{Type: pb.EntryNormal, Term: 1, Index: 3, Data: nil}}
  3130  	ents, err := r.raftLog.entries(index+1, noLimit)
  3131  	if err != nil {
  3132  		t.Fatalf("unexpected error %v", err)
  3133  	}
  3134  	if !reflect.DeepEqual(ents, wents) {
  3135  		t.Errorf("ents = %+v, want %+v", ents, wents)
  3136  	}
  3137  	if r.pendingConfIndex != pendingConfIndex {
  3138  		t.Errorf("pendingConfIndex = %d, want %d", r.pendingConfIndex, pendingConfIndex)
  3139  	}
  3140  }
  3141  
  3142  // TestNewLeaderPendingConfig tests that new leader sets its pendingConfigIndex
  3143  // based on uncommitted entries.
  3144  func TestNewLeaderPendingConfig(t *testing.T) {
  3145  	tests := []struct {
  3146  		addEntry      bool
  3147  		wpendingIndex uint64
  3148  	}{
  3149  		{false, 0},
  3150  		{true, 1},
  3151  	}
  3152  	for i, tt := range tests {
  3153  		r := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2)))
  3154  		if tt.addEntry {
  3155  			mustAppendEntry(r, pb.Entry{Type: pb.EntryNormal})
  3156  		}
  3157  		r.becomeCandidate()
  3158  		r.becomeLeader()
  3159  		if r.pendingConfIndex != tt.wpendingIndex {
  3160  			t.Errorf("#%d: pendingConfIndex = %d, want %d",
  3161  				i, r.pendingConfIndex, tt.wpendingIndex)
  3162  		}
  3163  	}
  3164  }
  3165  
  3166  // TestAddNode tests that addNode could update nodes correctly.
  3167  func TestAddNode(t *testing.T) {
  3168  	r := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1)))
  3169  	r.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddNode}.AsV2())
  3170  	nodes := r.prs.VoterNodes()
  3171  	wnodes := []uint64{1, 2}
  3172  	if !reflect.DeepEqual(nodes, wnodes) {
  3173  		t.Errorf("nodes = %v, want %v", nodes, wnodes)
  3174  	}
  3175  }
  3176  
  3177  // TestAddLearner tests that addLearner could update nodes correctly.
  3178  func TestAddLearner(t *testing.T) {
  3179  	r := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1)))
  3180  	// Add new learner peer.
  3181  	r.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddLearnerNode}.AsV2())
  3182  	if r.isLearner {
  3183  		t.Fatal("expected 1 to be voter")
  3184  	}
  3185  	nodes := r.prs.LearnerNodes()
  3186  	wnodes := []uint64{2}
  3187  	if !reflect.DeepEqual(nodes, wnodes) {
  3188  		t.Errorf("nodes = %v, want %v", nodes, wnodes)
  3189  	}
  3190  	if !r.prs.Progress[2].IsLearner {
  3191  		t.Fatal("expected 2 to be learner")
  3192  	}
  3193  
  3194  	// Promote peer to voter.
  3195  	r.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddNode}.AsV2())
  3196  	if r.prs.Progress[2].IsLearner {
  3197  		t.Fatal("expected 2 to be voter")
  3198  	}
  3199  
  3200  	// Demote r.
  3201  	r.applyConfChange(pb.ConfChange{NodeID: 1, Type: pb.ConfChangeAddLearnerNode}.AsV2())
  3202  	if !r.prs.Progress[1].IsLearner {
  3203  		t.Fatal("expected 1 to be learner")
  3204  	}
  3205  	if !r.isLearner {
  3206  		t.Fatal("expected 1 to be learner")
  3207  	}
  3208  
  3209  	// Promote r again.
  3210  	r.applyConfChange(pb.ConfChange{NodeID: 1, Type: pb.ConfChangeAddNode}.AsV2())
  3211  	if r.prs.Progress[1].IsLearner {
  3212  		t.Fatal("expected 1 to be voter")
  3213  	}
  3214  	if r.isLearner {
  3215  		t.Fatal("expected 1 to be voter")
  3216  	}
  3217  }
  3218  
  3219  // TestAddNodeCheckQuorum tests that addNode does not trigger a leader election
  3220  // immediately when checkQuorum is set.
  3221  func TestAddNodeCheckQuorum(t *testing.T) {
  3222  	r := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1)))
  3223  	r.checkQuorum = true
  3224  
  3225  	r.becomeCandidate()
  3226  	r.becomeLeader()
  3227  
  3228  	for i := 0; i < r.electionTimeout-1; i++ {
  3229  		r.tick()
  3230  	}
  3231  
  3232  	r.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddNode}.AsV2())
  3233  
  3234  	// This tick will reach electionTimeout, which triggers a quorum check.
  3235  	r.tick()
  3236  
  3237  	// Node 1 should still be the leader after a single tick.
  3238  	if r.state != StateLeader {
  3239  		t.Errorf("state = %v, want %v", r.state, StateLeader)
  3240  	}
  3241  
  3242  	// After another electionTimeout ticks without hearing from node 2,
  3243  	// node 1 should step down.
  3244  	for i := 0; i < r.electionTimeout; i++ {
  3245  		r.tick()
  3246  	}
  3247  
  3248  	if r.state != StateFollower {
  3249  		t.Errorf("state = %v, want %v", r.state, StateFollower)
  3250  	}
  3251  }
  3252  
  3253  // TestRemoveNode tests that removeNode could update nodes and
  3254  // and removed list correctly.
  3255  func TestRemoveNode(t *testing.T) {
  3256  	r := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2)))
  3257  	r.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeRemoveNode}.AsV2())
  3258  	w := []uint64{1}
  3259  	if g := r.prs.VoterNodes(); !reflect.DeepEqual(g, w) {
  3260  		t.Errorf("nodes = %v, want %v", g, w)
  3261  	}
  3262  
  3263  	// Removing the remaining voter will panic.
  3264  	defer func() {
  3265  		if r := recover(); r == nil {
  3266  			t.Error("did not panic")
  3267  		}
  3268  	}()
  3269  	r.applyConfChange(pb.ConfChange{NodeID: 1, Type: pb.ConfChangeRemoveNode}.AsV2())
  3270  }
  3271  
  3272  // TestRemoveLearner tests that removeNode could update nodes and
  3273  // and removed list correctly.
  3274  func TestRemoveLearner(t *testing.T) {
  3275  	r := newTestLearnerRaft(1, 10, 1, newTestMemoryStorage(withPeers(1), withLearners(2)))
  3276  	r.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeRemoveNode}.AsV2())
  3277  	w := []uint64{1}
  3278  	if g := r.prs.VoterNodes(); !reflect.DeepEqual(g, w) {
  3279  		t.Errorf("nodes = %v, want %v", g, w)
  3280  	}
  3281  
  3282  	w = nil
  3283  	if g := r.prs.LearnerNodes(); !reflect.DeepEqual(g, w) {
  3284  		t.Errorf("nodes = %v, want %v", g, w)
  3285  	}
  3286  
  3287  	// Removing the remaining voter will panic.
  3288  	defer func() {
  3289  		if r := recover(); r == nil {
  3290  			t.Error("did not panic")
  3291  		}
  3292  	}()
  3293  	r.applyConfChange(pb.ConfChange{NodeID: 1, Type: pb.ConfChangeRemoveNode}.AsV2())
  3294  }
  3295  
  3296  func TestPromotable(t *testing.T) {
  3297  	id := uint64(1)
  3298  	tests := []struct {
  3299  		peers []uint64
  3300  		wp    bool
  3301  	}{
  3302  		{[]uint64{1}, true},
  3303  		{[]uint64{1, 2, 3}, true},
  3304  		{[]uint64{}, false},
  3305  		{[]uint64{2, 3}, false},
  3306  	}
  3307  	for i, tt := range tests {
  3308  		r := newTestRaft(id, 5, 1, newTestMemoryStorage(withPeers(tt.peers...)))
  3309  		if g := r.promotable(); g != tt.wp {
  3310  			t.Errorf("#%d: promotable = %v, want %v", i, g, tt.wp)
  3311  		}
  3312  	}
  3313  }
  3314  
  3315  func TestRaftNodes(t *testing.T) {
  3316  	tests := []struct {
  3317  		ids  []uint64
  3318  		wids []uint64
  3319  	}{
  3320  		{
  3321  			[]uint64{1, 2, 3},
  3322  			[]uint64{1, 2, 3},
  3323  		},
  3324  		{
  3325  			[]uint64{3, 2, 1},
  3326  			[]uint64{1, 2, 3},
  3327  		},
  3328  	}
  3329  	for i, tt := range tests {
  3330  		r := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(tt.ids...)))
  3331  		if !reflect.DeepEqual(r.prs.VoterNodes(), tt.wids) {
  3332  			t.Errorf("#%d: nodes = %+v, want %+v", i, r.prs.VoterNodes(), tt.wids)
  3333  		}
  3334  	}
  3335  }
  3336  
  3337  func TestCampaignWhileLeader(t *testing.T) {
  3338  	testCampaignWhileLeader(t, false)
  3339  }
  3340  
  3341  func TestPreCampaignWhileLeader(t *testing.T) {
  3342  	testCampaignWhileLeader(t, true)
  3343  }
  3344  
  3345  func testCampaignWhileLeader(t *testing.T, preVote bool) {
  3346  	cfg := newTestConfig(1, 5, 1, newTestMemoryStorage(withPeers(1)))
  3347  	cfg.PreVote = preVote
  3348  	r := newRaft(cfg)
  3349  	if r.state != StateFollower {
  3350  		t.Errorf("expected new node to be follower but got %s", r.state)
  3351  	}
  3352  	// We don't call campaign() directly because it comes after the check
  3353  	// for our current state.
  3354  	r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  3355  	if r.state != StateLeader {
  3356  		t.Errorf("expected single-node election to become leader but got %s", r.state)
  3357  	}
  3358  	term := r.Term
  3359  	r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  3360  	if r.state != StateLeader {
  3361  		t.Errorf("expected to remain leader but got %s", r.state)
  3362  	}
  3363  	if r.Term != term {
  3364  		t.Errorf("expected to remain in term %v but got %v", term, r.Term)
  3365  	}
  3366  }
  3367  
  3368  // TestCommitAfterRemoveNode verifies that pending commands can become
  3369  // committed when a config change reduces the quorum requirements.
  3370  func TestCommitAfterRemoveNode(t *testing.T) {
  3371  	// Create a cluster with two nodes.
  3372  	s := newTestMemoryStorage(withPeers(1, 2))
  3373  	r := newTestRaft(1, 5, 1, s)
  3374  	r.becomeCandidate()
  3375  	r.becomeLeader()
  3376  
  3377  	// Begin to remove the second node.
  3378  	cc := pb.ConfChange{
  3379  		Type:   pb.ConfChangeRemoveNode,
  3380  		NodeID: 2,
  3381  	}
  3382  	ccData, err := cc.Marshal()
  3383  	if err != nil {
  3384  		t.Fatal(err)
  3385  	}
  3386  	r.Step(pb.Message{
  3387  		Type: pb.MsgProp,
  3388  		Entries: []pb.Entry{
  3389  			{Type: pb.EntryConfChange, Data: ccData},
  3390  		},
  3391  	})
  3392  	// Stabilize the log and make sure nothing is committed yet.
  3393  	if ents := nextEnts(r, s); len(ents) > 0 {
  3394  		t.Fatalf("unexpected committed entries: %v", ents)
  3395  	}
  3396  	ccIndex := r.raftLog.lastIndex()
  3397  
  3398  	// While the config change is pending, make another proposal.
  3399  	r.Step(pb.Message{
  3400  		Type: pb.MsgProp,
  3401  		Entries: []pb.Entry{
  3402  			{Type: pb.EntryNormal, Data: []byte("hello")},
  3403  		},
  3404  	})
  3405  
  3406  	// Node 2 acknowledges the config change, committing it.
  3407  	r.Step(pb.Message{
  3408  		Type:  pb.MsgAppResp,
  3409  		From:  2,
  3410  		Index: ccIndex,
  3411  	})
  3412  	ents := nextEnts(r, s)
  3413  	if len(ents) != 2 {
  3414  		t.Fatalf("expected two committed entries, got %v", ents)
  3415  	}
  3416  	if ents[0].Type != pb.EntryNormal || ents[0].Data != nil {
  3417  		t.Fatalf("expected ents[0] to be empty, but got %v", ents[0])
  3418  	}
  3419  	if ents[1].Type != pb.EntryConfChange {
  3420  		t.Fatalf("expected ents[1] to be EntryConfChange, got %v", ents[1])
  3421  	}
  3422  
  3423  	// Apply the config change. This reduces quorum requirements so the
  3424  	// pending command can now commit.
  3425  	r.applyConfChange(cc.AsV2())
  3426  	ents = nextEnts(r, s)
  3427  	if len(ents) != 1 || ents[0].Type != pb.EntryNormal ||
  3428  		string(ents[0].Data) != "hello" {
  3429  		t.Fatalf("expected one committed EntryNormal, got %v", ents)
  3430  	}
  3431  }
  3432  
  3433  // TestLeaderTransferToUpToDateNode verifies transferring should succeed
  3434  // if the transferee has the most up-to-date log entries when transfer starts.
  3435  func TestLeaderTransferToUpToDateNode(t *testing.T) {
  3436  	nt := newNetwork(nil, nil, nil)
  3437  	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  3438  
  3439  	lead := nt.peers[1].(*raft)
  3440  
  3441  	if lead.lead != 1 {
  3442  		t.Fatalf("after election leader is %x, want 1", lead.lead)
  3443  	}
  3444  
  3445  	// Transfer leadership to 2.
  3446  	nt.send(pb.Message{From: 2, To: 1, Type: pb.MsgTransferLeader})
  3447  
  3448  	checkLeaderTransferState(t, lead, StateFollower, 2)
  3449  
  3450  	// After some log replication, transfer leadership back to 1.
  3451  	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
  3452  
  3453  	nt.send(pb.Message{From: 1, To: 2, Type: pb.MsgTransferLeader})
  3454  
  3455  	checkLeaderTransferState(t, lead, StateLeader, 1)
  3456  }
  3457  
  3458  // TestLeaderTransferToUpToDateNodeFromFollower verifies transferring should succeed
  3459  // if the transferee has the most up-to-date log entries when transfer starts.
  3460  // Not like TestLeaderTransferToUpToDateNode, where the leader transfer message
  3461  // is sent to the leader, in this test case every leader transfer message is sent
  3462  // to the follower.
  3463  func TestLeaderTransferToUpToDateNodeFromFollower(t *testing.T) {
  3464  	nt := newNetwork(nil, nil, nil)
  3465  	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  3466  
  3467  	lead := nt.peers[1].(*raft)
  3468  
  3469  	if lead.lead != 1 {
  3470  		t.Fatalf("after election leader is %x, want 1", lead.lead)
  3471  	}
  3472  
  3473  	// Transfer leadership to 2.
  3474  	nt.send(pb.Message{From: 2, To: 2, Type: pb.MsgTransferLeader})
  3475  
  3476  	checkLeaderTransferState(t, lead, StateFollower, 2)
  3477  
  3478  	// After some log replication, transfer leadership back to 1.
  3479  	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
  3480  
  3481  	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgTransferLeader})
  3482  
  3483  	checkLeaderTransferState(t, lead, StateLeader, 1)
  3484  }
  3485  
  3486  // TestLeaderTransferWithCheckQuorum ensures transferring leader still works
  3487  // even the current leader is still under its leader lease
  3488  func TestLeaderTransferWithCheckQuorum(t *testing.T) {
  3489  	nt := newNetwork(nil, nil, nil)
  3490  	for i := 1; i < 4; i++ {
  3491  		r := nt.peers[uint64(i)].(*raft)
  3492  		r.checkQuorum = true
  3493  		setRandomizedElectionTimeout(r, r.electionTimeout+i)
  3494  	}
  3495  
  3496  	// Letting peer 2 electionElapsed reach to timeout so that it can vote for peer 1
  3497  	f := nt.peers[2].(*raft)
  3498  	for i := 0; i < f.electionTimeout; i++ {
  3499  		f.tick()
  3500  	}
  3501  
  3502  	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  3503  
  3504  	lead := nt.peers[1].(*raft)
  3505  
  3506  	if lead.lead != 1 {
  3507  		t.Fatalf("after election leader is %x, want 1", lead.lead)
  3508  	}
  3509  
  3510  	// Transfer leadership to 2.
  3511  	nt.send(pb.Message{From: 2, To: 1, Type: pb.MsgTransferLeader})
  3512  
  3513  	checkLeaderTransferState(t, lead, StateFollower, 2)
  3514  
  3515  	// After some log replication, transfer leadership back to 1.
  3516  	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
  3517  
  3518  	nt.send(pb.Message{From: 1, To: 2, Type: pb.MsgTransferLeader})
  3519  
  3520  	checkLeaderTransferState(t, lead, StateLeader, 1)
  3521  }
  3522  
  3523  func TestLeaderTransferToSlowFollower(t *testing.T) {
  3524  	defaultLogger.EnableDebug()
  3525  	nt := newNetwork(nil, nil, nil)
  3526  	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  3527  
  3528  	nt.isolate(3)
  3529  	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
  3530  
  3531  	nt.recover()
  3532  	lead := nt.peers[1].(*raft)
  3533  	if lead.prs.Progress[3].Match != 1 {
  3534  		t.Fatalf("node 1 has match %x for node 3, want %x", lead.prs.Progress[3].Match, 1)
  3535  	}
  3536  
  3537  	// Transfer leadership to 3 when node 3 is lack of log.
  3538  	nt.send(pb.Message{From: 3, To: 1, Type: pb.MsgTransferLeader})
  3539  
  3540  	checkLeaderTransferState(t, lead, StateFollower, 3)
  3541  }
  3542  
  3543  func TestLeaderTransferAfterSnapshot(t *testing.T) {
  3544  	nt := newNetwork(nil, nil, nil)
  3545  	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  3546  
  3547  	nt.isolate(3)
  3548  
  3549  	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
  3550  	lead := nt.peers[1].(*raft)
  3551  	nextEnts(lead, nt.storage[1])
  3552  	nt.storage[1].CreateSnapshot(lead.raftLog.applied, &pb.ConfState{Voters: lead.prs.VoterNodes()}, nil)
  3553  	nt.storage[1].Compact(lead.raftLog.applied)
  3554  
  3555  	nt.recover()
  3556  	if lead.prs.Progress[3].Match != 1 {
  3557  		t.Fatalf("node 1 has match %x for node 3, want %x", lead.prs.Progress[3].Match, 1)
  3558  	}
  3559  
  3560  	filtered := pb.Message{}
  3561  	// Snapshot needs to be applied before sending MsgAppResp
  3562  	nt.msgHook = func(m pb.Message) bool {
  3563  		if m.Type != pb.MsgAppResp || m.From != 3 || m.Reject {
  3564  			return true
  3565  		}
  3566  		filtered = m
  3567  		return false
  3568  	}
  3569  	// Transfer leadership to 3 when node 3 is lack of snapshot.
  3570  	nt.send(pb.Message{From: 3, To: 1, Type: pb.MsgTransferLeader})
  3571  	if lead.state != StateLeader {
  3572  		t.Fatalf("node 1 should still be leader as snapshot is not applied, got %x", lead.state)
  3573  	}
  3574  	if reflect.DeepEqual(filtered, pb.Message{}) {
  3575  		t.Fatalf("Follower should report snapshot progress automatically.")
  3576  	}
  3577  
  3578  	// Apply snapshot and resume progress
  3579  	follower := nt.peers[3].(*raft)
  3580  	ready := newReady(follower, &SoftState{}, pb.HardState{})
  3581  	nt.storage[3].ApplySnapshot(ready.Snapshot)
  3582  	follower.advance(ready)
  3583  	nt.msgHook = nil
  3584  	nt.send(filtered)
  3585  
  3586  	checkLeaderTransferState(t, lead, StateFollower, 3)
  3587  }
  3588  
  3589  func TestLeaderTransferToSelf(t *testing.T) {
  3590  	nt := newNetwork(nil, nil, nil)
  3591  	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  3592  
  3593  	lead := nt.peers[1].(*raft)
  3594  
  3595  	// Transfer leadership to self, there will be noop.
  3596  	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgTransferLeader})
  3597  	checkLeaderTransferState(t, lead, StateLeader, 1)
  3598  }
  3599  
  3600  func TestLeaderTransferToNonExistingNode(t *testing.T) {
  3601  	nt := newNetwork(nil, nil, nil)
  3602  	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  3603  
  3604  	lead := nt.peers[1].(*raft)
  3605  	// Transfer leadership to non-existing node, there will be noop.
  3606  	nt.send(pb.Message{From: 4, To: 1, Type: pb.MsgTransferLeader})
  3607  	checkLeaderTransferState(t, lead, StateLeader, 1)
  3608  }
  3609  
  3610  func TestLeaderTransferTimeout(t *testing.T) {
  3611  	nt := newNetwork(nil, nil, nil)
  3612  	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  3613  
  3614  	nt.isolate(3)
  3615  
  3616  	lead := nt.peers[1].(*raft)
  3617  
  3618  	// Transfer leadership to isolated node, wait for timeout.
  3619  	nt.send(pb.Message{From: 3, To: 1, Type: pb.MsgTransferLeader})
  3620  	if lead.leadTransferee != 3 {
  3621  		t.Fatalf("wait transferring, leadTransferee = %v, want %v", lead.leadTransferee, 3)
  3622  	}
  3623  	for i := 0; i < lead.heartbeatTimeout; i++ {
  3624  		lead.tick()
  3625  	}
  3626  	if lead.leadTransferee != 3 {
  3627  		t.Fatalf("wait transferring, leadTransferee = %v, want %v", lead.leadTransferee, 3)
  3628  	}
  3629  
  3630  	for i := 0; i < lead.electionTimeout-lead.heartbeatTimeout; i++ {
  3631  		lead.tick()
  3632  	}
  3633  
  3634  	checkLeaderTransferState(t, lead, StateLeader, 1)
  3635  }
  3636  
  3637  func TestLeaderTransferIgnoreProposal(t *testing.T) {
  3638  	nt := newNetwork(nil, nil, nil)
  3639  	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  3640  
  3641  	nt.isolate(3)
  3642  
  3643  	lead := nt.peers[1].(*raft)
  3644  
  3645  	// Transfer leadership to isolated node to let transfer pending, then send proposal.
  3646  	nt.send(pb.Message{From: 3, To: 1, Type: pb.MsgTransferLeader})
  3647  	if lead.leadTransferee != 3 {
  3648  		t.Fatalf("wait transferring, leadTransferee = %v, want %v", lead.leadTransferee, 3)
  3649  	}
  3650  
  3651  	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
  3652  	err := lead.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
  3653  	if err != ErrProposalDropped {
  3654  		t.Fatalf("should return drop proposal error while transferring")
  3655  	}
  3656  
  3657  	if lead.prs.Progress[1].Match != 1 {
  3658  		t.Fatalf("node 1 has match %x, want %x", lead.prs.Progress[1].Match, 1)
  3659  	}
  3660  }
  3661  
  3662  func TestLeaderTransferReceiveHigherTermVote(t *testing.T) {
  3663  	nt := newNetwork(nil, nil, nil)
  3664  	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  3665  
  3666  	nt.isolate(3)
  3667  
  3668  	lead := nt.peers[1].(*raft)
  3669  
  3670  	// Transfer leadership to isolated node to let transfer pending.
  3671  	nt.send(pb.Message{From: 3, To: 1, Type: pb.MsgTransferLeader})
  3672  	if lead.leadTransferee != 3 {
  3673  		t.Fatalf("wait transferring, leadTransferee = %v, want %v", lead.leadTransferee, 3)
  3674  	}
  3675  
  3676  	nt.send(pb.Message{From: 2, To: 2, Type: pb.MsgHup, Index: 1, Term: 2})
  3677  
  3678  	checkLeaderTransferState(t, lead, StateFollower, 2)
  3679  }
  3680  
  3681  func TestLeaderTransferRemoveNode(t *testing.T) {
  3682  	nt := newNetwork(nil, nil, nil)
  3683  	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  3684  
  3685  	nt.ignore(pb.MsgTimeoutNow)
  3686  
  3687  	lead := nt.peers[1].(*raft)
  3688  
  3689  	// The leadTransferee is removed when leadship transferring.
  3690  	nt.send(pb.Message{From: 3, To: 1, Type: pb.MsgTransferLeader})
  3691  	if lead.leadTransferee != 3 {
  3692  		t.Fatalf("wait transferring, leadTransferee = %v, want %v", lead.leadTransferee, 3)
  3693  	}
  3694  
  3695  	lead.applyConfChange(pb.ConfChange{NodeID: 3, Type: pb.ConfChangeRemoveNode}.AsV2())
  3696  
  3697  	checkLeaderTransferState(t, lead, StateLeader, 1)
  3698  }
  3699  
  3700  func TestLeaderTransferDemoteNode(t *testing.T) {
  3701  	nt := newNetwork(nil, nil, nil)
  3702  	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  3703  
  3704  	nt.ignore(pb.MsgTimeoutNow)
  3705  
  3706  	lead := nt.peers[1].(*raft)
  3707  
  3708  	// The leadTransferee is demoted when leadship transferring.
  3709  	nt.send(pb.Message{From: 3, To: 1, Type: pb.MsgTransferLeader})
  3710  	if lead.leadTransferee != 3 {
  3711  		t.Fatalf("wait transferring, leadTransferee = %v, want %v", lead.leadTransferee, 3)
  3712  	}
  3713  
  3714  	lead.applyConfChange(pb.ConfChangeV2{
  3715  		Changes: []pb.ConfChangeSingle{
  3716  			{
  3717  				Type:   pb.ConfChangeRemoveNode,
  3718  				NodeID: 3,
  3719  			},
  3720  			{
  3721  				Type:   pb.ConfChangeAddLearnerNode,
  3722  				NodeID: 3,
  3723  			},
  3724  		},
  3725  	})
  3726  
  3727  	// Make the Raft group commit the LeaveJoint entry.
  3728  	lead.applyConfChange(pb.ConfChangeV2{})
  3729  	checkLeaderTransferState(t, lead, StateLeader, 1)
  3730  }
  3731  
  3732  // TestLeaderTransferBack verifies leadership can transfer back to self when last transfer is pending.
  3733  func TestLeaderTransferBack(t *testing.T) {
  3734  	nt := newNetwork(nil, nil, nil)
  3735  	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  3736  
  3737  	nt.isolate(3)
  3738  
  3739  	lead := nt.peers[1].(*raft)
  3740  
  3741  	nt.send(pb.Message{From: 3, To: 1, Type: pb.MsgTransferLeader})
  3742  	if lead.leadTransferee != 3 {
  3743  		t.Fatalf("wait transferring, leadTransferee = %v, want %v", lead.leadTransferee, 3)
  3744  	}
  3745  
  3746  	// Transfer leadership back to self.
  3747  	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgTransferLeader})
  3748  
  3749  	checkLeaderTransferState(t, lead, StateLeader, 1)
  3750  }
  3751  
  3752  // TestLeaderTransferSecondTransferToAnotherNode verifies leader can transfer to another node
  3753  // when last transfer is pending.
  3754  func TestLeaderTransferSecondTransferToAnotherNode(t *testing.T) {
  3755  	nt := newNetwork(nil, nil, nil)
  3756  	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  3757  
  3758  	nt.isolate(3)
  3759  
  3760  	lead := nt.peers[1].(*raft)
  3761  
  3762  	nt.send(pb.Message{From: 3, To: 1, Type: pb.MsgTransferLeader})
  3763  	if lead.leadTransferee != 3 {
  3764  		t.Fatalf("wait transferring, leadTransferee = %v, want %v", lead.leadTransferee, 3)
  3765  	}
  3766  
  3767  	// Transfer leadership to another node.
  3768  	nt.send(pb.Message{From: 2, To: 1, Type: pb.MsgTransferLeader})
  3769  
  3770  	checkLeaderTransferState(t, lead, StateFollower, 2)
  3771  }
  3772  
  3773  // TestLeaderTransferSecondTransferToSameNode verifies second transfer leader request
  3774  // to the same node should not extend the timeout while the first one is pending.
  3775  func TestLeaderTransferSecondTransferToSameNode(t *testing.T) {
  3776  	nt := newNetwork(nil, nil, nil)
  3777  	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  3778  
  3779  	nt.isolate(3)
  3780  
  3781  	lead := nt.peers[1].(*raft)
  3782  
  3783  	nt.send(pb.Message{From: 3, To: 1, Type: pb.MsgTransferLeader})
  3784  	if lead.leadTransferee != 3 {
  3785  		t.Fatalf("wait transferring, leadTransferee = %v, want %v", lead.leadTransferee, 3)
  3786  	}
  3787  
  3788  	for i := 0; i < lead.heartbeatTimeout; i++ {
  3789  		lead.tick()
  3790  	}
  3791  	// Second transfer leadership request to the same node.
  3792  	nt.send(pb.Message{From: 3, To: 1, Type: pb.MsgTransferLeader})
  3793  
  3794  	for i := 0; i < lead.electionTimeout-lead.heartbeatTimeout; i++ {
  3795  		lead.tick()
  3796  	}
  3797  
  3798  	checkLeaderTransferState(t, lead, StateLeader, 1)
  3799  }
  3800  
  3801  func checkLeaderTransferState(t *testing.T, r *raft, state StateType, lead uint64) {
  3802  	if r.state != state || r.lead != lead {
  3803  		t.Fatalf("after transferring, node has state %v lead %v, want state %v lead %v", r.state, r.lead, state, lead)
  3804  	}
  3805  	if r.leadTransferee != None {
  3806  		t.Fatalf("after transferring, node has leadTransferee %v, want leadTransferee %v", r.leadTransferee, None)
  3807  	}
  3808  }
  3809  
  3810  // TestTransferNonMember verifies that when a MsgTimeoutNow arrives at
  3811  // a node that has been removed from the group, nothing happens.
  3812  // (previously, if the node also got votes, it would panic as it
  3813  // transitioned to StateLeader)
  3814  func TestTransferNonMember(t *testing.T) {
  3815  	r := newTestRaft(1, 5, 1, newTestMemoryStorage(withPeers(2, 3, 4)))
  3816  	r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgTimeoutNow})
  3817  
  3818  	r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgVoteResp})
  3819  	r.Step(pb.Message{From: 3, To: 1, Type: pb.MsgVoteResp})
  3820  	if r.state != StateFollower {
  3821  		t.Fatalf("state is %s, want StateFollower", r.state)
  3822  	}
  3823  }
  3824  
  3825  // TestNodeWithSmallerTermCanCompleteElection tests the scenario where a node
  3826  // that has been partitioned away (and fallen behind) rejoins the cluster at
  3827  // about the same time the leader node gets partitioned away.
  3828  // Previously the cluster would come to a standstill when run with PreVote
  3829  // enabled.
  3830  func TestNodeWithSmallerTermCanCompleteElection(t *testing.T) {
  3831  	n1 := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
  3832  	n2 := newTestRaft(2, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
  3833  	n3 := newTestRaft(3, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
  3834  
  3835  	n1.becomeFollower(1, None)
  3836  	n2.becomeFollower(1, None)
  3837  	n3.becomeFollower(1, None)
  3838  
  3839  	n1.preVote = true
  3840  	n2.preVote = true
  3841  	n3.preVote = true
  3842  
  3843  	// cause a network partition to isolate node 3
  3844  	nt := newNetwork(n1, n2, n3)
  3845  	nt.cut(1, 3)
  3846  	nt.cut(2, 3)
  3847  
  3848  	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  3849  
  3850  	sm := nt.peers[1].(*raft)
  3851  	if sm.state != StateLeader {
  3852  		t.Errorf("peer 1 state: %s, want %s", sm.state, StateLeader)
  3853  	}
  3854  
  3855  	sm = nt.peers[2].(*raft)
  3856  	if sm.state != StateFollower {
  3857  		t.Errorf("peer 2 state: %s, want %s", sm.state, StateFollower)
  3858  	}
  3859  
  3860  	nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup})
  3861  	sm = nt.peers[3].(*raft)
  3862  	if sm.state != StatePreCandidate {
  3863  		t.Errorf("peer 3 state: %s, want %s", sm.state, StatePreCandidate)
  3864  	}
  3865  
  3866  	nt.send(pb.Message{From: 2, To: 2, Type: pb.MsgHup})
  3867  
  3868  	// check whether the term values are expected
  3869  	// a.Term == 3
  3870  	// b.Term == 3
  3871  	// c.Term == 1
  3872  	sm = nt.peers[1].(*raft)
  3873  	if sm.Term != 3 {
  3874  		t.Errorf("peer 1 term: %d, want %d", sm.Term, 3)
  3875  	}
  3876  
  3877  	sm = nt.peers[2].(*raft)
  3878  	if sm.Term != 3 {
  3879  		t.Errorf("peer 2 term: %d, want %d", sm.Term, 3)
  3880  	}
  3881  
  3882  	sm = nt.peers[3].(*raft)
  3883  	if sm.Term != 1 {
  3884  		t.Errorf("peer 3 term: %d, want %d", sm.Term, 1)
  3885  	}
  3886  
  3887  	// check state
  3888  	// a == follower
  3889  	// b == leader
  3890  	// c == pre-candidate
  3891  	sm = nt.peers[1].(*raft)
  3892  	if sm.state != StateFollower {
  3893  		t.Errorf("peer 1 state: %s, want %s", sm.state, StateFollower)
  3894  	}
  3895  	sm = nt.peers[2].(*raft)
  3896  	if sm.state != StateLeader {
  3897  		t.Errorf("peer 2 state: %s, want %s", sm.state, StateLeader)
  3898  	}
  3899  	sm = nt.peers[3].(*raft)
  3900  	if sm.state != StatePreCandidate {
  3901  		t.Errorf("peer 3 state: %s, want %s", sm.state, StatePreCandidate)
  3902  	}
  3903  
  3904  	sm.logger.Infof("going to bring back peer 3 and kill peer 2")
  3905  	// recover the network then immediately isolate b which is currently
  3906  	// the leader, this is to emulate the crash of b.
  3907  	nt.recover()
  3908  	nt.cut(2, 1)
  3909  	nt.cut(2, 3)
  3910  
  3911  	// call for election
  3912  	nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup})
  3913  	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  3914  
  3915  	// do we have a leader?
  3916  	sma := nt.peers[1].(*raft)
  3917  	smb := nt.peers[3].(*raft)
  3918  	if sma.state != StateLeader && smb.state != StateLeader {
  3919  		t.Errorf("no leader")
  3920  	}
  3921  }
  3922  
  3923  // TestPreVoteWithSplitVote verifies that after split vote, cluster can complete
  3924  // election in next round.
  3925  func TestPreVoteWithSplitVote(t *testing.T) {
  3926  	n1 := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
  3927  	n2 := newTestRaft(2, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
  3928  	n3 := newTestRaft(3, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
  3929  
  3930  	n1.becomeFollower(1, None)
  3931  	n2.becomeFollower(1, None)
  3932  	n3.becomeFollower(1, None)
  3933  
  3934  	n1.preVote = true
  3935  	n2.preVote = true
  3936  	n3.preVote = true
  3937  
  3938  	nt := newNetwork(n1, n2, n3)
  3939  	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  3940  
  3941  	// simulate leader down. followers start split vote.
  3942  	nt.isolate(1)
  3943  	nt.send([]pb.Message{
  3944  		{From: 2, To: 2, Type: pb.MsgHup},
  3945  		{From: 3, To: 3, Type: pb.MsgHup},
  3946  	}...)
  3947  
  3948  	// check whether the term values are expected
  3949  	// n2.Term == 3
  3950  	// n3.Term == 3
  3951  	sm := nt.peers[2].(*raft)
  3952  	if sm.Term != 3 {
  3953  		t.Errorf("peer 2 term: %d, want %d", sm.Term, 3)
  3954  	}
  3955  	sm = nt.peers[3].(*raft)
  3956  	if sm.Term != 3 {
  3957  		t.Errorf("peer 3 term: %d, want %d", sm.Term, 3)
  3958  	}
  3959  
  3960  	// check state
  3961  	// n2 == candidate
  3962  	// n3 == candidate
  3963  	sm = nt.peers[2].(*raft)
  3964  	if sm.state != StateCandidate {
  3965  		t.Errorf("peer 2 state: %s, want %s", sm.state, StateCandidate)
  3966  	}
  3967  	sm = nt.peers[3].(*raft)
  3968  	if sm.state != StateCandidate {
  3969  		t.Errorf("peer 3 state: %s, want %s", sm.state, StateCandidate)
  3970  	}
  3971  
  3972  	// node 2 election timeout first
  3973  	nt.send(pb.Message{From: 2, To: 2, Type: pb.MsgHup})
  3974  
  3975  	// check whether the term values are expected
  3976  	// n2.Term == 4
  3977  	// n3.Term == 4
  3978  	sm = nt.peers[2].(*raft)
  3979  	if sm.Term != 4 {
  3980  		t.Errorf("peer 2 term: %d, want %d", sm.Term, 4)
  3981  	}
  3982  	sm = nt.peers[3].(*raft)
  3983  	if sm.Term != 4 {
  3984  		t.Errorf("peer 3 term: %d, want %d", sm.Term, 4)
  3985  	}
  3986  
  3987  	// check state
  3988  	// n2 == leader
  3989  	// n3 == follower
  3990  	sm = nt.peers[2].(*raft)
  3991  	if sm.state != StateLeader {
  3992  		t.Errorf("peer 2 state: %s, want %s", sm.state, StateLeader)
  3993  	}
  3994  	sm = nt.peers[3].(*raft)
  3995  	if sm.state != StateFollower {
  3996  		t.Errorf("peer 3 state: %s, want %s", sm.state, StateFollower)
  3997  	}
  3998  }
  3999  
  4000  // TestPreVoteWithCheckQuorum ensures that after a node become pre-candidate,
  4001  // it will checkQuorum correctly.
  4002  func TestPreVoteWithCheckQuorum(t *testing.T) {
  4003  	n1 := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
  4004  	n2 := newTestRaft(2, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
  4005  	n3 := newTestRaft(3, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
  4006  
  4007  	n1.becomeFollower(1, None)
  4008  	n2.becomeFollower(1, None)
  4009  	n3.becomeFollower(1, None)
  4010  
  4011  	n1.preVote = true
  4012  	n2.preVote = true
  4013  	n3.preVote = true
  4014  
  4015  	n1.checkQuorum = true
  4016  	n2.checkQuorum = true
  4017  	n3.checkQuorum = true
  4018  
  4019  	nt := newNetwork(n1, n2, n3)
  4020  	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  4021  
  4022  	// isolate node 1. node 2 and node 3 have leader info
  4023  	nt.isolate(1)
  4024  
  4025  	// check state
  4026  	sm := nt.peers[1].(*raft)
  4027  	if sm.state != StateLeader {
  4028  		t.Fatalf("peer 1 state: %s, want %s", sm.state, StateLeader)
  4029  	}
  4030  	sm = nt.peers[2].(*raft)
  4031  	if sm.state != StateFollower {
  4032  		t.Fatalf("peer 2 state: %s, want %s", sm.state, StateFollower)
  4033  	}
  4034  	sm = nt.peers[3].(*raft)
  4035  	if sm.state != StateFollower {
  4036  		t.Fatalf("peer 3 state: %s, want %s", sm.state, StateFollower)
  4037  	}
  4038  
  4039  	// node 2 will ignore node 3's PreVote
  4040  	nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup})
  4041  	nt.send(pb.Message{From: 2, To: 2, Type: pb.MsgHup})
  4042  
  4043  	// Do we have a leader?
  4044  	if n2.state != StateLeader && n3.state != StateFollower {
  4045  		t.Errorf("no leader")
  4046  	}
  4047  }
  4048  
  4049  // TestLearnerCampaign verifies that a learner won't campaign even if it receives
  4050  // a MsgHup or MsgTimeoutNow.
  4051  func TestLearnerCampaign(t *testing.T) {
  4052  	n1 := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1)))
  4053  	n1.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddLearnerNode}.AsV2())
  4054  	n2 := newTestRaft(2, 10, 1, newTestMemoryStorage(withPeers(1)))
  4055  	n2.applyConfChange(pb.ConfChange{NodeID: 2, Type: pb.ConfChangeAddLearnerNode}.AsV2())
  4056  	nt := newNetwork(n1, n2)
  4057  	nt.send(pb.Message{From: 2, To: 2, Type: pb.MsgHup})
  4058  
  4059  	if !n2.isLearner {
  4060  		t.Fatalf("failed to make n2 a learner")
  4061  	}
  4062  
  4063  	if n2.state != StateFollower {
  4064  		t.Fatalf("n2 campaigned despite being learner")
  4065  	}
  4066  
  4067  	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  4068  	if n1.state != StateLeader || n1.lead != 1 {
  4069  		t.Fatalf("n1 did not become leader")
  4070  	}
  4071  
  4072  	// NB: TransferLeader already checks that the recipient is not a learner, but
  4073  	// the check could have happened by the time the recipient becomes a learner,
  4074  	// in which case it will receive MsgTimeoutNow as in this test case and we
  4075  	// verify that it's ignored.
  4076  	nt.send(pb.Message{From: 1, To: 2, Type: pb.MsgTimeoutNow})
  4077  
  4078  	if n2.state != StateFollower {
  4079  		t.Fatalf("n2 accepted leadership transfer despite being learner")
  4080  	}
  4081  }
  4082  
  4083  // simulate rolling update a cluster for Pre-Vote. cluster has 3 nodes [n1, n2, n3].
  4084  // n1 is leader with term 2
  4085  // n2 is follower with term 2
  4086  // n3 is partitioned, with term 4 and less log, state is candidate
  4087  func newPreVoteMigrationCluster(t *testing.T) *network {
  4088  	n1 := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
  4089  	n2 := newTestRaft(2, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
  4090  	n3 := newTestRaft(3, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
  4091  
  4092  	n1.becomeFollower(1, None)
  4093  	n2.becomeFollower(1, None)
  4094  	n3.becomeFollower(1, None)
  4095  
  4096  	n1.preVote = true
  4097  	n2.preVote = true
  4098  	// We intentionally do not enable PreVote for n3, this is done so in order
  4099  	// to simulate a rolling restart process where it's possible to have a mixed
  4100  	// version cluster with replicas with PreVote enabled, and replicas without.
  4101  
  4102  	nt := newNetwork(n1, n2, n3)
  4103  	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  4104  
  4105  	// Cause a network partition to isolate n3.
  4106  	nt.isolate(3)
  4107  	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("some data")}}})
  4108  	nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup})
  4109  	nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup})
  4110  
  4111  	// check state
  4112  	// n1.state == StateLeader
  4113  	// n2.state == StateFollower
  4114  	// n3.state == StateCandidate
  4115  	if n1.state != StateLeader {
  4116  		t.Fatalf("node 1 state: %s, want %s", n1.state, StateLeader)
  4117  	}
  4118  	if n2.state != StateFollower {
  4119  		t.Fatalf("node 2 state: %s, want %s", n2.state, StateFollower)
  4120  	}
  4121  	if n3.state != StateCandidate {
  4122  		t.Fatalf("node 3 state: %s, want %s", n3.state, StateCandidate)
  4123  	}
  4124  
  4125  	// check term
  4126  	// n1.Term == 2
  4127  	// n2.Term == 2
  4128  	// n3.Term == 4
  4129  	if n1.Term != 2 {
  4130  		t.Fatalf("node 1 term: %d, want %d", n1.Term, 2)
  4131  	}
  4132  	if n2.Term != 2 {
  4133  		t.Fatalf("node 2 term: %d, want %d", n2.Term, 2)
  4134  	}
  4135  	if n3.Term != 4 {
  4136  		t.Fatalf("node 3 term: %d, want %d", n3.Term, 4)
  4137  	}
  4138  
  4139  	// Enable prevote on n3, then recover the network
  4140  	n3.preVote = true
  4141  	nt.recover()
  4142  
  4143  	return nt
  4144  }
  4145  
  4146  func TestPreVoteMigrationCanCompleteElection(t *testing.T) {
  4147  	nt := newPreVoteMigrationCluster(t)
  4148  
  4149  	// n1 is leader with term 2
  4150  	// n2 is follower with term 2
  4151  	// n3 is pre-candidate with term 4, and less log
  4152  	n2 := nt.peers[2].(*raft)
  4153  	n3 := nt.peers[3].(*raft)
  4154  
  4155  	// simulate leader down
  4156  	nt.isolate(1)
  4157  
  4158  	// Call for elections from both n2 and n3.
  4159  	nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup})
  4160  	nt.send(pb.Message{From: 2, To: 2, Type: pb.MsgHup})
  4161  
  4162  	// check state
  4163  	// n2.state == Follower
  4164  	// n3.state == PreCandidate
  4165  	if n2.state != StateFollower {
  4166  		t.Errorf("node 2 state: %s, want %s", n2.state, StateFollower)
  4167  	}
  4168  	if n3.state != StatePreCandidate {
  4169  		t.Errorf("node 3 state: %s, want %s", n3.state, StatePreCandidate)
  4170  	}
  4171  
  4172  	nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup})
  4173  	nt.send(pb.Message{From: 2, To: 2, Type: pb.MsgHup})
  4174  
  4175  	// Do we have a leader?
  4176  	if n2.state != StateLeader && n3.state != StateFollower {
  4177  		t.Errorf("no leader")
  4178  	}
  4179  }
  4180  
  4181  func TestPreVoteMigrationWithFreeStuckPreCandidate(t *testing.T) {
  4182  	nt := newPreVoteMigrationCluster(t)
  4183  
  4184  	// n1 is leader with term 2
  4185  	// n2 is follower with term 2
  4186  	// n3 is pre-candidate with term 4, and less log
  4187  	n1 := nt.peers[1].(*raft)
  4188  	n2 := nt.peers[2].(*raft)
  4189  	n3 := nt.peers[3].(*raft)
  4190  
  4191  	nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup})
  4192  
  4193  	if n1.state != StateLeader {
  4194  		t.Errorf("node 1 state: %s, want %s", n1.state, StateLeader)
  4195  	}
  4196  	if n2.state != StateFollower {
  4197  		t.Errorf("node 2 state: %s, want %s", n2.state, StateFollower)
  4198  	}
  4199  	if n3.state != StatePreCandidate {
  4200  		t.Errorf("node 3 state: %s, want %s", n3.state, StatePreCandidate)
  4201  	}
  4202  
  4203  	// Pre-Vote again for safety
  4204  	nt.send(pb.Message{From: 3, To: 3, Type: pb.MsgHup})
  4205  
  4206  	if n1.state != StateLeader {
  4207  		t.Errorf("node 1 state: %s, want %s", n1.state, StateLeader)
  4208  	}
  4209  	if n2.state != StateFollower {
  4210  		t.Errorf("node 2 state: %s, want %s", n2.state, StateFollower)
  4211  	}
  4212  	if n3.state != StatePreCandidate {
  4213  		t.Errorf("node 3 state: %s, want %s", n3.state, StatePreCandidate)
  4214  	}
  4215  
  4216  	nt.send(pb.Message{From: 1, To: 3, Type: pb.MsgHeartbeat, Term: n1.Term})
  4217  
  4218  	// Disrupt the leader so that the stuck peer is freed
  4219  	if n1.state != StateFollower {
  4220  		t.Errorf("state = %s, want %s", n1.state, StateFollower)
  4221  	}
  4222  	if n3.Term != n1.Term {
  4223  		t.Errorf("term = %d, want %d", n3.Term, n1.Term)
  4224  	}
  4225  }
  4226  
  4227  func testConfChangeCheckBeforeCampaign(t *testing.T, v2 bool) {
  4228  	nt := newNetwork(nil, nil, nil)
  4229  	n1 := nt.peers[1].(*raft)
  4230  	n2 := nt.peers[2].(*raft)
  4231  	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
  4232  	if n1.state != StateLeader {
  4233  		t.Errorf("node 1 state: %s, want %s", n1.state, StateLeader)
  4234  	}
  4235  
  4236  	// Begin to remove the third node.
  4237  	cc := pb.ConfChange{
  4238  		Type:   pb.ConfChangeRemoveNode,
  4239  		NodeID: 2,
  4240  	}
  4241  	var ccData []byte
  4242  	var err error
  4243  	var ty pb.EntryType
  4244  	if v2 {
  4245  		ccv2 := cc.AsV2()
  4246  		ccData, err = ccv2.Marshal()
  4247  		ty = pb.EntryConfChangeV2
  4248  	} else {
  4249  		ccData, err = cc.Marshal()
  4250  		ty = pb.EntryConfChange
  4251  	}
  4252  	if err != nil {
  4253  		t.Fatal(err)
  4254  	}
  4255  	nt.send(pb.Message{
  4256  		From: 1,
  4257  		To:   1,
  4258  		Type: pb.MsgProp,
  4259  		Entries: []pb.Entry{
  4260  			{Type: ty, Data: ccData},
  4261  		},
  4262  	})
  4263  
  4264  	// Trigger campaign in node 2
  4265  	for i := 0; i < n2.randomizedElectionTimeout; i++ {
  4266  		n2.tick()
  4267  	}
  4268  	// It's still follower because committed conf change is not applied.
  4269  	if n2.state != StateFollower {
  4270  		t.Errorf("node 2 state: %s, want %s", n2.state, StateFollower)
  4271  	}
  4272  
  4273  	// Transfer leadership to peer 2.
  4274  	nt.send(pb.Message{From: 2, To: 1, Type: pb.MsgTransferLeader})
  4275  	if n1.state != StateLeader {
  4276  		t.Errorf("node 1 state: %s, want %s", n1.state, StateLeader)
  4277  	}
  4278  	// It's still follower because committed conf change is not applied.
  4279  	if n2.state != StateFollower {
  4280  		t.Errorf("node 2 state: %s, want %s", n2.state, StateFollower)
  4281  	}
  4282  	// Abort transfer leader
  4283  	for i := 0; i < n1.electionTimeout; i++ {
  4284  		n1.tick()
  4285  	}
  4286  
  4287  	// Advance apply
  4288  	nextEnts(n2, nt.storage[2])
  4289  
  4290  	// Transfer leadership to peer 2 again.
  4291  	nt.send(pb.Message{From: 2, To: 1, Type: pb.MsgTransferLeader})
  4292  	if n1.state != StateFollower {
  4293  		t.Errorf("node 1 state: %s, want %s", n1.state, StateFollower)
  4294  	}
  4295  	if n2.state != StateLeader {
  4296  		t.Errorf("node 2 state: %s, want %s", n2.state, StateLeader)
  4297  	}
  4298  
  4299  	nextEnts(n1, nt.storage[1])
  4300  	// Trigger campaign in node 2
  4301  	for i := 0; i < n1.randomizedElectionTimeout; i++ {
  4302  		n1.tick()
  4303  	}
  4304  	if n1.state != StateCandidate {
  4305  		t.Errorf("node 1 state: %s, want %s", n1.state, StateCandidate)
  4306  	}
  4307  }
  4308  
// TestConfChangeCheckBeforeCampaign tests that an unapplied ConfChange is
// checked before a node campaigns. It runs the shared scenario with v2 set
// to false, i.e. the change is proposed as an EntryConfChange.
func TestConfChangeCheckBeforeCampaign(t *testing.T) {
	testConfChangeCheckBeforeCampaign(t, false)
}
  4313  
// TestConfChangeV2CheckBeforeCampaign tests that an unapplied ConfChangeV2
// is checked before a node campaigns. It runs the shared scenario with v2
// set to true, i.e. the change is proposed as an EntryConfChangeV2.
func TestConfChangeV2CheckBeforeCampaign(t *testing.T) {
	testConfChangeCheckBeforeCampaign(t, true)
}
  4318  
  4319  func TestFastLogRejection(t *testing.T) {
  4320  	tests := []struct {
  4321  		leaderLog       []pb.Entry // Logs on the leader
  4322  		followerLog     []pb.Entry // Logs on the follower
  4323  		rejectHintTerm  uint64     // Expected term included in rejected MsgAppResp.
  4324  		rejectHintIndex uint64     // Expected index included in rejected MsgAppResp.
  4325  		nextAppendTerm  uint64     // Expected term when leader appends after rejected.
  4326  		nextAppendIndex uint64     // Expected index when leader appends after rejected.
  4327  	}{
  4328  		// This case tests that leader can find the conflict index quickly.
  4329  		// Firstly leader appends (type=MsgApp,index=7,logTerm=4, entries=...);
  4330  		// After rejected leader appends (type=MsgApp,index=3,logTerm=2).
  4331  		{
  4332  			leaderLog: []pb.Entry{
  4333  				{Term: 1, Index: 1},
  4334  				{Term: 2, Index: 2},
  4335  				{Term: 2, Index: 3},
  4336  				{Term: 4, Index: 4},
  4337  				{Term: 4, Index: 5},
  4338  				{Term: 4, Index: 6},
  4339  				{Term: 4, Index: 7},
  4340  			},
  4341  			followerLog: []pb.Entry{
  4342  				{Term: 1, Index: 1},
  4343  				{Term: 2, Index: 2},
  4344  				{Term: 2, Index: 3},
  4345  				{Term: 3, Index: 4},
  4346  				{Term: 3, Index: 5},
  4347  				{Term: 3, Index: 6},
  4348  				{Term: 3, Index: 7},
  4349  				{Term: 3, Index: 8},
  4350  				{Term: 3, Index: 9},
  4351  				{Term: 3, Index: 10},
  4352  				{Term: 3, Index: 11},
  4353  			},
  4354  			rejectHintTerm:  3,
  4355  			rejectHintIndex: 7,
  4356  			nextAppendTerm:  2,
  4357  			nextAppendIndex: 3,
  4358  		},
  4359  		// This case tests that leader can find the conflict index quickly.
  4360  		// Firstly leader appends (type=MsgApp,index=8,logTerm=5, entries=...);
  4361  		// After rejected leader appends (type=MsgApp,index=4,logTerm=3).
  4362  		{
  4363  			leaderLog: []pb.Entry{
  4364  				{Term: 1, Index: 1},
  4365  				{Term: 2, Index: 2},
  4366  				{Term: 2, Index: 3},
  4367  				{Term: 3, Index: 4},
  4368  				{Term: 4, Index: 5},
  4369  				{Term: 4, Index: 6},
  4370  				{Term: 4, Index: 7},
  4371  				{Term: 5, Index: 8},
  4372  			},
  4373  			followerLog: []pb.Entry{
  4374  				{Term: 1, Index: 1},
  4375  				{Term: 2, Index: 2},
  4376  				{Term: 2, Index: 3},
  4377  				{Term: 3, Index: 4},
  4378  				{Term: 3, Index: 5},
  4379  				{Term: 3, Index: 6},
  4380  				{Term: 3, Index: 7},
  4381  				{Term: 3, Index: 8},
  4382  				{Term: 3, Index: 9},
  4383  				{Term: 3, Index: 10},
  4384  				{Term: 3, Index: 11},
  4385  			},
  4386  			rejectHintTerm:  3,
  4387  			rejectHintIndex: 8,
  4388  			nextAppendTerm:  3,
  4389  			nextAppendIndex: 4,
  4390  		},
  4391  		// This case tests that follower can find the conflict index quickly.
  4392  		// Firstly leader appends (type=MsgApp,index=4,logTerm=1, entries=...);
  4393  		// After rejected leader appends (type=MsgApp,index=1,logTerm=1).
  4394  		{
  4395  			leaderLog: []pb.Entry{
  4396  				{Term: 1, Index: 1},
  4397  				{Term: 1, Index: 2},
  4398  				{Term: 1, Index: 3},
  4399  				{Term: 1, Index: 4},
  4400  			},
  4401  			followerLog: []pb.Entry{
  4402  				{Term: 1, Index: 1},
  4403  				{Term: 2, Index: 2},
  4404  				{Term: 2, Index: 3},
  4405  				{Term: 4, Index: 4},
  4406  			},
  4407  			rejectHintTerm:  1,
  4408  			rejectHintIndex: 1,
  4409  			nextAppendTerm:  1,
  4410  			nextAppendIndex: 1,
  4411  		},
  4412  		// This case is similar to the previous case. However, this time, the
  4413  		// leader has a longer uncommitted log tail than the follower.
  4414  		// Firstly leader appends (type=MsgApp,index=6,logTerm=1, entries=...);
  4415  		// After rejected leader appends (type=MsgApp,index=1,logTerm=1).
  4416  		{
  4417  			leaderLog: []pb.Entry{
  4418  				{Term: 1, Index: 1},
  4419  				{Term: 1, Index: 2},
  4420  				{Term: 1, Index: 3},
  4421  				{Term: 1, Index: 4},
  4422  				{Term: 1, Index: 5},
  4423  				{Term: 1, Index: 6},
  4424  			},
  4425  			followerLog: []pb.Entry{
  4426  				{Term: 1, Index: 1},
  4427  				{Term: 2, Index: 2},
  4428  				{Term: 2, Index: 3},
  4429  				{Term: 4, Index: 4},
  4430  			},
  4431  			rejectHintTerm:  1,
  4432  			rejectHintIndex: 1,
  4433  			nextAppendTerm:  1,
  4434  			nextAppendIndex: 1,
  4435  		},
  4436  		// This case is similar to the previous case. However, this time, the
  4437  		// follower has a longer uncommitted log tail than the leader.
  4438  		// Firstly leader appends (type=MsgApp,index=4,logTerm=1, entries=...);
  4439  		// After rejected leader appends (type=MsgApp,index=1,logTerm=1).
  4440  		{
  4441  			leaderLog: []pb.Entry{
  4442  				{Term: 1, Index: 1},
  4443  				{Term: 1, Index: 2},
  4444  				{Term: 1, Index: 3},
  4445  				{Term: 1, Index: 4},
  4446  			},
  4447  			followerLog: []pb.Entry{
  4448  				{Term: 1, Index: 1},
  4449  				{Term: 2, Index: 2},
  4450  				{Term: 2, Index: 3},
  4451  				{Term: 4, Index: 4},
  4452  				{Term: 4, Index: 5},
  4453  				{Term: 4, Index: 6},
  4454  			},
  4455  			rejectHintTerm:  1,
  4456  			rejectHintIndex: 1,
  4457  			nextAppendTerm:  1,
  4458  			nextAppendIndex: 1,
  4459  		},
  4460  		// An normal case that there are no log conflicts.
  4461  		// Firstly leader appends (type=MsgApp,index=5,logTerm=5, entries=...);
  4462  		// After rejected leader appends (type=MsgApp,index=4,logTerm=4).
  4463  		{
  4464  			leaderLog: []pb.Entry{
  4465  				{Term: 1, Index: 1},
  4466  				{Term: 1, Index: 2},
  4467  				{Term: 1, Index: 3},
  4468  				{Term: 4, Index: 4},
  4469  				{Term: 5, Index: 5},
  4470  			},
  4471  			followerLog: []pb.Entry{
  4472  				{Term: 1, Index: 1},
  4473  				{Term: 1, Index: 2},
  4474  				{Term: 1, Index: 3},
  4475  				{Term: 4, Index: 4},
  4476  			},
  4477  			rejectHintTerm:  4,
  4478  			rejectHintIndex: 4,
  4479  			nextAppendTerm:  4,
  4480  			nextAppendIndex: 4,
  4481  		},
  4482  		// Test case from example comment in stepLeader (on leader).
  4483  		{
  4484  			leaderLog: []pb.Entry{
  4485  				{Term: 2, Index: 1},
  4486  				{Term: 5, Index: 2},
  4487  				{Term: 5, Index: 3},
  4488  				{Term: 5, Index: 4},
  4489  				{Term: 5, Index: 5},
  4490  				{Term: 5, Index: 6},
  4491  				{Term: 5, Index: 7},
  4492  				{Term: 5, Index: 8},
  4493  				{Term: 5, Index: 9},
  4494  			},
  4495  			followerLog: []pb.Entry{
  4496  				{Term: 2, Index: 1},
  4497  				{Term: 4, Index: 2},
  4498  				{Term: 4, Index: 3},
  4499  				{Term: 4, Index: 4},
  4500  				{Term: 4, Index: 5},
  4501  				{Term: 4, Index: 6},
  4502  			},
  4503  			rejectHintTerm:  4,
  4504  			rejectHintIndex: 6,
  4505  			nextAppendTerm:  2,
  4506  			nextAppendIndex: 1,
  4507  		},
  4508  		// Test case from example comment in handleAppendEntries (on follower).
  4509  		{
  4510  			leaderLog: []pb.Entry{
  4511  				{Term: 2, Index: 1},
  4512  				{Term: 2, Index: 2},
  4513  				{Term: 2, Index: 3},
  4514  				{Term: 2, Index: 4},
  4515  				{Term: 2, Index: 5},
  4516  			},
  4517  			followerLog: []pb.Entry{
  4518  				{Term: 2, Index: 1},
  4519  				{Term: 4, Index: 2},
  4520  				{Term: 4, Index: 3},
  4521  				{Term: 4, Index: 4},
  4522  				{Term: 4, Index: 5},
  4523  				{Term: 4, Index: 6},
  4524  				{Term: 4, Index: 7},
  4525  				{Term: 4, Index: 8},
  4526  			},
  4527  			nextAppendTerm:  2,
  4528  			nextAppendIndex: 1,
  4529  			rejectHintTerm:  2,
  4530  			rejectHintIndex: 1,
  4531  		},
  4532  	}
  4533  
  4534  	for i, test := range tests {
  4535  		t.Run("", func(t *testing.T) {
  4536  			s1 := NewMemoryStorage()
  4537  			s1.snapshot.Metadata.ConfState = pb.ConfState{Voters: []uint64{1, 2, 3}}
  4538  			s1.Append(test.leaderLog)
  4539  			s2 := NewMemoryStorage()
  4540  			s2.snapshot.Metadata.ConfState = pb.ConfState{Voters: []uint64{1, 2, 3}}
  4541  			s2.Append(test.followerLog)
  4542  
  4543  			n1 := newTestRaft(1, 10, 1, s1)
  4544  			n2 := newTestRaft(2, 10, 1, s2)
  4545  
  4546  			n1.becomeCandidate()
  4547  			n1.becomeLeader()
  4548  
  4549  			n2.Step(pb.Message{From: 1, To: 1, Type: pb.MsgHeartbeat})
  4550  
  4551  			msgs := n2.readMessages()
  4552  			if len(msgs) != 1 {
  4553  				t.Errorf("can't read 1 message from peer 2")
  4554  			}
  4555  			if msgs[0].Type != pb.MsgHeartbeatResp {
  4556  				t.Errorf("can't read heartbeat response from peer 2")
  4557  			}
  4558  			if n1.Step(msgs[0]) != nil {
  4559  				t.Errorf("peer 1 step heartbeat response fail")
  4560  			}
  4561  
  4562  			msgs = n1.readMessages()
  4563  			if len(msgs) != 1 {
  4564  				t.Errorf("can't read 1 message from peer 1")
  4565  			}
  4566  			if msgs[0].Type != pb.MsgApp {
  4567  				t.Errorf("can't read append from peer 1")
  4568  			}
  4569  
  4570  			if n2.Step(msgs[0]) != nil {
  4571  				t.Errorf("peer 2 step append fail")
  4572  			}
  4573  			msgs = n2.readMessages()
  4574  			if len(msgs) != 1 {
  4575  				t.Errorf("can't read 1 message from peer 2")
  4576  			}
  4577  			if msgs[0].Type != pb.MsgAppResp {
  4578  				t.Errorf("can't read append response from peer 2")
  4579  			}
  4580  			if !msgs[0].Reject {
  4581  				t.Errorf("expected rejected append response from peer 2")
  4582  			}
  4583  			if msgs[0].LogTerm != test.rejectHintTerm {
  4584  				t.Fatalf("#%d expected hint log term = %d, but got %d", i, test.rejectHintTerm, msgs[0].LogTerm)
  4585  			}
  4586  			if msgs[0].RejectHint != test.rejectHintIndex {
  4587  				t.Fatalf("#%d expected hint index = %d, but got %d", i, test.rejectHintIndex, msgs[0].RejectHint)
  4588  			}
  4589  
  4590  			if n1.Step(msgs[0]) != nil {
  4591  				t.Errorf("peer 1 step append fail")
  4592  			}
  4593  			msgs = n1.readMessages()
  4594  			if msgs[0].LogTerm != test.nextAppendTerm {
  4595  				t.Fatalf("#%d expected log term = %d, but got %d", i, test.nextAppendTerm, msgs[0].LogTerm)
  4596  			}
  4597  			if msgs[0].Index != test.nextAppendIndex {
  4598  				t.Fatalf("#%d expected index = %d, but got %d", i, test.nextAppendIndex, msgs[0].Index)
  4599  			}
  4600  		})
  4601  	}
  4602  }
  4603  
  4604  func entsWithConfig(configFunc func(*Config), terms ...uint64) *raft {
  4605  	storage := NewMemoryStorage()
  4606  	for i, term := range terms {
  4607  		storage.Append([]pb.Entry{{Index: uint64(i + 1), Term: term}})
  4608  	}
  4609  	cfg := newTestConfig(1, 5, 1, storage)
  4610  	if configFunc != nil {
  4611  		configFunc(cfg)
  4612  	}
  4613  	sm := newRaft(cfg)
  4614  	sm.reset(terms[len(terms)-1])
  4615  	return sm
  4616  }
  4617  
  4618  // votedWithConfig creates a raft state machine with Vote and Term set
  4619  // to the given value but no log entries (indicating that it voted in
  4620  // the given term but has not received any logs).
  4621  func votedWithConfig(configFunc func(*Config), vote, term uint64) *raft {
  4622  	storage := NewMemoryStorage()
  4623  	storage.SetHardState(pb.HardState{Vote: vote, Term: term})
  4624  	cfg := newTestConfig(1, 5, 1, storage)
  4625  	if configFunc != nil {
  4626  		configFunc(cfg)
  4627  	}
  4628  	sm := newRaft(cfg)
  4629  	sm.reset(term)
  4630  	return sm
  4631  }
  4632  
// network is an in-memory test harness that delivers messages between a set
// of state machines.
//
// peers maps a node ID to the state machine that receives messages addressed
// to that ID. storage holds the MemoryStorage of the peers that
// newNetworkWithConfig created itself (peers passed in as nil). dropm maps a
// (from, to) connection to the probability with which filter drops a message
// on it, and ignorem marks message types that filter always discards.
type network struct {
	peers   map[uint64]stateMachine
	storage map[uint64]*MemoryStorage
	dropm   map[connem]float64
	ignorem map[pb.MessageType]bool

	// msgHook is called for each message sent. It may inspect the
	// message and return true to send it or false to drop it.
	msgHook func(pb.Message) bool
}
  4643  
// newNetwork initializes a network from peers, using the default test
// configuration (no config hook).
// A nil peer is replaced with a newly created raft state machine.
// An existing *raft peer is assigned its ID and has its progress tracker
// rebuilt for the network's peer set.
// IDs are always assigned as [1, n], in argument order.
func newNetwork(peers ...stateMachine) *network {
	return newNetworkWithConfig(nil, peers...)
}
  4651  
  4652  // newNetworkWithConfig is like newNetwork but calls the given func to
  4653  // modify the configuration of any state machines it creates.
  4654  func newNetworkWithConfig(configFunc func(*Config), peers ...stateMachine) *network {
  4655  	size := len(peers)
  4656  	peerAddrs := idsBySize(size)
  4657  
  4658  	npeers := make(map[uint64]stateMachine, size)
  4659  	nstorage := make(map[uint64]*MemoryStorage, size)
  4660  
  4661  	for j, p := range peers {
  4662  		id := peerAddrs[j]
  4663  		switch v := p.(type) {
  4664  		case nil:
  4665  			nstorage[id] = newTestMemoryStorage(withPeers(peerAddrs...))
  4666  			cfg := newTestConfig(id, 10, 1, nstorage[id])
  4667  			if configFunc != nil {
  4668  				configFunc(cfg)
  4669  			}
  4670  			sm := newRaft(cfg)
  4671  			npeers[id] = sm
  4672  		case *raft:
  4673  			// TODO(tbg): this is all pretty confused. Clean this up.
  4674  			learners := make(map[uint64]bool, len(v.prs.Learners))
  4675  			for i := range v.prs.Learners {
  4676  				learners[i] = true
  4677  			}
  4678  			v.id = id
  4679  			v.prs = tracker.MakeProgressTracker(v.prs.MaxInflight)
  4680  			if len(learners) > 0 {
  4681  				v.prs.Learners = map[uint64]struct{}{}
  4682  			}
  4683  			for i := 0; i < size; i++ {
  4684  				pr := &tracker.Progress{}
  4685  				if _, ok := learners[peerAddrs[i]]; ok {
  4686  					pr.IsLearner = true
  4687  					v.prs.Learners[peerAddrs[i]] = struct{}{}
  4688  				} else {
  4689  					v.prs.Voters[0][peerAddrs[i]] = struct{}{}
  4690  				}
  4691  				v.prs.Progress[peerAddrs[i]] = pr
  4692  			}
  4693  			v.reset(v.Term)
  4694  			npeers[id] = v
  4695  		case *blackHole:
  4696  			npeers[id] = v
  4697  		default:
  4698  			panic(fmt.Sprintf("unexpected state machine type: %T", p))
  4699  		}
  4700  	}
  4701  	return &network{
  4702  		peers:   npeers,
  4703  		storage: nstorage,
  4704  		dropm:   make(map[connem]float64),
  4705  		ignorem: make(map[pb.MessageType]bool),
  4706  	}
  4707  }
  4708  
// preVoteConfig is a config hook (usable with newNetworkWithConfig,
// entsWithConfig and votedWithConfig) that enables PreVote on c.
func preVoteConfig(c *Config) {
	c.PreVote = true
}
  4712  
  4713  func (nw *network) send(msgs ...pb.Message) {
  4714  	for len(msgs) > 0 {
  4715  		m := msgs[0]
  4716  		p := nw.peers[m.To]
  4717  		p.Step(m)
  4718  		msgs = append(msgs[1:], nw.filter(p.readMessages())...)
  4719  	}
  4720  }
  4721  
// drop records perc as the drop probability for messages travelling from
// `from` to `to`; filter discards such a message whenever a random number in
// [0, 1) is below perc.
func (nw *network) drop(from, to uint64, perc float64) {
	nw.dropm[connem{from, to}] = perc
}
  4725  
// cut severs the connection between one and other in both directions by
// setting a drop probability above 1 on each.
func (nw *network) cut(one, other uint64) {
	nw.drop(one, other, 2.0) // always drop
	nw.drop(other, one, 2.0) // always drop
}
  4730  
  4731  func (nw *network) isolate(id uint64) {
  4732  	for i := 0; i < len(nw.peers); i++ {
  4733  		nid := uint64(i) + 1
  4734  		if nid != id {
  4735  			nw.drop(id, nid, 1.0) // always drop
  4736  			nw.drop(nid, id, 1.0) // always drop
  4737  		}
  4738  	}
  4739  }
  4740  
// ignore makes filter discard every message of type t.
func (nw *network) ignore(t pb.MessageType) {
	nw.ignorem[t] = true
}
  4744  
// recover clears all drop probabilities and ignored message types, undoing
// the effect of earlier drop, cut, isolate and ignore calls.
func (nw *network) recover() {
	nw.dropm = make(map[connem]float64)
	nw.ignorem = make(map[pb.MessageType]bool)
}
  4749  
  4750  func (nw *network) filter(msgs []pb.Message) []pb.Message {
  4751  	mm := []pb.Message{}
  4752  	for _, m := range msgs {
  4753  		if nw.ignorem[m.Type] {
  4754  			continue
  4755  		}
  4756  		switch m.Type {
  4757  		case pb.MsgHup:
  4758  			// hups never go over the network, so don't drop them but panic
  4759  			panic("unexpected msgHup")
  4760  		default:
  4761  			perc := nw.dropm[connem{m.From, m.To}]
  4762  			if n := rand.Float64(); n < perc {
  4763  				continue
  4764  			}
  4765  		}
  4766  		if nw.msgHook != nil {
  4767  			if !nw.msgHook(m) {
  4768  				continue
  4769  			}
  4770  		}
  4771  		mm = append(mm, m)
  4772  	}
  4773  	return mm
  4774  }
  4775  
// connem identifies a directed connection between two peers; it is the key
// of network.dropm.
type connem struct {
	from, to uint64
}
  4779  
// blackHole is a stateMachine that accepts every message and never produces
// any output.
type blackHole struct{}

// Step discards the message and reports no error.
func (blackHole) Step(pb.Message) error      { return nil }

// readMessages always returns nil, as a blackHole never sends anything.
func (blackHole) readMessages() []pb.Message { return nil }

// nopStepper is a shared blackHole instance.
var nopStepper = &blackHole{}
  4786  
  4787  func idsBySize(size int) []uint64 {
  4788  	ids := make([]uint64, size)
  4789  	for i := 0; i < size; i++ {
  4790  		ids[i] = 1 + uint64(i)
  4791  	}
  4792  	return ids
  4793  }
  4794  
// setRandomizedElectionTimeout sets r's randomized election timeout to v
// instead of leaving the value to be chosen by the system. Some test
// scenarios need a known value to behave deterministically.
func setRandomizedElectionTimeout(r *raft, v int) {
	r.randomizedElectionTimeout = v
}
  4801  
  4802  func newTestConfig(id uint64, election, heartbeat int, storage Storage) *Config {
  4803  	return &Config{
  4804  		ID:              id,
  4805  		ElectionTick:    election,
  4806  		HeartbeatTick:   heartbeat,
  4807  		Storage:         storage,
  4808  		MaxSizePerMsg:   noLimit,
  4809  		MaxInflightMsgs: 256,
  4810  	}
  4811  }
  4812  
// testMemoryStorageOptions is a functional option that mutates a
// MemoryStorage while newTestMemoryStorage constructs it.
type testMemoryStorageOptions func(*MemoryStorage)
  4814  
// withPeers returns an option that sets the voters in the storage's
// snapshot ConfState to peers. The slice is stored as given, not copied.
func withPeers(peers ...uint64) testMemoryStorageOptions {
	return func(ms *MemoryStorage) {
		ms.snapshot.Metadata.ConfState.Voters = peers
	}
}
  4820  
// withLearners returns an option that sets the learners in the storage's
// snapshot ConfState to learners. The slice is stored as given, not copied.
func withLearners(learners ...uint64) testMemoryStorageOptions {
	return func(ms *MemoryStorage) {
		ms.snapshot.Metadata.ConfState.Learners = learners
	}
}
  4826  
  4827  func newTestMemoryStorage(opts ...testMemoryStorageOptions) *MemoryStorage {
  4828  	ms := NewMemoryStorage()
  4829  	for _, o := range opts {
  4830  		o(ms)
  4831  	}
  4832  	return ms
  4833  }
  4834  
// newTestRaft creates a raft state machine from the Config that
// newTestConfig builds for the given id, tick counts and storage.
func newTestRaft(id uint64, election, heartbeat int, storage Storage) *raft {
	return newRaft(newTestConfig(id, election, heartbeat, storage))
}
  4838  
  4839  func newTestLearnerRaft(id uint64, election, heartbeat int, storage Storage) *raft {
  4840  	cfg := newTestConfig(id, election, heartbeat, storage)
  4841  	return newRaft(cfg)
  4842  }
  4843  
// newTestRawNode sets up a RawNode from the Config that newTestConfig builds
// for the given id, tick counts and storage. It panics if NewRawNode returns
// an error.
func newTestRawNode(id uint64, election, heartbeat int, storage Storage) *RawNode {
	cfg := newTestConfig(id, election, heartbeat, storage)
	rn, err := NewRawNode(cfg)
	if err != nil {
		panic(err)
	}
	return rn
}
  4854  

View as plain text