...

Source file src/go.etcd.io/etcd/server/v3/etcdserver/api/membership/cluster.go

Documentation: go.etcd.io/etcd/server/v3/etcdserver/api/membership

     1  // Copyright 2015 The etcd Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package membership
    16  
    17  import (
    18  	"bytes"
    19  	"context"
    20  	"crypto/sha1"
    21  	"encoding/binary"
    22  	"encoding/json"
    23  	"errors"
    24  	"fmt"
    25  	"path"
    26  	"sort"
    27  	"strings"
    28  	"sync"
    29  	"time"
    30  
    31  	"go.etcd.io/etcd/api/v3/version"
    32  	"go.etcd.io/etcd/client/pkg/v3/types"
    33  	"go.etcd.io/etcd/pkg/v3/netutil"
    34  	"go.etcd.io/etcd/raft/v3"
    35  	"go.etcd.io/etcd/raft/v3/raftpb"
    36  	"go.etcd.io/etcd/server/v3/etcdserver/api/v2error"
    37  	"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
    38  	"go.etcd.io/etcd/server/v3/mvcc/backend"
    39  	"go.etcd.io/etcd/server/v3/mvcc/buckets"
    40  
    41  	"github.com/coreos/go-semver/semver"
    42  	"github.com/prometheus/client_golang/prometheus"
    43  	"go.uber.org/zap"
    44  )
    45  
// maxLearners is the largest number of learner members the cluster may hold
// at once; ValidateConfigurationChange rejects an add-learner request that
// would exceed it.
const maxLearners = 1
    47  
// RaftCluster is a list of Members that belong to the same raft cluster,
// together with the cluster version, the set of removed member IDs and the
// downgrade status.
type RaftCluster struct {
	// lg is the logger used by every method; NewCluster substitutes a no-op
	// logger when none is supplied.
	lg *zap.Logger

	// localID is the ID of the local member and cid is the cluster ID.
	// Both are assigned by SetID; cid is also set by genID and
	// NewClusterFromMembers.
	localID types.ID
	cid     types.ID

	// v2store and be are the two places membership data is persisted.
	// The mutating methods check each for nil before writing to it.
	v2store v2store.Store
	be      backend.Backend

	sync.Mutex // guards the fields below
	version    *semver.Version
	members    map[types.ID]*Member
	// removed contains the IDs of members that were removed from the cluster.
	// A removed ID cannot be reused.
	removed map[types.ID]bool

	// downgradeInfo is the current downgrade status. It may be nil after
	// Recover when the backend holds no downgrade record.
	downgradeInfo *DowngradeInfo
}
    67  
