...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package rafttest
16
17 import (
18 "math/rand"
19 "sync"
20 "time"
21
22 "go.etcd.io/etcd/raft/v3/raftpb"
23 )
24
25
26 type iface interface {
27 send(m raftpb.Message)
28 recv() chan raftpb.Message
29 disconnect()
30 connect()
31 }
32
// raftNetwork is an in-memory message network shared by a set of nodes. It
// can be configured to drop or delay messages per directed link and to
// disconnect individual nodes.
type raftNetwork struct {
	// rand supplies the random draws send uses for its drop and delay
	// decisions. newRaftNetwork seeds it with a fixed value.
	rand *rand.Rand
	// mu is held around every access to the maps below.
	mu sync.Mutex
	// disconnected marks node ids that are currently cut off.
	disconnected map[uint64]bool
	// dropmap holds, per directed link, the probability that a message is
	// dropped.
	dropmap map[conn]float64
	// delaymap holds, per directed link, the delay configuration.
	delaymap map[conn]delay
	// recvQueues holds the inbound message queue of each node, keyed by id.
	recvQueues map[uint64]chan raftpb.Message
}
41
// conn names a directed link between two nodes. It is comparable and is used
// as the key of the per-link drop and delay tables.
type conn struct {
	// from is the id of the sending node.
	from uint64
	// to is the id of the receiving node.
	to uint64
}
45
// delay describes how messages on one link are held back.
type delay struct {
	// d bounds the delay: a delayed message sleeps for a random duration
	// drawn from [0, d).
	d time.Duration
	// rate is the probability, compared against a draw in [0, 1), that a
	// given message is delayed at all.
	rate float64
}
50
51 func newRaftNetwork(nodes ...uint64) *raftNetwork {
52 pn := &raftNetwork{
53 rand: rand.New(rand.NewSource(1)),
54 recvQueues: make(map[uint64]chan raftpb.Message),
55 dropmap: make(map[conn]float64),
56 delaymap: make(map[conn]delay),
57 disconnected: make(map[uint64]bool),
58 }
59
60 for _, n := range nodes {
61 pn.recvQueues[n] = make(chan raftpb.Message, 1024)
62 }
63 return pn
64 }
65
66 func (rn *raftNetwork) nodeNetwork(id uint64) iface {
67 return &nodeNetwork{id: id, raftNetwork: rn}
68 }
69
70 func (rn *raftNetwork) send(m raftpb.Message) {
71 rn.mu.Lock()
72 to := rn.recvQueues[m.To]
73 if rn.disconnected[m.To] {
74 to = nil
75 }
76 drop := rn.dropmap[conn{m.From, m.To}]
77 dl := rn.delaymap[conn{m.From, m.To}]
78 rn.mu.Unlock()
79
80 if to == nil {
81 return
82 }
83 if drop != 0 && rn.rand.Float64() < drop {
84 return
85 }
86
87 if dl.d != 0 && rn.rand.Float64() < dl.rate {
88 rd := rn.rand.Int63n(int64(dl.d))
89 time.Sleep(time.Duration(rd))
90 }
91
92
93 b, err := m.Marshal()
94 if err != nil {
95 panic(err)
96 }
97
98 var cm raftpb.Message
99 err = cm.Unmarshal(b)
100 if err != nil {
101 panic(err)
102 }
103
104 select {
105 case to <- cm:
106 default:
107
108 }
109 }
110
111 func (rn *raftNetwork) recvFrom(from uint64) chan raftpb.Message {
112 rn.mu.Lock()
113 fromc := rn.recvQueues[from]
114 if rn.disconnected[from] {
115 fromc = nil
116 }
117 rn.mu.Unlock()
118
119 return fromc
120 }
121
122 func (rn *raftNetwork) drop(from, to uint64, rate float64) {
123 rn.mu.Lock()
124 defer rn.mu.Unlock()
125 rn.dropmap[conn{from, to}] = rate
126 }
127
128 func (rn *raftNetwork) delay(from, to uint64, d time.Duration, rate float64) {
129 rn.mu.Lock()
130 defer rn.mu.Unlock()
131 rn.delaymap[conn{from, to}] = delay{d, rate}
132 }
133
134 func (rn *raftNetwork) disconnect(id uint64) {
135 rn.mu.Lock()
136 defer rn.mu.Unlock()
137 rn.disconnected[id] = true
138 }
139
140 func (rn *raftNetwork) connect(id uint64) {
141 rn.mu.Lock()
142 defer rn.mu.Unlock()
143 rn.disconnected[id] = false
144 }
145
// nodeNetwork binds a raftNetwork to a single node id. raftNetwork.nodeNetwork
// returns it as an iface.
type nodeNetwork struct {
	// id is the node this view belongs to.
	id uint64
	// The embedded network provides the shared state and the id-taking
	// operations that the methods of nodeNetwork delegate to.
	*raftNetwork
}
150
151 func (nt *nodeNetwork) connect() {
152 nt.raftNetwork.connect(nt.id)
153 }
154
155 func (nt *nodeNetwork) disconnect() {
156 nt.raftNetwork.disconnect(nt.id)
157 }
158
159 func (nt *nodeNetwork) send(m raftpb.Message) {
160 nt.raftNetwork.send(m)
161 }
162
163 func (nt *nodeNetwork) recv() chan raftpb.Message {
164 return nt.recvFrom(nt.id)
165 }
166
View as plain text