...

Source file src/go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp/transport.go

Documentation: go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp

     1  // Copyright 2015 The etcd Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package rafthttp
    16  
    17  import (
    18  	"context"
    19  	"net/http"
    20  	"sync"
    21  	"time"
    22  
    23  	"go.etcd.io/etcd/client/pkg/v3/transport"
    24  	"go.etcd.io/etcd/client/pkg/v3/types"
    25  	"go.etcd.io/etcd/raft/v3"
    26  	"go.etcd.io/etcd/raft/v3/raftpb"
    27  	"go.etcd.io/etcd/server/v3/etcdserver/api/snap"
    28  	stats "go.etcd.io/etcd/server/v3/etcdserver/api/v2stats"
    29  
    30  	"github.com/xiang90/probing"
    31  	"go.uber.org/zap"
    32  	"golang.org/x/time/rate"
    33  )
    34  
// Raft is the view of the raft state machine that the transport depends on.
// Transport.Raft holds a value of this type, and Transport.Handler passes it
// to the pipeline, stream and snapshot handlers it builds.
type Raft interface {
	// Process handles a single raft message under ctx and returns an error
	// on failure.
	// NOTE(review): presumably invoked for each message received from a
	// remote peer — confirm against the handler implementations.
	Process(ctx context.Context, m raftpb.Message) error
	// IsIDRemoved reports a boolean for the given member id.
	// NOTE(review): by its name, true means the member was removed from the
	// cluster — confirm against the implementation.
	IsIDRemoved(id uint64) bool
	// ReportUnreachable notifies the state machine about the member id.
	// NOTE(review): by its name, used when the member cannot be reached —
	// confirm against the callers.
	ReportUnreachable(id uint64)
	// ReportSnapshot passes a snapshot status for the member id to the
	// state machine.
	// NOTE(review): presumably the outcome of sending a snapshot — confirm.
	ReportSnapshot(id uint64, status raft.SnapshotStatus)
}
    41  
// Transporter is the interface for exchanging raft messages and snapshots
// with remote peers and for managing the set of known peers and remotes.
type Transporter interface {
	// Start starts the given Transporter.
	// Start MUST be called before calling other functions in the interface.
	Start() error
	// Handler returns the HTTP handler of the transporter.
	// A transporter HTTP handler handles the HTTP requests
	// from remote peers.
	// The handler MUST be used to handle the RaftPrefix (/raft)
	// endpoint.
	Handler() http.Handler
	// Send sends out the given messages to the remote peers.
	// Each message has a To field, which is an id that maps
	// to an existing peer in the transport.
	// If the id cannot be found in the transport, the message
	// will be ignored.
	Send(m []raftpb.Message)
	// SendSnapshot sends out the given snapshot message to a remote peer.
	// The behavior of SendSnapshot is similar to Send.
	SendSnapshot(m snap.Message)
	// AddRemote adds a remote with the given peer urls into the transport.
	// A remote helps a newly joined member catch up with the progress of
	// the cluster, and will not be used after that.
	// It is the caller's responsibility to ensure the urls are all valid,
	// or it panics.
	AddRemote(id types.ID, urls []string)
	// AddPeer adds a peer with the given peer urls into the transport.
	// It is the caller's responsibility to ensure the urls are all valid,
	// or it panics.
	// Peer urls are used to connect to the remote peer.
	AddPeer(id types.ID, urls []string)
	// RemovePeer removes the peer with the given id.
	RemovePeer(id types.ID)
	// RemoveAllPeers removes all the existing peers in the transport.
	RemoveAllPeers()
	// UpdatePeer updates the peer urls of the peer with the given id.
	// It is the caller's responsibility to ensure the urls are all valid,
	// or it panics.
	UpdatePeer(id types.ID, urls []string)
	// ActiveSince returns the time at which the connection with the peer
	// of the given id became active.
	// If the connection has been active since the peer was added, it returns
	// the adding time.
	// If the connection is currently inactive, it returns the zero time.
	ActiveSince(id types.ID) time.Time
	// ActivePeers returns the number of active peers.
	ActivePeers() int
	// Stop closes the connections and stops the transporter.
	Stop()
}
    90  
