...

Source file src/go.etcd.io/etcd/server/v3/etcdserver/raft.go

Documentation: go.etcd.io/etcd/server/v3/etcdserver

     1  // Copyright 2015 The etcd Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package etcdserver
    16  
    17  import (
    18  	"encoding/json"
    19  	"expvar"
    20  	"fmt"
    21  	"log"
    22  	"sort"
    23  	"sync"
    24  	"time"
    25  
    26  	pb "go.etcd.io/etcd/api/v3/etcdserverpb"
    27  	"go.etcd.io/etcd/client/pkg/v3/logutil"
    28  	"go.etcd.io/etcd/client/pkg/v3/types"
    29  	"go.etcd.io/etcd/pkg/v3/contention"
    30  	"go.etcd.io/etcd/pkg/v3/pbutil"
    31  	"go.etcd.io/etcd/raft/v3"
    32  	"go.etcd.io/etcd/raft/v3/raftpb"
    33  	"go.etcd.io/etcd/server/v3/config"
    34  	"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
    35  	"go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp"
    36  	"go.etcd.io/etcd/server/v3/wal"
    37  	"go.etcd.io/etcd/server/v3/wal/walpb"
    38  	"go.uber.org/zap"
    39  )
    40  
const (
	// maxSizePerMsg is the value handed to raft.Config.MaxSizePerMsg by the
	// node constructors in this file.
	// The max throughput of etcd will not exceed 100MB/s (100K * 1KB value).
	// Assuming the RTT is around 10ms, 1MB max size is large enough.
	maxSizePerMsg = 1 * 1024 * 1024
	// maxInflightMsgs is the value handed to raft.Config.MaxInflightMsgs.
	// Never overflow the rafthttp buffer, which is 4096.
	// TODO: a better const?
	maxInflightMsgs = 4096 / 8
)
    49  
var (
	// raftStatusMu protects raftStatus.
	raftStatusMu sync.Mutex
	// raftStatus is an indirection for the expvar func interface:
	// expvar panics when publishing a duplicate name and does not
	// support removing a registered name, so init only registers a func
	// that calls raftStatus, and raftStatus itself is replaced as needed.
	raftStatus func() raft.Status
)
    60  
    61  func init() {
    62  	expvar.Publish("raft.status", expvar.Func(func() interface{} {
    63  		raftStatusMu.Lock()
    64  		defer raftStatusMu.Unlock()
    65  		if raftStatus == nil {
    66  			return nil
    67  		}
    68  		return raftStatus()
    69  	}))
    70  }
    71  
// apply contains entries and a snapshot to be applied. Once
// an apply is consumed, the entries will be persisted to
// raft storage concurrently; the application must read
// notifyc before assuming the raft messages are stable.
type apply struct {
	entries  []raftpb.Entry
	snapshot raftpb.Snapshot
	// notifyc synchronizes etcd server applies with the raft node
	notifyc chan struct{}
}
    82  
// raftNode combines the embedded raftNodeConfig with the channels, ticker
// and stop signalling used by the goroutine launched in start.
type raftNode struct {
	lg *zap.Logger

	// tickMu serializes calls to Tick made through tick.
	tickMu *sync.Mutex
	raftNodeConfig

	// a chan to send/receive snapshot
	msgSnapC chan raftpb.Message

	// a chan to send out apply
	applyc chan apply

	// a chan to send out readState
	readStateC chan raft.ReadState

	// utility
	ticker *time.Ticker
	// contention detectors for raft heartbeat message
	td *contention.TimeoutDetector

	// stop sends on stopped to request shutdown; onStop closes done once
	// the teardown has finished.
	stopped chan struct{}
	done    chan struct{}
}
   106  
