1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package etcdserver
16
17 import (
18 "encoding/json"
19 "expvar"
20 "reflect"
21 "sync"
22 "testing"
23 "time"
24
25 "github.com/stretchr/testify/assert"
26 "go.etcd.io/etcd/client/pkg/v3/types"
27 "go.etcd.io/etcd/pkg/v3/pbutil"
28 "go.etcd.io/etcd/raft/v3"
29 "go.etcd.io/etcd/raft/v3/raftpb"
30 "go.etcd.io/etcd/server/v3/etcdserver/api/membership"
31 "go.etcd.io/etcd/server/v3/mock/mockstorage"
32 "go.uber.org/zap"
33 )
34
35 func TestGetIDs(t *testing.T) {
36 addcc := &raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 2}
37 addEntry := raftpb.Entry{Type: raftpb.EntryConfChange, Data: pbutil.MustMarshal(addcc)}
38 removecc := &raftpb.ConfChange{Type: raftpb.ConfChangeRemoveNode, NodeID: 2}
39 removeEntry := raftpb.Entry{Type: raftpb.EntryConfChange, Data: pbutil.MustMarshal(removecc)}
40 normalEntry := raftpb.Entry{Type: raftpb.EntryNormal}
41 updatecc := &raftpb.ConfChange{Type: raftpb.ConfChangeUpdateNode, NodeID: 2}
42 updateEntry := raftpb.Entry{Type: raftpb.EntryConfChange, Data: pbutil.MustMarshal(updatecc)}
43
44 tests := []struct {
45 confState *raftpb.ConfState
46 ents []raftpb.Entry
47
48 widSet []uint64
49 }{
50 {nil, []raftpb.Entry{}, []uint64{}},
51 {&raftpb.ConfState{Voters: []uint64{1}},
52 []raftpb.Entry{}, []uint64{1}},
53 {&raftpb.ConfState{Voters: []uint64{1}},
54 []raftpb.Entry{addEntry}, []uint64{1, 2}},
55 {&raftpb.ConfState{Voters: []uint64{1}},
56 []raftpb.Entry{addEntry, removeEntry}, []uint64{1}},
57 {&raftpb.ConfState{Voters: []uint64{1}},
58 []raftpb.Entry{addEntry, normalEntry}, []uint64{1, 2}},
59 {&raftpb.ConfState{Voters: []uint64{1}},
60 []raftpb.Entry{addEntry, normalEntry, updateEntry}, []uint64{1, 2}},
61 {&raftpb.ConfState{Voters: []uint64{1}},
62 []raftpb.Entry{addEntry, removeEntry, normalEntry}, []uint64{1}},
63 }
64
65 for i, tt := range tests {
66 var snap raftpb.Snapshot
67 if tt.confState != nil {
68 snap.Metadata.ConfState = *tt.confState
69 }
70 idSet := getIDs(testLogger, &snap, tt.ents)
71 if !reflect.DeepEqual(idSet, tt.widSet) {
72 t.Errorf("#%d: idset = %#v, want %#v", i, idSet, tt.widSet)
73 }
74 }
75 }
76
77 func TestCreateConfigChangeEnts(t *testing.T) {
78 m := membership.Member{
79 ID: types.ID(1),
80 RaftAttributes: membership.RaftAttributes{PeerURLs: []string{"http://localhost:2380"}},
81 }
82 ctx, err := json.Marshal(m)
83 if err != nil {
84 t.Fatal(err)
85 }
86 addcc1 := &raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 1, Context: ctx}
87 removecc2 := &raftpb.ConfChange{Type: raftpb.ConfChangeRemoveNode, NodeID: 2}
88 removecc3 := &raftpb.ConfChange{Type: raftpb.ConfChangeRemoveNode, NodeID: 3}
89 tests := []struct {
90 ids []uint64
91 self uint64
92 term, index uint64
93
94 wents []raftpb.Entry
95 }{
96 {
97 []uint64{1},
98 1,
99 1, 1,
100
101 nil,
102 },
103 {
104 []uint64{1, 2},
105 1,
106 1, 1,
107
108 []raftpb.Entry{{Term: 1, Index: 2, Type: raftpb.EntryConfChange, Data: pbutil.MustMarshal(removecc2)}},
109 },
110 {
111 []uint64{1, 2},
112 1,
113 2, 2,
114
115 []raftpb.Entry{{Term: 2, Index: 3, Type: raftpb.EntryConfChange, Data: pbutil.MustMarshal(removecc2)}},
116 },
117 {
118 []uint64{1, 2, 3},
119 1,
120 2, 2,
121
122 []raftpb.Entry{
123 {Term: 2, Index: 3, Type: raftpb.EntryConfChange, Data: pbutil.MustMarshal(removecc2)},
124 {Term: 2, Index: 4, Type: raftpb.EntryConfChange, Data: pbutil.MustMarshal(removecc3)},
125 },
126 },
127 {
128 []uint64{2, 3},
129 2,
130 2, 2,
131
132 []raftpb.Entry{
133 {Term: 2, Index: 3, Type: raftpb.EntryConfChange, Data: pbutil.MustMarshal(removecc3)},
134 },
135 },
136 {
137 []uint64{2, 3},
138 1,
139 2, 2,
140
141 []raftpb.Entry{
142 {Term: 2, Index: 3, Type: raftpb.EntryConfChange, Data: pbutil.MustMarshal(addcc1)},
143 {Term: 2, Index: 4, Type: raftpb.EntryConfChange, Data: pbutil.MustMarshal(removecc2)},
144 {Term: 2, Index: 5, Type: raftpb.EntryConfChange, Data: pbutil.MustMarshal(removecc3)},
145 },
146 },
147 }
148
149 for i, tt := range tests {
150 gents := createConfigChangeEnts(testLogger, tt.ids, tt.self, tt.term, tt.index)
151 if !reflect.DeepEqual(gents, tt.wents) {
152 t.Errorf("#%d: ents = %v, want %v", i, gents, tt.wents)
153 }
154 }
155 }
156
157 func TestStopRaftWhenWaitingForApplyDone(t *testing.T) {
158 n := newNopReadyNode()
159 r := newRaftNode(raftNodeConfig{
160 lg: zap.NewExample(),
161 Node: n,
162 storage: mockstorage.NewStorageRecorder(""),
163 raftStorage: raft.NewMemoryStorage(),
164 transport: newNopTransporter(),
165 })
166 srv := &EtcdServer{lgMu: new(sync.RWMutex), lg: zap.NewExample(), r: *r}
167 srv.r.start(nil)
168 n.readyc <- raft.Ready{}
169 select {
170 case <-srv.r.applyc:
171 case <-time.After(time.Second):
172 t.Fatalf("failed to receive apply struct")
173 }
174
175 srv.r.stopped <- struct{}{}
176 select {
177 case <-srv.r.done:
178 case <-time.After(time.Second):
179 t.Fatalf("failed to stop raft loop")
180 }
181 }
182
183
184 func TestConfigChangeBlocksApply(t *testing.T) {
185 n := newNopReadyNode()
186
187 r := newRaftNode(raftNodeConfig{
188 lg: zap.NewExample(),
189 Node: n,
190 storage: mockstorage.NewStorageRecorder(""),
191 raftStorage: raft.NewMemoryStorage(),
192 transport: newNopTransporter(),
193 })
194 srv := &EtcdServer{lgMu: new(sync.RWMutex), lg: zap.NewExample(), r: *r}
195
196 srv.r.start(&raftReadyHandler{
197 getLead: func() uint64 { return 0 },
198 updateLead: func(uint64) {},
199 updateLeadership: func(bool) {},
200 })
201 defer srv.r.stop()
202
203 n.readyc <- raft.Ready{
204 SoftState: &raft.SoftState{RaftState: raft.StateFollower},
205 CommittedEntries: []raftpb.Entry{{Type: raftpb.EntryConfChange}},
206 }
207 ap := <-srv.r.applyc
208
209 continueC := make(chan struct{})
210 go func() {
211 n.readyc <- raft.Ready{}
212 <-srv.r.applyc
213 close(continueC)
214 }()
215
216 select {
217 case <-continueC:
218 t.Fatalf("unexpected execution: raft routine should block waiting for apply")
219 case <-time.After(time.Second):
220 }
221
222
223 <-ap.notifyc
224
225 select {
226 case <-continueC:
227 case <-time.After(time.Second):
228 t.Fatalf("unexpected blocking on execution")
229 }
230 }
231
232 func TestProcessDuplicatedAppRespMessage(t *testing.T) {
233 n := newNopReadyNode()
234 cl := membership.NewCluster(zap.NewExample())
235
236 rs := raft.NewMemoryStorage()
237 p := mockstorage.NewStorageRecorder("")
238 tr, sendc := newSendMsgAppRespTransporter()
239 r := newRaftNode(raftNodeConfig{
240 lg: zap.NewExample(),
241 isIDRemoved: func(id uint64) bool { return cl.IsIDRemoved(types.ID(id)) },
242 Node: n,
243 transport: tr,
244 storage: p,
245 raftStorage: rs,
246 })
247
248 s := &EtcdServer{
249 lgMu: new(sync.RWMutex),
250 lg: zap.NewExample(),
251 r: *r,
252 cluster: cl,
253 SyncTicker: &time.Ticker{},
254 }
255
256 s.start()
257 defer s.Stop()
258
259 lead := uint64(1)
260
261 n.readyc <- raft.Ready{Messages: []raftpb.Message{
262 {Type: raftpb.MsgAppResp, From: 2, To: lead, Term: 1, Index: 1},
263 {Type: raftpb.MsgAppResp, From: 2, To: lead, Term: 1, Index: 2},
264 {Type: raftpb.MsgAppResp, From: 2, To: lead, Term: 1, Index: 3},
265 }}
266
267 got, want := <-sendc, 1
268 if got != want {
269 t.Errorf("count = %d, want %d", got, want)
270 }
271 }
272
273
274
// TestExpvarWithNoRaftStatus walks every published expvar and renders its
// value as a string, failing the test if any of them panics while doing so.
func TestExpvarWithNoRaftStatus(t *testing.T) {
	// Turn a panic raised by any variable's String method into a test failure.
	defer func() {
		if recovered := recover(); recovered != nil {
			t.Fatal(recovered)
		}
	}()

	// render forces evaluation of a single variable; the result is discarded.
	render := func(entry expvar.KeyValue) {
		_ = entry.Value.String()
	}
	expvar.Do(render)
}
285
286 func TestShouldWaitWALSync(t *testing.T) {
287 testcases := []struct {
288 name string
289 unstableEntries []raftpb.Entry
290 commitedEntries []raftpb.Entry
291 expectedResult bool
292 }{
293 {
294 name: "both entries are nil",
295 unstableEntries: nil,
296 commitedEntries: nil,
297 expectedResult: false,
298 },
299 {
300 name: "both entries are empty slices",
301 unstableEntries: []raftpb.Entry{},
302 commitedEntries: []raftpb.Entry{},
303 expectedResult: false,
304 },
305 {
306 name: "one nil and the other empty",
307 unstableEntries: nil,
308 commitedEntries: []raftpb.Entry{},
309 expectedResult: false,
310 },
311 {
312 name: "one nil and the other has data",
313 unstableEntries: nil,
314 commitedEntries: []raftpb.Entry{{Term: 4, Index: 10, Type: raftpb.EntryNormal, Data: []byte{0x11, 0x22, 0x33}}},
315 expectedResult: false,
316 },
317 {
318 name: "one empty and the other has data",
319 unstableEntries: []raftpb.Entry{},
320 commitedEntries: []raftpb.Entry{{Term: 4, Index: 10, Type: raftpb.EntryNormal, Data: []byte{0x11, 0x22, 0x33}}},
321 expectedResult: false,
322 },
323 {
324 name: "has different term and index",
325 unstableEntries: []raftpb.Entry{{Term: 5, Index: 11, Type: raftpb.EntryNormal, Data: []byte{0x11, 0x22, 0x33}}},
326 commitedEntries: []raftpb.Entry{{Term: 4, Index: 10, Type: raftpb.EntryNormal, Data: []byte{0x11, 0x22, 0x33}}},
327 expectedResult: false,
328 },
329 {
330 name: "has identical data",
331 unstableEntries: []raftpb.Entry{{Term: 4, Index: 10, Type: raftpb.EntryNormal, Data: []byte{0x11, 0x22, 0x33}}},
332 commitedEntries: []raftpb.Entry{{Term: 4, Index: 10, Type: raftpb.EntryNormal, Data: []byte{0x11, 0x22, 0x33}}},
333 expectedResult: true,
334 },
335 {
336 name: "has overlapped entry",
337 unstableEntries: []raftpb.Entry{
338 {Term: 4, Index: 10, Type: raftpb.EntryNormal, Data: []byte{0x11, 0x22, 0x33}},
339 {Term: 4, Index: 11, Type: raftpb.EntryNormal, Data: []byte{0x44, 0x55, 0x66}},
340 {Term: 4, Index: 12, Type: raftpb.EntryNormal, Data: []byte{0x77, 0x88, 0x99}},
341 },
342 commitedEntries: []raftpb.Entry{
343 {Term: 4, Index: 8, Type: raftpb.EntryNormal, Data: []byte{0x07, 0x08, 0x09}},
344 {Term: 4, Index: 9, Type: raftpb.EntryNormal, Data: []byte{0x10, 0x11, 0x12}},
345 {Term: 4, Index: 10, Type: raftpb.EntryNormal, Data: []byte{0x11, 0x22, 0x33}},
346 },
347 expectedResult: true,
348 },
349 }
350
351 for _, tc := range testcases {
352 t.Run(tc.name, func(t *testing.T) {
353 shouldWALSync := shouldWaitWALSync(raft.Ready{
354 Entries: tc.unstableEntries,
355 CommittedEntries: tc.commitedEntries,
356 })
357 assert.Equal(t, tc.expectedResult, shouldWALSync)
358 })
359 }
360 }
361
View as plain text