...

Source file src/go.etcd.io/etcd/raft/v3/rawnode.go

Documentation: go.etcd.io/etcd/raft/v3

     1  // Copyright 2015 The etcd Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package raft
    16  
    17  import (
    18  	"errors"
    19  
    20  	pb "go.etcd.io/etcd/raft/v3/raftpb"
    21  	"go.etcd.io/etcd/raft/v3/tracker"
    22  )
    23  
    24  // ErrStepLocalMsg is returned when try to step a local raft message
    25  var ErrStepLocalMsg = errors.New("raft: cannot step raft local message")
    26  
    27  // ErrStepPeerNotFound is returned when try to step a response message
    28  // but there is no peer found in raft.prs for that node.
    29  var ErrStepPeerNotFound = errors.New("raft: cannot step as peer not found")
    30  
    31  // RawNode is a thread-unsafe Node.
    32  // The methods of this struct correspond to the methods of Node and are described
    33  // more fully there.
    34  type RawNode struct {
    35  	raft       *raft
    36  	prevSoftSt *SoftState
    37  	prevHardSt pb.HardState
    38  }
    39  
    40  // NewRawNode instantiates a RawNode from the given configuration.
    41  //
    42  // See Bootstrap() for bootstrapping an initial state; this replaces the former
    43  // 'peers' argument to this method (with identical behavior). However, It is
    44  // recommended that instead of calling Bootstrap, applications bootstrap their
    45  // state manually by setting up a Storage that has a first index > 1 and which
    46  // stores the desired ConfState as its InitialState.
    47  func NewRawNode(config *Config) (*RawNode, error) {
    48  	r := newRaft(config)
    49  	rn := &RawNode{
    50  		raft: r,
    51  	}
    52  	rn.prevSoftSt = r.softState()
    53  	rn.prevHardSt = r.hardState()
    54  	return rn, nil
    55  }
    56  
    57  // Tick advances the internal logical clock by a single tick.
    58  func (rn *RawNode) Tick() {
    59  	rn.raft.tick()
    60  }
    61  
    62  // TickQuiesced advances the internal logical clock by a single tick without
    63  // performing any other state machine processing. It allows the caller to avoid
    64  // periodic heartbeats and elections when all of the peers in a Raft group are
    65  // known to be at the same state. Expected usage is to periodically invoke Tick
    66  // or TickQuiesced depending on whether the group is "active" or "quiesced".
    67  //
    68  // WARNING: Be very careful about using this method as it subverts the Raft
    69  // state machine. You should probably be using Tick instead.
    70  func (rn *RawNode) TickQuiesced() {
    71  	rn.raft.electionElapsed++
    72  }
    73  
    74  // Campaign causes this RawNode to transition to candidate state.
    75  func (rn *RawNode) Campaign() error {
    76  	return rn.raft.Step(pb.Message{
    77  		Type: pb.MsgHup,
    78  	})
    79  }
    80  
    81  // Propose proposes data be appended to the raft log.
    82  func (rn *RawNode) Propose(data []byte) error {
    83  	return rn.raft.Step(pb.Message{
    84  		Type: pb.MsgProp,
    85  		From: rn.raft.id,
    86  		Entries: []pb.Entry{
    87  			{Data: data},
    88  		}})
    89  }
    90  
    91  // ProposeConfChange proposes a config change. See (Node).ProposeConfChange for
    92  // details.
    93  func (rn *RawNode) ProposeConfChange(cc pb.ConfChangeI) error {
    94  	m, err := confChangeToMsg(cc)
    95  	if err != nil {
    96  		return err
    97  	}
    98  	return rn.raft.Step(m)
    99  }
   100  
   101  // ApplyConfChange applies a config change to the local node. The app must call
   102  // this when it applies a configuration change, except when it decides to reject
   103  // the configuration change, in which case no call must take place.
   104  func (rn *RawNode) ApplyConfChange(cc pb.ConfChangeI) *pb.ConfState {
   105  	cs := rn.raft.applyConfChange(cc.AsV2())
   106  	return &cs
   107  }
   108  
   109  // Step advances the state machine using the given message.
   110  func (rn *RawNode) Step(m pb.Message) error {
   111  	// ignore unexpected local messages receiving over network
   112  	if IsLocalMsg(m.Type) {
   113  		return ErrStepLocalMsg
   114  	}
   115  	if pr := rn.raft.prs.Progress[m.From]; pr != nil || !IsResponseMsg(m.Type) {
   116  		return rn.raft.Step(m)
   117  	}
   118  	return ErrStepPeerNotFound
   119  }
   120  
   121  // Ready returns the outstanding work that the application needs to handle. This
   122  // includes appending and applying entries or a snapshot, updating the HardState,
   123  // and sending messages. The returned Ready() *must* be handled and subsequently
   124  // passed back via Advance().
   125  func (rn *RawNode) Ready() Ready {
   126  	rd := rn.readyWithoutAccept()
   127  	rn.acceptReady(rd)
   128  	return rd
   129  }
   130  
   131  // readyWithoutAccept returns a Ready. This is a read-only operation, i.e. there
   132  // is no obligation that the Ready must be handled.
   133  func (rn *RawNode) readyWithoutAccept() Ready {
   134  	return newReady(rn.raft, rn.prevSoftSt, rn.prevHardSt)
   135  }
   136  
   137  // acceptReady is called when the consumer of the RawNode has decided to go
   138  // ahead and handle a Ready. Nothing must alter the state of the RawNode between
   139  // this call and the prior call to Ready().
   140  func (rn *RawNode) acceptReady(rd Ready) {
   141  	if rd.SoftState != nil {
   142  		rn.prevSoftSt = rd.SoftState
   143  	}
   144  	if len(rd.ReadStates) != 0 {
   145  		rn.raft.readStates = nil
   146  	}
   147  	rn.raft.msgs = nil
   148  }
   149  
   150  // HasReady called when RawNode user need to check if any Ready pending.
   151  // Checking logic in this method should be consistent with Ready.containsUpdates().
   152  func (rn *RawNode) HasReady() bool {
   153  	r := rn.raft
   154  	if !r.softState().equal(rn.prevSoftSt) {
   155  		return true
   156  	}
   157  	if hardSt := r.hardState(); !IsEmptyHardState(hardSt) && !isHardStateEqual(hardSt, rn.prevHardSt) {
   158  		return true
   159  	}
   160  	if r.raftLog.hasPendingSnapshot() {
   161  		return true
   162  	}
   163  	if len(r.msgs) > 0 || len(r.raftLog.unstableEntries()) > 0 || r.raftLog.hasNextEnts() {
   164  		return true
   165  	}
   166  	if len(r.readStates) != 0 {
   167  		return true
   168  	}
   169  	return false
   170  }
   171  
   172  // Advance notifies the RawNode that the application has applied and saved progress in the
   173  // last Ready results.
   174  func (rn *RawNode) Advance(rd Ready) {
   175  	if !IsEmptyHardState(rd.HardState) {
   176  		rn.prevHardSt = rd.HardState
   177  	}
   178  	rn.raft.advance(rd)
   179  }
   180  
   181  // Status returns the current status of the given group. This allocates, see
   182  // BasicStatus and WithProgress for allocation-friendlier choices.
   183  func (rn *RawNode) Status() Status {
   184  	status := getStatus(rn.raft)
   185  	return status
   186  }
   187  
   188  // BasicStatus returns a BasicStatus. Notably this does not contain the
   189  // Progress map; see WithProgress for an allocation-free way to inspect it.
   190  func (rn *RawNode) BasicStatus() BasicStatus {
   191  	return getBasicStatus(rn.raft)
   192  }
   193  
   194  // ProgressType indicates the type of replica a Progress corresponds to.
   195  type ProgressType byte
   196  
   197  const (
   198  	// ProgressTypePeer accompanies a Progress for a regular peer replica.
   199  	ProgressTypePeer ProgressType = iota
   200  	// ProgressTypeLearner accompanies a Progress for a learner replica.
   201  	ProgressTypeLearner
   202  )
   203  
   204  // WithProgress is a helper to introspect the Progress for this node and its
   205  // peers.
   206  func (rn *RawNode) WithProgress(visitor func(id uint64, typ ProgressType, pr tracker.Progress)) {
   207  	rn.raft.prs.Visit(func(id uint64, pr *tracker.Progress) {
   208  		typ := ProgressTypePeer
   209  		if pr.IsLearner {
   210  			typ = ProgressTypeLearner
   211  		}
   212  		p := *pr
   213  		p.Inflights = nil
   214  		visitor(id, typ, p)
   215  	})
   216  }
   217  
   218  // ReportUnreachable reports the given node is not reachable for the last send.
   219  func (rn *RawNode) ReportUnreachable(id uint64) {
   220  	_ = rn.raft.Step(pb.Message{Type: pb.MsgUnreachable, From: id})
   221  }
   222  
   223  // ReportSnapshot reports the status of the sent snapshot.
   224  func (rn *RawNode) ReportSnapshot(id uint64, status SnapshotStatus) {
   225  	rej := status == SnapshotFailure
   226  
   227  	_ = rn.raft.Step(pb.Message{Type: pb.MsgSnapStatus, From: id, Reject: rej})
   228  }
   229  
   230  // TransferLeader tries to transfer leadership to the given transferee.
   231  func (rn *RawNode) TransferLeader(transferee uint64) {
   232  	_ = rn.raft.Step(pb.Message{Type: pb.MsgTransferLeader, From: transferee})
   233  }
   234  
   235  // ReadIndex requests a read state. The read state will be set in ready.
   236  // Read State has a read index. Once the application advances further than the read
   237  // index, any linearizable read requests issued before the read request can be
   238  // processed safely. The read state will have the same rctx attached.
   239  func (rn *RawNode) ReadIndex(rctx []byte) {
   240  	_ = rn.raft.Step(pb.Message{Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: rctx}}})
   241  }
   242  

View as plain text