// raftNodeConfig carries the dependencies that newRaftNode copies into a
// raftNode: the logger, the raft.Node itself, storage and the transport.
type raftNodeConfig struct {
	lg *zap.Logger

	// to check if msg receiver is removed from cluster
	isIDRemoved func(id uint64) bool
	raft.Node
	raftStorage *raft.MemoryStorage
	storage     Storage
	heartbeat   time.Duration // for logging
	// transport specifies the transport to send and receive msgs to members.
	// Sending messages MUST NOT block. It is okay to drop messages, since
	// clients should timeout and reissue their messages.
	// If transport is nil, server will panic.
	transport rafthttp.Transporter
}
   122  
   123  func newRaftNode(cfg raftNodeConfig) *raftNode {
   124  	var lg raft.Logger
   125  	if cfg.lg != nil {
   126  		lg = NewRaftLoggerZap(cfg.lg)
   127  	} else {
   128  		lcfg := logutil.DefaultZapLoggerConfig
   129  		var err error
   130  		lg, err = NewRaftLogger(&lcfg)
   131  		if err != nil {
   132  			log.Fatalf("cannot create raft logger %v", err)
   133  		}
   134  	}
   135  	raft.SetLogger(lg)
   136  	r := &raftNode{
   137  		lg:             cfg.lg,
   138  		tickMu:         new(sync.Mutex),
   139  		raftNodeConfig: cfg,
   140  		// set up contention detectors for raft heartbeat message.
   141  		// expect to send a heartbeat within 2 heartbeat intervals.
   142  		td:         contention.NewTimeoutDetector(2 * cfg.heartbeat),
   143  		readStateC: make(chan raft.ReadState, 1),
   144  		msgSnapC:   make(chan raftpb.Message, maxInFlightMsgSnap),
   145  		applyc:     make(chan apply),
   146  		stopped:    make(chan struct{}),
   147  		done:       make(chan struct{}),
   148  	}
   149  	if r.heartbeat == 0 {
   150  		r.ticker = &time.Ticker{}
   151  	} else {
   152  		r.ticker = time.NewTicker(r.heartbeat)
   153  	}
   154  	return r
   155  }
   156  
   157  // raft.Node does not have locks in Raft package
   158  func (r *raftNode) tick() {
   159  	r.tickMu.Lock()
   160  	r.Tick()
   161  	r.tickMu.Unlock()
   162  }
   163  
