...

Source file src/go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp/stream.go

Documentation: go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp

     1  // Copyright 2015 The etcd Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package rafthttp
    16  
    17  import (
    18  	"context"
    19  	"fmt"
    20  	"io"
    21  	"io/ioutil"
    22  	"net/http"
    23  	"path"
    24  	"strings"
    25  	"sync"
    26  	"time"
    27  
    28  	"go.etcd.io/etcd/api/v3/version"
    29  	"go.etcd.io/etcd/client/pkg/v3/transport"
    30  	"go.etcd.io/etcd/client/pkg/v3/types"
    31  	"go.etcd.io/etcd/pkg/v3/httputil"
    32  	"go.etcd.io/etcd/raft/v3/raftpb"
    33  	stats "go.etcd.io/etcd/server/v3/etcdserver/api/v2stats"
    34  
    35  	"github.com/coreos/go-semver/semver"
    36  	"go.uber.org/zap"
    37  	"golang.org/x/time/rate"
    38  )
    39  
const (
	// The stream types known to this package. Each one selects an endpoint
	// path (see streamType.endpoint) and a message encoder/decoder pair.
	streamTypeMessage  streamType = "message"
	streamTypeMsgAppV2 streamType = "msgappv2"

	// streamBufSize is the capacity of a streamWriter's message channel.
	// Half of it is also the number of batched messages after which the
	// writer forces a flush.
	streamBufSize = 4096
)
    46  
var (
	// errUnsupportedStreamType is returned by streamReader.dial when the
	// remote peer reports an older version that does not support the
	// requested stream type.
	errUnsupportedStreamType = fmt.Errorf("unsupported stream type")

	// supportedStream lists the stream types supported by each server
	// version; it is consulted by checkStreamSupport.
	// The key is in string format "major.minor.patch".
	supportedStream = map[string][]streamType{
		"2.0.0": {},
		"2.1.0": {streamTypeMsgAppV2, streamTypeMessage},
		"2.2.0": {streamTypeMsgAppV2, streamTypeMessage},
		"2.3.0": {streamTypeMsgAppV2, streamTypeMessage},
		"3.0.0": {streamTypeMsgAppV2, streamTypeMessage},
		"3.1.0": {streamTypeMsgAppV2, streamTypeMessage},
		"3.2.0": {streamTypeMsgAppV2, streamTypeMessage},
		"3.3.0": {streamTypeMsgAppV2, streamTypeMessage},
		"3.4.0": {streamTypeMsgAppV2, streamTypeMessage},
		"3.5.0": {streamTypeMsgAppV2, streamTypeMessage},
	}
)
    64  
// streamType identifies a kind of stream. Its value decides which endpoint a
// reader dials and which encoder/decoder is used on the connection.
type streamType string
    66  
    67  func (t streamType) endpoint(lg *zap.Logger) string {
    68  	switch t {
    69  	case streamTypeMsgAppV2:
    70  		return path.Join(RaftStreamPrefix, "msgapp")
    71  	case streamTypeMessage:
    72  		return path.Join(RaftStreamPrefix, "message")
    73  	default:
    74  		if lg != nil {
    75  			lg.Panic("unhandled stream type", zap.String("stream-type", t.String()))
    76  		}
    77  		return ""
    78  	}
    79  }
    80  
    81  func (t streamType) String() string {
    82  	switch t {
    83  	case streamTypeMsgAppV2:
    84  		return "stream MsgApp v2"
    85  	case streamTypeMessage:
    86  		return "stream Message"
    87  	default:
    88  		return "unknown stream"
    89  	}
    90  }
    91  
var (
	// linkHeartbeatMessage is a special message used as the heartbeat
	// message of the link layer; the stream writer sends it on every
	// heartbeat tick. It never conflicts with messages from raft because
	// raft doesn't send out messages without From and To fields.
	linkHeartbeatMessage = raftpb.Message{Type: raftpb.MsgHeartbeat}
)
    98  
    99  func isLinkHeartbeatMessage(m *raftpb.Message) bool {
   100  	return m.Type == raftpb.MsgHeartbeat && m.From == 0 && m.To == 0
   101  }
   102  