// Transport implements Transporter interface. It provides the functionality
// to send raft messages to peers, and receive raft messages from peers.
// User should call Handler method to get a handler to serve requests
// received from peerURLs.
// User needs to call Start before calling other functions, and call
// Stop when the Transport is no longer used.
type Transport struct {
	// Logger is optional: the methods in this file check it for nil and skip
	// logging when it is not set.
	Logger *zap.Logger

	DialTimeout time.Duration // maximum duration before timing out dial of the request
	// DialRetryFrequency defines the frequency of streamReader dial retrial attempts;
	// a distinct rate limiter is created per every peer (default value: 10 events/sec).
	// Start fills in the default when the field is left at zero.
	DialRetryFrequency rate.Limit

	TLSInfo transport.TLSInfo // TLS information used when creating connection

	ID        types.ID   // local member ID
	URLs      types.URLs // local peer URLs
	ClusterID types.ID   // raft cluster ID for request validation
	Raft      Raft       // raft state machine, to which the Transport forwards received messages and reports status
	// Snapshotter is handed to the snapshot handler created by Handler.
	Snapshotter *snap.Snapshotter
	ServerStats *stats.ServerStats // used to record general transportation statistics
	// LeaderStats is used to record transportation statistics with followers when
	// performing as leader in raft protocol
	LeaderStats *stats.LeaderStats
	// ErrorC is used to report detected critical errors, e.g.,
	// the member has been permanently removed from the cluster
	// When an error is received from ErrorC, user should stop raft state
	// machine and thus stop the Transport.
	ErrorC chan error

	streamRt   http.RoundTripper // roundTripper used by streams; created in Start
	pipelineRt http.RoundTripper // roundTripper used by pipelines; created in Start

	mu      sync.RWMutex         // protects the remotes and peers maps
	remotes map[types.ID]*remote // remotes map that helps newly joined member to catch up; set to nil by Stop
	peers   map[types.ID]Peer    // peers map; set to nil by Stop

	// pipelineProber and streamProber are built in Start on top of pipelineRt
	// and streamRt respectively; peers are registered with both in AddPeer and
	// unregistered when the peer is removed.
	pipelineProber probing.Prober
	streamProber   probing.Prober
}
   132  
   133  func (t *Transport) Start() error {
   134  	var err error
   135  	t.streamRt, err = newStreamRoundTripper(t.TLSInfo, t.DialTimeout)
   136  	if err != nil {
   137  		return err
   138  	}
   139  	t.pipelineRt, err = NewRoundTripper(t.TLSInfo, t.DialTimeout)
   140  	if err != nil {
   141  		return err
   142  	}
   143  	t.remotes = make(map[types.ID]*remote)
   144  	t.peers = make(map[types.ID]Peer)
   145  	t.pipelineProber = probing.NewProber(t.pipelineRt)
   146  	t.streamProber = probing.NewProber(t.streamRt)
   147  
   148  	// If client didn't provide dial retry frequency, use the default
   149  	// (100ms backoff between attempts to create a new stream),
   150  	// so it doesn't bring too much overhead when retry.
   151  	if t.DialRetryFrequency == 0 {
   152  		t.DialRetryFrequency = rate.Every(100 * time.Millisecond)
   153  	}
   154  	return nil
   155  }
   156  
   157  func (t *Transport) Handler() http.Handler {
   158  	pipelineHandler := newPipelineHandler(t, t.Raft, t.ClusterID)
   159  	streamHandler := newStreamHandler(t, t, t.Raft, t.ID, t.ClusterID)
   160  	snapHandler := newSnapshotHandler(t, t.Raft, t.Snapshotter, t.ClusterID)
   161  	mux := http.NewServeMux()
   162  	mux.Handle(RaftPrefix, pipelineHandler)
   163  	mux.Handle(RaftStreamPrefix+"/", streamHandler)
   164  	mux.Handle(RaftSnapshotPrefix, snapHandler)
   165  	mux.Handle(ProbingPrefix, probing.NewHandler())
   166  	return mux
   167  }
   168  
   169  func (t *Transport) Get(id types.ID) Peer {
   170  	t.mu.RLock()
   171  	defer t.mu.RUnlock()
   172  	return t.peers[id]
   173  }
   174  
   175  func (t *Transport) Send(msgs []raftpb.Message) {
   176  	for _, m := range msgs {
   177  		if m.To == 0 {
   178  			// ignore intentionally dropped message
   179  			continue
   180  		}
   181  		to := types.ID(m.To)
   182  
   183  		t.mu.RLock()
   184  		p, pok := t.peers[to]
   185  		g, rok := t.remotes[to]
   186  		t.mu.RUnlock()
   187  
   188  		if pok {
   189  			if m.Type == raftpb.MsgApp {
   190  				t.ServerStats.SendAppendReq(m.Size())
   191  			}
   192  			p.send(m)
   193  			continue
   194  		}
   195  
   196  		if rok {
   197  			g.send(m)
   198  			continue
   199  		}
   200  
   201  		if t.Logger != nil {
   202  			t.Logger.Debug(
   203  				"ignored message send request; unknown remote peer target",
   204  				zap.String("type", m.Type.String()),
   205  				zap.String("unknown-target-peer-id", to.String()),
   206  			)
   207  		}
   208  	}
   209  }
   210  
   211  func (t *Transport) Stop() {
   212  	t.mu.Lock()
   213  	defer t.mu.Unlock()
   214  	for _, r := range t.remotes {
   215  		r.stop()
   216  	}
   217  	for _, p := range t.peers {
   218  		p.stop()
   219  	}
   220  	t.pipelineProber.RemoveAll()
   221  	t.streamProber.RemoveAll()
   222  	if tr, ok := t.streamRt.(*http.Transport); ok {
   223  		tr.CloseIdleConnections()
   224  	}
   225  	if tr, ok := t.pipelineRt.(*http.Transport); ok {
   226  		tr.CloseIdleConnections()
   227  	}
   228  	t.peers = nil
   229  	t.remotes = nil
   230  }
   231  
   232  // CutPeer drops messages to the specified peer.
   233  func (t *Transport) CutPeer(id types.ID) {
   234  	t.mu.RLock()
   235  	p, pok := t.peers[id]
   236  	g, gok := t.remotes[id]
   237  	t.mu.RUnlock()
   238  
   239  	if pok {
   240  		p.(Pausable).Pause()
   241  	}
   242  	if gok {
   243  		g.Pause()
   244  	}
   245  }
   246  
   247  // MendPeer recovers the message dropping behavior of the given peer.
   248  func (t *Transport) MendPeer(id types.ID) {
   249  	t.mu.RLock()
   250  	p, pok := t.peers[id]
   251  	g, gok := t.remotes[id]
   252  	t.mu.RUnlock()
   253  
   254  	if pok {
   255  		p.(Pausable).Resume()
   256  	}
   257  	if gok {
   258  		g.Resume()
   259  	}
   260  }
   261  
   262  func (t *Transport) AddRemote(id types.ID, us []string) {
   263  	t.mu.Lock()
   264  	defer t.mu.Unlock()
   265  	if t.remotes == nil {
   266  		// there's no clean way to shutdown the golang http server
   267  		// (see: https://github.com/golang/go/issues/4674) before
   268  		// stopping the transport; ignore any new connections.
   269  		return
   270  	}
   271  	if _, ok := t.peers[id]; ok {
   272  		return
   273  	}
   274  	if _, ok := t.remotes[id]; ok {
   275  		return
   276  	}
   277  	urls, err := types.NewURLs(us)
   278  	if err != nil {
   279  		if t.Logger != nil {
   280  			t.Logger.Panic("failed NewURLs", zap.Strings("urls", us), zap.Error(err))
   281  		}
   282  	}
   283  	t.remotes[id] = startRemote(t, urls, id)
   284  
   285  	if t.Logger != nil {
   286  		t.Logger.Info(
   287  			"added new remote peer",
   288  			zap.String("local-member-id", t.ID.String()),
   289  			zap.String("remote-peer-id", id.String()),
   290  			zap.Strings("remote-peer-urls", us),
   291  		)
   292  	}
   293  }
   294  
   295  func (t *Transport) AddPeer(id types.ID, us []string) {
   296  	t.mu.Lock()
   297  	defer t.mu.Unlock()
   298  
   299  	if t.peers == nil {
   300  		panic("transport stopped")
   301  	}
   302  	if _, ok := t.peers[id]; ok {
   303  		return
   304  	}
   305  	urls, err := types.NewURLs(us)
   306  	if err != nil {
   307  		if t.Logger != nil {
   308  			t.Logger.Panic("failed NewURLs", zap.Strings("urls", us), zap.Error(err))
   309  		}
   310  	}
   311  	fs := t.LeaderStats.Follower(id.String())
   312  	t.peers[id] = startPeer(t, urls, id, fs)
   313  	addPeerToProber(t.Logger, t.pipelineProber, id.String(), us, RoundTripperNameSnapshot, rttSec)
   314  	addPeerToProber(t.Logger, t.streamProber, id.String(), us, RoundTripperNameRaftMessage, rttSec)
   315  
   316  	if t.Logger != nil {
   317  		t.Logger.Info(
   318  			"added remote peer",
   319  			zap.String("local-member-id", t.ID.String()),
   320  			zap.String("remote-peer-id", id.String()),
   321  			zap.Strings("remote-peer-urls", us),
   322  		)
   323  	}
   324  }
   325  
