...

Source file src/go.etcd.io/etcd/server/v3/etcdserver/server.go

Documentation: go.etcd.io/etcd/server/v3/etcdserver

     1  // Copyright 2015 The etcd Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package etcdserver
    16  
    17  import (
    18  	"context"
    19  	"encoding/json"
    20  	"expvar"
    21  	"fmt"
    22  	"math"
    23  	"math/rand"
    24  	"net/http"
    25  	"os"
    26  	"path"
    27  	"regexp"
    28  	"strconv"
    29  	"strings"
    30  	"sync"
    31  	"sync/atomic"
    32  	"time"
    33  
    34  	"github.com/coreos/go-semver/semver"
    35  	humanize "github.com/dustin/go-humanize"
    36  	"github.com/prometheus/client_golang/prometheus"
    37  	"go.etcd.io/etcd/server/v3/config"
    38  	"go.etcd.io/etcd/server/v3/wal/walpb"
    39  	"go.uber.org/zap"
    40  
    41  	pb "go.etcd.io/etcd/api/v3/etcdserverpb"
    42  	"go.etcd.io/etcd/api/v3/membershippb"
    43  	"go.etcd.io/etcd/api/v3/version"
    44  	"go.etcd.io/etcd/client/pkg/v3/fileutil"
    45  	"go.etcd.io/etcd/client/pkg/v3/types"
    46  	"go.etcd.io/etcd/pkg/v3/idutil"
    47  	"go.etcd.io/etcd/pkg/v3/pbutil"
    48  	"go.etcd.io/etcd/pkg/v3/runtime"
    49  	"go.etcd.io/etcd/pkg/v3/schedule"
    50  	"go.etcd.io/etcd/pkg/v3/traceutil"
    51  	"go.etcd.io/etcd/pkg/v3/wait"
    52  	"go.etcd.io/etcd/raft/v3"
    53  	"go.etcd.io/etcd/raft/v3/raftpb"
    54  	"go.etcd.io/etcd/server/v3/auth"
    55  	"go.etcd.io/etcd/server/v3/etcdserver/api"
    56  	"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
    57  	"go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp"
    58  	"go.etcd.io/etcd/server/v3/etcdserver/api/snap"
    59  	"go.etcd.io/etcd/server/v3/etcdserver/api/v2discovery"
    60  	"go.etcd.io/etcd/server/v3/etcdserver/api/v2http/httptypes"
    61  	stats "go.etcd.io/etcd/server/v3/etcdserver/api/v2stats"
    62  	"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
    63  	"go.etcd.io/etcd/server/v3/etcdserver/api/v3alarm"
    64  	"go.etcd.io/etcd/server/v3/etcdserver/api/v3compactor"
    65  	"go.etcd.io/etcd/server/v3/etcdserver/cindex"
    66  	"go.etcd.io/etcd/server/v3/lease"
    67  	"go.etcd.io/etcd/server/v3/lease/leasehttp"
    68  	"go.etcd.io/etcd/server/v3/mvcc"
    69  	"go.etcd.io/etcd/server/v3/mvcc/backend"
    70  	"go.etcd.io/etcd/server/v3/wal"
    71  )
    72  
    73  const (
    74  	DefaultSnapshotCount = 100000
    75  
    76  	// DefaultSnapshotCatchUpEntries is the number of entries for a slow follower
    77  	// to catch up after compacting the raft storage entries.
    78  	// We expect the follower to have a millisecond-level latency with the leader.
    79  	// The max throughput is around 10K. Keeping 5K entries is enough to help a
    80  	// follower catch up.
    81  	DefaultSnapshotCatchUpEntries uint64 = 5000
    82  
    83  	StoreClusterPrefix = "/0"
    84  	StoreKeysPrefix    = "/1"
    85  
    86  	// HealthInterval is the minimum time the cluster should be healthy
    87  	// before accepting add member requests.
    88  	HealthInterval = 5 * time.Second
    89  
    90  	purgeFileInterval = 30 * time.Second
    91  
    92  	// maxInFlightMsgSnap is the maximum number of in-flight snapshot messages etcdserver allows.
    93  	// This number is more than enough for most clusters with 5 machines.
    94  	maxInFlightMsgSnap = 16
    95  
    96  	releaseDelayAfterSnapshot = 30 * time.Second
    97  
    98  	// maxPendingRevokes is the maximum number of outstanding expired lease revocations.
    99  	maxPendingRevokes = 16
   100  
   101  	recommendedMaxRequestBytes = 10 * 1024 * 1024
   102  
   103  	readyPercent = 0.9
   104  
   105  	DowngradeEnabledPath = "/downgrade/enabled"
   106  )
   107  
   108  var (
   109  	// monitorVersionInterval should be smaller than the timeout
   110  	// on the connection; otherwise we will not be able to reuse the connection
   111  	// (since it will time out).
   112  	monitorVersionInterval = rafthttp.ConnWriteTimeout - time.Second
   113  
   114  	recommendedMaxRequestBytesString = humanize.Bytes(uint64(recommendedMaxRequestBytes))
   115  	storeMemberAttributeRegexp       = regexp.MustCompile(path.Join(membership.StoreMembersPrefix, "[[:xdigit:]]{1,16}", "attributes"))
   116  )
   117  
   118  func init() {
   119  	rand.Seed(time.Now().UnixNano())
   120  
   121  	expvar.Publish(
   122  		"file_descriptor_limit",
   123  		expvar.Func(
   124  			func() interface{} {
   125  				n, _ := runtime.FDLimit()
   126  				return n
   127  			},
   128  		),
   129  	)
   130  }
   131  
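        // Response is the result of fulfilling a v2 request (see ServerV2.Do). It carries
        // the term and index associated with the request, together with the resulting
        // v2store Event or Watcher, or an error.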
   132  type Response struct {
   133  	Term    uint64
   134  	Index   uint64
   135  	Event   *v2store.Event
   136  	Watcher v2store.Watcher
   137  	Err     error
   138  }
   139  
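        // ServerV2 extends Server with the legacy v2 API surface: leader lookup,
        // v2 request handling (Do), v2 stats, and client-cert-auth information.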
   140  type ServerV2 interface {
   141  	Server
   142  	Leader() types.ID
   143  
   144  	// Do takes a V2 request and attempts to fulfill it, returning a Response.
   145  	Do(ctx context.Context, r pb.Request) (Response, error)
   146  	stats.Stats
   147  	ClientCertAuthEnabled() bool
   148  }
   149  
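        // ServerV3 extends Server with access to the local member's raft status
        // (RaftStatusGetter).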
   150  type ServerV3 interface {
   151  	Server
   152  	RaftStatusGetter
   153  }
   154  
   155  func (s *EtcdServer) ClientCertAuthEnabled() bool { return s.Cfg.ClientCertAuthEnabled }
   156  
   157  type Server interface {
   158  	// AddMember attempts to add a member into the cluster. It will return
   159  	// ErrIDRemoved if member ID is removed from the cluster, or return
   160  	// ErrIDExists if member ID exists in the cluster.
   161  	AddMember(ctx context.Context, memb membership.Member) ([]*membership.Member, error)
   162  	// RemoveMember attempts to remove a member from the cluster. It will
   163  	// return ErrIDRemoved if member ID is removed from the cluster, or return
   164  	// ErrIDNotFound if member ID is not in the cluster.
   165  	RemoveMember(ctx context.Context, id uint64) ([]*membership.Member, error)
   166  	// UpdateMember attempts to update an existing member in the cluster. It will
   167  	// return ErrIDNotFound if the member ID does not exist.
   168  	UpdateMember(ctx context.Context, updateMemb membership.Member) ([]*membership.Member, error)
   169  	// PromoteMember attempts to promote a non-voting node to a voting node. It will
   170  	// return ErrIDNotFound if the member ID does not exist.
   171  	// return ErrLearnerNotReady if the member is not ready.
   172  	// return ErrMemberNotLearner if the member is not a learner.
   173  	PromoteMember(ctx context.Context, id uint64) ([]*membership.Member, error)
   174  
   175  	// ClusterVersion is the cluster-wide minimum major.minor version.
   176  	// Cluster version is set to the min version that an etcd member is
   177  	// compatible with when it is first bootstrapped.
   178  	//
   179  	// ClusterVersion is nil until the cluster is bootstrapped (has a quorum).
   180  	//
   181  	// During a rolling upgrade, the ClusterVersion will be updated
   182  	// automatically after a sync (5 seconds by default).
   183  	//
   184  	// The API/raft component can utilize ClusterVersion to determine if
   185  	// it can accept a client request or a raft RPC.
   186  	// NOTE: ClusterVersion might be nil when etcd 2.1 works with etcd 2.0 and
   187  	// the leader is etcd 2.0. etcd 2.0 leader will not update clusterVersion since
   188  	// this feature was introduced after 2.0.
   189  	ClusterVersion() *semver.Version
   190  	Cluster() api.Cluster
   191  	Alarms() []*pb.AlarmMember
   192  
   193  	// LeaderChangedNotify returns a channel for application-level code to be notified
   194  	// when the etcd leader changes; this function is intended to be used only in
   195  	// applications which embed etcd.
   196  	// Caution:
   197  	// 1. the returned channel is closed when the leadership changes.
   198  	// 2. so a new channel needs to be obtained for each raft term.
   199  	// 3. users can lose some consecutive channel changes when using this API.
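        	//
        	// A minimal usage sketch (hypothetical application code, not part of this API),
        	// re-obtaining the channel after every notification:
        	//
        	//	for {
        	//		ch := srv.LeaderChangedNotify()
        	//		<-ch                // closed when the leadership changes
        	//		onLeaderChange(srv) // hypothetical application callback
        	//	}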
   200  	LeaderChangedNotify() <-chan struct{}
   201  }
   202  
   203  // EtcdServer is the production implementation of the Server interface
   204  type EtcdServer struct {
   205  	// inflightSnapshots holds the count of snapshots currently in flight.
   206  	inflightSnapshots int64  // must use atomic operations to access; keep 64-bit aligned.
   207  	appliedIndex      uint64 // must use atomic operations to access; keep 64-bit aligned.
   208  	committedIndex    uint64 // must use atomic operations to access; keep 64-bit aligned.
   209  	term              uint64 // must use atomic operations to access; keep 64-bit aligned.
   210  	lead              uint64 // must use atomic operations to access; keep 64-bit aligned.
   211  
   212  	consistIndex cindex.ConsistentIndexer // consistIndex is used to get/set/save consistentIndex
   213  	r            raftNode                 // uses 64-bit atomics; keep 64-bit aligned.
   214  
   215  	readych chan struct{}
   216  	Cfg     config.ServerConfig
   217  
   218  	lgMu *sync.RWMutex
   219  	lg   *zap.Logger
   220  
   221  	w wait.Wait
   222  
   223  	readMu sync.RWMutex
   224  	// the read routine notifies the etcd server that it is waiting for a read by
   225  	// sending an empty struct to readwaitc
   226  	readwaitc chan struct{}
   227  	// readNotifier is used to notify the read routine that it can process the request
   228  	// when there is no error
   229  	readNotifier *notifier
   230  
   231  	// stop signals the run goroutine should shutdown.
   232  	stop chan struct{}
   233  	// stopping is closed by run goroutine on shutdown.
   234  	stopping chan struct{}
   235  	// done is closed when all goroutines from start() complete.
   236  	done chan struct{}
   237  	// leaderChanged is used to notify the linearizable read loop to drop the old read requests.
   238  	leaderChanged   chan struct{}
   239  	leaderChangedMu sync.RWMutex
   240  
   241  	errorc     chan error
   242  	id         types.ID
   243  	attributes membership.Attributes
   244  
   245  	cluster *membership.RaftCluster
   246  
   247  	v2store     v2store.Store
   248  	snapshotter *snap.Snapshotter
   249  
   250  	applyV2 ApplierV2
   251  
   252  	// applyV3 is the applier with auth and quotas
   253  	applyV3 applierV3
   254  	// applyV3Base is the core applier without auth or quotas
   255  	applyV3Base applierV3
   256  	// applyV3Internal is the applier for internal requests
   257  	applyV3Internal applierV3Internal
   258  	applyWait       wait.WaitTime
   259  
   260  	kv         mvcc.WatchableKV
   261  	lessor     lease.Lessor
   262  	bemu       sync.Mutex
   263  	be         backend.Backend
   264  	beHooks    *backendHooks
   265  	authStore  auth.AuthStore
   266  	alarmStore *v3alarm.AlarmStore
   267  
   268  	stats  *stats.ServerStats
   269  	lstats *stats.LeaderStats
   270  
   271  	SyncTicker *time.Ticker
   272  	// compactor is used to auto-compact the KV.
   273  	compactor v3compactor.Compactor
   274  
   275  	// peerRt used to send requests (version, lease) to peers.
   276  	peerRt   http.RoundTripper
   277  	reqIDGen *idutil.Generator
   278  
   279  	// wgMu blocks concurrent waitgroup mutation while the server is stopping
   280  	wgMu sync.RWMutex
   281  	// wg is used to wait for the goroutines that depend on the server state
   282  	// to exit when stopping the server.
   283  	wg sync.WaitGroup
   284  
   285  	// ctx is used for etcd-initiated requests that may need to be canceled
   286  	// on etcd server shutdown.
   287  	ctx    context.Context
   288  	cancel context.CancelFunc
   289  
   290  	leadTimeMu      sync.RWMutex
   291  	leadElectedTime time.Time
   292  
   293  	firstCommitInTermMu sync.RWMutex
   294  	firstCommitInTermC  chan struct{}
   295  
   296  	*AccessController
   297  	corruptionChecker CorruptionChecker
   298  }
   299  
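        // backendHooks are the hooks installed on the storage backend: on every
        // pre-commit they persist the consistent index and, when it has changed,
        // the latest raft ConfState into the backend transaction.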
   300  type backendHooks struct {
   301  	indexer cindex.ConsistentIndexer
   302  	lg      *zap.Logger
   303  
   304  	// confState to be written in the next submitted backend transaction (if dirty)
   305  	confState raftpb.ConfState
   306  	// the first write changes it to 'dirty'; false by default, so an
   307  	// uninitialized `confState` is meaningless.
   308  	confStateDirty bool
   309  	confStateLock  sync.Mutex
   310  }
   311  
   312  func (bh *backendHooks) OnPreCommitUnsafe(tx backend.BatchTx) {
   313  	bh.indexer.UnsafeSave(tx)
   314  	bh.confStateLock.Lock()
   315  	defer bh.confStateLock.Unlock()
   316  	if bh.confStateDirty {
   317  		membership.MustUnsafeSaveConfStateToBackend(bh.lg, tx, &bh.confState)
   318  		// save bh.confState
   319  		bh.confStateDirty = false
   320  	}
   321  }
   322  
   323  func (bh *backendHooks) SetConfState(confState *raftpb.ConfState) {
   324  	bh.confStateLock.Lock()
   325  	defer bh.confStateLock.Unlock()
   326  	bh.confState = *confState
   327  	bh.confStateDirty = true
   328  }
   329  
   330  // NewServer creates a new EtcdServer from the supplied configuration. The
   331  // configuration is considered static for the lifetime of the EtcdServer.
   332  func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
   333  	st := v2store.New(StoreClusterPrefix, StoreKeysPrefix)
   334  
   335  	var (
   336  		w  *wal.WAL
   337  		n  raft.Node
   338  		s  *raft.MemoryStorage
   339  		id types.ID
   340  		cl *membership.RaftCluster
   341  	)
   342  
   343  	if cfg.MaxRequestBytes > recommendedMaxRequestBytes {
   344  		cfg.Logger.Warn(
   345  			"exceeded recommended request limit",
   346  			zap.Uint("max-request-bytes", cfg.MaxRequestBytes),
   347  			zap.String("max-request-size", humanize.Bytes(uint64(cfg.MaxRequestBytes))),
   348  			zap.Int("recommended-request-bytes", recommendedMaxRequestBytes),
   349  			zap.String("recommended-request-size", recommendedMaxRequestBytesString),
   350  		)
   351  	}
   352  
   353  	if terr := fileutil.TouchDirAll(cfg.Logger, cfg.DataDir); terr != nil {
   354  		return nil, fmt.Errorf("cannot access data directory: %v", terr)
   355  	}
   356  
   357  	haveWAL := wal.Exist(cfg.WALDir())
   358  
   359  	if err = fileutil.TouchDirAll(cfg.Logger, cfg.SnapDir()); err != nil {
   360  		cfg.Logger.Fatal(
   361  			"failed to create snapshot directory",
   362  			zap.String("path", cfg.SnapDir()),
   363  			zap.Error(err),
   364  		)
   365  	}
   366  
   367  	if err = fileutil.RemoveMatchFile(cfg.Logger, cfg.SnapDir(), func(fileName string) bool {
   368  		return strings.HasPrefix(fileName, "tmp")
   369  	}); err != nil {
   370  		cfg.Logger.Error(
   371  			"failed to remove temp file(s) in snapshot directory",
   372  			zap.String("path", cfg.SnapDir()),
   373  			zap.Error(err),
   374  		)
   375  	}
   376  
   377  	ss := snap.New(cfg.Logger, cfg.SnapDir())
   378  
   379  	bepath := cfg.BackendPath()
   380  	beExist := fileutil.Exist(bepath)
   381  
   382  	ci := cindex.NewConsistentIndex(nil)
   383  	beHooks := &backendHooks{lg: cfg.Logger, indexer: ci}
   384  	be := openBackend(cfg, beHooks)
   385  	ci.SetBackend(be)
   386  	cindex.CreateMetaBucket(be.BatchTx())
   387  
   388  	if cfg.ExperimentalBootstrapDefragThresholdMegabytes != 0 {
   389  		err := maybeDefragBackend(cfg, be)
   390  		if err != nil {
   391  			return nil, err
   392  		}
   393  	}
   394  
   395  	defer func() {
   396  		if be != nil && err != nil {
   397  			be.Close()
   398  		}
   399  	}()
   400  
   401  	prt, err := rafthttp.NewRoundTripper(cfg.PeerTLSInfo, cfg.PeerDialTimeout())
   402  	if err != nil {
   403  		return nil, err
   404  	}
   405  	var (
   406  		remotes  []*membership.Member
   407  		snapshot *raftpb.Snapshot
   408  	)
   409  
   410  	switch {
   411  	case !haveWAL && !cfg.NewCluster:
   412  		if err = cfg.VerifyJoinExisting(); err != nil {
   413  			return nil, err
   414  		}
   415  		cl, err = membership.NewClusterFromURLsMap(cfg.Logger, cfg.InitialClusterToken, cfg.InitialPeerURLsMap)
   416  		if err != nil {
   417  			return nil, err
   418  		}
   419  		existingCluster, gerr := GetClusterFromRemotePeers(cfg.Logger, getRemotePeerURLs(cl, cfg.Name), prt)
   420  		if gerr != nil {
   421  			return nil, fmt.Errorf("cannot fetch cluster info from peer urls: %v", gerr)
   422  		}
   423  		if err = membership.ValidateClusterAndAssignIDs(cfg.Logger, cl, existingCluster); err != nil {
   424  			return nil, fmt.Errorf("error validating peerURLs %s: %v", existingCluster, err)
   425  		}
   426  		if !isCompatibleWithCluster(cfg.Logger, cl, cl.MemberByName(cfg.Name).ID, prt) {
   427  			return nil, fmt.Errorf("incompatible with current running cluster")
   428  		}
   429  
   430  		remotes = existingCluster.Members()
   431  		cl.SetID(types.ID(0), existingCluster.ID())
   432  		cl.SetStore(st)
   433  		cl.SetBackend(be)
   434  		id, n, s, w = startNode(cfg, cl, nil)
   435  		cl.SetID(id, existingCluster.ID())
   436  
   437  	case !haveWAL && cfg.NewCluster:
   438  		if err = cfg.VerifyBootstrap(); err != nil {
   439  			return nil, err
   440  		}
   441  		cl, err = membership.NewClusterFromURLsMap(cfg.Logger, cfg.InitialClusterToken, cfg.InitialPeerURLsMap)
   442  		if err != nil {
   443  			return nil, err
   444  		}
   445  		m := cl.MemberByName(cfg.Name)
   446  		if isMemberBootstrapped(cfg.Logger, cl, cfg.Name, prt, cfg.BootstrapTimeoutEffective()) {
   447  			return nil, fmt.Errorf("member %s has already been bootstrapped", m.ID)
   448  		}
   449  		if cfg.ShouldDiscover() {
   450  			var str string
   451  			str, err = v2discovery.JoinCluster(cfg.Logger, cfg.DiscoveryURL, cfg.DiscoveryProxy, m.ID, cfg.InitialPeerURLsMap.String())
   452  			if err != nil {
   453  				return nil, &DiscoveryError{Op: "join", Err: err}
   454  			}
   455  			var urlsmap types.URLsMap
   456  			urlsmap, err = types.NewURLsMap(str)
   457  			if err != nil {
   458  				return nil, err
   459  			}
   460  			if config.CheckDuplicateURL(urlsmap) {
   461  				return nil, fmt.Errorf("discovery cluster %s has duplicate url", urlsmap)
   462  			}
   463  			if cl, err = membership.NewClusterFromURLsMap(cfg.Logger, cfg.InitialClusterToken, urlsmap); err != nil {
   464  				return nil, err
   465  			}
   466  		}
   467  		cl.SetStore(st)
   468  		cl.SetBackend(be)
   469  		id, n, s, w = startNode(cfg, cl, cl.MemberIDs())
   470  		cl.SetID(id, cl.ID())
   471  
   472  	case haveWAL:
   473  		if err = fileutil.IsDirWriteable(cfg.MemberDir()); err != nil {
   474  			return nil, fmt.Errorf("cannot write to member directory: %v", err)
   475  		}
   476  
   477  		if err = fileutil.IsDirWriteable(cfg.WALDir()); err != nil {
   478  			return nil, fmt.Errorf("cannot write to WAL directory: %v", err)
   479  		}
   480  
   481  		if cfg.ShouldDiscover() {
   482  			cfg.Logger.Warn(
   483  				"discovery token is ignored since cluster already initialized; valid logs are found",
   484  				zap.String("wal-dir", cfg.WALDir()),
   485  			)
   486  		}
   487  
   488  		// Find a snapshot to start/restart a raft node
   489  		var walSnaps []walpb.Snapshot
   490  		walSnaps, err = wal.ValidSnapshotEntries(cfg.Logger, cfg.WALDir())
   491  		if err != nil {
   492  			return nil, err
   493  		}
   494  		// snapshot files can be orphaned if etcd crashes after writing them but before writing the corresponding
   495  		// wal log entries
   496  		snapshot, err = ss.LoadNewestAvailable(walSnaps)
   497  		if err != nil && err != snap.ErrNoSnapshot {
   498  			return nil, err
   499  		}
   500  
   501  		if snapshot != nil {
   502  			if err = st.Recovery(snapshot.Data); err != nil {
   503  				cfg.Logger.Panic("failed to recover from snapshot", zap.Error(err))
   504  			}
   505  
   506  			if err = assertNoV2StoreContent(cfg.Logger, st, cfg.V2Deprecation); err != nil {
   507  				cfg.Logger.Error("illegal v2store content", zap.Error(err))
   508  				return nil, err
   509  			}
   510  
   511  			cfg.Logger.Info(
   512  				"recovered v2 store from snapshot",
   513  				zap.Uint64("snapshot-index", snapshot.Metadata.Index),
   514  				zap.String("snapshot-size", humanize.Bytes(uint64(snapshot.Size()))),
   515  			)
   516  
   517  			if be, err = recoverSnapshotBackend(cfg, be, *snapshot, beExist, beHooks); err != nil {
   518  				cfg.Logger.Panic("failed to recover v3 backend from snapshot", zap.Error(err))
   519  			}
   520  			// A snapshot db may have already been recovered, and the old db should have
   521  			// already been closed in this case, so we should set the backend again.
   522  			ci.SetBackend(be)
   523  			s1, s2 := be.Size(), be.SizeInUse()
   524  			cfg.Logger.Info(
   525  				"recovered v3 backend from snapshot",
   526  				zap.Int64("backend-size-bytes", s1),
   527  				zap.String("backend-size", humanize.Bytes(uint64(s1))),
   528  				zap.Int64("backend-size-in-use-bytes", s2),
   529  				zap.String("backend-size-in-use", humanize.Bytes(uint64(s2))),
   530  			)
   531  		} else {
   532  			cfg.Logger.Info("No snapshot found. Recovering WAL from scratch!")
   533  		}
   534  
   535  		if !cfg.ForceNewCluster {
   536  			id, cl, n, s, w = restartNode(cfg, snapshot)
   537  		} else {
   538  			id, cl, n, s, w = restartAsStandaloneNode(cfg, snapshot)
   539  		}
   540  
   541  		cl.SetStore(st)
   542  		cl.SetBackend(be)
   543  		cl.Recover(api.UpdateCapability)
   544  		if cl.Version() != nil && !cl.Version().LessThan(semver.Version{Major: 3}) && !beExist {
   545  			os.RemoveAll(bepath)
   546  			return nil, fmt.Errorf("database file (%v) of the backend is missing", bepath)
   547  		}
   548  
   549  	default:
   550  		return nil, fmt.Errorf("unsupported bootstrap config")
   551  	}
   552  
   553  	if terr := fileutil.TouchDirAll(cfg.Logger, cfg.MemberDir()); terr != nil {
   554  		return nil, fmt.Errorf("cannot access member directory: %v", terr)
   555  	}
   556  
   557  	sstats := stats.NewServerStats(cfg.Name, id.String())
   558  	lstats := stats.NewLeaderStats(cfg.Logger, id.String())
   559  
   560  	heartbeat := time.Duration(cfg.TickMs) * time.Millisecond
   561  	srv = &EtcdServer{
   562  		readych:     make(chan struct{}),
   563  		Cfg:         cfg,
   564  		lgMu:        new(sync.RWMutex),
   565  		lg:          cfg.Logger,
   566  		errorc:      make(chan error, 1),
   567  		v2store:     st,
   568  		snapshotter: ss,
   569  		r: *newRaftNode(
   570  			raftNodeConfig{
   571  				lg:          cfg.Logger,
   572  				isIDRemoved: func(id uint64) bool { return cl.IsIDRemoved(types.ID(id)) },
   573  				Node:        n,
   574  				heartbeat:   heartbeat,
   575  				raftStorage: s,
   576  				storage:     NewStorage(w, ss),
   577  			},
   578  		),
   579  		id:                 id,
   580  		attributes:         membership.Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
   581  		cluster:            cl,
   582  		stats:              sstats,
   583  		lstats:             lstats,
   584  		SyncTicker:         time.NewTicker(500 * time.Millisecond),
   585  		peerRt:             prt,
   586  		reqIDGen:           idutil.NewGenerator(uint16(id), time.Now()),
   587  		AccessController:   &AccessController{CORS: cfg.CORS, HostWhitelist: cfg.HostWhitelist},
   588  		consistIndex:       ci,
   589  		firstCommitInTermC: make(chan struct{}),
   590  	}
   591  	serverID.With(prometheus.Labels{"server_id": id.String()}).Set(1)
   592  
   593  	srv.applyV2 = NewApplierV2(cfg.Logger, srv.v2store, srv.cluster)
   594  
   595  	srv.be = be
   596  	srv.beHooks = beHooks
   597  	minTTL := time.Duration((3*cfg.ElectionTicks)/2) * heartbeat
   598  
   599  	// always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases.
   600  	// If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers.
   601  	srv.lessor = lease.NewLessor(srv.Logger(), srv.be, srv.cluster, lease.LessorConfig{
   602  		MinLeaseTTL:                int64(math.Ceil(minTTL.Seconds())),
   603  		CheckpointInterval:         cfg.LeaseCheckpointInterval,
   604  		CheckpointPersist:          cfg.LeaseCheckpointPersist,
   605  		ExpiredLeasesRetryInterval: srv.Cfg.ReqTimeout(),
   606  	})
   607  
   608  	tp, err := auth.NewTokenProvider(cfg.Logger, cfg.AuthToken,
   609  		func(index uint64) <-chan struct{} {
   610  			return srv.applyWait.Wait(index)
   611  		},
   612  		time.Duration(cfg.TokenTTL)*time.Second,
   613  	)
   614  	if err != nil {
   615  		cfg.Logger.Warn("failed to create token provider", zap.Error(err))
   616  		return nil, err
   617  	}
   618  	srv.kv = mvcc.New(srv.Logger(), srv.be, srv.lessor, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit})
   619  
   620  	kvindex := ci.ConsistentIndex()
   621  	srv.lg.Debug("restore consistentIndex", zap.Uint64("index", kvindex))
   622  	if beExist {
   623  		// TODO: remove kvindex != 0 checking when we do not expect users to upgrade
   624  		// etcd from pre-3.0 release.
   625  		if snapshot != nil && kvindex < snapshot.Metadata.Index {
   626  			if kvindex != 0 {
   627  				return nil, fmt.Errorf("database file (%v index %d) does not match with snapshot (index %d)", bepath, kvindex, snapshot.Metadata.Index)
   628  			}
   629  			cfg.Logger.Warn(
   630  				"consistent index was never saved",
   631  				zap.Uint64("snapshot-index", snapshot.Metadata.Index),
   632  			)
   633  		}
   634  	}
   635  	srv.corruptionChecker = newCorruptionChecker(cfg.Logger, srv, srv.kv.HashStorage())
   636  
   637  	srv.authStore = auth.NewAuthStore(srv.Logger(), srv.be, tp, int(cfg.BcryptCost))
   638  
   639  	newSrv := srv // since srv == nil in defer if srv is returned as nil
   640  	defer func() {
   641  		// closing backend without first closing kv can cause
   642  		// resumed compactions to fail with closed tx errors
   643  		if err != nil {
   644  			newSrv.kv.Close()
   645  		}
   646  	}()
   647  	if num := cfg.AutoCompactionRetention; num != 0 {
   648  		srv.compactor, err = v3compactor.New(cfg.Logger, cfg.AutoCompactionMode, num, srv.kv, srv)
   649  		if err != nil {
   650  			return nil, err
   651  		}
   652  		srv.compactor.Run()
   653  	}
   654  
   655  	srv.applyV3Base = srv.newApplierV3Backend()
   656  	srv.applyV3Internal = srv.newApplierV3Internal()
   657  	if err = srv.restoreAlarms(); err != nil {
   658  		return nil, err
   659  	}
   660  
   661  	if srv.Cfg.EnableLeaseCheckpoint {
   662  		// setting checkpointer enables lease checkpoint feature.
   663  		srv.lessor.SetCheckpointer(func(ctx context.Context, cp *pb.LeaseCheckpointRequest) error {
   664  			if !srv.ensureLeadership() {
   665  				srv.lg.Warn("Ignore the checkpoint request because current member isn't a leader",
   666  					zap.Uint64("local-member-id", uint64(srv.ID())))
   667  				return lease.ErrNotPrimary
   668  			}
   669  
   670  			srv.raftRequestOnce(ctx, pb.InternalRaftRequest{LeaseCheckpoint: cp})
   671  			return nil
   672  		})
   673  	}
   674  
   675  	// Set the hook after EtcdServer finishes the initialization to avoid
   676  	// the hook being called during the initialization process.
   677  	srv.be.SetTxPostLockInsideApplyHook(srv.getTxPostLockInsideApplyHook())
   678  
   679  	// TODO: move transport initialization near the definition of remote
   680  	tr := &rafthttp.Transport{
   681  		Logger:      cfg.Logger,
   682  		TLSInfo:     cfg.PeerTLSInfo,
   683  		DialTimeout: cfg.PeerDialTimeout(),
   684  		ID:          id,
   685  		URLs:        cfg.PeerURLs,
   686  		ClusterID:   cl.ID(),
   687  		Raft:        srv,
   688  		Snapshotter: ss,
   689  		ServerStats: sstats,
   690  		LeaderStats: lstats,
   691  		ErrorC:      srv.errorc,
   692  	}
   693  	if err = tr.Start(); err != nil {
   694  		return nil, err
   695  	}
   696  	// add all remotes into transport
   697  	for _, m := range remotes {
   698  		if m.ID != id {
   699  			tr.AddRemote(m.ID, m.PeerURLs)
   700  		}
   701  	}
   702  	for _, m := range cl.Members() {
   703  		if m.ID != id {
   704  			tr.AddPeer(m.ID, m.PeerURLs)
   705  		}
   706  	}
   707  	srv.r.transport = tr
   708  
   709  	return srv, nil
   710  }
   711  
   712  // assertNoV2StoreContent, depending on the deprecation stage, warns or reports an error
   713  // if the v2store contains custom content.
   714  func assertNoV2StoreContent(lg *zap.Logger, st v2store.Store, deprecationStage config.V2DeprecationEnum) error {
   715  	metaOnly, err := membership.IsMetaStoreOnly(st)
   716  	if err != nil {
   717  		return err
   718  	}
   719  	if metaOnly {
   720  		return nil
   721  	}
   722  	if deprecationStage.IsAtLeast(config.V2_DEPR_1_WRITE_ONLY) {
   723  		return fmt.Errorf("detected disallowed custom content in v2store for stage --v2-deprecation=%s", deprecationStage)
   724  	}
   725  	lg.Warn("detected custom v2store content. Etcd v3.5 is the last version allowing to access it using API v2. Please remove the content.")
   726  	return nil
   727  }
   728  
   729  func (s *EtcdServer) Logger() *zap.Logger {
   730  	s.lgMu.RLock()
   731  	l := s.lg
   732  	s.lgMu.RUnlock()
   733  	return l
   734  }
   735  
   736  func (s *EtcdServer) Config() config.ServerConfig {
   737  	return s.Cfg
   738  }
   739  
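        // tickToDur renders a number of raft ticks as a human-readable duration,
        // given the tick interval in milliseconds.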
   740  func tickToDur(ticks int, tickMs uint) string {
   741  	return fmt.Sprintf("%v", time.Duration(ticks)*time.Duration(tickMs)*time.Millisecond)
   742  }
   743  
   744  func (s *EtcdServer) adjustTicks() {
   745  	lg := s.Logger()
   746  	clusterN := len(s.cluster.Members())
   747  
   748  	// single-node fresh start, or single-node recovers from snapshot
   749  	if clusterN == 1 {
   750  		ticks := s.Cfg.ElectionTicks - 1
   751  		lg.Info(
   752  			"started as single-node; fast-forwarding election ticks",
   753  			zap.String("local-member-id", s.ID().String()),
   754  			zap.Int("forward-ticks", ticks),
   755  			zap.String("forward-duration", tickToDur(ticks, s.Cfg.TickMs)),
   756  			zap.Int("election-ticks", s.Cfg.ElectionTicks),
   757  			zap.String("election-timeout", tickToDur(s.Cfg.ElectionTicks, s.Cfg.TickMs)),
   758  		)
   759  		s.r.advanceTicks(ticks)
   760  		return
   761  	}
   762  
   763  	if !s.Cfg.InitialElectionTickAdvance {
   764  		lg.Info("skipping initial election tick advance", zap.Int("election-ticks", s.Cfg.ElectionTicks))
   765  		return
   766  	}
   767  	lg.Info("starting initial election tick advance", zap.Int("election-ticks", s.Cfg.ElectionTicks))
   768  
   769  	// retry up to "rafthttp.ConnReadTimeout" (5 seconds) until a peer
   770  	// connection reports activity; otherwise, if
   771  	// 1. all connections failed, or
   772  	// 2. there are no active peers, or
   773  	// 3. this is a restarted single node with no snapshot,
   774  	// then do nothing, because advancing ticks would have no effect
   775  	waitTime := rafthttp.ConnReadTimeout
   776  	itv := 50 * time.Millisecond
   777  	for i := int64(0); i < int64(waitTime/itv); i++ {
   778  		select {
   779  		case <-time.After(itv):
   780  		case <-s.stopping:
   781  			return
   782  		}
   783  
   784  		peerN := s.r.transport.ActivePeers()
   785  		if peerN > 1 {
   786  			// multi-node case: peer connection reports have been received;
   787  			// adjust ticks in case leader messages are received slowly
   788  			ticks := s.Cfg.ElectionTicks - 2
   789  
   790  			lg.Info(
   791  				"initialized peer connections; fast-forwarding election ticks",
   792  				zap.String("local-member-id", s.ID().String()),
   793  				zap.Int("forward-ticks", ticks),
   794  				zap.String("forward-duration", tickToDur(ticks, s.Cfg.TickMs)),
   795  				zap.Int("election-ticks", s.Cfg.ElectionTicks),
   796  				zap.String("election-timeout", tickToDur(s.Cfg.ElectionTicks, s.Cfg.TickMs)),
   797  				zap.Int("active-remote-members", peerN),
   798  			)
   799  
   800  			s.r.advanceTicks(ticks)
   801  			return
   802  		}
   803  	}
   804  }
   805  
   806  // Start performs any initialization of the Server necessary for it to
   807  // begin serving requests. It must be called before Do or Process.
   808  // Start must be non-blocking; any long-running server functionality
   809  // should be implemented in goroutines.
   810  func (s *EtcdServer) Start() {
   811  	s.start()
   812  	s.GoAttach(func() { s.adjustTicks() })
   813  	// TODO: Switch to publishV3 in 3.6.
   814  	// Support for cluster_member_set_attr was added in 3.5.
   815  	s.GoAttach(func() { s.publish(s.Cfg.ReqTimeout()) })
   816  	s.GoAttach(s.purgeFile)
   817  	s.GoAttach(func() { monitorFileDescriptor(s.Logger(), s.stopping) })
   818  	s.GoAttach(s.monitorVersions)
   819  	s.GoAttach(s.linearizableReadLoop)
   820  	s.GoAttach(s.monitorKVHash)
   821  	s.GoAttach(s.monitorCompactHash)
   822  	s.GoAttach(s.monitorDowngrade)
   823  }
   824  
   825  // start prepares and starts the server in a new goroutine. It is no longer safe to
   826  // modify a server's fields after it has been sent to Start.
   827  // This function is just used for testing.
   828  func (s *EtcdServer) start() {
   829  	lg := s.Logger()
   830  
   831  	if s.Cfg.SnapshotCount == 0 {
   832  		lg.Info(
   833  			"updating snapshot-count to default",
   834  			zap.Uint64("given-snapshot-count", s.Cfg.SnapshotCount),
   835  			zap.Uint64("updated-snapshot-count", DefaultSnapshotCount),
   836  		)
   837  		s.Cfg.SnapshotCount = DefaultSnapshotCount
   838  	}
   839  	if s.Cfg.SnapshotCatchUpEntries == 0 {
   840  		lg.Info(
   841  			"updating snapshot catch-up entries to default",
   842  			zap.Uint64("given-snapshot-catchup-entries", s.Cfg.SnapshotCatchUpEntries),
   843  			zap.Uint64("updated-snapshot-catchup-entries", DefaultSnapshotCatchUpEntries),
   844  		)
   845  		s.Cfg.SnapshotCatchUpEntries = DefaultSnapshotCatchUpEntries
   846  	}
   847  
   848  	s.w = wait.New()
   849  	s.applyWait = wait.NewTimeList()
   850  	s.done = make(chan struct{})
   851  	s.stop = make(chan struct{})
   852  	s.stopping = make(chan struct{}, 1)
   853  	s.ctx, s.cancel = context.WithCancel(context.Background())
   854  	s.readwaitc = make(chan struct{}, 1)
   855  	s.readNotifier = newNotifier()
   856  	s.leaderChanged = make(chan struct{})
   857  	if s.ClusterVersion() != nil {
   858  		lg.Info(
   859  			"starting etcd server",
   860  			zap.String("local-member-id", s.ID().String()),
   861  			zap.String("local-server-version", version.Version),
   862  			zap.String("cluster-id", s.Cluster().ID().String()),
   863  			zap.String("cluster-version", version.Cluster(s.ClusterVersion().String())),
   864  		)
   865  		membership.ClusterVersionMetrics.With(prometheus.Labels{"cluster_version": version.Cluster(s.ClusterVersion().String())}).Set(1)
   866  	} else {
   867  		lg.Info(
   868  			"starting etcd server",
   869  			zap.String("local-member-id", s.ID().String()),
   870  			zap.String("local-server-version", version.Version),
   871  			zap.String("cluster-version", "to_be_decided"),
   872  		)
   873  	}
   874  
   875  	// TODO: if this is an empty log, write all peer infos
   876  	// into the first entry
   877  	go s.run()
   878  }
   879  
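        // purgeFile runs for the lifetime of the server and purges old snapshot and WAL
        // files according to MaxSnapFiles/MaxWALFiles, returning when the server is stopping.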
   880  func (s *EtcdServer) purgeFile() {
   881  	lg := s.Logger()
   882  	var dberrc, serrc, werrc <-chan error
   883  	var dbdonec, sdonec, wdonec <-chan struct{}
   884  	if s.Cfg.MaxSnapFiles > 0 {
   885  		dbdonec, dberrc = fileutil.PurgeFileWithoutFlock(lg, s.Cfg.SnapDir(), "snap.db", s.Cfg.MaxSnapFiles, purgeFileInterval, s.stopping)
   886  		sdonec, serrc = fileutil.PurgeFileWithoutFlock(lg, s.Cfg.SnapDir(), "snap", s.Cfg.MaxSnapFiles, purgeFileInterval, s.stopping)
   887  	}
   888  	if s.Cfg.MaxWALFiles > 0 {
   889  		wdonec, werrc = fileutil.PurgeFileWithDoneNotify(lg, s.Cfg.WALDir(), "wal", s.Cfg.MaxWALFiles, purgeFileInterval, s.stopping)
   890  	}
   891  
   892  	select {
   893  	case e := <-dberrc:
   894  		lg.Fatal("failed to purge snap db file", zap.Error(e))
   895  	case e := <-serrc:
   896  		lg.Fatal("failed to purge snap file", zap.Error(e))
   897  	case e := <-werrc:
   898  		lg.Fatal("failed to purge wal file", zap.Error(e))
   899  	case <-s.stopping:
   900  		if dbdonec != nil {
   901  			<-dbdonec
   902  		}
   903  		if sdonec != nil {
   904  			<-sdonec
   905  		}
   906  		if wdonec != nil {
   907  			<-wdonec
   908  		}
   909  		return
   910  	}
   911  }
   912  
   913  func (s *EtcdServer) Cluster() api.Cluster { return s.cluster }
   914  
   915  func (s *EtcdServer) ApplyWait() <-chan struct{} { return s.applyWait.Wait(s.getCommittedIndex()) }
   916  
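        // ServerPeer is the interface served on peer-facing endpoints: it extends
        // ServerV2 with the raft transport handler and the lease forwarding handler.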
   917  type ServerPeer interface {
   918  	ServerV2
   919  	RaftHandler() http.Handler
   920  	LeaseHandler() http.Handler
   921  }
   922  
   923  func (s *EtcdServer) LeaseHandler() http.Handler {
   924  	if s.lessor == nil {
   925  		return nil
   926  	}
   927  	return leasehttp.NewHandler(s.lessor, s.ApplyWait)
   928  }
   929  
   930  func (s *EtcdServer) RaftHandler() http.Handler { return s.r.transport.Handler() }
   931  
   932  type ServerPeerV2 interface {
   933  	ServerPeer
   934  	HashKVHandler() http.Handler
   935  	DowngradeEnabledHandler() http.Handler
   936  }
   937  
   938  func (s *EtcdServer) DowngradeInfo() *membership.DowngradeInfo { return s.cluster.DowngradeInfo() }
   939  
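        // downgradeEnabledHandler serves DowngradeEnabledPath for peers, reporting
        // whether cluster downgrade is enabled after a linearizable read.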
   940  type downgradeEnabledHandler struct {
   941  	lg      *zap.Logger
   942  	cluster api.Cluster
   943  	server  *EtcdServer
   944  }
   945  
   946  func (s *EtcdServer) DowngradeEnabledHandler() http.Handler {
   947  	return &downgradeEnabledHandler{
   948  		lg:      s.Logger(),
   949  		cluster: s.cluster,
   950  		server:  s,
   951  	}
   952  }
   953  
   954  func (h *downgradeEnabledHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
   955  	if r.Method != http.MethodGet {
   956  		w.Header().Set("Allow", http.MethodGet)
   957  		http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
   958  		return
   959  	}
   960  
   961  	w.Header().Set("X-Etcd-Cluster-ID", h.cluster.ID().String())
   962  
   963  	if r.URL.Path != DowngradeEnabledPath {
   964  		http.Error(w, "bad path", http.StatusBadRequest)
   965  		return
   966  	}
   967  
   968  	ctx, cancel := context.WithTimeout(context.Background(), h.server.Cfg.ReqTimeout())
   969  	defer cancel()
   970  
   971  	// serve with linearized downgrade info
   972  	if err := h.server.linearizableReadNotify(ctx); err != nil {
   973  		http.Error(w, fmt.Sprintf("failed linearized read: %v", err),
   974  			http.StatusInternalServerError)
   975  		return
   976  	}
   977  	enabled := h.server.DowngradeInfo().Enabled
   978  	w.Header().Set("Content-Type", "text/plain")
   979  	w.Write([]byte(strconv.FormatBool(enabled)))
   980  }
   981  
   982  // Process takes a raft message and applies it to the server's raft state
   983  // machine, respecting any timeout of the given context.
   984  func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error {
   985  	lg := s.Logger()
   986  	if s.cluster.IsIDRemoved(types.ID(m.From)) {
   987  		lg.Warn(
   988  			"rejected Raft message from removed member",
   989  			zap.String("local-member-id", s.ID().String()),
   990  			zap.String("removed-member-id", types.ID(m.From).String()),
   991  		)
   992  		return httptypes.NewHTTPError(http.StatusForbidden, "cannot process message from removed member")
   993  	}
   994  	if m.Type == raftpb.MsgApp {
   995  		s.stats.RecvAppendReq(types.ID(m.From).String(), m.Size())
   996  	}
   997  	return s.r.Step(ctx, m)
   998  }
   999  
  1000  func (s *EtcdServer) IsIDRemoved(id uint64) bool { return s.cluster.IsIDRemoved(types.ID(id)) }
  1001  
  1002  func (s *EtcdServer) ReportUnreachable(id uint64) { s.r.ReportUnreachable(id) }
  1003  
  1004  // ReportSnapshot reports snapshot sent status to the raft state machine,
  1005  // and clears the used snapshot from the snapshot store.
  1006  func (s *EtcdServer) ReportSnapshot(id uint64, status raft.SnapshotStatus) {
  1007  	s.r.ReportSnapshot(id, status)
  1008  }
  1009  
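        // etcdProgress tracks the apply progress of the run loop: the current raft
        // ConfState, the index of the last taken snapshot (snapi), and the term and
        // index of the last applied entry (appliedt, appliedi).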
  1010  type etcdProgress struct {
  1011  	confState raftpb.ConfState
  1012  	snapi     uint64
  1013  	appliedt  uint64
  1014  	appliedi  uint64
  1015  }
  1016  
  1017  // raftReadyHandler contains a set of EtcdServer operations to be called by raftNode,
  1018  // and helps decouple state machine logic from Raft algorithms.
  1019  // TODO: add a state machine interface to apply the commit entries and do snapshot/recover
  1020  type raftReadyHandler struct {
  1021  	getLead              func() (lead uint64)
  1022  	updateLead           func(lead uint64)
  1023  	updateLeadership     func(newLeader bool)
  1024  	updateCommittedIndex func(uint64)
  1025  }
  1026  
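        // run is the main loop of the server. It starts the raft node, schedules apply
        // batches in FIFO order, revokes expired leases, triggers v2 sync on the sync
        // ticker, and tears everything down when the server is stopped.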
  1027  func (s *EtcdServer) run() {
  1028  	lg := s.Logger()
  1029  
  1030  	sn, err := s.r.raftStorage.Snapshot()
  1031  	if err != nil {
  1032  		lg.Panic("failed to get snapshot from Raft storage", zap.Error(err))
  1033  	}
  1034  
  1035  	// asynchronously accept apply packets, dispatch progress in-order
  1036  	sched := schedule.NewFIFOScheduler()
  1037  
  1038  	var (
  1039  		smu   sync.RWMutex
  1040  		syncC <-chan time.Time
  1041  	)
  1042  	setSyncC := func(ch <-chan time.Time) {
  1043  		smu.Lock()
  1044  		syncC = ch
  1045  		smu.Unlock()
  1046  	}
  1047  	getSyncC := func() (ch <-chan time.Time) {
  1048  		smu.RLock()
  1049  		ch = syncC
  1050  		smu.RUnlock()
  1051  		return
  1052  	}
  1053  	rh := &raftReadyHandler{
  1054  		getLead:    func() (lead uint64) { return s.getLead() },
  1055  		updateLead: func(lead uint64) { s.setLead(lead) },
  1056  		updateLeadership: func(newLeader bool) {
  1057  			if !s.isLeader() {
  1058  				if s.lessor != nil {
  1059  					s.lessor.Demote()
  1060  				}
  1061  				if s.compactor != nil {
  1062  					s.compactor.Pause()
  1063  				}
  1064  				setSyncC(nil)
  1065  			} else {
  1066  				if newLeader {
  1067  					t := time.Now()
  1068  					s.leadTimeMu.Lock()
  1069  					s.leadElectedTime = t
  1070  					s.leadTimeMu.Unlock()
  1071  				}
  1072  				setSyncC(s.SyncTicker.C)
  1073  				if s.compactor != nil {
  1074  					s.compactor.Resume()
  1075  				}
  1076  			}
  1077  			if newLeader {
  1078  				s.leaderChangedMu.Lock()
  1079  				lc := s.leaderChanged
  1080  				s.leaderChanged = make(chan struct{})
  1081  				close(lc)
  1082  				s.leaderChangedMu.Unlock()
  1083  			}
  1084  			// TODO: remove the nil checking
  1085  			// current test utility does not provide the stats
  1086  			if s.stats != nil {
  1087  				s.stats.BecomeLeader()
  1088  			}
  1089  		},
  1090  		updateCommittedIndex: func(ci uint64) {
  1091  			cci := s.getCommittedIndex()
  1092  			if ci > cci {
  1093  				s.setCommittedIndex(ci)
  1094  			}
  1095  		},
  1096  	}
  1097  	s.r.start(rh)
  1098  
  1099  	ep := etcdProgress{
  1100  		confState: sn.Metadata.ConfState,
  1101  		snapi:     sn.Metadata.Index,
  1102  		appliedt:  sn.Metadata.Term,
  1103  		appliedi:  sn.Metadata.Index,
  1104  	}
  1105  
  1106  	defer func() {
  1107  		s.wgMu.Lock() // block concurrent waitgroup adds in GoAttach while stopping
  1108  		close(s.stopping)
  1109  		s.wgMu.Unlock()
  1110  		s.cancel()
  1111  		sched.Stop()
  1112  
  1113  		// wait for goroutines before closing raft so the wal stays open
  1114  		s.wg.Wait()
  1115  
  1116  		s.SyncTicker.Stop()
  1117  
  1118  		// must stop raft after the scheduler; etcdserver can leak rafthttp pipelines
  1119  		// by adding a peer after raft stops the transport
  1120  		s.r.stop()
  1121  
  1122  		s.Cleanup()
  1123  
  1124  		close(s.done)
  1125  	}()
  1126  
  1127  	var expiredLeaseC <-chan []*lease.Lease
  1128  	if s.lessor != nil {
  1129  		expiredLeaseC = s.lessor.ExpiredLeasesC()
  1130  	}
  1131  
  1132  	for {
  1133  		select {
  1134  		case ap := <-s.r.apply():
  1135  			f := func(context.Context) { s.applyAll(&ep, &ap) }
  1136  			sched.Schedule(f)
  1137  		case leases := <-expiredLeaseC:
  1138  			s.revokeExpiredLeases(leases)
  1139  		case err := <-s.errorc:
  1140  			lg.Warn("server error", zap.Error(err))
  1141  			lg.Warn("data-dir used by this member must be removed")
  1142  			return
  1143  		case <-getSyncC():
  1144  			if s.v2store.HasTTLKeys() {
  1145  				s.sync(s.Cfg.ReqTimeout())
  1146  			}
  1147  		case <-s.stop:
  1148  			return
  1149  		}
  1150  	}
  1151  }
  1152  
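        // revokeExpiredLeases revokes the given expired leases in parallel (bounded by
        // maxPendingRevokes), but only when this member still believes it is the leader.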
  1153  func (s *EtcdServer) revokeExpiredLeases(leases []*lease.Lease) {
  1154  	s.GoAttach(func() {
  1155  		// We shouldn't revoke any leases if current member isn't a leader,
  1156  		// because the operation should only be performed by the leader. When
  1157  		// the leader gets blocked on the raft loop, such as writing WAL entries,
  1158  		// it can't process any events or messages from raft. It may think it
  1159  		// is still the leader even though the leadership has already changed.
  1160  		// Refer to https://github.com/etcd-io/etcd/issues/15247
  1161  		lg := s.Logger()
  1162  		if !s.ensureLeadership() {
  1163  			lg.Warn("Ignore the lease revoking request because current member isn't a leader",
  1164  				zap.Uint64("local-member-id", uint64(s.ID())))
  1165  			return
  1166  		}
  1167  
  1168  		// Increase the throughput of the expired-lease deletion process through parallelization
  1169  		c := make(chan struct{}, maxPendingRevokes)
  1170  		for _, curLease := range leases {
  1171  			select {
  1172  			case c <- struct{}{}:
  1173  			case <-s.stopping:
  1174  				return
  1175  			}
  1176  
  1177  			f := func(lid int64) {
  1178  				s.GoAttach(func() {
  1179  					ctx := s.authStore.WithRoot(s.ctx)
  1180  					_, lerr := s.LeaseRevoke(ctx, &pb.LeaseRevokeRequest{ID: lid})
  1181  					if lerr == nil {
  1182  						leaseExpired.Inc()
  1183  					} else {
  1184  						lg.Warn(
  1185  							"failed to revoke lease",
  1186  							zap.String("lease-id", fmt.Sprintf("%016x", lid)),
  1187  							zap.Error(lerr),
  1188  						)
  1189  					}
  1190  
  1191  					<-c
  1192  				})
  1193  			}
  1194  
  1195  			f(int64(curLease.ID))
  1196  		}
  1197  	})
  1198  }
  1199  
  1200  // ensureLeadership checks whether the current member is still the leader.
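        // It issues a linearizable read (which requires reaching quorum) and then checks
        // that the raft status still reports this member as the leader.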
  1201  func (s *EtcdServer) ensureLeadership() bool {
  1202  	lg := s.Logger()
  1203  
  1204  	ctx, cancel := context.WithTimeout(s.ctx, s.Cfg.ReqTimeout())
  1205  	defer cancel()
  1206  	if err := s.linearizableReadNotify(ctx); err != nil {
  1207  		lg.Warn("Failed to check current member's leadership",
  1208  			zap.Error(err))
  1209  		return false
  1210  	}
  1211  
  1212  	newLeaderId := s.raftStatus().Lead
  1213  	if newLeaderId != uint64(s.ID()) {
  1214  		lg.Warn("Current member isn't a leader",
  1215  			zap.Uint64("local-member-id", uint64(s.ID())),
  1216  			zap.Uint64("new-lead", newLeaderId))
  1217  		return false
  1218  	}
  1219  
  1220  	return true
  1221  }
  1222  
  1223  // Cleanup removes the objects allocated by EtcdServer.NewServer in the
  1224  // situation where EtcdServer.Start was not called (Start takes care of cleanup otherwise).
  1225  func (s *EtcdServer) Cleanup() {
  1226  	// kv, lessor and backend can be nil if running without v3 enabled
  1227  	// or running unit tests.
  1228  	if s.lessor != nil {
  1229  		s.lessor.Stop()
  1230  	}
  1231  	if s.kv != nil {
  1232  		s.kv.Close()
  1233  	}
  1234  	if s.authStore != nil {
  1235  		s.authStore.Close()
  1236  	}
  1237  	if s.be != nil {
  1238  		s.be.Close()
  1239  	}
  1240  	if s.compactor != nil {
  1241  		s.compactor.Stop()
  1242  	}
  1243  }
  1244  
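        // applyAll applies one batch received from the raft node: first the snapshot
        // (if any), then the committed entries, and finally triggers a local snapshot
        // once enough entries have been applied.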
  1245  func (s *EtcdServer) applyAll(ep *etcdProgress, apply *apply) {
  1246  	s.applySnapshot(ep, apply)
  1247  	s.applyEntries(ep, apply)
  1248  
  1249  	proposalsApplied.Set(float64(ep.appliedi))
  1250  	s.applyWait.Trigger(ep.appliedi)
  1251  
  1252  	// wait for the raft routine to finish the disk writes before triggering a
  1253  	// snapshot; otherwise the applied index might be greater than the last index
  1254  	// in raft storage, since the raft routine might be slower than the apply routine.
  1255  	<-apply.notifyc
  1256  
  1257  	s.triggerSnapshot(ep)
  1258  	select {
  1259  	// snapshot requested via send()
  1260  	case m := <-s.r.msgSnapC:
  1261  		merged := s.createMergedSnapshotMessage(m, ep.appliedt, ep.appliedi, ep.confState)
  1262  		s.sendMergedSnap(merged)
  1263  	default:
  1264  	}
  1265  }
  1266  
  1267  func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
  1268  	if raft.IsEmptySnap(apply.snapshot) {
  1269  		return
  1270  	}
  1271  	applySnapshotInProgress.Inc()
  1272  
  1273  	lg := s.Logger()
  1274  	lg.Info(
  1275  		"applying snapshot",
  1276  		zap.Uint64("current-snapshot-index", ep.snapi),
  1277  		zap.Uint64("current-applied-index", ep.appliedi),
  1278  		zap.Uint64("incoming-leader-snapshot-index", apply.snapshot.Metadata.Index),
  1279  		zap.Uint64("incoming-leader-snapshot-term", apply.snapshot.Metadata.Term),
  1280  	)
  1281  	defer func() {
  1282  		lg.Info(
  1283  			"applied snapshot",
  1284  			zap.Uint64("current-snapshot-index", ep.snapi),
  1285  			zap.Uint64("current-applied-index", ep.appliedi),
  1286  			zap.Uint64("incoming-leader-snapshot-index", apply.snapshot.Metadata.Index),
  1287  			zap.Uint64("incoming-leader-snapshot-term", apply.snapshot.Metadata.Term),
  1288  		)
  1289  		applySnapshotInProgress.Dec()
  1290  	}()
  1291  
  1292  	if apply.snapshot.Metadata.Index <= ep.appliedi {
  1293  		lg.Panic(
  1294  			"unexpected leader snapshot from outdated index",
  1295  			zap.Uint64("current-snapshot-index", ep.snapi),
  1296  			zap.Uint64("current-applied-index", ep.appliedi),
  1297  			zap.Uint64("incoming-leader-snapshot-index", apply.snapshot.Metadata.Index),
  1298  			zap.Uint64("incoming-leader-snapshot-term", apply.snapshot.Metadata.Term),
  1299  		)
  1300  	}
  1301  
  1302  	// wait for raftNode to persist snapshot onto the disk
  1303  	<-apply.notifyc
  1304  
  1305  	newbe, err := openSnapshotBackend(s.Cfg, s.snapshotter, apply.snapshot, s.beHooks)
  1306  	if err != nil {
  1307  		lg.Panic("failed to open snapshot backend", zap.Error(err))
  1308  	}
  1309  
  1310  	// We need to set the backend on consistIndex before recovering the lessor,
  1311  	// because lessor.Recover will commit the boltDB transaction; otherwise
  1312  	// OnPreCommitUnsafe would persist the old consistent_index into the db, and
  1313  	// the new consistent_index value coming from the snapshot would eventually be
  1314  	// overwritten by the old value.
  1315  	s.consistIndex.SetBackend(newbe)
  1316  
  1317  	// always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases.
  1318  	// If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers.
  1319  	if s.lessor != nil {
  1320  		lg.Info("restoring lease store")
  1321  
  1322  		s.lessor.Recover(newbe, func() lease.TxnDelete { return s.kv.Write(traceutil.TODO()) })
  1323  
  1324  		lg.Info("restored lease store")
  1325  	}
  1326  
  1327  	lg.Info("restoring mvcc store")
  1328  
  1329  	if err := s.kv.Restore(newbe); err != nil {
  1330  		lg.Panic("failed to restore mvcc store", zap.Error(err))
  1331  	}
  1332  
  1333  	newbe.SetTxPostLockInsideApplyHook(s.getTxPostLockInsideApplyHook())
  1334  	lg.Info("restored mvcc store", zap.Uint64("consistent-index", s.consistIndex.ConsistentIndex()))
  1335  
  1336  	// Closing old backend might block until all the txns
  1337  	// on the backend are finished.
  1338  	// We do not want to wait on closing the old backend.
  1339  	s.bemu.Lock()
  1340  	oldbe := s.be
  1341  	go func() {
  1342  		lg.Info("closing old backend file")
  1343  		defer func() {
  1344  			lg.Info("closed old backend file")
  1345  		}()
  1346  		if err := oldbe.Close(); err != nil {
  1347  			lg.Panic("failed to close old backend", zap.Error(err))
  1348  		}
  1349  	}()
  1350  
  1351  	s.be = newbe
  1352  	s.bemu.Unlock()
  1353  
  1354  	lg.Info("restoring alarm store")
  1355  
  1356  	if err := s.restoreAlarms(); err != nil {
  1357  		lg.Panic("failed to restore alarm store", zap.Error(err))
  1358  	}
  1359  
  1360  	lg.Info("restored alarm store")
  1361  
  1362  	if s.authStore != nil {
  1363  		lg.Info("restoring auth store")
  1364  
  1365  		s.authStore.Recover(newbe)
  1366  
  1367  		lg.Info("restored auth store")
  1368  	}
  1369  
  1370  	lg.Info("restoring v2 store")
  1371  	if err := s.v2store.Recovery(apply.snapshot.Data); err != nil {
  1372  		lg.Panic("failed to restore v2 store", zap.Error(err))
  1373  	}
  1374  
  1375  	if err := assertNoV2StoreContent(lg, s.v2store, s.Cfg.V2Deprecation); err != nil {
  1376  		lg.Panic("illegal v2store content", zap.Error(err))
  1377  	}
  1378  
  1379  	lg.Info("restored v2 store")
  1380  
  1381  	s.cluster.SetBackend(newbe)
  1382  
  1383  	lg.Info("restoring cluster configuration")
  1384  
  1385  	s.cluster.Recover(api.UpdateCapability)
  1386  
  1387  	lg.Info("restored cluster configuration")
  1388  	lg.Info("removing old peers from network")
  1389  
  1390  	// recover raft transport
  1391  	s.r.transport.RemoveAllPeers()
  1392  
  1393  	lg.Info("removed old peers from network")
  1394  	lg.Info("adding peers from new cluster configuration")
  1395  
  1396  	for _, m := range s.cluster.Members() {
  1397  		if m.ID == s.ID() {
  1398  			continue
  1399  		}
  1400  		s.r.transport.AddPeer(m.ID, m.PeerURLs)
  1401  	}
  1402  
  1403  	lg.Info("added peers from new cluster configuration")
  1404  
  1405  	ep.appliedt = apply.snapshot.Metadata.Term
  1406  	ep.appliedi = apply.snapshot.Metadata.Index
  1407  	ep.snapi = ep.appliedi
  1408  	ep.confState = apply.snapshot.Metadata.ConfState
  1409  }
  1410  
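        // applyEntries applies the committed entries that have not been applied yet
        // (index greater than ep.appliedi) and panics if a gap is detected.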
  1411  func (s *EtcdServer) applyEntries(ep *etcdProgress, apply *apply) {
  1412  	if len(apply.entries) == 0 {
  1413  		return
  1414  	}
  1415  	firsti := apply.entries[0].Index
  1416  	if firsti > ep.appliedi+1 {
  1417  		lg := s.Logger()
  1418  		lg.Panic(
  1419  			"unexpected committed entry index",
  1420  			zap.Uint64("current-applied-index", ep.appliedi),
  1421  			zap.Uint64("first-committed-entry-index", firsti),
  1422  		)
  1423  	}
  1424  	var ents []raftpb.Entry
  1425  	if ep.appliedi+1-firsti < uint64(len(apply.entries)) {
  1426  		ents = apply.entries[ep.appliedi+1-firsti:]
  1427  	}
  1428  	if len(ents) == 0 {
  1429  		return
  1430  	}
  1431  	var shouldstop bool
  1432  	if ep.appliedt, ep.appliedi, shouldstop = s.apply(ents, &ep.confState); shouldstop {
  1433  		go s.stopWithDelay(10*100*time.Millisecond, fmt.Errorf("the member has been permanently removed from the cluster"))
  1434  	}
  1435  }
  1436  
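        // triggerSnapshot takes a new snapshot once more than SnapshotCount entries
        // have been applied since the previous snapshot.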
  1437  func (s *EtcdServer) triggerSnapshot(ep *etcdProgress) {
  1438  	if ep.appliedi-ep.snapi <= s.Cfg.SnapshotCount {
  1439  		return
  1440  	}
  1441  
  1442  	lg := s.Logger()
  1443  	lg.Info(
  1444  		"triggering snapshot",
  1445  		zap.String("local-member-id", s.ID().String()),
  1446  		zap.Uint64("local-member-applied-index", ep.appliedi),
  1447  		zap.Uint64("local-member-snapshot-index", ep.snapi),
  1448  		zap.Uint64("local-member-snapshot-count", s.Cfg.SnapshotCount),
  1449  	)
  1450  
  1451  	s.snapshot(ep.appliedi, ep.confState)
  1452  	ep.snapi = ep.appliedi
  1453  }
  1454  
  1455  func (s *EtcdServer) hasMultipleVotingMembers() bool {
  1456  	return s.cluster != nil && len(s.cluster.VotingMemberIDs()) > 1
  1457  }
  1458  
  1459  func (s *EtcdServer) isLeader() bool {
  1460  	return uint64(s.ID()) == s.Lead()
  1461  }
  1462  
  1463  // MoveLeader transfers the leader to the given transferee.
  1464  func (s *EtcdServer) MoveLeader(ctx context.Context, lead, transferee uint64) error {
  1465  	if !s.cluster.IsMemberExist(types.ID(transferee)) || s.cluster.Member(types.ID(transferee)).IsLearner {
  1466  		return ErrBadLeaderTransferee
  1467  	}
  1468  
  1469  	now := time.Now()
  1470  	interval := time.Duration(s.Cfg.TickMs) * time.Millisecond
  1471  
  1472  	lg := s.Logger()
  1473  	lg.Info(
  1474  		"leadership transfer starting",
  1475  		zap.String("local-member-id", s.ID().String()),
  1476  		zap.String("current-leader-member-id", types.ID(lead).String()),
  1477  		zap.String("transferee-member-id", types.ID(transferee).String()),
  1478  	)
  1479  
  1480  	s.r.TransferLeadership(ctx, lead, transferee)
  1481  	for s.Lead() != transferee {
  1482  		select {
  1483  		case <-ctx.Done(): // time out
  1484  			return ErrTimeoutLeaderTransfer
  1485  		case <-time.After(interval):
  1486  		}
  1487  	}
  1488  
  1489  	// TODO: drain all requests, or drop all messages to the old leader
  1490  	lg.Info(
  1491  		"leadership transfer finished",
  1492  		zap.String("local-member-id", s.ID().String()),
  1493  		zap.String("old-leader-member-id", types.ID(lead).String()),
  1494  		zap.String("new-leader-member-id", types.ID(transferee).String()),
  1495  		zap.Duration("took", time.Since(now)),
  1496  	)
  1497  	return nil
  1498  }
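
// Illustrative sketch, not part of the original file: a caller-side use of MoveLeader, bounding
// the transfer with the server's request timeout. The helper name and the transferee argument
// are assumptions for the example.
func exampleMoveLeader(s *EtcdServer, transferee uint64) error {
	ctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())
	defer cancel()
	// MoveLeader blocks until the transferee becomes leader or the context expires,
	// in which case it returns ErrTimeoutLeaderTransfer.
	return s.MoveLeader(ctx, s.Lead(), transferee)
}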
  1499  
  1500  // TransferLeadership transfers the leader to the chosen transferee.
  1501  func (s *EtcdServer) TransferLeadership() error {
  1502  	lg := s.Logger()
  1503  	if !s.isLeader() {
  1504  		lg.Info(
  1505  			"skipped leadership transfer; local server is not leader",
  1506  			zap.String("local-member-id", s.ID().String()),
  1507  			zap.String("current-leader-member-id", types.ID(s.Lead()).String()),
  1508  		)
  1509  		return nil
  1510  	}
  1511  
  1512  	if !s.hasMultipleVotingMembers() {
  1513  		lg.Info(
  1514  			"skipped leadership transfer for single voting member cluster",
  1515  			zap.String("local-member-id", s.ID().String()),
  1516  			zap.String("current-leader-member-id", types.ID(s.Lead()).String()),
  1517  		)
  1518  		return nil
  1519  	}
  1520  
  1521  	transferee, ok := longestConnected(s.r.transport, s.cluster.VotingMemberIDs())
  1522  	if !ok {
  1523  		return ErrUnhealthy
  1524  	}
  1525  
  1526  	tm := s.Cfg.ReqTimeout()
  1527  	ctx, cancel := context.WithTimeout(s.ctx, tm)
  1528  	err := s.MoveLeader(ctx, s.Lead(), uint64(transferee))
  1529  	cancel()
  1530  	return err
  1531  }
  1532  
  1533  // HardStop stops the server without coordination with other members in the cluster.
  1534  func (s *EtcdServer) HardStop() {
  1535  	select {
  1536  	case s.stop <- struct{}{}:
  1537  	case <-s.done:
  1538  		return
  1539  	}
  1540  	<-s.done
  1541  }
  1542  
  1543  // Stop stops the server gracefully, and shuts down the running goroutine.
  1544  // Stop should be called after Start(s); otherwise, it will block forever.
  1545  // When stopping leader, Stop transfers its leadership to one of its peers
  1546  // before stopping the server.
  1547  // Stop terminates the Server and performs any necessary finalization.
  1548  // Do and Process cannot be called after Stop has been invoked.
  1549  func (s *EtcdServer) Stop() {
  1550  	lg := s.Logger()
  1551  	if err := s.TransferLeadership(); err != nil {
  1552  		lg.Warn("leadership transfer failed", zap.String("local-member-id", s.ID().String()), zap.Error(err))
  1553  	}
  1554  	s.HardStop()
  1555  }
  1556  
  1557  // ReadyNotify returns a channel that will be closed when the server
  1558  // is ready to serve client requests
  1559  func (s *EtcdServer) ReadyNotify() <-chan struct{} { return s.readych }
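
// Illustrative sketch, not part of the original file: waiting for the server to become ready
// before serving client traffic, with a bounded wait. The helper name and the one-minute
// timeout are assumptions for the example.
func exampleWaitReady(s *EtcdServer) error {
	select {
	case <-s.ReadyNotify():
		return nil // the local member has been published; safe to serve client requests
	case <-s.StopNotify():
		return ErrStopped // the server stopped before becoming ready
	case <-time.After(time.Minute):
		return ErrTimeout
	}
}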
  1560  
  1561  func (s *EtcdServer) stopWithDelay(d time.Duration, err error) {
  1562  	select {
  1563  	case <-time.After(d):
  1564  	case <-s.done:
  1565  	}
  1566  	select {
  1567  	case s.errorc <- err:
  1568  	default:
  1569  	}
  1570  }
  1571  
  1572  // StopNotify returns a channel that receives an empty struct
  1573  // when the server is stopped.
  1574  func (s *EtcdServer) StopNotify() <-chan struct{} { return s.done }
  1575  
  1576  // StoppingNotify returns a channel that receives an empty struct
  1577  // when the server is being stopped.
  1578  func (s *EtcdServer) StoppingNotify() <-chan struct{} { return s.stopping }
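
// A minimal sketch, not from the original source: observing the two shutdown notifications in
// order; stopping is signalled when shutdown begins and done once it completes. The helper
// name and the log message are assumptions for the example.
func exampleWatchShutdown(s *EtcdServer) {
	go func() {
		<-s.StoppingNotify() // shutdown has begun: stop accepting new work
		<-s.StopNotify()     // the server has fully stopped
		s.Logger().Info("etcd server shut down")
	}()
}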
  1579  
  1580  func (s *EtcdServer) SelfStats() []byte { return s.stats.JSON() }
  1581  
  1582  func (s *EtcdServer) LeaderStats() []byte {
  1583  	lead := s.getLead()
  1584  	if lead != uint64(s.id) {
  1585  		return nil
  1586  	}
  1587  	return s.lstats.JSON()
  1588  }
  1589  
  1590  func (s *EtcdServer) StoreStats() []byte { return s.v2store.JsonStats() }
  1591  
  1592  func (s *EtcdServer) checkMembershipOperationPermission(ctx context.Context) error {
  1593  	if s.authStore == nil {
  1594  		// In the context of an ordinary etcd process, s.authStore will never be nil.
  1595  		// This branch is for handling cases in server_test.go.
  1596  		return nil
  1597  	}
  1598  
  1599  	// Note that this permission check is done in the API layer,
  1600  	// so a TOCTOU problem could potentially arise with a schedule like this:
  1601  	// update membership with user A -> revoke root role of A -> apply membership change
  1602  	// in the state machine layer.
  1603  	// However, both membership changes and role management require the root privilege,
  1604  	// so careful operation by admins can prevent the problem.
  1605  	authInfo, err := s.AuthInfoFromCtx(ctx)
  1606  	if err != nil {
  1607  		return err
  1608  	}
  1609  
  1610  	return s.AuthStore().IsAdminPermitted(authInfo)
  1611  }
  1612  
  1613  func (s *EtcdServer) AddMember(ctx context.Context, memb membership.Member) ([]*membership.Member, error) {
  1614  	if err := s.checkMembershipOperationPermission(ctx); err != nil {
  1615  		return nil, err
  1616  	}
  1617  
  1618  	// TODO: move Member to protobuf type
  1619  	b, err := json.Marshal(memb)
  1620  	if err != nil {
  1621  		return nil, err
  1622  	}
  1623  
  1624  	// by default StrictReconfigCheck is enabled; reject new members if unhealthy.
  1625  	if err := s.mayAddMember(memb); err != nil {
  1626  		return nil, err
  1627  	}
  1628  
  1629  	cc := raftpb.ConfChange{
  1630  		Type:    raftpb.ConfChangeAddNode,
  1631  		NodeID:  uint64(memb.ID),
  1632  		Context: b,
  1633  	}
  1634  
  1635  	if memb.IsLearner {
  1636  		cc.Type = raftpb.ConfChangeAddLearnerNode
  1637  	}
  1638  
  1639  	return s.configure(ctx, cc)
  1640  }
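
// Illustrative sketch, not part of the original file: proposing a new learner member through
// AddMember. The helper name, member ID, and peer URL are assumptions for the example.
func exampleAddLearner(s *EtcdServer, id types.ID, peerURL string) ([]*membership.Member, error) {
	m := membership.Member{
		ID:             id,
		RaftAttributes: membership.RaftAttributes{PeerURLs: []string{peerURL}, IsLearner: true},
	}
	ctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())
	defer cancel()
	// AddMember marshals the member, runs the strict-reconfig health checks, and
	// proposes a ConfChangeAddLearnerNode through raft.
	return s.AddMember(ctx, m)
}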
  1641  
  1642  func (s *EtcdServer) mayAddMember(memb membership.Member) error {
  1643  	lg := s.Logger()
  1644  	if !s.Cfg.StrictReconfigCheck {
  1645  		return nil
  1646  	}
  1647  
  1648  	// protect quorum when adding voting member
  1649  	if !memb.IsLearner && !s.cluster.IsReadyToAddVotingMember() {
  1650  		lg.Warn(
  1651  			"rejecting member add request; not enough healthy members",
  1652  			zap.String("local-member-id", s.ID().String()),
  1653  			zap.String("requested-member-add", fmt.Sprintf("%+v", memb)),
  1654  			zap.Error(ErrNotEnoughStartedMembers),
  1655  		)
  1656  		return ErrNotEnoughStartedMembers
  1657  	}
  1658  
  1659  	if !isConnectedFullySince(s.r.transport, time.Now().Add(-HealthInterval), s.ID(), s.cluster.VotingMembers()) {
  1660  		lg.Warn(
  1661  			"rejecting member add request; local member has not been connected to all peers, reconfigure breaks active quorum",
  1662  			zap.String("local-member-id", s.ID().String()),
  1663  			zap.String("requested-member-add", fmt.Sprintf("%+v", memb)),
  1664  			zap.Error(ErrUnhealthy),
  1665  		)
  1666  		return ErrUnhealthy
  1667  	}
  1668  
  1669  	return nil
  1670  }
  1671  
  1672  func (s *EtcdServer) RemoveMember(ctx context.Context, id uint64) ([]*membership.Member, error) {
  1673  	if err := s.checkMembershipOperationPermission(ctx); err != nil {
  1674  		return nil, err
  1675  	}
  1676  
  1677  	// by default StrictReconfigCheck is enabled; reject removal if leads to quorum loss
  1678  	if err := s.mayRemoveMember(types.ID(id)); err != nil {
  1679  		return nil, err
  1680  	}
  1681  
  1682  	cc := raftpb.ConfChange{
  1683  		Type:   raftpb.ConfChangeRemoveNode,
  1684  		NodeID: id,
  1685  	}
  1686  	return s.configure(ctx, cc)
  1687  }
  1688  
  1689  // PromoteMember promotes a learner node to a voting node.
  1690  func (s *EtcdServer) PromoteMember(ctx context.Context, id uint64) ([]*membership.Member, error) {
  1691  	// Only the raft leader has information on whether the to-be-promoted learner node is ready. If the promoteMember
  1692  	// call fails with ErrNotLeader, forward the request to the leader node via HTTP. If the promoteMember call fails
  1693  	// with an error other than ErrNotLeader, return the error.
  1694  	resp, err := s.promoteMember(ctx, id)
  1695  	if err == nil {
  1696  		learnerPromoteSucceed.Inc()
  1697  		return resp, nil
  1698  	}
  1699  	if err != ErrNotLeader {
  1700  		learnerPromoteFailed.WithLabelValues(err.Error()).Inc()
  1701  		return resp, err
  1702  	}
  1703  
  1704  	cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout())
  1705  	defer cancel()
  1706  	// forward to leader
  1707  	for cctx.Err() == nil {
  1708  		leader, err := s.waitLeader(cctx)
  1709  		if err != nil {
  1710  			return nil, err
  1711  		}
  1712  		for _, url := range leader.PeerURLs {
  1713  			resp, err := promoteMemberHTTP(cctx, url, id, s.peerRt)
  1714  			if err == nil {
  1715  				return resp, nil
  1716  			}
  1717  			// If member promotion failed, return early. Otherwise keep retrying.
  1718  			if err == ErrLearnerNotReady || err == membership.ErrIDNotFound || err == membership.ErrMemberNotLearner {
  1719  				return nil, err
  1720  			}
  1721  		}
  1722  	}
  1723  
  1724  	if cctx.Err() == context.DeadlineExceeded {
  1725  		return nil, ErrTimeout
  1726  	}
  1727  	return nil, ErrCanceled
  1728  }
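
// Illustrative sketch, not part of the original file: promoting a learner and retrying while it
// is still catching up. The helper name and the one-second retry interval are assumptions for
// the example.
func examplePromoteWhenReady(s *EtcdServer, id uint64) error {
	for {
		ctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())
		_, err := s.PromoteMember(ctx, id)
		cancel()
		if err != ErrLearnerNotReady {
			return err // nil on success, or a terminal error
		}
		select {
		case <-time.After(time.Second):
		case <-s.StoppingNotify():
			return ErrStopped
		}
	}
}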
  1729  
  1730  // promoteMember checks whether the to-be-promoted learner node is ready before sending the promote
  1731  // request to raft.
  1732  // The function returns ErrNotLeader if the local node is not the raft leader (and therefore does not
  1733  // have enough information to determine if the learner node is ready), and ErrLearnerNotReady if the
  1734  // local node is the leader (and therefore has enough information) but decides the learner node is not
  1735  // ready to be promoted.
  1736  func (s *EtcdServer) promoteMember(ctx context.Context, id uint64) ([]*membership.Member, error) {
  1737  	if err := s.checkMembershipOperationPermission(ctx); err != nil {
  1738  		return nil, err
  1739  	}
  1740  
  1741  	// check if we can promote this learner.
  1742  	if err := s.mayPromoteMember(types.ID(id)); err != nil {
  1743  		return nil, err
  1744  	}
  1745  
  1746  	// build the context for the promote confChange. mark IsLearner to false and IsPromote to true.
  1747  	promoteChangeContext := membership.ConfigChangeContext{
  1748  		Member: membership.Member{
  1749  			ID: types.ID(id),
  1750  		},
  1751  		IsPromote: true,
  1752  	}
  1753  
  1754  	b, err := json.Marshal(promoteChangeContext)
  1755  	if err != nil {
  1756  		return nil, err
  1757  	}
  1758  
  1759  	cc := raftpb.ConfChange{
  1760  		Type:    raftpb.ConfChangeAddNode,
  1761  		NodeID:  id,
  1762  		Context: b,
  1763  	}
  1764  
  1765  	return s.configure(ctx, cc)
  1766  }
  1767  
  1768  func (s *EtcdServer) mayPromoteMember(id types.ID) error {
  1769  	lg := s.Logger()
  1770  	err := s.isLearnerReady(uint64(id))
  1771  	if err != nil {
  1772  		return err
  1773  	}
  1774  
  1775  	if !s.Cfg.StrictReconfigCheck {
  1776  		return nil
  1777  	}
  1778  	if !s.cluster.IsReadyToPromoteMember(uint64(id)) {
  1779  		lg.Warn(
  1780  			"rejecting member promote request; not enough healthy members",
  1781  			zap.String("local-member-id", s.ID().String()),
  1782  			zap.String("requested-member-remove-id", id.String()),
  1783  			zap.Error(ErrNotEnoughStartedMembers),
  1784  		)
  1785  		return ErrNotEnoughStartedMembers
  1786  	}
  1787  
  1788  	return nil
  1789  }
  1790  
  1791  // isLearnerReady checks whether the learner has caught up with the leader.
  1792  // Note: it returns membership.ErrIDNotFound if the member is not found in the leader's
  1793  // progress map; whether the member is actually a learner is checked later, before the apply phase.
  1794  func (s *EtcdServer) isLearnerReady(id uint64) error {
  1795  	if err := s.waitAppliedIndex(); err != nil {
  1796  		return err
  1797  	}
  1798  
  1799  	rs := s.raftStatus()
  1800  
  1801  	// leader's raftStatus.Progress is not nil
  1802  	if rs.Progress == nil {
  1803  		return ErrNotLeader
  1804  	}
  1805  
  1806  	var learnerMatch uint64
  1807  	isFound := false
  1808  	leaderID := rs.ID
  1809  	for memberID, progress := range rs.Progress {
  1810  		if id == memberID {
  1811  			// check its status
  1812  			learnerMatch = progress.Match
  1813  			isFound = true
  1814  			break
  1815  		}
  1816  	}
  1817  
  1818  	// We should return an error directly from the API to avoid the request
  1819  	// being unnecessarily delivered to raft.
  1820  	if !isFound {
  1821  		return membership.ErrIDNotFound
  1822  	}
  1823  
  1824  	leaderMatch := rs.Progress[leaderID].Match
  1825  	// the learner's Match not caught up with leader yet
  1826  	if float64(learnerMatch) < float64(leaderMatch)*readyPercent {
  1827  		return ErrLearnerNotReady
  1828  	}
  1829  
  1830  	return nil
  1831  }
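
// Illustrative sketch, not part of the original file: the readiness rule above compares the
// learner's Match index against the leader's, scaled by readyPercent. Assuming readyPercent is
// 0.9, a learner at Match 950 counts as ready against a leader at Match 1000, while one at
// Match 850 does not. The helper name and ratio parameter are hypothetical.
func learnerCaughtUp(learnerMatch, leaderMatch uint64, ratio float64) bool {
	return float64(learnerMatch) >= float64(leaderMatch)*ratio
}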
  1832  
  1833  func (s *EtcdServer) mayRemoveMember(id types.ID) error {
  1834  	if !s.Cfg.StrictReconfigCheck {
  1835  		return nil
  1836  	}
  1837  
  1838  	lg := s.Logger()
  1839  	isLearner := s.cluster.IsMemberExist(id) && s.cluster.Member(id).IsLearner
  1840  	// no need to check quorum when removing non-voting member
  1841  	if isLearner {
  1842  		return nil
  1843  	}
  1844  
  1845  	if !s.cluster.IsReadyToRemoveVotingMember(uint64(id)) {
  1846  		lg.Warn(
  1847  			"rejecting member remove request; not enough healthy members",
  1848  			zap.String("local-member-id", s.ID().String()),
  1849  			zap.String("requested-member-remove-id", id.String()),
  1850  			zap.Error(ErrNotEnoughStartedMembers),
  1851  		)
  1852  		return ErrNotEnoughStartedMembers
  1853  	}
  1854  
  1855  	// downed member is safe to remove since it's not part of the active quorum
  1856  	if t := s.r.transport.ActiveSince(id); id != s.ID() && t.IsZero() {
  1857  		return nil
  1858  	}
  1859  
  1860  	// protect quorum if some members are down
  1861  	m := s.cluster.VotingMembers()
  1862  	active := numConnectedSince(s.r.transport, time.Now().Add(-HealthInterval), s.ID(), m)
  1863  	if (active - 1) < 1+((len(m)-1)/2) {
  1864  		lg.Warn(
  1865  			"rejecting member remove request; local member has not been connected to all peers, reconfigure breaks active quorum",
  1866  			zap.String("local-member-id", s.ID().String()),
  1867  			zap.String("requested-member-remove", id.String()),
  1868  			zap.Int("active-peers", active),
  1869  			zap.Error(ErrUnhealthy),
  1870  		)
  1871  		return ErrUnhealthy
  1872  	}
  1873  
  1874  	return nil
  1875  }
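
// Illustrative sketch, not part of the original file: the active-quorum guard above rejects a
// removal when the remaining connected voters would fall below the quorum of the shrunken
// cluster. For example, in a 5-member cluster with only 3 voters connected, removing an active
// member leaves 2 connected voters against a required quorum of 3, so the request fails with
// ErrUnhealthy. The helper name below is hypothetical.
func removalKeepsQuorum(activeVoters, votingMembers int) bool {
	quorumAfterRemoval := 1 + (votingMembers-1)/2
	return activeVoters-1 >= quorumAfterRemoval
}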
  1876  
  1877  func (s *EtcdServer) UpdateMember(ctx context.Context, memb membership.Member) ([]*membership.Member, error) {
  1878  	b, merr := json.Marshal(memb)
  1879  	if merr != nil {
  1880  		return nil, merr
  1881  	}
  1882  
  1883  	if err := s.checkMembershipOperationPermission(ctx); err != nil {
  1884  		return nil, err
  1885  	}
  1886  	cc := raftpb.ConfChange{
  1887  		Type:    raftpb.ConfChangeUpdateNode,
  1888  		NodeID:  uint64(memb.ID),
  1889  		Context: b,
  1890  	}
  1891  	return s.configure(ctx, cc)
  1892  }
  1893  
  1894  func (s *EtcdServer) setCommittedIndex(v uint64) {
  1895  	atomic.StoreUint64(&s.committedIndex, v)
  1896  }
  1897  
  1898  func (s *EtcdServer) getCommittedIndex() uint64 {
  1899  	return atomic.LoadUint64(&s.committedIndex)
  1900  }
  1901  
  1902  func (s *EtcdServer) setAppliedIndex(v uint64) {
  1903  	atomic.StoreUint64(&s.appliedIndex, v)
  1904  }
  1905  
  1906  func (s *EtcdServer) getAppliedIndex() uint64 {
  1907  	return atomic.LoadUint64(&s.appliedIndex)
  1908  }
  1909  
  1910  func (s *EtcdServer) setTerm(v uint64) {
  1911  	atomic.StoreUint64(&s.term, v)
  1912  }
  1913  
  1914  func (s *EtcdServer) getTerm() uint64 {
  1915  	return atomic.LoadUint64(&s.term)
  1916  }
  1917  
  1918  func (s *EtcdServer) setLead(v uint64) {
  1919  	atomic.StoreUint64(&s.lead, v)
  1920  }
  1921  
  1922  func (s *EtcdServer) getLead() uint64 {
  1923  	return atomic.LoadUint64(&s.lead)
  1924  }
  1925  
  1926  func (s *EtcdServer) LeaderChangedNotify() <-chan struct{} {
  1927  	s.leaderChangedMu.RLock()
  1928  	defer s.leaderChangedMu.RUnlock()
  1929  	return s.leaderChanged
  1930  }
  1931  
  1932  // FirstCommitInTermNotify returns a channel that will be closed when the first
  1933  // entry of a new term is committed, which is necessary for a new leader to answer
  1934  // read-only requests (the leader cannot respond to any read-only requests
  1935  // while linearizable semantics are required).
  1936  func (s *EtcdServer) FirstCommitInTermNotify() <-chan struct{} {
  1937  	s.firstCommitInTermMu.RLock()
  1938  	defer s.firstCommitInTermMu.RUnlock()
  1939  	return s.firstCommitInTermC
  1940  }
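
// A minimal sketch, not from the original source: deferring work that requires a committed
// entry in the current term, such as answering linearizable reads, until the notification above
// fires. The helper name is hypothetical.
func exampleAfterFirstCommit(s *EtcdServer, f func()) {
	s.GoAttach(func() {
		select {
		case <-s.FirstCommitInTermNotify():
			f() // an entry from the current term has been committed
		case <-s.StoppingNotify():
		}
	})
}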
  1941  
  1942  // RaftStatusGetter represents etcd server and Raft progress.
  1943  type RaftStatusGetter interface {
  1944  	ID() types.ID
  1945  	Leader() types.ID
  1946  	CommittedIndex() uint64
  1947  	AppliedIndex() uint64
  1948  	Term() uint64
  1949  }
  1950  
  1951  func (s *EtcdServer) ID() types.ID { return s.id }
  1952  
  1953  func (s *EtcdServer) Leader() types.ID { return types.ID(s.getLead()) }
  1954  
  1955  func (s *EtcdServer) Lead() uint64 { return s.getLead() }
  1956  
  1957  func (s *EtcdServer) CommittedIndex() uint64 { return s.getCommittedIndex() }
  1958  
  1959  func (s *EtcdServer) AppliedIndex() uint64 { return s.getAppliedIndex() }
  1960  
  1961  func (s *EtcdServer) Term() uint64 { return s.getTerm() }
  1962  
  1963  type confChangeResponse struct {
  1964  	membs []*membership.Member
  1965  	err   error
  1966  }
  1967  
  1968  // configure sends a configuration change through consensus and
  1969  // then waits for it to be applied to the server. It
  1970  // will block until the change is performed or there is an error.
  1971  func (s *EtcdServer) configure(ctx context.Context, cc raftpb.ConfChange) ([]*membership.Member, error) {
  1972  	lg := s.Logger()
  1973  	cc.ID = s.reqIDGen.Next()
  1974  	ch := s.w.Register(cc.ID)
  1975  
  1976  	start := time.Now()
  1977  	if err := s.r.ProposeConfChange(ctx, cc); err != nil {
  1978  		s.w.Trigger(cc.ID, nil)
  1979  		return nil, err
  1980  	}
  1981  
  1982  	select {
  1983  	case x := <-ch:
  1984  		if x == nil {
  1985  			lg.Panic("failed to configure")
  1986  		}
  1987  		resp := x.(*confChangeResponse)
  1988  		lg.Info(
  1989  			"applied a configuration change through raft",
  1990  			zap.String("local-member-id", s.ID().String()),
  1991  			zap.String("raft-conf-change", cc.Type.String()),
  1992  			zap.String("raft-conf-change-node-id", types.ID(cc.NodeID).String()),
  1993  		)
  1994  		return resp.membs, resp.err
  1995  
  1996  	case <-ctx.Done():
  1997  		s.w.Trigger(cc.ID, nil) // GC wait
  1998  		return nil, s.parseProposeCtxErr(ctx.Err(), start)
  1999  
  2000  	case <-s.stopping:
  2001  		return nil, ErrStopped
  2002  	}
  2003  }
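
// Illustrative sketch, not part of the original file: configure relies on the register/trigger
// pattern from go.etcd.io/etcd/pkg/v3/wait, the same mechanism used to match ordinary proposals
// to their results. The request ID 42 and the "applied" value are placeholders for the example.
func exampleWaitPattern() {
	w := wait.New()
	ch := w.Register(42)        // the proposer registers the request ID before proposing
	go w.Trigger(42, "applied") // the apply path later triggers the same ID with the outcome
	fmt.Println(<-ch)           // prints "applied"
}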
  2004  
  2005  // sync proposes a SYNC request and is non-blocking.
  2006  // This makes no guarantee that the request will be proposed or performed.
  2007  // The request will be canceled after the given timeout.
  2008  func (s *EtcdServer) sync(timeout time.Duration) {
  2009  	req := pb.Request{
  2010  		Method: "SYNC",
  2011  		ID:     s.reqIDGen.Next(),
  2012  		Time:   time.Now().UnixNano(),
  2013  	}
  2014  	data := pbutil.MustMarshal(&req)
  2015  	// There is no guarantee that the node has a leader when making the SYNC request,
  2016  	// so the proposal is made in a goroutine.
  2017  	ctx, cancel := context.WithTimeout(s.ctx, timeout)
  2018  	s.GoAttach(func() {
  2019  		s.r.Propose(ctx, data)
  2020  		cancel()
  2021  	})
  2022  }
  2023  
  2024  // publishV3 registers server information into the cluster using v3 request. The
  2025  // information is the JSON representation of this server's member struct, updated
  2026  // with the static clientURLs of the server.
  2027  // The function keeps attempting to register until it succeeds,
  2028  // or its server is stopped.
  2029  func (s *EtcdServer) publishV3(timeout time.Duration) {
  2030  	req := &membershippb.ClusterMemberAttrSetRequest{
  2031  		Member_ID: uint64(s.id),
  2032  		MemberAttributes: &membershippb.Attributes{
  2033  			Name:       s.attributes.Name,
  2034  			ClientUrls: s.attributes.ClientURLs,
  2035  		},
  2036  	}
  2037  	lg := s.Logger()
  2038  	for {
  2039  		select {
  2040  		case <-s.stopping:
  2041  			lg.Warn(
  2042  				"stopped publish because server is stopping",
  2043  				zap.String("local-member-id", s.ID().String()),
  2044  				zap.String("local-member-attributes", fmt.Sprintf("%+v", s.attributes)),
  2045  				zap.Duration("publish-timeout", timeout),
  2046  			)
  2047  			return
  2048  
  2049  		default:
  2050  		}
  2051  
  2052  		ctx, cancel := context.WithTimeout(s.ctx, timeout)
  2053  		_, err := s.raftRequest(ctx, pb.InternalRaftRequest{ClusterMemberAttrSet: req})
  2054  		cancel()
  2055  		switch err {
  2056  		case nil:
  2057  			close(s.readych)
  2058  			lg.Info(
  2059  				"published local member to cluster through raft",
  2060  				zap.String("local-member-id", s.ID().String()),
  2061  				zap.String("local-member-attributes", fmt.Sprintf("%+v", s.attributes)),
  2062  				zap.String("cluster-id", s.cluster.ID().String()),
  2063  				zap.Duration("publish-timeout", timeout),
  2064  			)
  2065  			return
  2066  
  2067  		default:
  2068  			lg.Warn(
  2069  				"failed to publish local member to cluster through raft",
  2070  				zap.String("local-member-id", s.ID().String()),
  2071  				zap.String("local-member-attributes", fmt.Sprintf("%+v", s.attributes)),
  2072  				zap.Duration("publish-timeout", timeout),
  2073  				zap.Error(err),
  2074  			)
  2075  		}
  2076  	}
  2077  }
  2078  
  2079  // publish registers server information into the cluster. The information
  2080  // is the JSON representation of this server's member struct, updated with the
  2081  // static clientURLs of the server.
  2082  // The function keeps attempting to register until it succeeds,
  2083  // or its server is stopped.
  2084  //
  2085  // It uses the v2 store to encode member attributes and applies them through Raft,
  2086  // but does not go through the v2 API endpoint, which means that even with the v2
  2087  // client handler disabled (e.g. --enable-v2=false), the cluster can still
  2088  // process publish requests through rafthttp.
  2089  // TODO: Remove in 3.6 (start using publishV3)
  2090  func (s *EtcdServer) publish(timeout time.Duration) {
  2091  	lg := s.Logger()
  2092  	b, err := json.Marshal(s.attributes)
  2093  	if err != nil {
  2094  		lg.Panic("failed to marshal JSON", zap.Error(err))
  2095  		return
  2096  	}
  2097  	req := pb.Request{
  2098  		Method: "PUT",
  2099  		Path:   membership.MemberAttributesStorePath(s.id),
  2100  		Val:    string(b),
  2101  	}
  2102  
  2103  	for {
  2104  		ctx, cancel := context.WithTimeout(s.ctx, timeout)
  2105  		_, err := s.Do(ctx, req)
  2106  		cancel()
  2107  		switch err {
  2108  		case nil:
  2109  			close(s.readych)
  2110  			lg.Info(
  2111  				"published local member to cluster through raft",
  2112  				zap.String("local-member-id", s.ID().String()),
  2113  				zap.String("local-member-attributes", fmt.Sprintf("%+v", s.attributes)),
  2114  				zap.String("request-path", req.Path),
  2115  				zap.String("cluster-id", s.cluster.ID().String()),
  2116  				zap.Duration("publish-timeout", timeout),
  2117  			)
  2118  			return
  2119  
  2120  		case ErrStopped:
  2121  			lg.Warn(
  2122  				"stopped publish because server is stopped",
  2123  				zap.String("local-member-id", s.ID().String()),
  2124  				zap.String("local-member-attributes", fmt.Sprintf("%+v", s.attributes)),
  2125  				zap.Duration("publish-timeout", timeout),
  2126  				zap.Error(err),
  2127  			)
  2128  			return
  2129  
  2130  		default:
  2131  			lg.Warn(
  2132  				"failed to publish local member to cluster through raft",
  2133  				zap.String("local-member-id", s.ID().String()),
  2134  				zap.String("local-member-attributes", fmt.Sprintf("%+v", s.attributes)),
  2135  				zap.String("request-path", req.Path),
  2136  				zap.Duration("publish-timeout", timeout),
  2137  				zap.Error(err),
  2138  			)
  2139  		}
  2140  	}
  2141  }
  2142  
  2143  func (s *EtcdServer) sendMergedSnap(merged snap.Message) {
  2144  	atomic.AddInt64(&s.inflightSnapshots, 1)
  2145  
  2146  	lg := s.Logger()
  2147  	fields := []zap.Field{
  2148  		zap.String("from", s.ID().String()),
  2149  		zap.String("to", types.ID(merged.To).String()),
  2150  		zap.Int64("bytes", merged.TotalSize),
  2151  		zap.String("size", humanize.Bytes(uint64(merged.TotalSize))),
  2152  	}
  2153  
  2154  	now := time.Now()
  2155  	s.r.transport.SendSnapshot(merged)
  2156  	lg.Info("sending merged snapshot", fields...)
  2157  
  2158  	s.GoAttach(func() {
  2159  		select {
  2160  		case ok := <-merged.CloseNotify():
  2161  			// delay releasing inflight snapshot for another 30 seconds to
  2162  			// block log compaction.
  2163  			// If the follower still fails to catch up, it is probably just too slow
  2164  			// to catch up. We cannot avoid the snapshot cycle anyway.
  2165  			if ok {
  2166  				select {
  2167  				case <-time.After(releaseDelayAfterSnapshot):
  2168  				case <-s.stopping:
  2169  				}
  2170  			}
  2171  
  2172  			atomic.AddInt64(&s.inflightSnapshots, -1)
  2173  
  2174  			lg.Info("sent merged snapshot", append(fields, zap.Duration("took", time.Since(now)))...)
  2175  
  2176  		case <-s.stopping:
  2177  			lg.Warn("canceled sending merged snapshot; server stopping", fields...)
  2178  			return
  2179  		}
  2180  	})
  2181  }
  2182  
  2183  // apply takes entries received from Raft (after it has been committed) and
  2184  // applies them to the current state of the EtcdServer.
  2185  // The given entries should not be empty.
  2186  func (s *EtcdServer) apply(
  2187  	es []raftpb.Entry,
  2188  	confState *raftpb.ConfState,
  2189  ) (appliedt uint64, appliedi uint64, shouldStop bool) {
  2190  	s.lg.Debug("Applying entries", zap.Int("num-entries", len(es)))
  2191  	for i := range es {
  2192  		e := es[i]
  2193  		s.lg.Debug("Applying entry",
  2194  			zap.Uint64("index", e.Index),
  2195  			zap.Uint64("term", e.Term),
  2196  			zap.Stringer("type", e.Type))
  2197  		switch e.Type {
  2198  		case raftpb.EntryNormal:
  2199  			// gofail: var beforeApplyOneEntryNormal struct{}
  2200  			s.applyEntryNormal(&e)
  2201  			s.setAppliedIndex(e.Index)
  2202  			s.setTerm(e.Term)
  2203  
  2204  		case raftpb.EntryConfChange:
  2205  			// We need to apply all WAL entries on top of v2store
  2206  			// and only 'unapplied' (e.Index>backend.ConsistentIndex) on the backend.
  2207  			shouldApplyV3 := membership.ApplyV2storeOnly
  2208  
  2209  			// set the consistent index of the currently executing entry
  2210  			if e.Index > s.consistIndex.ConsistentIndex() {
  2211  				s.consistIndex.SetConsistentApplyingIndex(e.Index, e.Term)
  2212  				shouldApplyV3 = membership.ApplyBoth
  2213  			}
  2214  
  2215  			var cc raftpb.ConfChange
  2216  			pbutil.MustUnmarshal(&cc, e.Data)
  2217  			removedSelf, err := s.applyConfChange(cc, confState, shouldApplyV3)
  2218  			s.setAppliedIndex(e.Index)
  2219  			s.setTerm(e.Term)
  2220  			shouldStop = shouldStop || removedSelf
  2221  			s.w.Trigger(cc.ID, &confChangeResponse{s.cluster.Members(), err})
  2222  
  2223  		default:
  2224  			lg := s.Logger()
  2225  			lg.Panic(
  2226  				"unknown entry type; must be either EntryNormal or EntryConfChange",
  2227  				zap.String("type", e.Type.String()),
  2228  			)
  2229  		}
  2230  		appliedi, appliedt = e.Index, e.Term
  2231  	}
  2232  	return appliedt, appliedi, shouldStop
  2233  }
  2234  
  2235  // applyEntryNormal applies an EntryNormal type raftpb request to the EtcdServer
  2236  func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
  2237  	shouldApplyV3 := membership.ApplyV2storeOnly
  2238  	var ar *applyResult
  2239  	index := s.consistIndex.ConsistentIndex()
  2240  	if e.Index > index {
  2241  		// set the consistent index of the currently executing entry
  2242  		s.consistIndex.SetConsistentApplyingIndex(e.Index, e.Term)
  2243  		shouldApplyV3 = membership.ApplyBoth
  2244  		defer func() {
  2245  			// The txPostLockInsideApplyHook will not get called in some cases,
  2246  			// in which we should move the consistent index forward directly.
  2247  			newIndex := s.consistIndex.ConsistentIndex()
  2248  			if newIndex < e.Index {
  2249  				s.consistIndex.SetConsistentIndex(e.Index, e.Term)
  2250  			}
  2251  		}()
  2252  	}
  2253  	s.lg.Debug("apply entry normal",
  2254  		zap.Uint64("consistent-index", index),
  2255  		zap.Uint64("entry-index", e.Index),
  2256  		zap.Bool("should-applyV3", bool(shouldApplyV3)))
  2257  
  2258  	// The raft state machine may generate a noop entry during leader confirmation.
  2259  	// Skip it in advance to avoid potential bugs in the future.
  2260  	if len(e.Data) == 0 {
  2261  		s.notifyAboutFirstCommitInTerm()
  2262  
  2263  		// promote lessor when the local member is leader and finished
  2264  		// applying all entries from the last term.
  2265  		if s.isLeader() {
  2266  			s.lessor.Promote(s.Cfg.ElectionTimeout())
  2267  		}
  2268  		return
  2269  	}
  2270  
  2271  	var raftReq pb.InternalRaftRequest
  2272  	if !pbutil.MaybeUnmarshal(&raftReq, e.Data) { // backward compatible
  2273  		var r pb.Request
  2274  		rp := &r
  2275  		pbutil.MustUnmarshal(rp, e.Data)
  2276  		s.lg.Debug("applyEntryNormal", zap.Stringer("V2request", rp))
  2277  		s.w.Trigger(r.ID, s.applyV2Request((*RequestV2)(rp), shouldApplyV3))
  2278  		return
  2279  	}
  2280  	s.lg.Debug("applyEntryNormal", zap.Stringer("raftReq", &raftReq))
  2281  
  2282  	if raftReq.V2 != nil {
  2283  		req := (*RequestV2)(raftReq.V2)
  2284  		s.w.Trigger(req.ID, s.applyV2Request(req, shouldApplyV3))
  2285  		return
  2286  	}
  2287  
  2288  	id := raftReq.ID
  2289  	if id == 0 {
  2290  		id = raftReq.Header.ID
  2291  	}
  2292  
  2293  	needResult := s.w.IsRegistered(id)
  2294  	if needResult || !noSideEffect(&raftReq) {
  2295  		if !needResult && raftReq.Txn != nil {
  2296  			removeNeedlessRangeReqs(raftReq.Txn)
  2297  		}
  2298  		ar = s.applyV3.Apply(&raftReq, shouldApplyV3)
  2299  	}
  2300  
  2301  	// do not re-apply applied entries.
  2302  	if !shouldApplyV3 {
  2303  		return
  2304  	}
  2305  
  2306  	if ar == nil {
  2307  		return
  2308  	}
  2309  
  2310  	if ar.err != ErrNoSpace || len(s.alarmStore.Get(pb.AlarmType_NOSPACE)) > 0 {
  2311  		s.w.Trigger(id, ar)
  2312  		return
  2313  	}
  2314  
  2315  	lg := s.Logger()
  2316  	lg.Warn(
  2317  		"message exceeded backend quota; raising alarm",
  2318  		zap.Int64("quota-size-bytes", s.Cfg.QuotaBackendBytes),
  2319  		zap.String("quota-size", humanize.Bytes(uint64(s.Cfg.QuotaBackendBytes))),
  2320  		zap.Error(ar.err),
  2321  	)
  2322  
  2323  	s.GoAttach(func() {
  2324  		a := &pb.AlarmRequest{
  2325  			MemberID: uint64(s.ID()),
  2326  			Action:   pb.AlarmRequest_ACTIVATE,
  2327  			Alarm:    pb.AlarmType_NOSPACE,
  2328  		}
  2329  		s.raftRequest(s.ctx, pb.InternalRaftRequest{Alarm: a})
  2330  		s.w.Trigger(id, ar)
  2331  	})
  2332  }
  2333  
  2334  func (s *EtcdServer) notifyAboutFirstCommitInTerm() {
  2335  	newNotifier := make(chan struct{})
  2336  	s.firstCommitInTermMu.Lock()
  2337  	notifierToClose := s.firstCommitInTermC
  2338  	s.firstCommitInTermC = newNotifier
  2339  	s.firstCommitInTermMu.Unlock()
  2340  	close(notifierToClose)
  2341  }
  2342  
  2343  // applyConfChange applies a ConfChange to the server. It is only
  2344  // invoked with a ConfChange that has already passed through Raft
  2345  func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.ConfState, shouldApplyV3 membership.ShouldApplyV3) (bool, error) {
  2346  	if err := s.cluster.ValidateConfigurationChange(cc); err != nil {
  2347  		cc.NodeID = raft.None
  2348  		s.r.ApplyConfChange(cc)
  2349  
  2350  		// The txPostLock callback will not get called in this case,
  2351  		// so we should set the consistent index directly.
  2352  		if s.consistIndex != nil && membership.ApplyBoth == shouldApplyV3 {
  2353  			applyingIndex, applyingTerm := s.consistIndex.ConsistentApplyingIndex()
  2354  			s.consistIndex.SetConsistentIndex(applyingIndex, applyingTerm)
  2355  		}
  2356  		return false, err
  2357  	}
  2358  
  2359  	lg := s.Logger()
  2360  	*confState = *s.r.ApplyConfChange(cc)
  2361  	s.beHooks.SetConfState(confState)
  2362  	switch cc.Type {
  2363  	case raftpb.ConfChangeAddNode, raftpb.ConfChangeAddLearnerNode:
  2364  		confChangeContext := new(membership.ConfigChangeContext)
  2365  		if err := json.Unmarshal(cc.Context, confChangeContext); err != nil {
  2366  			lg.Panic("failed to unmarshal member", zap.Error(err))
  2367  		}
  2368  		if cc.NodeID != uint64(confChangeContext.Member.ID) {
  2369  			lg.Panic(
  2370  				"got different member ID",
  2371  				zap.String("member-id-from-config-change-entry", types.ID(cc.NodeID).String()),
  2372  				zap.String("member-id-from-message", confChangeContext.Member.ID.String()),
  2373  			)
  2374  		}
  2375  		if confChangeContext.IsPromote {
  2376  			s.cluster.PromoteMember(confChangeContext.Member.ID, shouldApplyV3)
  2377  		} else {
  2378  			s.cluster.AddMember(&confChangeContext.Member, shouldApplyV3)
  2379  
  2380  			if confChangeContext.Member.ID != s.id {
  2381  				s.r.transport.AddPeer(confChangeContext.Member.ID, confChangeContext.PeerURLs)
  2382  			}
  2383  		}
  2384  
  2385  		// update the isLearner metric when this server id is equal to the id in raft member confChange
  2386  		if confChangeContext.Member.ID == s.id {
  2387  			if cc.Type == raftpb.ConfChangeAddLearnerNode {
  2388  				isLearner.Set(1)
  2389  			} else {
  2390  				isLearner.Set(0)
  2391  			}
  2392  		}
  2393  
  2394  	case raftpb.ConfChangeRemoveNode:
  2395  		id := types.ID(cc.NodeID)
  2396  		s.cluster.RemoveMember(id, shouldApplyV3)
  2397  		if id == s.id {
  2398  			return true, nil
  2399  		}
  2400  		s.r.transport.RemovePeer(id)
  2401  
  2402  	case raftpb.ConfChangeUpdateNode:
  2403  		m := new(membership.Member)
  2404  		if err := json.Unmarshal(cc.Context, m); err != nil {
  2405  			lg.Panic("failed to unmarshal member", zap.Error(err))
  2406  		}
  2407  		if cc.NodeID != uint64(m.ID) {
  2408  			lg.Panic(
  2409  				"got different member ID",
  2410  				zap.String("member-id-from-config-change-entry", types.ID(cc.NodeID).String()),
  2411  				zap.String("member-id-from-message", m.ID.String()),
  2412  			)
  2413  		}
  2414  		s.cluster.UpdateRaftAttributes(m.ID, m.RaftAttributes, shouldApplyV3)
  2415  		if m.ID != s.id {
  2416  			s.r.transport.UpdatePeer(m.ID, m.PeerURLs)
  2417  		}
  2418  	}
  2419  	return false, nil
  2420  }
  2421  
  2422  // TODO: non-blocking snapshot
  2423  func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
  2424  	clone := s.v2store.Clone()
  2425  	// commit kv to write metadata (for example: consistent index) to disk.
  2426  	//
  2427  	// This guarantees that Backend's consistent_index is >= index of last snapshot.
  2428  	//
  2429  	// KV().commit() updates the consistent index in backend.
  2430  	// All operations that update consistent index must be called sequentially
  2431  	// from applyAll function.
  2432  	// So KV().Commit() cannot run in parallel with apply. It has to be called outside
  2433  	// the go routine created below.
  2434  	s.KV().Commit()
  2435  
  2436  	s.GoAttach(func() {
  2437  		lg := s.Logger()
  2438  
  2439  		d, err := clone.SaveNoCopy()
  2440  		// TODO: current store will never fail to do a snapshot
  2441  		// what should we do if the store might fail?
  2442  		if err != nil {
  2443  			lg.Panic("failed to save v2 store", zap.Error(err))
  2444  		}
  2445  		snap, err := s.r.raftStorage.CreateSnapshot(snapi, &confState, d)
  2446  		if err != nil {
  2447  			// the snapshot was done asynchronously with the progress of raft.
  2448  			// raft might have already got a newer snapshot.
  2449  			if err == raft.ErrSnapOutOfDate {
  2450  				return
  2451  			}
  2452  			lg.Panic("failed to create snapshot", zap.Error(err))
  2453  		}
  2454  		// SaveSnap saves the snapshot to file and appends the corresponding WAL entry.
  2455  		if err = s.r.storage.SaveSnap(snap); err != nil {
  2456  			lg.Panic("failed to save snapshot", zap.Error(err))
  2457  		}
  2458  		if err = s.r.storage.Release(snap); err != nil {
  2459  			lg.Panic("failed to release wal", zap.Error(err))
  2460  		}
  2461  
  2462  		lg.Info(
  2463  			"saved snapshot",
  2464  			zap.Uint64("snapshot-index", snap.Metadata.Index),
  2465  		)
  2466  
  2467  		// When sending a snapshot, etcd will pause compaction.
  2468  		// After receiving a snapshot, the slow follower needs to get all the entries right after
  2469  		// the sent snapshot to catch up. If we do not pause compaction, the log entries right after
  2470  		// the sent snapshot might already be compacted. That happens when the snapshot takes a long time
  2471  		// to send and save. Pausing compaction avoids triggering a snapshot-sending cycle.
  2472  		if atomic.LoadInt64(&s.inflightSnapshots) != 0 {
  2473  			lg.Info("skip compaction since there is an inflight snapshot")
  2474  			return
  2475  		}
  2476  
  2477  		// keep some in memory log entries for slow followers.
  2478  		compacti := uint64(1)
  2479  		if snapi > s.Cfg.SnapshotCatchUpEntries {
  2480  			compacti = snapi - s.Cfg.SnapshotCatchUpEntries
  2481  		}
  2482  
  2483  		err = s.r.raftStorage.Compact(compacti)
  2484  		if err != nil {
  2485  			// the compaction was done asynchronously with the progress of raft.
  2486  			// the raft log might have already been compacted.
  2487  			if err == raft.ErrCompacted {
  2488  				return
  2489  			}
  2490  			lg.Panic("failed to compact", zap.Error(err))
  2491  		}
  2492  		lg.Info(
  2493  			"compacted Raft logs",
  2494  			zap.Uint64("compact-index", compacti),
  2495  		)
  2496  	})
  2497  }
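
// Illustrative sketch, not part of the original file: the compaction above keeps
// SnapshotCatchUpEntries raft entries in memory for slow followers. With the default of 5000
// catch-up entries, a snapshot at index 12000 compacts the raft log up to index 7000, while a
// snapshot at or below index 5000 compacts only up to index 1. The helper name is hypothetical.
func compactIndexFor(snapi, catchUpEntries uint64) uint64 {
	if snapi > catchUpEntries {
		return snapi - catchUpEntries
	}
	return 1
}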
  2498  
  2499  // CutPeer drops messages to the specified peer.
  2500  func (s *EtcdServer) CutPeer(id types.ID) {
  2501  	tr, ok := s.r.transport.(*rafthttp.Transport)
  2502  	if ok {
  2503  		tr.CutPeer(id)
  2504  	}
  2505  }
  2506  
  2507  // MendPeer recovers the message dropping behavior of the given peer.
  2508  func (s *EtcdServer) MendPeer(id types.ID) {
  2509  	tr, ok := s.r.transport.(*rafthttp.Transport)
  2510  	if ok {
  2511  		tr.MendPeer(id)
  2512  	}
  2513  }
  2514  
  2515  func (s *EtcdServer) PauseSending() { s.r.pauseSending() }
  2516  
  2517  func (s *EtcdServer) ResumeSending() { s.r.resumeSending() }
  2518  
  2519  func (s *EtcdServer) ClusterVersion() *semver.Version {
  2520  	if s.cluster == nil {
  2521  		return nil
  2522  	}
  2523  	return s.cluster.Version()
  2524  }
  2525  
  2526  // monitorVersions checks the member's version every monitorVersionInterval.
  2527  // It updates the cluster version if all members agree on a higher one.
  2528  // It logs a message if there is a member with a higher version than the
  2529  // local version.
  2530  // TODO switch to updateClusterVersionV3 in 3.6
  2531  func (s *EtcdServer) monitorVersions() {
  2532  	for {
  2533  		select {
  2534  		case <-s.FirstCommitInTermNotify():
  2535  		case <-time.After(monitorVersionInterval):
  2536  		case <-s.stopping:
  2537  			return
  2538  		}
  2539  
  2540  		if s.Leader() != s.ID() {
  2541  			continue
  2542  		}
  2543  
  2544  		v := decideClusterVersion(s.Logger(), getVersions(s.Logger(), s.cluster, s.id, s.peerRt))
  2545  		if v != nil {
  2546  			// only keep major.minor version for comparison
  2547  			v = &semver.Version{
  2548  				Major: v.Major,
  2549  				Minor: v.Minor,
  2550  			}
  2551  		}
  2552  
  2553  		// if the current version is nil:
  2554  		// 1. use the decided version if possible
  2555  		// 2. or use the min cluster version
  2556  		if s.cluster.Version() == nil {
  2557  			verStr := version.MinClusterVersion
  2558  			if v != nil {
  2559  				verStr = v.String()
  2560  			}
  2561  			s.GoAttach(func() { s.updateClusterVersionV2(verStr) })
  2562  			continue
  2563  		}
  2564  
  2565  		if v != nil && membership.IsValidVersionChange(s.cluster.Version(), v) {
  2566  			s.GoAttach(func() { s.updateClusterVersionV2(v.String()) })
  2567  		}
  2568  	}
  2569  }
  2570  
  2571  func (s *EtcdServer) monitorKVHash() {
  2572  	t := s.Cfg.CorruptCheckTime
  2573  	if t == 0 {
  2574  		return
  2575  	}
  2576  
  2577  	lg := s.Logger()
  2578  	lg.Info(
  2579  		"enabled corruption checking",
  2580  		zap.String("local-member-id", s.ID().String()),
  2581  		zap.Duration("interval", t),
  2582  	)
  2583  	for {
  2584  		select {
  2585  		case <-s.stopping:
  2586  			return
  2587  		case <-time.After(t):
  2588  		}
  2589  		if !s.isLeader() {
  2590  			continue
  2591  		}
  2592  		if err := s.corruptionChecker.PeriodicCheck(); err != nil {
  2593  			lg.Warn("failed to check hash KV", zap.Error(err))
  2594  		}
  2595  	}
  2596  }
  2597  
  2598  func (s *EtcdServer) monitorCompactHash() {
  2599  	if !s.Cfg.CompactHashCheckEnabled {
  2600  		return
  2601  	}
  2602  	t := s.Cfg.CompactHashCheckTime
  2603  	for {
  2604  		select {
  2605  		case <-time.After(t):
  2606  		case <-s.stopping:
  2607  			return
  2608  		}
  2609  		if !s.isLeader() {
  2610  			continue
  2611  		}
  2612  		s.corruptionChecker.CompactHashCheck()
  2613  	}
  2614  }
  2615  
  2616  func (s *EtcdServer) updateClusterVersionV2(ver string) {
  2617  	lg := s.Logger()
  2618  
  2619  	if s.cluster.Version() == nil {
  2620  		lg.Info(
  2621  			"setting up initial cluster version using v2 API",
  2622  			zap.String("cluster-version", version.Cluster(ver)),
  2623  		)
  2624  	} else {
  2625  		lg.Info(
  2626  			"updating cluster version using v2 API",
  2627  			zap.String("from", version.Cluster(s.cluster.Version().String())),
  2628  			zap.String("to", version.Cluster(ver)),
  2629  		)
  2630  	}
  2631  
  2632  	req := pb.Request{
  2633  		Method: "PUT",
  2634  		Path:   membership.StoreClusterVersionKey(),
  2635  		Val:    ver,
  2636  	}
  2637  
  2638  	ctx, cancel := context.WithTimeout(s.ctx, s.Cfg.ReqTimeout())
  2639  	_, err := s.Do(ctx, req)
  2640  	cancel()
  2641  
  2642  	switch err {
  2643  	case nil:
  2644  		lg.Info("cluster version is updated", zap.String("cluster-version", version.Cluster(ver)))
  2645  		return
  2646  
  2647  	case ErrStopped:
  2648  		lg.Warn("aborting cluster version update; server is stopped", zap.Error(err))
  2649  		return
  2650  
  2651  	default:
  2652  		lg.Warn("failed to update cluster version", zap.Error(err))
  2653  	}
  2654  }
  2655  
  2656  func (s *EtcdServer) updateClusterVersionV3(ver string) {
  2657  	lg := s.Logger()
  2658  
  2659  	if s.cluster.Version() == nil {
  2660  		lg.Info(
  2661  			"setting up initial cluster version using v3 API",
  2662  			zap.String("cluster-version", version.Cluster(ver)),
  2663  		)
  2664  	} else {
  2665  		lg.Info(
  2666  			"updating cluster version using v3 API",
  2667  			zap.String("from", version.Cluster(s.cluster.Version().String())),
  2668  			zap.String("to", version.Cluster(ver)),
  2669  		)
  2670  	}
  2671  
  2672  	req := membershippb.ClusterVersionSetRequest{Ver: ver}
  2673  
  2674  	ctx, cancel := context.WithTimeout(s.ctx, s.Cfg.ReqTimeout())
  2675  	_, err := s.raftRequest(ctx, pb.InternalRaftRequest{ClusterVersionSet: &req})
  2676  	cancel()
  2677  
  2678  	switch err {
  2679  	case nil:
  2680  		lg.Info("cluster version is updated", zap.String("cluster-version", version.Cluster(ver)))
  2681  		return
  2682  
  2683  	case ErrStopped:
  2684  		lg.Warn("aborting cluster version update; server is stopped", zap.Error(err))
  2685  		return
  2686  
  2687  	default:
  2688  		lg.Warn("failed to update cluster version", zap.Error(err))
  2689  	}
  2690  }
  2691  
  2692  func (s *EtcdServer) monitorDowngrade() {
  2693  	t := s.Cfg.DowngradeCheckTime
  2694  	if t == 0 {
  2695  		return
  2696  	}
  2697  	lg := s.Logger()
  2698  	for {
  2699  		select {
  2700  		case <-time.After(t):
  2701  		case <-s.stopping:
  2702  			return
  2703  		}
  2704  
  2705  		if !s.isLeader() {
  2706  			continue
  2707  		}
  2708  
  2709  		d := s.cluster.DowngradeInfo()
  2710  		if !d.Enabled {
  2711  			continue
  2712  		}
  2713  
  2714  		targetVersion := d.TargetVersion
  2715  		v := semver.Must(semver.NewVersion(targetVersion))
  2716  		if isMatchedVersions(s.Logger(), v, getVersions(s.Logger(), s.cluster, s.id, s.peerRt)) {
  2717  			lg.Info("the cluster has been downgraded", zap.String("cluster-version", targetVersion))
  2718  			ctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())
  2719  			if _, err := s.downgradeCancel(ctx); err != nil {
  2720  				lg.Warn("failed to cancel downgrade", zap.Error(err))
  2721  			}
  2722  			cancel()
  2723  		}
  2724  	}
  2725  }
  2726  
  2727  func (s *EtcdServer) parseProposeCtxErr(err error, start time.Time) error {
  2728  	switch err {
  2729  	case context.Canceled:
  2730  		return ErrCanceled
  2731  
  2732  	case context.DeadlineExceeded:
  2733  		s.leadTimeMu.RLock()
  2734  		curLeadElected := s.leadElectedTime
  2735  		s.leadTimeMu.RUnlock()
  2736  		prevLeadLost := curLeadElected.Add(-2 * time.Duration(s.Cfg.ElectionTicks) * time.Duration(s.Cfg.TickMs) * time.Millisecond)
  2737  		if start.After(prevLeadLost) && start.Before(curLeadElected) {
  2738  			return ErrTimeoutDueToLeaderFail
  2739  		}
  2740  		lead := types.ID(s.getLead())
  2741  		switch lead {
  2742  		case types.ID(raft.None):
  2743  			// TODO: return error to specify it happens because the cluster does not have leader now
  2744  		case s.ID():
  2745  			if !isConnectedToQuorumSince(s.r.transport, start, s.ID(), s.cluster.Members()) {
  2746  				return ErrTimeoutDueToConnectionLost
  2747  			}
  2748  		default:
  2749  			if !isConnectedSince(s.r.transport, start, lead) {
  2750  				return ErrTimeoutDueToConnectionLost
  2751  			}
  2752  		}
  2753  		return ErrTimeout
  2754  
  2755  	default:
  2756  		return err
  2757  	}
  2758  }
  2759  
  2760  func (s *EtcdServer) KV() mvcc.WatchableKV { return s.kv }
  2761  func (s *EtcdServer) Backend() backend.Backend {
  2762  	s.bemu.Lock()
  2763  	defer s.bemu.Unlock()
  2764  	return s.be
  2765  }
  2766  
  2767  func (s *EtcdServer) AuthStore() auth.AuthStore { return s.authStore }
  2768  
  2769  func (s *EtcdServer) restoreAlarms() error {
  2770  	s.applyV3 = s.newApplierV3()
  2771  	as, err := v3alarm.NewAlarmStore(s.lg, s)
  2772  	if err != nil {
  2773  		return err
  2774  	}
  2775  	s.alarmStore = as
  2776  	if len(as.Get(pb.AlarmType_NOSPACE)) > 0 {
  2777  		s.applyV3 = newApplierV3Capped(s.applyV3)
  2778  	}
  2779  	if len(as.Get(pb.AlarmType_CORRUPT)) > 0 {
  2780  		s.applyV3 = newApplierV3Corrupt(s.applyV3)
  2781  	}
  2782  	return nil
  2783  }
  2784  
  2785  // GoAttach runs the given function in a goroutine and tracks it using
  2786  // the etcdserver waitgroup.
  2787  // The passed function should return when s.StoppingNotify() is closed.
  2788  func (s *EtcdServer) GoAttach(f func()) {
  2789  	s.wgMu.RLock() // this blocks with ongoing close(s.stopping)
  2790  	defer s.wgMu.RUnlock()
  2791  	select {
  2792  	case <-s.stopping:
  2793  		lg := s.Logger()
  2794  		lg.Warn("server has stopped; skipping GoAttach")
  2795  		return
  2796  	default:
  2797  	}
  2798  
  2799  	// now safe to add since waitgroup wait has not started yet
  2800  	s.wg.Add(1)
  2801  	go func() {
  2802  		defer s.wg.Done()
  2803  		f()
  2804  	}()
  2805  }
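
// Illustrative sketch, not part of the original file: a background task attached through
// GoAttach that exits once the server begins stopping, as the comment above requires. The
// helper name and the one-minute ticker interval are assumptions for the example.
func exampleBackgroundTask(s *EtcdServer, tick func()) {
	s.GoAttach(func() {
		t := time.NewTicker(time.Minute)
		defer t.Stop()
		for {
			select {
			case <-t.C:
				tick()
			case <-s.StoppingNotify():
				return
			}
		}
	})
}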
  2806  
  2807  func (s *EtcdServer) Alarms() []*pb.AlarmMember {
  2808  	return s.alarmStore.Get(pb.AlarmType_NONE)
  2809  }
  2810  
  2811  // IsLearner reports whether the local member is a raft learner.
  2812  func (s *EtcdServer) IsLearner() bool {
  2813  	return s.cluster.IsLocalMemberLearner()
  2814  }
  2815  
  2816  // IsMemberExist reports whether the member with the given id exists in the cluster.
  2817  func (s *EtcdServer) IsMemberExist(id types.ID) bool {
  2818  	return s.cluster.IsMemberExist(id)
  2819  }
  2820  
  2821  // raftStatus returns the raft status of this etcd node.
  2822  func (s *EtcdServer) raftStatus() raft.Status {
  2823  	return s.r.Node.Status()
  2824  }
  2825  
  2826  func (s *EtcdServer) getTxPostLockInsideApplyHook() func() {
  2827  	return func() {
  2828  		applyingIdx, applyingTerm := s.consistIndex.ConsistentApplyingIndex()
  2829  		if applyingIdx > s.consistIndex.UnsafeConsistentIndex() {
  2830  			s.consistIndex.SetConsistentIndex(applyingIdx, applyingTerm)
  2831  		}
  2832  	}
  2833  }
  2834  
  2835  func maybeDefragBackend(cfg config.ServerConfig, be backend.Backend) error {
  2836  	size := be.Size()
  2837  	sizeInUse := be.SizeInUse()
  2838  	freeableMemory := uint(size - sizeInUse)
  2839  	thresholdBytes := cfg.ExperimentalBootstrapDefragThresholdMegabytes * 1024 * 1024
  2840  	if freeableMemory < thresholdBytes {
  2841  		cfg.Logger.Info("Skipping defragmentation",
  2842  			zap.Int64("current-db-size-bytes", size),
  2843  			zap.String("current-db-size", humanize.Bytes(uint64(size))),
  2844  			zap.Int64("current-db-size-in-use-bytes", sizeInUse),
  2845  			zap.String("current-db-size-in-use", humanize.Bytes(uint64(sizeInUse))),
  2846  			zap.Uint("experimental-bootstrap-defrag-threshold-bytes", thresholdBytes),
  2847  			zap.String("experimental-bootstrap-defrag-threshold", humanize.Bytes(uint64(thresholdBytes))),
  2848  		)
  2849  		return nil
  2850  	}
  2851  	return be.Defrag()
  2852  }
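
// Illustrative sketch, not part of the original file: maybeDefragBackend only defragments when
// the reclaimable space (total size minus size in use) reaches the configured threshold. With a
// 100 MB threshold, a backend with 50 MB freeable is skipped while one with 200 MB freeable is
// defragmented. The helper name below is hypothetical.
func shouldDefrag(sizeBytes, sizeInUseBytes int64, thresholdMegabytes uint) bool {
	freeable := uint(sizeBytes - sizeInUseBytes)
	return freeable >= thresholdMegabytes*1024*1024
}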
  2853  
  2854  func (s *EtcdServer) CorruptionChecker() CorruptionChecker {
  2855  	return s.corruptionChecker
  2856  }
  2857  
