...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package rafttest
16
17 import (
18 "context"
19 "log"
20 "math/rand"
21 "sync"
22 "time"
23
24 "go.etcd.io/etcd/raft/v3"
25 "go.etcd.io/etcd/raft/v3/raftpb"
26 )
27
28 type node struct {
29 raft.Node
30 id uint64
31 iface iface
32 stopc chan struct{}
33 pausec chan bool
34
35
36 storage *raft.MemoryStorage
37
38 mu sync.Mutex
39 state raftpb.HardState
40 }
41
42 func startNode(id uint64, peers []raft.Peer, iface iface) *node {
43 st := raft.NewMemoryStorage()
44 c := &raft.Config{
45 ID: id,
46 ElectionTick: 10,
47 HeartbeatTick: 1,
48 Storage: st,
49 MaxSizePerMsg: 1024 * 1024,
50 MaxInflightMsgs: 256,
51 MaxUncommittedEntriesSize: 1 << 30,
52 }
53 rn := raft.StartNode(c, peers)
54 n := &node{
55 Node: rn,
56 id: id,
57 storage: st,
58 iface: iface,
59 pausec: make(chan bool),
60 }
61 n.start()
62 return n
63 }
64
65 func (n *node) start() {
66 n.stopc = make(chan struct{})
67 ticker := time.NewTicker(5 * time.Millisecond).C
68
69 go func() {
70 for {
71 select {
72 case <-ticker:
73 n.Tick()
74 case rd := <-n.Ready():
75 if !raft.IsEmptyHardState(rd.HardState) {
76 n.mu.Lock()
77 n.state = rd.HardState
78 n.mu.Unlock()
79 n.storage.SetHardState(n.state)
80 }
81 n.storage.Append(rd.Entries)
82 time.Sleep(time.Millisecond)
83
84
85 for _, m := range rd.Messages {
86 mlocal := m
87 go func() {
88 time.Sleep(time.Duration(rand.Int63n(10)) * time.Millisecond)
89 n.iface.send(mlocal)
90 }()
91 }
92 n.Advance()
93 case m := <-n.iface.recv():
94 go n.Step(context.TODO(), m)
95 case <-n.stopc:
96 n.Stop()
97 log.Printf("raft.%d: stop", n.id)
98 n.Node = nil
99 close(n.stopc)
100 return
101 case p := <-n.pausec:
102 recvms := make([]raftpb.Message, 0)
103 for p {
104 select {
105 case m := <-n.iface.recv():
106 recvms = append(recvms, m)
107 case p = <-n.pausec:
108 }
109 }
110
111 for _, m := range recvms {
112 n.Step(context.TODO(), m)
113 }
114 }
115 }
116 }()
117 }
118
119
120
121
122 func (n *node) stop() {
123 n.iface.disconnect()
124 n.stopc <- struct{}{}
125
126 <-n.stopc
127 }
128
129
130
131 func (n *node) restart() {
132
133 <-n.stopc
134 c := &raft.Config{
135 ID: n.id,
136 ElectionTick: 10,
137 HeartbeatTick: 1,
138 Storage: n.storage,
139 MaxSizePerMsg: 1024 * 1024,
140 MaxInflightMsgs: 256,
141 MaxUncommittedEntriesSize: 1 << 30,
142 }
143 n.Node = raft.RestartNode(c)
144 n.start()
145 n.iface.connect()
146 }
147
148
149
150
151 func (n *node) pause() {
152 n.pausec <- true
153 }
154
155
156 func (n *node) resume() {
157 n.pausec <- false
158 }
159
View as plain text