// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/*
Package raft sends and receives messages in the Protocol Buffer format
defined in the raftpb package.

Raft is a protocol with which a cluster of nodes can maintain a replicated
state machine. The state machine is kept in sync through the use of a
replicated log. For more details on Raft, see "In Search of an
Understandable Consensus Algorithm" (https://raft.github.io/raft.pdf) by
Diego Ongaro and John Ousterhout.

A simple example application, _raftexample_, is also available to help
illustrate how to use this package in practice:
https://github.com/etcd-io/etcd/tree/main/contrib/raftexample

# Usage

The primary object in raft is a Node. You either start a Node from scratch
using raft.StartNode or start a Node from some initial state using
raft.RestartNode.

To start a node from scratch:

    storage := raft.NewMemoryStorage()
    c := &Config{
        ID:              0x01,
        ElectionTick:    10,
        HeartbeatTick:   1,
        Storage:         storage,
        MaxSizePerMsg:   4096,
        MaxInflightMsgs: 256,
    }
    n := raft.StartNode(c, []raft.Peer{{ID: 0x02}, {ID: 0x03}})

To restart a node from previous state:

    storage := raft.NewMemoryStorage()

    // Recover the in-memory storage from persistent
    // snapshot, state and entries.
    storage.ApplySnapshot(snapshot)
    storage.SetHardState(state)
    storage.Append(entries)

    c := &Config{
        ID:              0x01,
        ElectionTick:    10,
        HeartbeatTick:   1,
        Storage:         storage,
        MaxSizePerMsg:   4096,
        MaxInflightMsgs: 256,
    }

    // Restart raft without peer information.
    // Peer information is already included in the storage.
    n := raft.RestartNode(c)

Now that you are holding onto a Node you have a few responsibilities:

First, you must read from the Node.Ready() channel and process the updates
it contains. These steps may be performed in parallel, except as noted in
step 2.

1. Write HardState, Entries, and Snapshot to persistent storage if they
are not empty. Note that when writing an Entry with Index i, any
previously-persisted entries with Index >= i must be discarded.

2. Send all Messages to the nodes named in the To field. It is important
that no messages be sent until the latest HardState has been persisted to
disk, along with all Entries written by any previous Ready batch (Messages
may be sent while entries from the same batch are being persisted). To
reduce I/O latency, an optimization can be applied to let the leader write
to disk in parallel with its followers (as explained in section 10.2.1 of
the Raft thesis). If any Message has type MsgSnap, call
Node.ReportSnapshot() after it has been sent (these messages may be
large).

Note: Marshalling messages is not thread-safe; it is important to make
sure that no new entries are persisted while marshalling. The easiest way
to achieve this is to serialize the messages directly inside your main
raft loop. (A sketch of steps 1 and 2 follows this list.)

3. Apply Snapshot (if any) and CommittedEntries to the state machine. If
any committed Entry has Type EntryConfChange, call Node.ApplyConfChange()
to apply it to the node. The configuration change may be cancelled at this
point by setting the NodeID field to zero before calling ApplyConfChange
(but ApplyConfChange must be called one way or the other, and the decision
to cancel must be based solely on the state machine and not external
information such as the observed health of the node).

4. Call Node.Advance() to signal readiness for the next batch of updates.
This may be done at any time after step 1, although all updates must be
processed in the order they were returned by Ready.
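
To make steps 1 and 2 concrete, here is a minimal sketch of the
saveToStorage helper used in the handling loop below, assuming the
MemoryStorage variable from the examples above. Note that MemoryStorage is
not itself durable; a real implementation would write the same data to
disk first:

    func saveToStorage(
        hardState raftpb.HardState,
        entries []raftpb.Entry,
        snapshot raftpb.Snapshot,
    ) {
        // Apply the snapshot first so the in-memory log is reset to the
        // snapshot's index before any new entries are appended.
        if !raft.IsEmptySnap(snapshot) {
            storage.ApplySnapshot(snapshot)
        }
        if !raft.IsEmptyHardState(hardState) {
            storage.SetHardState(hardState)
        }
        // Append discards previously-stored entries that conflict with
        // the new ones, as required by step 1.
        storage.Append(entries)
    }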

Second, all persisted log entries must be made available via an
implementation of the Storage interface. The provided MemoryStorage type
can be used for this (if you repopulate its state upon a restart), or you
can supply your own disk-backed implementation.

Third, when you receive a message from another node, pass it to Node.Step:

    func recvRaftRPC(ctx context.Context, m raftpb.Message) {
        n.Step(ctx, m)
    }

Finally, you need to call Node.Tick() at regular intervals (probably via a
time.Ticker). Raft has two important timeouts: heartbeat and the election
timeout. However, internally to the raft package, time is represented by
an abstract "tick".

The complete state machine handling loop will look something like this:

    for {
        select {
        case <-s.Ticker:
            n.Tick()
        case rd := <-s.Node.Ready():
            saveToStorage(rd.HardState, rd.Entries, rd.Snapshot)
            send(rd.Messages)
            if !raft.IsEmptySnap(rd.Snapshot) {
                processSnapshot(rd.Snapshot)
            }
            for _, entry := range rd.CommittedEntries {
                process(entry)
                if entry.Type == raftpb.EntryConfChange {
                    var cc raftpb.ConfChange
                    cc.Unmarshal(entry.Data)
                    s.Node.ApplyConfChange(cc)
                }
            }
            s.Node.Advance()
        case <-s.done:
            return
        }
    }

To propose changes to the state machine from your node, take your
application data, serialize it into a byte slice, and call:

    n.Propose(ctx, data)

If the proposal is committed, the data will appear in committed entries
with type raftpb.EntryNormal. There is no guarantee that a proposed
command will be committed; you may have to re-propose after a timeout, as
sketched below.
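
Here is a rough sketch of such a re-proposal loop. The waitApplied helper
is hypothetical; it stands for whatever mechanism your application uses to
observe that data has been applied to the state machine:

    // proposeWithRetry re-proposes data until the application observes
    // it in a committed entry or the context is cancelled.
    func proposeWithRetry(ctx context.Context, n raft.Node, data []byte) error {
        for {
            if err := n.Propose(ctx, data); err != nil {
                return err
            }
            select {
            case <-waitApplied(data): // hypothetical: closed once data is applied
                return nil
            case <-time.After(time.Second): // timed out; propose again
            case <-ctx.Done():
                return ctx.Err()
            }
        }
    }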

To add or remove a node in a cluster, build a ConfChange struct 'cc' and
call:

    n.ProposeConfChange(ctx, cc)
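
For instance, 'cc' might be built like this to propose adding a new node
(the ID 0x04 and the Context value are illustrative; raft treats Context
as opaque bytes):

    cc := raftpb.ConfChange{
        Type:   raftpb.ConfChangeAddNode,
        NodeID: 0x04,
        // Context travels through the log untouched; applications
        // commonly use it to carry the new member's address.
        Context: []byte("http://10.0.0.4:2380"),
    }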

After the configuration change is committed, a committed entry with type
raftpb.EntryConfChange will be returned. You must apply it to the node
through:

    var cc raftpb.ConfChange
    cc.Unmarshal(data)
    n.ApplyConfChange(cc)

Note: An ID represents a unique node in a cluster for all time. A given ID
MUST be used only once even if the old node has been removed. This means
that, for example, IP addresses make poor node IDs since they may be
reused. Node IDs must be non-zero.

# Implementation notes

This implementation is up to date with the final Raft thesis
(https://github.com/ongardie/dissertation/blob/master/stanford.pdf),
although our implementation of the membership change protocol differs
somewhat from that described in chapter 4. The key invariant that
membership changes happen one node at a time is preserved, but in our
implementation the membership change takes effect when its entry is
applied, not when it is added to the log (so the entry is committed under
the old membership instead of the new). This is equivalent in terms of
safety, since the old and new configurations are guaranteed to overlap.

To ensure that we do not attempt to commit two membership changes at once
by matching log positions (which would be unsafe since they should have
different quorum requirements), we simply disallow any proposed membership
change while any uncommitted change appears in the leader's log.

This approach introduces a problem when you try to remove a member from a
two-member cluster: if one of the members dies before the other one
receives the commit of the confchange entry, then the member cannot be
removed any more since the cluster cannot make progress. For this reason
it is highly recommended to use three or more nodes in every cluster.

# MessageType

Package raft sends and receives messages in the Protocol Buffer format
(defined in the raftpb package). Each state (follower, candidate, leader)
implements its own 'step' method ('stepFollower', 'stepCandidate',
'stepLeader') when advancing with the given raftpb.Message. Each step is
determined by its raftpb.MessageType. Note that every step is checked by
one common method, 'Step', that safety-checks the terms of the node and
the incoming message to prevent stale log entries:

'MsgHup' is used for election. If a node is a follower or candidate, the
'tick' function in the 'raft' struct is set as 'tickElection'. If a
follower or candidate has not received any heartbeat before the election
timeout, it passes 'MsgHup' to its Step method and becomes (or remains) a
candidate to start a new election.

'MsgBeat' is an internal type that signals the leader to send a heartbeat
of the 'MsgHeartbeat' type. If a node is a leader, the 'tick' function in
the 'raft' struct is set as 'tickHeartbeat', which triggers the leader to
send periodic 'MsgHeartbeat' messages to its followers.

'MsgProp' proposes to append data to the node's log entries. This is a
special type that redirects proposals to the leader; therefore, the send
method does not attach the node's current term to 'MsgProp'. When
'MsgProp' is passed to the leader's 'Step' method, the leader first calls
the 'appendEntry' method to append the entries to its log, and then calls
the 'bcastAppend' method to send those entries to its peers. When passed
to a candidate, 'MsgProp' is dropped. When passed to a follower, 'MsgProp'
is stored in the follower's mailbox (msgs) by the send method. It is
stored with the sender's ID and later forwarded to the leader by the
rafthttp package.

'MsgApp' contains log entries to replicate. A leader calls bcastAppend,
which calls sendAppend, which sends soon-to-be-replicated logs in 'MsgApp'
type. When 'MsgApp' is passed to a candidate's Step method, the candidate
reverts to follower, because the message indicates that there is a valid
leader sending 'MsgApp' messages. Candidates and followers respond to this
message with a 'MsgAppResp'.

'MsgAppResp' is a response to a log replication request ('MsgApp'). When
'MsgApp' is passed to a candidate's or follower's Step method, it responds
by calling the 'handleAppendEntries' method, which sends a 'MsgAppResp' to
the raft mailbox.

'MsgVote' requests votes for an election. When a node is a follower or
candidate and 'MsgHup' is passed to its Step method, the node calls the
'campaign' method to campaign to become the leader. Once 'campaign' is
called, the node becomes a candidate and sends 'MsgVote' to peers in the
cluster to request votes. When passed to the leader's or a candidate's
Step method and the message's Term is lower than the leader's or
candidate's, 'MsgVote' is rejected ('MsgVoteResp' is returned with Reject
set to true). If the leader or candidate receives a 'MsgVote' with a
higher term, it reverts to follower. When 'MsgVote' is passed to a
follower, it votes for the sender only when the sender's log is at least
as up-to-date as its own: the term of the sender's last log entry is
greater than that of the follower's, or the last terms are equal and the
sender's last log index is greater than or equal to the follower's.

'MsgVoteResp' contains responses to a vote request. When 'MsgVoteResp' is
passed to a candidate, the candidate counts how many votes it has won. If
it has won votes from a majority (quorum), it becomes the leader and calls
'bcastAppend'. If the candidate receives a majority of rejections, it
reverts to follower.

'MsgPreVote' and 'MsgPreVoteResp' are used in an optional two-phase
election protocol. When Config.PreVote is true, a pre-election is carried
out first (using the same rules as a regular election), and no node
increases its term number unless the pre-election indicates that the
campaigning node would win. This minimizes disruption when a partitioned
node rejoins the cluster.

'MsgSnap' requests the installation of a snapshot. When a node has just
become a leader, or the leader receives a 'MsgProp' message, it calls the
'bcastAppend' method, which then calls the 'sendAppend' method to each
follower. In 'sendAppend', if the leader fails to get the term or the
entries (because they have already been compacted away), it requests a
snapshot by sending a 'MsgSnap' message.

'MsgSnapStatus' tells the result of a snapshot installation. When a
follower rejects 'MsgSnap', it indicates that the snapshot request failed,
typically because a network issue caused the network layer to fail to
deliver the snapshot to the follower; the leader then considers the
follower's progress as probe. When 'MsgSnap' is not rejected, it indicates
that the snapshot succeeded, and the leader likewise sets the follower's
progress to probe and resumes its log replication.

'MsgHeartbeat' sends a heartbeat from the leader. When 'MsgHeartbeat' is
passed to a candidate and the message's term is higher than the
candidate's, the candidate reverts to follower, updates its committed
index from the one in the heartbeat, and sends a 'MsgHeartbeatResp' to its
mailbox. When 'MsgHeartbeat' is passed to a follower's Step method and the
message's term is higher than the follower's, the follower updates its
leaderID with the ID from the message.

'MsgHeartbeatResp' is a response to 'MsgHeartbeat'. When 'MsgHeartbeatResp'
is passed to the leader's Step method, the leader knows which follower
responded. Only when the leader's last log index is greater than the
follower's Match index does the leader run the 'sendAppend' method.

'MsgUnreachable' tells that a request (message) wasn't delivered. When
'MsgUnreachable' is passed to the leader's Step method, the leader learns
that the follower identified by the message is not reachable, often
indicating that a 'MsgApp' was lost. When the follower's progress state is
replicate, the leader sets it back to probe.
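
'MsgUnreachable' and 'MsgSnapStatus' are generated locally when the
application calls Node.ReportUnreachable and Node.ReportSnapshot. As a
hedged sketch, a send function in the application's transport layer might
report back to raft like this (the transport value is an assumed
application-side abstraction, not part of this package):

    // send delivers one message and reports delivery failures and
    // snapshot outcomes back to the local Node.
    func send(n raft.Node, m raftpb.Message) {
        if err := transport.Send(m); err != nil {
            // The peer could not be reached; the leader will adjust
            // the follower's progress when it steps 'MsgUnreachable'.
            n.ReportUnreachable(m.To)
            if m.Type == raftpb.MsgSnap {
                n.ReportSnapshot(m.To, raft.SnapshotFailure)
            }
            return
        }
        if m.Type == raftpb.MsgSnap {
            // Reporting success lets the leader resume log replication
            // to this follower.
            n.ReportSnapshot(m.To, raft.SnapshotFinish)
        }
    }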
*/
package raft