...

Source file src/go.etcd.io/etcd/raft/v3/raft.go

Documentation: go.etcd.io/etcd/raft/v3

     1  // Copyright 2015 The etcd Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package raft
    16  
    17  import (
    18  	"bytes"
    19  	"errors"
    20  	"fmt"
    21  	"math"
    22  	"math/rand"
    23  	"sort"
    24  	"strings"
    25  	"sync"
    26  	"time"
    27  
    28  	"go.etcd.io/etcd/raft/v3/confchange"
    29  	"go.etcd.io/etcd/raft/v3/quorum"
    30  	pb "go.etcd.io/etcd/raft/v3/raftpb"
    31  	"go.etcd.io/etcd/raft/v3/tracker"
    32  )
    33  
    34  // None is a placeholder node ID used when there is no leader.
    35  const None uint64 = 0
    36  const noLimit = math.MaxUint64
    37  
    38  // Possible values for StateType.
    39  const (
    40  	StateFollower StateType = iota
    41  	StateCandidate
    42  	StateLeader
    43  	StatePreCandidate
    44  	numStates
    45  )
    46  
    47  type ReadOnlyOption int
    48  
    49  const (
    50  	// ReadOnlySafe guarantees the linearizability of the read only request by
    51  	// communicating with the quorum. It is the default and suggested option.
    52  	ReadOnlySafe ReadOnlyOption = iota
    53  	// ReadOnlyLeaseBased ensures linearizability of the read only request by
    54  	// relying on the leader lease. It can be affected by clock drift.
    55  	// If the clock drift is unbounded, leader might keep the lease longer than it
    56  	// should (clock can move backward/pause without any bound). ReadIndex is not safe
    57  	// in that case.
    58  	ReadOnlyLeaseBased
    59  )
    60  
    61  // Possible values for CampaignType
    62  const (
    63  	// campaignPreElection represents the first phase of a normal election when
    64  	// Config.PreVote is true.
    65  	campaignPreElection CampaignType = "CampaignPreElection"
    66  	// campaignElection represents a normal (time-based) election (the second phase
    67  	// of the election when Config.PreVote is true).
    68  	campaignElection CampaignType = "CampaignElection"
    69  	// campaignTransfer represents the type of leader transfer
    70  	campaignTransfer CampaignType = "CampaignTransfer"
    71  )
    72  
    73  // ErrProposalDropped is returned when the proposal is ignored by some cases,
    74  // so that the proposer can be notified and fail fast.
    75  var ErrProposalDropped = errors.New("raft proposal dropped")
    76  
    77  // lockedRand is a small wrapper around rand.Rand to provide
    78  // synchronization among multiple raft groups. Only the methods needed
    79  // by the code are exposed (e.g. Intn).
    80  type lockedRand struct {
    81  	mu   sync.Mutex
    82  	rand *rand.Rand
    83  }
    84  
    85  func (r *lockedRand) Intn(n int) int {
    86  	r.mu.Lock()
    87  	v := r.rand.Intn(n)
    88  	r.mu.Unlock()
    89  	return v
    90  }
    91  
    92  var globalRand = &lockedRand{
    93  	rand: rand.New(rand.NewSource(time.Now().UnixNano())),
    94  }
    95  
    96  // CampaignType represents the type of campaigning
    97  // the reason we use the type of string instead of uint64
    98  // is because it's simpler to compare and fill in raft entries
    99  type CampaignType string
   100  
   101  // StateType represents the role of a node in a cluster.
   102  type StateType uint64
   103  
   104  var stmap = [...]string{
   105  	"StateFollower",
   106  	"StateCandidate",
   107  	"StateLeader",
   108  	"StatePreCandidate",
   109  }
   110  
   111  func (st StateType) String() string {
   112  	return stmap[uint64(st)]
   113  }
   114  
   115  // Config contains the parameters to start a raft.
   116  type Config struct {
   117  	// ID is the identity of the local raft. ID cannot be 0.
   118  	ID uint64
   119  
   120  	// ElectionTick is the number of Node.Tick invocations that must pass between
   121  	// elections. That is, if a follower does not receive any message from the
   122  	// leader of current term before ElectionTick has elapsed, it will become
   123  	// candidate and start an election. ElectionTick must be greater than
   124  	// HeartbeatTick. We suggest ElectionTick = 10 * HeartbeatTick to avoid
   125  	// unnecessary leader switching.
   126  	ElectionTick int
   127  	// HeartbeatTick is the number of Node.Tick invocations that must pass between
   128  	// heartbeats. That is, a leader sends heartbeat messages to maintain its
   129  	// leadership every HeartbeatTick ticks.
   130  	HeartbeatTick int
   131  
   132  	// Storage is the storage for raft. raft generates entries and states to be
   133  	// stored in storage. raft reads the persisted entries and states out of
   134  	// Storage when it needs. raft reads out the previous state and configuration
   135  	// out of storage when restarting.
   136  	Storage Storage
   137  	// Applied is the last applied index. It should only be set when restarting
   138  	// raft. raft will not return entries to the application smaller or equal to
   139  	// Applied. If Applied is unset when restarting, raft might return previous
   140  	// applied entries. This is a very application dependent configuration.
   141  	Applied uint64
   142  
   143  	// MaxSizePerMsg limits the max byte size of each append message. Smaller
   144  	// value lowers the raft recovery cost(initial probing and message lost
   145  	// during normal operation). On the other side, it might affect the
   146  	// throughput during normal replication. Note: math.MaxUint64 for unlimited,
   147  	// 0 for at most one entry per message.
   148  	MaxSizePerMsg uint64
   149  	// MaxCommittedSizePerReady limits the size of the committed entries which
   150  	// can be applied.
   151  	MaxCommittedSizePerReady uint64
   152  	// MaxUncommittedEntriesSize limits the aggregate byte size of the
   153  	// uncommitted entries that may be appended to a leader's log. Once this
   154  	// limit is exceeded, proposals will begin to return ErrProposalDropped
   155  	// errors. Note: 0 for no limit.
   156  	MaxUncommittedEntriesSize uint64
   157  	// MaxInflightMsgs limits the max number of in-flight append messages during
   158  	// optimistic replication phase. The application transportation layer usually
   159  	// has its own sending buffer over TCP/UDP. Setting MaxInflightMsgs to avoid
   160  	// overflowing that sending buffer. TODO (xiangli): feedback to application to
   161  	// limit the proposal rate?
   162  	MaxInflightMsgs int
   163  
   164  	// CheckQuorum specifies if the leader should check quorum activity. Leader
   165  	// steps down when quorum is not active for an electionTimeout.
   166  	CheckQuorum bool
   167  
   168  	// PreVote enables the Pre-Vote algorithm described in raft thesis section
   169  	// 9.6. This prevents disruption when a node that has been partitioned away
   170  	// rejoins the cluster.
   171  	PreVote bool
   172  
   173  	// ReadOnlyOption specifies how the read only request is processed.
   174  	//
   175  	// ReadOnlySafe guarantees the linearizability of the read only request by
   176  	// communicating with the quorum. It is the default and suggested option.
   177  	//
   178  	// ReadOnlyLeaseBased ensures linearizability of the read only request by
   179  	// relying on the leader lease. It can be affected by clock drift.
   180  	// If the clock drift is unbounded, leader might keep the lease longer than it
   181  	// should (clock can move backward/pause without any bound). ReadIndex is not safe
   182  	// in that case.
   183  	// CheckQuorum MUST be enabled if ReadOnlyOption is ReadOnlyLeaseBased.
   184  	ReadOnlyOption ReadOnlyOption
   185  
   186  	// Logger is the logger used for raft log. For multinode which can host
   187  	// multiple raft group, each raft group can have its own logger
   188  	Logger Logger
   189  
   190  	// DisableProposalForwarding set to true means that followers will drop
   191  	// proposals, rather than forwarding them to the leader. One use case for
   192  	// this feature would be in a situation where the Raft leader is used to
   193  	// compute the data of a proposal, for example, adding a timestamp from a
   194  	// hybrid logical clock to data in a monotonically increasing way. Forwarding
   195  	// should be disabled to prevent a follower with an inaccurate hybrid
   196  	// logical clock from assigning the timestamp and then forwarding the data
   197  	// to the leader.
   198  	DisableProposalForwarding bool
   199  }
   200  
   201  func (c *Config) validate() error {
   202  	if c.ID == None {
   203  		return errors.New("cannot use none as id")
   204  	}
   205  
   206  	if c.HeartbeatTick <= 0 {
   207  		return errors.New("heartbeat tick must be greater than 0")
   208  	}
   209  
   210  	if c.ElectionTick <= c.HeartbeatTick {
   211  		return errors.New("election tick must be greater than heartbeat tick")
   212  	}
   213  
   214  	if c.Storage == nil {
   215  		return errors.New("storage cannot be nil")
   216  	}
   217  
   218  	if c.MaxUncommittedEntriesSize == 0 {
   219  		c.MaxUncommittedEntriesSize = noLimit
   220  	}
   221  
   222  	// default MaxCommittedSizePerReady to MaxSizePerMsg because they were
   223  	// previously the same parameter.
   224  	if c.MaxCommittedSizePerReady == 0 {
   225  		c.MaxCommittedSizePerReady = c.MaxSizePerMsg
   226  	}
   227  
   228  	if c.MaxInflightMsgs <= 0 {
   229  		return errors.New("max inflight messages must be greater than 0")
   230  	}
   231  
   232  	if c.Logger == nil {
   233  		c.Logger = getLogger()
   234  	}
   235  
   236  	if c.ReadOnlyOption == ReadOnlyLeaseBased && !c.CheckQuorum {
   237  		return errors.New("CheckQuorum must be enabled when ReadOnlyOption is ReadOnlyLeaseBased")
   238  	}
   239  
   240  	return nil
   241  }
   242  
   243  type raft struct {
   244  	id uint64
   245  
   246  	Term uint64
   247  	Vote uint64
   248  
   249  	readStates []ReadState
   250  
   251  	// the log
   252  	raftLog *raftLog
   253  
   254  	maxMsgSize         uint64
   255  	maxUncommittedSize uint64
   256  	// TODO(tbg): rename to trk.
   257  	prs tracker.ProgressTracker
   258  
   259  	state StateType
   260  
   261  	// isLearner is true if the local raft node is a learner.
   262  	isLearner bool
   263  
   264  	msgs []pb.Message
   265  
   266  	// the leader id
   267  	lead uint64
   268  	// leadTransferee is id of the leader transfer target when its value is not zero.
   269  	// Follow the procedure defined in raft thesis 3.10.
   270  	leadTransferee uint64
   271  	// Only one conf change may be pending (in the log, but not yet
   272  	// applied) at a time. This is enforced via pendingConfIndex, which
   273  	// is set to a value >= the log index of the latest pending
   274  	// configuration change (if any). Config changes are only allowed to
   275  	// be proposed if the leader's applied index is greater than this
   276  	// value.
   277  	pendingConfIndex uint64
   278  	// an estimate of the size of the uncommitted tail of the Raft log. Used to
   279  	// prevent unbounded log growth. Only maintained by the leader. Reset on
   280  	// term changes.
   281  	uncommittedSize uint64
   282  
   283  	readOnly *readOnly
   284  
   285  	// number of ticks since it reached last electionTimeout when it is leader
   286  	// or candidate.
   287  	// number of ticks since it reached last electionTimeout or received a
   288  	// valid message from current leader when it is a follower.
   289  	electionElapsed int
   290  
   291  	// number of ticks since it reached last heartbeatTimeout.
   292  	// only leader keeps heartbeatElapsed.
   293  	heartbeatElapsed int
   294  
   295  	checkQuorum bool
   296  	preVote     bool
   297  
   298  	heartbeatTimeout int
   299  	electionTimeout  int
   300  	// randomizedElectionTimeout is a random number between
   301  	// [electiontimeout, 2 * electiontimeout - 1]. It gets reset
   302  	// when raft changes its state to follower or candidate.
   303  	randomizedElectionTimeout int
   304  	disableProposalForwarding bool
   305  
   306  	tick func()
   307  	step stepFunc
   308  
   309  	logger Logger
   310  
   311  	// pendingReadIndexMessages is used to store messages of type MsgReadIndex
   312  	// that can't be answered as new leader didn't committed any log in
   313  	// current term. Those will be handled as fast as first log is committed in
   314  	// current term.
   315  	pendingReadIndexMessages []pb.Message
   316  }
   317  
   318  func newRaft(c *Config) *raft {
   319  	if err := c.validate(); err != nil {
   320  		panic(err.Error())
   321  	}
   322  	raftlog := newLogWithSize(c.Storage, c.Logger, c.MaxCommittedSizePerReady)
   323  	hs, cs, err := c.Storage.InitialState()
   324  	if err != nil {
   325  		panic(err) // TODO(bdarnell)
   326  	}
   327  
   328  	r := &raft{
   329  		id:                        c.ID,
   330  		lead:                      None,
   331  		isLearner:                 false,
   332  		raftLog:                   raftlog,
   333  		maxMsgSize:                c.MaxSizePerMsg,
   334  		maxUncommittedSize:        c.MaxUncommittedEntriesSize,
   335  		prs:                       tracker.MakeProgressTracker(c.MaxInflightMsgs),
   336  		electionTimeout:           c.ElectionTick,
   337  		heartbeatTimeout:          c.HeartbeatTick,
   338  		logger:                    c.Logger,
   339  		checkQuorum:               c.CheckQuorum,
   340  		preVote:                   c.PreVote,
   341  		readOnly:                  newReadOnly(c.ReadOnlyOption),
   342  		disableProposalForwarding: c.DisableProposalForwarding,
   343  	}
   344  
   345  	cfg, prs, err := confchange.Restore(confchange.Changer{
   346  		Tracker:   r.prs,
   347  		LastIndex: raftlog.lastIndex(),
   348  	}, cs)
   349  	if err != nil {
   350  		panic(err)
   351  	}
   352  	assertConfStatesEquivalent(r.logger, cs, r.switchToConfig(cfg, prs))
   353  
   354  	if !IsEmptyHardState(hs) {
   355  		r.loadState(hs)
   356  	}
   357  	if c.Applied > 0 {
   358  		raftlog.appliedTo(c.Applied)
   359  	}
   360  	r.becomeFollower(r.Term, None)
   361  
   362  	var nodesStrs []string
   363  	for _, n := range r.prs.VoterNodes() {
   364  		nodesStrs = append(nodesStrs, fmt.Sprintf("%x", n))
   365  	}
   366  
   367  	r.logger.Infof("newRaft %x [peers: [%s], term: %d, commit: %d, applied: %d, lastindex: %d, lastterm: %d]",
   368  		r.id, strings.Join(nodesStrs, ","), r.Term, r.raftLog.committed, r.raftLog.applied, r.raftLog.lastIndex(), r.raftLog.lastTerm())
   369  	return r
   370  }
   371  
   372  func (r *raft) hasLeader() bool { return r.lead != None }
   373  
   374  func (r *raft) softState() *SoftState { return &SoftState{Lead: r.lead, RaftState: r.state} }
   375  
   376  func (r *raft) hardState() pb.HardState {
   377  	return pb.HardState{
   378  		Term:   r.Term,
   379  		Vote:   r.Vote,
   380  		Commit: r.raftLog.committed,
   381  	}
   382  }
   383  
   384  // send schedules persisting state to a stable storage and AFTER that
   385  // sending the message (as part of next Ready message processing).
   386  func (r *raft) send(m pb.Message) {
   387  	if m.From == None {
   388  		m.From = r.id
   389  	}
   390  	if m.Type == pb.MsgVote || m.Type == pb.MsgVoteResp || m.Type == pb.MsgPreVote || m.Type == pb.MsgPreVoteResp {
   391  		if m.Term == 0 {
   392  			// All {pre-,}campaign messages need to have the term set when
   393  			// sending.
   394  			// - MsgVote: m.Term is the term the node is campaigning for,
   395  			//   non-zero as we increment the term when campaigning.
   396  			// - MsgVoteResp: m.Term is the new r.Term if the MsgVote was
   397  			//   granted, non-zero for the same reason MsgVote is
   398  			// - MsgPreVote: m.Term is the term the node will campaign,
   399  			//   non-zero as we use m.Term to indicate the next term we'll be
   400  			//   campaigning for
   401  			// - MsgPreVoteResp: m.Term is the term received in the original
   402  			//   MsgPreVote if the pre-vote was granted, non-zero for the
   403  			//   same reasons MsgPreVote is
   404  			panic(fmt.Sprintf("term should be set when sending %s", m.Type))
   405  		}
   406  	} else {
   407  		if m.Term != 0 {
   408  			panic(fmt.Sprintf("term should not be set when sending %s (was %d)", m.Type, m.Term))
   409  		}
   410  		// do not attach term to MsgProp, MsgReadIndex
   411  		// proposals are a way to forward to the leader and
   412  		// should be treated as local message.
   413  		// MsgReadIndex is also forwarded to leader.
   414  		if m.Type != pb.MsgProp && m.Type != pb.MsgReadIndex {
   415  			m.Term = r.Term
   416  		}
   417  	}
   418  	r.msgs = append(r.msgs, m)
   419  }
   420  
   421  // sendAppend sends an append RPC with new entries (if any) and the
   422  // current commit index to the given peer.
   423  func (r *raft) sendAppend(to uint64) {
   424  	r.maybeSendAppend(to, true)
   425  }
   426  
   427  // maybeSendAppend sends an append RPC with new entries to the given peer,
   428  // if necessary. Returns true if a message was sent. The sendIfEmpty
   429  // argument controls whether messages with no entries will be sent
   430  // ("empty" messages are useful to convey updated Commit indexes, but
   431  // are undesirable when we're sending multiple messages in a batch).
   432  func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
   433  	pr := r.prs.Progress[to]
   434  	if pr.IsPaused() {
   435  		return false
   436  	}
   437  	m := pb.Message{}
   438  	m.To = to
   439  
   440  	term, errt := r.raftLog.term(pr.Next - 1)
   441  	ents, erre := r.raftLog.entries(pr.Next, r.maxMsgSize)
   442  	if len(ents) == 0 && !sendIfEmpty {
   443  		return false
   444  	}
   445  
   446  	if errt != nil || erre != nil { // send snapshot if we failed to get term or entries
   447  		if !pr.RecentActive {
   448  			r.logger.Debugf("ignore sending snapshot to %x since it is not recently active", to)
   449  			return false
   450  		}
   451  
   452  		m.Type = pb.MsgSnap
   453  		snapshot, err := r.raftLog.snapshot()
   454  		if err != nil {
   455  			if err == ErrSnapshotTemporarilyUnavailable {
   456  				r.logger.Debugf("%x failed to send snapshot to %x because snapshot is temporarily unavailable", r.id, to)
   457  				return false
   458  			}
   459  			panic(err) // TODO(bdarnell)
   460  		}
   461  		if IsEmptySnap(snapshot) {
   462  			panic("need non-empty snapshot")
   463  		}
   464  		m.Snapshot = snapshot
   465  		sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term
   466  		r.logger.Debugf("%x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]",
   467  			r.id, r.raftLog.firstIndex(), r.raftLog.committed, sindex, sterm, to, pr)
   468  		pr.BecomeSnapshot(sindex)
   469  		r.logger.Debugf("%x paused sending replication messages to %x [%s]", r.id, to, pr)
   470  	} else {
   471  		m.Type = pb.MsgApp
   472  		m.Index = pr.Next - 1
   473  		m.LogTerm = term
   474  		m.Entries = ents
   475  		m.Commit = r.raftLog.committed
   476  		if n := len(m.Entries); n != 0 {
   477  			switch pr.State {
   478  			// optimistically increase the next when in StateReplicate
   479  			case tracker.StateReplicate:
   480  				last := m.Entries[n-1].Index
   481  				pr.OptimisticUpdate(last)
   482  				pr.Inflights.Add(last)
   483  			case tracker.StateProbe:
   484  				pr.ProbeSent = true
   485  			default:
   486  				r.logger.Panicf("%x is sending append in unhandled state %s", r.id, pr.State)
   487  			}
   488  		}
   489  	}
   490  	r.send(m)
   491  	return true
   492  }
   493  
   494  // sendHeartbeat sends a heartbeat RPC to the given peer.
   495  func (r *raft) sendHeartbeat(to uint64, ctx []byte) {
   496  	// Attach the commit as min(to.matched, r.committed).
   497  	// When the leader sends out heartbeat message,
   498  	// the receiver(follower) might not be matched with the leader
   499  	// or it might not have all the committed entries.
   500  	// The leader MUST NOT forward the follower's commit to
   501  	// an unmatched index.
   502  	commit := min(r.prs.Progress[to].Match, r.raftLog.committed)
   503  	m := pb.Message{
   504  		To:      to,
   505  		Type:    pb.MsgHeartbeat,
   506  		Commit:  commit,
   507  		Context: ctx,
   508  	}
   509  
   510  	r.send(m)
   511  }
   512  
   513  // bcastAppend sends RPC, with entries to all peers that are not up-to-date
   514  // according to the progress recorded in r.prs.
   515  func (r *raft) bcastAppend() {
   516  	r.prs.Visit(func(id uint64, _ *tracker.Progress) {
   517  		if id == r.id {
   518  			return
   519  		}
   520  		r.sendAppend(id)
   521  	})
   522  }
   523  
   524  // bcastHeartbeat sends RPC, without entries to all the peers.
   525  func (r *raft) bcastHeartbeat() {
   526  	lastCtx := r.readOnly.lastPendingRequestCtx()
   527  	if len(lastCtx) == 0 {
   528  		r.bcastHeartbeatWithCtx(nil)
   529  	} else {
   530  		r.bcastHeartbeatWithCtx([]byte(lastCtx))
   531  	}
   532  }
   533  
   534  func (r *raft) bcastHeartbeatWithCtx(ctx []byte) {
   535  	r.prs.Visit(func(id uint64, _ *tracker.Progress) {
   536  		if id == r.id {
   537  			return
   538  		}
   539  		r.sendHeartbeat(id, ctx)
   540  	})
   541  }
   542  
   543  func (r *raft) advance(rd Ready) {
   544  	r.reduceUncommittedSize(rd.CommittedEntries)
   545  
   546  	// If entries were applied (or a snapshot), update our cursor for
   547  	// the next Ready. Note that if the current HardState contains a
   548  	// new Commit index, this does not mean that we're also applying
   549  	// all of the new entries due to commit pagination by size.
   550  	if newApplied := rd.appliedCursor(); newApplied > 0 {
   551  		oldApplied := r.raftLog.applied
   552  		r.raftLog.appliedTo(newApplied)
   553  
   554  		if r.prs.Config.AutoLeave && oldApplied <= r.pendingConfIndex && newApplied >= r.pendingConfIndex && r.state == StateLeader {
   555  			// If the current (and most recent, at least for this leader's term)
   556  			// configuration should be auto-left, initiate that now. We use a
   557  			// nil Data which unmarshals into an empty ConfChangeV2 and has the
   558  			// benefit that appendEntry can never refuse it based on its size
   559  			// (which registers as zero).
   560  			ent := pb.Entry{
   561  				Type: pb.EntryConfChangeV2,
   562  				Data: nil,
   563  			}
   564  			// There's no way in which this proposal should be able to be rejected.
   565  			if !r.appendEntry(ent) {
   566  				panic("refused un-refusable auto-leaving ConfChangeV2")
   567  			}
   568  			r.pendingConfIndex = r.raftLog.lastIndex()
   569  			r.logger.Infof("initiating automatic transition out of joint configuration %s", r.prs.Config)
   570  		}
   571  	}
   572  
   573  	if len(rd.Entries) > 0 {
   574  		e := rd.Entries[len(rd.Entries)-1]
   575  		r.raftLog.stableTo(e.Index, e.Term)
   576  	}
   577  	if !IsEmptySnap(rd.Snapshot) {
   578  		r.raftLog.stableSnapTo(rd.Snapshot.Metadata.Index)
   579  	}
   580  }
   581  
   582  // maybeCommit attempts to advance the commit index. Returns true if
   583  // the commit index changed (in which case the caller should call
   584  // r.bcastAppend).
   585  func (r *raft) maybeCommit() bool {
   586  	mci := r.prs.Committed()
   587  	return r.raftLog.maybeCommit(mci, r.Term)
   588  }
   589  
   590  func (r *raft) reset(term uint64) {
   591  	if r.Term != term {
   592  		r.Term = term
   593  		r.Vote = None
   594  	}
   595  	r.lead = None
   596  
   597  	r.electionElapsed = 0
   598  	r.heartbeatElapsed = 0
   599  	r.resetRandomizedElectionTimeout()
   600  
   601  	r.abortLeaderTransfer()
   602  
   603  	r.prs.ResetVotes()
   604  	r.prs.Visit(func(id uint64, pr *tracker.Progress) {
   605  		*pr = tracker.Progress{
   606  			Match:     0,
   607  			Next:      r.raftLog.lastIndex() + 1,
   608  			Inflights: tracker.NewInflights(r.prs.MaxInflight),
   609  			IsLearner: pr.IsLearner,
   610  		}
   611  		if id == r.id {
   612  			pr.Match = r.raftLog.lastIndex()
   613  		}
   614  	})
   615  
   616  	r.pendingConfIndex = 0
   617  	r.uncommittedSize = 0
   618  	r.readOnly = newReadOnly(r.readOnly.option)
   619  }
   620  
   621  func (r *raft) appendEntry(es ...pb.Entry) (accepted bool) {
   622  	li := r.raftLog.lastIndex()
   623  	for i := range es {
   624  		es[i].Term = r.Term
   625  		es[i].Index = li + 1 + uint64(i)
   626  	}
   627  	// Track the size of this uncommitted proposal.
   628  	if !r.increaseUncommittedSize(es) {
   629  		r.logger.Debugf(
   630  			"%x appending new entries to log would exceed uncommitted entry size limit; dropping proposal",
   631  			r.id,
   632  		)
   633  		// Drop the proposal.
   634  		return false
   635  	}
   636  	// use latest "last" index after truncate/append
   637  	li = r.raftLog.append(es...)
   638  	r.prs.Progress[r.id].MaybeUpdate(li)
   639  	// Regardless of maybeCommit's return, our caller will call bcastAppend.
   640  	r.maybeCommit()
   641  	return true
   642  }
   643  
   644  // tickElection is run by followers and candidates after r.electionTimeout.
   645  func (r *raft) tickElection() {
   646  	r.electionElapsed++
   647  
   648  	if r.promotable() && r.pastElectionTimeout() {
   649  		r.electionElapsed = 0
   650  		r.Step(pb.Message{From: r.id, Type: pb.MsgHup})
   651  	}
   652  }
   653  
   654  // tickHeartbeat is run by leaders to send a MsgBeat after r.heartbeatTimeout.
   655  func (r *raft) tickHeartbeat() {
   656  	r.heartbeatElapsed++
   657  	r.electionElapsed++
   658  
   659  	if r.electionElapsed >= r.electionTimeout {
   660  		r.electionElapsed = 0
   661  		if r.checkQuorum {
   662  			r.Step(pb.Message{From: r.id, Type: pb.MsgCheckQuorum})
   663  		}
   664  		// If current leader cannot transfer leadership in electionTimeout, it becomes leader again.
   665  		if r.state == StateLeader && r.leadTransferee != None {
   666  			r.abortLeaderTransfer()
   667  		}
   668  	}
   669  
   670  	if r.state != StateLeader {
   671  		return
   672  	}
   673  
   674  	if r.heartbeatElapsed >= r.heartbeatTimeout {
   675  		r.heartbeatElapsed = 0
   676  		r.Step(pb.Message{From: r.id, Type: pb.MsgBeat})
   677  	}
   678  }
   679  
   680  func (r *raft) becomeFollower(term uint64, lead uint64) {
   681  	r.step = stepFollower
   682  	r.reset(term)
   683  	r.tick = r.tickElection
   684  	r.lead = lead
   685  	r.state = StateFollower
   686  	r.logger.Infof("%x became follower at term %d", r.id, r.Term)
   687  }
   688  
   689  func (r *raft) becomeCandidate() {
   690  	// TODO(xiangli) remove the panic when the raft implementation is stable
   691  	if r.state == StateLeader {
   692  		panic("invalid transition [leader -> candidate]")
   693  	}
   694  	r.step = stepCandidate
   695  	r.reset(r.Term + 1)
   696  	r.tick = r.tickElection
   697  	r.Vote = r.id
   698  	r.state = StateCandidate
   699  	r.logger.Infof("%x became candidate at term %d", r.id, r.Term)
   700  }
   701  
   702  func (r *raft) becomePreCandidate() {
   703  	// TODO(xiangli) remove the panic when the raft implementation is stable
   704  	if r.state == StateLeader {
   705  		panic("invalid transition [leader -> pre-candidate]")
   706  	}
   707  	// Becoming a pre-candidate changes our step functions and state,
   708  	// but doesn't change anything else. In particular it does not increase
   709  	// r.Term or change r.Vote.
   710  	r.step = stepCandidate
   711  	r.prs.ResetVotes()
   712  	r.tick = r.tickElection
   713  	r.lead = None
   714  	r.state = StatePreCandidate
   715  	r.logger.Infof("%x became pre-candidate at term %d", r.id, r.Term)
   716  }
   717  
   718  func (r *raft) becomeLeader() {
   719  	// TODO(xiangli) remove the panic when the raft implementation is stable
   720  	if r.state == StateFollower {
   721  		panic("invalid transition [follower -> leader]")
   722  	}
   723  	r.step = stepLeader
   724  	r.reset(r.Term)
   725  	r.tick = r.tickHeartbeat
   726  	r.lead = r.id
   727  	r.state = StateLeader
   728  	// Followers enter replicate mode when they've been successfully probed
   729  	// (perhaps after having received a snapshot as a result). The leader is
   730  	// trivially in this state. Note that r.reset() has initialized this
   731  	// progress with the last index already.
   732  	r.prs.Progress[r.id].BecomeReplicate()
   733  
   734  	// Conservatively set the pendingConfIndex to the last index in the
   735  	// log. There may or may not be a pending config change, but it's
   736  	// safe to delay any future proposals until we commit all our
   737  	// pending log entries, and scanning the entire tail of the log
   738  	// could be expensive.
   739  	r.pendingConfIndex = r.raftLog.lastIndex()
   740  
   741  	emptyEnt := pb.Entry{Data: nil}
   742  	if !r.appendEntry(emptyEnt) {
   743  		// This won't happen because we just called reset() above.
   744  		r.logger.Panic("empty entry was dropped")
   745  	}
   746  	// As a special case, don't count the initial empty entry towards the
   747  	// uncommitted log quota. This is because we want to preserve the
   748  	// behavior of allowing one entry larger than quota if the current
   749  	// usage is zero.
   750  	r.reduceUncommittedSize([]pb.Entry{emptyEnt})
   751  	r.logger.Infof("%x became leader at term %d", r.id, r.Term)
   752  }
   753  
   754  func (r *raft) hup(t CampaignType) {
   755  	if r.state == StateLeader {
   756  		r.logger.Debugf("%x ignoring MsgHup because already leader", r.id)
   757  		return
   758  	}
   759  
   760  	if !r.promotable() {
   761  		r.logger.Warningf("%x is unpromotable and can not campaign", r.id)
   762  		return
   763  	}
   764  	ents, err := r.raftLog.slice(r.raftLog.applied+1, r.raftLog.committed+1, noLimit)
   765  	if err != nil {
   766  		r.logger.Panicf("unexpected error getting unapplied entries (%v)", err)
   767  	}
   768  	if n := numOfPendingConf(ents); n != 0 && r.raftLog.committed > r.raftLog.applied {
   769  		r.logger.Warningf("%x cannot campaign at term %d since there are still %d pending configuration changes to apply", r.id, r.Term, n)
   770  		return
   771  	}
   772  
   773  	r.logger.Infof("%x is starting a new election at term %d", r.id, r.Term)
   774  	r.campaign(t)
   775  }
   776  
   777  // campaign transitions the raft instance to candidate state. This must only be
   778  // called after verifying that this is a legitimate transition.
   779  func (r *raft) campaign(t CampaignType) {
   780  	if !r.promotable() {
   781  		// This path should not be hit (callers are supposed to check), but
   782  		// better safe than sorry.
   783  		r.logger.Warningf("%x is unpromotable; campaign() should have been called", r.id)
   784  	}
   785  	var term uint64
   786  	var voteMsg pb.MessageType
   787  	if t == campaignPreElection {
   788  		r.becomePreCandidate()
   789  		voteMsg = pb.MsgPreVote
   790  		// PreVote RPCs are sent for the next term before we've incremented r.Term.
   791  		term = r.Term + 1
   792  	} else {
   793  		r.becomeCandidate()
   794  		voteMsg = pb.MsgVote
   795  		term = r.Term
   796  	}
   797  	if _, _, res := r.poll(r.id, voteRespMsgType(voteMsg), true); res == quorum.VoteWon {
   798  		// We won the election after voting for ourselves (which must mean that
   799  		// this is a single-node cluster). Advance to the next state.
   800  		if t == campaignPreElection {
   801  			r.campaign(campaignElection)
   802  		} else {
   803  			r.becomeLeader()
   804  		}
   805  		return
   806  	}
   807  	var ids []uint64
   808  	{
   809  		idMap := r.prs.Voters.IDs()
   810  		ids = make([]uint64, 0, len(idMap))
   811  		for id := range idMap {
   812  			ids = append(ids, id)
   813  		}
   814  		sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] })
   815  	}
   816  	for _, id := range ids {
   817  		if id == r.id {
   818  			continue
   819  		}
   820  		r.logger.Infof("%x [logterm: %d, index: %d] sent %s request to %x at term %d",
   821  			r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), voteMsg, id, r.Term)
   822  
   823  		var ctx []byte
   824  		if t == campaignTransfer {
   825  			ctx = []byte(t)
   826  		}
   827  		r.send(pb.Message{Term: term, To: id, Type: voteMsg, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Context: ctx})
   828  	}
   829  }
   830  
   831  func (r *raft) poll(id uint64, t pb.MessageType, v bool) (granted int, rejected int, result quorum.VoteResult) {
   832  	if v {
   833  		r.logger.Infof("%x received %s from %x at term %d", r.id, t, id, r.Term)
   834  	} else {
   835  		r.logger.Infof("%x received %s rejection from %x at term %d", r.id, t, id, r.Term)
   836  	}
   837  	r.prs.RecordVote(id, v)
   838  	return r.prs.TallyVotes()
   839  }
   840  
   841  func (r *raft) Step(m pb.Message) error {
   842  	// Handle the message term, which may result in our stepping down to a follower.
   843  	switch {
   844  	case m.Term == 0:
   845  		// local message
   846  	case m.Term > r.Term:
   847  		if m.Type == pb.MsgVote || m.Type == pb.MsgPreVote {
   848  			force := bytes.Equal(m.Context, []byte(campaignTransfer))
   849  			inLease := r.checkQuorum && r.lead != None && r.electionElapsed < r.electionTimeout
   850  			if !force && inLease {
   851  				// If a server receives a RequestVote request within the minimum election timeout
   852  				// of hearing from a current leader, it does not update its term or grant its vote
   853  				r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] ignored %s from %x [logterm: %d, index: %d] at term %d: lease is not expired (remaining ticks: %d)",
   854  					r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term, r.electionTimeout-r.electionElapsed)
   855  				return nil
   856  			}
   857  		}
   858  		switch {
   859  		case m.Type == pb.MsgPreVote:
   860  			// Never change our term in response to a PreVote
   861  		case m.Type == pb.MsgPreVoteResp && !m.Reject:
   862  			// We send pre-vote requests with a term in our future. If the
   863  			// pre-vote is granted, we will increment our term when we get a
   864  			// quorum. If it is not, the term comes from the node that
   865  			// rejected our vote so we should become a follower at the new
   866  			// term.
   867  		default:
   868  			r.logger.Infof("%x [term: %d] received a %s message with higher term from %x [term: %d]",
   869  				r.id, r.Term, m.Type, m.From, m.Term)
   870  			if m.Type == pb.MsgApp || m.Type == pb.MsgHeartbeat || m.Type == pb.MsgSnap {
   871  				r.becomeFollower(m.Term, m.From)
   872  			} else {
   873  				r.becomeFollower(m.Term, None)
   874  			}
   875  		}
   876  
   877  	case m.Term < r.Term:
   878  		if (r.checkQuorum || r.preVote) && (m.Type == pb.MsgHeartbeat || m.Type == pb.MsgApp) {
   879  			// We have received messages from a leader at a lower term. It is possible
   880  			// that these messages were simply delayed in the network, but this could
   881  			// also mean that this node has advanced its term number during a network
   882  			// partition, and it is now unable to either win an election or to rejoin
   883  			// the majority on the old term. If checkQuorum is false, this will be
   884  			// handled by incrementing term numbers in response to MsgVote with a
   885  			// higher term, but if checkQuorum is true we may not advance the term on
   886  			// MsgVote and must generate other messages to advance the term. The net
   887  			// result of these two features is to minimize the disruption caused by
   888  			// nodes that have been removed from the cluster's configuration: a
   889  			// removed node will send MsgVotes (or MsgPreVotes) which will be ignored,
   890  			// but it will not receive MsgApp or MsgHeartbeat, so it will not create
   891  			// disruptive term increases, by notifying leader of this node's activeness.
   892  			// The above comments also true for Pre-Vote
   893  			//
   894  			// When follower gets isolated, it soon starts an election ending
   895  			// up with a higher term than leader, although it won't receive enough
   896  			// votes to win the election. When it regains connectivity, this response
   897  			// with "pb.MsgAppResp" of higher term would force leader to step down.
   898  			// However, this disruption is inevitable to free this stuck node with
   899  			// fresh election. This can be prevented with Pre-Vote phase.
   900  			r.send(pb.Message{To: m.From, Type: pb.MsgAppResp})
   901  		} else if m.Type == pb.MsgPreVote {
   902  			// Before Pre-Vote enable, there may have candidate with higher term,
   903  			// but less log. After update to Pre-Vote, the cluster may deadlock if
   904  			// we drop messages with a lower term.
   905  			r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] rejected %s from %x [logterm: %d, index: %d] at term %d",
   906  				r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)
   907  			r.send(pb.Message{To: m.From, Term: r.Term, Type: pb.MsgPreVoteResp, Reject: true})
   908  		} else {
   909  			// ignore other cases
   910  			r.logger.Infof("%x [term: %d] ignored a %s message with lower term from %x [term: %d]",
   911  				r.id, r.Term, m.Type, m.From, m.Term)
   912  		}
   913  		return nil
   914  	}
   915  
   916  	switch m.Type {
   917  	case pb.MsgHup:
   918  		if r.preVote {
   919  			r.hup(campaignPreElection)
   920  		} else {
   921  			r.hup(campaignElection)
   922  		}
   923  
   924  	case pb.MsgVote, pb.MsgPreVote:
   925  		// We can vote if this is a repeat of a vote we've already cast...
   926  		canVote := r.Vote == m.From ||
   927  			// ...we haven't voted and we don't think there's a leader yet in this term...
   928  			(r.Vote == None && r.lead == None) ||
   929  			// ...or this is a PreVote for a future term...
   930  			(m.Type == pb.MsgPreVote && m.Term > r.Term)
   931  		// ...and we believe the candidate is up to date.
   932  		if canVote && r.raftLog.isUpToDate(m.Index, m.LogTerm) {
   933  			// Note: it turns out that that learners must be allowed to cast votes.
   934  			// This seems counter- intuitive but is necessary in the situation in which
   935  			// a learner has been promoted (i.e. is now a voter) but has not learned
   936  			// about this yet.
   937  			// For example, consider a group in which id=1 is a learner and id=2 and
   938  			// id=3 are voters. A configuration change promoting 1 can be committed on
   939  			// the quorum `{2,3}` without the config change being appended to the
   940  			// learner's log. If the leader (say 2) fails, there are de facto two
   941  			// voters remaining. Only 3 can win an election (due to its log containing
   942  			// all committed entries), but to do so it will need 1 to vote. But 1
   943  			// considers itself a learner and will continue to do so until 3 has
   944  			// stepped up as leader, replicates the conf change to 1, and 1 applies it.
   945  			// Ultimately, by receiving a request to vote, the learner realizes that
   946  			// the candidate believes it to be a voter, and that it should act
   947  			// accordingly. The candidate's config may be stale, too; but in that case
   948  			// it won't win the election, at least in the absence of the bug discussed
   949  			// in:
   950  			// https://github.com/etcd-io/etcd/issues/7625#issuecomment-488798263.
   951  			r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] cast %s for %x [logterm: %d, index: %d] at term %d",
   952  				r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)
   953  			// When responding to Msg{Pre,}Vote messages we include the term
   954  			// from the message, not the local term. To see why, consider the
   955  			// case where a single node was previously partitioned away and
   956  			// it's local term is now out of date. If we include the local term
   957  			// (recall that for pre-votes we don't update the local term), the
   958  			// (pre-)campaigning node on the other end will proceed to ignore
   959  			// the message (it ignores all out of date messages).
   960  			// The term in the original message and current local term are the
   961  			// same in the case of regular votes, but different for pre-votes.
   962  			r.send(pb.Message{To: m.From, Term: m.Term, Type: voteRespMsgType(m.Type)})
   963  			if m.Type == pb.MsgVote {
   964  				// Only record real votes.
   965  				r.electionElapsed = 0
   966  				r.Vote = m.From
   967  			}
   968  		} else {
   969  			r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] rejected %s from %x [logterm: %d, index: %d] at term %d",
   970  				r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)
   971  			r.send(pb.Message{To: m.From, Term: r.Term, Type: voteRespMsgType(m.Type), Reject: true})
   972  		}
   973  
   974  	default:
   975  		err := r.step(r, m)
   976  		if err != nil {
   977  			return err
   978  		}
   979  	}
   980  	return nil
   981  }
   982  
   983  type stepFunc func(r *raft, m pb.Message) error
   984  
   985  func stepLeader(r *raft, m pb.Message) error {
   986  	// These message types do not require any progress for m.From.
   987  	switch m.Type {
   988  	case pb.MsgBeat:
   989  		r.bcastHeartbeat()
   990  		return nil
   991  	case pb.MsgCheckQuorum:
   992  		// The leader should always see itself as active. As a precaution, handle
   993  		// the case in which the leader isn't in the configuration any more (for
   994  		// example if it just removed itself).
   995  		//
   996  		// TODO(tbg): I added a TODO in removeNode, it doesn't seem that the
   997  		// leader steps down when removing itself. I might be missing something.
   998  		if pr := r.prs.Progress[r.id]; pr != nil {
   999  			pr.RecentActive = true
  1000  		}
  1001  		if !r.prs.QuorumActive() {
  1002  			r.logger.Warningf("%x stepped down to follower since quorum is not active", r.id)
  1003  			r.becomeFollower(r.Term, None)
  1004  		}
  1005  		// Mark everyone (but ourselves) as inactive in preparation for the next
  1006  		// CheckQuorum.
  1007  		r.prs.Visit(func(id uint64, pr *tracker.Progress) {
  1008  			if id != r.id {
  1009  				pr.RecentActive = false
  1010  			}
  1011  		})
  1012  		return nil
  1013  	case pb.MsgProp:
  1014  		if len(m.Entries) == 0 {
  1015  			r.logger.Panicf("%x stepped empty MsgProp", r.id)
  1016  		}
  1017  		if r.prs.Progress[r.id] == nil {
  1018  			// If we are not currently a member of the range (i.e. this node
  1019  			// was removed from the configuration while serving as leader),
  1020  			// drop any new proposals.
  1021  			return ErrProposalDropped
  1022  		}
  1023  		if r.leadTransferee != None {
  1024  			r.logger.Debugf("%x [term %d] transfer leadership to %x is in progress; dropping proposal", r.id, r.Term, r.leadTransferee)
  1025  			return ErrProposalDropped
  1026  		}
  1027  
  1028  		for i := range m.Entries {
  1029  			e := &m.Entries[i]
  1030  			var cc pb.ConfChangeI
  1031  			if e.Type == pb.EntryConfChange {
  1032  				var ccc pb.ConfChange
  1033  				if err := ccc.Unmarshal(e.Data); err != nil {
  1034  					panic(err)
  1035  				}
  1036  				cc = ccc
  1037  			} else if e.Type == pb.EntryConfChangeV2 {
  1038  				var ccc pb.ConfChangeV2
  1039  				if err := ccc.Unmarshal(e.Data); err != nil {
  1040  					panic(err)
  1041  				}
  1042  				cc = ccc
  1043  			}
  1044  			if cc != nil {
  1045  				alreadyPending := r.pendingConfIndex > r.raftLog.applied
  1046  				alreadyJoint := len(r.prs.Config.Voters[1]) > 0
  1047  				wantsLeaveJoint := len(cc.AsV2().Changes) == 0
  1048  
  1049  				var refused string
  1050  				if alreadyPending {
  1051  					refused = fmt.Sprintf("possible unapplied conf change at index %d (applied to %d)", r.pendingConfIndex, r.raftLog.applied)
  1052  				} else if alreadyJoint && !wantsLeaveJoint {
  1053  					refused = "must transition out of joint config first"
  1054  				} else if !alreadyJoint && wantsLeaveJoint {
  1055  					refused = "not in joint state; refusing empty conf change"
  1056  				}
  1057  
  1058  				if refused != "" {
  1059  					r.logger.Infof("%x ignoring conf change %v at config %s: %s", r.id, cc, r.prs.Config, refused)
  1060  					m.Entries[i] = pb.Entry{Type: pb.EntryNormal}
  1061  				} else {
  1062  					r.pendingConfIndex = r.raftLog.lastIndex() + uint64(i) + 1
  1063  				}
  1064  			}
  1065  		}
  1066  
  1067  		if !r.appendEntry(m.Entries...) {
  1068  			return ErrProposalDropped
  1069  		}
  1070  		r.bcastAppend()
  1071  		return nil
  1072  	case pb.MsgReadIndex:
  1073  		// only one voting member (the leader) in the cluster
  1074  		if r.prs.IsSingleton() {
  1075  			if resp := r.responseToReadIndexReq(m, r.raftLog.committed); resp.To != None {
  1076  				r.send(resp)
  1077  			}
  1078  			return nil
  1079  		}
  1080  
  1081  		// Postpone read only request when this leader has not committed
  1082  		// any log entry at its term.
  1083  		if !r.committedEntryInCurrentTerm() {
  1084  			r.pendingReadIndexMessages = append(r.pendingReadIndexMessages, m)
  1085  			return nil
  1086  		}
  1087  
  1088  		sendMsgReadIndexResponse(r, m)
  1089  
  1090  		return nil
  1091  	}
  1092  
  1093  	// All other message types require a progress for m.From (pr).
  1094  	pr := r.prs.Progress[m.From]
  1095  	if pr == nil {
  1096  		r.logger.Debugf("%x no progress available for %x", r.id, m.From)
  1097  		return nil
  1098  	}
  1099  	switch m.Type {
  1100  	case pb.MsgAppResp:
  1101  		pr.RecentActive = true
  1102  
  1103  		if m.Reject {
  1104  			// RejectHint is the suggested next base entry for appending (i.e.
  1105  			// we try to append entry RejectHint+1 next), and LogTerm is the
  1106  			// term that the follower has at index RejectHint. Older versions
  1107  			// of this library did not populate LogTerm for rejections and it
  1108  			// is zero for followers with an empty log.
  1109  			//
  1110  			// Under normal circumstances, the leader's log is longer than the
  1111  			// follower's and the follower's log is a prefix of the leader's
  1112  			// (i.e. there is no divergent uncommitted suffix of the log on the
  1113  			// follower). In that case, the first probe reveals where the
  1114  			// follower's log ends (RejectHint=follower's last index) and the
  1115  			// subsequent probe succeeds.
  1116  			//
  1117  			// However, when networks are partitioned or systems overloaded,
  1118  			// large divergent log tails can occur. The naive attempt, probing
  1119  			// entry by entry in decreasing order, will be the product of the
  1120  			// length of the diverging tails and the network round-trip latency,
  1121  			// which can easily result in hours of time spent probing and can
  1122  			// even cause outright outages. The probes are thus optimized as
  1123  			// described below.
  1124  			r.logger.Debugf("%x received MsgAppResp(rejected, hint: (index %d, term %d)) from %x for index %d",
  1125  				r.id, m.RejectHint, m.LogTerm, m.From, m.Index)
  1126  			nextProbeIdx := m.RejectHint
  1127  			if m.LogTerm > 0 {
  1128  				// If the follower has an uncommitted log tail, we would end up
  1129  				// probing one by one until we hit the common prefix.
  1130  				//
  1131  				// For example, if the leader has:
  1132  				//
  1133  				//   idx        1 2 3 4 5 6 7 8 9
  1134  				//              -----------------
  1135  				//   term (L)   1 3 3 3 5 5 5 5 5
  1136  				//   term (F)   1 1 1 1 2 2
  1137  				//
  1138  				// Then, after sending an append anchored at (idx=9,term=5) we
  1139  				// would receive a RejectHint of 6 and LogTerm of 2. Without the
  1140  				// code below, we would try an append at index 6, which would
  1141  				// fail again.
  1142  				//
  1143  				// However, looking only at what the leader knows about its own
  1144  				// log and the rejection hint, it is clear that a probe at index
  1145  				// 6, 5, 4, 3, and 2 must fail as well:
  1146  				//
  1147  				// For all of these indexes, the leader's log term is larger than
  1148  				// the rejection's log term. If a probe at one of these indexes
  1149  				// succeeded, its log term at that index would match the leader's,
  1150  				// i.e. 3 or 5 in this example. But the follower already told the
  1151  				// leader that it is still at term 2 at index 9, and since the
  1152  				// log term only ever goes up (within a log), this is a contradiction.
  1153  				//
  1154  				// At index 1, however, the leader can draw no such conclusion,
  1155  				// as its term 1 is not larger than the term 2 from the
  1156  				// follower's rejection. We thus probe at 1, which will succeed
  1157  				// in this example. In general, with this approach we probe at
  1158  				// most once per term found in the leader's log.
  1159  				//
  1160  				// There is a similar mechanism on the follower (implemented in
  1161  				// handleAppendEntries via a call to findConflictByTerm) that is
  1162  				// useful if the follower has a large divergent uncommitted log
  1163  				// tail[1], as in this example:
  1164  				//
  1165  				//   idx        1 2 3 4 5 6 7 8 9
  1166  				//              -----------------
  1167  				//   term (L)   1 3 3 3 3 3 3 3 7
  1168  				//   term (F)   1 3 3 4 4 5 5 5 6
  1169  				//
  1170  				// Naively, the leader would probe at idx=9, receive a rejection
  1171  				// revealing the log term of 6 at the follower. Since the leader's
  1172  				// term at the previous index is already smaller than 6, the leader-
  1173  				// side optimization discussed above is ineffective. The leader thus
  1174  				// probes at index 8 and, naively, receives a rejection for the same
  1175  				// index and log term 5. Again, the leader optimization does not improve
  1176  				// over linear probing as term 5 is above the leader's term 3 for that
  1177  				// and many preceding indexes; the leader would have to probe linearly
  1178  				// until it would finally hit index 3, where the probe would succeed.
  1179  				//
  1180  				// Instead, we apply a similar optimization on the follower. When the
  1181  				// follower receives the probe at index 8 (log term 3), it concludes
  1182  				// that all of the leader's log preceding that index has log terms of
  1183  				// 3 or below. The largest index in the follower's log with a log term
  1184  				// of 3 or below is index 3. The follower will thus return a rejection
  1185  				// for index=3, log term=3 instead. The leader's next probe will then
  1186  				// succeed at that index.
  1187  				//
  1188  				// [1]: more precisely, if the log terms in the large uncommitted
  1189  				// tail on the follower are larger than the leader's. At first,
  1190  				// it may seem unintuitive that a follower could even have such
  1191  				// a large tail, but it can happen:
  1192  				//
  1193  				// 1. Leader appends (but does not commit) entries 2 and 3, crashes.
  1194  				//   idx        1 2 3 4 5 6 7 8 9
  1195  				//              -----------------
  1196  				//   term (L)   1 2 2     [crashes]
  1197  				//   term (F)   1
  1198  				//   term (F)   1
  1199  				//
  1200  				// 2. a follower becomes leader and appends entries at term 3.
  1201  				//              -----------------
  1202  				//   term (x)   1 2 2     [down]
  1203  				//   term (F)   1 3 3 3 3
  1204  				//   term (F)   1
  1205  				//
  1206  				// 3. term 3 leader goes down, term 2 leader returns as term 4
  1207  				//    leader. It commits the log & entries at term 4.
  1208  				//
  1209  				//              -----------------
  1210  				//   term (L)   1 2 2 2
  1211  				//   term (x)   1 3 3 3 3 [down]
  1212  				//   term (F)   1
  1213  				//              -----------------
  1214  				//   term (L)   1 2 2 2 4 4 4
  1215  				//   term (F)   1 3 3 3 3 [gets probed]
  1216  				//   term (F)   1 2 2 2 4 4 4
  1217  				//
  1218  				// 4. the leader will now probe the returning follower at index
  1219  				//    7, the rejection points it at the end of the follower's log
  1220  				//    which is at a higher log term than the actually committed
  1221  				//    log.
  1222  				nextProbeIdx = r.raftLog.findConflictByTerm(m.RejectHint, m.LogTerm)
  1223  			}
  1224  			if pr.MaybeDecrTo(m.Index, nextProbeIdx) {
  1225  				r.logger.Debugf("%x decreased progress of %x to [%s]", r.id, m.From, pr)
  1226  				if pr.State == tracker.StateReplicate {
  1227  					pr.BecomeProbe()
  1228  				}
  1229  				r.sendAppend(m.From)
  1230  			}
  1231  		} else {
  1232  			oldPaused := pr.IsPaused()
  1233  			if pr.MaybeUpdate(m.Index) {
  1234  				switch {
  1235  				case pr.State == tracker.StateProbe:
  1236  					pr.BecomeReplicate()
  1237  				case pr.State == tracker.StateSnapshot && pr.Match >= pr.PendingSnapshot:
  1238  					// TODO(tbg): we should also enter this branch if a snapshot is
  1239  					// received that is below pr.PendingSnapshot but which makes it
  1240  					// possible to use the log again.
  1241  					r.logger.Debugf("%x recovered from needing snapshot, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
  1242  					// Transition back to replicating state via probing state
  1243  					// (which takes the snapshot into account). If we didn't
  1244  					// move to replicating state, that would only happen with
  1245  					// the next round of appends (but there may not be a next
  1246  					// round for a while, exposing an inconsistent RaftStatus).
  1247  					pr.BecomeProbe()
  1248  					pr.BecomeReplicate()
  1249  				case pr.State == tracker.StateReplicate:
  1250  					pr.Inflights.FreeLE(m.Index)
  1251  				}
  1252  
  1253  				if r.maybeCommit() {
  1254  					// committed index has progressed for the term, so it is safe
  1255  					// to respond to pending read index requests
  1256  					releasePendingReadIndexMessages(r)
  1257  					r.bcastAppend()
  1258  				} else if oldPaused {
  1259  					// If we were paused before, this node may be missing the
  1260  					// latest commit index, so send it.
  1261  					r.sendAppend(m.From)
  1262  				}
  1263  				// We've updated flow control information above, which may
  1264  				// allow us to send multiple (size-limited) in-flight messages
  1265  				// at once (such as when transitioning from probe to
  1266  				// replicate, or when freeTo() covers multiple messages). If
  1267  				// we have more entries to send, send as many messages as we
  1268  				// can (without sending empty messages for the commit index)
  1269  				for r.maybeSendAppend(m.From, false) {
  1270  				}
  1271  				// Transfer leadership is in progress.
  1272  				if m.From == r.leadTransferee && pr.Match == r.raftLog.lastIndex() {
  1273  					r.logger.Infof("%x sent MsgTimeoutNow to %x after received MsgAppResp", r.id, m.From)
  1274  					r.sendTimeoutNow(m.From)
  1275  				}
  1276  			}
  1277  		}
  1278  	case pb.MsgHeartbeatResp:
  1279  		pr.RecentActive = true
  1280  		pr.ProbeSent = false
  1281  
  1282  		// free one slot for the full inflights window to allow progress.
  1283  		if pr.State == tracker.StateReplicate && pr.Inflights.Full() {
  1284  			pr.Inflights.FreeFirstOne()
  1285  		}
  1286  		if pr.Match < r.raftLog.lastIndex() {
  1287  			r.sendAppend(m.From)
  1288  		}
  1289  
  1290  		if r.readOnly.option != ReadOnlySafe || len(m.Context) == 0 {
  1291  			return nil
  1292  		}
  1293  
  1294  		if r.prs.Voters.VoteResult(r.readOnly.recvAck(m.From, m.Context)) != quorum.VoteWon {
  1295  			return nil
  1296  		}
  1297  
  1298  		rss := r.readOnly.advance(m)
  1299  		for _, rs := range rss {
  1300  			if resp := r.responseToReadIndexReq(rs.req, rs.index); resp.To != None {
  1301  				r.send(resp)
  1302  			}
  1303  		}
  1304  	case pb.MsgSnapStatus:
  1305  		if pr.State != tracker.StateSnapshot {
  1306  			return nil
  1307  		}
  1308  		// TODO(tbg): this code is very similar to the snapshot handling in
  1309  		// MsgAppResp above. In fact, the code there is more correct than the
  1310  		// code here and should likely be updated to match (or even better, the
  1311  		// logic pulled into a newly created Progress state machine handler).
  1312  		if !m.Reject {
  1313  			pr.BecomeProbe()
  1314  			r.logger.Debugf("%x snapshot succeeded, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
  1315  		} else {
  1316  			// NB: the order here matters or we'll be probing erroneously from
  1317  			// the snapshot index, but the snapshot never applied.
  1318  			pr.PendingSnapshot = 0
  1319  			pr.BecomeProbe()
  1320  			r.logger.Debugf("%x snapshot failed, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
  1321  		}
  1322  		// If snapshot finish, wait for the MsgAppResp from the remote node before sending
  1323  		// out the next MsgApp.
  1324  		// If snapshot failure, wait for a heartbeat interval before next try
  1325  		pr.ProbeSent = true
  1326  	case pb.MsgUnreachable:
  1327  		// During optimistic replication, if the remote becomes unreachable,
  1328  		// there is huge probability that a MsgApp is lost.
  1329  		if pr.State == tracker.StateReplicate {
  1330  			pr.BecomeProbe()
  1331  		}
  1332  		r.logger.Debugf("%x failed to send message to %x because it is unreachable [%s]", r.id, m.From, pr)
  1333  	case pb.MsgTransferLeader:
  1334  		if pr.IsLearner {
  1335  			r.logger.Debugf("%x is learner. Ignored transferring leadership", r.id)
  1336  			return nil
  1337  		}
  1338  		leadTransferee := m.From
  1339  		lastLeadTransferee := r.leadTransferee
  1340  		if lastLeadTransferee != None {
  1341  			if lastLeadTransferee == leadTransferee {
  1342  				r.logger.Infof("%x [term %d] transfer leadership to %x is in progress, ignores request to same node %x",
  1343  					r.id, r.Term, leadTransferee, leadTransferee)
  1344  				return nil
  1345  			}
  1346  			r.abortLeaderTransfer()
  1347  			r.logger.Infof("%x [term %d] abort previous transferring leadership to %x", r.id, r.Term, lastLeadTransferee)
  1348  		}
  1349  		if leadTransferee == r.id {
  1350  			r.logger.Debugf("%x is already leader. Ignored transferring leadership to self", r.id)
  1351  			return nil
  1352  		}
  1353  		// Transfer leadership to third party.
  1354  		r.logger.Infof("%x [term %d] starts to transfer leadership to %x", r.id, r.Term, leadTransferee)
  1355  		// Transfer leadership should be finished in one electionTimeout, so reset r.electionElapsed.
  1356  		r.electionElapsed = 0
  1357  		r.leadTransferee = leadTransferee
  1358  		if pr.Match == r.raftLog.lastIndex() {
  1359  			r.sendTimeoutNow(leadTransferee)
  1360  			r.logger.Infof("%x sends MsgTimeoutNow to %x immediately as %x already has up-to-date log", r.id, leadTransferee, leadTransferee)
  1361  		} else {
  1362  			r.sendAppend(leadTransferee)
  1363  		}
  1364  	}
  1365  	return nil
  1366  }
  1367  
  1368  // stepCandidate is shared by StateCandidate and StatePreCandidate; the difference is
  1369  // whether they respond to MsgVoteResp or MsgPreVoteResp.
  1370  func stepCandidate(r *raft, m pb.Message) error {
  1371  	// Only handle vote responses corresponding to our candidacy (while in
  1372  	// StateCandidate, we may get stale MsgPreVoteResp messages in this term from
  1373  	// our pre-candidate state).
  1374  	var myVoteRespType pb.MessageType
  1375  	if r.state == StatePreCandidate {
  1376  		myVoteRespType = pb.MsgPreVoteResp
  1377  	} else {
  1378  		myVoteRespType = pb.MsgVoteResp
  1379  	}
  1380  	switch m.Type {
  1381  	case pb.MsgProp:
  1382  		r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)
  1383  		return ErrProposalDropped
  1384  	case pb.MsgApp:
  1385  		r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
  1386  		r.handleAppendEntries(m)
  1387  	case pb.MsgHeartbeat:
  1388  		r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
  1389  		r.handleHeartbeat(m)
  1390  	case pb.MsgSnap:
  1391  		r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
  1392  		r.handleSnapshot(m)
  1393  	case myVoteRespType:
  1394  		gr, rj, res := r.poll(m.From, m.Type, !m.Reject)
  1395  		r.logger.Infof("%x has received %d %s votes and %d vote rejections", r.id, gr, m.Type, rj)
  1396  		switch res {
  1397  		case quorum.VoteWon:
  1398  			if r.state == StatePreCandidate {
  1399  				r.campaign(campaignElection)
  1400  			} else {
  1401  				r.becomeLeader()
  1402  				r.bcastAppend()
  1403  			}
  1404  		case quorum.VoteLost:
  1405  			// pb.MsgPreVoteResp contains future term of pre-candidate
  1406  			// m.Term > r.Term; reuse r.Term
  1407  			r.becomeFollower(r.Term, None)
  1408  		}
  1409  	case pb.MsgTimeoutNow:
  1410  		r.logger.Debugf("%x [term %d state %v] ignored MsgTimeoutNow from %x", r.id, r.Term, r.state, m.From)
  1411  	}
  1412  	return nil
  1413  }
  1414  
  1415  func stepFollower(r *raft, m pb.Message) error {
  1416  	switch m.Type {
  1417  	case pb.MsgProp:
  1418  		if r.lead == None {
  1419  			r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)
  1420  			return ErrProposalDropped
  1421  		} else if r.disableProposalForwarding {
  1422  			r.logger.Infof("%x not forwarding to leader %x at term %d; dropping proposal", r.id, r.lead, r.Term)
  1423  			return ErrProposalDropped
  1424  		}
  1425  		m.To = r.lead
  1426  		r.send(m)
  1427  	case pb.MsgApp:
  1428  		r.electionElapsed = 0
  1429  		r.lead = m.From
  1430  		r.handleAppendEntries(m)
  1431  	case pb.MsgHeartbeat:
  1432  		r.electionElapsed = 0
  1433  		r.lead = m.From
  1434  		r.handleHeartbeat(m)
  1435  	case pb.MsgSnap:
  1436  		r.electionElapsed = 0
  1437  		r.lead = m.From
  1438  		r.handleSnapshot(m)
  1439  	case pb.MsgTransferLeader:
  1440  		if r.lead == None {
  1441  			r.logger.Infof("%x no leader at term %d; dropping leader transfer msg", r.id, r.Term)
  1442  			return nil
  1443  		}
  1444  		m.To = r.lead
  1445  		r.send(m)
  1446  	case pb.MsgTimeoutNow:
  1447  		r.logger.Infof("%x [term %d] received MsgTimeoutNow from %x and starts an election to get leadership.", r.id, r.Term, m.From)
  1448  		// Leadership transfers never use pre-vote even if r.preVote is true; we
  1449  		// know we are not recovering from a partition so there is no need for the
  1450  		// extra round trip.
  1451  		r.hup(campaignTransfer)
  1452  	case pb.MsgReadIndex:
  1453  		if r.lead == None {
  1454  			r.logger.Infof("%x no leader at term %d; dropping index reading msg", r.id, r.Term)
  1455  			return nil
  1456  		}
  1457  		m.To = r.lead
  1458  		r.send(m)
  1459  	case pb.MsgReadIndexResp:
  1460  		if len(m.Entries) != 1 {
  1461  			r.logger.Errorf("%x invalid format of MsgReadIndexResp from %x, entries count: %d", r.id, m.From, len(m.Entries))
  1462  			return nil
  1463  		}
  1464  		r.readStates = append(r.readStates, ReadState{Index: m.Index, RequestCtx: m.Entries[0].Data})
  1465  	}
  1466  	return nil
  1467  }
  1468  
  1469  func (r *raft) handleAppendEntries(m pb.Message) {
  1470  	if m.Index < r.raftLog.committed {
  1471  		r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
  1472  		return
  1473  	}
  1474  
  1475  	if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok {
  1476  		r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
  1477  	} else {
  1478  		r.logger.Debugf("%x [logterm: %d, index: %d] rejected MsgApp [logterm: %d, index: %d] from %x",
  1479  			r.id, r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(m.Index)), m.Index, m.LogTerm, m.Index, m.From)
  1480  
  1481  		// Return a hint to the leader about the maximum index and term that the
  1482  		// two logs could be divergent at. Do this by searching through the
  1483  		// follower's log for the maximum (index, term) pair with a term <= the
  1484  		// MsgApp's LogTerm and an index <= the MsgApp's Index. This can help
  1485  		// skip all indexes in the follower's uncommitted tail with terms
  1486  		// greater than the MsgApp's LogTerm.
  1487  		//
  1488  		// See the other caller for findConflictByTerm (in stepLeader) for a much
  1489  		// more detailed explanation of this mechanism.
  1490  		hintIndex := min(m.Index, r.raftLog.lastIndex())
  1491  		hintIndex = r.raftLog.findConflictByTerm(hintIndex, m.LogTerm)
  1492  		hintTerm, err := r.raftLog.term(hintIndex)
  1493  		if err != nil {
  1494  			panic(fmt.Sprintf("term(%d) must be valid, but got %v", hintIndex, err))
  1495  		}
  1496  		r.send(pb.Message{
  1497  			To:         m.From,
  1498  			Type:       pb.MsgAppResp,
  1499  			Index:      m.Index,
  1500  			Reject:     true,
  1501  			RejectHint: hintIndex,
  1502  			LogTerm:    hintTerm,
  1503  		})
  1504  	}
  1505  }
  1506  
  1507  func (r *raft) handleHeartbeat(m pb.Message) {
  1508  	r.raftLog.commitTo(m.Commit)
  1509  	r.send(pb.Message{To: m.From, Type: pb.MsgHeartbeatResp, Context: m.Context})
  1510  }
  1511  
  1512  func (r *raft) handleSnapshot(m pb.Message) {
  1513  	sindex, sterm := m.Snapshot.Metadata.Index, m.Snapshot.Metadata.Term
  1514  	if r.restore(m.Snapshot) {
  1515  		r.logger.Infof("%x [commit: %d] restored snapshot [index: %d, term: %d]",
  1516  			r.id, r.raftLog.committed, sindex, sterm)
  1517  		r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.lastIndex()})
  1518  	} else {
  1519  		r.logger.Infof("%x [commit: %d] ignored snapshot [index: %d, term: %d]",
  1520  			r.id, r.raftLog.committed, sindex, sterm)
  1521  		r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
  1522  	}
  1523  }
  1524  
  1525  // restore recovers the state machine from a snapshot. It restores the log and the
  1526  // configuration of state machine. If this method returns false, the snapshot was
  1527  // ignored, either because it was obsolete or because of an error.
  1528  func (r *raft) restore(s pb.Snapshot) bool {
  1529  	if s.Metadata.Index <= r.raftLog.committed {
  1530  		return false
  1531  	}
  1532  	if r.state != StateFollower {
  1533  		// This is defense-in-depth: if the leader somehow ended up applying a
  1534  		// snapshot, it could move into a new term without moving into a
  1535  		// follower state. This should never fire, but if it did, we'd have
  1536  		// prevented damage by returning early, so log only a loud warning.
  1537  		//
  1538  		// At the time of writing, the instance is guaranteed to be in follower
  1539  		// state when this method is called.
  1540  		r.logger.Warningf("%x attempted to restore snapshot as leader; should never happen", r.id)
  1541  		r.becomeFollower(r.Term+1, None)
  1542  		return false
  1543  	}
  1544  
  1545  	// More defense-in-depth: throw away snapshot if recipient is not in the
  1546  	// config. This shouldn't ever happen (at the time of writing) but lots of
  1547  	// code here and there assumes that r.id is in the progress tracker.
  1548  	found := false
  1549  	cs := s.Metadata.ConfState
  1550  
  1551  	for _, set := range [][]uint64{
  1552  		cs.Voters,
  1553  		cs.Learners,
  1554  		cs.VotersOutgoing,
  1555  		// `LearnersNext` doesn't need to be checked. According to the rules, if a peer in
  1556  		// `LearnersNext`, it has to be in `VotersOutgoing`.
  1557  	} {
  1558  		for _, id := range set {
  1559  			if id == r.id {
  1560  				found = true
  1561  				break
  1562  			}
  1563  		}
  1564  		if found {
  1565  			break
  1566  		}
  1567  	}
  1568  	if !found {
  1569  		r.logger.Warningf(
  1570  			"%x attempted to restore snapshot but it is not in the ConfState %v; should never happen",
  1571  			r.id, cs,
  1572  		)
  1573  		return false
  1574  	}
  1575  
  1576  	// Now go ahead and actually restore.
  1577  
  1578  	if r.raftLog.matchTerm(s.Metadata.Index, s.Metadata.Term) {
  1579  		r.logger.Infof("%x [commit: %d, lastindex: %d, lastterm: %d] fast-forwarded commit to snapshot [index: %d, term: %d]",
  1580  			r.id, r.raftLog.committed, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term)
  1581  		r.raftLog.commitTo(s.Metadata.Index)
  1582  		return false
  1583  	}
  1584  
  1585  	r.raftLog.restore(s)
  1586  
  1587  	// Reset the configuration and add the (potentially updated) peers in anew.
  1588  	r.prs = tracker.MakeProgressTracker(r.prs.MaxInflight)
  1589  	cfg, prs, err := confchange.Restore(confchange.Changer{
  1590  		Tracker:   r.prs,
  1591  		LastIndex: r.raftLog.lastIndex(),
  1592  	}, cs)
  1593  
  1594  	if err != nil {
  1595  		// This should never happen. Either there's a bug in our config change
  1596  		// handling or the client corrupted the conf change.
  1597  		panic(fmt.Sprintf("unable to restore config %+v: %s", cs, err))
  1598  	}
  1599  
  1600  	assertConfStatesEquivalent(r.logger, cs, r.switchToConfig(cfg, prs))
  1601  
  1602  	pr := r.prs.Progress[r.id]
  1603  	pr.MaybeUpdate(pr.Next - 1) // TODO(tbg): this is untested and likely unneeded
  1604  
  1605  	r.logger.Infof("%x [commit: %d, lastindex: %d, lastterm: %d] restored snapshot [index: %d, term: %d]",
  1606  		r.id, r.raftLog.committed, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term)
  1607  	return true
  1608  }
  1609  
  1610  // promotable indicates whether state machine can be promoted to leader,
  1611  // which is true when its own id is in progress list.
  1612  func (r *raft) promotable() bool {
  1613  	pr := r.prs.Progress[r.id]
  1614  	return pr != nil && !pr.IsLearner && !r.raftLog.hasPendingSnapshot()
  1615  }
  1616  
  1617  func (r *raft) applyConfChange(cc pb.ConfChangeV2) pb.ConfState {
  1618  	cfg, prs, err := func() (tracker.Config, tracker.ProgressMap, error) {
  1619  		changer := confchange.Changer{
  1620  			Tracker:   r.prs,
  1621  			LastIndex: r.raftLog.lastIndex(),
  1622  		}
  1623  		if cc.LeaveJoint() {
  1624  			return changer.LeaveJoint()
  1625  		} else if autoLeave, ok := cc.EnterJoint(); ok {
  1626  			return changer.EnterJoint(autoLeave, cc.Changes...)
  1627  		}
  1628  		return changer.Simple(cc.Changes...)
  1629  	}()
  1630  
  1631  	if err != nil {
  1632  		// TODO(tbg): return the error to the caller.
  1633  		panic(err)
  1634  	}
  1635  
  1636  	return r.switchToConfig(cfg, prs)
  1637  }
  1638  
  1639  // switchToConfig reconfigures this node to use the provided configuration. It
  1640  // updates the in-memory state and, when necessary, carries out additional
  1641  // actions such as reacting to the removal of nodes or changed quorum
  1642  // requirements.
  1643  //
  1644  // The inputs usually result from restoring a ConfState or applying a ConfChange.
  1645  func (r *raft) switchToConfig(cfg tracker.Config, prs tracker.ProgressMap) pb.ConfState {
  1646  	r.prs.Config = cfg
  1647  	r.prs.Progress = prs
  1648  
  1649  	r.logger.Infof("%x switched to configuration %s", r.id, r.prs.Config)
  1650  	cs := r.prs.ConfState()
  1651  	pr, ok := r.prs.Progress[r.id]
  1652  
  1653  	// Update whether the node itself is a learner, resetting to false when the
  1654  	// node is removed.
  1655  	r.isLearner = ok && pr.IsLearner
  1656  
  1657  	if (!ok || r.isLearner) && r.state == StateLeader {
  1658  		// This node is leader and was removed or demoted. We prevent demotions
  1659  		// at the time writing but hypothetically we handle them the same way as
  1660  		// removing the leader: stepping down into the next Term.
  1661  		//
  1662  		// TODO(tbg): step down (for sanity) and ask follower with largest Match
  1663  		// to TimeoutNow (to avoid interruption). This might still drop some
  1664  		// proposals but it's better than nothing.
  1665  		//
  1666  		// TODO(tbg): test this branch. It is untested at the time of writing.
  1667  		return cs
  1668  	}
  1669  
  1670  	// The remaining steps only make sense if this node is the leader and there
  1671  	// are other nodes.
  1672  	if r.state != StateLeader || len(cs.Voters) == 0 {
  1673  		return cs
  1674  	}
  1675  
  1676  	if r.maybeCommit() {
  1677  		// If the configuration change means that more entries are committed now,
  1678  		// broadcast/append to everyone in the updated config.
  1679  		r.bcastAppend()
  1680  	} else {
  1681  		// Otherwise, still probe the newly added replicas; there's no reason to
  1682  		// let them wait out a heartbeat interval (or the next incoming
  1683  		// proposal).
  1684  		r.prs.Visit(func(id uint64, pr *tracker.Progress) {
  1685  			r.maybeSendAppend(id, false /* sendIfEmpty */)
  1686  		})
  1687  	}
  1688  	// If the the leadTransferee was removed or demoted, abort the leadership transfer.
  1689  	if _, tOK := r.prs.Config.Voters.IDs()[r.leadTransferee]; !tOK && r.leadTransferee != 0 {
  1690  		r.abortLeaderTransfer()
  1691  	}
  1692  
  1693  	return cs
  1694  }
  1695  
  1696  func (r *raft) loadState(state pb.HardState) {
  1697  	if state.Commit < r.raftLog.committed || state.Commit > r.raftLog.lastIndex() {
  1698  		r.logger.Panicf("%x state.commit %d is out of range [%d, %d]", r.id, state.Commit, r.raftLog.committed, r.raftLog.lastIndex())
  1699  	}
  1700  	r.raftLog.committed = state.Commit
  1701  	r.Term = state.Term
  1702  	r.Vote = state.Vote
  1703  }
  1704  
  1705  // pastElectionTimeout returns true iff r.electionElapsed is greater
  1706  // than or equal to the randomized election timeout in
  1707  // [electiontimeout, 2 * electiontimeout - 1].
  1708  func (r *raft) pastElectionTimeout() bool {
  1709  	return r.electionElapsed >= r.randomizedElectionTimeout
  1710  }
  1711  
  1712  func (r *raft) resetRandomizedElectionTimeout() {
  1713  	r.randomizedElectionTimeout = r.electionTimeout + globalRand.Intn(r.electionTimeout)
  1714  }
  1715  
  1716  func (r *raft) sendTimeoutNow(to uint64) {
  1717  	r.send(pb.Message{To: to, Type: pb.MsgTimeoutNow})
  1718  }
  1719  
  1720  func (r *raft) abortLeaderTransfer() {
  1721  	r.leadTransferee = None
  1722  }
  1723  
  1724  // committedEntryInCurrentTerm return true if the peer has committed an entry in its term.
  1725  func (r *raft) committedEntryInCurrentTerm() bool {
  1726  	return r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(r.raftLog.committed)) == r.Term
  1727  }
  1728  
  1729  // responseToReadIndexReq constructs a response for `req`. If `req` comes from the peer
  1730  // itself, a blank value will be returned.
  1731  func (r *raft) responseToReadIndexReq(req pb.Message, readIndex uint64) pb.Message {
  1732  	if req.From == None || req.From == r.id {
  1733  		r.readStates = append(r.readStates, ReadState{
  1734  			Index:      readIndex,
  1735  			RequestCtx: req.Entries[0].Data,
  1736  		})
  1737  		return pb.Message{}
  1738  	}
  1739  	return pb.Message{
  1740  		Type:    pb.MsgReadIndexResp,
  1741  		To:      req.From,
  1742  		Index:   readIndex,
  1743  		Entries: req.Entries,
  1744  	}
  1745  }
  1746  
  1747  // increaseUncommittedSize computes the size of the proposed entries and
  1748  // determines whether they would push leader over its maxUncommittedSize limit.
  1749  // If the new entries would exceed the limit, the method returns false. If not,
  1750  // the increase in uncommitted entry size is recorded and the method returns
  1751  // true.
  1752  //
  1753  // Empty payloads are never refused. This is used both for appending an empty
  1754  // entry at a new leader's term, as well as leaving a joint configuration.
  1755  func (r *raft) increaseUncommittedSize(ents []pb.Entry) bool {
  1756  	var s uint64
  1757  	for _, e := range ents {
  1758  		s += uint64(PayloadSize(e))
  1759  	}
  1760  
  1761  	if r.uncommittedSize > 0 && s > 0 && r.uncommittedSize+s > r.maxUncommittedSize {
  1762  		// If the uncommitted tail of the Raft log is empty, allow any size
  1763  		// proposal. Otherwise, limit the size of the uncommitted tail of the
  1764  		// log and drop any proposal that would push the size over the limit.
  1765  		// Note the added requirement s>0 which is used to make sure that
  1766  		// appending single empty entries to the log always succeeds, used both
  1767  		// for replicating a new leader's initial empty entry, and for
  1768  		// auto-leaving joint configurations.
  1769  		return false
  1770  	}
  1771  	r.uncommittedSize += s
  1772  	return true
  1773  }
  1774  
  1775  // reduceUncommittedSize accounts for the newly committed entries by decreasing
  1776  // the uncommitted entry size limit.
  1777  func (r *raft) reduceUncommittedSize(ents []pb.Entry) {
  1778  	if r.uncommittedSize == 0 {
  1779  		// Fast-path for followers, who do not track or enforce the limit.
  1780  		return
  1781  	}
  1782  
  1783  	var s uint64
  1784  	for _, e := range ents {
  1785  		s += uint64(PayloadSize(e))
  1786  	}
  1787  	if s > r.uncommittedSize {
  1788  		// uncommittedSize may underestimate the size of the uncommitted Raft
  1789  		// log tail but will never overestimate it. Saturate at 0 instead of
  1790  		// allowing overflow.
  1791  		r.uncommittedSize = 0
  1792  	} else {
  1793  		r.uncommittedSize -= s
  1794  	}
  1795  }
  1796  
  1797  func numOfPendingConf(ents []pb.Entry) int {
  1798  	n := 0
  1799  	for i := range ents {
  1800  		if ents[i].Type == pb.EntryConfChange || ents[i].Type == pb.EntryConfChangeV2 {
  1801  			n++
  1802  		}
  1803  	}
  1804  	return n
  1805  }
  1806  
  1807  func releasePendingReadIndexMessages(r *raft) {
  1808  	if !r.committedEntryInCurrentTerm() {
  1809  		r.logger.Error("pending MsgReadIndex should be released only after first commit in current term")
  1810  		return
  1811  	}
  1812  
  1813  	msgs := r.pendingReadIndexMessages
  1814  	r.pendingReadIndexMessages = nil
  1815  
  1816  	for _, m := range msgs {
  1817  		sendMsgReadIndexResponse(r, m)
  1818  	}
  1819  }
  1820  
  1821  func sendMsgReadIndexResponse(r *raft, m pb.Message) {
  1822  	// thinking: use an internally defined context instead of the user given context.
  1823  	// We can express this in terms of the term and index instead of a user-supplied value.
  1824  	// This would allow multiple reads to piggyback on the same message.
  1825  	switch r.readOnly.option {
  1826  	// If more than the local vote is needed, go through a full broadcast.
  1827  	case ReadOnlySafe:
  1828  		r.readOnly.addRequest(r.raftLog.committed, m)
  1829  		// The local node automatically acks the request.
  1830  		r.readOnly.recvAck(r.id, m.Entries[0].Data)
  1831  		r.bcastHeartbeatWithCtx(m.Entries[0].Data)
  1832  	case ReadOnlyLeaseBased:
  1833  		if resp := r.responseToReadIndexReq(m, r.raftLog.committed); resp.To != None {
  1834  			r.send(resp)
  1835  		}
  1836  	}
  1837  }
  1838  

View as plain text