// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package etcdserver
16
17 import (
18 "context"
19 "encoding/json"
20 "expvar"
21 "fmt"
22 "math"
23 "math/rand"
24 "net/http"
25 "os"
26 "path"
27 "regexp"
28 "strconv"
29 "strings"
30 "sync"
31 "sync/atomic"
32 "time"
33
34 "github.com/coreos/go-semver/semver"
35 humanize "github.com/dustin/go-humanize"
36 "github.com/prometheus/client_golang/prometheus"
37 "go.etcd.io/etcd/server/v3/config"
38 "go.etcd.io/etcd/server/v3/wal/walpb"
39 "go.uber.org/zap"
40
41 pb "go.etcd.io/etcd/api/v3/etcdserverpb"
42 "go.etcd.io/etcd/api/v3/membershippb"
43 "go.etcd.io/etcd/api/v3/version"
44 "go.etcd.io/etcd/client/pkg/v3/fileutil"
45 "go.etcd.io/etcd/client/pkg/v3/types"
46 "go.etcd.io/etcd/pkg/v3/idutil"
47 "go.etcd.io/etcd/pkg/v3/pbutil"
48 "go.etcd.io/etcd/pkg/v3/runtime"
49 "go.etcd.io/etcd/pkg/v3/schedule"
50 "go.etcd.io/etcd/pkg/v3/traceutil"
51 "go.etcd.io/etcd/pkg/v3/wait"
52 "go.etcd.io/etcd/raft/v3"
53 "go.etcd.io/etcd/raft/v3/raftpb"
54 "go.etcd.io/etcd/server/v3/auth"
55 "go.etcd.io/etcd/server/v3/etcdserver/api"
56 "go.etcd.io/etcd/server/v3/etcdserver/api/membership"
57 "go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp"
58 "go.etcd.io/etcd/server/v3/etcdserver/api/snap"
59 "go.etcd.io/etcd/server/v3/etcdserver/api/v2discovery"
60 "go.etcd.io/etcd/server/v3/etcdserver/api/v2http/httptypes"
61 stats "go.etcd.io/etcd/server/v3/etcdserver/api/v2stats"
62 "go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
63 "go.etcd.io/etcd/server/v3/etcdserver/api/v3alarm"
64 "go.etcd.io/etcd/server/v3/etcdserver/api/v3compactor"
65 "go.etcd.io/etcd/server/v3/etcdserver/cindex"
66 "go.etcd.io/etcd/server/v3/lease"
67 "go.etcd.io/etcd/server/v3/lease/leasehttp"
68 "go.etcd.io/etcd/server/v3/mvcc"
69 "go.etcd.io/etcd/server/v3/mvcc/backend"
70 "go.etcd.io/etcd/server/v3/wal"
71 )
72
const (
	DefaultSnapshotCount = 100000

	// DefaultSnapshotCatchUpEntries is the number of entries for a slow follower
	// to catch up after compacting the raft storage entries.
	// We expect the follower has a millisecond level latency with the leader.
	// The max throughput is around 10K. Keeping 5K entries is enough for helping
	// a follower to catch up.
	DefaultSnapshotCatchUpEntries uint64 = 5000

	StoreClusterPrefix = "/0"
	StoreKeysPrefix    = "/1"

	// HealthInterval is the minimum time the cluster should be healthy
	// before accepting add member requests.
	HealthInterval = 5 * time.Second

	purgeFileInterval = 30 * time.Second

	// maxInFlightMsgSnap is the max number of in-flight snapshot messages
	// etcdserver allows to have. This number is more than enough for most
	// clusters with 5 machines.
	maxInFlightMsgSnap = 16

	releaseDelayAfterSnapshot = 30 * time.Second

	// maxPendingRevokes is the maximum number of outstanding expired lease revocations.
	maxPendingRevokes = 16

	recommendedMaxRequestBytes = 10 * 1024 * 1024

	readyPercent = 0.9

	DowngradeEnabledPath = "/downgrade/enabled"
)
107
var (
	// monitorVersionInterval should be smaller than the timeout
	// on the connection. Or we will not be able to reuse the connection
	// (since it will timeout).
	monitorVersionInterval = rafthttp.ConnWriteTimeout - time.Second

	recommendedMaxRequestBytesString = humanize.Bytes(uint64(recommendedMaxRequestBytes))
	storeMemberAttributeRegexp       = regexp.MustCompile(path.Join(membership.StoreMembersPrefix, "[[:xdigit:]]{1,16}", "attributes"))
)
117
118 func init() {
119 rand.Seed(time.Now().UnixNano())
120
121 expvar.Publish(
122 "file_descriptor_limit",
123 expvar.Func(
124 func() interface{} {
125 n, _ := runtime.FDLimit()
126 return n
127 },
128 ),
129 )
130 }
131
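// Response holds the result of a v2 request: the raft term and index at which
// it was applied, and the v2 store event or watcher it produced.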
132 type Response struct {
133 Term uint64
134 Index uint64
135 Event *v2store.Event
136 Watcher v2store.Watcher
137 Err error
138 }
139
140 type ServerV2 interface {
141 Server
142 Leader() types.ID

	// Do takes a V2 request and attempts to fulfill it, returning a Response.
145 Do(ctx context.Context, r pb.Request) (Response, error)
146 stats.Stats
147 ClientCertAuthEnabled() bool
148 }
149
150 type ServerV3 interface {
151 Server
152 RaftStatusGetter
153 }
154
155 func (s *EtcdServer) ClientCertAuthEnabled() bool { return s.Cfg.ClientCertAuthEnabled }
156
type Server interface {
	// AddMember attempts to add a member into the cluster. It will return
	// ErrIDRemoved if member ID is removed from the cluster, or return
	// ErrIDExists if member ID exists in the cluster.
	AddMember(ctx context.Context, memb membership.Member) ([]*membership.Member, error)
	// RemoveMember attempts to remove a member from the cluster. It will
	// return ErrIDRemoved if member ID is removed from the cluster, or return
	// ErrIDNotFound if member ID is not in the cluster.
	RemoveMember(ctx context.Context, id uint64) ([]*membership.Member, error)
	// UpdateMember attempts to update an existing member in the cluster. It will
	// return ErrIDNotFound if the member ID does not exist.
	UpdateMember(ctx context.Context, updateMemb membership.Member) ([]*membership.Member, error)
	// PromoteMember attempts to promote a non-voting node to a voting node. It will
	// return ErrIDNotFound if the member ID does not exist,
	// ErrLearnerNotReady if the member is not ready to be promoted, or
	// ErrMemberNotLearner if the member is not a learner.
	PromoteMember(ctx context.Context, id uint64) ([]*membership.Member, error)

	// ClusterVersion is the cluster-wide minimum major.minor version.
	// Cluster version is set to the min version that an etcd member is
	// compatible with when first bootstrapped.
	//
	// ClusterVersion is nil until the cluster is bootstrapped (has a quorum).
	//
	// During a rolling upgrade, the cluster version will be updated
	// automatically after a sync (5 seconds by default).
	//
	// The API/raft component can utilize ClusterVersion to determine if
	// it can accept a client request or a raft RPC.
	ClusterVersion() *semver.Version
	Cluster() api.Cluster
	Alarms() []*pb.AlarmMember

	// LeaderChangedNotify returns a channel for application level code to be notified
	// when the etcd leader changes; this function is intended to be used only in
	// applications which embed etcd.
	// Caution:
	// 1. the returned channel is closed when the leadership changes.
	// 2. so a new channel needs to be obtained for each raft term.
	// 3. users can miss some consecutive changes when using this API.
	LeaderChangedNotify() <-chan struct{}
}
202
// EtcdServer is the production implementation of the Server interface
type EtcdServer struct {
	// inflightSnapshots holds count the number of snapshots currently inflight.
	inflightSnapshots int64  // must use atomic operations to access; keep 64-bit aligned.
	appliedIndex      uint64 // must use atomic operations to access; keep 64-bit aligned.
	committedIndex    uint64 // must use atomic operations to access; keep 64-bit aligned.
	term              uint64 // must use atomic operations to access; keep 64-bit aligned.
	lead              uint64 // must use atomic operations to access; keep 64-bit aligned.

	consistIndex cindex.ConsistentIndexer // consistIndex is used to get/set/save consistentIndex
	r            raftNode                 // uses 64-bit atomics; keep 64-bit aligned.
214
215 readych chan struct{}
216 Cfg config.ServerConfig
217
218 lgMu *sync.RWMutex
219 lg *zap.Logger
220
221 w wait.Wait
222
	readMu sync.RWMutex
	// read routine notifies etcd server that it waits for reading by sending an empty struct to
	// readwaitc
	readwaitc chan struct{}
	// readNotifier is used to notify the read routine that it can process the request when
	// there is no error
	readNotifier *notifier

	// stop signals the run goroutine should shutdown.
	stop chan struct{}
	// stopping is closed by run goroutine on shutdown.
	stopping chan struct{}
	// done is closed when all goroutines from start() complete and the server is stopped.
	done chan struct{}
	// leaderChanged is used to notify the linearizable read loop to drop the old read requests.
	leaderChanged   chan struct{}
	leaderChangedMu sync.RWMutex
240
241 errorc chan error
242 id types.ID
243 attributes membership.Attributes
244
245 cluster *membership.RaftCluster
246
247 v2store v2store.Store
248 snapshotter *snap.Snapshotter
249
250 applyV2 ApplierV2

	// applyV3 is the applier with auth and quotas
	applyV3 applierV3
	// applyV3Base is the applier without auth or quotas
	applyV3Base applierV3
	// applyV3Internal is the applier for internal requests
	applyV3Internal applierV3Internal
258 applyWait wait.WaitTime
259
260 kv mvcc.WatchableKV
261 lessor lease.Lessor
262 bemu sync.Mutex
263 be backend.Backend
264 beHooks *backendHooks
265 authStore auth.AuthStore
266 alarmStore *v3alarm.AlarmStore
267
268 stats *stats.ServerStats
269 lstats *stats.LeaderStats
270
271 SyncTicker *time.Ticker
272
273 compactor v3compactor.Compactor
274
275
276 peerRt http.RoundTripper
277 reqIDGen *idutil.Generator
278
279
280 wgMu sync.RWMutex
281
282
283 wg sync.WaitGroup
284
285
286
287 ctx context.Context
288 cancel context.CancelFunc
289
290 leadTimeMu sync.RWMutex
291 leadElectedTime time.Time
292
293 firstCommitInTermMu sync.RWMutex
294 firstCommitInTermC chan struct{}
295
296 *AccessController
297 corruptionChecker CorruptionChecker
298 }
299
type backendHooks struct {
	indexer cindex.ConsistentIndexer
	lg      *zap.Logger

	// confState to be written in the next submitted backend transaction (if dirty)
	confState raftpb.ConfState
	// first write changes it to 'dirty'. false by default, so
	// an uninitialized confState is never written out.
	confStateDirty bool
	confStateLock  sync.Mutex
}
311
312 func (bh *backendHooks) OnPreCommitUnsafe(tx backend.BatchTx) {
313 bh.indexer.UnsafeSave(tx)
314 bh.confStateLock.Lock()
315 defer bh.confStateLock.Unlock()
316 if bh.confStateDirty {
317 membership.MustUnsafeSaveConfStateToBackend(bh.lg, tx, &bh.confState)
318
319 bh.confStateDirty = false
320 }
321 }
322
323 func (bh *backendHooks) SetConfState(confState *raftpb.ConfState) {
324 bh.confStateLock.Lock()
325 defer bh.confStateLock.Unlock()
326 bh.confState = *confState
327 bh.confStateDirty = true
328 }
329
// NewServer creates a new EtcdServer from the supplied configuration. The
// configuration is considered static for the lifetime of the EtcdServer.
332 func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
333 st := v2store.New(StoreClusterPrefix, StoreKeysPrefix)
334
335 var (
336 w *wal.WAL
337 n raft.Node
338 s *raft.MemoryStorage
339 id types.ID
340 cl *membership.RaftCluster
341 )
342
343 if cfg.MaxRequestBytes > recommendedMaxRequestBytes {
344 cfg.Logger.Warn(
345 "exceeded recommended request limit",
346 zap.Uint("max-request-bytes", cfg.MaxRequestBytes),
347 zap.String("max-request-size", humanize.Bytes(uint64(cfg.MaxRequestBytes))),
348 zap.Int("recommended-request-bytes", recommendedMaxRequestBytes),
349 zap.String("recommended-request-size", recommendedMaxRequestBytesString),
350 )
351 }
352
353 if terr := fileutil.TouchDirAll(cfg.Logger, cfg.DataDir); terr != nil {
354 return nil, fmt.Errorf("cannot access data directory: %v", terr)
355 }
356
357 haveWAL := wal.Exist(cfg.WALDir())
358
359 if err = fileutil.TouchDirAll(cfg.Logger, cfg.SnapDir()); err != nil {
360 cfg.Logger.Fatal(
361 "failed to create snapshot directory",
362 zap.String("path", cfg.SnapDir()),
363 zap.Error(err),
364 )
365 }
366
367 if err = fileutil.RemoveMatchFile(cfg.Logger, cfg.SnapDir(), func(fileName string) bool {
368 return strings.HasPrefix(fileName, "tmp")
369 }); err != nil {
370 cfg.Logger.Error(
371 "failed to remove temp file(s) in snapshot directory",
372 zap.String("path", cfg.SnapDir()),
373 zap.Error(err),
374 )
375 }
376
377 ss := snap.New(cfg.Logger, cfg.SnapDir())
378
379 bepath := cfg.BackendPath()
380 beExist := fileutil.Exist(bepath)
381
382 ci := cindex.NewConsistentIndex(nil)
383 beHooks := &backendHooks{lg: cfg.Logger, indexer: ci}
384 be := openBackend(cfg, beHooks)
385 ci.SetBackend(be)
386 cindex.CreateMetaBucket(be.BatchTx())
387
388 if cfg.ExperimentalBootstrapDefragThresholdMegabytes != 0 {
389 err := maybeDefragBackend(cfg, be)
390 if err != nil {
391 return nil, err
392 }
393 }
394
395 defer func() {
396 if be != nil && err != nil {
397 be.Close()
398 }
399 }()
400
401 prt, err := rafthttp.NewRoundTripper(cfg.PeerTLSInfo, cfg.PeerDialTimeout())
402 if err != nil {
403 return nil, err
404 }
405 var (
406 remotes []*membership.Member
407 snapshot *raftpb.Snapshot
408 )
409
410 switch {
411 case !haveWAL && !cfg.NewCluster:
412 if err = cfg.VerifyJoinExisting(); err != nil {
413 return nil, err
414 }
415 cl, err = membership.NewClusterFromURLsMap(cfg.Logger, cfg.InitialClusterToken, cfg.InitialPeerURLsMap)
416 if err != nil {
417 return nil, err
418 }
419 existingCluster, gerr := GetClusterFromRemotePeers(cfg.Logger, getRemotePeerURLs(cl, cfg.Name), prt)
420 if gerr != nil {
421 return nil, fmt.Errorf("cannot fetch cluster info from peer urls: %v", gerr)
422 }
423 if err = membership.ValidateClusterAndAssignIDs(cfg.Logger, cl, existingCluster); err != nil {
424 return nil, fmt.Errorf("error validating peerURLs %s: %v", existingCluster, err)
425 }
426 if !isCompatibleWithCluster(cfg.Logger, cl, cl.MemberByName(cfg.Name).ID, prt) {
427 return nil, fmt.Errorf("incompatible with current running cluster")
428 }
429
430 remotes = existingCluster.Members()
431 cl.SetID(types.ID(0), existingCluster.ID())
432 cl.SetStore(st)
433 cl.SetBackend(be)
434 id, n, s, w = startNode(cfg, cl, nil)
435 cl.SetID(id, existingCluster.ID())
436
437 case !haveWAL && cfg.NewCluster:
438 if err = cfg.VerifyBootstrap(); err != nil {
439 return nil, err
440 }
441 cl, err = membership.NewClusterFromURLsMap(cfg.Logger, cfg.InitialClusterToken, cfg.InitialPeerURLsMap)
442 if err != nil {
443 return nil, err
444 }
445 m := cl.MemberByName(cfg.Name)
446 if isMemberBootstrapped(cfg.Logger, cl, cfg.Name, prt, cfg.BootstrapTimeoutEffective()) {
447 return nil, fmt.Errorf("member %s has already been bootstrapped", m.ID)
448 }
449 if cfg.ShouldDiscover() {
450 var str string
451 str, err = v2discovery.JoinCluster(cfg.Logger, cfg.DiscoveryURL, cfg.DiscoveryProxy, m.ID, cfg.InitialPeerURLsMap.String())
452 if err != nil {
453 return nil, &DiscoveryError{Op: "join", Err: err}
454 }
455 var urlsmap types.URLsMap
456 urlsmap, err = types.NewURLsMap(str)
457 if err != nil {
458 return nil, err
459 }
460 if config.CheckDuplicateURL(urlsmap) {
461 return nil, fmt.Errorf("discovery cluster %s has duplicate url", urlsmap)
462 }
463 if cl, err = membership.NewClusterFromURLsMap(cfg.Logger, cfg.InitialClusterToken, urlsmap); err != nil {
464 return nil, err
465 }
466 }
467 cl.SetStore(st)
468 cl.SetBackend(be)
469 id, n, s, w = startNode(cfg, cl, cl.MemberIDs())
470 cl.SetID(id, cl.ID())
471
472 case haveWAL:
473 if err = fileutil.IsDirWriteable(cfg.MemberDir()); err != nil {
474 return nil, fmt.Errorf("cannot write to member directory: %v", err)
475 }
476
477 if err = fileutil.IsDirWriteable(cfg.WALDir()); err != nil {
478 return nil, fmt.Errorf("cannot write to WAL directory: %v", err)
479 }
480
481 if cfg.ShouldDiscover() {
482 cfg.Logger.Warn(
483 "discovery token is ignored since cluster already initialized; valid logs are found",
484 zap.String("wal-dir", cfg.WALDir()),
485 )
486 }
487

		// Find a snapshot to start/restart a raft node
		var walSnaps []walpb.Snapshot
		walSnaps, err = wal.ValidSnapshotEntries(cfg.Logger, cfg.WALDir())
		if err != nil {
			return nil, err
		}
		// snapshot files can be orphaned if etcd crashes after writing them but before
		// writing the corresponding wal log entries
		snapshot, err = ss.LoadNewestAvailable(walSnaps)
497 if err != nil && err != snap.ErrNoSnapshot {
498 return nil, err
499 }
500
501 if snapshot != nil {
502 if err = st.Recovery(snapshot.Data); err != nil {
503 cfg.Logger.Panic("failed to recover from snapshot", zap.Error(err))
504 }
505
506 if err = assertNoV2StoreContent(cfg.Logger, st, cfg.V2Deprecation); err != nil {
507 cfg.Logger.Error("illegal v2store content", zap.Error(err))
508 return nil, err
509 }
510
511 cfg.Logger.Info(
512 "recovered v2 store from snapshot",
513 zap.Uint64("snapshot-index", snapshot.Metadata.Index),
514 zap.String("snapshot-size", humanize.Bytes(uint64(snapshot.Size()))),
515 )
516
517 if be, err = recoverSnapshotBackend(cfg, be, *snapshot, beExist, beHooks); err != nil {
518 cfg.Logger.Panic("failed to recover v3 backend from snapshot", zap.Error(err))
519 }

			// A snapshot db may have already been recovered, and the old db should have
			// already been closed in this case, so we should set the backend again.
			ci.SetBackend(be)
523 s1, s2 := be.Size(), be.SizeInUse()
524 cfg.Logger.Info(
525 "recovered v3 backend from snapshot",
526 zap.Int64("backend-size-bytes", s1),
527 zap.String("backend-size", humanize.Bytes(uint64(s1))),
528 zap.Int64("backend-size-in-use-bytes", s2),
529 zap.String("backend-size-in-use", humanize.Bytes(uint64(s2))),
530 )
531 } else {
532 cfg.Logger.Info("No snapshot found. Recovering WAL from scratch!")
533 }
534
535 if !cfg.ForceNewCluster {
536 id, cl, n, s, w = restartNode(cfg, snapshot)
537 } else {
538 id, cl, n, s, w = restartAsStandaloneNode(cfg, snapshot)
539 }
540
541 cl.SetStore(st)
542 cl.SetBackend(be)
543 cl.Recover(api.UpdateCapability)
544 if cl.Version() != nil && !cl.Version().LessThan(semver.Version{Major: 3}) && !beExist {
545 os.RemoveAll(bepath)
546 return nil, fmt.Errorf("database file (%v) of the backend is missing", bepath)
547 }
548
549 default:
550 return nil, fmt.Errorf("unsupported bootstrap config")
551 }
552
553 if terr := fileutil.TouchDirAll(cfg.Logger, cfg.MemberDir()); terr != nil {
554 return nil, fmt.Errorf("cannot access member directory: %v", terr)
555 }
556
557 sstats := stats.NewServerStats(cfg.Name, id.String())
558 lstats := stats.NewLeaderStats(cfg.Logger, id.String())
559
560 heartbeat := time.Duration(cfg.TickMs) * time.Millisecond
561 srv = &EtcdServer{
562 readych: make(chan struct{}),
563 Cfg: cfg,
564 lgMu: new(sync.RWMutex),
565 lg: cfg.Logger,
566 errorc: make(chan error, 1),
567 v2store: st,
568 snapshotter: ss,
569 r: *newRaftNode(
570 raftNodeConfig{
571 lg: cfg.Logger,
572 isIDRemoved: func(id uint64) bool { return cl.IsIDRemoved(types.ID(id)) },
573 Node: n,
574 heartbeat: heartbeat,
575 raftStorage: s,
576 storage: NewStorage(w, ss),
577 },
578 ),
579 id: id,
580 attributes: membership.Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
581 cluster: cl,
582 stats: sstats,
583 lstats: lstats,
584 SyncTicker: time.NewTicker(500 * time.Millisecond),
585 peerRt: prt,
586 reqIDGen: idutil.NewGenerator(uint16(id), time.Now()),
587 AccessController: &AccessController{CORS: cfg.CORS, HostWhitelist: cfg.HostWhitelist},
588 consistIndex: ci,
589 firstCommitInTermC: make(chan struct{}),
590 }
591 serverID.With(prometheus.Labels{"server_id": id.String()}).Set(1)
592
593 srv.applyV2 = NewApplierV2(cfg.Logger, srv.v2store, srv.cluster)
594
595 srv.be = be
596 srv.beHooks = beHooks
597 minTTL := time.Duration((3*cfg.ElectionTicks)/2) * heartbeat

	// always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases.
	// If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers.
601 srv.lessor = lease.NewLessor(srv.Logger(), srv.be, srv.cluster, lease.LessorConfig{
602 MinLeaseTTL: int64(math.Ceil(minTTL.Seconds())),
603 CheckpointInterval: cfg.LeaseCheckpointInterval,
604 CheckpointPersist: cfg.LeaseCheckpointPersist,
605 ExpiredLeasesRetryInterval: srv.Cfg.ReqTimeout(),
606 })
607
608 tp, err := auth.NewTokenProvider(cfg.Logger, cfg.AuthToken,
609 func(index uint64) <-chan struct{} {
610 return srv.applyWait.Wait(index)
611 },
612 time.Duration(cfg.TokenTTL)*time.Second,
613 )
614 if err != nil {
615 cfg.Logger.Warn("failed to create token provider", zap.Error(err))
616 return nil, err
617 }
618 srv.kv = mvcc.New(srv.Logger(), srv.be, srv.lessor, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit})
619
620 kvindex := ci.ConsistentIndex()
621 srv.lg.Debug("restore consistentIndex", zap.Uint64("index", kvindex))
622 if beExist {
		// TODO: remove kvindex != 0 checking when we do not expect users to upgrade
		// etcd from pre-3.0 release.
625 if snapshot != nil && kvindex < snapshot.Metadata.Index {
626 if kvindex != 0 {
627 return nil, fmt.Errorf("database file (%v index %d) does not match with snapshot (index %d)", bepath, kvindex, snapshot.Metadata.Index)
628 }
629 cfg.Logger.Warn(
630 "consistent index was never saved",
631 zap.Uint64("snapshot-index", snapshot.Metadata.Index),
632 )
633 }
634 }
635 srv.corruptionChecker = newCorruptionChecker(cfg.Logger, srv, srv.kv.HashStorage())
636
637 srv.authStore = auth.NewAuthStore(srv.Logger(), srv.be, tp, int(cfg.BcryptCost))
638
	newSrv := srv // since srv == nil in defer if srv is returned as nil
	defer func() {
		// close the mvcc store if the remaining setup fails, so the deferred
		// backend close above does not run against a still-open kv.
643 if err != nil {
644 newSrv.kv.Close()
645 }
646 }()
647 if num := cfg.AutoCompactionRetention; num != 0 {
648 srv.compactor, err = v3compactor.New(cfg.Logger, cfg.AutoCompactionMode, num, srv.kv, srv)
649 if err != nil {
650 return nil, err
651 }
652 srv.compactor.Run()
653 }
654
655 srv.applyV3Base = srv.newApplierV3Backend()
656 srv.applyV3Internal = srv.newApplierV3Internal()
657 if err = srv.restoreAlarms(); err != nil {
658 return nil, err
659 }
660
661 if srv.Cfg.EnableLeaseCheckpoint {
		// setting checkpointer enables lease checkpoint feature.
663 srv.lessor.SetCheckpointer(func(ctx context.Context, cp *pb.LeaseCheckpointRequest) error {
664 if !srv.ensureLeadership() {
665 srv.lg.Warn("Ignore the checkpoint request because current member isn't a leader",
666 zap.Uint64("local-member-id", uint64(srv.ID())))
667 return lease.ErrNotPrimary
668 }
669
670 srv.raftRequestOnce(ctx, pb.InternalRaftRequest{LeaseCheckpoint: cp})
671 return nil
672 })
673 }
674

	// Set the hook after EtcdServer finishes the initialization to avoid
	// the hook being called during the initialization process.
677 srv.be.SetTxPostLockInsideApplyHook(srv.getTxPostLockInsideApplyHook())
678
679
680 tr := &rafthttp.Transport{
681 Logger: cfg.Logger,
682 TLSInfo: cfg.PeerTLSInfo,
683 DialTimeout: cfg.PeerDialTimeout(),
684 ID: id,
685 URLs: cfg.PeerURLs,
686 ClusterID: cl.ID(),
687 Raft: srv,
688 Snapshotter: ss,
689 ServerStats: sstats,
690 LeaderStats: lstats,
691 ErrorC: srv.errorc,
692 }
693 if err = tr.Start(); err != nil {
694 return nil, err
695 }
696
697 for _, m := range remotes {
698 if m.ID != id {
699 tr.AddRemote(m.ID, m.PeerURLs)
700 }
701 }
702 for _, m := range cl.Members() {
703 if m.ID != id {
704 tr.AddPeer(m.ID, m.PeerURLs)
705 }
706 }
707 srv.r.transport = tr
708
709 return srv, nil
710 }
711
// assertNoV2StoreContent -> depending on the deprecation stage, warns or reports an error
// if the v2store contains custom content.
714 func assertNoV2StoreContent(lg *zap.Logger, st v2store.Store, deprecationStage config.V2DeprecationEnum) error {
715 metaOnly, err := membership.IsMetaStoreOnly(st)
716 if err != nil {
717 return err
718 }
719 if metaOnly {
720 return nil
721 }
722 if deprecationStage.IsAtLeast(config.V2_DEPR_1_WRITE_ONLY) {
723 return fmt.Errorf("detected disallowed custom content in v2store for stage --v2-deprecation=%s", deprecationStage)
724 }
725 lg.Warn("detected custom v2store content. Etcd v3.5 is the last version allowing to access it using API v2. Please remove the content.")
726 return nil
727 }
728
729 func (s *EtcdServer) Logger() *zap.Logger {
730 s.lgMu.RLock()
731 l := s.lg
732 s.lgMu.RUnlock()
733 return l
734 }
735
736 func (s *EtcdServer) Config() config.ServerConfig {
737 return s.Cfg
738 }
739
740 func tickToDur(ticks int, tickMs uint) string {
741 return fmt.Sprintf("%v", time.Duration(ticks)*time.Duration(tickMs)*time.Millisecond)
742 }
743
744 func (s *EtcdServer) adjustTicks() {
745 lg := s.Logger()
746 clusterN := len(s.cluster.Members())

	// single-node fresh start, or single node recovers from snapshot
749 if clusterN == 1 {
750 ticks := s.Cfg.ElectionTicks - 1
751 lg.Info(
752 "started as single-node; fast-forwarding election ticks",
753 zap.String("local-member-id", s.ID().String()),
754 zap.Int("forward-ticks", ticks),
755 zap.String("forward-duration", tickToDur(ticks, s.Cfg.TickMs)),
756 zap.Int("election-ticks", s.Cfg.ElectionTicks),
757 zap.String("election-timeout", tickToDur(s.Cfg.ElectionTicks, s.Cfg.TickMs)),
758 )
759 s.r.advanceTicks(ticks)
760 return
761 }
762
763 if !s.Cfg.InitialElectionTickAdvance {
764 lg.Info("skipping initial election tick advance", zap.Int("election-ticks", s.Cfg.ElectionTicks))
765 return
766 }
767 lg.Info("starting initial election tick advance", zap.Int("election-ticks", s.Cfg.ElectionTicks))

	// retry up to "rafthttp.ConnReadTimeout"
	// until a peer connection reports; otherwise:
	// 1. all connections failed, or
	// 2. no active peers, or
	// 3. restarted single-node with no snapshot
	// then, do nothing, because advancing ticks would have no effect
775 waitTime := rafthttp.ConnReadTimeout
776 itv := 50 * time.Millisecond
777 for i := int64(0); i < int64(waitTime/itv); i++ {
778 select {
779 case <-time.After(itv):
780 case <-s.stopping:
781 return
782 }
783
784 peerN := s.r.transport.ActivePeers()
785 if peerN > 1 {
786
787
788 ticks := s.Cfg.ElectionTicks - 2
789
790 lg.Info(
791 "initialized peer connections; fast-forwarding election ticks",
792 zap.String("local-member-id", s.ID().String()),
793 zap.Int("forward-ticks", ticks),
794 zap.String("forward-duration", tickToDur(ticks, s.Cfg.TickMs)),
795 zap.Int("election-ticks", s.Cfg.ElectionTicks),
796 zap.String("election-timeout", tickToDur(s.Cfg.ElectionTicks, s.Cfg.TickMs)),
797 zap.Int("active-remote-members", peerN),
798 )
799
800 s.r.advanceTicks(ticks)
801 return
802 }
803 }
804 }
805
// Start performs any initialization of the Server necessary for it to
// begin serving requests. It must be called before Do or Process.
// Start must be non-blocking; any long-running server functionality
// should be implemented in goroutines.
810 func (s *EtcdServer) Start() {
811 s.start()
812 s.GoAttach(func() { s.adjustTicks() })
	// TODO: Switch to publishV3 once all supported cluster versions
	// understand ClusterMemberAttrSetRequest.
815 s.GoAttach(func() { s.publish(s.Cfg.ReqTimeout()) })
816 s.GoAttach(s.purgeFile)
817 s.GoAttach(func() { monitorFileDescriptor(s.Logger(), s.stopping) })
818 s.GoAttach(s.monitorVersions)
819 s.GoAttach(s.linearizableReadLoop)
820 s.GoAttach(s.monitorKVHash)
821 s.GoAttach(s.monitorCompactHash)
822 s.GoAttach(s.monitorDowngrade)
823 }
824
// start prepares and starts server in a new goroutine. It is no longer safe
// to modify a server's fields after it has been sent to Start.
// This function is just used for testing.
828 func (s *EtcdServer) start() {
829 lg := s.Logger()
830
831 if s.Cfg.SnapshotCount == 0 {
832 lg.Info(
833 "updating snapshot-count to default",
834 zap.Uint64("given-snapshot-count", s.Cfg.SnapshotCount),
835 zap.Uint64("updated-snapshot-count", DefaultSnapshotCount),
836 )
837 s.Cfg.SnapshotCount = DefaultSnapshotCount
838 }
839 if s.Cfg.SnapshotCatchUpEntries == 0 {
840 lg.Info(
841 "updating snapshot catch-up entries to default",
842 zap.Uint64("given-snapshot-catchup-entries", s.Cfg.SnapshotCatchUpEntries),
843 zap.Uint64("updated-snapshot-catchup-entries", DefaultSnapshotCatchUpEntries),
844 )
845 s.Cfg.SnapshotCatchUpEntries = DefaultSnapshotCatchUpEntries
846 }
847
848 s.w = wait.New()
849 s.applyWait = wait.NewTimeList()
850 s.done = make(chan struct{})
851 s.stop = make(chan struct{})
852 s.stopping = make(chan struct{}, 1)
853 s.ctx, s.cancel = context.WithCancel(context.Background())
854 s.readwaitc = make(chan struct{}, 1)
855 s.readNotifier = newNotifier()
856 s.leaderChanged = make(chan struct{})
857 if s.ClusterVersion() != nil {
858 lg.Info(
859 "starting etcd server",
860 zap.String("local-member-id", s.ID().String()),
861 zap.String("local-server-version", version.Version),
862 zap.String("cluster-id", s.Cluster().ID().String()),
863 zap.String("cluster-version", version.Cluster(s.ClusterVersion().String())),
864 )
865 membership.ClusterVersionMetrics.With(prometheus.Labels{"cluster_version": version.Cluster(s.ClusterVersion().String())}).Set(1)
866 } else {
867 lg.Info(
868 "starting etcd server",
869 zap.String("local-member-id", s.ID().String()),
870 zap.String("local-server-version", version.Version),
871 zap.String("cluster-version", "to_be_decided"),
872 )
873 }
874
875
876
877 go s.run()
878 }
879
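// purgeFile purges old snapshot and WAL files in the background, bounded by
// MaxSnapFiles and MaxWALFiles, until the server is stopping.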
880 func (s *EtcdServer) purgeFile() {
881 lg := s.Logger()
882 var dberrc, serrc, werrc <-chan error
883 var dbdonec, sdonec, wdonec <-chan struct{}
884 if s.Cfg.MaxSnapFiles > 0 {
885 dbdonec, dberrc = fileutil.PurgeFileWithoutFlock(lg, s.Cfg.SnapDir(), "snap.db", s.Cfg.MaxSnapFiles, purgeFileInterval, s.stopping)
886 sdonec, serrc = fileutil.PurgeFileWithoutFlock(lg, s.Cfg.SnapDir(), "snap", s.Cfg.MaxSnapFiles, purgeFileInterval, s.stopping)
887 }
888 if s.Cfg.MaxWALFiles > 0 {
889 wdonec, werrc = fileutil.PurgeFileWithDoneNotify(lg, s.Cfg.WALDir(), "wal", s.Cfg.MaxWALFiles, purgeFileInterval, s.stopping)
890 }
891
892 select {
893 case e := <-dberrc:
894 lg.Fatal("failed to purge snap db file", zap.Error(e))
895 case e := <-serrc:
896 lg.Fatal("failed to purge snap file", zap.Error(e))
897 case e := <-werrc:
898 lg.Fatal("failed to purge wal file", zap.Error(e))
899 case <-s.stopping:
900 if dbdonec != nil {
901 <-dbdonec
902 }
903 if sdonec != nil {
904 <-sdonec
905 }
906 if wdonec != nil {
907 <-wdonec
908 }
909 return
910 }
911 }
912
913 func (s *EtcdServer) Cluster() api.Cluster { return s.cluster }
914
915 func (s *EtcdServer) ApplyWait() <-chan struct{} { return s.applyWait.Wait(s.getCommittedIndex()) }
916
917 type ServerPeer interface {
918 ServerV2
919 RaftHandler() http.Handler
920 LeaseHandler() http.Handler
921 }
922
923 func (s *EtcdServer) LeaseHandler() http.Handler {
924 if s.lessor == nil {
925 return nil
926 }
927 return leasehttp.NewHandler(s.lessor, s.ApplyWait)
928 }
929
930 func (s *EtcdServer) RaftHandler() http.Handler { return s.r.transport.Handler() }
931
932 type ServerPeerV2 interface {
933 ServerPeer
934 HashKVHandler() http.Handler
935 DowngradeEnabledHandler() http.Handler
936 }
937
938 func (s *EtcdServer) DowngradeInfo() *membership.DowngradeInfo { return s.cluster.DowngradeInfo() }
939
940 type downgradeEnabledHandler struct {
941 lg *zap.Logger
942 cluster api.Cluster
943 server *EtcdServer
944 }
945
946 func (s *EtcdServer) DowngradeEnabledHandler() http.Handler {
947 return &downgradeEnabledHandler{
948 lg: s.Logger(),
949 cluster: s.cluster,
950 server: s,
951 }
952 }
953
954 func (h *downgradeEnabledHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
955 if r.Method != http.MethodGet {
956 w.Header().Set("Allow", http.MethodGet)
957 http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
958 return
959 }
960
961 w.Header().Set("X-Etcd-Cluster-ID", h.cluster.ID().String())
962
963 if r.URL.Path != DowngradeEnabledPath {
964 http.Error(w, "bad path", http.StatusBadRequest)
965 return
966 }
967
968 ctx, cancel := context.WithTimeout(context.Background(), h.server.Cfg.ReqTimeout())
969 defer cancel()
970
971
972 if err := h.server.linearizableReadNotify(ctx); err != nil {
973 http.Error(w, fmt.Sprintf("failed linearized read: %v", err),
974 http.StatusInternalServerError)
975 return
976 }
977 enabled := h.server.DowngradeInfo().Enabled
978 w.Header().Set("Content-Type", "text/plain")
979 w.Write([]byte(strconv.FormatBool(enabled)))
980 }
981
// Process takes a raft message and applies it to the server's raft state
// machine, respecting any timeout of the given context.
984 func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error {
985 lg := s.Logger()
986 if s.cluster.IsIDRemoved(types.ID(m.From)) {
987 lg.Warn(
988 "rejected Raft message from removed member",
989 zap.String("local-member-id", s.ID().String()),
990 zap.String("removed-member-id", types.ID(m.From).String()),
991 )
992 return httptypes.NewHTTPError(http.StatusForbidden, "cannot process message from removed member")
993 }
994 if m.Type == raftpb.MsgApp {
995 s.stats.RecvAppendReq(types.ID(m.From).String(), m.Size())
996 }
997 return s.r.Step(ctx, m)
998 }
999
1000 func (s *EtcdServer) IsIDRemoved(id uint64) bool { return s.cluster.IsIDRemoved(types.ID(id)) }
1001
1002 func (s *EtcdServer) ReportUnreachable(id uint64) { s.r.ReportUnreachable(id) }
1003
// ReportSnapshot reports snapshot sent status to the raft state machine,
// and clears the used snapshot from the snapshot store.
1006 func (s *EtcdServer) ReportSnapshot(id uint64, status raft.SnapshotStatus) {
1007 s.r.ReportSnapshot(id, status)
1008 }
1009
1010 type etcdProgress struct {
1011 confState raftpb.ConfState
1012 snapi uint64
1013 appliedt uint64
1014 appliedi uint64
1015 }
1016
// raftReadyHandler contains a set of EtcdServer operations to be called by raftNode,
// and helps decouple state machine logic from Raft algorithms.
// TODO: add a state machine interface to apply the commit entries and do snapshot/recover
1020 type raftReadyHandler struct {
1021 getLead func() (lead uint64)
1022 updateLead func(lead uint64)
1023 updateLeadership func(newLeader bool)
1024 updateCommittedIndex func(uint64)
1025 }
1026
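// run is the main loop of the server: it starts the raft node, schedules the
// in-order application of committed entries and snapshots, revokes expired
// leases while leader, and drives the periodic v2 store sync.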
1027 func (s *EtcdServer) run() {
1028 lg := s.Logger()
1029
1030 sn, err := s.r.raftStorage.Snapshot()
1031 if err != nil {
1032 lg.Panic("failed to get snapshot from Raft storage", zap.Error(err))
1033 }
1034
1035
1036 sched := schedule.NewFIFOScheduler()
1037
1038 var (
1039 smu sync.RWMutex
1040 syncC <-chan time.Time
1041 )
1042 setSyncC := func(ch <-chan time.Time) {
1043 smu.Lock()
1044 syncC = ch
1045 smu.Unlock()
1046 }
1047 getSyncC := func() (ch <-chan time.Time) {
1048 smu.RLock()
1049 ch = syncC
1050 smu.RUnlock()
1051 return
1052 }
1053 rh := &raftReadyHandler{
1054 getLead: func() (lead uint64) { return s.getLead() },
1055 updateLead: func(lead uint64) { s.setLead(lead) },
1056 updateLeadership: func(newLeader bool) {
1057 if !s.isLeader() {
1058 if s.lessor != nil {
1059 s.lessor.Demote()
1060 }
1061 if s.compactor != nil {
1062 s.compactor.Pause()
1063 }
1064 setSyncC(nil)
1065 } else {
1066 if newLeader {
1067 t := time.Now()
1068 s.leadTimeMu.Lock()
1069 s.leadElectedTime = t
1070 s.leadTimeMu.Unlock()
1071 }
1072 setSyncC(s.SyncTicker.C)
1073 if s.compactor != nil {
1074 s.compactor.Resume()
1075 }
1076 }
1077 if newLeader {
1078 s.leaderChangedMu.Lock()
1079 lc := s.leaderChanged
1080 s.leaderChanged = make(chan struct{})
1081 close(lc)
1082 s.leaderChangedMu.Unlock()
1083 }
1084
1085
1086 if s.stats != nil {
1087 s.stats.BecomeLeader()
1088 }
1089 },
1090 updateCommittedIndex: func(ci uint64) {
1091 cci := s.getCommittedIndex()
1092 if ci > cci {
1093 s.setCommittedIndex(ci)
1094 }
1095 },
1096 }
1097 s.r.start(rh)
1098
1099 ep := etcdProgress{
1100 confState: sn.Metadata.ConfState,
1101 snapi: sn.Metadata.Index,
1102 appliedt: sn.Metadata.Term,
1103 appliedi: sn.Metadata.Index,
1104 }
1105
1106 defer func() {
1107 s.wgMu.Lock()
1108 close(s.stopping)
1109 s.wgMu.Unlock()
1110 s.cancel()
1111 sched.Stop()

		// wait for goroutines before closing raft so wal stays open
1114 s.wg.Wait()
1115
1116 s.SyncTicker.Stop()

		// stop raft only after the scheduler and attached goroutines have
		// drained, so no in-flight apply task races with a stopped raft node.
1120 s.r.stop()
1121
1122 s.Cleanup()
1123
1124 close(s.done)
1125 }()
1126
1127 var expiredLeaseC <-chan []*lease.Lease
1128 if s.lessor != nil {
1129 expiredLeaseC = s.lessor.ExpiredLeasesC()
1130 }
1131
1132 for {
1133 select {
1134 case ap := <-s.r.apply():
1135 f := func(context.Context) { s.applyAll(&ep, &ap) }
1136 sched.Schedule(f)
1137 case leases := <-expiredLeaseC:
1138 s.revokeExpiredLeases(leases)
1139 case err := <-s.errorc:
1140 lg.Warn("server error", zap.Error(err))
1141 lg.Warn("data-dir used by this member must be removed")
1142 return
1143 case <-getSyncC():
1144 if s.v2store.HasTTLKeys() {
1145 s.sync(s.Cfg.ReqTimeout())
1146 }
1147 case <-s.stop:
1148 return
1149 }
1150 }
1151 }
1152
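// revokeExpiredLeases sends revoke requests for the given expired leases,
// with at most maxPendingRevokes requests in flight at a time.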
1153 func (s *EtcdServer) revokeExpiredLeases(leases []*lease.Lease) {
1154 s.GoAttach(func() {
		// Leases are revoked by the leader only. If the current member has lost
		// leadership in the meantime (for example, because it was blocked writing
		// WAL entries and a new election finished), the revocation must be skipped
		// here and left to the new leader; ensureLeadership below verifies this
		// through a linearizable read.
1161 lg := s.Logger()
1162 if !s.ensureLeadership() {
1163 lg.Warn("Ignore the lease revoking request because current member isn't a leader",
1164 zap.Uint64("local-member-id", uint64(s.ID())))
1165 return
1166 }
1167
		// bound the number of concurrent revoke requests to maxPendingRevokes
1169 c := make(chan struct{}, maxPendingRevokes)
1170 for _, curLease := range leases {
1171 select {
1172 case c <- struct{}{}:
1173 case <-s.stopping:
1174 return
1175 }
1176
1177 f := func(lid int64) {
1178 s.GoAttach(func() {
1179 ctx := s.authStore.WithRoot(s.ctx)
1180 _, lerr := s.LeaseRevoke(ctx, &pb.LeaseRevokeRequest{ID: lid})
1181 if lerr == nil {
1182 leaseExpired.Inc()
1183 } else {
1184 lg.Warn(
1185 "failed to revoke lease",
1186 zap.String("lease-id", fmt.Sprintf("%016x", lid)),
1187 zap.Error(lerr),
1188 )
1189 }
1190
1191 <-c
1192 })
1193 }
1194
1195 f(int64(curLease.ID))
1196 }
1197 })
1198 }
1199
// ensureLeadership checks whether current member is still the leader.
1201 func (s *EtcdServer) ensureLeadership() bool {
1202 lg := s.Logger()
1203
1204 ctx, cancel := context.WithTimeout(s.ctx, s.Cfg.ReqTimeout())
1205 defer cancel()
1206 if err := s.linearizableReadNotify(ctx); err != nil {
1207 lg.Warn("Failed to check current member's leadership",
1208 zap.Error(err))
1209 return false
1210 }
1211
1212 newLeaderId := s.raftStatus().Lead
1213 if newLeaderId != uint64(s.ID()) {
1214 lg.Warn("Current member isn't a leader",
1215 zap.Uint64("local-member-id", uint64(s.ID())),
1216 zap.Uint64("new-lead", newLeaderId))
1217 return false
1218 }
1219
1220 return true
1221 }
1222
// Cleanup releases resources allocated by NewServer: the lessor, the mvcc
// store, the auth store, the backend, and the compactor, in that order.
1225 func (s *EtcdServer) Cleanup() {
1226
1227
1228 if s.lessor != nil {
1229 s.lessor.Stop()
1230 }
1231 if s.kv != nil {
1232 s.kv.Close()
1233 }
1234 if s.authStore != nil {
1235 s.authStore.Close()
1236 }
1237 if s.be != nil {
1238 s.be.Close()
1239 }
1240 if s.compactor != nil {
1241 s.compactor.Stop()
1242 }
1243 }
1244
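// applyAll applies the snapshot and the committed entries delivered by the
// raft node, advances the apply wait, and may trigger a new snapshot or send
// a pending merged snapshot message.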
1245 func (s *EtcdServer) applyAll(ep *etcdProgress, apply *apply) {
1246 s.applySnapshot(ep, apply)
1247 s.applyEntries(ep, apply)
1248
1249 proposalsApplied.Set(float64(ep.appliedi))
1250 s.applyWait.Trigger(ep.appliedi)
1251
	// wait for the raft routine to finish the disk writes before triggering a
	// snapshot. or applied index might be greater than the last index in raft
	// storage, since the raft routine might be slower than apply routine.
1255 <-apply.notifyc
1256
1257 s.triggerSnapshot(ep)
1258 select {
1259
1260 case m := <-s.r.msgSnapC:
1261 merged := s.createMergedSnapshotMessage(m, ep.appliedt, ep.appliedi, ep.confState)
1262 s.sendMergedSnap(merged)
1263 default:
1264 }
1265 }
1266
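// applySnapshot recovers the v3 backend, lessor, mvcc store, alarm store,
// auth store, v2 store, and cluster configuration from an incoming leader
// snapshot, then resets the peer transport to the new membership.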
1267 func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
1268 if raft.IsEmptySnap(apply.snapshot) {
1269 return
1270 }
1271 applySnapshotInProgress.Inc()
1272
1273 lg := s.Logger()
1274 lg.Info(
1275 "applying snapshot",
1276 zap.Uint64("current-snapshot-index", ep.snapi),
1277 zap.Uint64("current-applied-index", ep.appliedi),
1278 zap.Uint64("incoming-leader-snapshot-index", apply.snapshot.Metadata.Index),
1279 zap.Uint64("incoming-leader-snapshot-term", apply.snapshot.Metadata.Term),
1280 )
1281 defer func() {
1282 lg.Info(
1283 "applied snapshot",
1284 zap.Uint64("current-snapshot-index", ep.snapi),
1285 zap.Uint64("current-applied-index", ep.appliedi),
1286 zap.Uint64("incoming-leader-snapshot-index", apply.snapshot.Metadata.Index),
1287 zap.Uint64("incoming-leader-snapshot-term", apply.snapshot.Metadata.Term),
1288 )
1289 applySnapshotInProgress.Dec()
1290 }()
1291
1292 if apply.snapshot.Metadata.Index <= ep.appliedi {
1293 lg.Panic(
1294 "unexpected leader snapshot from outdated index",
1295 zap.Uint64("current-snapshot-index", ep.snapi),
1296 zap.Uint64("current-applied-index", ep.appliedi),
1297 zap.Uint64("incoming-leader-snapshot-index", apply.snapshot.Metadata.Index),
1298 zap.Uint64("incoming-leader-snapshot-term", apply.snapshot.Metadata.Term),
1299 )
1300 }

	// wait for raftNode to persist the snapshot onto the disk
1303 <-apply.notifyc
1304
1305 newbe, err := openSnapshotBackend(s.Cfg, s.snapshotter, apply.snapshot, s.beHooks)
1306 if err != nil {
1307 lg.Panic("failed to open snapshot backend", zap.Error(err))
1308 }
1309
	// SetBackend must happen before recovering the lessor and the mvcc store
	// below: recovery commits backend transactions, and OnPreCommitUnsafe would
	// otherwise persist the stale consistent index from the old database into
	// the new one, overwriting the value that came with the snapshot.
1315 s.consistIndex.SetBackend(newbe)
1316
	// always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases.
	// If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers.
1319 if s.lessor != nil {
1320 lg.Info("restoring lease store")
1321
1322 s.lessor.Recover(newbe, func() lease.TxnDelete { return s.kv.Write(traceutil.TODO()) })
1323
1324 lg.Info("restored lease store")
1325 }
1326
1327 lg.Info("restoring mvcc store")
1328
1329 if err := s.kv.Restore(newbe); err != nil {
1330 lg.Panic("failed to restore mvcc store", zap.Error(err))
1331 }
1332
1333 newbe.SetTxPostLockInsideApplyHook(s.getTxPostLockInsideApplyHook())
1334 lg.Info("restored mvcc store", zap.Uint64("consistent-index", s.consistIndex.ConsistentIndex()))
1335
	// Closing the old backend might block until all the txns
	// on the backend are finished.
	// We do not want to wait on closing the old backend, so do it in a goroutine.
1339 s.bemu.Lock()
1340 oldbe := s.be
1341 go func() {
1342 lg.Info("closing old backend file")
1343 defer func() {
1344 lg.Info("closed old backend file")
1345 }()
1346 if err := oldbe.Close(); err != nil {
1347 lg.Panic("failed to close old backend", zap.Error(err))
1348 }
1349 }()
1350
1351 s.be = newbe
1352 s.bemu.Unlock()
1353
1354 lg.Info("restoring alarm store")
1355
1356 if err := s.restoreAlarms(); err != nil {
1357 lg.Panic("failed to restore alarm store", zap.Error(err))
1358 }
1359
1360 lg.Info("restored alarm store")
1361
1362 if s.authStore != nil {
1363 lg.Info("restoring auth store")
1364
1365 s.authStore.Recover(newbe)
1366
1367 lg.Info("restored auth store")
1368 }
1369
1370 lg.Info("restoring v2 store")
1371 if err := s.v2store.Recovery(apply.snapshot.Data); err != nil {
1372 lg.Panic("failed to restore v2 store", zap.Error(err))
1373 }
1374
1375 if err := assertNoV2StoreContent(lg, s.v2store, s.Cfg.V2Deprecation); err != nil {
1376 lg.Panic("illegal v2store content", zap.Error(err))
1377 }
1378
1379 lg.Info("restored v2 store")
1380
1381 s.cluster.SetBackend(newbe)
1382
1383 lg.Info("restoring cluster configuration")
1384
1385 s.cluster.Recover(api.UpdateCapability)
1386
1387 lg.Info("restored cluster configuration")
1388 lg.Info("removing old peers from network")
1389
1390
1391 s.r.transport.RemoveAllPeers()
1392
1393 lg.Info("removed old peers from network")
1394 lg.Info("adding peers from new cluster configuration")
1395
1396 for _, m := range s.cluster.Members() {
1397 if m.ID == s.ID() {
1398 continue
1399 }
1400 s.r.transport.AddPeer(m.ID, m.PeerURLs)
1401 }
1402
1403 lg.Info("added peers from new cluster configuration")
1404
1405 ep.appliedt = apply.snapshot.Metadata.Term
1406 ep.appliedi = apply.snapshot.Metadata.Index
1407 ep.snapi = ep.appliedi
1408 ep.confState = apply.snapshot.Metadata.ConfState
1409 }
1410
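// applyEntries applies the committed entries that are newer than the current
// applied index and schedules a delayed stop if this member was removed.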
1411 func (s *EtcdServer) applyEntries(ep *etcdProgress, apply *apply) {
1412 if len(apply.entries) == 0 {
1413 return
1414 }
1415 firsti := apply.entries[0].Index
1416 if firsti > ep.appliedi+1 {
1417 lg := s.Logger()
1418 lg.Panic(
1419 "unexpected committed entry index",
1420 zap.Uint64("current-applied-index", ep.appliedi),
1421 zap.Uint64("first-committed-entry-index", firsti),
1422 )
1423 }
1424 var ents []raftpb.Entry
1425 if ep.appliedi+1-firsti < uint64(len(apply.entries)) {
1426 ents = apply.entries[ep.appliedi+1-firsti:]
1427 }
1428 if len(ents) == 0 {
1429 return
1430 }
1431 var shouldstop bool
1432 if ep.appliedt, ep.appliedi, shouldstop = s.apply(ents, &ep.confState); shouldstop {
1433 go s.stopWithDelay(10*100*time.Millisecond, fmt.Errorf("the member has been permanently removed from the cluster"))
1434 }
1435 }
1436
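// triggerSnapshot takes a new snapshot once the number of entries applied
// since the last snapshot exceeds SnapshotCount.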
1437 func (s *EtcdServer) triggerSnapshot(ep *etcdProgress) {
1438 if ep.appliedi-ep.snapi <= s.Cfg.SnapshotCount {
1439 return
1440 }
1441
1442 lg := s.Logger()
1443 lg.Info(
1444 "triggering snapshot",
1445 zap.String("local-member-id", s.ID().String()),
1446 zap.Uint64("local-member-applied-index", ep.appliedi),
1447 zap.Uint64("local-member-snapshot-index", ep.snapi),
1448 zap.Uint64("local-member-snapshot-count", s.Cfg.SnapshotCount),
1449 )
1450
1451 s.snapshot(ep.appliedi, ep.confState)
1452 ep.snapi = ep.appliedi
1453 }
1454
1455 func (s *EtcdServer) hasMultipleVotingMembers() bool {
1456 return s.cluster != nil && len(s.cluster.VotingMemberIDs()) > 1
1457 }
1458
1459 func (s *EtcdServer) isLeader() bool {
1460 return uint64(s.ID()) == s.Lead()
1461 }
1462
// MoveLeader transfers the leader to the given transferee.
1464 func (s *EtcdServer) MoveLeader(ctx context.Context, lead, transferee uint64) error {
1465 if !s.cluster.IsMemberExist(types.ID(transferee)) || s.cluster.Member(types.ID(transferee)).IsLearner {
1466 return ErrBadLeaderTransferee
1467 }
1468
1469 now := time.Now()
1470 interval := time.Duration(s.Cfg.TickMs) * time.Millisecond
1471
1472 lg := s.Logger()
1473 lg.Info(
1474 "leadership transfer starting",
1475 zap.String("local-member-id", s.ID().String()),
1476 zap.String("current-leader-member-id", types.ID(lead).String()),
1477 zap.String("transferee-member-id", types.ID(transferee).String()),
1478 )
1479
1480 s.r.TransferLeadership(ctx, lead, transferee)
1481 for s.Lead() != transferee {
1482 select {
1483 case <-ctx.Done():
1484 return ErrTimeoutLeaderTransfer
1485 case <-time.After(interval):
1486 }
1487 }
1488
1489
1490 lg.Info(
1491 "leadership transfer finished",
1492 zap.String("local-member-id", s.ID().String()),
1493 zap.String("old-leader-member-id", types.ID(lead).String()),
1494 zap.String("new-leader-member-id", types.ID(transferee).String()),
1495 zap.Duration("took", time.Since(now)),
1496 )
1497 return nil
1498 }
1499
// TransferLeadership transfers the leader to the chosen transferee.
1501 func (s *EtcdServer) TransferLeadership() error {
1502 lg := s.Logger()
1503 if !s.isLeader() {
1504 lg.Info(
1505 "skipped leadership transfer; local server is not leader",
1506 zap.String("local-member-id", s.ID().String()),
1507 zap.String("current-leader-member-id", types.ID(s.Lead()).String()),
1508 )
1509 return nil
1510 }
1511
1512 if !s.hasMultipleVotingMembers() {
1513 lg.Info(
1514 "skipped leadership transfer for single voting member cluster",
1515 zap.String("local-member-id", s.ID().String()),
1516 zap.String("current-leader-member-id", types.ID(s.Lead()).String()),
1517 )
1518 return nil
1519 }
1520
1521 transferee, ok := longestConnected(s.r.transport, s.cluster.VotingMemberIDs())
1522 if !ok {
1523 return ErrUnhealthy
1524 }
1525
1526 tm := s.Cfg.ReqTimeout()
1527 ctx, cancel := context.WithTimeout(s.ctx, tm)
1528 err := s.MoveLeader(ctx, s.Lead(), uint64(transferee))
1529 cancel()
1530 return err
1531 }
1532
// HardStop stops the server without coordination with other members in the cluster.
1534 func (s *EtcdServer) HardStop() {
1535 select {
1536 case s.stop <- struct{}{}:
1537 case <-s.done:
1538 return
1539 }
1540 <-s.done
1541 }
1542
// Stop stops the server gracefully, and shuts down the running goroutine.
// Stop should be called after a Start(s), otherwise it will block forever.
// When stopping leader, Stop transfers its leadership to one of its peers
// before stopping the server.
// Stop terminates the Server and performs any necessary finalization.
// Do and Process cannot be called after Stop has been invoked.
1549 func (s *EtcdServer) Stop() {
1550 lg := s.Logger()
1551 if err := s.TransferLeadership(); err != nil {
1552 lg.Warn("leadership transfer failed", zap.String("local-member-id", s.ID().String()), zap.Error(err))
1553 }
1554 s.HardStop()
1555 }
1556
// ReadyNotify returns a channel that will be closed when the server
// is ready to serve client requests
1559 func (s *EtcdServer) ReadyNotify() <-chan struct{} { return s.readych }
1560
1561 func (s *EtcdServer) stopWithDelay(d time.Duration, err error) {
1562 select {
1563 case <-time.After(d):
1564 case <-s.done:
1565 }
1566 select {
1567 case s.errorc <- err:
1568 default:
1569 }
1570 }
1571
// StopNotify returns a channel that receives an empty struct
// when the server is stopped.
1574 func (s *EtcdServer) StopNotify() <-chan struct{} { return s.done }
1575
// StoppingNotify returns a channel that receives an empty struct
// when the server is being stopped.
1578 func (s *EtcdServer) StoppingNotify() <-chan struct{} { return s.stopping }
1579
1580 func (s *EtcdServer) SelfStats() []byte { return s.stats.JSON() }
1581
1582 func (s *EtcdServer) LeaderStats() []byte {
1583 lead := s.getLead()
1584 if lead != uint64(s.id) {
1585 return nil
1586 }
1587 return s.lstats.JSON()
1588 }
1589
1590 func (s *EtcdServer) StoreStats() []byte { return s.v2store.JsonStats() }
1591
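// checkMembershipOperationPermission ensures the caller has admin permission
// when auth is enabled; membership changes are admin-only operations.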
1592 func (s *EtcdServer) checkMembershipOperationPermission(ctx context.Context) error {
1593 if s.authStore == nil {
		// In the context of an ordinary etcd process, s.authStore will never be nil.
		// This branch is for handling cases in server_test.go
1596 return nil
1597 }

	// Note that this permission check is done in the API layer,
	// so a TOCTOU problem is possible in principle: the caller's admin
	// permission could be revoked after this check but before the membership
	// change is applied through raft. The window is short and the check is
	// admin-only, so the practical impact is limited.
1605 authInfo, err := s.AuthInfoFromCtx(ctx)
1606 if err != nil {
1607 return err
1608 }
1609
1610 return s.AuthStore().IsAdminPermitted(authInfo)
1611 }
1612
1613 func (s *EtcdServer) AddMember(ctx context.Context, memb membership.Member) ([]*membership.Member, error) {
1614 if err := s.checkMembershipOperationPermission(ctx); err != nil {
1615 return nil, err
1616 }
1617
1618
1619 b, err := json.Marshal(memb)
1620 if err != nil {
1621 return nil, err
1622 }

	// by default StrictReconfigCheck is enabled; reject new members if unhealthy.
1625 if err := s.mayAddMember(memb); err != nil {
1626 return nil, err
1627 }
1628
1629 cc := raftpb.ConfChange{
1630 Type: raftpb.ConfChangeAddNode,
1631 NodeID: uint64(memb.ID),
1632 Context: b,
1633 }
1634
1635 if memb.IsLearner {
1636 cc.Type = raftpb.ConfChangeAddLearnerNode
1637 }
1638
1639 return s.configure(ctx, cc)
1640 }
1641
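// mayAddMember, when StrictReconfigCheck is enabled, rejects an add-member
// request if the cluster is not healthy enough or the local member is not
// fully connected to its peers.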
1642 func (s *EtcdServer) mayAddMember(memb membership.Member) error {
1643 lg := s.Logger()
1644 if !s.Cfg.StrictReconfigCheck {
1645 return nil
1646 }

	// protect quorum when adding voting member
1649 if !memb.IsLearner && !s.cluster.IsReadyToAddVotingMember() {
1650 lg.Warn(
1651 "rejecting member add request; not enough healthy members",
1652 zap.String("local-member-id", s.ID().String()),
1653 zap.String("requested-member-add", fmt.Sprintf("%+v", memb)),
1654 zap.Error(ErrNotEnoughStartedMembers),
1655 )
1656 return ErrNotEnoughStartedMembers
1657 }
1658
1659 if !isConnectedFullySince(s.r.transport, time.Now().Add(-HealthInterval), s.ID(), s.cluster.VotingMembers()) {
1660 lg.Warn(
1661 "rejecting member add request; local member has not been connected to all peers, reconfigure breaks active quorum",
1662 zap.String("local-member-id", s.ID().String()),
1663 zap.String("requested-member-add", fmt.Sprintf("%+v", memb)),
1664 zap.Error(ErrUnhealthy),
1665 )
1666 return ErrUnhealthy
1667 }
1668
1669 return nil
1670 }
1671
1672 func (s *EtcdServer) RemoveMember(ctx context.Context, id uint64) ([]*membership.Member, error) {
1673 if err := s.checkMembershipOperationPermission(ctx); err != nil {
1674 return nil, err
1675 }

	// by default StrictReconfigCheck is enabled; reject removal if unhealthy.
1678 if err := s.mayRemoveMember(types.ID(id)); err != nil {
1679 return nil, err
1680 }
1681
1682 cc := raftpb.ConfChange{
1683 Type: raftpb.ConfChangeRemoveNode,
1684 NodeID: id,
1685 }
1686 return s.configure(ctx, cc)
1687 }
1688
// PromoteMember promotes a learner node to a voting node.
func (s *EtcdServer) PromoteMember(ctx context.Context, id uint64) ([]*membership.Member, error) {
	// only the raft leader has information on whether the to-be-promoted learner node is ready.
	// If the local promoteMember call fails with ErrNotLeader, forward the request to the leader
	// over HTTP; the leader performs the readiness check and proposes the promotion on our behalf.
1694 resp, err := s.promoteMember(ctx, id)
1695 if err == nil {
1696 learnerPromoteSucceed.Inc()
1697 return resp, nil
1698 }
1699 if err != ErrNotLeader {
1700 learnerPromoteFailed.WithLabelValues(err.Error()).Inc()
1701 return resp, err
1702 }
1703
1704 cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout())
1705 defer cancel()
1706
1707 for cctx.Err() == nil {
1708 leader, err := s.waitLeader(cctx)
1709 if err != nil {
1710 return nil, err
1711 }
1712 for _, url := range leader.PeerURLs {
1713 resp, err := promoteMemberHTTP(cctx, url, id, s.peerRt)
1714 if err == nil {
1715 return resp, nil
1716 }
1717
1718 if err == ErrLearnerNotReady || err == membership.ErrIDNotFound || err == membership.ErrMemberNotLearner {
1719 return nil, err
1720 }
1721 }
1722 }
1723
1724 if cctx.Err() == context.DeadlineExceeded {
1725 return nil, ErrTimeout
1726 }
1727 return nil, ErrCanceled
1728 }
1729
// promoteMember checks whether the to-be-promoted learner node is ready before sending the promote
// request to raft.
// The function returns ErrNotLeader if the local node is not the raft leader (and therefore does not
// have enough information to determine if the learner node is ready), and returns ErrLearnerNotReady
// if the local node is the leader (and therefore has enough information) but decided the learner node
// is not ready to be promoted.
1736 func (s *EtcdServer) promoteMember(ctx context.Context, id uint64) ([]*membership.Member, error) {
1737 if err := s.checkMembershipOperationPermission(ctx); err != nil {
1738 return nil, err
1739 }
1740
1741
1742 if err := s.mayPromoteMember(types.ID(id)); err != nil {
1743 return nil, err
1744 }
1745
1746
1747 promoteChangeContext := membership.ConfigChangeContext{
1748 Member: membership.Member{
1749 ID: types.ID(id),
1750 },
1751 IsPromote: true,
1752 }
1753
1754 b, err := json.Marshal(promoteChangeContext)
1755 if err != nil {
1756 return nil, err
1757 }
1758
1759 cc := raftpb.ConfChange{
1760 Type: raftpb.ConfChangeAddNode,
1761 NodeID: id,
1762 Context: b,
1763 }
1764
1765 return s.configure(ctx, cc)
1766 }
1767
1768 func (s *EtcdServer) mayPromoteMember(id types.ID) error {
1769 lg := s.Logger()
1770 err := s.isLearnerReady(uint64(id))
1771 if err != nil {
1772 return err
1773 }
1774
1775 if !s.Cfg.StrictReconfigCheck {
1776 return nil
1777 }
1778 if !s.cluster.IsReadyToPromoteMember(uint64(id)) {
1779 lg.Warn(
1780 "rejecting member promote request; not enough healthy members",
1781 zap.String("local-member-id", s.ID().String()),
1782 zap.String("requested-member-remove-id", id.String()),
1783 zap.Error(ErrNotEnoughStartedMembers),
1784 )
1785 return ErrNotEnoughStartedMembers
1786 }
1787
1788 return nil
1789 }
1790
// isLearnerReady returns an error if the member with the given id cannot be
// promoted yet: ErrNotLeader if the local node is not the leader, ErrIDNotFound
// if the id is unknown to the leader, and ErrLearnerNotReady if the learner's
// match index is below readyPercent of the leader's.
1794 func (s *EtcdServer) isLearnerReady(id uint64) error {
1795 if err := s.waitAppliedIndex(); err != nil {
1796 return err
1797 }
1798
1799 rs := s.raftStatus()
1800
1801
1802 if rs.Progress == nil {
1803 return ErrNotLeader
1804 }
1805
1806 var learnerMatch uint64
1807 isFound := false
1808 leaderID := rs.ID
1809 for memberID, progress := range rs.Progress {
1810 if id == memberID {
1811
1812 learnerMatch = progress.Match
1813 isFound = true
1814 break
1815 }
1816 }

	// the given member id is not in the leader's progress map
1820 if !isFound {
1821 return membership.ErrIDNotFound
1822 }
1823
1824 leaderMatch := rs.Progress[leaderID].Match
1825
1826 if float64(learnerMatch) < float64(leaderMatch)*readyPercent {
1827 return ErrLearnerNotReady
1828 }
1829
1830 return nil
1831 }
1832
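// mayRemoveMember, when StrictReconfigCheck is enabled, rejects a remove-member
// request if removing the voting member would break the active quorum.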
1833 func (s *EtcdServer) mayRemoveMember(id types.ID) error {
1834 if !s.Cfg.StrictReconfigCheck {
1835 return nil
1836 }
1837
1838 lg := s.Logger()
1839 isLearner := s.cluster.IsMemberExist(id) && s.cluster.Member(id).IsLearner
	// no need to check quorum when removing non-voting member
1841 if isLearner {
1842 return nil
1843 }
1844
1845 if !s.cluster.IsReadyToRemoveVotingMember(uint64(id)) {
1846 lg.Warn(
1847 "rejecting member remove request; not enough healthy members",
1848 zap.String("local-member-id", s.ID().String()),
1849 zap.String("requested-member-remove-id", id.String()),
1850 zap.Error(ErrNotEnoughStartedMembers),
1851 )
1852 return ErrNotEnoughStartedMembers
1853 }

	// downed member is safe to remove since it's not part of the active quorum
1856 if t := s.r.transport.ActiveSince(id); id != s.ID() && t.IsZero() {
1857 return nil
1858 }

	// protect quorum if some members are down
1861 m := s.cluster.VotingMembers()
1862 active := numConnectedSince(s.r.transport, time.Now().Add(-HealthInterval), s.ID(), m)
1863 if (active - 1) < 1+((len(m)-1)/2) {
1864 lg.Warn(
1865 "rejecting member remove request; local member has not been connected to all peers, reconfigure breaks active quorum",
1866 zap.String("local-member-id", s.ID().String()),
1867 zap.String("requested-member-remove", id.String()),
1868 zap.Int("active-peers", active),
1869 zap.Error(ErrUnhealthy),
1870 )
1871 return ErrUnhealthy
1872 }
1873
1874 return nil
1875 }
1876
1877 func (s *EtcdServer) UpdateMember(ctx context.Context, memb membership.Member) ([]*membership.Member, error) {
1878 b, merr := json.Marshal(memb)
1879 if merr != nil {
1880 return nil, merr
1881 }
1882
1883 if err := s.checkMembershipOperationPermission(ctx); err != nil {
1884 return nil, err
1885 }
1886 cc := raftpb.ConfChange{
1887 Type: raftpb.ConfChangeUpdateNode,
1888 NodeID: uint64(memb.ID),
1889 Context: b,
1890 }
1891 return s.configure(ctx, cc)
1892 }
1893
1894 func (s *EtcdServer) setCommittedIndex(v uint64) {
1895 atomic.StoreUint64(&s.committedIndex, v)
1896 }
1897
1898 func (s *EtcdServer) getCommittedIndex() uint64 {
1899 return atomic.LoadUint64(&s.committedIndex)
1900 }
1901
1902 func (s *EtcdServer) setAppliedIndex(v uint64) {
1903 atomic.StoreUint64(&s.appliedIndex, v)
1904 }
1905
1906 func (s *EtcdServer) getAppliedIndex() uint64 {
1907 return atomic.LoadUint64(&s.appliedIndex)
1908 }
1909
1910 func (s *EtcdServer) setTerm(v uint64) {
1911 atomic.StoreUint64(&s.term, v)
1912 }
1913
1914 func (s *EtcdServer) getTerm() uint64 {
1915 return atomic.LoadUint64(&s.term)
1916 }
1917
1918 func (s *EtcdServer) setLead(v uint64) {
1919 atomic.StoreUint64(&s.lead, v)
1920 }
1921
1922 func (s *EtcdServer) getLead() uint64 {
1923 return atomic.LoadUint64(&s.lead)
1924 }
1925
1926 func (s *EtcdServer) LeaderChangedNotify() <-chan struct{} {
1927 s.leaderChangedMu.RLock()
1928 defer s.leaderChangedMu.RUnlock()
1929 return s.leaderChanged
1930 }
1931
// FirstCommitInTermNotify returns a channel that will be unblocked on the first
// entry committed in a new term, which is necessary for the new leader to answer
// read-only requests (the leader cannot respond to read-only requests before this
// point when linearizable semantics are required).
1936 func (s *EtcdServer) FirstCommitInTermNotify() <-chan struct{} {
1937 s.firstCommitInTermMu.RLock()
1938 defer s.firstCommitInTermMu.RUnlock()
1939 return s.firstCommitInTermC
1940 }
1941
// RaftStatusGetter represents etcd server and Raft progress.
1943 type RaftStatusGetter interface {
1944 ID() types.ID
1945 Leader() types.ID
1946 CommittedIndex() uint64
1947 AppliedIndex() uint64
1948 Term() uint64
1949 }
1950
1951 func (s *EtcdServer) ID() types.ID { return s.id }
1952
1953 func (s *EtcdServer) Leader() types.ID { return types.ID(s.getLead()) }
1954
1955 func (s *EtcdServer) Lead() uint64 { return s.getLead() }
1956
1957 func (s *EtcdServer) CommittedIndex() uint64 { return s.getCommittedIndex() }
1958
1959 func (s *EtcdServer) AppliedIndex() uint64 { return s.getAppliedIndex() }
1960
1961 func (s *EtcdServer) Term() uint64 { return s.getTerm() }
1962
1963 type confChangeResponse struct {
1964 membs []*membership.Member
1965 err error
1966 }
1967
// configure sends a configuration change through consensus and
// then waits for it to be applied to the server. It
// will block until the change is performed or there is an error.
1971 func (s *EtcdServer) configure(ctx context.Context, cc raftpb.ConfChange) ([]*membership.Member, error) {
1972 lg := s.Logger()
1973 cc.ID = s.reqIDGen.Next()
1974 ch := s.w.Register(cc.ID)
1975
1976 start := time.Now()
1977 if err := s.r.ProposeConfChange(ctx, cc); err != nil {
1978 s.w.Trigger(cc.ID, nil)
1979 return nil, err
1980 }
1981
1982 select {
1983 case x := <-ch:
1984 if x == nil {
1985 lg.Panic("failed to configure")
1986 }
1987 resp := x.(*confChangeResponse)
1988 lg.Info(
1989 "applied a configuration change through raft",
1990 zap.String("local-member-id", s.ID().String()),
1991 zap.String("raft-conf-change", cc.Type.String()),
1992 zap.String("raft-conf-change-node-id", types.ID(cc.NodeID).String()),
1993 )
1994 return resp.membs, resp.err
1995
1996 case <-ctx.Done():
1997 s.w.Trigger(cc.ID, nil)
1998 return nil, s.parseProposeCtxErr(ctx.Err(), start)
1999
2000 case <-s.stopping:
2001 return nil, ErrStopped
2002 }
2003 }
2004
// sync proposes a SYNC request and is non-blocking.
// This makes no guarantee that the request will be proposed or performed.
// The request will be canceled after the given timeout.
2008 func (s *EtcdServer) sync(timeout time.Duration) {
2009 req := pb.Request{
2010 Method: "SYNC",
2011 ID: s.reqIDGen.Next(),
2012 Time: time.Now().UnixNano(),
2013 }
2014 data := pbutil.MustMarshal(&req)
2015
2016
2017 ctx, cancel := context.WithTimeout(s.ctx, timeout)
2018 s.GoAttach(func() {
2019 s.r.Propose(ctx, data)
2020 cancel()
2021 })
2022 }
2023
// publishV3 registers server information into the cluster using a v3 request. The
// information is the JSON representation of this server's member struct, updated
// with the static clientURLs of the server.
// The function keeps attempting to register until it succeeds,
// or its server is stopped.
2029 func (s *EtcdServer) publishV3(timeout time.Duration) {
2030 req := &membershippb.ClusterMemberAttrSetRequest{
2031 Member_ID: uint64(s.id),
2032 MemberAttributes: &membershippb.Attributes{
2033 Name: s.attributes.Name,
2034 ClientUrls: s.attributes.ClientURLs,
2035 },
2036 }
2037 lg := s.Logger()
2038 for {
2039 select {
2040 case <-s.stopping:
2041 lg.Warn(
2042 "stopped publish because server is stopping",
2043 zap.String("local-member-id", s.ID().String()),
2044 zap.String("local-member-attributes", fmt.Sprintf("%+v", s.attributes)),
2045 zap.Duration("publish-timeout", timeout),
2046 )
2047 return
2048
2049 default:
2050 }
2051
2052 ctx, cancel := context.WithTimeout(s.ctx, timeout)
2053 _, err := s.raftRequest(ctx, pb.InternalRaftRequest{ClusterMemberAttrSet: req})
2054 cancel()
2055 switch err {
2056 case nil:
2057 close(s.readych)
2058 lg.Info(
2059 "published local member to cluster through raft",
2060 zap.String("local-member-id", s.ID().String()),
2061 zap.String("local-member-attributes", fmt.Sprintf("%+v", s.attributes)),
2062 zap.String("cluster-id", s.cluster.ID().String()),
2063 zap.Duration("publish-timeout", timeout),
2064 )
2065 return
2066
2067 default:
2068 lg.Warn(
2069 "failed to publish local member to cluster through raft",
2070 zap.String("local-member-id", s.ID().String()),
2071 zap.String("local-member-attributes", fmt.Sprintf("%+v", s.attributes)),
2072 zap.Duration("publish-timeout", timeout),
2073 zap.Error(err),
2074 )
2075 }
2076 }
2077 }
2078
// publish registers server information into the cluster. The information
// is the JSON representation of this server's member struct, updated with the
// static clientURLs of the server.
// The function keeps attempting to register until it succeeds,
// or its server is stopped.
//
// It uses the v2 store to encode the member attributes and applies them
// through raft rather than through the v2 API endpoint, so the cluster can
// still process publish requests even when the v2 client handler is disabled.
2090 func (s *EtcdServer) publish(timeout time.Duration) {
2091 lg := s.Logger()
2092 b, err := json.Marshal(s.attributes)
2093 if err != nil {
2094 lg.Panic("failed to marshal JSON", zap.Error(err))
2095 return
2096 }
2097 req := pb.Request{
2098 Method: "PUT",
2099 Path: membership.MemberAttributesStorePath(s.id),
2100 Val: string(b),
2101 }
2102
2103 for {
2104 ctx, cancel := context.WithTimeout(s.ctx, timeout)
2105 _, err := s.Do(ctx, req)
2106 cancel()
2107 switch err {
2108 case nil:
2109 close(s.readych)
2110 lg.Info(
2111 "published local member to cluster through raft",
2112 zap.String("local-member-id", s.ID().String()),
2113 zap.String("local-member-attributes", fmt.Sprintf("%+v", s.attributes)),
2114 zap.String("request-path", req.Path),
2115 zap.String("cluster-id", s.cluster.ID().String()),
2116 zap.Duration("publish-timeout", timeout),
2117 )
2118 return
2119
2120 case ErrStopped:
2121 lg.Warn(
2122 "stopped publish because server is stopped",
2123 zap.String("local-member-id", s.ID().String()),
2124 zap.String("local-member-attributes", fmt.Sprintf("%+v", s.attributes)),
2125 zap.Duration("publish-timeout", timeout),
2126 zap.Error(err),
2127 )
2128 return
2129
2130 default:
2131 lg.Warn(
2132 "failed to publish local member to cluster through raft",
2133 zap.String("local-member-id", s.ID().String()),
2134 zap.String("local-member-attributes", fmt.Sprintf("%+v", s.attributes)),
2135 zap.String("request-path", req.Path),
2136 zap.Duration("publish-timeout", timeout),
2137 zap.Error(err),
2138 )
2139 }
2140 }
2141 }
2142
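// sendMergedSnap sends a merged snapshot message to a slow follower and keeps
// it counted as in-flight until the transfer completes (plus a release delay
// that blocks log compaction while the follower catches up).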
2143 func (s *EtcdServer) sendMergedSnap(merged snap.Message) {
2144 atomic.AddInt64(&s.inflightSnapshots, 1)
2145
2146 lg := s.Logger()
2147 fields := []zap.Field{
2148 zap.String("from", s.ID().String()),
2149 zap.String("to", types.ID(merged.To).String()),
2150 zap.Int64("bytes", merged.TotalSize),
2151 zap.String("size", humanize.Bytes(uint64(merged.TotalSize))),
2152 }
2153
2154 now := time.Now()
2155 s.r.transport.SendSnapshot(merged)
2156 lg.Info("sending merged snapshot", fields...)
2157
2158 s.GoAttach(func() {
2159 select {
2160 case ok := <-merged.CloseNotify():
			// delay releasing inflight snapshot for another 30 seconds to
			// block log compaction.
			// If the follower still fails to catch up, it is probably just too slow
			// to catch up. We cannot avoid the snapshot cycle anyway.
2165 if ok {
2166 select {
2167 case <-time.After(releaseDelayAfterSnapshot):
2168 case <-s.stopping:
2169 }
2170 }
2171
2172 atomic.AddInt64(&s.inflightSnapshots, -1)
2173
2174 lg.Info("sent merged snapshot", append(fields, zap.Duration("took", time.Since(now)))...)
2175
2176 case <-s.stopping:
2177 lg.Warn("canceled sending merged snapshot; server stopping", fields...)
2178 return
2179 }
2180 })
2181 }
2182
// apply takes entries received from Raft (after they have been committed) and
// applies them to the current state of the EtcdServer.
// The given entries should not be empty.
2186 func (s *EtcdServer) apply(
2187 es []raftpb.Entry,
2188 confState *raftpb.ConfState,
2189 ) (appliedt uint64, appliedi uint64, shouldStop bool) {
2190 s.lg.Debug("Applying entries", zap.Int("num-entries", len(es)))
2191 for i := range es {
2192 e := es[i]
2193 s.lg.Debug("Applying entry",
2194 zap.Uint64("index", e.Index),
2195 zap.Uint64("term", e.Term),
2196 zap.Stringer("type", e.Type))
2197 switch e.Type {
2198 case raftpb.EntryNormal:
2199
2200 s.applyEntryNormal(&e)
2201 s.setAppliedIndex(e.Index)
2202 s.setTerm(e.Term)
2203
2204 case raftpb.EntryConfChange:
2205
2206
2207 shouldApplyV3 := membership.ApplyV2storeOnly
// record the index/term of the entry currently being applied
2210 if e.Index > s.consistIndex.ConsistentIndex() {
2211 s.consistIndex.SetConsistentApplyingIndex(e.Index, e.Term)
2212 shouldApplyV3 = membership.ApplyBoth
2213 }
2214
2215 var cc raftpb.ConfChange
2216 pbutil.MustUnmarshal(&cc, e.Data)
2217 removedSelf, err := s.applyConfChange(cc, confState, shouldApplyV3)
2218 s.setAppliedIndex(e.Index)
2219 s.setTerm(e.Term)
2220 shouldStop = shouldStop || removedSelf
2221 s.w.Trigger(cc.ID, &confChangeResponse{s.cluster.Members(), err})
2222
2223 default:
2224 lg := s.Logger()
2225 lg.Panic(
2226 "unknown entry type; must be either EntryNormal or EntryConfChange",
2227 zap.String("type", e.Type.String()),
2228 )
2229 }
2230 appliedi, appliedt = e.Index, e.Term
2231 }
2232 return appliedt, appliedi, shouldStop
2233 }
2234
// applyEntryNormal applies an EntryNormal type raft entry to the EtcdServer.
2236 func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
2237 shouldApplyV3 := membership.ApplyV2storeOnly
2238 var ar *applyResult
2239 index := s.consistIndex.ConsistentIndex()
2240 if e.Index > index {
// record the index/term of the entry currently being applied
2242 s.consistIndex.SetConsistentApplyingIndex(e.Index, e.Term)
2243 shouldApplyV3 = membership.ApplyBoth
2244 defer func() {
// the post-lock apply hook may not run for this entry (for example when
// nothing touches the backend), so advance the consistent index directly
// if it was not moved forward.
2247 newIndex := s.consistIndex.ConsistentIndex()
2248 if newIndex < e.Index {
2249 s.consistIndex.SetConsistentIndex(e.Index, e.Term)
2250 }
2251 }()
2252 }
2253 s.lg.Debug("apply entry normal",
2254 zap.Uint64("consistent-index", index),
2255 zap.Uint64("entry-index", e.Index),
2256 zap.Bool("should-applyV3", bool(shouldApplyV3)))
// the raft state machine may generate an empty (noop) entry on leader
// confirmation; handle it here and skip the normal request path.
2260 if len(e.Data) == 0 {
2261 s.notifyAboutFirstCommitInTerm()
// promote the lessor when the local member is leader and has finished
// applying all entries. Promote can be called multiple times.
2265 if s.isLeader() {
2266 s.lessor.Promote(s.Cfg.ElectionTimeout())
2267 }
2268 return
2269 }
2270
2271 var raftReq pb.InternalRaftRequest
2272 if !pbutil.MaybeUnmarshal(&raftReq, e.Data) {
2273 var r pb.Request
2274 rp := &r
2275 pbutil.MustUnmarshal(rp, e.Data)
2276 s.lg.Debug("applyEntryNormal", zap.Stringer("V2request", rp))
2277 s.w.Trigger(r.ID, s.applyV2Request((*RequestV2)(rp), shouldApplyV3))
2278 return
2279 }
2280 s.lg.Debug("applyEntryNormal", zap.Stringer("raftReq", &raftReq))
2281
2282 if raftReq.V2 != nil {
2283 req := (*RequestV2)(raftReq.V2)
2284 s.w.Trigger(req.ID, s.applyV2Request(req, shouldApplyV3))
2285 return
2286 }
2287
2288 id := raftReq.ID
2289 if id == 0 {
2290 id = raftReq.Header.ID
2291 }
2292
2293 needResult := s.w.IsRegistered(id)
2294 if needResult || !noSideEffect(&raftReq) {
2295 if !needResult && raftReq.Txn != nil {
2296 removeNeedlessRangeReqs(raftReq.Txn)
2297 }
2298 ar = s.applyV3.Apply(&raftReq, shouldApplyV3)
2299 }
// skip entries that have already been applied to the v3 backend
2302 if !shouldApplyV3 {
2303 return
2304 }
2305
2306 if ar == nil {
2307 return
2308 }
2309
2310 if ar.err != ErrNoSpace || len(s.alarmStore.Get(pb.AlarmType_NOSPACE)) > 0 {
2311 s.w.Trigger(id, ar)
2312 return
2313 }
2314
2315 lg := s.Logger()
2316 lg.Warn(
2317 "message exceeded backend quota; raising alarm",
2318 zap.Int64("quota-size-bytes", s.Cfg.QuotaBackendBytes),
2319 zap.String("quota-size", humanize.Bytes(uint64(s.Cfg.QuotaBackendBytes))),
2320 zap.Error(ar.err),
2321 )
2322
2323 s.GoAttach(func() {
2324 a := &pb.AlarmRequest{
2325 MemberID: uint64(s.ID()),
2326 Action: pb.AlarmRequest_ACTIVATE,
2327 Alarm: pb.AlarmType_NOSPACE,
2328 }
2329 s.raftRequest(s.ctx, pb.InternalRaftRequest{Alarm: a})
2330 s.w.Trigger(id, ar)
2331 })
2332 }
2333
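// notifyAboutFirstCommitInTerm swaps in a fresh notification channel and
// closes the previous one, waking any goroutine waiting on
// FirstCommitInTermNotify.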
2334 func (s *EtcdServer) notifyAboutFirstCommitInTerm() {
2335 newNotifier := make(chan struct{})
2336 s.firstCommitInTermMu.Lock()
2337 notifierToClose := s.firstCommitInTermC
2338 s.firstCommitInTermC = newNotifier
2339 s.firstCommitInTermMu.Unlock()
2340 close(notifierToClose)
2341 }
2342
// applyConfChange applies a ConfChange to the server. It is only invoked
// with a ConfChange that has already been committed through Raft.
2345 func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.ConfState, shouldApplyV3 membership.ShouldApplyV3) (bool, error) {
2346 if err := s.cluster.ValidateConfigurationChange(cc); err != nil {
2347 cc.NodeID = raft.None
2348 s.r.ApplyConfChange(cc)
// a rejected config change does not touch the backend, so the post-lock
// apply hook will not run; set the consistent index directly.
2352 if s.consistIndex != nil && membership.ApplyBoth == shouldApplyV3 {
2353 applyingIndex, applyingTerm := s.consistIndex.ConsistentApplyingIndex()
2354 s.consistIndex.SetConsistentIndex(applyingIndex, applyingTerm)
2355 }
2356 return false, err
2357 }
2358
2359 lg := s.Logger()
2360 *confState = *s.r.ApplyConfChange(cc)
2361 s.beHooks.SetConfState(confState)
2362 switch cc.Type {
2363 case raftpb.ConfChangeAddNode, raftpb.ConfChangeAddLearnerNode:
2364 confChangeContext := new(membership.ConfigChangeContext)
2365 if err := json.Unmarshal(cc.Context, confChangeContext); err != nil {
2366 lg.Panic("failed to unmarshal member", zap.Error(err))
2367 }
2368 if cc.NodeID != uint64(confChangeContext.Member.ID) {
2369 lg.Panic(
2370 "got different member ID",
2371 zap.String("member-id-from-config-change-entry", types.ID(cc.NodeID).String()),
2372 zap.String("member-id-from-message", confChangeContext.Member.ID.String()),
2373 )
2374 }
2375 if confChangeContext.IsPromote {
2376 s.cluster.PromoteMember(confChangeContext.Member.ID, shouldApplyV3)
2377 } else {
2378 s.cluster.AddMember(&confChangeContext.Member, shouldApplyV3)
2379
2380 if confChangeContext.Member.ID != s.id {
2381 s.r.transport.AddPeer(confChangeContext.Member.ID, confChangeContext.PeerURLs)
2382 }
2383 }
// update the isLearner metric when the config change affects the local member
2386 if confChangeContext.Member.ID == s.id {
2387 if cc.Type == raftpb.ConfChangeAddLearnerNode {
2388 isLearner.Set(1)
2389 } else {
2390 isLearner.Set(0)
2391 }
2392 }
2393
2394 case raftpb.ConfChangeRemoveNode:
2395 id := types.ID(cc.NodeID)
2396 s.cluster.RemoveMember(id, shouldApplyV3)
2397 if id == s.id {
2398 return true, nil
2399 }
2400 s.r.transport.RemovePeer(id)
2401
2402 case raftpb.ConfChangeUpdateNode:
2403 m := new(membership.Member)
2404 if err := json.Unmarshal(cc.Context, m); err != nil {
2405 lg.Panic("failed to unmarshal member", zap.Error(err))
2406 }
2407 if cc.NodeID != uint64(m.ID) {
2408 lg.Panic(
2409 "got different member ID",
2410 zap.String("member-id-from-config-change-entry", types.ID(cc.NodeID).String()),
2411 zap.String("member-id-from-message", m.ID.String()),
2412 )
2413 }
2414 s.cluster.UpdateRaftAttributes(m.ID, m.RaftAttributes, shouldApplyV3)
2415 if m.ID != s.id {
2416 s.r.transport.UpdatePeer(m.ID, m.PeerURLs)
2417 }
2418 }
2419 return false, nil
2420 }
2421
// snapshot saves the v2 store as a snapshot at the given applied index and
// then compacts the raft log, keeping SnapshotCatchUpEntries entries in
// memory for slow followers.
2423 func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
2424 clone := s.v2store.Clone()
// commit the KV store so that metadata such as the consistent index is
// persisted to disk before the snapshot is created.
//
// Commit() must be called here, on the apply path, rather than inside the
// goroutine below, so that updates to the consistent index remain
// sequential with apply.
2434 s.KV().Commit()
2435
2436 s.GoAttach(func() {
2437 lg := s.Logger()
2438
2439 d, err := clone.SaveNoCopy()
2440
2441
2442 if err != nil {
2443 lg.Panic("failed to save v2 store", zap.Error(err))
2444 }
2445 snap, err := s.r.raftStorage.CreateSnapshot(snapi, &confState, d)
2446 if err != nil {
// the snapshot was created asynchronously with the progress of raft;
// raft may have already taken a newer snapshot.
2449 if err == raft.ErrSnapOutOfDate {
2450 return
2451 }
2452 lg.Panic("failed to create snapshot", zap.Error(err))
2453 }
2454
2455 if err = s.r.storage.SaveSnap(snap); err != nil {
2456 lg.Panic("failed to save snapshot", zap.Error(err))
2457 }
2458 if err = s.r.storage.Release(snap); err != nil {
2459 lg.Panic("failed to release wal", zap.Error(err))
2460 }
2461
2462 lg.Info(
2463 "saved snapshot",
2464 zap.Uint64("snapshot-index", snap.Metadata.Index),
2465 )
// While a snapshot is in flight, compaction is skipped.
// After receiving a snapshot, a slow follower needs the entries right after
// the snapshot's index to catch up. If compaction were not paused, those
// entries might already be compacted by the time a slow send completes,
// triggering yet another snapshot cycle.
2472 if atomic.LoadInt64(&s.inflightSnapshots) != 0 {
2473 lg.Info("skip compaction since there is an inflight snapshot")
2474 return
2475 }
// keep some entries in memory so slow followers can still catch up after compaction
2478 compacti := uint64(1)
2479 if snapi > s.Cfg.SnapshotCatchUpEntries {
2480 compacti = snapi - s.Cfg.SnapshotCatchUpEntries
2481 }
2482
2483 err = s.r.raftStorage.Compact(compacti)
2484 if err != nil {
// the compaction ran asynchronously with the progress of raft;
// the raft log may already have been compacted.
2487 if err == raft.ErrCompacted {
2488 return
2489 }
2490 lg.Panic("failed to compact", zap.Error(err))
2491 }
2492 lg.Info(
2493 "compacted Raft logs",
2494 zap.Uint64("compact-index", compacti),
2495 )
2496 })
2497 }
2498
// CutPeer drops messages to the specified peer.
2500 func (s *EtcdServer) CutPeer(id types.ID) {
2501 tr, ok := s.r.transport.(*rafthttp.Transport)
2502 if ok {
2503 tr.CutPeer(id)
2504 }
2505 }
2506
// MendPeer recovers the message-dropping behavior of the given peer.
2508 func (s *EtcdServer) MendPeer(id types.ID) {
2509 tr, ok := s.r.transport.(*rafthttp.Transport)
2510 if ok {
2511 tr.MendPeer(id)
2512 }
2513 }
2514
2515 func (s *EtcdServer) PauseSending() { s.r.pauseSending() }
2516
2517 func (s *EtcdServer) ResumeSending() { s.r.resumeSending() }
2518
2519 func (s *EtcdServer) ClusterVersion() *semver.Version {
2520 if s.cluster == nil {
2521 return nil
2522 }
2523 return s.cluster.Version()
2524 }
2525
// monitorVersions checks the members' versions every monitorVersionInterval
// (or on the first commit in a term). When this member is the leader, it
// updates the cluster version once a newer common version is decided.
2531 func (s *EtcdServer) monitorVersions() {
2532 for {
2533 select {
2534 case <-s.FirstCommitInTermNotify():
2535 case <-time.After(monitorVersionInterval):
2536 case <-s.stopping:
2537 return
2538 }
2539
2540 if s.Leader() != s.ID() {
2541 continue
2542 }
2543
2544 v := decideClusterVersion(s.Logger(), getVersions(s.Logger(), s.cluster, s.id, s.peerRt))
2545 if v != nil {
// only keep the major.minor fields for cluster version comparison
2547 v = &semver.Version{
2548 Major: v.Major,
2549 Minor: v.Minor,
2550 }
2551 }
// if the current cluster version is nil:
// 1. use the decided version if possible
// 2. otherwise fall back to the minimum cluster version
2556 if s.cluster.Version() == nil {
2557 verStr := version.MinClusterVersion
2558 if v != nil {
2559 verStr = v.String()
2560 }
2561 s.GoAttach(func() { s.updateClusterVersionV2(verStr) })
2562 continue
2563 }
2564
2565 if v != nil && membership.IsValidVersionChange(s.cluster.Version(), v) {
2566 s.GoAttach(func() { s.updateClusterVersionV2(v.String()) })
2567 }
2568 }
2569 }
2570
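// monitorKVHash periodically runs the corruption checker's PeriodicCheck on
// the leader, every CorruptCheckTime; it is disabled when CorruptCheckTime
// is zero.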
2571 func (s *EtcdServer) monitorKVHash() {
2572 t := s.Cfg.CorruptCheckTime
2573 if t == 0 {
2574 return
2575 }
2576
2577 lg := s.Logger()
2578 lg.Info(
2579 "enabled corruption checking",
2580 zap.String("local-member-id", s.ID().String()),
2581 zap.Duration("interval", t),
2582 )
2583 for {
2584 select {
2585 case <-s.stopping:
2586 return
2587 case <-time.After(t):
2588 }
2589 if !s.isLeader() {
2590 continue
2591 }
2592 if err := s.corruptionChecker.PeriodicCheck(); err != nil {
2593 lg.Warn("failed to check hash KV", zap.Error(err))
2594 }
2595 }
2596 }
2597
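// monitorCompactHash periodically runs the corruption checker's
// CompactHashCheck on the leader, every CompactHashCheckTime, when the
// compact hash check is enabled.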
2598 func (s *EtcdServer) monitorCompactHash() {
2599 if !s.Cfg.CompactHashCheckEnabled {
2600 return
2601 }
2602 t := s.Cfg.CompactHashCheckTime
2603 for {
2604 select {
2605 case <-time.After(t):
2606 case <-s.stopping:
2607 return
2608 }
2609 if !s.isLeader() {
2610 continue
2611 }
2612 s.corruptionChecker.CompactHashCheck()
2613 }
2614 }
2615
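// updateClusterVersionV2 proposes the given cluster version through a v2 PUT
// request on the cluster version key.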
2616 func (s *EtcdServer) updateClusterVersionV2(ver string) {
2617 lg := s.Logger()
2618
2619 if s.cluster.Version() == nil {
2620 lg.Info(
2621 "setting up initial cluster version using v2 API",
2622 zap.String("cluster-version", version.Cluster(ver)),
2623 )
2624 } else {
2625 lg.Info(
2626 "updating cluster version using v2 API",
2627 zap.String("from", version.Cluster(s.cluster.Version().String())),
2628 zap.String("to", version.Cluster(ver)),
2629 )
2630 }
2631
2632 req := pb.Request{
2633 Method: "PUT",
2634 Path: membership.StoreClusterVersionKey(),
2635 Val: ver,
2636 }
2637
2638 ctx, cancel := context.WithTimeout(s.ctx, s.Cfg.ReqTimeout())
2639 _, err := s.Do(ctx, req)
2640 cancel()
2641
2642 switch err {
2643 case nil:
2644 lg.Info("cluster version is updated", zap.String("cluster-version", version.Cluster(ver)))
2645 return
2646
2647 case ErrStopped:
2648 lg.Warn("aborting cluster version update; server is stopped", zap.Error(err))
2649 return
2650
2651 default:
2652 lg.Warn("failed to update cluster version", zap.Error(err))
2653 }
2654 }
2655
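// updateClusterVersionV3 proposes the given cluster version through a v3
// ClusterVersionSet raft request.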
2656 func (s *EtcdServer) updateClusterVersionV3(ver string) {
2657 lg := s.Logger()
2658
2659 if s.cluster.Version() == nil {
2660 lg.Info(
2661 "setting up initial cluster version using v3 API",
2662 zap.String("cluster-version", version.Cluster(ver)),
2663 )
2664 } else {
2665 lg.Info(
2666 "updating cluster version using v3 API",
2667 zap.String("from", version.Cluster(s.cluster.Version().String())),
2668 zap.String("to", version.Cluster(ver)),
2669 )
2670 }
2671
2672 req := membershippb.ClusterVersionSetRequest{Ver: ver}
2673
2674 ctx, cancel := context.WithTimeout(s.ctx, s.Cfg.ReqTimeout())
2675 _, err := s.raftRequest(ctx, pb.InternalRaftRequest{ClusterVersionSet: &req})
2676 cancel()
2677
2678 switch err {
2679 case nil:
2680 lg.Info("cluster version is updated", zap.String("cluster-version", version.Cluster(ver)))
2681 return
2682
2683 case ErrStopped:
2684 lg.Warn("aborting cluster version update; server is stopped", zap.Error(err))
2685 return
2686
2687 default:
2688 lg.Warn("failed to update cluster version", zap.Error(err))
2689 }
2690 }
2691
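// monitorDowngrade periodically (every DowngradeCheckTime) checks, on the
// leader, whether all members have reached the downgrade target version and
// cancels the downgrade job once they have.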
2692 func (s *EtcdServer) monitorDowngrade() {
2693 t := s.Cfg.DowngradeCheckTime
2694 if t == 0 {
2695 return
2696 }
2697 lg := s.Logger()
2698 for {
2699 select {
2700 case <-time.After(t):
2701 case <-s.stopping:
2702 return
2703 }
2704
2705 if !s.isLeader() {
2706 continue
2707 }
2708
2709 d := s.cluster.DowngradeInfo()
2710 if !d.Enabled {
2711 continue
2712 }
2713
2714 targetVersion := d.TargetVersion
2715 v := semver.Must(semver.NewVersion(targetVersion))
2716 if isMatchedVersions(s.Logger(), v, getVersions(s.Logger(), s.cluster, s.id, s.peerRt)) {
2717 lg.Info("the cluster has been downgraded", zap.String("cluster-version", targetVersion))
2718 ctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())
2719 if _, err := s.downgradeCancel(ctx); err != nil {
2720 lg.Warn("failed to cancel downgrade", zap.Error(err))
2721 }
2722 cancel()
2723 }
2724 }
2725 }
2726
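// parseProposeCtxErr converts a proposal context error into a more specific
// etcd error, distinguishing leader-failure and connection-loss timeouts
// based on the leader state observed since the proposal started.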
2727 func (s *EtcdServer) parseProposeCtxErr(err error, start time.Time) error {
2728 switch err {
2729 case context.Canceled:
2730 return ErrCanceled
2731
2732 case context.DeadlineExceeded:
2733 s.leadTimeMu.RLock()
2734 curLeadElected := s.leadElectedTime
2735 s.leadTimeMu.RUnlock()
2736 prevLeadLost := curLeadElected.Add(-2 * time.Duration(s.Cfg.ElectionTicks) * time.Duration(s.Cfg.TickMs) * time.Millisecond)
2737 if start.After(prevLeadLost) && start.Before(curLeadElected) {
2738 return ErrTimeoutDueToLeaderFail
2739 }
2740 lead := types.ID(s.getLead())
2741 switch lead {
2742 case types.ID(raft.None):
// no leader is currently elected; fall through to the generic ErrTimeout
2744 case s.ID():
2745 if !isConnectedToQuorumSince(s.r.transport, start, s.ID(), s.cluster.Members()) {
2746 return ErrTimeoutDueToConnectionLost
2747 }
2748 default:
2749 if !isConnectedSince(s.r.transport, start, lead) {
2750 return ErrTimeoutDueToConnectionLost
2751 }
2752 }
2753 return ErrTimeout
2754
2755 default:
2756 return err
2757 }
2758 }
2759
2760 func (s *EtcdServer) KV() mvcc.WatchableKV { return s.kv }
2761 func (s *EtcdServer) Backend() backend.Backend {
2762 s.bemu.Lock()
2763 defer s.bemu.Unlock()
2764 return s.be
2765 }
2766
2767 func (s *EtcdServer) AuthStore() auth.AuthStore { return s.authStore }
2768
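// restoreAlarms rebuilds the alarm store and re-wraps the v3 applier with
// the capped and corrupt appliers for any active NOSPACE or CORRUPT alarms.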
2769 func (s *EtcdServer) restoreAlarms() error {
2770 s.applyV3 = s.newApplierV3()
2771 as, err := v3alarm.NewAlarmStore(s.lg, s)
2772 if err != nil {
2773 return err
2774 }
2775 s.alarmStore = as
2776 if len(as.Get(pb.AlarmType_NOSPACE)) > 0 {
2777 s.applyV3 = newApplierV3Capped(s.applyV3)
2778 }
2779 if len(as.Get(pb.AlarmType_CORRUPT)) > 0 {
2780 s.applyV3 = newApplierV3Corrupt(s.applyV3)
2781 }
2782 return nil
2783 }
2784
// GoAttach creates a goroutine on a given function and tracks it using the
// etcdserver waitgroup. The passed function should exit when the server is
// stopping.
2788 func (s *EtcdServer) GoAttach(f func()) {
2789 s.wgMu.RLock()
2790 defer s.wgMu.RUnlock()
2791 select {
2792 case <-s.stopping:
2793 lg := s.Logger()
2794 lg.Warn("server has stopped; skipping GoAttach")
2795 return
2796 default:
2797 }
2798
// safe to Add here: the stopping check above, taken under the wgMu read
// lock, guarantees the waitgroup Wait has not started yet.
2800 s.wg.Add(1)
2801 go func() {
2802 defer s.wg.Done()
2803 f()
2804 }()
2805 }
2806
2807 func (s *EtcdServer) Alarms() []*pb.AlarmMember {
2808 return s.alarmStore.Get(pb.AlarmType_NONE)
2809 }
2810
// IsLearner returns whether the local member is a raft learner.
2812 func (s *EtcdServer) IsLearner() bool {
2813 return s.cluster.IsLocalMemberLearner()
2814 }
2815
// IsMemberExist returns whether the member with the given id exists in the cluster.
2817 func (s *EtcdServer) IsMemberExist(id types.ID) bool {
2818 return s.cluster.IsMemberExist(id)
2819 }
2820
// raftStatus returns the raft status of this etcd node.
2822 func (s *EtcdServer) raftStatus() raft.Status {
2823 return s.r.Node.Status()
2824 }
2825
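// getTxPostLockInsideApplyHook returns a hook that, once the backend
// transaction lock is taken during apply, moves the consistent index forward
// to the currently applying index if that index is ahead.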
2826 func (s *EtcdServer) getTxPostLockInsideApplyHook() func() {
2827 return func() {
2828 applyingIdx, applyingTerm := s.consistIndex.ConsistentApplyingIndex()
2829 if applyingIdx > s.consistIndex.UnsafeConsistentIndex() {
2830 s.consistIndex.SetConsistentIndex(applyingIdx, applyingTerm)
2831 }
2832 }
2833 }
2834
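// maybeDefragBackend defragments the backend at bootstrap when the freeable
// space (size minus size-in-use) exceeds
// ExperimentalBootstrapDefragThresholdMegabytes; otherwise it skips
// defragmentation.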
2835 func maybeDefragBackend(cfg config.ServerConfig, be backend.Backend) error {
2836 size := be.Size()
2837 sizeInUse := be.SizeInUse()
2838 freeableMemory := uint(size - sizeInUse)
2839 thresholdBytes := cfg.ExperimentalBootstrapDefragThresholdMegabytes * 1024 * 1024
2840 if freeableMemory < thresholdBytes {
2841 cfg.Logger.Info("Skipping defragmentation",
2842 zap.Int64("current-db-size-bytes", size),
2843 zap.String("current-db-size", humanize.Bytes(uint64(size))),
2844 zap.Int64("current-db-size-in-use-bytes", sizeInUse),
2845 zap.String("current-db-size-in-use", humanize.Bytes(uint64(sizeInUse))),
2846 zap.Uint("experimental-bootstrap-defrag-threshold-bytes", thresholdBytes),
2847 zap.String("experimental-bootstrap-defrag-threshold", humanize.Bytes(uint64(thresholdBytes))),
2848 )
2849 return nil
2850 }
2851 return be.Defrag()
2852 }
2853
2854 func (s *EtcdServer) CorruptionChecker() CorruptionChecker {
2855 return s.corruptionChecker
2856 }
2857