// Copyright 2019 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tracker

import (
	"fmt"
	"sort"
	"strings"
)

// Progress represents a follower's progress in the view of the leader. Leader
// maintains progresses of all followers, and sends entries to the follower
// based on its progress.
//
// NB(tbg): Progress is basically a state machine whose transitions are mostly
// strewn around `*raft.raft`. Additionally, some fields are only used when in a
// certain State. All of this isn't ideal.
type Progress struct {
	// Match is the highest log index the follower has acknowledged, as
	// recorded by MaybeUpdate. Next is the index of the next entry to send to
	// the follower; BecomeProbe and BecomeReplicate reset it relative to Match,
	// and OptimisticUpdate advances it past entries that are in flight.
	Match, Next uint64
	// State defines how the leader should interact with the follower.
	//
	// When in StateProbe, leader sends at most one replication message
	// per heartbeat interval. It also probes actual progress of the follower.
	//
	// When in StateReplicate, leader optimistically increases next
	// to the latest entry sent after sending replication message. This is
	// an optimized state for fast replicating log entries to the follower.
	//
	// When in StateSnapshot, leader should have sent out snapshot
	// before and stops sending any replication message.
	State StateType

	// PendingSnapshot is used in StateSnapshot.
	// If there is a pending snapshot, PendingSnapshot is set to the index of
	// that snapshot. While PendingSnapshot is set, the replication process of
	// this Progress is paused: raft will not resend the snapshot until the
	// pending one is reported to have failed.
	PendingSnapshot uint64

	// RecentActive is true if the progress is recently active. Receiving any messages
	// from the corresponding follower indicates the progress is active.
	// RecentActive can be reset to false after an election timeout.
	//
	// TODO(tbg): the leader should always have this set to true.
	RecentActive bool

	// ProbeSent is used while this follower is in StateProbe. When ProbeSent is
	// true, raft should pause sending replication message to this peer until
	// ProbeSent is reset. See ProbeAcked() and IsPaused().
	ProbeSent bool

	// Inflights is a sliding window for the inflight messages.
	// Each inflight message contains one or more log entries.
	// The max number of entries per message is defined in raft config as MaxSizePerMsg.
	// Thus inflight effectively limits both the number of inflight messages
	// and the bandwidth each Progress can use.
	// When inflights is Full, no more message should be sent.
	// When a leader sends out a message, the index of the last
	// entry should be added to inflights. The index MUST be added
	// into inflights in order.
	// When a leader receives a reply, the previous inflights should
	// be freed by calling inflights.FreeLE with the index of the last
	// received entry.
	Inflights *Inflights

	// IsLearner is true if this progress is tracked for a learner.
	IsLearner bool
}