// RemovePeer removes the peer with the given id. It takes the write lock and
// delegates to removePeer.
func (t *Transport) RemovePeer(id types.ID) {
	t.mu.Lock()
	// Deferred so the lock is released even if removePeer panics.
	defer t.mu.Unlock()
	t.removePeer(id)
}
   331  
// RemoveAllPeers removes every peer currently in the peers map, holding the
// write lock for the whole operation. Remotes are not touched.
func (t *Transport) RemoveAllPeers() {
	t.mu.Lock()
	defer t.mu.Unlock()
	// removePeer deletes from t.peers while it is being ranged over; Go
	// permits deleting map entries during iteration.
	for id := range t.peers {
		t.removePeer(id)
	}
}
   339  
   340  // the caller of this function must have the peers mutex.
   341  func (t *Transport) removePeer(id types.ID) {
   342  	if peer, ok := t.peers[id]; ok {
   343  		peer.stop()
   344  	} else {
   345  		if t.Logger != nil {
   346  			t.Logger.Panic("unexpected removal of unknown remote peer", zap.String("remote-peer-id", id.String()))
   347  		}
   348  	}
   349  	delete(t.peers, id)
   350  	delete(t.LeaderStats.Followers, id.String())
   351  	t.pipelineProber.Remove(id.String())
   352  	t.streamProber.Remove(id.String())
   353  
   354  	if t.Logger != nil {
   355  		t.Logger.Info(
   356  			"removed remote peer",
   357  			zap.String("local-member-id", t.ID.String()),
   358  			zap.String("removed-remote-peer-id", id.String()),
   359  		)
   360  	}
   361  }
   362  
   363  func (t *Transport) UpdatePeer(id types.ID, us []string) {
   364  	t.mu.Lock()
   365  	defer t.mu.Unlock()
   366  	// TODO: return error or just panic?
   367  	if _, ok := t.peers[id]; !ok {
   368  		return
   369  	}
   370  	urls, err := types.NewURLs(us)
   371  	if err != nil {
   372  		if t.Logger != nil {
   373  			t.Logger.Panic("failed NewURLs", zap.Strings("urls", us), zap.Error(err))
   374  		}
   375  	}
   376  	t.peers[id].update(urls)
   377  
   378  	t.pipelineProber.Remove(id.String())
   379  	addPeerToProber(t.Logger, t.pipelineProber, id.String(), us, RoundTripperNameSnapshot, rttSec)
   380  	t.streamProber.Remove(id.String())
   381  	addPeerToProber(t.Logger, t.streamProber, id.String(), us, RoundTripperNameRaftMessage, rttSec)
   382  
   383  	if t.Logger != nil {
   384  		t.Logger.Info(
   385  			"updated remote peer",
   386  			zap.String("local-member-id", t.ID.String()),
   387  			zap.String("updated-remote-peer-id", id.String()),
   388  			zap.Strings("updated-remote-peer-urls", us),
   389  		)
   390  	}
   391  }
   392  
   393  func (t *Transport) ActiveSince(id types.ID) time.Time {
   394  	t.mu.RLock()
   395  	defer t.mu.RUnlock()
   396  	if p, ok := t.peers[id]; ok {
   397  		return p.activeSince()
   398  	}
   399  	return time.Time{}
   400  }
   401  
   402  func (t *Transport) SendSnapshot(m snap.Message) {
   403  	t.mu.Lock()
   404  	defer t.mu.Unlock()
   405  	p := t.peers[types.ID(m.To)]
   406  	if p == nil {
   407  		m.CloseWithError(errMemberNotFound)
   408  		return
   409  	}
   410  	p.sendSnap(m)
   411  }
   412  
