1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package membership
16
17 import (
18 "bytes"
19 "context"
20 "crypto/sha1"
21 "encoding/binary"
22 "encoding/json"
23 "errors"
24 "fmt"
25 "path"
26 "sort"
27 "strings"
28 "sync"
29 "time"
30
31 "go.etcd.io/etcd/api/v3/version"
32 "go.etcd.io/etcd/client/pkg/v3/types"
33 "go.etcd.io/etcd/pkg/v3/netutil"
34 "go.etcd.io/etcd/raft/v3"
35 "go.etcd.io/etcd/raft/v3/raftpb"
36 "go.etcd.io/etcd/server/v3/etcdserver/api/v2error"
37 "go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
38 "go.etcd.io/etcd/server/v3/mvcc/backend"
39 "go.etcd.io/etcd/server/v3/mvcc/buckets"
40
41 "github.com/coreos/go-semver/semver"
42 "github.com/prometheus/client_golang/prometheus"
43 "go.uber.org/zap"
44 )
45
// maxLearners is the maximum number of learner members the cluster accepts.
// ValidateConfigurationChange rejects adding a learner when the resulting
// learner count would exceed this value.
const maxLearners = 1
47
48
// RaftCluster holds the membership view of a raft cluster: its ID, the ID of
// the local member, the current and removed members, the cluster version and
// the downgrade state, together with the stores that view is persisted to.
type RaftCluster struct {
	lg *zap.Logger

	// localID and cid are assigned by SetID; cid may also be derived from the
	// member IDs by genID.
	localID types.ID
	cid     types.ID

	// v2store and be are the persistence targets; either may be nil, and the
	// methods check for nil before using them.
	v2store v2store.Store
	be      backend.Backend

	// The embedded mutex is taken by the accessor and mutator methods before
	// they read or write the fields below.
	sync.Mutex
	version *semver.Version
	members map[types.ID]*Member

	// removed records the IDs of members that have been removed from the
	// cluster (RemoveMember sets the entry to true).
	removed map[types.ID]bool

	downgradeInfo *DowngradeInfo
}
67
68
// ConfigChangeContext is the JSON payload decoded from the Context of an
// add-node / add-learner-node raft configuration change.
type ConfigChangeContext struct {
	Member

	// IsPromote selects how ValidateConfigurationChange treats the change:
	// when true the member must already exist and be a learner (promotion);
	// when false the change is validated as the addition of a new member.
	IsPromote bool `json:"isPromote"`
}
76
// ShouldApplyV3 tells the mutating RaftCluster methods whether a change must
// also be written to the backend (in addition to the v2 store).
type ShouldApplyV3 bool

