...

Source file src/go.etcd.io/etcd/raft/v3/rafttest/network.go

Documentation: go.etcd.io/etcd/raft/v3/rafttest

     1  // Copyright 2015 The etcd Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package rafttest
    16  
    17  import (
    18  	"math/rand"
    19  	"sync"
    20  	"time"
    21  
    22  	"go.etcd.io/etcd/raft/v3/raftpb"
    23  )
    24  
    25  // a network interface
    26  type iface interface {
    27  	send(m raftpb.Message)
    28  	recv() chan raftpb.Message
    29  	disconnect()
    30  	connect()
    31  }
    32  
    33  type raftNetwork struct {
    34  	rand         *rand.Rand
    35  	mu           sync.Mutex
    36  	disconnected map[uint64]bool
    37  	dropmap      map[conn]float64
    38  	delaymap     map[conn]delay
    39  	recvQueues   map[uint64]chan raftpb.Message
    40  }
    41  
    // conn identifies a directed link between two nodes. It is comparable and is
    // used as the key of the drop and delay maps.
    type conn struct {
    	from uint64 // id of the sending node
    	to   uint64 // id of the receiving node
    }
    45  
    // delay describes how messages on one link are slowed down.
    type delay struct {
    	// d is the exclusive upper bound of the random pause applied to a
    	// delayed message.
    	d time.Duration

    	// rate is the probability that a given message is delayed at all.
    	rate float64
    }
    50  
    51  func newRaftNetwork(nodes ...uint64) *raftNetwork {
    52  	pn := &raftNetwork{
    53  		rand:         rand.New(rand.NewSource(1)),
    54  		recvQueues:   make(map[uint64]chan raftpb.Message),
    55  		dropmap:      make(map[conn]float64),
    56  		delaymap:     make(map[conn]delay),
    57  		disconnected: make(map[uint64]bool),
    58  	}
    59  
    60  	for _, n := range nodes {
    61  		pn.recvQueues[n] = make(chan raftpb.Message, 1024)
    62  	}
    63  	return pn
    64  }
    65  
    66  func (rn *raftNetwork) nodeNetwork(id uint64) iface {
    67  	return &nodeNetwork{id: id, raftNetwork: rn}
    68  }
    69  
    70  func (rn *raftNetwork) send(m raftpb.Message) {
    71  	rn.mu.Lock()
    72  	to := rn.recvQueues[m.To]
    73  	if rn.disconnected[m.To] {
    74  		to = nil
    75  	}
    76  	drop := rn.dropmap[conn{m.From, m.To}]
    77  	dl := rn.delaymap[conn{m.From, m.To}]
    78  	rn.mu.Unlock()
    79  
    80  	if to == nil {
    81  		return
    82  	}
    83  	if drop != 0 && rn.rand.Float64() < drop {
    84  		return
    85  	}
    86  	// TODO: shall we dl without blocking the send call?
    87  	if dl.d != 0 && rn.rand.Float64() < dl.rate {
    88  		rd := rn.rand.Int63n(int64(dl.d))
    89  		time.Sleep(time.Duration(rd))
    90  	}
    91  
    92  	// use marshal/unmarshal to copy message to avoid data race.
    93  	b, err := m.Marshal()
    94  	if err != nil {
    95  		panic(err)
    96  	}
    97  
    98  	var cm raftpb.Message
    99  	err = cm.Unmarshal(b)
   100  	if err != nil {
   101  		panic(err)
   102  	}
   103  
   104  	select {
   105  	case to <- cm:
   106  	default:
   107  		// drop messages when the receiver queue is full.
   108  	}
   109  }
   110  
   111  func (rn *raftNetwork) recvFrom(from uint64) chan raftpb.Message {
   112  	rn.mu.Lock()
   113  	fromc := rn.recvQueues[from]
   114  	if rn.disconnected[from] {
   115  		fromc = nil
   116  	}
   117  	rn.mu.Unlock()
   118  
   119  	return fromc
   120  }
   121  
   122  func (rn *raftNetwork) drop(from, to uint64, rate float64) {
   123  	rn.mu.Lock()
   124  	defer rn.mu.Unlock()
   125  	rn.dropmap[conn{from, to}] = rate
   126  }
   127  
   128  func (rn *raftNetwork) delay(from, to uint64, d time.Duration, rate float64) {
   129  	rn.mu.Lock()
   130  	defer rn.mu.Unlock()
   131  	rn.delaymap[conn{from, to}] = delay{d, rate}
   132  }
   133  
   134  func (rn *raftNetwork) disconnect(id uint64) {
   135  	rn.mu.Lock()
   136  	defer rn.mu.Unlock()
   137  	rn.disconnected[id] = true
   138  }
   139  
   140  func (rn *raftNetwork) connect(id uint64) {
   141  	rn.mu.Lock()
   142  	defer rn.mu.Unlock()
   143  	rn.disconnected[id] = false
   144  }
   145  
   146  type nodeNetwork struct {
   147  	id uint64
   148  	*raftNetwork
   149  }
   150  
   151  func (nt *nodeNetwork) connect() {
   152  	nt.raftNetwork.connect(nt.id)
   153  }
   154  
   155  func (nt *nodeNetwork) disconnect() {
   156  	nt.raftNetwork.disconnect(nt.id)
   157  }
   158  
   159  func (nt *nodeNetwork) send(m raftpb.Message) {
   160  	nt.raftNetwork.send(m)
   161  }
   162  
   163  func (nt *nodeNetwork) recv() chan raftpb.Message {
   164  	return nt.recvFrom(nt.id)
   165  }
   166  

View as plain text