// Pausable is a testing interface for pausing transport traffic.
// Transport implements it, and the peers stored in a Transport are asserted
// to implement it by CutPeer, MendPeer, Pause and Resume.
type Pausable interface {
	// Pause suspends traffic.
	// NOTE(review): exact effect depends on the implementation — confirm.
	Pause()
	// Resume undoes a previous Pause.
	Resume()
}
   418  
   419  func (t *Transport) Pause() {
   420  	t.mu.RLock()
   421  	defer t.mu.RUnlock()
   422  	for _, p := range t.peers {
   423  		p.(Pausable).Pause()
   424  	}
   425  }
   426  
   427  func (t *Transport) Resume() {
   428  	t.mu.RLock()
   429  	defer t.mu.RUnlock()
   430  	for _, p := range t.peers {
   431  		p.(Pausable).Resume()
   432  	}
   433  }
   434  
   435  // ActivePeers returns a channel that closes when an initial
   436  // peer connection has been established. Use this to wait until the
   437  // first peer connection becomes active.
   438  func (t *Transport) ActivePeers() (cnt int) {
   439  	t.mu.RLock()
   440  	defer t.mu.RUnlock()
   441  	for _, p := range t.peers {
   442  		if !p.activeSince().IsZero() {
   443  			cnt++
   444  		}
   445  	}
   446  	return cnt
   447  }
   448  

View as plain text