...

Source file src/go.etcd.io/etcd/server/v3/etcdserver/cluster_util.go

Documentation: go.etcd.io/etcd/server/v3/etcdserver

     1  // Copyright 2015 The etcd Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package etcdserver
    16  
    17  import (
    18  	"context"
    19  	"encoding/json"
    20  	"fmt"
    21  	"io/ioutil"
    22  	"net/http"
    23  	"sort"
    24  	"strconv"
    25  	"strings"
    26  	"time"
    27  
    28  	"go.etcd.io/etcd/api/v3/version"
    29  	"go.etcd.io/etcd/client/pkg/v3/types"
    30  	"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
    31  
    32  	"github.com/coreos/go-semver/semver"
    33  	"go.uber.org/zap"
    34  )
    35  
    36  // isMemberBootstrapped tries to check if the given member has been bootstrapped
    37  // in the given cluster.
    38  func isMemberBootstrapped(lg *zap.Logger, cl *membership.RaftCluster, member string, rt http.RoundTripper, timeout time.Duration) bool {
    39  	rcl, err := getClusterFromRemotePeers(lg, getRemotePeerURLs(cl, member), timeout, false, rt)
    40  	if err != nil {
    41  		return false
    42  	}
    43  	id := cl.MemberByName(member).ID
    44  	m := rcl.Member(id)
    45  	if m == nil {
    46  		return false
    47  	}
    48  	if len(m.ClientURLs) > 0 {
    49  		return true
    50  	}
    51  	return false
    52  }
    53  
    54  // GetClusterFromRemotePeers takes a set of URLs representing etcd peers, and
    55  // attempts to construct a Cluster by accessing the members endpoint on one of
    56  // these URLs. The first URL to provide a response is used. If no URLs provide
    57  // a response, or a Cluster cannot be successfully created from a received
    58  // response, an error is returned.
    59  // Each request has a 10-second timeout. Because the upper limit of TTL is 5s,
    60  // 10 second is enough for building connection and finishing request.
    61  func GetClusterFromRemotePeers(lg *zap.Logger, urls []string, rt http.RoundTripper) (*membership.RaftCluster, error) {
    62  	return getClusterFromRemotePeers(lg, urls, 10*time.Second, true, rt)
    63  }
    64  
    65  // If logerr is true, it prints out more error messages.
    66  func getClusterFromRemotePeers(lg *zap.Logger, urls []string, timeout time.Duration, logerr bool, rt http.RoundTripper) (*membership.RaftCluster, error) {
    67  	if lg == nil {
    68  		lg = zap.NewNop()
    69  	}
    70  	cc := &http.Client{
    71  		Transport: rt,
    72  		Timeout:   timeout,
    73  		CheckRedirect: func(req *http.Request, via []*http.Request) error {
    74  			return http.ErrUseLastResponse
    75  		},
    76  	}
    77  	for _, u := range urls {
    78  		addr := u + "/members"
    79  		resp, err := cc.Get(addr)
    80  		if err != nil {
    81  			if logerr {
    82  				lg.Warn("failed to get cluster response", zap.String("address", addr), zap.Error(err))
    83  			}
    84  			continue
    85  		}
    86  		b, err := ioutil.ReadAll(resp.Body)
    87  		resp.Body.Close()
    88  		if err != nil {
    89  			if logerr {
    90  				lg.Warn("failed to read body of cluster response", zap.String("address", addr), zap.Error(err))
    91  			}
    92  			continue
    93  		}
    94  		var membs []*membership.Member
    95  		if err = json.Unmarshal(b, &membs); err != nil {
    96  			if logerr {
    97  				lg.Warn("failed to unmarshal cluster response", zap.String("address", addr), zap.Error(err))
    98  			}
    99  			continue
   100  		}
   101  		id, err := types.IDFromString(resp.Header.Get("X-Etcd-Cluster-ID"))
   102  		if err != nil {
   103  			if logerr {
   104  				lg.Warn(
   105  					"failed to parse cluster ID",
   106  					zap.String("address", addr),
   107  					zap.String("header", resp.Header.Get("X-Etcd-Cluster-ID")),
   108  					zap.Error(err),
   109  				)
   110  			}
   111  			continue
   112  		}
   113  
   114  		// check the length of membership members
   115  		// if the membership members are present then prepare and return raft cluster
   116  		// if membership members are not present then the raft cluster formed will be
   117  		// an invalid empty cluster hence return failed to get raft cluster member(s) from the given urls error
   118  		if len(membs) > 0 {
   119  			return membership.NewClusterFromMembers(lg, id, membs), nil
   120  		}
   121  		return nil, fmt.Errorf("failed to get raft cluster member(s) from the given URLs")
   122  	}
   123  	return nil, fmt.Errorf("could not retrieve cluster information from the given URLs")
   124  }
   125  
   126  // getRemotePeerURLs returns peer urls of remote members in the cluster. The
   127  // returned list is sorted in ascending lexicographical order.
   128  func getRemotePeerURLs(cl *membership.RaftCluster, local string) []string {
   129  	us := make([]string, 0)
   130  	for _, m := range cl.Members() {
   131  		if m.Name == local {
   132  			continue
   133  		}
   134  		us = append(us, m.PeerURLs...)
   135  	}
   136  	sort.Strings(us)
   137  	return us
   138  }
   139  
   140  // getVersions returns the versions of the members in the given cluster.
   141  // The key of the returned map is the member's ID. The value of the returned map
   142  // is the semver versions string, including server and cluster.
   143  // If it fails to get the version of a member, the key will be nil.
   144  func getVersions(lg *zap.Logger, cl *membership.RaftCluster, local types.ID, rt http.RoundTripper) map[string]*version.Versions {
   145  	members := cl.Members()
   146  	vers := make(map[string]*version.Versions)
   147  	for _, m := range members {
   148  		if m.ID == local {
   149  			cv := "not_decided"
   150  			if cl.Version() != nil {
   151  				cv = cl.Version().String()
   152  			}
   153  			vers[m.ID.String()] = &version.Versions{Server: version.Version, Cluster: cv}
   154  			continue
   155  		}
   156  		ver, err := getVersion(lg, m, rt)
   157  		if err != nil {
   158  			lg.Warn("failed to get version", zap.String("remote-member-id", m.ID.String()), zap.Error(err))
   159  			vers[m.ID.String()] = nil
   160  		} else {
   161  			vers[m.ID.String()] = ver
   162  		}
   163  	}
   164  	return vers
   165  }
   166  
   167  // decideClusterVersion decides the cluster version based on the versions map.
   168  // The returned version is the min server version in the map, or nil if the min
   169  // version in unknown.
   170  func decideClusterVersion(lg *zap.Logger, vers map[string]*version.Versions) *semver.Version {
   171  	var cv *semver.Version
   172  	lv := semver.Must(semver.NewVersion(version.Version))
   173  
   174  	for mid, ver := range vers {
   175  		if ver == nil {
   176  			return nil
   177  		}
   178  		v, err := semver.NewVersion(ver.Server)
   179  		if err != nil {
   180  			lg.Warn(
   181  				"failed to parse server version of remote member",
   182  				zap.String("remote-member-id", mid),
   183  				zap.String("remote-member-version", ver.Server),
   184  				zap.Error(err),
   185  			)
   186  			return nil
   187  		}
   188  		if lv.LessThan(*v) {
   189  			lg.Warn(
   190  				"leader found higher-versioned member",
   191  				zap.String("local-member-version", lv.String()),
   192  				zap.String("remote-member-id", mid),
   193  				zap.String("remote-member-version", ver.Server),
   194  			)
   195  		}
   196  		if cv == nil {
   197  			cv = v
   198  		} else if v.LessThan(*cv) {
   199  			cv = v
   200  		}
   201  	}
   202  	return cv
   203  }
   204  
   205  // allowedVersionRange decides the available version range of the cluster that local server can join in;
   206  // if the downgrade enabled status is true, the version window is [oneMinorHigher, oneMinorHigher]
   207  // if the downgrade is not enabled, the version window is [MinClusterVersion, localVersion]
   208  func allowedVersionRange(downgradeEnabled bool) (minV *semver.Version, maxV *semver.Version) {
   209  	minV = semver.Must(semver.NewVersion(version.MinClusterVersion))
   210  	maxV = semver.Must(semver.NewVersion(version.Version))
   211  	maxV = &semver.Version{Major: maxV.Major, Minor: maxV.Minor}
   212  
   213  	if downgradeEnabled {
   214  		// Todo: handle the case that downgrading from higher major version(e.g. downgrade from v4.0 to v3.x)
   215  		maxV.Minor = maxV.Minor + 1
   216  		minV = &semver.Version{Major: maxV.Major, Minor: maxV.Minor}
   217  	}
   218  	return minV, maxV
   219  }
   220  
   221  // isCompatibleWithCluster return true if the local member has a compatible version with
   222  // the current running cluster.
   223  // The version is considered as compatible when at least one of the other members in the cluster has a
   224  // cluster version in the range of [MinV, MaxV] and no known members has a cluster version
   225  // out of the range.
   226  // We set this rule since when the local member joins, another member might be offline.
   227  func isCompatibleWithCluster(lg *zap.Logger, cl *membership.RaftCluster, local types.ID, rt http.RoundTripper) bool {
   228  	vers := getVersions(lg, cl, local, rt)
   229  	minV, maxV := allowedVersionRange(getDowngradeEnabledFromRemotePeers(lg, cl, local, rt))
   230  	return isCompatibleWithVers(lg, vers, local, minV, maxV)
   231  }
   232  
   233  func isCompatibleWithVers(lg *zap.Logger, vers map[string]*version.Versions, local types.ID, minV, maxV *semver.Version) bool {
   234  	var ok bool
   235  	for id, v := range vers {
   236  		// ignore comparison with local version
   237  		if id == local.String() {
   238  			continue
   239  		}
   240  		if v == nil {
   241  			continue
   242  		}
   243  		clusterv, err := semver.NewVersion(v.Cluster)
   244  		if err != nil {
   245  			lg.Warn(
   246  				"failed to parse cluster version of remote member",
   247  				zap.String("remote-member-id", id),
   248  				zap.String("remote-member-cluster-version", v.Cluster),
   249  				zap.Error(err),
   250  			)
   251  			continue
   252  		}
   253  		if clusterv.LessThan(*minV) {
   254  			lg.Warn(
   255  				"cluster version of remote member is not compatible; too low",
   256  				zap.String("remote-member-id", id),
   257  				zap.String("remote-member-cluster-version", clusterv.String()),
   258  				zap.String("minimum-cluster-version-supported", minV.String()),
   259  			)
   260  			return false
   261  		}
   262  		if maxV.LessThan(*clusterv) {
   263  			lg.Warn(
   264  				"cluster version of remote member is not compatible; too high",
   265  				zap.String("remote-member-id", id),
   266  				zap.String("remote-member-cluster-version", clusterv.String()),
   267  				zap.String("minimum-cluster-version-supported", minV.String()),
   268  			)
   269  			return false
   270  		}
   271  		ok = true
   272  	}
   273  	return ok
   274  }
   275  
   276  // getVersion returns the Versions of the given member via its
   277  // peerURLs. Returns the last error if it fails to get the version.
   278  func getVersion(lg *zap.Logger, m *membership.Member, rt http.RoundTripper) (*version.Versions, error) {
   279  	cc := &http.Client{
   280  		Transport: rt,
   281  		CheckRedirect: func(req *http.Request, via []*http.Request) error {
   282  			return http.ErrUseLastResponse
   283  		},
   284  	}
   285  	var (
   286  		err  error
   287  		resp *http.Response
   288  	)
   289  
   290  	for _, u := range m.PeerURLs {
   291  		addr := u + "/version"
   292  		resp, err = cc.Get(addr)
   293  		if err != nil {
   294  			lg.Warn(
   295  				"failed to reach the peer URL",
   296  				zap.String("address", addr),
   297  				zap.String("remote-member-id", m.ID.String()),
   298  				zap.Error(err),
   299  			)
   300  			continue
   301  		}
   302  		var b []byte
   303  		b, err = ioutil.ReadAll(resp.Body)
   304  		resp.Body.Close()
   305  		if err != nil {
   306  			lg.Warn(
   307  				"failed to read body of response",
   308  				zap.String("address", addr),
   309  				zap.String("remote-member-id", m.ID.String()),
   310  				zap.Error(err),
   311  			)
   312  			continue
   313  		}
   314  		var vers version.Versions
   315  		if err = json.Unmarshal(b, &vers); err != nil {
   316  			lg.Warn(
   317  				"failed to unmarshal response",
   318  				zap.String("address", addr),
   319  				zap.String("remote-member-id", m.ID.String()),
   320  				zap.Error(err),
   321  			)
   322  			continue
   323  		}
   324  		return &vers, nil
   325  	}
   326  	return nil, err
   327  }
   328  
   329  func promoteMemberHTTP(ctx context.Context, url string, id uint64, peerRt http.RoundTripper) ([]*membership.Member, error) {
   330  	cc := &http.Client{
   331  		Transport: peerRt,
   332  		CheckRedirect: func(req *http.Request, via []*http.Request) error {
   333  			return http.ErrUseLastResponse
   334  		},
   335  	}
   336  	// TODO: refactor member http handler code
   337  	// cannot import etcdhttp, so manually construct url
   338  	requestUrl := url + "/members/promote/" + fmt.Sprintf("%d", id)
   339  	req, err := http.NewRequest("POST", requestUrl, nil)
   340  	if err != nil {
   341  		return nil, err
   342  	}
   343  	req = req.WithContext(ctx)
   344  	resp, err := cc.Do(req)
   345  	if err != nil {
   346  		return nil, err
   347  	}
   348  	defer resp.Body.Close()
   349  	b, err := ioutil.ReadAll(resp.Body)
   350  	if err != nil {
   351  		return nil, err
   352  	}
   353  
   354  	if resp.StatusCode == http.StatusRequestTimeout {
   355  		return nil, ErrTimeout
   356  	}
   357  	if resp.StatusCode == http.StatusPreconditionFailed {
   358  		// both ErrMemberNotLearner and ErrLearnerNotReady have same http status code
   359  		if strings.Contains(string(b), ErrLearnerNotReady.Error()) {
   360  			return nil, ErrLearnerNotReady
   361  		}
   362  		if strings.Contains(string(b), membership.ErrMemberNotLearner.Error()) {
   363  			return nil, membership.ErrMemberNotLearner
   364  		}
   365  		return nil, fmt.Errorf("member promote: unknown error(%s)", string(b))
   366  	}
   367  	if resp.StatusCode == http.StatusNotFound {
   368  		return nil, membership.ErrIDNotFound
   369  	}
   370  
   371  	if resp.StatusCode != http.StatusOK { // all other types of errors
   372  		return nil, fmt.Errorf("member promote: unknown error(%s)", string(b))
   373  	}
   374  
   375  	var membs []*membership.Member
   376  	if err := json.Unmarshal(b, &membs); err != nil {
   377  		return nil, err
   378  	}
   379  	return membs, nil
   380  }
   381  
   382  // getDowngradeEnabledFromRemotePeers will get the downgrade enabled status of the cluster.
   383  func getDowngradeEnabledFromRemotePeers(lg *zap.Logger, cl *membership.RaftCluster, local types.ID, rt http.RoundTripper) bool {
   384  	members := cl.Members()
   385  
   386  	for _, m := range members {
   387  		if m.ID == local {
   388  			continue
   389  		}
   390  		enable, err := getDowngradeEnabled(lg, m, rt)
   391  		if err != nil {
   392  			lg.Warn("failed to get downgrade enabled status", zap.String("remote-member-id", m.ID.String()), zap.Error(err))
   393  		} else {
   394  			// Since the "/downgrade/enabled" serves linearized data,
   395  			// this function can return once it gets a non-error response from the endpoint.
   396  			return enable
   397  		}
   398  	}
   399  	return false
   400  }
   401  
   402  // getDowngradeEnabled returns the downgrade enabled status of the given member
   403  // via its peerURLs. Returns the last error if it fails to get it.
   404  func getDowngradeEnabled(lg *zap.Logger, m *membership.Member, rt http.RoundTripper) (bool, error) {
   405  	cc := &http.Client{
   406  		Transport: rt,
   407  		CheckRedirect: func(req *http.Request, via []*http.Request) error {
   408  			return http.ErrUseLastResponse
   409  		},
   410  	}
   411  	var (
   412  		err  error
   413  		resp *http.Response
   414  	)
   415  
   416  	for _, u := range m.PeerURLs {
   417  		addr := u + DowngradeEnabledPath
   418  		resp, err = cc.Get(addr)
   419  		if err != nil {
   420  			lg.Warn(
   421  				"failed to reach the peer URL",
   422  				zap.String("address", addr),
   423  				zap.String("remote-member-id", m.ID.String()),
   424  				zap.Error(err),
   425  			)
   426  			continue
   427  		}
   428  		var b []byte
   429  		b, err = ioutil.ReadAll(resp.Body)
   430  		resp.Body.Close()
   431  		if err != nil {
   432  			lg.Warn(
   433  				"failed to read body of response",
   434  				zap.String("address", addr),
   435  				zap.String("remote-member-id", m.ID.String()),
   436  				zap.Error(err),
   437  			)
   438  			continue
   439  		}
   440  		var enable bool
   441  		if enable, err = strconv.ParseBool(string(b)); err != nil {
   442  			lg.Warn(
   443  				"failed to convert response",
   444  				zap.String("address", addr),
   445  				zap.String("remote-member-id", m.ID.String()),
   446  				zap.Error(err),
   447  			)
   448  			continue
   449  		}
   450  		return enable, nil
   451  	}
   452  	return false, err
   453  }
   454  
   455  // isMatchedVersions returns true if all server versions are equal to target version, otherwise return false.
   456  // It can be used to decide the whether the cluster finishes downgrading to target version.
   457  func isMatchedVersions(lg *zap.Logger, targetVersion *semver.Version, vers map[string]*version.Versions) bool {
   458  	for mid, ver := range vers {
   459  		if ver == nil {
   460  			return false
   461  		}
   462  		v, err := semver.NewVersion(ver.Cluster)
   463  		if err != nil {
   464  			lg.Warn(
   465  				"failed to parse server version of remote member",
   466  				zap.String("remote-member-id", mid),
   467  				zap.String("remote-member-version", ver.Server),
   468  				zap.Error(err),
   469  			)
   470  			return false
   471  		}
   472  		if !targetVersion.Equal(*v) {
   473  			lg.Warn("remotes server has mismatching etcd version",
   474  				zap.String("remote-member-id", mid),
   475  				zap.String("current-server-version", v.String()),
   476  				zap.String("target-version", targetVersion.String()),
   477  			)
   478  			return false
   479  		}
   480  	}
   481  	return true
   482  }
   483  
   484  func convertToClusterVersion(v string) (*semver.Version, error) {
   485  	ver, err := semver.NewVersion(v)
   486  	if err != nil {
   487  		// allow input version format Major.Minor
   488  		ver, err = semver.NewVersion(v + ".0")
   489  		if err != nil {
   490  			return nil, ErrWrongDowngradeVersionFormat
   491  		}
   492  	}
   493  	// cluster version only keeps major.minor, remove patch version
   494  	ver = &semver.Version{Major: ver.Major, Minor: ver.Minor}
   495  	return ver, nil
   496  }
   497  

View as plain text