// outgoingConn bundles what a streamWriter needs in order to write to one
// attached connection.
type outgoingConn struct {
	// t is the stream type; streamWriter.run uses it to pick the encoder.
	t streamType
	// Writer is the destination handed to the message encoder.
	io.Writer
	// Flusher is flushed after a heartbeat or a batch of messages.
	http.Flusher
	// Closer is closed when the connection is replaced, a write fails, or
	// the writer is stopped.
	io.Closer

	// localID and peerID are logged as "from" and "to" when the writer
	// sets up the encoder for this connection.
	localID types.ID
	peerID  types.ID
}
   112  
// streamWriter writes messages to the attached outgoingConn.
type streamWriter struct {
	lg *zap.Logger

	// localID and peerID identify the two ends of the stream in log
	// fields and metric labels.
	localID types.ID
	peerID  types.ID

	// status is activated when a connection is attached and deactivated
	// when a heartbeat or message write fails.
	status *peerStatus
	// fs is handed to the MsgApp v2 encoder.
	fs *stats.FollowerStats
	// r is told when the peer becomes unreachable.
	r Raft

	// mu guards closer and working, and the msgc field itself (writec reads
	// it and closeUnlocked replaces it while holding mu).
	mu      sync.Mutex // guard field working and closer
	closer  io.Closer
	working bool

	// msgc buffers outgoing messages; connc delivers newly attached
	// connections to run; stopc asks run to exit; done is closed by run
	// once it has exited.
	msgc  chan raftpb.Message
	connc chan *outgoingConn
	stopc chan struct{}
	done  chan struct{}
}
   133  
   134  // startStreamWriter creates a streamWrite and starts a long running go-routine that accepts
   135  // messages and writes to the attached outgoing connection.
   136  func startStreamWriter(lg *zap.Logger, local, id types.ID, status *peerStatus, fs *stats.FollowerStats, r Raft) *streamWriter {
   137  	w := &streamWriter{
   138  		lg: lg,
   139  
   140  		localID: local,
   141  		peerID:  id,
   142  
   143  		status: status,
   144  		fs:     fs,
   145  		r:      r,
   146  		msgc:   make(chan raftpb.Message, streamBufSize),
   147  		connc:  make(chan *outgoingConn),
   148  		stopc:  make(chan struct{}),
   149  		done:   make(chan struct{}),
   150  	}
   151  	go w.run()
   152  	return w
   153  }
   154  
