...

Source file src/go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp/remote.go

Documentation: go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp

     1  // Copyright 2015 The etcd Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package rafthttp
    16  
    17  import (
    18  	"go.etcd.io/etcd/client/pkg/v3/types"
    19  	"go.etcd.io/etcd/raft/v3/raftpb"
    20  
    21  	"go.uber.org/zap"
    22  )
    23  
    24  type remote struct {
    25  	lg       *zap.Logger
    26  	localID  types.ID
    27  	id       types.ID
    28  	status   *peerStatus
    29  	pipeline *pipeline
    30  }
    31  
    32  func startRemote(tr *Transport, urls types.URLs, id types.ID) *remote {
    33  	picker := newURLPicker(urls)
    34  	status := newPeerStatus(tr.Logger, tr.ID, id)
    35  	pipeline := &pipeline{
    36  		peerID: id,
    37  		tr:     tr,
    38  		picker: picker,
    39  		status: status,
    40  		raft:   tr.Raft,
    41  		errorc: tr.ErrorC,
    42  	}
    43  	pipeline.start()
    44  
    45  	return &remote{
    46  		lg:       tr.Logger,
    47  		localID:  tr.ID,
    48  		id:       id,
    49  		status:   status,
    50  		pipeline: pipeline,
    51  	}
    52  }
    53  
    54  func (g *remote) send(m raftpb.Message) {
    55  	select {
    56  	case g.pipeline.msgc <- m:
    57  	default:
    58  		if g.status.isActive() {
    59  			if g.lg != nil {
    60  				g.lg.Warn(
    61  					"dropped internal Raft message since sending buffer is full (overloaded network)",
    62  					zap.String("message-type", m.Type.String()),
    63  					zap.String("local-member-id", g.localID.String()),
    64  					zap.String("from", types.ID(m.From).String()),
    65  					zap.String("remote-peer-id", g.id.String()),
    66  					zap.Bool("remote-peer-active", g.status.isActive()),
    67  				)
    68  			}
    69  		} else {
    70  			if g.lg != nil {
    71  				g.lg.Warn(
    72  					"dropped Raft message since sending buffer is full (overloaded network)",
    73  					zap.String("message-type", m.Type.String()),
    74  					zap.String("local-member-id", g.localID.String()),
    75  					zap.String("from", types.ID(m.From).String()),
    76  					zap.String("remote-peer-id", g.id.String()),
    77  					zap.Bool("remote-peer-active", g.status.isActive()),
    78  				)
    79  			}
    80  		}
    81  		sentFailures.WithLabelValues(types.ID(m.To).String()).Inc()
    82  	}
    83  }
    84  
    85  func (g *remote) stop() {
    86  	g.pipeline.stop()
    87  }
    88  
    89  func (g *remote) Pause() {
    90  	g.stop()
    91  }
    92  
    93  func (g *remote) Resume() {
    94  	g.pipeline.start()
    95  }
    96  

View as plain text