// start prepares and starts raftNode in a new goroutine. It is no longer safe
// to modify the fields after it has been started.
//
// The goroutine loops over three events: a ticker fire (calls tick), a Ready
// from the raft node (processed as described inline), and a receive on
// r.stopped (exits the loop). onStop is deferred, so it runs whenever the
// goroutine returns.
//
// rh supplies the callbacks used while processing a Ready: getLead,
// updateLead, updateLeadership and updateCommittedIndex.
func (r *raftNode) start(rh *raftReadyHandler) {
	internalTimeout := time.Second

	go func() {
		defer r.onStop()
		islead := false

		for {
			select {
			case <-r.ticker.C:
				r.tick()
			case rd := <-r.Ready():
				// A non-nil SoftState drives the leadership bookkeeping:
				// metrics, the rh callbacks, and a reset of the heartbeat
				// contention detector.
				if rd.SoftState != nil {
					newLeader := rd.SoftState.Lead != raft.None && rh.getLead() != rd.SoftState.Lead
					if newLeader {
						leaderChanges.Inc()
					}

					if rd.SoftState.Lead == raft.None {
						hasLeader.Set(0)
					} else {
						hasLeader.Set(1)
					}

					rh.updateLead(rd.SoftState.Lead)
					islead = rd.RaftState == raft.StateLeader
					if islead {
						isLeader.Set(1)
					} else {
						isLeader.Set(0)
					}
					rh.updateLeadership(newLeader)
					r.td.Reset()
				}

				// Only the last ReadState of the batch is forwarded; the send
				// is abandoned with a warning after internalTimeout.
				if len(rd.ReadStates) != 0 {
					select {
					case r.readStateC <- rd.ReadStates[len(rd.ReadStates)-1]:
					case <-time.After(internalTimeout):
						r.lg.Warn("timed out sending read state", zap.Duration("timeout", internalTimeout))
					case <-r.stopped:
						return
					}
				}

				// notifyc has capacity 1, so one pending signal does not
				// block this loop.
				notifyc := make(chan struct{}, 1)
				ap := apply{
					entries:  rd.CommittedEntries,
					snapshot: rd.Snapshot,
					notifyc:  notifyc,
				}

				updateCommittedIndex(&ap, rh)

				// When shouldWaitWALSync reports true, hard state and entries
				// are saved before the apply is handed off; otherwise they are
				// saved afterwards in the !waitWALSync branch below.
				waitWALSync := shouldWaitWALSync(rd)
				if waitWALSync {
					// gofail: var raftBeforeSaveWaitWalSync struct{}
					if err := r.storage.Save(rd.HardState, rd.Entries); err != nil {
						r.lg.Fatal("failed to save Raft hard state and entries", zap.Error(err))
					}
				}

				select {
				case r.applyc <- ap:
				case <-r.stopped:
					return
				}

				// the leader can write to its disk in parallel with replicating to the followers and them
				// writing to their disks.
				// For more details, check raft thesis 10.2.1
				if islead {
					// gofail: var raftBeforeLeaderSend struct{}
					r.transport.Send(r.processMessages(rd.Messages))
				}

				// Must save the snapshot file and WAL snapshot entry before saving any other entries or hardstate to
				// ensure that recovery after a snapshot restore is possible.
				if !raft.IsEmptySnap(rd.Snapshot) {
					// gofail: var raftBeforeSaveSnap struct{}
					if err := r.storage.SaveSnap(rd.Snapshot); err != nil {
						r.lg.Fatal("failed to save Raft snapshot", zap.Error(err))
					}
					// gofail: var raftAfterSaveSnap struct{}
				}

				if !waitWALSync {
					// gofail: var raftBeforeSave struct{}
					if err := r.storage.Save(rd.HardState, rd.Entries); err != nil {
						r.lg.Fatal("failed to save Raft hard state and entries", zap.Error(err))
					}
				}
				if !raft.IsEmptyHardState(rd.HardState) {
					proposalsCommitted.Set(float64(rd.HardState.Commit))
				}
				// gofail: var raftAfterSave struct{}

				if !raft.IsEmptySnap(rd.Snapshot) {
					// Force WAL to fsync its hard state before Release() releases
					// old data from the WAL. Otherwise could get an error like:
					// panic: tocommit(107) is out of range [lastIndex(84)]. Was the raft log corrupted, truncated, or lost?
					// See https://github.com/etcd-io/etcd/issues/10219 for more details.
					if err := r.storage.Sync(); err != nil {
						r.lg.Fatal("failed to sync Raft snapshot", zap.Error(err))
					}

					// etcdserver now claim the snapshot has been persisted onto the disk
					notifyc <- struct{}{}

					// gofail: var raftBeforeApplySnap struct{}
					r.raftStorage.ApplySnapshot(rd.Snapshot)
					r.lg.Info("applied incoming Raft snapshot", zap.Uint64("snapshot-index", rd.Snapshot.Metadata.Index))
					// gofail: var raftAfterApplySnap struct{}

					if err := r.storage.Release(rd.Snapshot); err != nil {
						r.lg.Fatal("failed to release Raft wal", zap.Error(err))
					}
					// gofail: var raftAfterWALRelease struct{}
				}

				r.raftStorage.Append(rd.Entries)

				if !islead {
					// finish processing incoming messages before we signal raftdone chan
					msgs := r.processMessages(rd.Messages)

					// now unblocks 'applyAll' that waits on Raft log disk writes before triggering snapshots
					notifyc <- struct{}{}

					// Candidate or follower needs to wait for all pending configuration
					// changes to be applied before sending messages.
					// Otherwise we might incorrectly count votes (e.g. votes from removed members).
					// Also slow machine's follower raft-layer could proceed to become the leader
					// on its own single-node cluster, before apply-layer applies the config change.
					// We simply wait for ALL pending entries to be applied for now.
					// We might improve this later on if it causes unnecessary long blocking issues.
					waitApply := false
					for _, ent := range rd.CommittedEntries {
						if ent.Type == raftpb.EntryConfChange {
							waitApply = true
							break
						}
					}
					if waitApply {
						// blocks until 'applyAll' calls 'applyWait.Trigger'
						// to be in sync with scheduled config-change job
						// (assume notifyc has cap of 1)
						select {
						case notifyc <- struct{}{}:
						case <-r.stopped:
							return
						}
					}

					// gofail: var raftBeforeFollowerSend struct{}
					r.transport.Send(msgs)
				} else {
					// leader already processed 'MsgSnap' and signaled
					notifyc <- struct{}{}
				}

				// Tell the raft node this Ready has been fully handled.
				r.Advance()
			case <-r.stopped:
				return
			}
		}
	}()
}
   334  
   335  // For a cluster with only one member, the raft may send both the
   336  // unstable entries and committed entries to etcdserver, and there
   337  // may have overlapped log entries between them.
   338  //
   339  // etcd responds to the client once it finishes (actually partially)
   340  // the applying workflow. But when the client receives the response,
   341  // it doesn't mean etcd has already successfully saved the data,
   342  // including BoltDB and WAL, because:
   343  //  1. etcd commits the boltDB transaction periodically instead of on each request;
   344  //  2. etcd saves WAL entries in parallel with applying the committed entries.
   345  //
   346  // Accordingly, it might run into a situation of data loss when the etcd crashes
   347  // immediately after responding to the client and before the boltDB and WAL
   348  // successfully save the data to disk.
   349  // Note that this issue can only happen for clusters with only one member.
   350  //
   351  // For clusters with multiple members, it isn't an issue, because etcd will
   352  // not commit & apply the data before it being replicated to majority members.
   353  // When the client receives the response, it means the data must have been applied.
   354  // It further means the data must have been committed.
   355  // Note: for clusters with multiple members, the raft will never send identical
   356  // unstable entries and committed entries to etcdserver.
   357  //
   358  // Refer to https://github.com/etcd-io/etcd/issues/14370.
   359  func shouldWaitWALSync(rd raft.Ready) bool {
   360  	if len(rd.CommittedEntries) == 0 || len(rd.Entries) == 0 {
   361  		return false
   362  	}
   363  
   364  	// Check if there is overlap between unstable and committed entries
   365  	// assuming that their index and term are only incrementing.
   366  	lastCommittedEntry := rd.CommittedEntries[len(rd.CommittedEntries)-1]
   367  	firstUnstableEntry := rd.Entries[0]
   368  	return lastCommittedEntry.Term > firstUnstableEntry.Term ||
   369  		(lastCommittedEntry.Term == firstUnstableEntry.Term && lastCommittedEntry.Index >= firstUnstableEntry.Index)
   370  }
   371  
   372  func updateCommittedIndex(ap *apply, rh *raftReadyHandler) {
   373  	var ci uint64
   374  	if len(ap.entries) != 0 {
   375  		ci = ap.entries[len(ap.entries)-1].Index
   376  	}
   377  	if ap.snapshot.Metadata.Index > ci {
   378  		ci = ap.snapshot.Metadata.Index
   379  	}
   380  	if ci != 0 {
   381  		rh.updateCommittedIndex(ci)
   382  	}
   383  }
   384  
   385  func (r *raftNode) processMessages(ms []raftpb.Message) []raftpb.Message {
   386  	sentAppResp := false
   387  	for i := len(ms) - 1; i >= 0; i-- {
   388  		if r.isIDRemoved(ms[i].To) {
   389  			ms[i].To = 0
   390  		}
   391  
   392  		if ms[i].Type == raftpb.MsgAppResp {
   393  			if sentAppResp {
   394  				ms[i].To = 0
   395  			} else {
   396  				sentAppResp = true
   397  			}
   398  		}
   399  
   400  		if ms[i].Type == raftpb.MsgSnap {
   401  			// There are two separate data store: the store for v2, and the KV for v3.
   402  			// The msgSnap only contains the most recent snapshot of store without KV.
   403  			// So we need to redirect the msgSnap to etcd server main loop for merging in the
   404  			// current store snapshot and KV snapshot.
   405  			select {
   406  			case r.msgSnapC <- ms[i]:
   407  			default:
   408  				// drop msgSnap if the inflight chan if full.
   409  			}
   410  			ms[i].To = 0
   411  		}
   412  		if ms[i].Type == raftpb.MsgHeartbeat {
   413  			ok, exceed := r.td.Observe(ms[i].To)
   414  			if !ok {
   415  				// TODO: limit request rate.
   416  				r.lg.Warn(
   417  					"leader failed to send out heartbeat on time; took too long, leader is overloaded likely from slow disk",
   418  					zap.String("to", fmt.Sprintf("%x", ms[i].To)),
   419  					zap.Duration("heartbeat-interval", r.heartbeat),
   420  					zap.Duration("expected-duration", 2*r.heartbeat),
   421  					zap.Duration("exceeded-duration", exceed),
   422  				)
   423  				heartbeatSendFailures.Inc()
   424  			}
   425  		}
   426  	}
   427  	return ms
   428  }
   429  
