None is a placeholder node ID used when there is no leader.
const None uint64 = 0
ErrCompacted is returned by Storage.Entries/Compact when a requested index is unavailable because it predates the last snapshot.
var ErrCompacted = errors.New("requested index is unavailable due to compaction")
ErrProposalDropped is returned when a proposal is ignored in certain cases, so that the proposer can be notified and fail fast.
var ErrProposalDropped = errors.New("raft proposal dropped")
ErrSnapOutOfDate is returned by Storage.CreateSnapshot when a requested index is older than the existing snapshot.
var ErrSnapOutOfDate = errors.New("requested index is older than the existing snapshot")
ErrSnapshotTemporarilyUnavailable is returned by the Storage interface when the required snapshot is temporarily unavailable.
var ErrSnapshotTemporarilyUnavailable = errors.New("snapshot is temporarily unavailable")
ErrStepLocalMsg is returned when trying to step a local raft message.
var ErrStepLocalMsg = errors.New("raft: cannot step raft local message")
ErrStepPeerNotFound is returned when trying to step a response message but no peer is found in raft.prs for that node.
var ErrStepPeerNotFound = errors.New("raft: cannot step as peer not found")
var (
	// ErrStopped is returned by methods on Nodes that have been stopped.
	ErrStopped = errors.New("raft: stopped")
)
ErrUnavailable is returned by Storage interface when the requested log entries are unavailable.
var ErrUnavailable = errors.New("requested entry at index is unavailable")
func DescribeConfState(state pb.ConfState) string
func DescribeEntries(ents []pb.Entry, f EntryFormatter) string
DescribeEntries calls DescribeEntry for each Entry, adding a newline to each.
func DescribeEntry(e pb.Entry, f EntryFormatter) string
DescribeEntry returns a concise human-readable description of an Entry for debugging.
func DescribeHardState(hs pb.HardState) string
func DescribeMessage(m pb.Message, f EntryFormatter) string
DescribeMessage returns a concise human-readable description of a Message for debugging.
func DescribeReady(rd Ready, f EntryFormatter) string
func DescribeSnapshot(snap pb.Snapshot) string
func DescribeSoftState(ss SoftState) string
func IsEmptyHardState(st pb.HardState) bool
IsEmptyHardState returns true if the given HardState is empty.
func IsEmptySnap(sp pb.Snapshot) bool
IsEmptySnap returns true if the given Snapshot is empty.
func IsLocalMsg(msgt pb.MessageType) bool
func IsResponseMsg(msgt pb.MessageType) bool
func MustSync(st, prevst pb.HardState, entsnum int) bool
MustSync returns true if the hard state and count of Raft entries indicate that a synchronous write to persistent storage is required.
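To make the rule concrete, here is a minimal sketch using a local stand-in for pb.HardState. The hardState type and mustSync function are illustrative names, not the library's code. The sketch assumes a synchronous write is needed whenever new entries are being written or the Term or Vote changed, while a change to Commit alone may be written asynchronously.

```go
package main

import "fmt"

// hardState is a local stand-in for pb.HardState (assumption for illustration).
type hardState struct {
	Term, Vote, Commit uint64
}

// mustSync sketches the documented rule: sync when there are entries to
// write, or when the vote or term differs from the previous hard state.
func mustSync(st, prevst hardState, entsnum int) bool {
	return entsnum != 0 || st.Vote != prevst.Vote || st.Term != prevst.Term
}

func main() {
	prev := hardState{Term: 2, Vote: 1, Commit: 5}

	// Only Commit advanced and no entries: no synchronous write needed.
	fmt.Println(mustSync(hardState{Term: 2, Vote: 1, Commit: 6}, prev, 0)) // false

	// The term changed: must sync.
	fmt.Println(mustSync(hardState{Term: 3, Vote: 1, Commit: 5}, prev, 0)) // true

	// Two new entries are being written: must sync.
	fmt.Println(mustSync(prev, prev, 2)) // true
}
```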
func PayloadSize(e pb.Entry) int
PayloadSize is the size of the payload of this Entry. Notably, it does not depend on its Index or Term.
func ResetDefaultLogger()
func SetLogger(l Logger)
BasicStatus contains basic information about the Raft peer. It does not allocate.
type BasicStatus struct {
	ID uint64

	pb.HardState
	SoftState

	Applied uint64

	LeadTransferee uint64
}
CampaignType represents the type of campaigning. A string is used instead of a uint64 because it is simpler to compare and to fill in raft entries.
type CampaignType string
Config contains the parameters to start a raft.
type Config struct {
	// ID is the identity of the local raft. ID cannot be 0.
	ID uint64

	// ElectionTick is the number of Node.Tick invocations that must pass between
	// elections. That is, if a follower does not receive any message from the
	// leader of current term before ElectionTick has elapsed, it will become
	// candidate and start an election. ElectionTick must be greater than
	// HeartbeatTick. We suggest ElectionTick = 10 * HeartbeatTick to avoid
	// unnecessary leader switching.
	ElectionTick int
	// HeartbeatTick is the number of Node.Tick invocations that must pass between
	// heartbeats. That is, a leader sends heartbeat messages to maintain its
	// leadership every HeartbeatTick ticks.
	HeartbeatTick int

	// Storage is the storage for raft. raft generates entries and states to be
	// stored in storage. raft reads the persisted entries and states out of
	// Storage when it needs. raft reads out the previous state and configuration
	// out of storage when restarting.
	Storage Storage
	// Applied is the last applied index. It should only be set when restarting
	// raft. raft will not return entries to the application smaller or equal to
	// Applied. If Applied is unset when restarting, raft might return previous
	// applied entries. This is a very application dependent configuration.
	Applied uint64

	// MaxSizePerMsg limits the max byte size of each append message. Smaller
	// value lowers the raft recovery cost (initial probing and message lost
	// during normal operation). On the other side, it might affect the
	// throughput during normal replication. Note: math.MaxUint64 for unlimited,
	// 0 for at most one entry per message.
	MaxSizePerMsg uint64
	// MaxCommittedSizePerReady limits the size of the committed entries which
	// can be applied.
	MaxCommittedSizePerReady uint64
	// MaxUncommittedEntriesSize limits the aggregate byte size of the
	// uncommitted entries that may be appended to a leader's log. Once this
	// limit is exceeded, proposals will begin to return ErrProposalDropped
	// errors. Note: 0 for no limit.
	MaxUncommittedEntriesSize uint64
	// MaxInflightMsgs limits the max number of in-flight append messages during
	// optimistic replication phase. The application transportation layer usually
	// has its own sending buffer over TCP/UDP. Setting MaxInflightMsgs to avoid
	// overflowing that sending buffer. TODO (xiangli): feedback to application to
	// limit the proposal rate?
	MaxInflightMsgs int

	// CheckQuorum specifies if the leader should check quorum activity. Leader
	// steps down when quorum is not active for an electionTimeout.
	CheckQuorum bool

	// PreVote enables the Pre-Vote algorithm described in raft thesis section
	// 9.6. This prevents disruption when a node that has been partitioned away
	// rejoins the cluster.
	PreVote bool

	// ReadOnlyOption specifies how the read only request is processed.
	//
	// ReadOnlySafe guarantees the linearizability of the read only request by
	// communicating with the quorum. It is the default and suggested option.
	//
	// ReadOnlyLeaseBased ensures linearizability of the read only request by
	// relying on the leader lease. It can be affected by clock drift.
	// If the clock drift is unbounded, leader might keep the lease longer than it
	// should (clock can move backward/pause without any bound). ReadIndex is not safe
	// in that case.
	// CheckQuorum MUST be enabled if ReadOnlyOption is ReadOnlyLeaseBased.
	ReadOnlyOption ReadOnlyOption

	// Logger is the logger used for raft log. For multinode which can host
	// multiple raft group, each raft group can have its own logger
	Logger Logger

	// DisableProposalForwarding set to true means that followers will drop
	// proposals, rather than forwarding them to the leader. One use case for
	// this feature would be in a situation where the Raft leader is used to
	// compute the data of a proposal, for example, adding a timestamp from a
	// hybrid logical clock to data in a monotonically increasing way. Forwarding
	// should be disabled to prevent a follower with an inaccurate hybrid
	// logical clock from assigning the timestamp and then forwarding the data
	// to the leader.
	DisableProposalForwarding bool
}
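The constraints stated in the Config field comments can be checked before a node is started. The following is a minimal sketch that uses a local config type mirroring only a handful of the fields. The type, its validate method, and the error messages are hypothetical and do not reproduce the library's own validation.

```go
package main

import (
	"errors"
	"fmt"
)

// readOnlyOption and config are local stand-ins for a few Config fields
// (assumptions for illustration; the real Config has many more fields).
type readOnlyOption int

const (
	readOnlySafe readOnlyOption = iota
	readOnlyLeaseBased
)

type config struct {
	ID             uint64
	ElectionTick   int
	HeartbeatTick  int
	CheckQuorum    bool
	ReadOnlyOption readOnlyOption
}

// validate checks the constraints named in the field comments.
func (c config) validate() error {
	if c.ID == 0 {
		return errors.New("ID cannot be 0")
	}
	// Assumption: a heartbeat interval of zero ticks is not meaningful.
	if c.HeartbeatTick <= 0 {
		return errors.New("HeartbeatTick must be greater than 0")
	}
	if c.ElectionTick <= c.HeartbeatTick {
		return errors.New("ElectionTick must be greater than HeartbeatTick")
	}
	if c.ReadOnlyOption == readOnlyLeaseBased && !c.CheckQuorum {
		return errors.New("CheckQuorum must be enabled with ReadOnlyLeaseBased")
	}
	return nil
}

func main() {
	// Follows the suggestion ElectionTick = 10 * HeartbeatTick.
	good := config{ID: 1, HeartbeatTick: 1, ElectionTick: 10}
	fmt.Println(good.validate()) // <nil>

	// Lease-based reads without CheckQuorum violate the documented rule.
	bad := config{ID: 1, HeartbeatTick: 1, ElectionTick: 10, ReadOnlyOption: readOnlyLeaseBased}
	fmt.Println(bad.validate())
}
```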
DefaultLogger is a default implementation of the Logger interface.
type DefaultLogger struct {
	*log.Logger
	// contains filtered or unexported fields
}
func (l *DefaultLogger) Debug(v ...interface{})
func (l *DefaultLogger) Debugf(format string, v ...interface{})
func (l *DefaultLogger) EnableDebug()
func (l *DefaultLogger) EnableTimestamps()
func (l *DefaultLogger) Error(v ...interface{})
func (l *DefaultLogger) Errorf(format string, v ...interface{})
func (l *DefaultLogger) Fatal(v ...interface{})
func (l *DefaultLogger) Fatalf(format string, v ...interface{})
func (l *DefaultLogger) Info(v ...interface{})
func (l *DefaultLogger) Infof(format string, v ...interface{})
func (l *DefaultLogger) Panic(v ...interface{})
func (l *DefaultLogger) Panicf(format string, v ...interface{})
func (l *DefaultLogger) Warning(v ...interface{})
func (l *DefaultLogger) Warningf(format string, v ...interface{})
EntryFormatter can be implemented by the application to provide human-readable formatting of entry data. Nil is a valid EntryFormatter and will use a default format.
type EntryFormatter func([]byte) string
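As an illustration of how a formatter of this shape might be supplied and how a nil value can fall back to a default, consider the sketch below. The entryFormatter type mirrors the signature above. The describePayload helper, the hexFormatter function, and the quoted-string default are assumptions made for the example, not the library's behavior.

```go
package main

import "fmt"

// entryFormatter mirrors the EntryFormatter signature (local stand-in).
type entryFormatter func([]byte) string

// describePayload is a hypothetical helper: it applies f to the payload and
// falls back to a quoted-string rendering when f is nil.
func describePayload(data []byte, f entryFormatter) string {
	if f == nil {
		return fmt.Sprintf("%q", data)
	}
	return f(data)
}

// hexFormatter renders the payload as lowercase hex, which can be handy
// when entries carry binary data.
func hexFormatter(data []byte) string {
	return fmt.Sprintf("%x", data)
}

func main() {
	payload := []byte("hi")
	fmt.Println(describePayload(payload, nil))          // "hi"
	fmt.Println(describePayload(payload, hexFormatter)) // 6869
}
```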
type Logger interface {
	Debug(v ...interface{})
	Debugf(format string, v ...interface{})

	Error(v ...interface{})
	Errorf(format string, v ...interface{})

	Info(v ...interface{})
	Infof(format string, v ...interface{})

	Warning(v ...interface{})
	Warningf(format string, v ...interface{})

	Fatal(v ...interface{})
	Fatalf(format string, v ...interface{})

	Panic(v ...interface{})
	Panicf(format string, v ...interface{})
}
MemoryStorage implements the Storage interface backed by an in-memory array.
type MemoryStorage struct {
	// Protects access to all fields. Most methods of MemoryStorage are
	// run on the raft goroutine, but Append() is run on an application
	// goroutine.
	sync.Mutex
	// contains filtered or unexported fields
}
func NewMemoryStorage() *MemoryStorage
NewMemoryStorage creates an empty MemoryStorage.
func (ms *MemoryStorage) Append(entries []pb.Entry) error
Append the new entries to storage. TODO (xiangli): ensure the entries are continuous and entries[0].Index > ms.entries[0].Index
func (ms *MemoryStorage) ApplySnapshot(snap pb.Snapshot) error
ApplySnapshot overwrites the contents of this Storage object with those of the given snapshot.
func (ms *MemoryStorage) Compact(compactIndex uint64) error
Compact discards all log entries prior to compactIndex. It is the application's responsibility to not attempt to compact an index greater than raftLog.applied.
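One way to picture compaction is an in-memory log whose first slot is a dummy entry recording the index and term of the compaction point. The sketch below follows that assumption. The memLog type, its methods, and the out-of-range error are hypothetical, and errCompacted is a local sentinel standing in for ErrCompacted.

```go
package main

import (
	"errors"
	"fmt"
)

// errCompacted is a local stand-in for ErrCompacted.
var errCompacted = errors.New("requested index is unavailable due to compaction")

// entry keeps only the fields this sketch needs.
type entry struct{ Index, Term uint64 }

// memLog assumes ents[0] is a dummy entry marking the compaction point.
type memLog struct{ ents []entry }

func (l *memLog) firstIndex() uint64 { return l.ents[0].Index + 1 }
func (l *memLog) lastIndex() uint64  { return l.ents[0].Index + uint64(len(l.ents)) - 1 }

// compact drops entries before compactIndex; the entry at compactIndex
// becomes the new dummy so that its term stays available for matching.
func (l *memLog) compact(compactIndex uint64) error {
	offset := l.ents[0].Index
	if compactIndex <= offset {
		return errCompacted // already compacted up to this index
	}
	if compactIndex > l.lastIndex() {
		return fmt.Errorf("compact index %d is beyond last index %d", compactIndex, l.lastIndex())
	}
	i := compactIndex - offset
	// Copy so the discarded prefix can be garbage collected.
	l.ents = append([]entry(nil), l.ents[i:]...)
	return nil
}

func main() {
	l := &memLog{ents: []entry{{0, 0}, {1, 1}, {2, 1}, {3, 2}, {4, 2}, {5, 2}}}

	err := l.compact(3)
	fmt.Println(err, l.firstIndex(), l.lastIndex()) // <nil> 4 5

	// Index 2 now predates the compaction point.
	fmt.Println(l.compact(2) == errCompacted) // true
}
```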
func (ms *MemoryStorage) CreateSnapshot(i uint64, cs *pb.ConfState, data []byte) (pb.Snapshot, error)
CreateSnapshot makes a snapshot which can be retrieved with Snapshot() and can be used to reconstruct the state at that point. If any configuration changes have been made since the last compaction, the result of the last ApplyConfChange must be passed in.
func (ms *MemoryStorage) Entries(lo, hi, maxSize uint64) ([]pb.Entry, error)
Entries implements the Storage interface.
func (ms *MemoryStorage) FirstIndex() (uint64, error)
FirstIndex implements the Storage interface.
func (ms *MemoryStorage) InitialState() (pb.HardState, pb.ConfState, error)
InitialState implements the Storage interface.
func (ms *MemoryStorage) LastIndex() (uint64, error)
LastIndex implements the Storage interface.
func (ms *MemoryStorage) SetHardState(st pb.HardState) error
SetHardState saves the current HardState.
func (ms *MemoryStorage) Snapshot() (pb.Snapshot, error)
Snapshot implements the Storage interface.
func (ms *MemoryStorage) Term(i uint64) (uint64, error)
Term implements the Storage interface.
Node represents a node in a raft cluster.
type Node interface {
	// Tick increments the internal logical clock for the Node by a single tick. Election
	// timeouts and heartbeat timeouts are in units of ticks.
	Tick()
	// Campaign causes the Node to transition to candidate state and start campaigning to become leader.
	Campaign(ctx context.Context) error
	// Propose proposes that data be appended to the log. Note that proposals can be lost without
	// notice, therefore it is the user's job to ensure proposal retries.
	Propose(ctx context.Context, data []byte) error
	// ProposeConfChange proposes a configuration change. Like any proposal, the
	// configuration change may be dropped with or without an error being
	// returned. In particular, configuration changes are dropped unless the
	// leader has certainty that there is no prior unapplied configuration
	// change in its log.
	//
	// The method accepts either a pb.ConfChange (deprecated) or pb.ConfChangeV2
	// message. The latter allows arbitrary configuration changes via joint
	// consensus, notably including replacing a voter. Passing a ConfChangeV2
	// message is only allowed if all Nodes participating in the cluster run a
	// version of this library aware of the V2 API. See pb.ConfChangeV2 for
	// usage details and semantics.
	ProposeConfChange(ctx context.Context, cc pb.ConfChangeI) error

	// Step advances the state machine using the given message. ctx.Err() will be returned, if any.
	Step(ctx context.Context, msg pb.Message) error

	// Ready returns a channel that returns the current point-in-time state.
	// Users of the Node must call Advance after retrieving the state returned by Ready.
	//
	// NOTE: No committed entries from the next Ready may be applied until all committed entries
	// and snapshots from the previous one have finished.
	Ready() <-chan Ready

	// Advance notifies the Node that the application has saved progress up to the last Ready.
	// It prepares the node to return the next available Ready.
	//
	// The application should generally call Advance after it applies the entries in last Ready.
	//
	// However, as an optimization, the application may call Advance while it is applying the
	// commands. For example, when the last Ready contains a snapshot, the application might take
	// a long time to apply the snapshot data. To continue receiving Ready without blocking raft
	// progress, it can call Advance before finishing applying the last ready.
	Advance()
	// ApplyConfChange applies a config change (previously passed to
	// ProposeConfChange) to the node. This must be called whenever a config
	// change is observed in Ready.CommittedEntries, except when the app decides
	// to reject the configuration change (i.e. treats it as a noop instead), in
	// which case it must not be called.
	//
	// Returns an opaque non-nil ConfState protobuf which must be recorded in
	// snapshots.
	ApplyConfChange(cc pb.ConfChangeI) *pb.ConfState

	// TransferLeadership attempts to transfer leadership to the given transferee.
	TransferLeadership(ctx context.Context, lead, transferee uint64)

	// ReadIndex requests a read state. The read state will be set in the ready.
	// Read state has a read index. Once the application advances further than the read
	// index, any linearizable read requests issued before the read request can be
	// processed safely. The read state will have the same rctx attached.
	// Note that requests can be lost without notice, therefore it is the user's job
	// to ensure read index retries.
	ReadIndex(ctx context.Context, rctx []byte) error

	// Status returns the current status of the raft state machine.
	Status() Status
	// ReportUnreachable reports the given node is not reachable for the last send.
	ReportUnreachable(id uint64)
	// ReportSnapshot reports the status of the sent snapshot. The id is the raft ID of the follower
	// who is meant to receive the snapshot, and the status is SnapshotFinish or SnapshotFailure.
	// Calling ReportSnapshot with SnapshotFinish is a no-op. But, any failure in applying a
	// snapshot (e.g., while streaming it from leader to follower), should be reported to the
	// leader with SnapshotFailure. When leader sends a snapshot to a follower, it pauses any raft
	// log probes until the follower can apply the snapshot and advance its state. If the follower
	// can't do that, e.g., due to a crash, it could end up in a limbo, never getting any
	// updates from the leader. Therefore, it is crucial that the application ensures that any
	// failure in snapshot sending is caught and reported back to the leader; so it can resume raft
	// log probing in the follower.
	ReportSnapshot(id uint64, status SnapshotStatus)
	// Stop performs any necessary termination of the Node.
	Stop()
}
func RestartNode(c *Config) Node
RestartNode is similar to StartNode but does not take a list of peers. The current membership of the cluster will be restored from the Storage. If the caller has an existing state machine, pass in the last log index that has been applied to it; otherwise use zero.
func StartNode(c *Config, peers []Peer) Node
StartNode returns a new Node given configuration and a list of raft peers. It appends a ConfChangeAddNode entry for each given peer to the initial log.
Peers must not be zero length; call RestartNode in that case.
type Peer struct { ID uint64 Context []byte }
ProgressType indicates the type of replica a Progress corresponds to.
type ProgressType byte
const (
	// ProgressTypePeer accompanies a Progress for a regular peer replica.
	ProgressTypePeer ProgressType = iota
	// ProgressTypeLearner accompanies a Progress for a learner replica.
	ProgressTypeLearner
)
RawNode is a thread-unsafe Node. The methods of this struct correspond to the methods of Node and are described more fully there.
type RawNode struct {
// contains filtered or unexported fields
}
func NewRawNode(config *Config) (*RawNode, error)
NewRawNode instantiates a RawNode from the given configuration.
See Bootstrap() for bootstrapping an initial state; this replaces the former 'peers' argument to this method (with identical behavior). However, it is recommended that instead of calling Bootstrap, applications bootstrap their state manually by setting up a Storage that has a first index > 1 and which stores the desired ConfState as its InitialState.
func (rn *RawNode) Advance(rd Ready)
Advance notifies the RawNode that the application has applied and saved progress in the last Ready results.
func (rn *RawNode) ApplyConfChange(cc pb.ConfChangeI) *pb.ConfState
ApplyConfChange applies a config change to the local node. The app must call this when it applies a configuration change, except when it decides to reject the configuration change, in which case no call must take place.
func (rn *RawNode) BasicStatus() BasicStatus
BasicStatus returns a BasicStatus. Notably this does not contain the Progress map; see WithProgress for an allocation-free way to inspect it.
func (rn *RawNode) Bootstrap(peers []Peer) error
Bootstrap initializes the RawNode for first use by appending configuration changes for the supplied peers. This method returns an error if the Storage is nonempty.
It is recommended that instead of calling this method, applications bootstrap their state manually by setting up a Storage that has a first index > 1 and which stores the desired ConfState as its InitialState.
func (rn *RawNode) Campaign() error
Campaign causes this RawNode to transition to candidate state.
func (rn *RawNode) HasReady() bool
HasReady is called when the RawNode user needs to check whether any Ready is pending. The checking logic in this method should be consistent with Ready.containsUpdates().
func (rn *RawNode) Propose(data []byte) error
Propose proposes data be appended to the raft log.
func (rn *RawNode) ProposeConfChange(cc pb.ConfChangeI) error
ProposeConfChange proposes a config change. See (Node).ProposeConfChange for details.
func (rn *RawNode) ReadIndex(rctx []byte)
ReadIndex requests a read state. The read state will be set in ready. Read State has a read index. Once the application advances further than the read index, any linearizable read requests issued before the read request can be processed safely. The read state will have the same rctx attached.
func (rn *RawNode) Ready() Ready
Ready returns the outstanding work that the application needs to handle. This includes appending and applying entries or a snapshot, updating the HardState, and sending messages. The returned Ready() *must* be handled and subsequently passed back via Advance().
func (rn *RawNode) ReportSnapshot(id uint64, status SnapshotStatus)
ReportSnapshot reports the status of the sent snapshot.
func (rn *RawNode) ReportUnreachable(id uint64)
ReportUnreachable reports the given node is not reachable for the last send.
func (rn *RawNode) Status() Status
Status returns the current status of the given group. This allocates; see BasicStatus and WithProgress for allocation-friendlier choices.
func (rn *RawNode) Step(m pb.Message) error
Step advances the state machine using the given message.
func (rn *RawNode) Tick()
Tick advances the internal logical clock by a single tick.
func (rn *RawNode) TickQuiesced()
TickQuiesced advances the internal logical clock by a single tick without performing any other state machine processing. It allows the caller to avoid periodic heartbeats and elections when all of the peers in a Raft group are known to be at the same state. Expected usage is to periodically invoke Tick or TickQuiesced depending on whether the group is "active" or "quiesced".
WARNING: Be very careful about using this method as it subverts the Raft state machine. You should probably be using Tick instead.
func (rn *RawNode) TransferLeader(transferee uint64)
TransferLeader tries to transfer leadership to the given transferee.
func (rn *RawNode) WithProgress(visitor func(id uint64, typ ProgressType, pr tracker.Progress))
WithProgress is a helper to introspect the Progress for this node and its peers.
type ReadOnlyOption int
const (
	// ReadOnlySafe guarantees the linearizability of the read only request by
	// communicating with the quorum. It is the default and suggested option.
	ReadOnlySafe ReadOnlyOption = iota
	// ReadOnlyLeaseBased ensures linearizability of the read only request by
	// relying on the leader lease. It can be affected by clock drift.
	// If the clock drift is unbounded, leader might keep the lease longer than it
	// should (clock can move backward/pause without any bound). ReadIndex is not safe
	// in that case.
	ReadOnlyLeaseBased
)
ReadState provides state for a read-only query. It is the caller's responsibility to call ReadIndex before getting this state from Ready. It is also the caller's duty to determine, through RequestCtx, whether this state is the one it requested, e.g. by using a unique ID as the RequestCtx.
type ReadState struct { Index uint64 RequestCtx []byte }
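A caller typically needs to pick its own ReadState out of a batch and then decide when a local read may be served. The sketch below shows one way to do that with a local readState type. The helper names are hypothetical, and the sketch assumes it is sufficient for the applied index to reach the read index; use a strict comparison instead if you want to follow the wording of the comments literally.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// readState is a local stand-in for ReadState.
type readState struct {
	Index      uint64
	RequestCtx []byte
}

// newRequestCtx encodes a unique request ID as the context bytes
// (hypothetical helper; any unique byte string would do).
func newRequestCtx(id uint64) []byte {
	b := make([]byte, 8)
	binary.BigEndian.PutUint64(b, id)
	return b
}

// findReadState returns the state whose RequestCtx matches rctx, if any.
func findReadState(states []readState, rctx []byte) (readState, bool) {
	for _, rs := range states {
		if bytes.Equal(rs.RequestCtx, rctx) {
			return rs, true
		}
	}
	return readState{}, false
}

// canServe reports whether the state machine has caught up to the read
// index (assumption: reaching the index is enough).
func canServe(applied uint64, rs readState) bool {
	return applied >= rs.Index
}

func main() {
	mine := newRequestCtx(42)
	states := []readState{
		{Index: 7, RequestCtx: newRequestCtx(41)},
		{Index: 9, RequestCtx: mine},
	}

	rs, ok := findReadState(states, mine)
	fmt.Println(ok, rs.Index)    // true 9
	fmt.Println(canServe(8, rs)) // false
	fmt.Println(canServe(9, rs)) // true
}
```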
Ready encapsulates the entries and messages that are ready to read, be saved to stable storage, committed or sent to other peers. All fields in Ready are read-only.
type Ready struct {
	// The current volatile state of a Node.
	// SoftState will be nil if there is no update.
	// It is not required to consume or store SoftState.
	*SoftState

	// The current state of a Node to be saved to stable storage BEFORE
	// Messages are sent.
	// HardState will be equal to empty state if there is no update.
	pb.HardState

	// ReadStates can be used for node to serve linearizable read requests locally
	// when its applied index is greater than the index in ReadState.
	// Note that the readState will be returned when raft receives msgReadIndex.
	// The returned is only valid for the request that requested to read.
	ReadStates []ReadState

	// Entries specifies entries to be saved to stable storage BEFORE
	// Messages are sent.
	Entries []pb.Entry

	// Snapshot specifies the snapshot to be saved to stable storage.
	Snapshot pb.Snapshot

	// CommittedEntries specifies entries to be committed to a
	// store/state-machine. These have previously been committed to stable
	// store.
	CommittedEntries []pb.Entry

	// Messages specifies outbound messages to be sent AFTER Entries are
	// committed to stable storage.
	// If it contains a MsgSnap message, the application MUST report back to raft
	// when the snapshot has been received or has failed by calling ReportSnapshot.
	Messages []pb.Message

	// MustSync indicates whether the HardState and Entries must be synchronously
	// written to disk or if an asynchronous write is permissible.
	MustSync bool
}
type SnapshotStatus int
const (
	SnapshotFinish  SnapshotStatus = 1
	SnapshotFailure SnapshotStatus = 2
)
SoftState provides state that is useful for logging and debugging. The state is volatile and does not need to be persisted to the WAL.
type SoftState struct {
	Lead      uint64 // must use atomic operations to access; keep 64-bit aligned.
	RaftState StateType
}
StateType represents the role of a node in a cluster.
type StateType uint64
Possible values for StateType.
const (
	StateFollower StateType = iota
	StateCandidate
	StateLeader
	StatePreCandidate
)
func (st StateType) MarshalJSON() ([]byte, error)
func (st StateType) String() string
Status contains information about this Raft peer and its view of the system. The Progress is only populated on the leader.
type Status struct {
	BasicStatus
	Config   tracker.Config
	Progress map[uint64]tracker.Progress
}
func (s Status) MarshalJSON() ([]byte, error)
MarshalJSON translates the raft status into JSON. TODO: try to simplify this by introducing ID type into raft
func (s Status) String() string
Storage is an interface that may be implemented by the application to retrieve log entries from storage.
If any Storage method returns an error, the raft instance will become inoperable and refuse to participate in elections; the application is responsible for cleanup and recovery in this case.
type Storage interface {
	// InitialState returns the saved HardState and ConfState information.
	InitialState() (pb.HardState, pb.ConfState, error)
	// Entries returns a slice of log entries in the range [lo,hi).
	// MaxSize limits the total size of the log entries returned, but
	// Entries returns at least one entry if any.
	Entries(lo, hi, maxSize uint64) ([]pb.Entry, error)
	// Term returns the term of entry i, which must be in the range
	// [FirstIndex()-1, LastIndex()]. The term of the entry before
	// FirstIndex is retained for matching purposes even though the
	// rest of that entry may not be available.
	Term(i uint64) (uint64, error)
	// LastIndex returns the index of the last entry in the log.
	LastIndex() (uint64, error)
	// FirstIndex returns the index of the first log entry that is
	// possibly available via Entries (older entries have been incorporated
	// into the latest Snapshot; if storage only contains the dummy entry the
	// first log entry is not available).
	FirstIndex() (uint64, error)
	// Snapshot returns the most recent snapshot.
	// If snapshot is temporarily unavailable, it should return ErrSnapshotTemporarilyUnavailable,
	// so raft state machine could know that Storage needs some time to prepare
	// snapshot and call Snapshot later.
	Snapshot() (pb.Snapshot, error)
}
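The rule that Entries honors maxSize but still returns at least one entry can be sketched as a small truncation helper. The entry type and limitSize function below are local illustrations. As a simplifying assumption, an entry's size is taken to be the length of its Data, whereas a real implementation would likely use the encoded size of the whole entry.

```go
package main

import "fmt"

// entry is a local stand-in for pb.Entry with only the fields needed here.
type entry struct {
	Index uint64
	Data  []byte
}

// limitSize returns the longest prefix of ents whose total size does not
// exceed maxSize, but never fewer than one entry when ents is non-empty.
func limitSize(ents []entry, maxSize uint64) []entry {
	if len(ents) == 0 {
		return ents
	}
	// The first entry is always included, whatever its size.
	size := uint64(len(ents[0].Data))
	limit := 1
	for ; limit < len(ents); limit++ {
		size += uint64(len(ents[limit].Data))
		if size > maxSize {
			break
		}
	}
	return ents[:limit]
}

func main() {
	ents := []entry{
		{Index: 1, Data: []byte("aaaa")},
		{Index: 2, Data: []byte("bbbb")},
		{Index: 3, Data: []byte("cccc")},
	}
	fmt.Println(len(limitSize(ents, 0)))   // 1
	fmt.Println(len(limitSize(ents, 8)))   // 2
	fmt.Println(len(limitSize(ents, 100))) // 3
}
```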