// ResetState moves the Progress into the specified State, resetting ProbeSent,
// PendingSnapshot, and Inflights.
84 func (pr *Progress) ResetState(state StateType) { 85 pr.ProbeSent = false 86 pr.PendingSnapshot = 0 87 pr.State = state 88 pr.Inflights.reset() 89 } 90 91 func max(a, b uint64) uint64 { 92 if a > b { 93 return a 94 } 95 return b 96 } 97 98 func min(a, b uint64) uint64 { 99 if a > b { 100 return b 101 } 102 return a 103 } 104 105 // ProbeAcked is called when this peer has accepted an append. It resets 106 // ProbeSent to signal that additional append messages should be sent without 107 // further delay. 108 func (pr *Progress) ProbeAcked() { 109 pr.ProbeSent = false 110 } 111 112 // BecomeProbe transitions into StateProbe. Next is reset to Match+1 or, 113 // optionally and if larger, the index of the pending snapshot. 114 func (pr *Progress) BecomeProbe() { 115 // If the original state is StateSnapshot, progress knows that 116 // the pending snapshot has been sent to this peer successfully, then 117 // probes from pendingSnapshot + 1. 118 if pr.State == StateSnapshot { 119 pendingSnapshot := pr.PendingSnapshot 120 pr.ResetState(StateProbe) 121 pr.Next = max(pr.Match+1, pendingSnapshot+1) 122 } else { 123 pr.ResetState(StateProbe) 124 pr.Next = pr.Match + 1 125 } 126 } 127 128 // BecomeReplicate transitions into StateReplicate, resetting Next to Match+1. 129 func (pr *Progress) BecomeReplicate() { 130 pr.ResetState(StateReplicate) 131 pr.Next = pr.Match + 1 132 } 133 134 // BecomeSnapshot moves the Progress to StateSnapshot with the specified pending 135 // snapshot index. 136 func (pr *Progress) BecomeSnapshot(snapshoti uint64) { 137 pr.ResetState(StateSnapshot) 138 pr.PendingSnapshot = snapshoti 139 } 140 141 // MaybeUpdate is called when an MsgAppResp arrives from the follower, with the 142 // index acked by it. The method returns false if the given n index comes from 143 // an outdated message. Otherwise it updates the progress and returns true. 
144 func (pr *Progress) MaybeUpdate(n uint64) bool { 145 var updated bool 146 if pr.Match < n { 147 pr.Match = n 148 updated = true 149 pr.ProbeAcked() 150 } 151 pr.Next = max(pr.Next, n+1) 152 return updated 153 } 154 155 // OptimisticUpdate signals that appends all the way up to and including index n 156 // are in-flight. As a result, Next is increased to n+1. 157 func (pr *Progress) OptimisticUpdate(n uint64) { pr.Next = n + 1 } 158 159 // MaybeDecrTo adjusts the Progress to the receipt of a MsgApp rejection. The 160 // arguments are the index of the append message rejected by the follower, and 161 // the hint that we want to decrease to. 162 // 163 // Rejections can happen spuriously as messages are sent out of order or 164 // duplicated. In such cases, the rejection pertains to an index that the 165 // Progress already knows were previously acknowledged, and false is returned 166 // without changing the Progress. 167 // 168 // If the rejection is genuine, Next is lowered sensibly, and the Progress is 169 // cleared for sending log entries. 170 func (pr *Progress) MaybeDecrTo(rejected, matchHint uint64) bool { 171 if pr.State == StateReplicate { 172 // The rejection must be stale if the progress has matched and "rejected" 173 // is smaller than "match". 174 if rejected <= pr.Match { 175 return false 176 } 177 // Directly decrease next to match + 1. 178 // 179 // TODO(tbg): why not use matchHint if it's larger? 180 pr.Next = pr.Match + 1 181 return true 182 } 183 184 // The rejection must be stale if "rejected" does not match next - 1. This 185 // is because non-replicating followers are probed one entry at a time. 186 if pr.Next-1 != rejected { 187 return false 188 } 189 190 pr.Next = max(min(rejected, matchHint+1), 1) 191 pr.ProbeSent = false 192 return true 193 } 194 195 // IsPaused returns whether sending log entries to this node has been throttled. 
196 // This is done when a node has rejected recent MsgApps, is currently waiting 197 // for a snapshot, or has reached the MaxInflightMsgs limit. In normal 198 // operation, this is false. A throttled node will be contacted less frequently 199 // until it has reached a state in which it's able to accept a steady stream of 200 // log entries again. 201 func (pr *Progress) IsPaused() bool { 202 switch pr.State { 203 case StateProbe: 204 return pr.ProbeSent 205 case StateReplicate: 206 return pr.Inflights.Full() 207 case StateSnapshot: 208 return true 209 default: 210 panic("unexpected state") 211 } 212 } 213 214 func (pr *Progress) String() string { 215 var buf strings.Builder 216 fmt.Fprintf(&buf, "%s match=%d next=%d", pr.State, pr.Match, pr.Next) 217 if pr.IsLearner { 218 fmt.Fprint(&buf, " learner") 219 } 220 if pr.IsPaused() { 221 fmt.Fprint(&buf, " paused") 222 } 223 if pr.PendingSnapshot > 0 { 224 fmt.Fprintf(&buf, " pendingSnap=%d", pr.PendingSnapshot) 225 } 226 if !pr.RecentActive { 227 fmt.Fprintf(&buf, " inactive") 228 } 229 if n := pr.Inflights.Count(); n > 0 { 230 fmt.Fprintf(&buf, " inflight=%d", n) 231 if pr.Inflights.Full() { 232 fmt.Fprint(&buf, "[full]") 233 } 234 } 235 return buf.String() 236 } 237 238 // ProgressMap is a map of *Progress. 239 type ProgressMap map[uint64]*Progress 240 241 // String prints the ProgressMap in sorted key order, one Progress per line. 242 func (m ProgressMap) String() string { 243 ids := make([]uint64, 0, len(m)) 244 for k := range m { 245 ids = append(ids, k) 246 } 247 sort.Slice(ids, func(i, j int) bool { 248 return ids[i] < ids[j] 249 }) 250 var buf strings.Builder 251 for _, id := range ids { 252 fmt.Fprintf(&buf, "%d: %s\n", id, m[id]) 253 } 254 return buf.String() 255 } 256