// run is the writer's event loop. Until stopc is closed it services four
// kinds of events:
//   - a heartbeat tick: encode linkHeartbeatMessage and flush;
//   - a queued message: encode it, flushing once the queue is drained or the
//     batch limit is exceeded;
//   - a newly attached connection: close the previous one (if any), build the
//     encoder for the new stream type and enable heartbeats and messages;
//   - stop: close the current connection and close done.
//
// heartbeatc and msgc are nil while no working connection is attached, so
// the first two cases cannot fire in that state.
func (cw *streamWriter) run() {
	var (
		msgc       chan raftpb.Message
		heartbeatc <-chan time.Time
		t          streamType
		enc        encoder
		flusher    http.Flusher
		batched    int
	)
	// Heartbeats are sent at a third of ConnReadTimeout.
	tickc := time.NewTicker(ConnReadTimeout / 3)
	defer tickc.Stop()
	// unflushed counts the bytes encoded since the last flush; it is what
	// gets added to the sentBytes metric on flush.
	unflushed := 0

	if cw.lg != nil {
		cw.lg.Info(
			"started stream writer with remote peer",
			zap.String("local-member-id", cw.localID.String()),
			zap.String("remote-peer-id", cw.peerID.String()),
		)
	}

	for {
		select {
		case <-heartbeatc:
			err := enc.encode(&linkHeartbeatMessage)
			unflushed += linkHeartbeatMessage.Size()
			if err == nil {
				// A heartbeat always flushes, which also pushes out any
				// messages batched so far.
				flusher.Flush()
				batched = 0
				sentBytes.WithLabelValues(cw.peerID.String()).Add(float64(unflushed))
				unflushed = 0
				continue
			}

			// The heartbeat could not be written: mark the peer inactive
			// and drop the connection.
			cw.status.deactivate(failureType{source: t.String(), action: "heartbeat"}, err.Error())

			sentFailures.WithLabelValues(cw.peerID.String()).Inc()
			cw.close()
			if cw.lg != nil {
				cw.lg.Warn(
					"lost TCP streaming connection with remote peer",
					zap.String("stream-writer-type", t.String()),
					zap.String("local-member-id", cw.localID.String()),
					zap.String("remote-peer-id", cw.peerID.String()),
				)
			}
			// Disable heartbeats and message writes until a new
			// connection is attached.
			heartbeatc, msgc = nil, nil

		case m := <-msgc:
			err := enc.encode(&m)
			if err == nil {
				unflushed += m.Size()

				// Flush when the queue is empty or when more than
				// streamBufSize/2 messages have been encoded since the
				// last flush; otherwise keep batching.
				if len(msgc) == 0 || batched > streamBufSize/2 {
					flusher.Flush()
					sentBytes.WithLabelValues(cw.peerID.String()).Add(float64(unflushed))
					unflushed = 0
					batched = 0
				} else {
					batched++
				}

				continue
			}

			// The message could not be written: mark the peer inactive,
			// drop the connection and tell raft the recipient is
			// unreachable.
			cw.status.deactivate(failureType{source: t.String(), action: "write"}, err.Error())
			cw.close()
			if cw.lg != nil {
				cw.lg.Warn(
					"lost TCP streaming connection with remote peer",
					zap.String("stream-writer-type", t.String()),
					zap.String("local-member-id", cw.localID.String()),
					zap.String("remote-peer-id", cw.peerID.String()),
				)
			}
			heartbeatc, msgc = nil, nil
			cw.r.ReportUnreachable(m.To)
			sentFailures.WithLabelValues(cw.peerID.String()).Inc()

		case conn := <-cw.connc:
			// Swap connections while holding mu so that writec never
			// observes a half-updated closer/working/msgc.
			cw.mu.Lock()
			closed := cw.closeUnlocked()
			t = conn.t
			switch conn.t {
			case streamTypeMsgAppV2:
				enc = newMsgAppV2Encoder(conn.Writer, cw.fs)
			case streamTypeMessage:
				enc = &messageEncoder{w: conn.Writer}
			default:
				if cw.lg != nil {
					cw.lg.Panic("unhandled stream type", zap.String("stream-type", t.String()))
				}
			}
			if cw.lg != nil {
				cw.lg.Info(
					"set message encoder",
					zap.String("from", conn.localID.String()),
					zap.String("to", conn.peerID.String()),
					zap.String("stream-type", t.String()),
				)
			}
			flusher = conn.Flusher
			unflushed = 0
			cw.status.activate()
			cw.closer = conn.Closer
			cw.working = true
			cw.mu.Unlock()

			if closed {
				if cw.lg != nil {
					cw.lg.Warn(
						"closed TCP streaming connection with remote peer",
						zap.String("stream-writer-type", t.String()),
						zap.String("local-member-id", cw.localID.String()),
						zap.String("remote-peer-id", cw.peerID.String()),
					)
				}
			}
			if cw.lg != nil {
				cw.lg.Info(
					"established TCP streaming connection with remote peer",
					zap.String("stream-writer-type", t.String()),
					zap.String("local-member-id", cw.localID.String()),
					zap.String("remote-peer-id", cw.peerID.String()),
				)
			}
			// Re-read cw.msgc: closeUnlocked replaces it with a fresh
			// channel whenever it closes a working connection.
			heartbeatc, msgc = tickc.C, cw.msgc

		case <-cw.stopc:
			if cw.close() {
				if cw.lg != nil {
					cw.lg.Warn(
						"closed TCP streaming connection with remote peer",
						zap.String("stream-writer-type", t.String()),
						zap.String("remote-peer-id", cw.peerID.String()),
					)
				}
			}
			if cw.lg != nil {
				cw.lg.Info(
					"stopped TCP streaming connection with remote peer",
					zap.String("stream-writer-type", t.String()),
					zap.String("remote-peer-id", cw.peerID.String()),
				)
			}
			// Signal stop() and attach() that the loop has exited.
			close(cw.done)
			return
		}
	}
}
   305  
   306  func (cw *streamWriter) writec() (chan<- raftpb.Message, bool) {
   307  	cw.mu.Lock()
   308  	defer cw.mu.Unlock()
   309  	return cw.msgc, cw.working
   310  }
   311  
   312  func (cw *streamWriter) close() bool {
   313  	cw.mu.Lock()
   314  	defer cw.mu.Unlock()
   315  	return cw.closeUnlocked()
   316  }
   317  
   318  func (cw *streamWriter) closeUnlocked() bool {
   319  	if !cw.working {
   320  		return false
   321  	}
   322  	if err := cw.closer.Close(); err != nil {
   323  		if cw.lg != nil {
   324  			cw.lg.Warn(
   325  				"failed to close connection with remote peer",
   326  				zap.String("remote-peer-id", cw.peerID.String()),
   327  				zap.Error(err),
   328  			)
   329  		}
   330  	}
   331  	if len(cw.msgc) > 0 {
   332  		cw.r.ReportUnreachable(uint64(cw.peerID))
   333  	}
   334  	cw.msgc = make(chan raftpb.Message, streamBufSize)
   335  	cw.working = false
   336  	return true
   337  }
   338  
   339  func (cw *streamWriter) attach(conn *outgoingConn) bool {
   340  	select {
   341  	case cw.connc <- conn:
   342  		return true
   343  	case <-cw.done:
   344  		return false
   345  	}
   346  }
   347  