const (
	// ApplyBoth requests that the change is written to the backend as well.
	ApplyBoth = ShouldApplyV3(true)
	// ApplyV2storeOnly requests that the backend write is skipped.
	ApplyV2storeOnly = ShouldApplyV3(false)
)
83
84
85
86 func NewClusterFromURLsMap(lg *zap.Logger, token string, urlsmap types.URLsMap) (*RaftCluster, error) {
87 c := NewCluster(lg)
88 for name, urls := range urlsmap {
89 m := NewMember(name, urls, token, nil)
90 if _, ok := c.members[m.ID]; ok {
91 return nil, fmt.Errorf("member exists with identical ID %v", m)
92 }
93 if uint64(m.ID) == raft.None {
94 return nil, fmt.Errorf("cannot use %x as member id", raft.None)
95 }
96 c.members[m.ID] = m
97 }
98 c.genID()
99 return c, nil
100 }
101
102 func NewClusterFromMembers(lg *zap.Logger, id types.ID, membs []*Member) *RaftCluster {
103 c := NewCluster(lg)
104 c.cid = id
105 for _, m := range membs {
106 c.members[m.ID] = m
107 }
108 return c
109 }
110
111 func NewCluster(lg *zap.Logger) *RaftCluster {
112 if lg == nil {
113 lg = zap.NewNop()
114 }
115 return &RaftCluster{
116 lg: lg,
117 members: make(map[types.ID]*Member),
118 removed: make(map[types.ID]bool),
119 downgradeInfo: &DowngradeInfo{Enabled: false},
120 }
121 }
122
123 func (c *RaftCluster) ID() types.ID { return c.cid }
124
125 func (c *RaftCluster) Members() []*Member {
126 c.Lock()
127 defer c.Unlock()
128 var ms MembersByID
129 for _, m := range c.members {
130 ms = append(ms, m.Clone())
131 }
132 sort.Sort(ms)
133 return []*Member(ms)
134 }
135
136 func (c *RaftCluster) Member(id types.ID) *Member {
137 c.Lock()
138 defer c.Unlock()
139 return c.members[id].Clone()
140 }
141
142 func (c *RaftCluster) VotingMembers() []*Member {
143 c.Lock()
144 defer c.Unlock()
145 var ms MembersByID
146 for _, m := range c.members {
147 if !m.IsLearner {
148 ms = append(ms, m.Clone())
149 }
150 }
151 sort.Sort(ms)
152 return []*Member(ms)
153 }
154
155
156
157 func (c *RaftCluster) MemberByName(name string) *Member {
158 c.Lock()
159 defer c.Unlock()
160 var memb *Member
161 for _, m := range c.members {
162 if m.Name == name {
163 if memb != nil {
164 c.lg.Panic("two member with same name found", zap.String("name", name))
165 }
166 memb = m
167 }
168 }
169 return memb.Clone()
170 }
171
172 func (c *RaftCluster) MemberIDs() []types.ID {
173 c.Lock()
174 defer c.Unlock()
175 var ids []types.ID
176 for _, m := range c.members {
177 ids = append(ids, m.ID)
178 }
179 sort.Sort(types.IDSlice(ids))
180 return ids
181 }
182
183 func (c *RaftCluster) IsIDRemoved(id types.ID) bool {
184 c.Lock()
185 defer c.Unlock()
186 return c.removed[id]
187 }
188
189
190
191 func (c *RaftCluster) PeerURLs() []string {
192 c.Lock()
193 defer c.Unlock()
194 urls := make([]string, 0)
195 for _, p := range c.members {
196 urls = append(urls, p.PeerURLs...)
197 }
198 sort.Strings(urls)
199 return urls
200 }
201
202
203
204 func (c *RaftCluster) ClientURLs() []string {
205 c.Lock()
206 defer c.Unlock()
207 urls := make([]string, 0)
208 for _, p := range c.members {
209 urls = append(urls, p.ClientURLs...)
210 }
211 sort.Strings(urls)
212 return urls
213 }
214
215 func (c *RaftCluster) String() string {
216 c.Lock()
217 defer c.Unlock()
218 b := &bytes.Buffer{}
219 fmt.Fprintf(b, "{ClusterID:%s ", c.cid)
220 var ms []string
221 for _, m := range c.members {
222 ms = append(ms, fmt.Sprintf("%+v", m))
223 }
224 fmt.Fprintf(b, "Members:[%s] ", strings.Join(ms, " "))
225 var ids []string
226 for id := range c.removed {
227 ids = append(ids, id.String())
228 }
229 fmt.Fprintf(b, "RemovedMemberIDs:[%s]}", strings.Join(ids, " "))
230 return b.String()
231 }
232
233 func (c *RaftCluster) genID() {
234 mIDs := c.MemberIDs()
235 b := make([]byte, 8*len(mIDs))
236 for i, id := range mIDs {
237 binary.BigEndian.PutUint64(b[8*i:], uint64(id))
238 }
239 hash := sha1.Sum(b)
240 c.cid = types.ID(binary.BigEndian.Uint64(hash[:8]))
241 }
242
243 func (c *RaftCluster) SetID(localID, cid types.ID) {
244 c.localID = localID
245 c.cid = cid
246 }
247
248 func (c *RaftCluster) SetStore(st v2store.Store) { c.v2store = st }
249
250 func (c *RaftCluster) SetBackend(be backend.Backend) {
251 c.be = be
252 mustCreateBackendBuckets(c.be)
253 }
254
255 func (c *RaftCluster) Recover(onSet func(*zap.Logger, *semver.Version)) {
256 c.Lock()
257 defer c.Unlock()
258
259 if c.v2store != nil {
260 c.version = clusterVersionFromStore(c.lg, c.v2store)
261 c.members, c.removed = membersFromStore(c.lg, c.v2store)
262 } else {
263 c.version = clusterVersionFromBackend(c.lg, c.be)
264 c.members, c.removed = membersFromBackend(c.lg, c.be)
265 }
266
267 if c.be != nil {
268 c.downgradeInfo = downgradeInfoFromBackend(c.lg, c.be)
269 }
270 d := &DowngradeInfo{Enabled: false}
271 if c.downgradeInfo != nil {
272 d = &DowngradeInfo{Enabled: c.downgradeInfo.Enabled, TargetVersion: c.downgradeInfo.TargetVersion}
273 }
274 mustDetectDowngrade(c.lg, c.version, d)
275 onSet(c.lg, c.version)
276
277 for _, m := range c.members {
278 c.lg.Info(
279 "recovered/added member from store",
280 zap.String("cluster-id", c.cid.String()),
281 zap.String("local-member-id", c.localID.String()),
282 zap.String("recovered-remote-peer-id", m.ID.String()),
283 zap.Strings("recovered-remote-peer-urls", m.PeerURLs),
284 )
285 }
286 if c.version != nil {
287 c.lg.Info(
288 "set cluster version from store",
289 zap.String("cluster-version", version.Cluster(c.version.String())),
290 )
291 }
292 }
293
294
295
296 func (c *RaftCluster) ValidateConfigurationChange(cc raftpb.ConfChange) error {
297
298 members, removed := membersFromStore(c.lg, c.v2store)
299 id := types.ID(cc.NodeID)
300 if removed[id] {
301 return ErrIDRemoved
302 }
303 switch cc.Type {
304 case raftpb.ConfChangeAddNode, raftpb.ConfChangeAddLearnerNode:
305 confChangeContext := new(ConfigChangeContext)
306 if err := json.Unmarshal(cc.Context, confChangeContext); err != nil {
307 c.lg.Panic("failed to unmarshal confChangeContext", zap.Error(err))
308 }
309
310 if confChangeContext.IsPromote {
311 if members[id] == nil {
312 return ErrIDNotFound
313 }
314 if !members[id].IsLearner {
315 return ErrMemberNotLearner
316 }
317 } else {
318 if members[id] != nil {
319 return ErrIDExists
320 }
321
322 urls := make(map[string]bool)
323 for _, m := range members {
324 for _, u := range m.PeerURLs {
325 urls[u] = true
326 }
327 }
328 for _, u := range confChangeContext.Member.PeerURLs {
329 if urls[u] {
330 return ErrPeerURLexists
331 }
332 }
333
334 if confChangeContext.Member.IsLearner {
335 numLearners := 0
336 for _, m := range members {
337 if m.IsLearner {
338 numLearners++
339 }
340 }
341 if numLearners+1 > maxLearners {
342 return ErrTooManyLearners
343 }
344 }
345 }
346 case raftpb.ConfChangeRemoveNode:
347 if members[id] == nil {
348 return ErrIDNotFound
349 }
350
351 case raftpb.ConfChangeUpdateNode:
352 if members[id] == nil {
353 return ErrIDNotFound
354 }
355 urls := make(map[string]bool)
356 for _, m := range members {
357 if m.ID == id {
358 continue
359 }
360 for _, u := range m.PeerURLs {
361 urls[u] = true
362 }
363 }
364 m := new(Member)
365 if err := json.Unmarshal(cc.Context, m); err != nil {
366 c.lg.Panic("failed to unmarshal member", zap.Error(err))
367 }
368 for _, u := range m.PeerURLs {
369 if urls[u] {
370 return ErrPeerURLexists
371 }
372 }
373
374 default:
375 c.lg.Panic("unknown ConfChange type", zap.String("type", cc.Type.String()))
376 }
377 return nil
378 }
379
380
381
382
383 func (c *RaftCluster) AddMember(m *Member, shouldApplyV3 ShouldApplyV3) {
384 c.Lock()
385 defer c.Unlock()
386
387 var v2Err, beErr error
388 if c.v2store != nil {
389 v2Err = unsafeSaveMemberToStore(c.lg, c.v2store, m)
390 if v2Err != nil {
391 if e, ok := v2Err.(*v2error.Error); !ok || e.ErrorCode != v2error.EcodeNodeExist {
392 c.lg.Panic(
393 "failed to save member to store",
394 zap.String("member-id", m.ID.String()),
395 zap.Error(v2Err),
396 )
397 }
398 }
399 }
400 if c.be != nil && shouldApplyV3 {
401 beErr = unsafeSaveMemberToBackend(c.lg, c.be, m)
402 if beErr != nil && !errors.Is(beErr, errMemberAlreadyExist) {
403 c.lg.Panic(
404 "failed to save member to backend",
405 zap.String("member-id", m.ID.String()),
406 zap.Error(beErr),
407 )
408 }
409 }
410
411 if v2Err != nil && (beErr != nil || c.be == nil) {
412 c.lg.Panic(
413 "failed to save member to store",
414 zap.String("member-id", m.ID.String()),
415 zap.Error(v2Err),
416 )
417 }
418
419 c.members[m.ID] = m
420
421 c.lg.Info(
422 "added member",
423 zap.String("cluster-id", c.cid.String()),
424 zap.String("local-member-id", c.localID.String()),
425 zap.String("added-peer-id", m.ID.String()),
426 zap.Strings("added-peer-peer-urls", m.PeerURLs),
427 )
428 }
429
430
431
432 func (c *RaftCluster) RemoveMember(id types.ID, shouldApplyV3 ShouldApplyV3) {
433 c.Lock()
434 defer c.Unlock()
435 var v2Err, beErr error
436 if c.v2store != nil {
437 v2Err = unsafeDeleteMemberFromStore(c.v2store, id)
438 if v2Err != nil {
439 if e, ok := v2Err.(*v2error.Error); !ok || e.ErrorCode != v2error.EcodeKeyNotFound {
440 c.lg.Panic(
441 "failed to delete member from store",
442 zap.String("member-id", id.String()),
443 zap.Error(v2Err),
444 )
445 }
446 }
447 }
448 if c.be != nil && shouldApplyV3 {
449 beErr = unsafeDeleteMemberFromBackend(c.be, id)
450 if beErr != nil && !errors.Is(beErr, errMemberNotFound) {
451 c.lg.Panic(
452 "failed to delete member from backend",
453 zap.String("member-id", id.String()),
454 zap.Error(beErr),
455 )
456 }
457 }
458
459 if v2Err != nil && (beErr != nil || c.be == nil) {
460 c.lg.Panic(
461 "failed to delete member from store",
462 zap.String("member-id", id.String()),
463 zap.Error(v2Err),
464 )
465 }
466
467 m, ok := c.members[id]
468 delete(c.members, id)
469 c.removed[id] = true
470
471 if ok {
472 c.lg.Info(
473 "removed member",
474 zap.String("cluster-id", c.cid.String()),
475 zap.String("local-member-id", c.localID.String()),
476 zap.String("removed-remote-peer-id", id.String()),
477 zap.Strings("removed-remote-peer-urls", m.PeerURLs),
478 )
479 } else {
480 c.lg.Warn(
481 "skipped removing already removed member",
482 zap.String("cluster-id", c.cid.String()),
483 zap.String("local-member-id", c.localID.String()),
484 zap.String("removed-remote-peer-id", id.String()),
485 )
486 }
487 }
488
489 func (c *RaftCluster) UpdateAttributes(id types.ID, attr Attributes, shouldApplyV3 ShouldApplyV3) {
490 c.Lock()
491 defer c.Unlock()
492
493 if m, ok := c.members[id]; ok {
494 m.Attributes = attr
495 if c.v2store != nil {
496 mustUpdateMemberAttrInStore(c.lg, c.v2store, m)
497 }
498 if c.be != nil && shouldApplyV3 {
499 unsafeSaveMemberToBackend(c.lg, c.be, m)
500 }
501 return
502 }
503
504 _, ok := c.removed[id]
505 if !ok {
506 c.lg.Panic(
507 "failed to update; member unknown",
508 zap.String("cluster-id", c.cid.String()),
509 zap.String("local-member-id", c.localID.String()),
510 zap.String("unknown-remote-peer-id", id.String()),
511 )
512 }
513
514 c.lg.Warn(
515 "skipped attributes update of removed member",
516 zap.String("cluster-id", c.cid.String()),
517 zap.String("local-member-id", c.localID.String()),
518 zap.String("updated-peer-id", id.String()),
519 )
520 }
521
522
523 func (c *RaftCluster) PromoteMember(id types.ID, shouldApplyV3 ShouldApplyV3) {
524 c.Lock()
525 defer c.Unlock()
526
527 c.members[id].RaftAttributes.IsLearner = false
528 if c.v2store != nil {
529 mustUpdateMemberInStore(c.lg, c.v2store, c.members[id])
530 }
531 if c.be != nil && shouldApplyV3 {
532 unsafeSaveMemberToBackend(c.lg, c.be, c.members[id])
533 }
534
535 c.lg.Info(
536 "promote member",
537 zap.String("cluster-id", c.cid.String()),
538 zap.String("local-member-id", c.localID.String()),
539 )
540 }
541
542 func (c *RaftCluster) UpdateRaftAttributes(id types.ID, raftAttr RaftAttributes, shouldApplyV3 ShouldApplyV3) {
543 c.Lock()
544 defer c.Unlock()
545
546 c.members[id].RaftAttributes = raftAttr
547 if c.v2store != nil {
548 mustUpdateMemberInStore(c.lg, c.v2store, c.members[id])
549 }
550 if c.be != nil && shouldApplyV3 {
551 unsafeSaveMemberToBackend(c.lg, c.be, c.members[id])
552 }
553
554 c.lg.Info(
555 "updated member",
556 zap.String("cluster-id", c.cid.String()),
557 zap.String("local-member-id", c.localID.String()),
558 zap.String("updated-remote-peer-id", id.String()),
559 zap.Strings("updated-remote-peer-urls", raftAttr.PeerURLs),
560 )
561 }
562
563 func (c *RaftCluster) Version() *semver.Version {
564 c.Lock()
565 defer c.Unlock()
566 if c.version == nil {
567 return nil
568 }
569 return semver.Must(semver.NewVersion(c.version.String()))
570 }
571
572 func (c *RaftCluster) SetVersion(ver *semver.Version, onSet func(*zap.Logger, *semver.Version), shouldApplyV3 ShouldApplyV3) {
573 c.Lock()
574 defer c.Unlock()
575 if c.version != nil {
576 c.lg.Info(
577 "updated cluster version",
578 zap.String("cluster-id", c.cid.String()),
579 zap.String("local-member-id", c.localID.String()),
580 zap.String("from", version.Cluster(c.version.String())),
581 zap.String("to", version.Cluster(ver.String())),
582 )
583 } else {
584 c.lg.Info(
585 "set initial cluster version",
586 zap.String("cluster-id", c.cid.String()),
587 zap.String("local-member-id", c.localID.String()),
588 zap.String("cluster-version", version.Cluster(ver.String())),
589 )
590 }
591 oldVer := c.version
592 c.version = ver
593 mustDetectDowngrade(c.lg, c.version, c.downgradeInfo)
594 if c.v2store != nil {
595 mustSaveClusterVersionToStore(c.lg, c.v2store, ver)
596 }
597 if c.be != nil && shouldApplyV3 {
598 mustSaveClusterVersionToBackend(c.be, ver)
599 }
600 if oldVer != nil {
601 ClusterVersionMetrics.With(prometheus.Labels{"cluster_version": version.Cluster(oldVer.String())}).Set(0)
602 }
603 ClusterVersionMetrics.With(prometheus.Labels{"cluster_version": version.Cluster(ver.String())}).Set(1)
604 onSet(c.lg, ver)
605 }
606
607 func (c *RaftCluster) IsReadyToAddVotingMember() bool {
608 nmembers := 1
609 nstarted := 0
610
611 for _, member := range c.VotingMembers() {
612 if member.IsStarted() {
613 nstarted++
614 }
615 nmembers++
616 }
617
618 if nstarted == 1 && nmembers == 2 {
619
620
621 c.lg.Debug("number of started member is 1; can accept add member request")
622 return true
623 }
624
625 nquorum := nmembers/2 + 1
626 if nstarted < nquorum {
627 c.lg.Warn(
628 "rejecting member add; started member will be less than quorum",
629 zap.Int("number-of-started-member", nstarted),
630 zap.Int("quorum", nquorum),
631 zap.String("cluster-id", c.cid.String()),
632 zap.String("local-member-id", c.localID.String()),
633 )
634 return false
635 }
636
637 return true
638 }
639
640 func (c *RaftCluster) IsReadyToRemoveVotingMember(id uint64) bool {
641 nmembers := 0
642 nstarted := 0
643
644 for _, member := range c.VotingMembers() {
645 if uint64(member.ID) == id {
646 continue
647 }
648
649 if member.IsStarted() {
650 nstarted++
651 }
652 nmembers++
653 }
654
655 nquorum := nmembers/2 + 1
656 if nstarted < nquorum {
657 c.lg.Warn(
658 "rejecting member remove; started member will be less than quorum",
659 zap.Int("number-of-started-member", nstarted),
660 zap.Int("quorum", nquorum),
661 zap.String("cluster-id", c.cid.String()),
662 zap.String("local-member-id", c.localID.String()),
663 )
664 return false
665 }
666
667 return true
668 }
669
670 func (c *RaftCluster) IsReadyToPromoteMember(id uint64) bool {
671 nmembers := 1
672 nstarted := 1
673
674 for _, member := range c.VotingMembers() {
675 if member.IsStarted() {
676 nstarted++
677 }
678 nmembers++
679 }
680
681 nquorum := nmembers/2 + 1
682 if nstarted < nquorum {
683 c.lg.Warn(
684 "rejecting member promote; started member will be less than quorum",
685 zap.Int("number-of-started-member", nstarted),
686 zap.Int("quorum", nquorum),
687 zap.String("cluster-id", c.cid.String()),
688 zap.String("local-member-id", c.localID.String()),
689 )
690 return false
691 }
692
693 return true
694 }
695
696 func membersFromStore(lg *zap.Logger, st v2store.Store) (map[types.ID]*Member, map[types.ID]bool) {
697 members := make(map[types.ID]*Member)
698 removed := make(map[types.ID]bool)
699 e, err := st.Get(StoreMembersPrefix, true, true)
700 if err != nil {
701 if isKeyNotFound(err) {
702 return members, removed
703 }
704 lg.Panic("failed to get members from store", zap.String("path", StoreMembersPrefix), zap.Error(err))
705 }
706 for _, n := range e.Node.Nodes {
707 var m *Member
708 m, err = nodeToMember(lg, n)
709 if err != nil {
710 lg.Panic("failed to nodeToMember", zap.Error(err))
711 }
712 members[m.ID] = m
713 }
714
715 e, err = st.Get(storeRemovedMembersPrefix, true, true)
716 if err != nil {
717 if isKeyNotFound(err) {
718 return members, removed
719 }
720 lg.Panic(
721 "failed to get removed members from store",
722 zap.String("path", storeRemovedMembersPrefix),
723 zap.Error(err),
724 )
725 }
726 for _, n := range e.Node.Nodes {
727 removed[MustParseMemberIDFromKey(lg, n.Key)] = true
728 }
729 return members, removed
730 }
731
732 func membersFromBackend(lg *zap.Logger, be backend.Backend) (map[types.ID]*Member, map[types.ID]bool) {
733 return mustReadMembersFromBackend(lg, be)
734 }
735
736 func clusterVersionFromStore(lg *zap.Logger, st v2store.Store) *semver.Version {
737 e, err := st.Get(path.Join(storePrefix, "version"), false, false)
738 if err != nil {
739 if isKeyNotFound(err) {
740 return nil
741 }
742 lg.Panic(
743 "failed to get cluster version from store",
744 zap.String("path", path.Join(storePrefix, "version")),
745 zap.Error(err),
746 )
747 }
748 return semver.Must(semver.NewVersion(*e.Node.Value))
749 }
750
751
752 func clusterVersionFromBackend(lg *zap.Logger, be backend.Backend) *semver.Version {
753 ckey := backendClusterVersionKey()
754 tx := be.ReadTx()
755 tx.RLock()
756 defer tx.RUnlock()
757 keys, vals := tx.UnsafeRange(buckets.Cluster, ckey, nil, 0)
758 if len(keys) == 0 {
759 return nil
760 }
761 if len(keys) != 1 {
762 lg.Panic(
763 "unexpected number of keys when getting cluster version from backend",
764 zap.Int("number-of-key", len(keys)),
765 )
766 }
767 return semver.Must(semver.NewVersion(string(vals[0])))
768 }
769
770
771 func downgradeInfoFromBackend(lg *zap.Logger, be backend.Backend) *DowngradeInfo {
772 dkey := backendDowngradeKey()
773 tx := be.ReadTx()
774 tx.Lock()
775 defer tx.Unlock()
776 keys, vals := tx.UnsafeRange(buckets.Cluster, dkey, nil, 0)
777 if len(keys) == 0 {
778 return nil
779 }
780
781 if len(keys) != 1 {
782 lg.Panic(
783 "unexpected number of keys when getting cluster version from backend",
784 zap.Int("number-of-key", len(keys)),
785 )
786 }
787 var d DowngradeInfo
788 if err := json.Unmarshal(vals[0], &d); err != nil {
789 lg.Panic("failed to unmarshal downgrade information", zap.Error(err))
790 }
791
792
793 if d.Enabled {
794 if _, err := semver.NewVersion(d.TargetVersion); err != nil {
795 lg.Panic(
796 "unexpected version format of the downgrade target version from backend",
797 zap.String("target-version", d.TargetVersion),
798 )
799 }
800 }
801 return &d
802 }
803
804
805
806
807
808 func ValidateClusterAndAssignIDs(lg *zap.Logger, local *RaftCluster, existing *RaftCluster) error {
809 ems := existing.Members()
810 lms := local.Members()
811 if len(ems) != len(lms) {
812 return fmt.Errorf("member count is unequal")
813 }
814
815 ctx, cancel := context.WithTimeout(context.TODO(), 30*time.Second)
816 defer cancel()
817 for i := range ems {
818 var err error
819 ok := false
820 for j := range lms {
821 if ok, err = netutil.URLStringsEqual(ctx, lg, ems[i].PeerURLs, lms[j].PeerURLs); ok {
822 lms[j].ID = ems[i].ID
823 break
824 }
825 }
826 if !ok {
827 return fmt.Errorf("PeerURLs: no match found for existing member (%v, %v), last resolver error (%v)", ems[i].ID, ems[i].PeerURLs, err)
828 }
829 }
830 local.members = make(map[types.ID]*Member)
831 for _, m := range lms {
832 local.members[m.ID] = m
833 }
834 return nil
835 }
836
837
838
839
840
841
842
843 func IsValidVersionChange(cv *semver.Version, lv *semver.Version) bool {
844 cv = &semver.Version{Major: cv.Major, Minor: cv.Minor}
845 lv = &semver.Version{Major: lv.Major, Minor: lv.Minor}
846
847 if isValidDowngrade(cv, lv) || (cv.Major == lv.Major && cv.LessThan(*lv)) {
848 return true
849 }
850 return false
851 }
852
853
854 func (c *RaftCluster) IsLocalMemberLearner() bool {
855 c.Lock()
856 defer c.Unlock()
857 localMember, ok := c.members[c.localID]
858 if !ok {
859 c.lg.Panic(
860 "failed to find local ID in cluster members",
861 zap.String("cluster-id", c.cid.String()),
862 zap.String("local-member-id", c.localID.String()),
863 )
864 }
865 return localMember.IsLearner
866 }
867
868
869 func (c *RaftCluster) DowngradeInfo() *DowngradeInfo {
870 c.Lock()
871 defer c.Unlock()
872 if c.downgradeInfo == nil {
873 return &DowngradeInfo{Enabled: false}
874 }
875 d := &DowngradeInfo{Enabled: c.downgradeInfo.Enabled, TargetVersion: c.downgradeInfo.TargetVersion}
876 return d
877 }
878
879 func (c *RaftCluster) SetDowngradeInfo(d *DowngradeInfo, shouldApplyV3 ShouldApplyV3) {
880 c.Lock()
881 defer c.Unlock()
882
883 if c.be != nil && shouldApplyV3 {
884 mustSaveDowngradeToBackend(c.lg, c.be, d)
885 }
886
887 c.downgradeInfo = d
888
889 if d.Enabled {
890 c.lg.Info(
891 "The server is ready to downgrade",
892 zap.String("target-version", d.TargetVersion),
893 zap.String("server-version", version.Version),
894 )
895 }
896 }
897
898
899 func (c *RaftCluster) IsMemberExist(id types.ID) bool {
900 c.Lock()
901 defer c.Unlock()
902 _, ok := c.members[id]
903 return ok
904 }
905
906
907 func (c *RaftCluster) VotingMemberIDs() []types.ID {
908 c.Lock()
909 defer c.Unlock()
910 var ids []types.ID
911 for _, m := range c.members {
912 if !m.IsLearner {
913 ids = append(ids, m.ID)
914 }
915 }
916 sort.Sort(types.IDSlice(ids))
917 return ids
918 }
919
920
921
922 func (c *RaftCluster) PushMembershipToStorage() {
923 if c.be != nil {
924 TrimMembershipFromBackend(c.lg, c.be)
925 for _, m := range c.members {
926 unsafeSaveMemberToBackend(c.lg, c.be, m)
927 }
928 }
929 if c.v2store != nil {
930 TrimMembershipFromV2Store(c.lg, c.v2store)
931 for _, m := range c.members {
932 mustSaveMemberToStore(c.lg, c.v2store, m)
933 }
934 }
935 }
936
View as plain text