...

Source file src/go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp/http.go

Documentation: go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp

     1  // Copyright 2015 The etcd Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package rafthttp
    16  
    17  import (
    18  	"context"
    19  	"errors"
    20  	"fmt"
    21  	"io/ioutil"
    22  	"net/http"
    23  	"path"
    24  	"strings"
    25  	"time"
    26  
    27  	"go.etcd.io/etcd/api/v3/version"
    28  	"go.etcd.io/etcd/client/pkg/v3/types"
    29  	pioutil "go.etcd.io/etcd/pkg/v3/ioutil"
    30  	"go.etcd.io/etcd/raft/v3/raftpb"
    31  	"go.etcd.io/etcd/server/v3/etcdserver/api/snap"
    32  
    33  	humanize "github.com/dustin/go-humanize"
    34  	"go.uber.org/zap"
    35  )
    36  
    37  const (
    38  	// connReadLimitByte limits the number of bytes
    39  	// a single read can read out.
    40  	//
    41  	// 64KB should be large enough for not causing
    42  	// throughput bottleneck as well as small enough
    43  	// for not causing a read timeout.
    44  	connReadLimitByte = 64 * 1024
    45  
    46  	// snapshotLimitByte limits the snapshot size to 1TB
    47  	snapshotLimitByte = 1 * 1024 * 1024 * 1024 * 1024
    48  )
    49  
    50  var (
    51  	RaftPrefix         = "/raft"
    52  	ProbingPrefix      = path.Join(RaftPrefix, "probing")
    53  	RaftStreamPrefix   = path.Join(RaftPrefix, "stream")
    54  	RaftSnapshotPrefix = path.Join(RaftPrefix, "snapshot")
    55  
    56  	errIncompatibleVersion = errors.New("incompatible version")
    57  	ErrClusterIDMismatch   = errors.New("cluster ID mismatch")
    58  )
    59  
// peerGetter looks up a Peer by its member ID.
type peerGetter interface {
	// Get returns the Peer for id. streamHandler treats a nil result as
	// "sender not found".
	Get(id types.ID) Peer
}
    63  
// writerToResponse is implemented by errors that can write themselves to an
// HTTP response. When Raft.Process returns such an error, the handlers in
// this file call WriteTo instead of producing their own error response.
type writerToResponse interface {
	// WriteTo writes the error to w.
	WriteTo(w http.ResponseWriter)
}
    67  
// pipelineHandler is the http.Handler that receives raft messages sent
// through the pipeline.
type pipelineHandler struct {
	lg      *zap.Logger // logger; never nil when built via newPipelineHandler
	localID types.ID    // ID of the local member, used in logs and compatibility checks
	tr      Transporter // transport handed to addRemoteFromRequest
	r       Raft        // receiver of the decoded raft messages
	cid     types.ID    // local cluster ID, echoed in X-Etcd-Cluster-ID and checked against requests
}
    75  
    76  // newPipelineHandler returns a handler for handling raft messages
    77  // from pipeline for RaftPrefix.
    78  //
    79  // The handler reads out the raft message from request body,
    80  // and forwards it to the given raft state machine for processing.
    81  func newPipelineHandler(t *Transport, r Raft, cid types.ID) http.Handler {
    82  	h := &pipelineHandler{
    83  		lg:      t.Logger,
    84  		localID: t.ID,
    85  		tr:      t,
    86  		r:       r,
    87  		cid:     cid,
    88  	}
    89  	if h.lg == nil {
    90  		h.lg = zap.NewNop()
    91  	}
    92  	return h
    93  }
    94  
    95  func (h *pipelineHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    96  	if r.Method != "POST" {
    97  		w.Header().Set("Allow", "POST")
    98  		http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
    99  		return
   100  	}
   101  
   102  	w.Header().Set("X-Etcd-Cluster-ID", h.cid.String())
   103  
   104  	if err := checkClusterCompatibilityFromHeader(h.lg, h.localID, r.Header, h.cid); err != nil {
   105  		http.Error(w, err.Error(), http.StatusPreconditionFailed)
   106  		return
   107  	}
   108  
   109  	addRemoteFromRequest(h.tr, r)
   110  
   111  	// Limit the data size that could be read from the request body, which ensures that read from
   112  	// connection will not time out accidentally due to possible blocking in underlying implementation.
   113  	limitedr := pioutil.NewLimitedBufferReader(r.Body, connReadLimitByte)
   114  	b, err := ioutil.ReadAll(limitedr)
   115  	if err != nil {
   116  		h.lg.Warn(
   117  			"failed to read Raft message",
   118  			zap.String("local-member-id", h.localID.String()),
   119  			zap.Error(err),
   120  		)
   121  		http.Error(w, "error reading raft message", http.StatusBadRequest)
   122  		recvFailures.WithLabelValues(r.RemoteAddr).Inc()
   123  		return
   124  	}
   125  
   126  	var m raftpb.Message
   127  	if err := m.Unmarshal(b); err != nil {
   128  		h.lg.Warn(
   129  			"failed to unmarshal Raft message",
   130  			zap.String("local-member-id", h.localID.String()),
   131  			zap.Error(err),
   132  		)
   133  		http.Error(w, "error unmarshalling raft message", http.StatusBadRequest)
   134  		recvFailures.WithLabelValues(r.RemoteAddr).Inc()
   135  		return
   136  	}
   137  
   138  	receivedBytes.WithLabelValues(types.ID(m.From).String()).Add(float64(len(b)))
   139  
   140  	if err := h.r.Process(context.TODO(), m); err != nil {
   141  		switch v := err.(type) {
   142  		case writerToResponse:
   143  			v.WriteTo(w)
   144  		default:
   145  			h.lg.Warn(
   146  				"failed to process Raft message",
   147  				zap.String("local-member-id", h.localID.String()),
   148  				zap.Error(err),
   149  			)
   150  			http.Error(w, "error processing raft message", http.StatusInternalServerError)
   151  			w.(http.Flusher).Flush()
   152  			// disconnect the http stream
   153  			panic(err)
   154  		}
   155  		return
   156  	}
   157  
   158  	// Write StatusNoContent header after the message has been processed by
   159  	// raft, which facilitates the client to report MsgSnap status.
   160  	w.WriteHeader(http.StatusNoContent)
   161  }
   162  