// stop asks the writer goroutine to exit by closing stopc and then blocks
// until run has closed done. Because it closes stopc, a second call would
// panic.
func (cw *streamWriter) stop() {
	close(cw.stopc)
	<-cw.done
}
   352  
// streamReader is a long-running go-routine that dials to the remote stream
// endpoint and reads messages from the response body returned.
type streamReader struct {
	lg *zap.Logger

	// peerID is the remote member being dialed; typ is the stream type
	// requested from it.
	peerID types.ID
	typ    streamType

	// tr supplies the local member ID, cluster ID, peer URLs and the round
	// tripper used for dialing.
	tr *Transport
	// picker chooses the URL to dial and is told when one is unreachable.
	picker *urlPicker
	// status tracks whether the peer is considered active.
	status *peerStatus
	// recvc receives decoded messages, except MsgProp messages, which are
	// sent to propc instead.
	recvc chan<- raftpb.Message
	propc chan<- raftpb.Message

	rl *rate.Limiter // alters the frequency of dial retrial attempts

	// errorc receives critical errors such as errMemberRemoved; start
	// defaults it to tr.ErrorC when it is nil.
	errorc chan<- error

	// mu guards paused and closer.
	mu     sync.Mutex
	paused bool
	closer io.Closer

	// ctx is cancelled by stop via cancel; done is closed by run when it
	// returns.
	ctx    context.Context
	cancel context.CancelFunc
	done   chan struct{}
}
   379  
   380  func (cr *streamReader) start() {
   381  	cr.done = make(chan struct{})
   382  	if cr.errorc == nil {
   383  		cr.errorc = cr.tr.ErrorC
   384  	}
   385  	if cr.ctx == nil {
   386  		cr.ctx, cr.cancel = context.WithCancel(context.Background())
   387  	}
   388  	go cr.run()
   389  }
   390  
   391  func (cr *streamReader) run() {
   392  	t := cr.typ
   393  
   394  	if cr.lg != nil {
   395  		cr.lg.Info(
   396  			"started stream reader with remote peer",
   397  			zap.String("stream-reader-type", t.String()),
   398  			zap.String("local-member-id", cr.tr.ID.String()),
   399  			zap.String("remote-peer-id", cr.peerID.String()),
   400  		)
   401  	}
   402  
   403  	for {
   404  		rc, err := cr.dial(t)
   405  		if err != nil {
   406  			if err != errUnsupportedStreamType {
   407  				cr.status.deactivate(failureType{source: t.String(), action: "dial"}, err.Error())
   408  			}
   409  		} else {
   410  			cr.status.activate()
   411  			if cr.lg != nil {
   412  				cr.lg.Info(
   413  					"established TCP streaming connection with remote peer",
   414  					zap.String("stream-reader-type", cr.typ.String()),
   415  					zap.String("local-member-id", cr.tr.ID.String()),
   416  					zap.String("remote-peer-id", cr.peerID.String()),
   417  				)
   418  			}
   419  			err = cr.decodeLoop(rc, t)
   420  			if cr.lg != nil {
   421  				cr.lg.Warn(
   422  					"lost TCP streaming connection with remote peer",
   423  					zap.String("stream-reader-type", cr.typ.String()),
   424  					zap.String("local-member-id", cr.tr.ID.String()),
   425  					zap.String("remote-peer-id", cr.peerID.String()),
   426  					zap.Error(err),
   427  				)
   428  			}
   429  			switch {
   430  			// all data is read out
   431  			case err == io.EOF:
   432  			// connection is closed by the remote
   433  			case transport.IsClosedConnError(err):
   434  			default:
   435  				cr.status.deactivate(failureType{source: t.String(), action: "read"}, err.Error())
   436  			}
   437  		}
   438  		// Wait for a while before new dial attempt
   439  		err = cr.rl.Wait(cr.ctx)
   440  		if cr.ctx.Err() != nil {
   441  			if cr.lg != nil {
   442  				cr.lg.Info(
   443  					"stopped stream reader with remote peer",
   444  					zap.String("stream-reader-type", t.String()),
   445  					zap.String("local-member-id", cr.tr.ID.String()),
   446  					zap.String("remote-peer-id", cr.peerID.String()),
   447  				)
   448  			}
   449  			close(cr.done)
   450  			return
   451  		}
   452  		if err != nil {
   453  			if cr.lg != nil {
   454  				cr.lg.Warn(
   455  					"rate limit on stream reader with remote peer",
   456  					zap.String("stream-reader-type", t.String()),
   457  					zap.String("local-member-id", cr.tr.ID.String()),
   458  					zap.String("remote-peer-id", cr.peerID.String()),
   459  					zap.Error(err),
   460  				)
   461  			}
   462  		}
   463  	}
   464  }
   465  