// ConfigChangeContext represents a context for confChange. It is the JSON
// payload that ValidateConfigurationChange decodes from ConfChange.Context
// for add-node and add-learner requests.
type ConfigChangeContext struct {
	Member
	// IsPromote indicates whether the config change promotes a learner member.
	// This flag is needed because both adding a new member and promoting a
	// learner member use the same config change type 'ConfChangeAddNode'.
	IsPromote bool `json:"isPromote"`
}
    76  
    77  type ShouldApplyV3 bool
    78  
    79  const (
    80  	ApplyBoth        = ShouldApplyV3(true)
    81  	ApplyV2storeOnly = ShouldApplyV3(false)
    82  )
    83  
    84  // NewClusterFromURLsMap creates a new raft cluster using provided urls map. Currently, it does not support creating
    85  // cluster with raft learner member.
    86  func NewClusterFromURLsMap(lg *zap.Logger, token string, urlsmap types.URLsMap) (*RaftCluster, error) {
    87  	c := NewCluster(lg)
    88  	for name, urls := range urlsmap {
    89  		m := NewMember(name, urls, token, nil)
    90  		if _, ok := c.members[m.ID]; ok {
    91  			return nil, fmt.Errorf("member exists with identical ID %v", m)
    92  		}
    93  		if uint64(m.ID) == raft.None {
    94  			return nil, fmt.Errorf("cannot use %x as member id", raft.None)
    95  		}
    96  		c.members[m.ID] = m
    97  	}
    98  	c.genID()
    99  	return c, nil
   100  }
   101  
   102  func NewClusterFromMembers(lg *zap.Logger, id types.ID, membs []*Member) *RaftCluster {
   103  	c := NewCluster(lg)
   104  	c.cid = id
   105  	for _, m := range membs {
   106  		c.members[m.ID] = m
   107  	}
   108  	return c
   109  }
   110  
   111  func NewCluster(lg *zap.Logger) *RaftCluster {
   112  	if lg == nil {
   113  		lg = zap.NewNop()
   114  	}
   115  	return &RaftCluster{
   116  		lg:            lg,
   117  		members:       make(map[types.ID]*Member),
   118  		removed:       make(map[types.ID]bool),
   119  		downgradeInfo: &DowngradeInfo{Enabled: false},
   120  	}
   121  }
   122  
   123  func (c *RaftCluster) ID() types.ID { return c.cid }
   124  
   125  func (c *RaftCluster) Members() []*Member {
   126  	c.Lock()
   127  	defer c.Unlock()
   128  	var ms MembersByID
   129  	for _, m := range c.members {
   130  		ms = append(ms, m.Clone())
   131  	}
   132  	sort.Sort(ms)
   133  	return []*Member(ms)
   134  }
   135  
   136  func (c *RaftCluster) Member(id types.ID) *Member {
   137  	c.Lock()
   138  	defer c.Unlock()
   139  	return c.members[id].Clone()
   140  }
   141  
   142  func (c *RaftCluster) VotingMembers() []*Member {
   143  	c.Lock()
   144  	defer c.Unlock()
   145  	var ms MembersByID
   146  	for _, m := range c.members {
   147  		if !m.IsLearner {
   148  			ms = append(ms, m.Clone())
   149  		}
   150  	}
   151  	sort.Sort(ms)
   152  	return []*Member(ms)
   153  }
   154  
   155  // MemberByName returns a Member with the given name if exists.
   156  // If more than one member has the given name, it will panic.
   157  func (c *RaftCluster) MemberByName(name string) *Member {
   158  	c.Lock()
   159  	defer c.Unlock()
   160  	var memb *Member
   161  	for _, m := range c.members {
   162  		if m.Name == name {
   163  			if memb != nil {
   164  				c.lg.Panic("two member with same name found", zap.String("name", name))
   165  			}
   166  			memb = m
   167  		}
   168  	}
   169  	return memb.Clone()
   170  }
   171  
   172  func (c *RaftCluster) MemberIDs() []types.ID {
   173  	c.Lock()
   174  	defer c.Unlock()
   175  	var ids []types.ID
   176  	for _, m := range c.members {
   177  		ids = append(ids, m.ID)
   178  	}
   179  	sort.Sort(types.IDSlice(ids))
   180  	return ids
   181  }
   182  
   183  func (c *RaftCluster) IsIDRemoved(id types.ID) bool {
   184  	c.Lock()
   185  	defer c.Unlock()
   186  	return c.removed[id]
   187  }
   188  
   189  // PeerURLs returns a list of all peer addresses.
   190  // The returned list is sorted in ascending lexicographical order.
   191  func (c *RaftCluster) PeerURLs() []string {
   192  	c.Lock()
   193  	defer c.Unlock()
   194  	urls := make([]string, 0)
   195  	for _, p := range c.members {
   196  		urls = append(urls, p.PeerURLs...)
   197  	}
   198  	sort.Strings(urls)
   199  	return urls
   200  }
   201  
   202  // ClientURLs returns a list of all client addresses.
   203  // The returned list is sorted in ascending lexicographical order.
   204  func (c *RaftCluster) ClientURLs() []string {
   205  	c.Lock()
   206  	defer c.Unlock()
   207  	urls := make([]string, 0)
   208  	for _, p := range c.members {
   209  		urls = append(urls, p.ClientURLs...)
   210  	}
   211  	sort.Strings(urls)
   212  	return urls
   213  }
   214  
   215  func (c *RaftCluster) String() string {
   216  	c.Lock()
   217  	defer c.Unlock()
   218  	b := &bytes.Buffer{}
   219  	fmt.Fprintf(b, "{ClusterID:%s ", c.cid)
   220  	var ms []string
   221  	for _, m := range c.members {
   222  		ms = append(ms, fmt.Sprintf("%+v", m))
   223  	}
   224  	fmt.Fprintf(b, "Members:[%s] ", strings.Join(ms, " "))
   225  	var ids []string
   226  	for id := range c.removed {
   227  		ids = append(ids, id.String())
   228  	}
   229  	fmt.Fprintf(b, "RemovedMemberIDs:[%s]}", strings.Join(ids, " "))
   230  	return b.String()
   231  }
   232  
   233  func (c *RaftCluster) genID() {
   234  	mIDs := c.MemberIDs()
   235  	b := make([]byte, 8*len(mIDs))
   236  	for i, id := range mIDs {
   237  		binary.BigEndian.PutUint64(b[8*i:], uint64(id))
   238  	}
   239  	hash := sha1.Sum(b)
   240  	c.cid = types.ID(binary.BigEndian.Uint64(hash[:8]))
   241  }
   242  
   243  func (c *RaftCluster) SetID(localID, cid types.ID) {
   244  	c.localID = localID
   245  	c.cid = cid
   246  }
   247  
   248  func (c *RaftCluster) SetStore(st v2store.Store) { c.v2store = st }
   249  
   250  func (c *RaftCluster) SetBackend(be backend.Backend) {
   251  	c.be = be
   252  	mustCreateBackendBuckets(c.be)
   253  }
   254  
   255  func (c *RaftCluster) Recover(onSet func(*zap.Logger, *semver.Version)) {
   256  	c.Lock()
   257  	defer c.Unlock()
   258  
   259  	if c.v2store != nil {
   260  		c.version = clusterVersionFromStore(c.lg, c.v2store)
   261  		c.members, c.removed = membersFromStore(c.lg, c.v2store)
   262  	} else {
   263  		c.version = clusterVersionFromBackend(c.lg, c.be)
   264  		c.members, c.removed = membersFromBackend(c.lg, c.be)
   265  	}
   266  
   267  	if c.be != nil {
   268  		c.downgradeInfo = downgradeInfoFromBackend(c.lg, c.be)
   269  	}
   270  	d := &DowngradeInfo{Enabled: false}
   271  	if c.downgradeInfo != nil {
   272  		d = &DowngradeInfo{Enabled: c.downgradeInfo.Enabled, TargetVersion: c.downgradeInfo.TargetVersion}
   273  	}
   274  	mustDetectDowngrade(c.lg, c.version, d)
   275  	onSet(c.lg, c.version)
   276  
   277  	for _, m := range c.members {
   278  		c.lg.Info(
   279  			"recovered/added member from store",
   280  			zap.String("cluster-id", c.cid.String()),
   281  			zap.String("local-member-id", c.localID.String()),
   282  			zap.String("recovered-remote-peer-id", m.ID.String()),
   283  			zap.Strings("recovered-remote-peer-urls", m.PeerURLs),
   284  		)
   285  	}
   286  	if c.version != nil {
   287  		c.lg.Info(
   288  			"set cluster version from store",
   289  			zap.String("cluster-version", version.Cluster(c.version.String())),
   290  		)
   291  	}
   292  }
   293  
   294  // ValidateConfigurationChange takes a proposed ConfChange and
   295  // ensures that it is still valid.
   296  func (c *RaftCluster) ValidateConfigurationChange(cc raftpb.ConfChange) error {
   297  	// TODO: this must be switched to backend as well.
   298  	members, removed := membersFromStore(c.lg, c.v2store)
   299  	id := types.ID(cc.NodeID)
   300  	if removed[id] {
   301  		return ErrIDRemoved
   302  	}
   303  	switch cc.Type {
   304  	case raftpb.ConfChangeAddNode, raftpb.ConfChangeAddLearnerNode:
   305  		confChangeContext := new(ConfigChangeContext)
   306  		if err := json.Unmarshal(cc.Context, confChangeContext); err != nil {
   307  			c.lg.Panic("failed to unmarshal confChangeContext", zap.Error(err))
   308  		}
   309  
   310  		if confChangeContext.IsPromote { // promoting a learner member to voting member
   311  			if members[id] == nil {
   312  				return ErrIDNotFound
   313  			}
   314  			if !members[id].IsLearner {
   315  				return ErrMemberNotLearner
   316  			}
   317  		} else { // adding a new member
   318  			if members[id] != nil {
   319  				return ErrIDExists
   320  			}
   321  
   322  			urls := make(map[string]bool)
   323  			for _, m := range members {
   324  				for _, u := range m.PeerURLs {
   325  					urls[u] = true
   326  				}
   327  			}
   328  			for _, u := range confChangeContext.Member.PeerURLs {
   329  				if urls[u] {
   330  					return ErrPeerURLexists
   331  				}
   332  			}
   333  
   334  			if confChangeContext.Member.IsLearner { // the new member is a learner
   335  				numLearners := 0
   336  				for _, m := range members {
   337  					if m.IsLearner {
   338  						numLearners++
   339  					}
   340  				}
   341  				if numLearners+1 > maxLearners {
   342  					return ErrTooManyLearners
   343  				}
   344  			}
   345  		}
   346  	case raftpb.ConfChangeRemoveNode:
   347  		if members[id] == nil {
   348  			return ErrIDNotFound
   349  		}
   350  
   351  	case raftpb.ConfChangeUpdateNode:
   352  		if members[id] == nil {
   353  			return ErrIDNotFound
   354  		}
   355  		urls := make(map[string]bool)
   356  		for _, m := range members {
   357  			if m.ID == id {
   358  				continue
   359  			}
   360  			for _, u := range m.PeerURLs {
   361  				urls[u] = true
   362  			}
   363  		}
   364  		m := new(Member)
   365  		if err := json.Unmarshal(cc.Context, m); err != nil {
   366  			c.lg.Panic("failed to unmarshal member", zap.Error(err))
   367  		}
   368  		for _, u := range m.PeerURLs {
   369  			if urls[u] {
   370  				return ErrPeerURLexists
   371  			}
   372  		}
   373  
   374  	default:
   375  		c.lg.Panic("unknown ConfChange type", zap.String("type", cc.Type.String()))
   376  	}
   377  	return nil
   378  }
   379  
   380  // AddMember adds a new Member into the cluster, and saves the given member's
   381  // raftAttributes into the store. The given member should have empty attributes.
   382  // A Member with a matching id must not exist.
   383  func (c *RaftCluster) AddMember(m *Member, shouldApplyV3 ShouldApplyV3) {
   384  	c.Lock()
   385  	defer c.Unlock()
   386  
   387  	var v2Err, beErr error
   388  	if c.v2store != nil {
   389  		v2Err = unsafeSaveMemberToStore(c.lg, c.v2store, m)
   390  		if v2Err != nil {
   391  			if e, ok := v2Err.(*v2error.Error); !ok || e.ErrorCode != v2error.EcodeNodeExist {
   392  				c.lg.Panic(
   393  					"failed to save member to store",
   394  					zap.String("member-id", m.ID.String()),
   395  					zap.Error(v2Err),
   396  				)
   397  			}
   398  		}
   399  	}
   400  	if c.be != nil && shouldApplyV3 {
   401  		beErr = unsafeSaveMemberToBackend(c.lg, c.be, m)
   402  		if beErr != nil && !errors.Is(beErr, errMemberAlreadyExist) {
   403  			c.lg.Panic(
   404  				"failed to save member to backend",
   405  				zap.String("member-id", m.ID.String()),
   406  				zap.Error(beErr),
   407  			)
   408  		}
   409  	}
   410  	// Panic if both storeV2 and backend report member already exist.
   411  	if v2Err != nil && (beErr != nil || c.be == nil) {
   412  		c.lg.Panic(
   413  			"failed to save member to store",
   414  			zap.String("member-id", m.ID.String()),
   415  			zap.Error(v2Err),
   416  		)
   417  	}
   418  
   419  	c.members[m.ID] = m
   420  
   421  	c.lg.Info(
   422  		"added member",
   423  		zap.String("cluster-id", c.cid.String()),
   424  		zap.String("local-member-id", c.localID.String()),
   425  		zap.String("added-peer-id", m.ID.String()),
   426  		zap.Strings("added-peer-peer-urls", m.PeerURLs),
   427  	)
   428  }
   429  
   430  // RemoveMember removes a member from the store.
   431  // The given id MUST exist, or the function panics.
   432  func (c *RaftCluster) RemoveMember(id types.ID, shouldApplyV3 ShouldApplyV3) {
   433  	c.Lock()
   434  	defer c.Unlock()
   435  	var v2Err, beErr error
   436  	if c.v2store != nil {
   437  		v2Err = unsafeDeleteMemberFromStore(c.v2store, id)
   438  		if v2Err != nil {
   439  			if e, ok := v2Err.(*v2error.Error); !ok || e.ErrorCode != v2error.EcodeKeyNotFound {
   440  				c.lg.Panic(
   441  					"failed to delete member from store",
   442  					zap.String("member-id", id.String()),
   443  					zap.Error(v2Err),
   444  				)
   445  			}
   446  		}
   447  	}
   448  	if c.be != nil && shouldApplyV3 {
   449  		beErr = unsafeDeleteMemberFromBackend(c.be, id)
   450  		if beErr != nil && !errors.Is(beErr, errMemberNotFound) {
   451  			c.lg.Panic(
   452  				"failed to delete member from backend",
   453  				zap.String("member-id", id.String()),
   454  				zap.Error(beErr),
   455  			)
   456  		}
   457  	}
   458  	// Panic if both storeV2 and backend report member not found.
   459  	if v2Err != nil && (beErr != nil || c.be == nil) {
   460  		c.lg.Panic(
   461  			"failed to delete member from store",
   462  			zap.String("member-id", id.String()),
   463  			zap.Error(v2Err),
   464  		)
   465  	}
   466  
   467  	m, ok := c.members[id]
   468  	delete(c.members, id)
   469  	c.removed[id] = true
   470  
   471  	if ok {
   472  		c.lg.Info(
   473  			"removed member",
   474  			zap.String("cluster-id", c.cid.String()),
   475  			zap.String("local-member-id", c.localID.String()),
   476  			zap.String("removed-remote-peer-id", id.String()),
   477  			zap.Strings("removed-remote-peer-urls", m.PeerURLs),
   478  		)
   479  	} else {
   480  		c.lg.Warn(
   481  			"skipped removing already removed member",
   482  			zap.String("cluster-id", c.cid.String()),
   483  			zap.String("local-member-id", c.localID.String()),
   484  			zap.String("removed-remote-peer-id", id.String()),
   485  		)
   486  	}
   487  }
   488  
   489  func (c *RaftCluster) UpdateAttributes(id types.ID, attr Attributes, shouldApplyV3 ShouldApplyV3) {
   490  	c.Lock()
   491  	defer c.Unlock()
   492  
   493  	if m, ok := c.members[id]; ok {
   494  		m.Attributes = attr
   495  		if c.v2store != nil {
   496  			mustUpdateMemberAttrInStore(c.lg, c.v2store, m)
   497  		}
   498  		if c.be != nil && shouldApplyV3 {
   499  			unsafeSaveMemberToBackend(c.lg, c.be, m)
   500  		}
   501  		return
   502  	}
   503  
   504  	_, ok := c.removed[id]
   505  	if !ok {
   506  		c.lg.Panic(
   507  			"failed to update; member unknown",
   508  			zap.String("cluster-id", c.cid.String()),
   509  			zap.String("local-member-id", c.localID.String()),
   510  			zap.String("unknown-remote-peer-id", id.String()),
   511  		)
   512  	}
   513  
   514  	c.lg.Warn(
   515  		"skipped attributes update of removed member",
   516  		zap.String("cluster-id", c.cid.String()),
   517  		zap.String("local-member-id", c.localID.String()),
   518  		zap.String("updated-peer-id", id.String()),
   519  	)
   520  }
   521  
   522  // PromoteMember marks the member's IsLearner RaftAttributes to false.
   523  func (c *RaftCluster) PromoteMember(id types.ID, shouldApplyV3 ShouldApplyV3) {
   524  	c.Lock()
   525  	defer c.Unlock()
   526  
   527  	c.members[id].RaftAttributes.IsLearner = false
   528  	if c.v2store != nil {
   529  		mustUpdateMemberInStore(c.lg, c.v2store, c.members[id])
   530  	}
   531  	if c.be != nil && shouldApplyV3 {
   532  		unsafeSaveMemberToBackend(c.lg, c.be, c.members[id])
   533  	}
   534  
   535  	c.lg.Info(
   536  		"promote member",
   537  		zap.String("cluster-id", c.cid.String()),
   538  		zap.String("local-member-id", c.localID.String()),
   539  	)
   540  }
   541  
   542  func (c *RaftCluster) UpdateRaftAttributes(id types.ID, raftAttr RaftAttributes, shouldApplyV3 ShouldApplyV3) {
   543  	c.Lock()
   544  	defer c.Unlock()
   545  
   546  	c.members[id].RaftAttributes = raftAttr
   547  	if c.v2store != nil {
   548  		mustUpdateMemberInStore(c.lg, c.v2store, c.members[id])
   549  	}
   550  	if c.be != nil && shouldApplyV3 {
   551  		unsafeSaveMemberToBackend(c.lg, c.be, c.members[id])
   552  	}
   553  
   554  	c.lg.Info(
   555  		"updated member",
   556  		zap.String("cluster-id", c.cid.String()),
   557  		zap.String("local-member-id", c.localID.String()),
   558  		zap.String("updated-remote-peer-id", id.String()),
   559  		zap.Strings("updated-remote-peer-urls", raftAttr.PeerURLs),
   560  	)
   561  }
   562  
   563  func (c *RaftCluster) Version() *semver.Version {
   564  	c.Lock()
   565  	defer c.Unlock()
   566  	if c.version == nil {
   567  		return nil
   568  	}
   569  	return semver.Must(semver.NewVersion(c.version.String()))
   570  }
   571  
   572  func (c *RaftCluster) SetVersion(ver *semver.Version, onSet func(*zap.Logger, *semver.Version), shouldApplyV3 ShouldApplyV3) {
   573  	c.Lock()
   574  	defer c.Unlock()
   575  	if c.version != nil {
   576  		c.lg.Info(
   577  			"updated cluster version",
   578  			zap.String("cluster-id", c.cid.String()),
   579  			zap.String("local-member-id", c.localID.String()),
   580  			zap.String("from", version.Cluster(c.version.String())),
   581  			zap.String("to", version.Cluster(ver.String())),
   582  		)
   583  	} else {
   584  		c.lg.Info(
   585  			"set initial cluster version",
   586  			zap.String("cluster-id", c.cid.String()),
   587  			zap.String("local-member-id", c.localID.String()),
   588  			zap.String("cluster-version", version.Cluster(ver.String())),
   589  		)
   590  	}
   591  	oldVer := c.version
   592  	c.version = ver
   593  	mustDetectDowngrade(c.lg, c.version, c.downgradeInfo)
   594  	if c.v2store != nil {
   595  		mustSaveClusterVersionToStore(c.lg, c.v2store, ver)
   596  	}
   597  	if c.be != nil && shouldApplyV3 {
   598  		mustSaveClusterVersionToBackend(c.be, ver)
   599  	}
   600  	if oldVer != nil {
   601  		ClusterVersionMetrics.With(prometheus.Labels{"cluster_version": version.Cluster(oldVer.String())}).Set(0)
   602  	}
   603  	ClusterVersionMetrics.With(prometheus.Labels{"cluster_version": version.Cluster(ver.String())}).Set(1)
   604  	onSet(c.lg, ver)
   605  }
   606  
   607  func (c *RaftCluster) IsReadyToAddVotingMember() bool {
   608  	nmembers := 1
   609  	nstarted := 0
   610  
   611  	for _, member := range c.VotingMembers() {
   612  		if member.IsStarted() {
   613  			nstarted++
   614  		}
   615  		nmembers++
   616  	}
   617  
   618  	if nstarted == 1 && nmembers == 2 {
   619  		// a case of adding a new node to 1-member cluster for restoring cluster data
   620  		// https://github.com/etcd-io/website/blob/main/content/docs/v2/admin_guide.md#restoring-the-cluster
   621  		c.lg.Debug("number of started member is 1; can accept add member request")
   622  		return true
   623  	}
   624  
   625  	nquorum := nmembers/2 + 1
   626  	if nstarted < nquorum {
   627  		c.lg.Warn(
   628  			"rejecting member add; started member will be less than quorum",
   629  			zap.Int("number-of-started-member", nstarted),
   630  			zap.Int("quorum", nquorum),
   631  			zap.String("cluster-id", c.cid.String()),
   632  			zap.String("local-member-id", c.localID.String()),
   633  		)
   634  		return false
   635  	}
   636  
   637  	return true
   638  }
   639  
   640  func (c *RaftCluster) IsReadyToRemoveVotingMember(id uint64) bool {
   641  	nmembers := 0
   642  	nstarted := 0
   643  
   644  	for _, member := range c.VotingMembers() {
   645  		if uint64(member.ID) == id {
   646  			continue
   647  		}
   648  
   649  		if member.IsStarted() {
   650  			nstarted++
   651  		}
   652  		nmembers++
   653  	}
   654  
   655  	nquorum := nmembers/2 + 1
   656  	if nstarted < nquorum {
   657  		c.lg.Warn(
   658  			"rejecting member remove; started member will be less than quorum",
   659  			zap.Int("number-of-started-member", nstarted),
   660  			zap.Int("quorum", nquorum),
   661  			zap.String("cluster-id", c.cid.String()),
   662  			zap.String("local-member-id", c.localID.String()),
   663  		)
   664  		return false
   665  	}
   666  
   667  	return true
   668  }
   669  
   670  func (c *RaftCluster) IsReadyToPromoteMember(id uint64) bool {
   671  	nmembers := 1 // We count the learner to be promoted for the future quorum
   672  	nstarted := 1 // and we also count it as started.
   673  
   674  	for _, member := range c.VotingMembers() {
   675  		if member.IsStarted() {
   676  			nstarted++
   677  		}
   678  		nmembers++
   679  	}
   680  
   681  	nquorum := nmembers/2 + 1
   682  	if nstarted < nquorum {
   683  		c.lg.Warn(
   684  			"rejecting member promote; started member will be less than quorum",
   685  			zap.Int("number-of-started-member", nstarted),
   686  			zap.Int("quorum", nquorum),
   687  			zap.String("cluster-id", c.cid.String()),
   688  			zap.String("local-member-id", c.localID.String()),
   689  		)
   690  		return false
   691  	}
   692  
   693  	return true
   694  }
   695  
   696  func membersFromStore(lg *zap.Logger, st v2store.Store) (map[types.ID]*Member, map[types.ID]bool) {
   697  	members := make(map[types.ID]*Member)
   698  	removed := make(map[types.ID]bool)
   699  	e, err := st.Get(StoreMembersPrefix, true, true)
   700  	if err != nil {
   701  		if isKeyNotFound(err) {
   702  			return members, removed
   703  		}
   704  		lg.Panic("failed to get members from store", zap.String("path", StoreMembersPrefix), zap.Error(err))
   705  	}
   706  	for _, n := range e.Node.Nodes {
   707  		var m *Member
   708  		m, err = nodeToMember(lg, n)
   709  		if err != nil {
   710  			lg.Panic("failed to nodeToMember", zap.Error(err))
   711  		}
   712  		members[m.ID] = m
   713  	}
   714  
   715  	e, err = st.Get(storeRemovedMembersPrefix, true, true)
   716  	if err != nil {
   717  		if isKeyNotFound(err) {
   718  			return members, removed
   719  		}
   720  		lg.Panic(
   721  			"failed to get removed members from store",
   722  			zap.String("path", storeRemovedMembersPrefix),
   723  			zap.Error(err),
   724  		)
   725  	}
   726  	for _, n := range e.Node.Nodes {
   727  		removed[MustParseMemberIDFromKey(lg, n.Key)] = true
   728  	}
   729  	return members, removed
   730  }
   731  
   732  func membersFromBackend(lg *zap.Logger, be backend.Backend) (map[types.ID]*Member, map[types.ID]bool) {
   733  	return mustReadMembersFromBackend(lg, be)
   734  }
   735  
   736  func clusterVersionFromStore(lg *zap.Logger, st v2store.Store) *semver.Version {
   737  	e, err := st.Get(path.Join(storePrefix, "version"), false, false)
   738  	if err != nil {
   739  		if isKeyNotFound(err) {
   740  			return nil
   741  		}
   742  		lg.Panic(
   743  			"failed to get cluster version from store",
   744  			zap.String("path", path.Join(storePrefix, "version")),
   745  			zap.Error(err),
   746  		)
   747  	}
   748  	return semver.Must(semver.NewVersion(*e.Node.Value))
   749  }
   750  
   751  // The field is populated since etcd v3.5.
   752  func clusterVersionFromBackend(lg *zap.Logger, be backend.Backend) *semver.Version {
   753  	ckey := backendClusterVersionKey()
   754  	tx := be.ReadTx()
   755  	tx.RLock()
   756  	defer tx.RUnlock()
   757  	keys, vals := tx.UnsafeRange(buckets.Cluster, ckey, nil, 0)
   758  	if len(keys) == 0 {
   759  		return nil
   760  	}
   761  	if len(keys) != 1 {
   762  		lg.Panic(
   763  			"unexpected number of keys when getting cluster version from backend",
   764  			zap.Int("number-of-key", len(keys)),
   765  		)
   766  	}
   767  	return semver.Must(semver.NewVersion(string(vals[0])))
   768  }
   769  
   770  // The field is populated since etcd v3.5.
   771  func downgradeInfoFromBackend(lg *zap.Logger, be backend.Backend) *DowngradeInfo {
   772  	dkey := backendDowngradeKey()
   773  	tx := be.ReadTx()
   774  	tx.Lock()
   775  	defer tx.Unlock()
   776  	keys, vals := tx.UnsafeRange(buckets.Cluster, dkey, nil, 0)
   777  	if len(keys) == 0 {
   778  		return nil
   779  	}
   780  
   781  	if len(keys) != 1 {
   782  		lg.Panic(
   783  			"unexpected number of keys when getting cluster version from backend",
   784  			zap.Int("number-of-key", len(keys)),
   785  		)
   786  	}
   787  	var d DowngradeInfo
   788  	if err := json.Unmarshal(vals[0], &d); err != nil {
   789  		lg.Panic("failed to unmarshal downgrade information", zap.Error(err))
   790  	}
   791  
   792  	// verify the downgrade info from backend
   793  	if d.Enabled {
   794  		if _, err := semver.NewVersion(d.TargetVersion); err != nil {
   795  			lg.Panic(
   796  				"unexpected version format of the downgrade target version from backend",
   797  				zap.String("target-version", d.TargetVersion),
   798  			)
   799  		}
   800  	}
   801  	return &d
   802  }
   803  
   804  // ValidateClusterAndAssignIDs validates the local cluster by matching the PeerURLs
   805  // with the existing cluster. If the validation succeeds, it assigns the IDs
   806  // from the existing cluster to the local cluster.
   807  // If the validation fails, an error will be returned.
   808  func ValidateClusterAndAssignIDs(lg *zap.Logger, local *RaftCluster, existing *RaftCluster) error {
   809  	ems := existing.Members()
   810  	lms := local.Members()
   811  	if len(ems) != len(lms) {
   812  		return fmt.Errorf("member count is unequal")
   813  	}
   814  
   815  	ctx, cancel := context.WithTimeout(context.TODO(), 30*time.Second)
   816  	defer cancel()
   817  	for i := range ems {
   818  		var err error
   819  		ok := false
   820  		for j := range lms {
   821  			if ok, err = netutil.URLStringsEqual(ctx, lg, ems[i].PeerURLs, lms[j].PeerURLs); ok {
   822  				lms[j].ID = ems[i].ID
   823  				break
   824  			}
   825  		}
   826  		if !ok {
   827  			return fmt.Errorf("PeerURLs: no match found for existing member (%v, %v), last resolver error (%v)", ems[i].ID, ems[i].PeerURLs, err)
   828  		}
   829  	}
   830  	local.members = make(map[types.ID]*Member)
   831  	for _, m := range lms {
   832  		local.members[m.ID] = m
   833  	}
   834  	return nil
   835  }
   836  
   837  // IsValidVersionChange checks the two scenario when version is valid to change:
   838  // 1. Downgrade: cluster version is 1 minor version higher than local version,
   839  // cluster version should change.
   840  // 2. Cluster start: when not all members version are available, cluster version
   841  // is set to MinVersion(3.0), when all members are at higher version, cluster version
   842  // is lower than local version, cluster version should change
   843  func IsValidVersionChange(cv *semver.Version, lv *semver.Version) bool {
   844  	cv = &semver.Version{Major: cv.Major, Minor: cv.Minor}
   845  	lv = &semver.Version{Major: lv.Major, Minor: lv.Minor}
   846  
   847  	if isValidDowngrade(cv, lv) || (cv.Major == lv.Major && cv.LessThan(*lv)) {
   848  		return true
   849  	}
   850  	return false
   851  }
   852  
   853  // IsLocalMemberLearner returns if the local member is raft learner
   854  func (c *RaftCluster) IsLocalMemberLearner() bool {
   855  	c.Lock()
   856  	defer c.Unlock()
   857  	localMember, ok := c.members[c.localID]
   858  	if !ok {
   859  		c.lg.Panic(
   860  			"failed to find local ID in cluster members",
   861  			zap.String("cluster-id", c.cid.String()),
   862  			zap.String("local-member-id", c.localID.String()),
   863  		)
   864  	}
   865  	return localMember.IsLearner
   866  }
   867  
   868  // DowngradeInfo returns the downgrade status of the cluster
   869  func (c *RaftCluster) DowngradeInfo() *DowngradeInfo {
   870  	c.Lock()
   871  	defer c.Unlock()
   872  	if c.downgradeInfo == nil {
   873  		return &DowngradeInfo{Enabled: false}
   874  	}
   875  	d := &DowngradeInfo{Enabled: c.downgradeInfo.Enabled, TargetVersion: c.downgradeInfo.TargetVersion}
   876  	return d
   877  }
   878  
   879  func (c *RaftCluster) SetDowngradeInfo(d *DowngradeInfo, shouldApplyV3 ShouldApplyV3) {
   880  	c.Lock()
   881  	defer c.Unlock()
   882  
   883  	if c.be != nil && shouldApplyV3 {
   884  		mustSaveDowngradeToBackend(c.lg, c.be, d)
   885  	}
   886  
   887  	c.downgradeInfo = d
   888  
   889  	if d.Enabled {
   890  		c.lg.Info(
   891  			"The server is ready to downgrade",
   892  			zap.String("target-version", d.TargetVersion),
   893  			zap.String("server-version", version.Version),
   894  		)
   895  	}
   896  }
   897  
   898  // IsMemberExist returns if the member with the given id exists in cluster.
   899  func (c *RaftCluster) IsMemberExist(id types.ID) bool {
   900  	c.Lock()
   901  	defer c.Unlock()
   902  	_, ok := c.members[id]
   903  	return ok
   904  }
   905  
   906  // VotingMemberIDs returns the ID of voting members in cluster.
   907  func (c *RaftCluster) VotingMemberIDs() []types.ID {
   908  	c.Lock()
   909  	defer c.Unlock()
   910  	var ids []types.ID
   911  	for _, m := range c.members {
   912  		if !m.IsLearner {
   913  			ids = append(ids, m.ID)
   914  		}
   915  	}
   916  	sort.Sort(types.IDSlice(ids))
   917  	return ids
   918  }
   919  
   920  // PushMembershipToStorage is overriding storage information about cluster's
   921  // members, such that they fully reflect internal RaftCluster's storage.
   922  func (c *RaftCluster) PushMembershipToStorage() {
   923  	if c.be != nil {
   924  		TrimMembershipFromBackend(c.lg, c.be)
   925  		for _, m := range c.members {
   926  			unsafeSaveMemberToBackend(c.lg, c.be, m)
   927  		}
   928  	}
   929  	if c.v2store != nil {
   930  		TrimMembershipFromV2Store(c.lg, c.v2store)
   931  		for _, m := range c.members {
   932  			mustSaveMemberToStore(c.lg, c.v2store, m)
   933  		}
   934  	}
   935  }
   936  

View as plain text