...

Source file src/go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp/util.go

Documentation: go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp

     1  // Copyright 2015 The etcd Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package rafthttp
    16  
    17  import (
    18  	"fmt"
    19  	"io"
    20  	"net"
    21  	"net/http"
    22  	"net/url"
    23  	"strings"
    24  	"time"
    25  
    26  	"go.etcd.io/etcd/api/v3/version"
    27  	"go.etcd.io/etcd/client/pkg/v3/transport"
    28  	"go.etcd.io/etcd/client/pkg/v3/types"
    29  
    30  	"github.com/coreos/go-semver/semver"
    31  	"go.uber.org/zap"
    32  )
    33  
    34  var (
    35  	errMemberRemoved  = fmt.Errorf("the member has been permanently removed from the cluster")
    36  	errMemberNotFound = fmt.Errorf("member not found")
    37  )
    38  
    39  // NewListener returns a listener for raft message transfer between peers.
    40  // It uses timeout listener to identify broken streams promptly.
    41  func NewListener(u url.URL, tlsinfo *transport.TLSInfo) (net.Listener, error) {
    42  	return transport.NewListenerWithOpts(u.Host, u.Scheme, transport.WithTLSInfo(tlsinfo), transport.WithTimeout(ConnReadTimeout, ConnWriteTimeout))
    43  }
    44  
    45  // NewRoundTripper returns a roundTripper used to send requests
    46  // to rafthttp listener of remote peers.
    47  func NewRoundTripper(tlsInfo transport.TLSInfo, dialTimeout time.Duration) (http.RoundTripper, error) {
    48  	// It uses timeout transport to pair with remote timeout listeners.
    49  	// It sets no read/write timeout, because message in requests may
    50  	// take long time to write out before reading out the response.
    51  	return transport.NewTimeoutTransport(tlsInfo, dialTimeout, 0, 0)
    52  }
    53  
    54  // newStreamRoundTripper returns a roundTripper used to send stream requests
    55  // to rafthttp listener of remote peers.
    56  // Read/write timeout is set for stream roundTripper to promptly
    57  // find out broken status, which minimizes the number of messages
    58  // sent on broken connection.
    59  func newStreamRoundTripper(tlsInfo transport.TLSInfo, dialTimeout time.Duration) (http.RoundTripper, error) {
    60  	return transport.NewTimeoutTransport(tlsInfo, dialTimeout, ConnReadTimeout, ConnWriteTimeout)
    61  }
    62  
    63  // createPostRequest creates a HTTP POST request that sends raft message.
    64  func createPostRequest(lg *zap.Logger, u url.URL, path string, body io.Reader, ct string, urls types.URLs, from, cid types.ID) *http.Request {
    65  	uu := u
    66  	uu.Path = path
    67  	req, err := http.NewRequest("POST", uu.String(), body)
    68  	if err != nil {
    69  		if lg != nil {
    70  			lg.Panic("unexpected new request error", zap.Error(err))
    71  		}
    72  	}
    73  	req.Header.Set("Content-Type", ct)
    74  	req.Header.Set("X-Server-From", from.String())
    75  	req.Header.Set("X-Server-Version", version.Version)
    76  	req.Header.Set("X-Min-Cluster-Version", version.MinClusterVersion)
    77  	req.Header.Set("X-Etcd-Cluster-ID", cid.String())
    78  	setPeerURLsHeader(req, urls)
    79  
    80  	return req
    81  }
    82  
    83  // checkPostResponse checks the response of the HTTP POST request that sends
    84  // raft message.
    85  func checkPostResponse(lg *zap.Logger, resp *http.Response, body []byte, req *http.Request, to types.ID) error {
    86  	switch resp.StatusCode {
    87  	case http.StatusPreconditionFailed:
    88  		switch strings.TrimSuffix(string(body), "\n") {
    89  		case errIncompatibleVersion.Error():
    90  			if lg != nil {
    91  				lg.Error(
    92  					"request sent was ignored by peer",
    93  					zap.String("remote-peer-id", to.String()),
    94  				)
    95  			}
    96  			return errIncompatibleVersion
    97  		case ErrClusterIDMismatch.Error():
    98  			if lg != nil {
    99  				lg.Error(
   100  					"request sent was ignored due to cluster ID mismatch",
   101  					zap.String("remote-peer-id", to.String()),
   102  					zap.String("remote-peer-cluster-id", resp.Header.Get("X-Etcd-Cluster-ID")),
   103  					zap.String("local-member-cluster-id", req.Header.Get("X-Etcd-Cluster-ID")),
   104  				)
   105  			}
   106  			return ErrClusterIDMismatch
   107  		default:
   108  			return fmt.Errorf("unhandled error %q when precondition failed", string(body))
   109  		}
   110  	case http.StatusForbidden:
   111  		return errMemberRemoved
   112  	case http.StatusNoContent:
   113  		return nil
   114  	default:
   115  		return fmt.Errorf("unexpected http status %s while posting to %q", http.StatusText(resp.StatusCode), req.URL.String())
   116  	}
   117  }
   118  
   119  // reportCriticalError reports the given error through sending it into
   120  // the given error channel.
   121  // If the error channel is filled up when sending error, it drops the error
   122  // because the fact that error has happened is reported, which is
   123  // good enough.
   124  func reportCriticalError(err error, errc chan<- error) {
   125  	select {
   126  	case errc <- err:
   127  	default:
   128  	}
   129  }
   130  
   131  // compareMajorMinorVersion returns an integer comparing two versions based on
   132  // their major and minor version. The result will be 0 if a==b, -1 if a < b,
   133  // and 1 if a > b.
   134  func compareMajorMinorVersion(a, b *semver.Version) int {
   135  	na := &semver.Version{Major: a.Major, Minor: a.Minor}
   136  	nb := &semver.Version{Major: b.Major, Minor: b.Minor}
   137  	switch {
   138  	case na.LessThan(*nb):
   139  		return -1
   140  	case nb.LessThan(*na):
   141  		return 1
   142  	default:
   143  		return 0
   144  	}
   145  }
   146  
   147  // serverVersion returns the server version from the given header.
   148  func serverVersion(h http.Header) *semver.Version {
   149  	verStr := h.Get("X-Server-Version")
   150  	// backward compatibility with etcd 2.0
   151  	if verStr == "" {
   152  		verStr = "2.0.0"
   153  	}
   154  	return semver.Must(semver.NewVersion(verStr))
   155  }
   156  
   157  // serverVersion returns the min cluster version from the given header.
   158  func minClusterVersion(h http.Header) *semver.Version {
   159  	verStr := h.Get("X-Min-Cluster-Version")
   160  	// backward compatibility with etcd 2.0
   161  	if verStr == "" {
   162  		verStr = "2.0.0"
   163  	}
   164  	return semver.Must(semver.NewVersion(verStr))
   165  }
   166  
   167  // checkVersionCompatibility checks whether the given version is compatible
   168  // with the local version.
   169  func checkVersionCompatibility(name string, server, minCluster *semver.Version) (
   170  	localServer *semver.Version,
   171  	localMinCluster *semver.Version,
   172  	err error) {
   173  	localServer = semver.Must(semver.NewVersion(version.Version))
   174  	localMinCluster = semver.Must(semver.NewVersion(version.MinClusterVersion))
   175  	if compareMajorMinorVersion(server, localMinCluster) == -1 {
   176  		return localServer, localMinCluster, fmt.Errorf("remote version is too low: remote[%s]=%s, local=%s", name, server, localServer)
   177  	}
   178  	if compareMajorMinorVersion(minCluster, localServer) == 1 {
   179  		return localServer, localMinCluster, fmt.Errorf("local version is too low: remote[%s]=%s, local=%s", name, server, localServer)
   180  	}
   181  	return localServer, localMinCluster, nil
   182  }
   183  
   184  // setPeerURLsHeader reports local urls for peer discovery
   185  func setPeerURLsHeader(req *http.Request, urls types.URLs) {
   186  	if urls == nil {
   187  		// often not set in unit tests
   188  		return
   189  	}
   190  	peerURLs := make([]string, urls.Len())
   191  	for i := range urls {
   192  		peerURLs[i] = urls[i].String()
   193  	}
   194  	req.Header.Set("X-PeerURLs", strings.Join(peerURLs, ","))
   195  }
   196  
   197  // addRemoteFromRequest adds a remote peer according to an http request header
   198  func addRemoteFromRequest(tr Transporter, r *http.Request) {
   199  	if from, err := types.IDFromString(r.Header.Get("X-Server-From")); err == nil {
   200  		if urls := r.Header.Get("X-PeerURLs"); urls != "" {
   201  			tr.AddRemote(from, strings.Split(urls, ","))
   202  		}
   203  	}
   204  }
   205  

View as plain text