// decodeLoop decodes messages of stream type t from rc and forwards them to
// recvc (or propc for MsgProp) until decoding fails; it then closes the
// connection and returns the decode error. If the reader's context is
// already cancelled on entry, it closes rc without reading and returns
// io.EOF, or the error from Close.
func (cr *streamReader) decodeLoop(rc io.ReadCloser, t streamType) error {
	var dec decoder
	cr.mu.Lock()
	switch t {
	case streamTypeMsgAppV2:
		dec = newMsgAppV2Decoder(rc, cr.tr.ID, cr.peerID)
	case streamTypeMessage:
		dec = &messageDecoder{r: rc}
	default:
		if cr.lg != nil {
			cr.lg.Panic("unknown stream type", zap.String("type", t.String()))
		}
	}
	// stop() cancels the context and closes cr.closer while holding cr.mu.
	// Checking the context and registering rc under the same lock means
	// that rc is either closed here or is visible to stop() as cr.closer.
	select {
	case <-cr.ctx.Done():
		cr.mu.Unlock()
		if err := rc.Close(); err != nil {
			return err
		}
		return io.EOF
	default:
		cr.closer = rc
	}
	cr.mu.Unlock()

	// gofail: labelRaftDropHeartbeat:
	for {
		m, err := dec.decode()
		if err != nil {
			cr.mu.Lock()
			cr.close()
			cr.mu.Unlock()
			return err
		}

		// gofail-go: var raftDropHeartbeat struct{}
		// continue labelRaftDropHeartbeat
		receivedBytes.WithLabelValues(types.ID(m.From).String()).Add(float64(m.Size()))

		cr.mu.Lock()
		paused := cr.paused
		cr.mu.Unlock()

		// While paused, messages are still read and counted above but are
		// then discarded.
		if paused {
			continue
		}

		if isLinkHeartbeatMessage(&m) {
			// raft is not interested in link layer
			// heartbeat message, so we should ignore
			// it.
			continue
		}

		// Proposals go to their own channel; everything else goes to recvc.
		recvc := cr.recvc
		if m.Type == raftpb.MsgProp {
			recvc = cr.propc
		}

		// Non-blocking send: if the receiving channel is full the message
		// is dropped, logged and counted in recvFailures.
		select {
		case recvc <- m:
		default:
			if cr.status.isActive() {
				if cr.lg != nil {
					cr.lg.Warn(
						"dropped internal Raft message since receiving buffer is full (overloaded network)",
						zap.String("message-type", m.Type.String()),
						zap.String("local-member-id", cr.tr.ID.String()),
						zap.String("from", types.ID(m.From).String()),
						zap.String("remote-peer-id", types.ID(m.To).String()),
						zap.Bool("remote-peer-active", cr.status.isActive()),
					)
				}
			} else {
				if cr.lg != nil {
					cr.lg.Warn(
						"dropped Raft message since receiving buffer is full (overloaded network)",
						zap.String("message-type", m.Type.String()),
						zap.String("local-member-id", cr.tr.ID.String()),
						zap.String("from", types.ID(m.From).String()),
						zap.String("remote-peer-id", types.ID(m.To).String()),
						zap.Bool("remote-peer-active", cr.status.isActive()),
					)
				}
			}
			recvFailures.WithLabelValues(types.ID(m.From).String()).Inc()
		}
	}
}
   555  
