...

Source file src/go.etcd.io/etcd/raft/v3/tracker/progress.go

Documentation: go.etcd.io/etcd/raft/v3/tracker

     1  // Copyright 2019 The etcd Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package tracker
    16  
    17  import (
    18  	"fmt"
    19  	"sort"
    20  	"strings"
    21  )
    22  
    23  // Progress represents a follower’s progress in the view of the leader. Leader
    24  // maintains progresses of all followers, and sends entries to the follower
    25  // based on its progress.
    26  //
    27  // NB(tbg): Progress is basically a state machine whose transitions are mostly
    28  // strewn around `*raft.raft`. Additionally, some fields are only used when in a
    29  // certain State. All of this isn't ideal.
    30  type Progress struct {
    31  	Match, Next uint64
    32  	// State defines how the leader should interact with the follower.
    33  	//
    34  	// When in StateProbe, leader sends at most one replication message
    35  	// per heartbeat interval. It also probes actual progress of the follower.
    36  	//
    37  	// When in StateReplicate, leader optimistically increases next
    38  	// to the latest entry sent after sending replication message. This is
    39  	// an optimized state for fast replicating log entries to the follower.
    40  	//
    41  	// When in StateSnapshot, leader should have sent out snapshot
    42  	// before and stops sending any replication message.
    43  	State StateType
    44  
    45  	// PendingSnapshot is used in StateSnapshot.
    46  	// If there is a pending snapshot, the pendingSnapshot will be set to the
    47  	// index of the snapshot. If pendingSnapshot is set, the replication process of
    48  	// this Progress will be paused. raft will not resend snapshot until the pending one
    49  	// is reported to be failed.
    50  	PendingSnapshot uint64
    51  
    52  	// RecentActive is true if the progress is recently active. Receiving any messages
    53  	// from the corresponding follower indicates the progress is active.
    54  	// RecentActive can be reset to false after an election timeout.
    55  	//
    56  	// TODO(tbg): the leader should always have this set to true.
    57  	RecentActive bool
    58  
    59  	// ProbeSent is used while this follower is in StateProbe. When ProbeSent is
    60  	// true, raft should pause sending replication message to this peer until
    61  	// ProbeSent is reset. See ProbeAcked() and IsPaused().
    62  	ProbeSent bool
    63  
    64  	// Inflights is a sliding window for the inflight messages.
    65  	// Each inflight message contains one or more log entries.
    66  	// The max number of entries per message is defined in raft config as MaxSizePerMsg.
    67  	// Thus inflight effectively limits both the number of inflight messages
    68  	// and the bandwidth each Progress can use.
    69  	// When inflights is Full, no more message should be sent.
    70  	// When a leader sends out a message, the index of the last
    71  	// entry should be added to inflights. The index MUST be added
    72  	// into inflights in order.
    73  	// When a leader receives a reply, the previous inflights should
    74  	// be freed by calling inflights.FreeLE with the index of the last
    75  	// received entry.
    76  	Inflights *Inflights
    77  
    78  	// IsLearner is true if this progress is tracked for a learner.
    79  	IsLearner bool
    80  }
    81  
    82  // ResetState moves the Progress into the specified State, resetting ProbeSent,
    83  // PendingSnapshot, and Inflights.
    84  func (pr *Progress) ResetState(state StateType) {
    85  	pr.ProbeSent = false
    86  	pr.PendingSnapshot = 0
    87  	pr.State = state
    88  	pr.Inflights.reset()
    89  }
    90  
    91  func max(a, b uint64) uint64 {
    92  	if a > b {
    93  		return a
    94  	}
    95  	return b
    96  }
    97  
    98  func min(a, b uint64) uint64 {
    99  	if a > b {
   100  		return b
   101  	}
   102  	return a
   103  }
   104  
   105  // ProbeAcked is called when this peer has accepted an append. It resets
   106  // ProbeSent to signal that additional append messages should be sent without
   107  // further delay.
   108  func (pr *Progress) ProbeAcked() {
   109  	pr.ProbeSent = false
   110  }
   111  
   112  // BecomeProbe transitions into StateProbe. Next is reset to Match+1 or,
   113  // optionally and if larger, the index of the pending snapshot.
   114  func (pr *Progress) BecomeProbe() {
   115  	// If the original state is StateSnapshot, progress knows that
   116  	// the pending snapshot has been sent to this peer successfully, then
   117  	// probes from pendingSnapshot + 1.
   118  	if pr.State == StateSnapshot {
   119  		pendingSnapshot := pr.PendingSnapshot
   120  		pr.ResetState(StateProbe)
   121  		pr.Next = max(pr.Match+1, pendingSnapshot+1)
   122  	} else {
   123  		pr.ResetState(StateProbe)
   124  		pr.Next = pr.Match + 1
   125  	}
   126  }
   127  
   128  // BecomeReplicate transitions into StateReplicate, resetting Next to Match+1.
   129  func (pr *Progress) BecomeReplicate() {
   130  	pr.ResetState(StateReplicate)
   131  	pr.Next = pr.Match + 1
   132  }
   133  
   134  // BecomeSnapshot moves the Progress to StateSnapshot with the specified pending
   135  // snapshot index.
   136  func (pr *Progress) BecomeSnapshot(snapshoti uint64) {
   137  	pr.ResetState(StateSnapshot)
   138  	pr.PendingSnapshot = snapshoti
   139  }
   140  
   141  // MaybeUpdate is called when an MsgAppResp arrives from the follower, with the
   142  // index acked by it. The method returns false if the given n index comes from
   143  // an outdated message. Otherwise it updates the progress and returns true.
   144  func (pr *Progress) MaybeUpdate(n uint64) bool {
   145  	var updated bool
   146  	if pr.Match < n {
   147  		pr.Match = n
   148  		updated = true
   149  		pr.ProbeAcked()
   150  	}
   151  	pr.Next = max(pr.Next, n+1)
   152  	return updated
   153  }
   154  
   155  // OptimisticUpdate signals that appends all the way up to and including index n
   156  // are in-flight. As a result, Next is increased to n+1.
   157  func (pr *Progress) OptimisticUpdate(n uint64) { pr.Next = n + 1 }
   158  
   159  // MaybeDecrTo adjusts the Progress to the receipt of a MsgApp rejection. The
   160  // arguments are the index of the append message rejected by the follower, and
   161  // the hint that we want to decrease to.
   162  //
   163  // Rejections can happen spuriously as messages are sent out of order or
   164  // duplicated. In such cases, the rejection pertains to an index that the
   165  // Progress already knows were previously acknowledged, and false is returned
   166  // without changing the Progress.
   167  //
   168  // If the rejection is genuine, Next is lowered sensibly, and the Progress is
   169  // cleared for sending log entries.
   170  func (pr *Progress) MaybeDecrTo(rejected, matchHint uint64) bool {
   171  	if pr.State == StateReplicate {
   172  		// The rejection must be stale if the progress has matched and "rejected"
   173  		// is smaller than "match".
   174  		if rejected <= pr.Match {
   175  			return false
   176  		}
   177  		// Directly decrease next to match + 1.
   178  		//
   179  		// TODO(tbg): why not use matchHint if it's larger?
   180  		pr.Next = pr.Match + 1
   181  		return true
   182  	}
   183  
   184  	// The rejection must be stale if "rejected" does not match next - 1. This
   185  	// is because non-replicating followers are probed one entry at a time.
   186  	if pr.Next-1 != rejected {
   187  		return false
   188  	}
   189  
   190  	pr.Next = max(min(rejected, matchHint+1), 1)
   191  	pr.ProbeSent = false
   192  	return true
   193  }
   194  
   195  // IsPaused returns whether sending log entries to this node has been throttled.
   196  // This is done when a node has rejected recent MsgApps, is currently waiting
   197  // for a snapshot, or has reached the MaxInflightMsgs limit. In normal
   198  // operation, this is false. A throttled node will be contacted less frequently
   199  // until it has reached a state in which it's able to accept a steady stream of
   200  // log entries again.
   201  func (pr *Progress) IsPaused() bool {
   202  	switch pr.State {
   203  	case StateProbe:
   204  		return pr.ProbeSent
   205  	case StateReplicate:
   206  		return pr.Inflights.Full()
   207  	case StateSnapshot:
   208  		return true
   209  	default:
   210  		panic("unexpected state")
   211  	}
   212  }
   213  
   214  func (pr *Progress) String() string {
   215  	var buf strings.Builder
   216  	fmt.Fprintf(&buf, "%s match=%d next=%d", pr.State, pr.Match, pr.Next)
   217  	if pr.IsLearner {
   218  		fmt.Fprint(&buf, " learner")
   219  	}
   220  	if pr.IsPaused() {
   221  		fmt.Fprint(&buf, " paused")
   222  	}
   223  	if pr.PendingSnapshot > 0 {
   224  		fmt.Fprintf(&buf, " pendingSnap=%d", pr.PendingSnapshot)
   225  	}
   226  	if !pr.RecentActive {
   227  		fmt.Fprintf(&buf, " inactive")
   228  	}
   229  	if n := pr.Inflights.Count(); n > 0 {
   230  		fmt.Fprintf(&buf, " inflight=%d", n)
   231  		if pr.Inflights.Full() {
   232  			fmt.Fprint(&buf, "[full]")
   233  		}
   234  	}
   235  	return buf.String()
   236  }
   237  
   238  // ProgressMap is a map of *Progress.
   239  type ProgressMap map[uint64]*Progress
   240  
   241  // String prints the ProgressMap in sorted key order, one Progress per line.
   242  func (m ProgressMap) String() string {
   243  	ids := make([]uint64, 0, len(m))
   244  	for k := range m {
   245  		ids = append(ids, k)
   246  	}
   247  	sort.Slice(ids, func(i, j int) bool {
   248  		return ids[i] < ids[j]
   249  	})
   250  	var buf strings.Builder
   251  	for _, id := range ids {
   252  		fmt.Fprintf(&buf, "%d: %s\n", id, m[id])
   253  	}
   254  	return buf.String()
   255  }
   256  

View as plain text