// snapshotHandler is the http.Handler that receives snapshot messages,
// saves the accompanying database snapshot and hands the message to Raft.
type snapshotHandler struct {
	lg          *zap.Logger       // logger; never nil when built via newSnapshotHandler
	tr          Transporter       // transport handed to addRemoteFromRequest
	r           Raft              // receiver of the decoded snapshot message
	snapshotter *snap.Snapshotter // saves the incoming database snapshot from the request body

	localID types.ID // ID of the local member, used in logs and compatibility checks
	cid     types.ID // local cluster ID, echoed in X-Etcd-Cluster-ID and checked against requests
}
   172  
   173  func newSnapshotHandler(t *Transport, r Raft, snapshotter *snap.Snapshotter, cid types.ID) http.Handler {
   174  	h := &snapshotHandler{
   175  		lg:          t.Logger,
   176  		tr:          t,
   177  		r:           r,
   178  		snapshotter: snapshotter,
   179  		localID:     t.ID,
   180  		cid:         cid,
   181  	}
   182  	if h.lg == nil {
   183  		h.lg = zap.NewNop()
   184  	}
   185  	return h
   186  }
   187  
// unknownSnapshotSender is the label value used for snapshotReceiveFailures
// when a request is rejected before its message has been decoded, i.e. before
// the sender ID is available.
const unknownSnapshotSender = "UNKNOWN_SNAPSHOT_SENDER"
   189  
   190  // ServeHTTP serves HTTP request to receive and process snapshot message.
   191  //
   192  // If request sender dies without closing underlying TCP connection,
   193  // the handler will keep waiting for the request body until TCP keepalive
   194  // finds out that the connection is broken after several minutes.
   195  // This is acceptable because
   196  // 1. snapshot messages sent through other TCP connections could still be
   197  // received and processed.
   198  // 2. this case should happen rarely, so no further optimization is done.
   199  func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
   200  	start := time.Now()
   201  
   202  	if r.Method != "POST" {
   203  		w.Header().Set("Allow", "POST")
   204  		http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
   205  		snapshotReceiveFailures.WithLabelValues(unknownSnapshotSender).Inc()
   206  		return
   207  	}
   208  
   209  	w.Header().Set("X-Etcd-Cluster-ID", h.cid.String())
   210  
   211  	if err := checkClusterCompatibilityFromHeader(h.lg, h.localID, r.Header, h.cid); err != nil {
   212  		http.Error(w, err.Error(), http.StatusPreconditionFailed)
   213  		snapshotReceiveFailures.WithLabelValues(unknownSnapshotSender).Inc()
   214  		return
   215  	}
   216  
   217  	addRemoteFromRequest(h.tr, r)
   218  
   219  	dec := &messageDecoder{r: r.Body}
   220  	// let snapshots be very large since they can exceed 512MB for large installations
   221  	m, err := dec.decodeLimit(snapshotLimitByte)
   222  	from := types.ID(m.From).String()
   223  	if err != nil {
   224  		msg := fmt.Sprintf("failed to decode raft message (%v)", err)
   225  		h.lg.Warn(
   226  			"failed to decode Raft message",
   227  			zap.String("local-member-id", h.localID.String()),
   228  			zap.String("remote-snapshot-sender-id", from),
   229  			zap.Error(err),
   230  		)
   231  		http.Error(w, msg, http.StatusBadRequest)
   232  		recvFailures.WithLabelValues(r.RemoteAddr).Inc()
   233  		snapshotReceiveFailures.WithLabelValues(from).Inc()
   234  		return
   235  	}
   236  
   237  	msgSize := m.Size()
   238  	receivedBytes.WithLabelValues(from).Add(float64(msgSize))
   239  
   240  	if m.Type != raftpb.MsgSnap {
   241  		h.lg.Warn(
   242  			"unexpected Raft message type",
   243  			zap.String("local-member-id", h.localID.String()),
   244  			zap.String("remote-snapshot-sender-id", from),
   245  			zap.String("message-type", m.Type.String()),
   246  		)
   247  		http.Error(w, "wrong raft message type", http.StatusBadRequest)
   248  		snapshotReceiveFailures.WithLabelValues(from).Inc()
   249  		return
   250  	}
   251  
   252  	snapshotReceiveInflights.WithLabelValues(from).Inc()
   253  	defer func() {
   254  		snapshotReceiveInflights.WithLabelValues(from).Dec()
   255  	}()
   256  
   257  	h.lg.Info(
   258  		"receiving database snapshot",
   259  		zap.String("local-member-id", h.localID.String()),
   260  		zap.String("remote-snapshot-sender-id", from),
   261  		zap.Uint64("incoming-snapshot-index", m.Snapshot.Metadata.Index),
   262  		zap.Int("incoming-snapshot-message-size-bytes", msgSize),
   263  		zap.String("incoming-snapshot-message-size", humanize.Bytes(uint64(msgSize))),
   264  	)
   265  
   266  	// save incoming database snapshot.
   267  
   268  	n, err := h.snapshotter.SaveDBFrom(r.Body, m.Snapshot.Metadata.Index)
   269  	if err != nil {
   270  		msg := fmt.Sprintf("failed to save KV snapshot (%v)", err)
   271  		h.lg.Warn(
   272  			"failed to save incoming database snapshot",
   273  			zap.String("local-member-id", h.localID.String()),
   274  			zap.String("remote-snapshot-sender-id", from),
   275  			zap.Uint64("incoming-snapshot-index", m.Snapshot.Metadata.Index),
   276  			zap.Error(err),
   277  		)
   278  		http.Error(w, msg, http.StatusInternalServerError)
   279  		snapshotReceiveFailures.WithLabelValues(from).Inc()
   280  		return
   281  	}
   282  
   283  	receivedBytes.WithLabelValues(from).Add(float64(n))
   284  
   285  	downloadTook := time.Since(start)
   286  	h.lg.Info(
   287  		"received and saved database snapshot",
   288  		zap.String("local-member-id", h.localID.String()),
   289  		zap.String("remote-snapshot-sender-id", from),
   290  		zap.Uint64("incoming-snapshot-index", m.Snapshot.Metadata.Index),
   291  		zap.Int64("incoming-snapshot-size-bytes", n),
   292  		zap.String("incoming-snapshot-size", humanize.Bytes(uint64(n))),
   293  		zap.String("download-took", downloadTook.String()),
   294  	)
   295  
   296  	if err := h.r.Process(context.TODO(), m); err != nil {
   297  		switch v := err.(type) {
   298  		// Process may return writerToResponse error when doing some
   299  		// additional checks before calling raft.Node.Step.
   300  		case writerToResponse:
   301  			v.WriteTo(w)
   302  		default:
   303  			msg := fmt.Sprintf("failed to process raft message (%v)", err)
   304  			h.lg.Warn(
   305  				"failed to process Raft message",
   306  				zap.String("local-member-id", h.localID.String()),
   307  				zap.String("remote-snapshot-sender-id", from),
   308  				zap.Error(err),
   309  			)
   310  			http.Error(w, msg, http.StatusInternalServerError)
   311  			snapshotReceiveFailures.WithLabelValues(from).Inc()
   312  		}
   313  		return
   314  	}
   315  
   316  	// Write StatusNoContent header after the message has been processed by
   317  	// raft, which facilitates the client to report MsgSnap status.
   318  	w.WriteHeader(http.StatusNoContent)
   319  
   320  	snapshotReceive.WithLabelValues(from).Inc()
   321  	snapshotReceiveSeconds.WithLabelValues(from).Observe(time.Since(start).Seconds())
   322  }
   323  