// stop cancels the reader's context, closes the connection currently being
// read (if any) and then blocks until run has closed done. The cancel and
// the close both happen while holding cr.mu, the same lock under which
// decodeLoop checks the context and registers a new connection.
func (cr *streamReader) stop() {
	cr.mu.Lock()
	cr.cancel()
	cr.close()
	cr.mu.Unlock()
	<-cr.done
}
   563  
   564  func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) {
   565  	u := cr.picker.pick()
   566  	uu := u
   567  	uu.Path = path.Join(t.endpoint(cr.lg), cr.tr.ID.String())
   568  
   569  	if cr.lg != nil {
   570  		cr.lg.Debug(
   571  			"dial stream reader",
   572  			zap.String("from", cr.tr.ID.String()),
   573  			zap.String("to", cr.peerID.String()),
   574  			zap.String("address", uu.String()),
   575  		)
   576  	}
   577  	req, err := http.NewRequest("GET", uu.String(), nil)
   578  	if err != nil {
   579  		cr.picker.unreachable(u)
   580  		return nil, fmt.Errorf("failed to make http request to %v (%v)", u, err)
   581  	}
   582  	req.Header.Set("X-Server-From", cr.tr.ID.String())
   583  	req.Header.Set("X-Server-Version", version.Version)
   584  	req.Header.Set("X-Min-Cluster-Version", version.MinClusterVersion)
   585  	req.Header.Set("X-Etcd-Cluster-ID", cr.tr.ClusterID.String())
   586  	req.Header.Set("X-Raft-To", cr.peerID.String())
   587  
   588  	setPeerURLsHeader(req, cr.tr.URLs)
   589  
   590  	req = req.WithContext(cr.ctx)
   591  
   592  	cr.mu.Lock()
   593  	select {
   594  	case <-cr.ctx.Done():
   595  		cr.mu.Unlock()
   596  		return nil, fmt.Errorf("stream reader is stopped")
   597  	default:
   598  	}
   599  	cr.mu.Unlock()
   600  
   601  	resp, err := cr.tr.streamRt.RoundTrip(req)
   602  	if err != nil {
   603  		cr.picker.unreachable(u)
   604  		return nil, err
   605  	}
   606  
   607  	rv := serverVersion(resp.Header)
   608  	lv := semver.Must(semver.NewVersion(version.Version))
   609  	if compareMajorMinorVersion(rv, lv) == -1 && !checkStreamSupport(rv, t) {
   610  		httputil.GracefulClose(resp)
   611  		cr.picker.unreachable(u)
   612  		return nil, errUnsupportedStreamType
   613  	}
   614  
   615  	switch resp.StatusCode {
   616  	case http.StatusGone:
   617  		httputil.GracefulClose(resp)
   618  		cr.picker.unreachable(u)
   619  		reportCriticalError(errMemberRemoved, cr.errorc)
   620  		return nil, errMemberRemoved
   621  
   622  	case http.StatusOK:
   623  		return resp.Body, nil
   624  
   625  	case http.StatusNotFound:
   626  		httputil.GracefulClose(resp)
   627  		cr.picker.unreachable(u)
   628  		return nil, fmt.Errorf("peer %s failed to find local node %s", cr.peerID, cr.tr.ID)
   629  
   630  	case http.StatusPreconditionFailed:
   631  		b, err := ioutil.ReadAll(resp.Body)
   632  		if err != nil {
   633  			cr.picker.unreachable(u)
   634  			return nil, err
   635  		}
   636  		httputil.GracefulClose(resp)
   637  		cr.picker.unreachable(u)
   638  
   639  		switch strings.TrimSuffix(string(b), "\n") {
   640  		case errIncompatibleVersion.Error():
   641  			if cr.lg != nil {
   642  				cr.lg.Warn(
   643  					"request sent was ignored by remote peer due to server version incompatibility",
   644  					zap.String("local-member-id", cr.tr.ID.String()),
   645  					zap.String("remote-peer-id", cr.peerID.String()),
   646  					zap.Error(errIncompatibleVersion),
   647  				)
   648  			}
   649  			return nil, errIncompatibleVersion
   650  
   651  		case ErrClusterIDMismatch.Error():
   652  			if cr.lg != nil {
   653  				cr.lg.Warn(
   654  					"request sent was ignored by remote peer due to cluster ID mismatch",
   655  					zap.String("remote-peer-id", cr.peerID.String()),
   656  					zap.String("remote-peer-cluster-id", resp.Header.Get("X-Etcd-Cluster-ID")),
   657  					zap.String("local-member-id", cr.tr.ID.String()),
   658  					zap.String("local-member-cluster-id", cr.tr.ClusterID.String()),
   659  					zap.Error(ErrClusterIDMismatch),
   660  				)
   661  			}
   662  			return nil, ErrClusterIDMismatch
   663  
   664  		default:
   665  			return nil, fmt.Errorf("unhandled error %q when precondition failed", string(b))
   666  		}
   667  
   668  	default:
   669  		httputil.GracefulClose(resp)
   670  		cr.picker.unreachable(u)
   671  		return nil, fmt.Errorf("unhandled http status %d", resp.StatusCode)
   672  	}
   673  }
   674  
   675  func (cr *streamReader) close() {
   676  	if cr.closer != nil {
   677  		if err := cr.closer.Close(); err != nil {
   678  			if cr.lg != nil {
   679  				cr.lg.Warn(
   680  					"failed to close remote peer connection",
   681  					zap.String("local-member-id", cr.tr.ID.String()),
   682  					zap.String("remote-peer-id", cr.peerID.String()),
   683  					zap.Error(err),
   684  				)
   685  			}
   686  		}
   687  	}
   688  	cr.closer = nil
   689  }
   690  
   691  func (cr *streamReader) pause() {
   692  	cr.mu.Lock()
   693  	defer cr.mu.Unlock()
   694  	cr.paused = true
   695  }
   696  
   697  func (cr *streamReader) resume() {
   698  	cr.mu.Lock()
   699  	defer cr.mu.Unlock()
   700  	cr.paused = false
   701  }
   702  
   703  // checkStreamSupport checks whether the stream type is supported in the
   704  // given version.
   705  func checkStreamSupport(v *semver.Version, t streamType) bool {
   706  	nv := &semver.Version{Major: v.Major, Minor: v.Minor}
   707  	for _, s := range supportedStream[nv.String()] {
   708  		if s == t {
   709  			return true
   710  		}
   711  	}
   712  	return false
   713  }
   714  

View as plain text