
Source file src/go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp/peer.go

Documentation: go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp

// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package rafthttp

import (
	"context"
	"sync"
	"time"

	"go.etcd.io/etcd/client/pkg/v3/types"
	"go.etcd.io/etcd/raft/v3"
	"go.etcd.io/etcd/raft/v3/raftpb"
	"go.etcd.io/etcd/server/v3/etcdserver/api/snap"
	stats "go.etcd.io/etcd/server/v3/etcdserver/api/v2stats"

	"go.uber.org/zap"
	"golang.org/x/time/rate"
)

const (
	// ConnReadTimeout and ConnWriteTimeout are the I/O timeouts set on each
	// connection the rafthttp package creates. A 5-second timeout is good
	// enough to recycle bad connections; otherwise we would have to wait for
	// TCP keepalive to detect a bad connection, which takes minutes.
	// For long-lived streaming connections, the rafthttp package sends an
	// application-level linkHeartbeatMessage to keep the connection alive.
	// For short-lived pipeline connections, the connection MUST be closed to
	// avoid it being put back into the http package's connection pool.
	DefaultConnReadTimeout  = 5 * time.Second
	DefaultConnWriteTimeout = 5 * time.Second

	recvBufSize = 4096
	// maxPendingProposals is the maximum number of proposals held during one
	// leader election. A leader election generally takes at most 1 sec, with
	// 0-2 election conflicts of about 0.5 sec each. Assuming fewer than 4096
	// concurrent proposers, and that each client blocks on its proposal for
	// at least 1 sec, a buffer of 4096 is enough to hold all pending
	// proposals.
	maxPendingProposals = 4096

	streamAppV2 = "streamMsgAppV2"
	streamMsg   = "streamMsg"
	pipelineMsg = "pipeline"
	sendSnap    = "sendMsgSnap"
)

var (
	ConnReadTimeout  = DefaultConnReadTimeout
	ConnWriteTimeout = DefaultConnWriteTimeout
)
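
// Because ConnReadTimeout and ConnWriteTimeout are package-level variables,
// an embedding program or test could shorten them before any transport is
// started. A minimal, hypothetical sketch (from outside this package, not
// part of its documented API):
//
//	func init() {
//		// Shorter timeouts recycle bad connections faster, at the cost
//		// of more reconnects on slow networks.
//		rafthttp.ConnReadTimeout = 2 * time.Second
//		rafthttp.ConnWriteTimeout = 2 * time.Second
//	}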

type Peer interface {
	// send sends the message to the remote peer. The function is non-blocking
	// and makes no promise that the message will be received by the remote.
	// When it fails to send a message out, it reports the status to the
	// underlying raft.
	send(m raftpb.Message)

	// sendSnap sends the merged snapshot message to the remote peer. Its
	// behavior is similar to send.
	sendSnap(m snap.Message)

	// update updates the urls of the remote peer.
	update(urls types.URLs)

	// attachOutgoingConn attaches the outgoing connection to the peer for
	// stream usage. After the call, ownership of the outgoing connection is
	// handed over to the peer. The peer will close the connection when it is
	// no longer used.
	attachOutgoingConn(conn *outgoingConn)
	// activeSince returns the time that the connection with the peer became
	// active.
	activeSince() time.Time
	// stop performs any necessary finalization and terminates the peer
	// gracefully.
	stop()
}
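
// A minimal sketch of a test double satisfying Peer (hypothetical; the
// package's real tests may use a different fake). Every method is a no-op
// except send, which records messages for later assertions:
//
//	type fakePeer struct {
//		msgs []raftpb.Message
//	}
//
//	func (p *fakePeer) send(m raftpb.Message)                 { p.msgs = append(p.msgs, m) }
//	func (p *fakePeer) sendSnap(m snap.Message)               {}
//	func (p *fakePeer) update(urls types.URLs)                {}
//	func (p *fakePeer) attachOutgoingConn(conn *outgoingConn) {}
//	func (p *fakePeer) activeSince() time.Time                { return time.Time{} }
//	func (p *fakePeer) stop()                                 {}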

// peer is the representative of a remote raft node. The local raft node sends
// messages to the remote through the peer.
// Each peer has two underlying mechanisms to send out a message: stream and
// pipeline.
// A stream is a receiver-initialized long-polling connection that is always
// open to transfer messages. Besides the general stream, a peer also has an
// optimized stream for sending msgApp, since msgApp accounts for a large part
// of all messages. Only the raft leader uses the optimized stream to send
// msgApp to the remote follower node.
// A pipeline is a series of http clients that send http requests to the
// remote. It is only used when the stream has not been established.
type peer struct {
	lg *zap.Logger

	localID types.ID
	// id of the remote raft peer node
	id types.ID

	r Raft

	status *peerStatus

	picker *urlPicker

	msgAppV2Writer *streamWriter
	writer         *streamWriter
	pipeline       *pipeline
	snapSender     *snapshotSender // snapshot sender to send v3 snapshot messages
	msgAppV2Reader *streamReader
	msgAppReader   *streamReader

	recvc chan raftpb.Message
	propc chan raftpb.Message

	mu     sync.Mutex
	paused bool

	cancel context.CancelFunc // cancels pending work in goroutines created by the peer.
	stopc  chan struct{}
}
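
// The routing described above amounts to a fallback chain (see pick below
// for the authoritative logic):
//
//	MsgSnap                          -> pipeline (always)
//	MsgApp with msgApp stream open   -> msgAppV2Writer
//	anything else with stream open   -> writer
//	no stream open                   -> pipeline fallback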

func startPeer(t *Transport, urls types.URLs, peerID types.ID, fs *stats.FollowerStats) *peer {
	if t.Logger != nil {
		t.Logger.Info("starting remote peer", zap.String("remote-peer-id", peerID.String()))
	}
	defer func() {
		if t.Logger != nil {
			t.Logger.Info("started remote peer", zap.String("remote-peer-id", peerID.String()))
		}
	}()

	status := newPeerStatus(t.Logger, t.ID, peerID)
	picker := newURLPicker(urls)
	errorc := t.ErrorC
	r := t.Raft
	pipeline := &pipeline{
		peerID:        peerID,
		tr:            t,
		picker:        picker,
		status:        status,
		followerStats: fs,
		raft:          r,
		errorc:        errorc,
	}
	pipeline.start()

	p := &peer{
		lg:             t.Logger,
		localID:        t.ID,
		id:             peerID,
		r:              r,
		status:         status,
		picker:         picker,
		msgAppV2Writer: startStreamWriter(t.Logger, t.ID, peerID, status, fs, r),
		writer:         startStreamWriter(t.Logger, t.ID, peerID, status, fs, r),
		pipeline:       pipeline,
		snapSender:     newSnapshotSender(t, picker, peerID, status),
		recvc:          make(chan raftpb.Message, recvBufSize),
		propc:          make(chan raftpb.Message, maxPendingProposals),
		stopc:          make(chan struct{}),
	}

	ctx, cancel := context.WithCancel(context.Background())
	p.cancel = cancel
	go func() {
		for {
			select {
			case mm := <-p.recvc:
				if err := r.Process(ctx, mm); err != nil {
					if t.Logger != nil {
						t.Logger.Warn("failed to process Raft message", zap.Error(err))
					}
				}
			case <-p.stopc:
				return
			}
		}
	}()

	// r.Process might block while processing a proposal when there is no
	// leader. Thus propc must be handled in a goroutine separate from recvc
	// to avoid blocking the processing of other raft messages.
	go func() {
		for {
			select {
			case mm := <-p.propc:
				if err := r.Process(ctx, mm); err != nil {
					if t.Logger != nil {
						t.Logger.Warn("failed to process Raft message", zap.Error(err))
					}
				}
			case <-p.stopc:
				return
			}
		}
	}()

	p.msgAppV2Reader = &streamReader{
		lg:     t.Logger,
		peerID: peerID,
		typ:    streamTypeMsgAppV2,
		tr:     t,
		picker: picker,
		status: status,
		recvc:  p.recvc,
		propc:  p.propc,
		rl:     rate.NewLimiter(t.DialRetryFrequency, 1),
	}
	p.msgAppReader = &streamReader{
		lg:     t.Logger,
		peerID: peerID,
		typ:    streamTypeMessage,
		tr:     t,
		picker: picker,
		status: status,
		recvc:  p.recvc,
		propc:  p.propc,
		rl:     rate.NewLimiter(t.DialRetryFrequency, 1),
	}

	p.msgAppV2Reader.start()
	p.msgAppReader.start()

	return p
}
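
// Sketch of a typical call site (a hypothetical simplification of the
// transport's AddPeer; error logging and locking elided for brevity):
//
//	func (t *Transport) AddPeer(id types.ID, us []string) {
//		urls, err := types.NewURLs(us)
//		if err != nil {
//			return // a real implementation would log the bad URLs
//		}
//		fs := t.LeaderStats.Follower(id.String())
//		t.peers[id] = startPeer(t, urls, id, fs)
//	}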

func (p *peer) send(m raftpb.Message) {
	p.mu.Lock()
	paused := p.paused
	p.mu.Unlock()

	if paused {
		return
	}

	writec, name := p.pick(m)
	select {
	case writec <- m:
	default:
		p.r.ReportUnreachable(m.To)
		if isMsgSnap(m) {
			p.r.ReportSnapshot(m.To, raft.SnapshotFailure)
		}
		if p.lg != nil {
			p.lg.Warn(
				"dropped internal Raft message since sending buffer is full (overloaded network)",
				zap.String("message-type", m.Type.String()),
				zap.String("local-member-id", p.localID.String()),
				zap.String("from", types.ID(m.From).String()),
				zap.String("remote-peer-id", p.id.String()),
				zap.String("remote-peer-name", name),
				zap.Bool("remote-peer-active", p.status.isActive()),
			)
		}
		sentFailures.WithLabelValues(types.ID(m.To).String()).Inc()
	}
}
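
// The hand-off above is the standard non-blocking send: a select with a
// default case, so a full buffer drops the message instead of stalling the
// raft loop. A self-contained sketch of the same pattern (tryEnqueue is a
// hypothetical helper, not part of this package):
//
//	func tryEnqueue(c chan<- raftpb.Message, m raftpb.Message) bool {
//		select {
//		case c <- m:
//			return true // buffered; a writer goroutine will pick it up
//		default:
//			return false // buffer full; caller reports the peer unreachable
//		}
//	}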

func (p *peer) sendSnap(m snap.Message) {
	go p.snapSender.send(m)
}

func (p *peer) update(urls types.URLs) {
	p.picker.update(urls)
}

func (p *peer) attachOutgoingConn(conn *outgoingConn) {
	var ok bool
	switch conn.t {
	case streamTypeMsgAppV2:
		ok = p.msgAppV2Writer.attach(conn)
	case streamTypeMessage:
		ok = p.writer.attach(conn)
	default:
		if p.lg != nil {
			p.lg.Panic("unknown stream type", zap.String("type", conn.t.String()))
		}
	}
	if !ok {
		conn.Close()
	}
}

func (p *peer) activeSince() time.Time { return p.status.activeSince() }

// Pause pauses the peer. The peer will simply drop all incoming and outgoing
// messages without returning an error.
func (p *peer) Pause() {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.paused = true
	p.msgAppReader.pause()
	p.msgAppV2Reader.pause()
}

// Resume resumes a paused peer.
func (p *peer) Resume() {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.paused = false
	p.msgAppReader.resume()
	p.msgAppV2Reader.resume()
}
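
// Pause and Resume are primarily useful as test hooks: a test could simulate
// a network partition for a single follower along these (hypothetical) lines:
//
//	p.Pause()  // drop everything to and from this peer
//	// ... drive the cluster and assert the follower falls behind ...
//	p.Resume() // streams resume and the leader catches the follower up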

func (p *peer) stop() {
	if p.lg != nil {
		p.lg.Info("stopping remote peer", zap.String("remote-peer-id", p.id.String()))
	}

	defer func() {
		if p.lg != nil {
			p.lg.Info("stopped remote peer", zap.String("remote-peer-id", p.id.String()))
		}
	}()

	close(p.stopc)
	p.cancel()
	p.msgAppV2Writer.stop()
	p.writer.stop()
	p.pipeline.stop()
	p.snapSender.stop()
	p.msgAppV2Reader.stop()
	p.msgAppReader.stop()
}

// pick picks a chan for sending the given message. The picked chan and its
// string name are returned.
func (p *peer) pick(m raftpb.Message) (writec chan<- raftpb.Message, picked string) {
	var ok bool
	// Since MsgSnap may be large (e.g., 1 GB) and would block a stream for a
	// long time, MsgSnap is always sent over the pipeline, where one of its
	// N connections handles it.
	if isMsgSnap(m) {
		return p.pipeline.msgc, pipelineMsg
	} else if writec, ok = p.msgAppV2Writer.writec(); ok && isMsgApp(m) {
		return writec, streamAppV2
	} else if writec, ok = p.writer.writec(); ok {
		return writec, streamMsg
	}
	return p.pipeline.msgc, pipelineMsg
}
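
// Worked examples of the fallback chain (assuming a hypothetical peer p;
// results depend on which streams are currently attached):
//
//	p.pick(raftpb.Message{Type: raftpb.MsgSnap})      // always pipeline.msgc, "pipeline"
//	p.pick(raftpb.Message{Type: raftpb.MsgApp})       // msgApp stream if attached
//	p.pick(raftpb.Message{Type: raftpb.MsgHeartbeat}) // general stream if attached,
//	                                                  // otherwise pipeline fallback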

func isMsgApp(m raftpb.Message) bool { return m.Type == raftpb.MsgApp }

func isMsgSnap(m raftpb.Message) bool { return m.Type == raftpb.MsgSnap }
   368  
