1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package raft
16
17 import (
18 "bytes"
19 "context"
20 "fmt"
21 "math"
22 "reflect"
23 "testing"
24
25 "go.etcd.io/etcd/raft/v3/quorum"
26 pb "go.etcd.io/etcd/raft/v3/raftpb"
27 "go.etcd.io/etcd/raft/v3/tracker"
28 )
29
30
31
32
33 type rawNodeAdapter struct {
34 *RawNode
35 }
36
37 var _ Node = (*rawNodeAdapter)(nil)
38
39
40 func (a *rawNodeAdapter) TransferLeadership(ctx context.Context, lead, transferee uint64) {
41 a.RawNode.TransferLeader(transferee)
42 }
43
44
45 func (a *rawNodeAdapter) Stop() {}
46
47
48 func (a *rawNodeAdapter) Status() Status { return a.RawNode.Status() }
49
50
51
52 func (a *rawNodeAdapter) Advance() { a.RawNode.Advance(Ready{}) }
53
54
55 func (a *rawNodeAdapter) Ready() <-chan Ready { return nil }
56
57
58
59 func (a *rawNodeAdapter) Campaign(context.Context) error { return a.RawNode.Campaign() }
60 func (a *rawNodeAdapter) ReadIndex(_ context.Context, rctx []byte) error {
61 a.RawNode.ReadIndex(rctx)
62
63 return nil
64 }
65 func (a *rawNodeAdapter) Step(_ context.Context, m pb.Message) error { return a.RawNode.Step(m) }
66 func (a *rawNodeAdapter) Propose(_ context.Context, data []byte) error {
67 return a.RawNode.Propose(data)
68 }
69 func (a *rawNodeAdapter) ProposeConfChange(_ context.Context, cc pb.ConfChangeI) error {
70 return a.RawNode.ProposeConfChange(cc)
71 }
72
73
74 func TestRawNodeStep(t *testing.T) {
75 for i, msgn := range pb.MessageType_name {
76 t.Run(msgn, func(t *testing.T) {
77 s := NewMemoryStorage()
78 s.SetHardState(pb.HardState{Term: 1, Commit: 1})
79 s.Append([]pb.Entry{{Term: 1, Index: 1}})
80 if err := s.ApplySnapshot(pb.Snapshot{Metadata: pb.SnapshotMetadata{
81 ConfState: pb.ConfState{
82 Voters: []uint64{1},
83 },
84 Index: 1,
85 Term: 1,
86 }}); err != nil {
87 t.Fatal(err)
88 }
89
90
91 rawNode, err := NewRawNode(newTestConfig(1, 10, 1, s))
92 if err != nil {
93 t.Fatal(err)
94 }
95 msgt := pb.MessageType(i)
96 err = rawNode.Step(pb.Message{Type: msgt})
97
98 if IsLocalMsg(msgt) {
99 if err != ErrStepLocalMsg {
100 t.Errorf("%d: step should ignore %s", msgt, msgn)
101 }
102 }
103 })
104 }
105 }
106
107
108
109
110
111
112
113
114 func TestRawNodeProposeAndConfChange(t *testing.T) {
115 testCases := []struct {
116 cc pb.ConfChangeI
117 exp pb.ConfState
118 exp2 *pb.ConfState
119 }{
120
121 {
122 pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: 2},
123 pb.ConfState{Voters: []uint64{1, 2}},
124 nil,
125 },
126
127
128 {
129 pb.ConfChangeV2{Changes: []pb.ConfChangeSingle{
130 {Type: pb.ConfChangeAddNode, NodeID: 2},
131 },
132 },
133 pb.ConfState{Voters: []uint64{1, 2}},
134 nil,
135 },
136
137 {
138 pb.ConfChangeV2{Changes: []pb.ConfChangeSingle{
139 {Type: pb.ConfChangeAddLearnerNode, NodeID: 2},
140 },
141 },
142 pb.ConfState{Voters: []uint64{1}, Learners: []uint64{2}},
143 nil,
144 },
145
146 {
147 pb.ConfChangeV2{Changes: []pb.ConfChangeSingle{
148 {Type: pb.ConfChangeAddLearnerNode, NodeID: 2},
149 },
150 Transition: pb.ConfChangeTransitionJointExplicit,
151 },
152 pb.ConfState{Voters: []uint64{1}, VotersOutgoing: []uint64{1}, Learners: []uint64{2}},
153 &pb.ConfState{Voters: []uint64{1}, Learners: []uint64{2}},
154 },
155
156 {
157 pb.ConfChangeV2{Changes: []pb.ConfChangeSingle{
158 {Type: pb.ConfChangeAddLearnerNode, NodeID: 2},
159 },
160 Transition: pb.ConfChangeTransitionJointImplicit,
161 },
162 pb.ConfState{
163 Voters: []uint64{1}, VotersOutgoing: []uint64{1}, Learners: []uint64{2},
164 AutoLeave: true,
165 },
166 &pb.ConfState{Voters: []uint64{1}, Learners: []uint64{2}},
167 },
168
169
170 {
171 pb.ConfChangeV2{Changes: []pb.ConfChangeSingle{
172 {NodeID: 2, Type: pb.ConfChangeAddNode},
173 {NodeID: 1, Type: pb.ConfChangeAddLearnerNode},
174 {NodeID: 3, Type: pb.ConfChangeAddLearnerNode},
175 },
176 },
177 pb.ConfState{
178 Voters: []uint64{2},
179 VotersOutgoing: []uint64{1},
180 Learners: []uint64{3},
181 LearnersNext: []uint64{1},
182 AutoLeave: true,
183 },
184 &pb.ConfState{Voters: []uint64{2}, Learners: []uint64{1, 3}},
185 },
186
187 {
188 pb.ConfChangeV2{Changes: []pb.ConfChangeSingle{
189 {NodeID: 2, Type: pb.ConfChangeAddNode},
190 {NodeID: 1, Type: pb.ConfChangeAddLearnerNode},
191 {NodeID: 3, Type: pb.ConfChangeAddLearnerNode},
192 },
193 Transition: pb.ConfChangeTransitionJointExplicit,
194 },
195 pb.ConfState{
196 Voters: []uint64{2},
197 VotersOutgoing: []uint64{1},
198 Learners: []uint64{3},
199 LearnersNext: []uint64{1},
200 },
201 &pb.ConfState{Voters: []uint64{2}, Learners: []uint64{1, 3}},
202 },
203
204 {
205 pb.ConfChangeV2{
206 Changes: []pb.ConfChangeSingle{
207 {NodeID: 2, Type: pb.ConfChangeAddNode},
208 {NodeID: 1, Type: pb.ConfChangeAddLearnerNode},
209 {NodeID: 3, Type: pb.ConfChangeAddLearnerNode},
210 },
211 Transition: pb.ConfChangeTransitionJointImplicit,
212 },
213 pb.ConfState{
214 Voters: []uint64{2},
215 VotersOutgoing: []uint64{1},
216 Learners: []uint64{3},
217 LearnersNext: []uint64{1},
218 AutoLeave: true,
219 },
220 &pb.ConfState{Voters: []uint64{2}, Learners: []uint64{1, 3}},
221 },
222 }
223
224 for _, tc := range testCases {
225 t.Run("", func(t *testing.T) {
226 s := newTestMemoryStorage(withPeers(1))
227 rawNode, err := NewRawNode(newTestConfig(1, 10, 1, s))
228 if err != nil {
229 t.Fatal(err)
230 }
231
232 rawNode.Campaign()
233 proposed := false
234 var (
235 lastIndex uint64
236 ccdata []byte
237 )
238
239
240 var cs *pb.ConfState
241 for cs == nil {
242 rd := rawNode.Ready()
243 s.Append(rd.Entries)
244 for _, ent := range rd.CommittedEntries {
245 var cc pb.ConfChangeI
246 if ent.Type == pb.EntryConfChange {
247 var ccc pb.ConfChange
248 if err = ccc.Unmarshal(ent.Data); err != nil {
249 t.Fatal(err)
250 }
251 cc = ccc
252 } else if ent.Type == pb.EntryConfChangeV2 {
253 var ccc pb.ConfChangeV2
254 if err = ccc.Unmarshal(ent.Data); err != nil {
255 t.Fatal(err)
256 }
257 cc = ccc
258 }
259 if cc != nil {
260 cs = rawNode.ApplyConfChange(cc)
261 }
262 }
263 rawNode.Advance(rd)
264
265 if !proposed && rd.SoftState.Lead == rawNode.raft.id {
266 if err = rawNode.Propose([]byte("somedata")); err != nil {
267 t.Fatal(err)
268 }
269 if ccv1, ok := tc.cc.AsV1(); ok {
270 ccdata, err = ccv1.Marshal()
271 if err != nil {
272 t.Fatal(err)
273 }
274 rawNode.ProposeConfChange(ccv1)
275 } else {
276 ccv2 := tc.cc.AsV2()
277 ccdata, err = ccv2.Marshal()
278 if err != nil {
279 t.Fatal(err)
280 }
281 rawNode.ProposeConfChange(ccv2)
282 }
283 proposed = true
284 }
285 }
286
287
288
289
290
291 lastIndex, err = s.LastIndex()
292 if err != nil {
293 t.Fatal(err)
294 }
295
296 entries, err := s.Entries(lastIndex-1, lastIndex+1, noLimit)
297 if err != nil {
298 t.Fatal(err)
299 }
300 if len(entries) != 2 {
301 t.Fatalf("len(entries) = %d, want %d", len(entries), 2)
302 }
303 if !bytes.Equal(entries[0].Data, []byte("somedata")) {
304 t.Errorf("entries[0].Data = %v, want %v", entries[0].Data, []byte("somedata"))
305 }
306 typ := pb.EntryConfChange
307 if _, ok := tc.cc.AsV1(); !ok {
308 typ = pb.EntryConfChangeV2
309 }
310 if entries[1].Type != typ {
311 t.Fatalf("type = %v, want %v", entries[1].Type, typ)
312 }
313 if !bytes.Equal(entries[1].Data, ccdata) {
314 t.Errorf("data = %v, want %v", entries[1].Data, ccdata)
315 }
316
317 if exp := &tc.exp; !reflect.DeepEqual(exp, cs) {
318 t.Fatalf("exp:\n%+v\nact:\n%+v", exp, cs)
319 }
320
321 var maybePlusOne uint64
322 if autoLeave, ok := tc.cc.AsV2().EnterJoint(); ok && autoLeave {
323
324
325
326
327
328
329 maybePlusOne = 1
330 }
331 if exp, act := lastIndex+maybePlusOne, rawNode.raft.pendingConfIndex; exp != act {
332 t.Fatalf("pendingConfIndex: expected %d, got %d", exp, act)
333 }
334
335
336
337
338
339 rd := rawNode.Ready()
340 var context []byte
341 if !tc.exp.AutoLeave {
342 if len(rd.Entries) > 0 {
343 t.Fatal("expected no more entries")
344 }
345 if tc.exp2 == nil {
346 return
347 }
348 context = []byte("manual")
349 t.Log("leaving joint state manually")
350 if err := rawNode.ProposeConfChange(pb.ConfChangeV2{Context: context}); err != nil {
351 t.Fatal(err)
352 }
353 rd = rawNode.Ready()
354 }
355
356
357 if len(rd.Entries) != 1 || rd.Entries[0].Type != pb.EntryConfChangeV2 {
358 t.Fatalf("expected exactly one more entry, got %+v", rd)
359 }
360 var cc pb.ConfChangeV2
361 if err := cc.Unmarshal(rd.Entries[0].Data); err != nil {
362 t.Fatal(err)
363 }
364 if !reflect.DeepEqual(cc, pb.ConfChangeV2{Context: context}) {
365 t.Fatalf("expected zero ConfChangeV2, got %+v", cc)
366 }
367
368
369 cs = rawNode.ApplyConfChange(cc)
370 if exp := tc.exp2; !reflect.DeepEqual(exp, cs) {
371 t.Fatalf("exp:\n%+v\nact:\n%+v", exp, cs)
372 }
373 })
374 }
375 }
376
377
378
379 func TestRawNodeJointAutoLeave(t *testing.T) {
380 testCc := pb.ConfChangeV2{Changes: []pb.ConfChangeSingle{
381 {Type: pb.ConfChangeAddLearnerNode, NodeID: 2},
382 },
383 Transition: pb.ConfChangeTransitionJointImplicit,
384 }
385 expCs := pb.ConfState{
386 Voters: []uint64{1}, VotersOutgoing: []uint64{1}, Learners: []uint64{2},
387 AutoLeave: true,
388 }
389 exp2Cs := pb.ConfState{Voters: []uint64{1}, Learners: []uint64{2}}
390
391 t.Run("", func(t *testing.T) {
392 s := newTestMemoryStorage(withPeers(1))
393 rawNode, err := NewRawNode(newTestConfig(1, 10, 1, s))
394 if err != nil {
395 t.Fatal(err)
396 }
397
398 rawNode.Campaign()
399 proposed := false
400 var (
401 lastIndex uint64
402 ccdata []byte
403 )
404
405
406 var cs *pb.ConfState
407 for cs == nil {
408 rd := rawNode.Ready()
409 s.Append(rd.Entries)
410 for _, ent := range rd.CommittedEntries {
411 var cc pb.ConfChangeI
412 if ent.Type == pb.EntryConfChangeV2 {
413 var ccc pb.ConfChangeV2
414 if err = ccc.Unmarshal(ent.Data); err != nil {
415 t.Fatal(err)
416 }
417 cc = &ccc
418 }
419 if cc != nil {
420
421 rawNode.Step(pb.Message{Type: pb.MsgHeartbeatResp, From: 1, Term: rawNode.raft.Term + 1})
422 cs = rawNode.ApplyConfChange(cc)
423 }
424 }
425 rawNode.Advance(rd)
426
427 if !proposed && rd.SoftState.Lead == rawNode.raft.id {
428 if err = rawNode.Propose([]byte("somedata")); err != nil {
429 t.Fatal(err)
430 }
431 ccdata, err = testCc.Marshal()
432 if err != nil {
433 t.Fatal(err)
434 }
435 rawNode.ProposeConfChange(testCc)
436 proposed = true
437 }
438 }
439
440
441
442
443
444 lastIndex, err = s.LastIndex()
445 if err != nil {
446 t.Fatal(err)
447 }
448
449 entries, err := s.Entries(lastIndex-1, lastIndex+1, noLimit)
450 if err != nil {
451 t.Fatal(err)
452 }
453 if len(entries) != 2 {
454 t.Fatalf("len(entries) = %d, want %d", len(entries), 2)
455 }
456 if !bytes.Equal(entries[0].Data, []byte("somedata")) {
457 t.Errorf("entries[0].Data = %v, want %v", entries[0].Data, []byte("somedata"))
458 }
459 if entries[1].Type != pb.EntryConfChangeV2 {
460 t.Fatalf("type = %v, want %v", entries[1].Type, pb.EntryConfChangeV2)
461 }
462 if !bytes.Equal(entries[1].Data, ccdata) {
463 t.Errorf("data = %v, want %v", entries[1].Data, ccdata)
464 }
465
466 if !reflect.DeepEqual(&expCs, cs) {
467 t.Fatalf("exp:\n%+v\nact:\n%+v", expCs, cs)
468 }
469
470 if rawNode.raft.pendingConfIndex != 0 {
471 t.Fatalf("pendingConfIndex: expected %d, got %d", 0, rawNode.raft.pendingConfIndex)
472 }
473
474
475 rd := rawNode.readyWithoutAccept()
476
477 if len(rd.Entries) != 0 {
478 t.Fatalf("expected zero entry, got %+v", rd)
479 }
480
481
482 rawNode.Campaign()
483 rd = rawNode.Ready()
484 s.Append(rd.Entries)
485 rawNode.Advance(rd)
486 rd = rawNode.Ready()
487 s.Append(rd.Entries)
488
489
490 if len(rd.Entries) != 1 || rd.Entries[0].Type != pb.EntryConfChangeV2 {
491 t.Fatalf("expected exactly one more entry, got %+v", rd)
492 }
493 var cc pb.ConfChangeV2
494 if err := cc.Unmarshal(rd.Entries[0].Data); err != nil {
495 t.Fatal(err)
496 }
497 if !reflect.DeepEqual(cc, pb.ConfChangeV2{Context: nil}) {
498 t.Fatalf("expected zero ConfChangeV2, got %+v", cc)
499 }
500
501
502 cs = rawNode.ApplyConfChange(cc)
503 if exp := exp2Cs; !reflect.DeepEqual(&exp, cs) {
504 t.Fatalf("exp:\n%+v\nact:\n%+v", exp, cs)
505 }
506 })
507 }
508
509
510
511 func TestRawNodeProposeAddDuplicateNode(t *testing.T) {
512 s := newTestMemoryStorage(withPeers(1))
513 rawNode, err := NewRawNode(newTestConfig(1, 10, 1, s))
514 if err != nil {
515 t.Fatal(err)
516 }
517 rd := rawNode.Ready()
518 s.Append(rd.Entries)
519 rawNode.Advance(rd)
520
521 rawNode.Campaign()
522 for {
523 rd = rawNode.Ready()
524 s.Append(rd.Entries)
525 if rd.SoftState.Lead == rawNode.raft.id {
526 rawNode.Advance(rd)
527 break
528 }
529 rawNode.Advance(rd)
530 }
531
532 proposeConfChangeAndApply := func(cc pb.ConfChange) {
533 rawNode.ProposeConfChange(cc)
534 rd = rawNode.Ready()
535 s.Append(rd.Entries)
536 for _, entry := range rd.CommittedEntries {
537 if entry.Type == pb.EntryConfChange {
538 var cc pb.ConfChange
539 cc.Unmarshal(entry.Data)
540 rawNode.ApplyConfChange(cc)
541 }
542 }
543 rawNode.Advance(rd)
544 }
545
546 cc1 := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: 1}
547 ccdata1, err := cc1.Marshal()
548 if err != nil {
549 t.Fatal(err)
550 }
551 proposeConfChangeAndApply(cc1)
552
553
554 proposeConfChangeAndApply(cc1)
555
556
557 cc2 := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: 2}
558 ccdata2, err := cc2.Marshal()
559 if err != nil {
560 t.Fatal(err)
561 }
562 proposeConfChangeAndApply(cc2)
563
564 lastIndex, err := s.LastIndex()
565 if err != nil {
566 t.Fatal(err)
567 }
568
569
570 entries, err := s.Entries(lastIndex-2, lastIndex+1, noLimit)
571 if err != nil {
572 t.Fatal(err)
573 }
574 if len(entries) != 3 {
575 t.Fatalf("len(entries) = %d, want %d", len(entries), 3)
576 }
577 if !bytes.Equal(entries[0].Data, ccdata1) {
578 t.Errorf("entries[0].Data = %v, want %v", entries[0].Data, ccdata1)
579 }
580 if !bytes.Equal(entries[2].Data, ccdata2) {
581 t.Errorf("entries[2].Data = %v, want %v", entries[2].Data, ccdata2)
582 }
583 }
584
585
586
587 func TestRawNodeReadIndex(t *testing.T) {
588 msgs := []pb.Message{}
589 appendStep := func(r *raft, m pb.Message) error {
590 msgs = append(msgs, m)
591 return nil
592 }
593 wrs := []ReadState{{Index: uint64(1), RequestCtx: []byte("somedata")}}
594
595 s := newTestMemoryStorage(withPeers(1))
596 c := newTestConfig(1, 10, 1, s)
597 rawNode, err := NewRawNode(c)
598 if err != nil {
599 t.Fatal(err)
600 }
601 rawNode.raft.readStates = wrs
602
603 hasReady := rawNode.HasReady()
604 if !hasReady {
605 t.Errorf("HasReady() returns %t, want %t", hasReady, true)
606 }
607 rd := rawNode.Ready()
608 if !reflect.DeepEqual(rd.ReadStates, wrs) {
609 t.Errorf("ReadStates = %d, want %d", rd.ReadStates, wrs)
610 }
611 s.Append(rd.Entries)
612 rawNode.Advance(rd)
613
614 if rawNode.raft.readStates != nil {
615 t.Errorf("readStates = %v, want %v", rawNode.raft.readStates, nil)
616 }
617
618 wrequestCtx := []byte("somedata2")
619 rawNode.Campaign()
620 for {
621 rd = rawNode.Ready()
622 s.Append(rd.Entries)
623
624 if rd.SoftState.Lead == rawNode.raft.id {
625 rawNode.Advance(rd)
626
627
628 rawNode.raft.step = appendStep
629 rawNode.ReadIndex(wrequestCtx)
630 break
631 }
632 rawNode.Advance(rd)
633 }
634
635 if len(msgs) != 1 {
636 t.Fatalf("len(msgs) = %d, want %d", len(msgs), 1)
637 }
638 if msgs[0].Type != pb.MsgReadIndex {
639 t.Errorf("msg type = %d, want %d", msgs[0].Type, pb.MsgReadIndex)
640 }
641 if !bytes.Equal(msgs[0].Entries[0].Data, wrequestCtx) {
642 t.Errorf("data = %v, want %v", msgs[0].Entries[0].Data, wrequestCtx)
643 }
644 }
645
646
647
648
649
650
651
652
653
654
655
656
657
658 func TestRawNodeStart(t *testing.T) {
659 want := Ready{
660 SoftState: &SoftState{Lead: 1, RaftState: StateLeader},
661 HardState: pb.HardState{Term: 1, Commit: 3, Vote: 1},
662 Entries: []pb.Entry{
663 {Term: 1, Index: 2, Data: nil},
664 {Term: 1, Index: 3, Data: []byte("foo")},
665 },
666 CommittedEntries: []pb.Entry{
667 {Term: 1, Index: 2, Data: nil},
668 {Term: 1, Index: 3, Data: []byte("foo")},
669 },
670 MustSync: true,
671 }
672
673 storage := NewMemoryStorage()
674 storage.ents[0].Index = 1
675
676
677
678
679
680
681
682
683
684
685
686
687 type appenderStorage interface {
688 Storage
689 ApplySnapshot(pb.Snapshot) error
690 }
691 bootstrap := func(storage appenderStorage, cs pb.ConfState) error {
692 if len(cs.Voters) == 0 {
693 return fmt.Errorf("no voters specified")
694 }
695 fi, err := storage.FirstIndex()
696 if err != nil {
697 return err
698 }
699 if fi < 2 {
700 return fmt.Errorf("FirstIndex >= 2 is prerequisite for bootstrap")
701 }
702 if _, err = storage.Entries(fi, fi, math.MaxUint64); err == nil {
703
704 return fmt.Errorf("should not have been able to load first index")
705 }
706 li, err := storage.LastIndex()
707 if err != nil {
708 return err
709 }
710 if _, err = storage.Entries(li, li, math.MaxUint64); err == nil {
711 return fmt.Errorf("should not have been able to load last index")
712 }
713 hs, ics, err := storage.InitialState()
714 if err != nil {
715 return err
716 }
717 if !IsEmptyHardState(hs) {
718 return fmt.Errorf("HardState not empty")
719 }
720 if len(ics.Voters) != 0 {
721 return fmt.Errorf("ConfState not empty")
722 }
723
724 meta := pb.SnapshotMetadata{
725 Index: 1,
726 Term: 0,
727 ConfState: cs,
728 }
729 snap := pb.Snapshot{Metadata: meta}
730 return storage.ApplySnapshot(snap)
731 }
732
733 if err := bootstrap(storage, pb.ConfState{Voters: []uint64{1}}); err != nil {
734 t.Fatal(err)
735 }
736
737 rawNode, err := NewRawNode(newTestConfig(1, 10, 1, storage))
738 if err != nil {
739 t.Fatal(err)
740 }
741 if rawNode.HasReady() {
742 t.Fatalf("unexpected ready: %+v", rawNode.Ready())
743 }
744 rawNode.Campaign()
745 rawNode.Propose([]byte("foo"))
746 if !rawNode.HasReady() {
747 t.Fatal("expected a Ready")
748 }
749 rd := rawNode.Ready()
750 storage.Append(rd.Entries)
751 rawNode.Advance(rd)
752
753 rd.SoftState, want.SoftState = nil, nil
754
755 if !reflect.DeepEqual(rd, want) {
756 t.Fatalf("unexpected Ready:\n%+v\nvs\n%+v", rd, want)
757 }
758
759 if rawNode.HasReady() {
760 t.Errorf("unexpected Ready: %+v", rawNode.Ready())
761 }
762 }
763
764 func TestRawNodeRestart(t *testing.T) {
765 entries := []pb.Entry{
766 {Term: 1, Index: 1},
767 {Term: 1, Index: 2, Data: []byte("foo")},
768 }
769 st := pb.HardState{Term: 1, Commit: 1}
770
771 want := Ready{
772 HardState: emptyState,
773
774 CommittedEntries: entries[:st.Commit],
775 MustSync: false,
776 }
777
778 storage := newTestMemoryStorage(withPeers(1))
779 storage.SetHardState(st)
780 storage.Append(entries)
781 rawNode, err := NewRawNode(newTestConfig(1, 10, 1, storage))
782 if err != nil {
783 t.Fatal(err)
784 }
785 rd := rawNode.Ready()
786 if !reflect.DeepEqual(rd, want) {
787 t.Errorf("g = %+v,\n w %+v", rd, want)
788 }
789 rawNode.Advance(rd)
790 if rawNode.HasReady() {
791 t.Errorf("unexpected Ready: %+v", rawNode.Ready())
792 }
793 }
794
795 func TestRawNodeRestartFromSnapshot(t *testing.T) {
796 snap := pb.Snapshot{
797 Metadata: pb.SnapshotMetadata{
798 ConfState: pb.ConfState{Voters: []uint64{1, 2}},
799 Index: 2,
800 Term: 1,
801 },
802 }
803 entries := []pb.Entry{
804 {Term: 1, Index: 3, Data: []byte("foo")},
805 }
806 st := pb.HardState{Term: 1, Commit: 3}
807
808 want := Ready{
809 HardState: emptyState,
810
811 CommittedEntries: entries,
812 MustSync: false,
813 }
814
815 s := NewMemoryStorage()
816 s.SetHardState(st)
817 s.ApplySnapshot(snap)
818 s.Append(entries)
819 rawNode, err := NewRawNode(newTestConfig(1, 10, 1, s))
820 if err != nil {
821 t.Fatal(err)
822 }
823 if rd := rawNode.Ready(); !reflect.DeepEqual(rd, want) {
824 t.Errorf("g = %+v,\n w %+v", rd, want)
825 } else {
826 rawNode.Advance(rd)
827 }
828 if rawNode.HasReady() {
829 t.Errorf("unexpected Ready: %+v", rawNode.HasReady())
830 }
831 }
832
833
834
835
836 func TestRawNodeStatus(t *testing.T) {
837 s := newTestMemoryStorage(withPeers(1))
838 rn, err := NewRawNode(newTestConfig(1, 10, 1, s))
839 if err != nil {
840 t.Fatal(err)
841 }
842 if status := rn.Status(); status.Progress != nil {
843 t.Fatalf("expected no Progress because not leader: %+v", status.Progress)
844 }
845 if err := rn.Campaign(); err != nil {
846 t.Fatal(err)
847 }
848 status := rn.Status()
849 if status.Lead != 1 {
850 t.Fatal("not lead")
851 }
852 if status.RaftState != StateLeader {
853 t.Fatal("not leader")
854 }
855 if exp, act := *rn.raft.prs.Progress[1], status.Progress[1]; !reflect.DeepEqual(exp, act) {
856 t.Fatalf("want: %+v\ngot: %+v", exp, act)
857 }
858 expCfg := tracker.Config{Voters: quorum.JointConfig{
859 quorum.MajorityConfig{1: {}},
860 nil,
861 }}
862 if !reflect.DeepEqual(expCfg, status.Config) {
863 t.Fatalf("want: %+v\ngot: %+v", expCfg, status.Config)
864 }
865 }
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882 func TestRawNodeCommitPaginationAfterRestart(t *testing.T) {
883 s := &ignoreSizeHintMemStorage{
884 MemoryStorage: newTestMemoryStorage(withPeers(1)),
885 }
886 persistedHardState := pb.HardState{
887 Term: 1,
888 Vote: 1,
889 Commit: 10,
890 }
891
892 s.hardState = persistedHardState
893 s.ents = make([]pb.Entry, 10)
894 var size uint64
895 for i := range s.ents {
896 ent := pb.Entry{
897 Term: 1,
898 Index: uint64(i + 1),
899 Type: pb.EntryNormal,
900 Data: []byte("a"),
901 }
902
903 s.ents[i] = ent
904 size += uint64(ent.Size())
905 }
906
907 cfg := newTestConfig(1, 10, 1, s)
908
909
910
911 cfg.MaxSizePerMsg = size - uint64(s.ents[len(s.ents)-1].Size()) - 1
912
913 s.ents = append(s.ents, pb.Entry{
914 Term: 1,
915 Index: uint64(11),
916 Type: pb.EntryNormal,
917 Data: []byte("boom"),
918 })
919
920 rawNode, err := NewRawNode(cfg)
921 if err != nil {
922 t.Fatal(err)
923 }
924
925 for highestApplied := uint64(0); highestApplied != 11; {
926 rd := rawNode.Ready()
927 n := len(rd.CommittedEntries)
928 if n == 0 {
929 t.Fatalf("stopped applying entries at index %d", highestApplied)
930 }
931 if next := rd.CommittedEntries[0].Index; highestApplied != 0 && highestApplied+1 != next {
932 t.Fatalf("attempting to apply index %d after index %d, leaving a gap", next, highestApplied)
933 }
934 highestApplied = rd.CommittedEntries[n-1].Index
935 rawNode.Advance(rd)
936 rawNode.Step(pb.Message{
937 Type: pb.MsgHeartbeat,
938 To: 1,
939 From: 1,
940 Term: 1,
941 Commit: 11,
942 })
943 }
944 }
945
946
947
948
949
950 func TestRawNodeBoundedLogGrowthWithPartition(t *testing.T) {
951 const maxEntries = 16
952 data := []byte("testdata")
953 testEntry := pb.Entry{Data: data}
954 maxEntrySize := uint64(maxEntries * PayloadSize(testEntry))
955
956 s := newTestMemoryStorage(withPeers(1))
957 cfg := newTestConfig(1, 10, 1, s)
958 cfg.MaxUncommittedEntriesSize = maxEntrySize
959 rawNode, err := NewRawNode(cfg)
960 if err != nil {
961 t.Fatal(err)
962 }
963 rd := rawNode.Ready()
964 s.Append(rd.Entries)
965 rawNode.Advance(rd)
966
967
968 rawNode.Campaign()
969 for {
970 rd = rawNode.Ready()
971 s.Append(rd.Entries)
972 if rd.SoftState.Lead == rawNode.raft.id {
973 rawNode.Advance(rd)
974 break
975 }
976 rawNode.Advance(rd)
977 }
978
979
980
981
982 for i := 0; i < 1024; i++ {
983 rawNode.Propose(data)
984 }
985
986
987
988 checkUncommitted := func(exp uint64) {
989 t.Helper()
990 if a := rawNode.raft.uncommittedSize; exp != a {
991 t.Fatalf("expected %d uncommitted entry bytes, found %d", exp, a)
992 }
993 }
994 checkUncommitted(maxEntrySize)
995
996
997
998 rd = rawNode.Ready()
999 if len(rd.CommittedEntries) != maxEntries {
1000 t.Fatalf("expected %d entries, got %d", maxEntries, len(rd.CommittedEntries))
1001 }
1002 s.Append(rd.Entries)
1003 rawNode.Advance(rd)
1004 checkUncommitted(0)
1005 }
1006
1007 func BenchmarkStatus(b *testing.B) {
1008 setup := func(members int) *RawNode {
1009 peers := make([]uint64, members)
1010 for i := range peers {
1011 peers[i] = uint64(i + 1)
1012 }
1013 cfg := newTestConfig(1, 3, 1, newTestMemoryStorage(withPeers(peers...)))
1014 cfg.Logger = discardLogger
1015 r := newRaft(cfg)
1016 r.becomeFollower(1, 1)
1017 r.becomeCandidate()
1018 r.becomeLeader()
1019 return &RawNode{raft: r}
1020 }
1021
1022 for _, members := range []int{1, 3, 5, 100} {
1023 b.Run(fmt.Sprintf("members=%d", members), func(b *testing.B) {
1024 rn := setup(members)
1025
1026 b.Run("Status", func(b *testing.B) {
1027 b.ReportAllocs()
1028 for i := 0; i < b.N; i++ {
1029 _ = rn.Status()
1030 }
1031 })
1032
1033 b.Run("Status-example", func(b *testing.B) {
1034 b.ReportAllocs()
1035 for i := 0; i < b.N; i++ {
1036 s := rn.Status()
1037 var n uint64
1038 for _, pr := range s.Progress {
1039 n += pr.Match
1040 }
1041 _ = n
1042 }
1043 })
1044
1045 b.Run("BasicStatus", func(b *testing.B) {
1046 b.ReportAllocs()
1047 for i := 0; i < b.N; i++ {
1048 _ = rn.BasicStatus()
1049 }
1050 })
1051
1052 b.Run("WithProgress", func(b *testing.B) {
1053 b.ReportAllocs()
1054 visit := func(uint64, ProgressType, tracker.Progress) {}
1055
1056 for i := 0; i < b.N; i++ {
1057 rn.WithProgress(visit)
1058 }
1059 })
1060 b.Run("WithProgress-example", func(b *testing.B) {
1061 b.ReportAllocs()
1062 for i := 0; i < b.N; i++ {
1063 var n uint64
1064 visit := func(_ uint64, _ ProgressType, pr tracker.Progress) {
1065 n += pr.Match
1066 }
1067 rn.WithProgress(visit)
1068 _ = n
1069 }
1070 })
1071 })
1072 }
1073 }
1074
1075 func TestRawNodeConsumeReady(t *testing.T) {
1076
1077
1078 s := newTestMemoryStorage(withPeers(1))
1079 rn := newTestRawNode(1, 3, 1, s)
1080 m1 := pb.Message{Context: []byte("foo")}
1081 m2 := pb.Message{Context: []byte("bar")}
1082
1083
1084 rn.raft.msgs = append(rn.raft.msgs, m1)
1085 rd := rn.readyWithoutAccept()
1086 if len(rd.Messages) != 1 || !reflect.DeepEqual(rd.Messages[0], m1) {
1087 t.Fatalf("expected only m1 sent, got %+v", rd.Messages)
1088 }
1089 if len(rn.raft.msgs) != 1 || !reflect.DeepEqual(rn.raft.msgs[0], m1) {
1090 t.Fatalf("expected only m1 in raft.msgs, got %+v", rn.raft.msgs)
1091 }
1092
1093
1094 rd = rn.Ready()
1095 if len(rn.raft.msgs) > 0 {
1096 t.Fatalf("messages not reset: %+v", rn.raft.msgs)
1097 }
1098 if len(rd.Messages) != 1 || !reflect.DeepEqual(rd.Messages[0], m1) {
1099 t.Fatalf("expected only m1 sent, got %+v", rd.Messages)
1100 }
1101
1102 rn.raft.msgs = append(rn.raft.msgs, m2)
1103 rn.Advance(rd)
1104 if len(rn.raft.msgs) != 1 || !reflect.DeepEqual(rn.raft.msgs[0], m2) {
1105 t.Fatalf("expected only m2 in raft.msgs, got %+v", rn.raft.msgs)
1106 }
1107 }
1108
View as plain text