// apply returns the channel on which the goroutine launched by start sends
// apply values.
func (r *raftNode) apply() chan apply {
	return r.applyc
}
   433  
   434  func (r *raftNode) stop() {
   435  	r.stopped <- struct{}{}
   436  	<-r.done
   437  }
   438  
   439  func (r *raftNode) onStop() {
   440  	r.Stop()
   441  	r.ticker.Stop()
   442  	r.transport.Stop()
   443  	if err := r.storage.Close(); err != nil {
   444  		r.lg.Panic("failed to close Raft storage", zap.Error(err))
   445  	}
   446  	close(r.done)
   447  }
   448  
   449  // for testing
   450  func (r *raftNode) pauseSending() {
   451  	p := r.transport.(rafthttp.Pausable)
   452  	p.Pause()
   453  }
   454  
   455  func (r *raftNode) resumeSending() {
   456  	p := r.transport.(rafthttp.Pausable)
   457  	p.Resume()
   458  }
   459  
   460  // advanceTicks advances ticks of Raft node.
   461  // This can be used for fast-forwarding election
   462  // ticks in multi data-center deployments, thus
   463  // speeding up election process.
   464  func (r *raftNode) advanceTicks(ticks int) {
   465  	for i := 0; i < ticks; i++ {
   466  		r.tick()
   467  	}
   468  }
   469  
   470  func startNode(cfg config.ServerConfig, cl *membership.RaftCluster, ids []types.ID) (id types.ID, n raft.Node, s *raft.MemoryStorage, w *wal.WAL) {
   471  	var err error
   472  	member := cl.MemberByName(cfg.Name)
   473  	metadata := pbutil.MustMarshal(
   474  		&pb.Metadata{
   475  			NodeID:    uint64(member.ID),
   476  			ClusterID: uint64(cl.ID()),
   477  		},
   478  	)
   479  	if w, err = wal.Create(cfg.Logger, cfg.WALDir(), metadata); err != nil {
   480  		cfg.Logger.Panic("failed to create WAL", zap.Error(err))
   481  	}
   482  	if cfg.UnsafeNoFsync {
   483  		w.SetUnsafeNoFsync()
   484  	}
   485  	peers := make([]raft.Peer, len(ids))
   486  	for i, id := range ids {
   487  		var ctx []byte
   488  		ctx, err = json.Marshal((*cl).Member(id))
   489  		if err != nil {
   490  			cfg.Logger.Panic("failed to marshal member", zap.Error(err))
   491  		}
   492  		peers[i] = raft.Peer{ID: uint64(id), Context: ctx}
   493  	}
   494  	id = member.ID
   495  	cfg.Logger.Info(
   496  		"starting local member",
   497  		zap.String("local-member-id", id.String()),
   498  		zap.String("cluster-id", cl.ID().String()),
   499  	)
   500  	s = raft.NewMemoryStorage()
   501  	c := &raft.Config{
   502  		ID:              uint64(id),
   503  		ElectionTick:    cfg.ElectionTicks,
   504  		HeartbeatTick:   1,
   505  		Storage:         s,
   506  		MaxSizePerMsg:   maxSizePerMsg,
   507  		MaxInflightMsgs: maxInflightMsgs,
   508  		CheckQuorum:     true,
   509  		PreVote:         cfg.PreVote,
   510  		Logger:          NewRaftLoggerZap(cfg.Logger.Named("raft")),
   511  	}
   512  	if len(peers) == 0 {
   513  		n = raft.RestartNode(c)
   514  	} else {
   515  		n = raft.StartNode(c, peers)
   516  	}
   517  	raftStatusMu.Lock()
   518  	raftStatus = n.Status
   519  	raftStatusMu.Unlock()
   520  	return id, n, s, w
   521  }
   522  
   523  func restartNode(cfg config.ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *membership.RaftCluster, raft.Node, *raft.MemoryStorage, *wal.WAL) {
   524  	var walsnap walpb.Snapshot
   525  	if snapshot != nil {
   526  		walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
   527  	}
   528  	w, id, cid, st, ents := readWAL(cfg.Logger, cfg.WALDir(), walsnap, cfg.UnsafeNoFsync)
   529  
   530  	cfg.Logger.Info(
   531  		"restarting local member",
   532  		zap.String("cluster-id", cid.String()),
   533  		zap.String("local-member-id", id.String()),
   534  		zap.Uint64("commit-index", st.Commit),
   535  	)
   536  	cl := membership.NewCluster(cfg.Logger)
   537  	cl.SetID(id, cid)
   538  	s := raft.NewMemoryStorage()
   539  	if snapshot != nil {
   540  		s.ApplySnapshot(*snapshot)
   541  	}
   542  	s.SetHardState(st)
   543  	s.Append(ents)
   544  	c := &raft.Config{
   545  		ID:              uint64(id),
   546  		ElectionTick:    cfg.ElectionTicks,
   547  		HeartbeatTick:   1,
   548  		Storage:         s,
   549  		MaxSizePerMsg:   maxSizePerMsg,
   550  		MaxInflightMsgs: maxInflightMsgs,
   551  		CheckQuorum:     true,
   552  		PreVote:         cfg.PreVote,
   553  		Logger:          NewRaftLoggerZap(cfg.Logger.Named("raft")),
   554  	}
   555  
   556  	n := raft.RestartNode(c)
   557  	raftStatusMu.Lock()
   558  	raftStatus = n.Status
   559  	raftStatusMu.Unlock()
   560  	return id, cl, n, s, w
   561  }
   562  
   563  func restartAsStandaloneNode(cfg config.ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *membership.RaftCluster, raft.Node, *raft.MemoryStorage, *wal.WAL) {
   564  	var walsnap walpb.Snapshot
   565  	if snapshot != nil {
   566  		walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
   567  	}
   568  	w, id, cid, st, ents := readWAL(cfg.Logger, cfg.WALDir(), walsnap, cfg.UnsafeNoFsync)
   569  
   570  	// discard the previously uncommitted entries
   571  	for i, ent := range ents {
   572  		if ent.Index > st.Commit {
   573  			cfg.Logger.Info(
   574  				"discarding uncommitted WAL entries",
   575  				zap.Uint64("entry-index", ent.Index),
   576  				zap.Uint64("commit-index-from-wal", st.Commit),
   577  				zap.Int("number-of-discarded-entries", len(ents)-i),
   578  			)
   579  			ents = ents[:i]
   580  			break
   581  		}
   582  	}
   583  
   584  	// force append the configuration change entries
   585  	toAppEnts := createConfigChangeEnts(
   586  		cfg.Logger,
   587  		getIDs(cfg.Logger, snapshot, ents),
   588  		uint64(id),
   589  		st.Term,
   590  		st.Commit,
   591  	)
   592  	ents = append(ents, toAppEnts...)
   593  
   594  	// force commit newly appended entries
   595  	err := w.Save(raftpb.HardState{}, toAppEnts)
   596  	if err != nil {
   597  		cfg.Logger.Fatal("failed to save hard state and entries", zap.Error(err))
   598  	}
   599  	if len(ents) != 0 {
   600  		st.Commit = ents[len(ents)-1].Index
   601  	}
   602  
   603  	cfg.Logger.Info(
   604  		"forcing restart member",
   605  		zap.String("cluster-id", cid.String()),
   606  		zap.String("local-member-id", id.String()),
   607  		zap.Uint64("commit-index", st.Commit),
   608  	)
   609  
   610  	cl := membership.NewCluster(cfg.Logger)
   611  	cl.SetID(id, cid)
   612  	s := raft.NewMemoryStorage()
   613  	if snapshot != nil {
   614  		s.ApplySnapshot(*snapshot)
   615  	}
   616  	s.SetHardState(st)
   617  	s.Append(ents)
   618  	c := &raft.Config{
   619  		ID:              uint64(id),
   620  		ElectionTick:    cfg.ElectionTicks,
   621  		HeartbeatTick:   1,
   622  		Storage:         s,
   623  		MaxSizePerMsg:   maxSizePerMsg,
   624  		MaxInflightMsgs: maxInflightMsgs,
   625  		CheckQuorum:     true,
   626  		PreVote:         cfg.PreVote,
   627  		Logger:          NewRaftLoggerZap(cfg.Logger.Named("raft")),
   628  	}
   629  
   630  	n := raft.RestartNode(c)
   631  	raftStatus = n.Status
   632  	return id, cl, n, s, w
   633  }
   634  
   635  // getIDs returns an ordered set of IDs included in the given snapshot and
   636  // the entries. The given snapshot/entries can contain three kinds of
   637  // ID-related entry:
   638  // - ConfChangeAddNode, in which case the contained ID will be added into the set.
   639  // - ConfChangeRemoveNode, in which case the contained ID will be removed from the set.
   640  // - ConfChangeAddLearnerNode, in which the contained ID will be added into the set.
   641  func getIDs(lg *zap.Logger, snap *raftpb.Snapshot, ents []raftpb.Entry) []uint64 {
   642  	ids := make(map[uint64]bool)
   643  	if snap != nil {
   644  		for _, id := range snap.Metadata.ConfState.Voters {
   645  			ids[id] = true
   646  		}
   647  	}
   648  	for _, e := range ents {
   649  		if e.Type != raftpb.EntryConfChange {
   650  			continue
   651  		}
   652  		var cc raftpb.ConfChange
   653  		pbutil.MustUnmarshal(&cc, e.Data)
   654  		switch cc.Type {
   655  		case raftpb.ConfChangeAddLearnerNode:
   656  			ids[cc.NodeID] = true
   657  		case raftpb.ConfChangeAddNode:
   658  			ids[cc.NodeID] = true
   659  		case raftpb.ConfChangeRemoveNode:
   660  			delete(ids, cc.NodeID)
   661  		case raftpb.ConfChangeUpdateNode:
   662  			// do nothing
   663  		default:
   664  			lg.Panic("unknown ConfChange Type", zap.String("type", cc.Type.String()))
   665  		}
   666  	}
   667  	sids := make(types.Uint64Slice, 0, len(ids))
   668  	for id := range ids {
   669  		sids = append(sids, id)
   670  	}
   671  	sort.Sort(sids)
   672  	return []uint64(sids)
   673  }
   674  
   675  // createConfigChangeEnts creates a series of Raft entries (i.e.
   676  // EntryConfChange) to remove the set of given IDs from the cluster. The ID
   677  // `self` is _not_ removed, even if present in the set.
   678  // If `self` is not inside the given ids, it creates a Raft entry to add a
   679  // default member with the given `self`.
   680  func createConfigChangeEnts(lg *zap.Logger, ids []uint64, self uint64, term, index uint64) []raftpb.Entry {
   681  	found := false
   682  	for _, id := range ids {
   683  		if id == self {
   684  			found = true
   685  		}
   686  	}
   687  
   688  	var ents []raftpb.Entry
   689  	next := index + 1
   690  
   691  	// NB: always add self first, then remove other nodes. Raft will panic if the
   692  	// set of voters ever becomes empty.
   693  	if !found {
   694  		m := membership.Member{
   695  			ID:             types.ID(self),
   696  			RaftAttributes: membership.RaftAttributes{PeerURLs: []string{"http://localhost:2380"}},
   697  		}
   698  		ctx, err := json.Marshal(m)
   699  		if err != nil {
   700  			lg.Panic("failed to marshal member", zap.Error(err))
   701  		}
   702  		cc := &raftpb.ConfChange{
   703  			Type:    raftpb.ConfChangeAddNode,
   704  			NodeID:  self,
   705  			Context: ctx,
   706  		}
   707  		e := raftpb.Entry{
   708  			Type:  raftpb.EntryConfChange,
   709  			Data:  pbutil.MustMarshal(cc),
   710  			Term:  term,
   711  			Index: next,
   712  		}
   713  		ents = append(ents, e)
   714  		next++
   715  	}
   716  
   717  	for _, id := range ids {
   718  		if id == self {
   719  			continue
   720  		}
   721  		cc := &raftpb.ConfChange{
   722  			Type:   raftpb.ConfChangeRemoveNode,
   723  			NodeID: id,
   724  		}
   725  		e := raftpb.Entry{
   726  			Type:  raftpb.EntryConfChange,
   727  			Data:  pbutil.MustMarshal(cc),
   728  			Term:  term,
   729  			Index: next,
   730  		}
   731  		ents = append(ents, e)
   732  		next++
   733  	}
   734  
   735  	return ents
   736  }
   737  

View as plain text