1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package raft
16
17 import (
18 "bytes"
19 "context"
20 "fmt"
21 "math"
22 "reflect"
23 "strings"
24 "testing"
25 "time"
26
27 "go.etcd.io/etcd/client/pkg/v3/testutil"
28 "go.etcd.io/etcd/raft/v3/raftpb"
29 )
30
31
32
33
34
35 func readyWithTimeout(n Node) Ready {
36 select {
37 case rd := <-n.Ready():
38 return rd
39 case <-time.After(time.Second):
40 panic("timed out waiting for ready")
41 }
42 }
43
44
45
46 func TestNodeStep(t *testing.T) {
47 for i, msgn := range raftpb.MessageType_name {
48 n := &node{
49 propc: make(chan msgWithResult, 1),
50 recvc: make(chan raftpb.Message, 1),
51 }
52 msgt := raftpb.MessageType(i)
53 n.Step(context.TODO(), raftpb.Message{Type: msgt})
54
55 if msgt == raftpb.MsgProp {
56 select {
57 case <-n.propc:
58 default:
59 t.Errorf("%d: cannot receive %s on propc chan", msgt, msgn)
60 }
61 } else {
62 if IsLocalMsg(msgt) {
63 select {
64 case <-n.recvc:
65 t.Errorf("%d: step should ignore %s", msgt, msgn)
66 default:
67 }
68 } else {
69 select {
70 case <-n.recvc:
71 default:
72 t.Errorf("%d: cannot receive %s on recvc chan", msgt, msgn)
73 }
74 }
75 }
76 }
77 }
78
79
80 func TestNodeStepUnblock(t *testing.T) {
81
82 n := &node{
83 propc: make(chan msgWithResult),
84 done: make(chan struct{}),
85 }
86
87 ctx, cancel := context.WithCancel(context.Background())
88 stopFunc := func() { close(n.done) }
89
90 tests := []struct {
91 unblock func()
92 werr error
93 }{
94 {stopFunc, ErrStopped},
95 {cancel, context.Canceled},
96 }
97
98 for i, tt := range tests {
99 errc := make(chan error, 1)
100 go func() {
101 err := n.Step(ctx, raftpb.Message{Type: raftpb.MsgProp})
102 errc <- err
103 }()
104 tt.unblock()
105 select {
106 case err := <-errc:
107 if err != tt.werr {
108 t.Errorf("#%d: err = %v, want %v", i, err, tt.werr)
109 }
110
111 if ctx.Err() != nil {
112 ctx = context.TODO()
113 }
114 select {
115 case <-n.done:
116 n.done = make(chan struct{})
117 default:
118 }
119 case <-time.After(1 * time.Second):
120 t.Fatalf("#%d: failed to unblock step", i)
121 }
122 }
123 }
124
125
126 func TestNodePropose(t *testing.T) {
127 msgs := []raftpb.Message{}
128 appendStep := func(r *raft, m raftpb.Message) error {
129 msgs = append(msgs, m)
130 return nil
131 }
132
133 s := newTestMemoryStorage(withPeers(1))
134 rn := newTestRawNode(1, 10, 1, s)
135 n := newNode(rn)
136 r := rn.raft
137 go n.run()
138 if err := n.Campaign(context.TODO()); err != nil {
139 t.Fatal(err)
140 }
141 for {
142 rd := <-n.Ready()
143 s.Append(rd.Entries)
144
145 if rd.SoftState.Lead == r.id {
146 r.step = appendStep
147 n.Advance()
148 break
149 }
150 n.Advance()
151 }
152 n.Propose(context.TODO(), []byte("somedata"))
153 n.Stop()
154
155 if len(msgs) != 1 {
156 t.Fatalf("len(msgs) = %d, want %d", len(msgs), 1)
157 }
158 if msgs[0].Type != raftpb.MsgProp {
159 t.Errorf("msg type = %d, want %d", msgs[0].Type, raftpb.MsgProp)
160 }
161 if !bytes.Equal(msgs[0].Entries[0].Data, []byte("somedata")) {
162 t.Errorf("data = %v, want %v", msgs[0].Entries[0].Data, []byte("somedata"))
163 }
164 }
165
166
167
168 func TestNodeReadIndex(t *testing.T) {
169 msgs := []raftpb.Message{}
170 appendStep := func(r *raft, m raftpb.Message) error {
171 msgs = append(msgs, m)
172 return nil
173 }
174 wrs := []ReadState{{Index: uint64(1), RequestCtx: []byte("somedata")}}
175
176 s := newTestMemoryStorage(withPeers(1))
177 rn := newTestRawNode(1, 10, 1, s)
178 n := newNode(rn)
179 r := rn.raft
180 r.readStates = wrs
181
182 go n.run()
183 n.Campaign(context.TODO())
184 for {
185 rd := <-n.Ready()
186 if !reflect.DeepEqual(rd.ReadStates, wrs) {
187 t.Errorf("ReadStates = %v, want %v", rd.ReadStates, wrs)
188 }
189
190 s.Append(rd.Entries)
191
192 if rd.SoftState.Lead == r.id {
193 n.Advance()
194 break
195 }
196 n.Advance()
197 }
198
199 r.step = appendStep
200 wrequestCtx := []byte("somedata2")
201 n.ReadIndex(context.TODO(), wrequestCtx)
202 n.Stop()
203
204 if len(msgs) != 1 {
205 t.Fatalf("len(msgs) = %d, want %d", len(msgs), 1)
206 }
207 if msgs[0].Type != raftpb.MsgReadIndex {
208 t.Errorf("msg type = %d, want %d", msgs[0].Type, raftpb.MsgReadIndex)
209 }
210 if !bytes.Equal(msgs[0].Entries[0].Data, wrequestCtx) {
211 t.Errorf("data = %v, want %v", msgs[0].Entries[0].Data, wrequestCtx)
212 }
213 }
214
215
216
217 func TestDisableProposalForwarding(t *testing.T) {
218 r1 := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
219 r2 := newTestRaft(2, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
220 cfg3 := newTestConfig(3, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
221 cfg3.DisableProposalForwarding = true
222 r3 := newRaft(cfg3)
223 nt := newNetwork(r1, r2, r3)
224
225
226 nt.send(raftpb.Message{From: 1, To: 1, Type: raftpb.MsgHup})
227
228 var testEntries = []raftpb.Entry{{Data: []byte("testdata")}}
229
230
231 r2.Step(raftpb.Message{From: 2, To: 2, Type: raftpb.MsgProp, Entries: testEntries})
232
233
234 if len(r2.msgs) != 1 {
235 t.Fatalf("len(r2.msgs) expected 1, got %d", len(r2.msgs))
236 }
237
238
239 r3.Step(raftpb.Message{From: 3, To: 3, Type: raftpb.MsgProp, Entries: testEntries})
240
241
242 if len(r3.msgs) != 0 {
243 t.Fatalf("len(r3.msgs) expected 0, got %d", len(r3.msgs))
244 }
245 }
246
247
248
249 func TestNodeReadIndexToOldLeader(t *testing.T) {
250 r1 := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
251 r2 := newTestRaft(2, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
252 r3 := newTestRaft(3, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
253
254 nt := newNetwork(r1, r2, r3)
255
256
257 nt.send(raftpb.Message{From: 1, To: 1, Type: raftpb.MsgHup})
258
259 var testEntries = []raftpb.Entry{{Data: []byte("testdata")}}
260
261
262 r2.Step(raftpb.Message{From: 2, To: 2, Type: raftpb.MsgReadIndex, Entries: testEntries})
263
264
265 if len(r2.msgs) != 1 {
266 t.Fatalf("len(r2.msgs) expected 1, got %d", len(r2.msgs))
267 }
268 readIndxMsg1 := raftpb.Message{From: 2, To: 1, Type: raftpb.MsgReadIndex, Entries: testEntries}
269 if !reflect.DeepEqual(r2.msgs[0], readIndxMsg1) {
270 t.Fatalf("r2.msgs[0] expected %+v, got %+v", readIndxMsg1, r2.msgs[0])
271 }
272
273
274 r3.Step(raftpb.Message{From: 3, To: 3, Type: raftpb.MsgReadIndex, Entries: testEntries})
275
276
277 if len(r3.msgs) != 1 {
278 t.Fatalf("len(r3.msgs) expected 1, got %d", len(r3.msgs))
279 }
280 readIndxMsg2 := raftpb.Message{From: 3, To: 1, Type: raftpb.MsgReadIndex, Entries: testEntries}
281 if !reflect.DeepEqual(r3.msgs[0], readIndxMsg2) {
282 t.Fatalf("r3.msgs[0] expected %+v, got %+v", readIndxMsg2, r3.msgs[0])
283 }
284
285
286 nt.send(raftpb.Message{From: 3, To: 3, Type: raftpb.MsgHup})
287
288
289 r1.Step(readIndxMsg1)
290 r1.Step(readIndxMsg2)
291
292
293 if len(r1.msgs) != 2 {
294 t.Fatalf("len(r1.msgs) expected 1, got %d", len(r1.msgs))
295 }
296 readIndxMsg3 := raftpb.Message{From: 2, To: 3, Type: raftpb.MsgReadIndex, Entries: testEntries}
297 if !reflect.DeepEqual(r1.msgs[0], readIndxMsg3) {
298 t.Fatalf("r1.msgs[0] expected %+v, got %+v", readIndxMsg3, r1.msgs[0])
299 }
300 readIndxMsg3 = raftpb.Message{From: 3, To: 3, Type: raftpb.MsgReadIndex, Entries: testEntries}
301 if !reflect.DeepEqual(r1.msgs[1], readIndxMsg3) {
302 t.Fatalf("r1.msgs[1] expected %+v, got %+v", readIndxMsg3, r1.msgs[1])
303 }
304 }
305
306
307
308 func TestNodeProposeConfig(t *testing.T) {
309 msgs := []raftpb.Message{}
310 appendStep := func(r *raft, m raftpb.Message) error {
311 msgs = append(msgs, m)
312 return nil
313 }
314
315 s := newTestMemoryStorage(withPeers(1))
316 rn := newTestRawNode(1, 10, 1, s)
317 n := newNode(rn)
318 r := rn.raft
319 go n.run()
320 n.Campaign(context.TODO())
321 for {
322 rd := <-n.Ready()
323 s.Append(rd.Entries)
324
325 if rd.SoftState.Lead == r.id {
326 r.step = appendStep
327 n.Advance()
328 break
329 }
330 n.Advance()
331 }
332 cc := raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 1}
333 ccdata, err := cc.Marshal()
334 if err != nil {
335 t.Fatal(err)
336 }
337 n.ProposeConfChange(context.TODO(), cc)
338 n.Stop()
339
340 if len(msgs) != 1 {
341 t.Fatalf("len(msgs) = %d, want %d", len(msgs), 1)
342 }
343 if msgs[0].Type != raftpb.MsgProp {
344 t.Errorf("msg type = %d, want %d", msgs[0].Type, raftpb.MsgProp)
345 }
346 if !bytes.Equal(msgs[0].Entries[0].Data, ccdata) {
347 t.Errorf("data = %v, want %v", msgs[0].Entries[0].Data, ccdata)
348 }
349 }
350
351
352
353 func TestNodeProposeAddDuplicateNode(t *testing.T) {
354 s := newTestMemoryStorage(withPeers(1))
355 rn := newTestRawNode(1, 10, 1, s)
356 n := newNode(rn)
357 go n.run()
358 n.Campaign(context.TODO())
359 rdyEntries := make([]raftpb.Entry, 0)
360 ticker := time.NewTicker(time.Millisecond * 100)
361 defer ticker.Stop()
362 done := make(chan struct{})
363 stop := make(chan struct{})
364 applyConfChan := make(chan struct{})
365
366 go func() {
367 defer close(done)
368 for {
369 select {
370 case <-stop:
371 return
372 case <-ticker.C:
373 n.Tick()
374 case rd := <-n.Ready():
375 s.Append(rd.Entries)
376 applied := false
377 for _, e := range rd.Entries {
378 rdyEntries = append(rdyEntries, e)
379 switch e.Type {
380 case raftpb.EntryNormal:
381 case raftpb.EntryConfChange:
382 var cc raftpb.ConfChange
383 cc.Unmarshal(e.Data)
384 n.ApplyConfChange(cc)
385 applied = true
386 }
387 }
388 n.Advance()
389 if applied {
390 applyConfChan <- struct{}{}
391 }
392 }
393 }
394 }()
395
396 cc1 := raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 1}
397 ccdata1, _ := cc1.Marshal()
398 n.ProposeConfChange(context.TODO(), cc1)
399 <-applyConfChan
400
401
402 n.ProposeConfChange(context.TODO(), cc1)
403 <-applyConfChan
404
405
406 cc2 := raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 2}
407 ccdata2, _ := cc2.Marshal()
408 n.ProposeConfChange(context.TODO(), cc2)
409 <-applyConfChan
410
411 close(stop)
412 <-done
413
414 if len(rdyEntries) != 4 {
415 t.Errorf("len(entry) = %d, want %d, %v\n", len(rdyEntries), 4, rdyEntries)
416 }
417 if !bytes.Equal(rdyEntries[1].Data, ccdata1) {
418 t.Errorf("data = %v, want %v", rdyEntries[1].Data, ccdata1)
419 }
420 if !bytes.Equal(rdyEntries[3].Data, ccdata2) {
421 t.Errorf("data = %v, want %v", rdyEntries[3].Data, ccdata2)
422 }
423 n.Stop()
424 }
425
426
427
428
429 func TestBlockProposal(t *testing.T) {
430 rn := newTestRawNode(1, 10, 1, newTestMemoryStorage(withPeers(1)))
431 n := newNode(rn)
432 go n.run()
433 defer n.Stop()
434
435 errc := make(chan error, 1)
436 go func() {
437 errc <- n.Propose(context.TODO(), []byte("somedata"))
438 }()
439
440 testutil.WaitSchedule()
441 select {
442 case err := <-errc:
443 t.Errorf("err = %v, want blocking", err)
444 default:
445 }
446
447 n.Campaign(context.TODO())
448 select {
449 case err := <-errc:
450 if err != nil {
451 t.Errorf("err = %v, want %v", err, nil)
452 }
453 case <-time.After(10 * time.Second):
454 t.Errorf("blocking proposal, want unblocking")
455 }
456 }
457
458 func TestNodeProposeWaitDropped(t *testing.T) {
459 msgs := []raftpb.Message{}
460 droppingMsg := []byte("test_dropping")
461 dropStep := func(r *raft, m raftpb.Message) error {
462 if m.Type == raftpb.MsgProp && strings.Contains(m.String(), string(droppingMsg)) {
463 t.Logf("dropping message: %v", m.String())
464 return ErrProposalDropped
465 }
466 msgs = append(msgs, m)
467 return nil
468 }
469
470 s := newTestMemoryStorage(withPeers(1))
471 rn := newTestRawNode(1, 10, 1, s)
472 n := newNode(rn)
473 r := rn.raft
474 go n.run()
475 n.Campaign(context.TODO())
476 for {
477 rd := <-n.Ready()
478 s.Append(rd.Entries)
479
480 if rd.SoftState.Lead == r.id {
481 r.step = dropStep
482 n.Advance()
483 break
484 }
485 n.Advance()
486 }
487 proposalTimeout := time.Millisecond * 100
488 ctx, cancel := context.WithTimeout(context.Background(), proposalTimeout)
489
490 err := n.Propose(ctx, droppingMsg)
491 if err != ErrProposalDropped {
492 t.Errorf("should drop proposal : %v", err)
493 }
494 cancel()
495
496 n.Stop()
497 if len(msgs) != 0 {
498 t.Fatalf("len(msgs) = %d, want %d", len(msgs), 1)
499 }
500 }
501
502
503
504 func TestNodeTick(t *testing.T) {
505 s := newTestMemoryStorage(withPeers(1))
506 rn := newTestRawNode(1, 10, 1, s)
507 n := newNode(rn)
508 r := rn.raft
509 go n.run()
510 elapsed := r.electionElapsed
511 n.Tick()
512
513 for len(n.tickc) != 0 {
514 time.Sleep(100 * time.Millisecond)
515 }
516
517 n.Stop()
518 if r.electionElapsed != elapsed+1 {
519 t.Errorf("elapsed = %d, want %d", r.electionElapsed, elapsed+1)
520 }
521 }
522
523
524
525 func TestNodeStop(t *testing.T) {
526 rn := newTestRawNode(1, 10, 1, newTestMemoryStorage(withPeers(1)))
527 n := newNode(rn)
528 donec := make(chan struct{})
529
530 go func() {
531 n.run()
532 close(donec)
533 }()
534
535 status := n.Status()
536 n.Stop()
537
538 select {
539 case <-donec:
540 case <-time.After(time.Second):
541 t.Fatalf("timed out waiting for node to stop!")
542 }
543
544 emptyStatus := Status{}
545
546 if reflect.DeepEqual(status, emptyStatus) {
547 t.Errorf("status = %v, want not empty", status)
548 }
549
550 status = n.Status()
551 if !reflect.DeepEqual(status, emptyStatus) {
552 t.Errorf("status = %v, want empty", status)
553 }
554
555 n.Stop()
556 }
557
558 func TestReadyContainUpdates(t *testing.T) {
559 tests := []struct {
560 rd Ready
561 wcontain bool
562 }{
563 {Ready{}, false},
564 {Ready{SoftState: &SoftState{Lead: 1}}, true},
565 {Ready{HardState: raftpb.HardState{Vote: 1}}, true},
566 {Ready{Entries: make([]raftpb.Entry, 1)}, true},
567 {Ready{CommittedEntries: make([]raftpb.Entry, 1)}, true},
568 {Ready{Messages: make([]raftpb.Message, 1)}, true},
569 {Ready{Snapshot: raftpb.Snapshot{Metadata: raftpb.SnapshotMetadata{Index: 1}}}, true},
570 }
571
572 for i, tt := range tests {
573 if g := tt.rd.containsUpdates(); g != tt.wcontain {
574 t.Errorf("#%d: containUpdates = %v, want %v", i, g, tt.wcontain)
575 }
576 }
577 }
578
579
580
581
582 func TestNodeStart(t *testing.T) {
583 ctx, cancel := context.WithCancel(context.Background())
584 defer cancel()
585
586 cc := raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 1}
587 ccdata, err := cc.Marshal()
588 if err != nil {
589 t.Fatalf("unexpected marshal error: %v", err)
590 }
591 wants := []Ready{
592 {
593 HardState: raftpb.HardState{Term: 1, Commit: 1, Vote: 0},
594 Entries: []raftpb.Entry{
595 {Type: raftpb.EntryConfChange, Term: 1, Index: 1, Data: ccdata},
596 },
597 CommittedEntries: []raftpb.Entry{
598 {Type: raftpb.EntryConfChange, Term: 1, Index: 1, Data: ccdata},
599 },
600 MustSync: true,
601 },
602 {
603 HardState: raftpb.HardState{Term: 2, Commit: 3, Vote: 1},
604 Entries: []raftpb.Entry{{Term: 2, Index: 3, Data: []byte("foo")}},
605 CommittedEntries: []raftpb.Entry{{Term: 2, Index: 3, Data: []byte("foo")}},
606 MustSync: true,
607 },
608 }
609 storage := NewMemoryStorage()
610 c := &Config{
611 ID: 1,
612 ElectionTick: 10,
613 HeartbeatTick: 1,
614 Storage: storage,
615 MaxSizePerMsg: noLimit,
616 MaxInflightMsgs: 256,
617 }
618 n := StartNode(c, []Peer{{ID: 1}})
619 defer n.Stop()
620 g := <-n.Ready()
621 if !reflect.DeepEqual(g, wants[0]) {
622 t.Fatalf("#%d: g = %+v,\n w %+v", 1, g, wants[0])
623 } else {
624 storage.Append(g.Entries)
625 n.Advance()
626 }
627
628 if err := n.Campaign(ctx); err != nil {
629 t.Fatal(err)
630 }
631 rd := <-n.Ready()
632 storage.Append(rd.Entries)
633 n.Advance()
634
635 n.Propose(ctx, []byte("foo"))
636 if g2 := <-n.Ready(); !reflect.DeepEqual(g2, wants[1]) {
637 t.Errorf("#%d: g = %+v,\n w %+v", 2, g2, wants[1])
638 } else {
639 storage.Append(g2.Entries)
640 n.Advance()
641 }
642
643 select {
644 case rd := <-n.Ready():
645 t.Errorf("unexpected Ready: %+v", rd)
646 case <-time.After(time.Millisecond):
647 }
648 }
649
650 func TestNodeRestart(t *testing.T) {
651 entries := []raftpb.Entry{
652 {Term: 1, Index: 1},
653 {Term: 1, Index: 2, Data: []byte("foo")},
654 }
655 st := raftpb.HardState{Term: 1, Commit: 1}
656
657 want := Ready{
658
659 HardState: raftpb.HardState{},
660
661 CommittedEntries: entries[:st.Commit],
662
663 MustSync: false,
664 }
665
666 storage := NewMemoryStorage()
667 storage.SetHardState(st)
668 storage.Append(entries)
669 c := &Config{
670 ID: 1,
671 ElectionTick: 10,
672 HeartbeatTick: 1,
673 Storage: storage,
674 MaxSizePerMsg: noLimit,
675 MaxInflightMsgs: 256,
676 }
677 n := RestartNode(c)
678 defer n.Stop()
679 if g := <-n.Ready(); !reflect.DeepEqual(g, want) {
680 t.Errorf("g = %+v,\n w %+v", g, want)
681 }
682 n.Advance()
683
684 select {
685 case rd := <-n.Ready():
686 t.Errorf("unexpected Ready: %+v", rd)
687 case <-time.After(time.Millisecond):
688 }
689 }
690
691 func TestNodeRestartFromSnapshot(t *testing.T) {
692 snap := raftpb.Snapshot{
693 Metadata: raftpb.SnapshotMetadata{
694 ConfState: raftpb.ConfState{Voters: []uint64{1, 2}},
695 Index: 2,
696 Term: 1,
697 },
698 }
699 entries := []raftpb.Entry{
700 {Term: 1, Index: 3, Data: []byte("foo")},
701 }
702 st := raftpb.HardState{Term: 1, Commit: 3}
703
704 want := Ready{
705
706
707 HardState: raftpb.HardState{},
708
709 CommittedEntries: entries,
710
711
712 MustSync: false,
713 }
714
715 s := NewMemoryStorage()
716 s.SetHardState(st)
717 s.ApplySnapshot(snap)
718 s.Append(entries)
719 c := &Config{
720 ID: 1,
721 ElectionTick: 10,
722 HeartbeatTick: 1,
723 Storage: s,
724 MaxSizePerMsg: noLimit,
725 MaxInflightMsgs: 256,
726 }
727 n := RestartNode(c)
728 defer n.Stop()
729 if g := <-n.Ready(); !reflect.DeepEqual(g, want) {
730 t.Errorf("g = %+v,\n w %+v", g, want)
731 } else {
732 n.Advance()
733 }
734
735 select {
736 case rd := <-n.Ready():
737 t.Errorf("unexpected Ready: %+v", rd)
738 case <-time.After(time.Millisecond):
739 }
740 }
741
742 func TestNodeAdvance(t *testing.T) {
743 ctx, cancel := context.WithCancel(context.Background())
744 defer cancel()
745
746 storage := NewMemoryStorage()
747 c := &Config{
748 ID: 1,
749 ElectionTick: 10,
750 HeartbeatTick: 1,
751 Storage: storage,
752 MaxSizePerMsg: noLimit,
753 MaxInflightMsgs: 256,
754 }
755 n := StartNode(c, []Peer{{ID: 1}})
756 defer n.Stop()
757 rd := <-n.Ready()
758 storage.Append(rd.Entries)
759 n.Advance()
760
761 n.Campaign(ctx)
762 <-n.Ready()
763
764 n.Propose(ctx, []byte("foo"))
765 select {
766 case rd = <-n.Ready():
767 t.Fatalf("unexpected Ready before Advance: %+v", rd)
768 case <-time.After(time.Millisecond):
769 }
770 storage.Append(rd.Entries)
771 n.Advance()
772 select {
773 case <-n.Ready():
774 case <-time.After(100 * time.Millisecond):
775 t.Errorf("expect Ready after Advance, but there is no Ready available")
776 }
777 }
778
779 func TestSoftStateEqual(t *testing.T) {
780 tests := []struct {
781 st *SoftState
782 we bool
783 }{
784 {&SoftState{}, true},
785 {&SoftState{Lead: 1}, false},
786 {&SoftState{RaftState: StateLeader}, false},
787 }
788 for i, tt := range tests {
789 if g := tt.st.equal(&SoftState{}); g != tt.we {
790 t.Errorf("#%d, equal = %v, want %v", i, g, tt.we)
791 }
792 }
793 }
794
795 func TestIsHardStateEqual(t *testing.T) {
796 tests := []struct {
797 st raftpb.HardState
798 we bool
799 }{
800 {emptyState, true},
801 {raftpb.HardState{Vote: 1}, false},
802 {raftpb.HardState{Commit: 1}, false},
803 {raftpb.HardState{Term: 1}, false},
804 }
805
806 for i, tt := range tests {
807 if isHardStateEqual(tt.st, emptyState) != tt.we {
808 t.Errorf("#%d, equal = %v, want %v", i, isHardStateEqual(tt.st, emptyState), tt.we)
809 }
810 }
811 }
812
813 func TestNodeProposeAddLearnerNode(t *testing.T) {
814 ticker := time.NewTicker(time.Millisecond * 100)
815 defer ticker.Stop()
816 s := newTestMemoryStorage(withPeers(1))
817 rn := newTestRawNode(1, 10, 1, s)
818 n := newNode(rn)
819 go n.run()
820 n.Campaign(context.TODO())
821 stop := make(chan struct{})
822 done := make(chan struct{})
823 applyConfChan := make(chan struct{})
824 go func() {
825 defer close(done)
826 for {
827 select {
828 case <-stop:
829 return
830 case <-ticker.C:
831 n.Tick()
832 case rd := <-n.Ready():
833 s.Append(rd.Entries)
834 t.Logf("raft: %v", rd.Entries)
835 for _, ent := range rd.Entries {
836 if ent.Type != raftpb.EntryConfChange {
837 continue
838 }
839 var cc raftpb.ConfChange
840 cc.Unmarshal(ent.Data)
841 state := n.ApplyConfChange(cc)
842 if len(state.Learners) == 0 ||
843 state.Learners[0] != cc.NodeID ||
844 cc.NodeID != 2 {
845 t.Errorf("apply conf change should return new added learner: %v", state.String())
846 }
847
848 if len(state.Voters) != 1 {
849 t.Errorf("add learner should not change the nodes: %v", state.String())
850 }
851 t.Logf("apply raft conf %v changed to: %v", cc, state.String())
852 applyConfChan <- struct{}{}
853 }
854 n.Advance()
855 }
856 }
857 }()
858 cc := raftpb.ConfChange{Type: raftpb.ConfChangeAddLearnerNode, NodeID: 2}
859 n.ProposeConfChange(context.TODO(), cc)
860 <-applyConfChan
861 close(stop)
862 <-done
863 }
864
865 func TestAppendPagination(t *testing.T) {
866 const maxSizePerMsg = 2048
867 n := newNetworkWithConfig(func(c *Config) {
868 c.MaxSizePerMsg = maxSizePerMsg
869 }, nil, nil, nil)
870
871 seenFullMessage := false
872
873
874 n.msgHook = func(m raftpb.Message) bool {
875 if m.Type == raftpb.MsgApp {
876 size := 0
877 for _, e := range m.Entries {
878 size += len(e.Data)
879 }
880 if size > maxSizePerMsg {
881 t.Errorf("sent MsgApp that is too large: %d bytes", size)
882 }
883 if size > maxSizePerMsg/2 {
884 seenFullMessage = true
885 }
886 }
887 return true
888 }
889
890 n.send(raftpb.Message{From: 1, To: 1, Type: raftpb.MsgHup})
891
892
893
894 n.isolate(1)
895 blob := []byte(strings.Repeat("a", 1000))
896 for i := 0; i < 5; i++ {
897 n.send(raftpb.Message{From: 1, To: 1, Type: raftpb.MsgProp, Entries: []raftpb.Entry{{Data: blob}}})
898 }
899 n.recover()
900
901
902
903 n.send(raftpb.Message{From: 1, To: 1, Type: raftpb.MsgBeat})
904 if !seenFullMessage {
905 t.Error("didn't see any messages more than half the max size; something is wrong with this test")
906 }
907 }
908
909 func TestCommitPagination(t *testing.T) {
910 s := newTestMemoryStorage(withPeers(1))
911 cfg := newTestConfig(1, 10, 1, s)
912 cfg.MaxCommittedSizePerReady = 2048
913 rn, err := NewRawNode(cfg)
914 if err != nil {
915 t.Fatal(err)
916 }
917 n := newNode(rn)
918 go n.run()
919 n.Campaign(context.TODO())
920
921 rd := readyWithTimeout(&n)
922 if len(rd.CommittedEntries) != 1 {
923 t.Fatalf("expected 1 (empty) entry, got %d", len(rd.CommittedEntries))
924 }
925 s.Append(rd.Entries)
926 n.Advance()
927
928 blob := []byte(strings.Repeat("a", 1000))
929 for i := 0; i < 3; i++ {
930 if err := n.Propose(context.TODO(), blob); err != nil {
931 t.Fatal(err)
932 }
933 }
934
935
936 rd = readyWithTimeout(&n)
937 if len(rd.CommittedEntries) != 2 {
938 t.Fatalf("expected 2 entries in first batch, got %d", len(rd.CommittedEntries))
939 }
940 s.Append(rd.Entries)
941 n.Advance()
942 rd = readyWithTimeout(&n)
943 if len(rd.CommittedEntries) != 1 {
944 t.Fatalf("expected 1 entry in second batch, got %d", len(rd.CommittedEntries))
945 }
946 s.Append(rd.Entries)
947 n.Advance()
948 }
949
950 type ignoreSizeHintMemStorage struct {
951 *MemoryStorage
952 }
953
954 func (s *ignoreSizeHintMemStorage) Entries(lo, hi uint64, maxSize uint64) ([]raftpb.Entry, error) {
955 return s.MemoryStorage.Entries(lo, hi, math.MaxUint64)
956 }
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974 func TestNodeCommitPaginationAfterRestart(t *testing.T) {
975 s := &ignoreSizeHintMemStorage{
976 MemoryStorage: newTestMemoryStorage(withPeers(1)),
977 }
978 persistedHardState := raftpb.HardState{
979 Term: 1,
980 Vote: 1,
981 Commit: 10,
982 }
983
984 s.hardState = persistedHardState
985 s.ents = make([]raftpb.Entry, 10)
986 var size uint64
987 for i := range s.ents {
988 ent := raftpb.Entry{
989 Term: 1,
990 Index: uint64(i + 1),
991 Type: raftpb.EntryNormal,
992 Data: []byte("a"),
993 }
994
995 s.ents[i] = ent
996 size += uint64(ent.Size())
997 }
998
999 cfg := newTestConfig(1, 10, 1, s)
1000
1001
1002
1003 cfg.MaxSizePerMsg = size - uint64(s.ents[len(s.ents)-1].Size()) - 1
1004
1005 rn, err := NewRawNode(cfg)
1006 if err != nil {
1007 t.Fatal(err)
1008 }
1009 n := newNode(rn)
1010 go n.run()
1011 defer n.Stop()
1012
1013 rd := readyWithTimeout(&n)
1014 if !IsEmptyHardState(rd.HardState) && rd.HardState.Commit < persistedHardState.Commit {
1015 t.Errorf("HardState regressed: Commit %d -> %d\nCommitting:\n%+v",
1016 persistedHardState.Commit, rd.HardState.Commit,
1017 DescribeEntries(rd.CommittedEntries, func(data []byte) string { return fmt.Sprintf("%q", data) }),
1018 )
1019 }
1020 }
1021
View as plain text