// streamHandler is the http.Handler that accepts streaming requests from
// remote peers and attaches the resulting connection to the matching Peer.
type streamHandler struct {
	lg         *zap.Logger // logger; never nil when built via newStreamHandler
	tr         *Transport  // transport; supplies the local member ID and AddRemote
	peerGetter peerGetter  // resolves the requesting member ID to a Peer
	r          Raft        // consulted via IsIDRemoved to reject removed members
	id         types.ID    // ID that the request's X-Raft-To header must match
	cid        types.ID    // local cluster ID, echoed in X-Etcd-Cluster-ID and checked against requests
}
   332  
   333  func newStreamHandler(t *Transport, pg peerGetter, r Raft, id, cid types.ID) http.Handler {
   334  	h := &streamHandler{
   335  		lg:         t.Logger,
   336  		tr:         t,
   337  		peerGetter: pg,
   338  		r:          r,
   339  		id:         id,
   340  		cid:        cid,
   341  	}
   342  	if h.lg == nil {
   343  		h.lg = zap.NewNop()
   344  	}
   345  	return h
   346  }
   347  
   348  func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
   349  	if r.Method != "GET" {
   350  		w.Header().Set("Allow", "GET")
   351  		http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
   352  		return
   353  	}
   354  
   355  	w.Header().Set("X-Server-Version", version.Version)
   356  	w.Header().Set("X-Etcd-Cluster-ID", h.cid.String())
   357  
   358  	if err := checkClusterCompatibilityFromHeader(h.lg, h.tr.ID, r.Header, h.cid); err != nil {
   359  		http.Error(w, err.Error(), http.StatusPreconditionFailed)
   360  		return
   361  	}
   362  
   363  	var t streamType
   364  	switch path.Dir(r.URL.Path) {
   365  	case streamTypeMsgAppV2.endpoint(h.lg):
   366  		t = streamTypeMsgAppV2
   367  	case streamTypeMessage.endpoint(h.lg):
   368  		t = streamTypeMessage
   369  	default:
   370  		h.lg.Debug(
   371  			"ignored unexpected streaming request path",
   372  			zap.String("local-member-id", h.tr.ID.String()),
   373  			zap.String("remote-peer-id-stream-handler", h.id.String()),
   374  			zap.String("path", r.URL.Path),
   375  		)
   376  		http.Error(w, "invalid path", http.StatusNotFound)
   377  		return
   378  	}
   379  
   380  	fromStr := path.Base(r.URL.Path)
   381  	from, err := types.IDFromString(fromStr)
   382  	if err != nil {
   383  		h.lg.Warn(
   384  			"failed to parse path into ID",
   385  			zap.String("local-member-id", h.tr.ID.String()),
   386  			zap.String("remote-peer-id-stream-handler", h.id.String()),
   387  			zap.String("path", fromStr),
   388  			zap.Error(err),
   389  		)
   390  		http.Error(w, "invalid from", http.StatusNotFound)
   391  		return
   392  	}
   393  	if h.r.IsIDRemoved(uint64(from)) {
   394  		h.lg.Warn(
   395  			"rejected stream from remote peer because it was removed",
   396  			zap.String("local-member-id", h.tr.ID.String()),
   397  			zap.String("remote-peer-id-stream-handler", h.id.String()),
   398  			zap.String("remote-peer-id-from", from.String()),
   399  		)
   400  		http.Error(w, "removed member", http.StatusGone)
   401  		return
   402  	}
   403  	p := h.peerGetter.Get(from)
   404  	if p == nil {
   405  		// This may happen in following cases:
   406  		// 1. user starts a remote peer that belongs to a different cluster
   407  		// with the same cluster ID.
   408  		// 2. local etcd falls behind of the cluster, and cannot recognize
   409  		// the members that joined after its current progress.
   410  		if urls := r.Header.Get("X-PeerURLs"); urls != "" {
   411  			h.tr.AddRemote(from, strings.Split(urls, ","))
   412  		}
   413  		h.lg.Warn(
   414  			"failed to find remote peer in cluster",
   415  			zap.String("local-member-id", h.tr.ID.String()),
   416  			zap.String("remote-peer-id-stream-handler", h.id.String()),
   417  			zap.String("remote-peer-id-from", from.String()),
   418  			zap.String("cluster-id", h.cid.String()),
   419  		)
   420  		http.Error(w, "error sender not found", http.StatusNotFound)
   421  		return
   422  	}
   423  
   424  	wto := h.id.String()
   425  	if gto := r.Header.Get("X-Raft-To"); gto != wto {
   426  		h.lg.Warn(
   427  			"ignored streaming request; ID mismatch",
   428  			zap.String("local-member-id", h.tr.ID.String()),
   429  			zap.String("remote-peer-id-stream-handler", h.id.String()),
   430  			zap.String("remote-peer-id-header", gto),
   431  			zap.String("remote-peer-id-from", from.String()),
   432  			zap.String("cluster-id", h.cid.String()),
   433  		)
   434  		http.Error(w, "to field mismatch", http.StatusPreconditionFailed)
   435  		return
   436  	}
   437  
   438  	w.WriteHeader(http.StatusOK)
   439  	w.(http.Flusher).Flush()
   440  
   441  	c := newCloseNotifier()
   442  	conn := &outgoingConn{
   443  		t:       t,
   444  		Writer:  w,
   445  		Flusher: w.(http.Flusher),
   446  		Closer:  c,
   447  		localID: h.tr.ID,
   448  		peerID:  from,
   449  	}
   450  	p.attachOutgoingConn(conn)
   451  	<-c.closeNotify()
   452  }
   453  
   454  // checkClusterCompatibilityFromHeader checks the cluster compatibility of
   455  // the local member from the given header.
   456  // It checks whether the version of local member is compatible with
   457  // the versions in the header, and whether the cluster ID of local member
   458  // matches the one in the header.
   459  func checkClusterCompatibilityFromHeader(lg *zap.Logger, localID types.ID, header http.Header, cid types.ID) error {
   460  	remoteName := header.Get("X-Server-From")
   461  
   462  	remoteServer := serverVersion(header)
   463  	remoteVs := ""
   464  	if remoteServer != nil {
   465  		remoteVs = remoteServer.String()
   466  	}
   467  
   468  	remoteMinClusterVer := minClusterVersion(header)
   469  	remoteMinClusterVs := ""
   470  	if remoteMinClusterVer != nil {
   471  		remoteMinClusterVs = remoteMinClusterVer.String()
   472  	}
   473  
   474  	localServer, localMinCluster, err := checkVersionCompatibility(remoteName, remoteServer, remoteMinClusterVer)
   475  
   476  	localVs := ""
   477  	if localServer != nil {
   478  		localVs = localServer.String()
   479  	}
   480  	localMinClusterVs := ""
   481  	if localMinCluster != nil {
   482  		localMinClusterVs = localMinCluster.String()
   483  	}
   484  
   485  	if err != nil {
   486  		lg.Warn(
   487  			"failed to check version compatibility",
   488  			zap.String("local-member-id", localID.String()),
   489  			zap.String("local-member-cluster-id", cid.String()),
   490  			zap.String("local-member-server-version", localVs),
   491  			zap.String("local-member-server-minimum-cluster-version", localMinClusterVs),
   492  			zap.String("remote-peer-server-name", remoteName),
   493  			zap.String("remote-peer-server-version", remoteVs),
   494  			zap.String("remote-peer-server-minimum-cluster-version", remoteMinClusterVs),
   495  			zap.Error(err),
   496  		)
   497  		return errIncompatibleVersion
   498  	}
   499  	if gcid := header.Get("X-Etcd-Cluster-ID"); gcid != cid.String() {
   500  		lg.Warn(
   501  			"request cluster ID mismatch",
   502  			zap.String("local-member-id", localID.String()),
   503  			zap.String("local-member-cluster-id", cid.String()),
   504  			zap.String("local-member-server-version", localVs),
   505  			zap.String("local-member-server-minimum-cluster-version", localMinClusterVs),
   506  			zap.String("remote-peer-server-name", remoteName),
   507  			zap.String("remote-peer-server-version", remoteVs),
   508  			zap.String("remote-peer-server-minimum-cluster-version", remoteMinClusterVs),
   509  			zap.String("remote-peer-cluster-id", gcid),
   510  		)
   511  		return ErrClusterIDMismatch
   512  	}
   513  	return nil
   514  }
   515  
   516  type closeNotifier struct {
   517  	done chan struct{}
   518  }
   519  
   520  func newCloseNotifier() *closeNotifier {
   521  	return &closeNotifier{
   522  		done: make(chan struct{}),
   523  	}
   524  }
   525  
   526  func (n *closeNotifier) Close() error {
   527  	close(n.done)
   528  	return nil
   529  }
   530  
   531  func (n *closeNotifier) closeNotify() <-chan struct{} { return n.done }
   532  

View as plain text