1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package raft
16
17 import (
18 "testing"
19
20 pb "go.etcd.io/etcd/raft/v3/raftpb"
21 )
22
23
24
25
26
27 func TestMsgAppFlowControlFull(t *testing.T) {
28 r := newTestRaft(1, 5, 1, newTestMemoryStorage(withPeers(1, 2)))
29 r.becomeCandidate()
30 r.becomeLeader()
31
32 pr2 := r.prs.Progress[2]
33
34 pr2.BecomeReplicate()
35
36 for i := 0; i < r.prs.MaxInflight; i++ {
37 r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
38 ms := r.readMessages()
39 if len(ms) != 1 {
40 t.Fatalf("#%d: len(ms) = %d, want 1", i, len(ms))
41 }
42 }
43
44
45 if !pr2.Inflights.Full() {
46 t.Fatalf("inflights.full = %t, want %t", pr2.Inflights.Full(), true)
47 }
48
49
50 for i := 0; i < 10; i++ {
51 r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
52 ms := r.readMessages()
53 if len(ms) != 0 {
54 t.Fatalf("#%d: len(ms) = %d, want 0", i, len(ms))
55 }
56 }
57 }
58
59
60
61
62
63 func TestMsgAppFlowControlMoveForward(t *testing.T) {
64 r := newTestRaft(1, 5, 1, newTestMemoryStorage(withPeers(1, 2)))
65 r.becomeCandidate()
66 r.becomeLeader()
67
68 pr2 := r.prs.Progress[2]
69
70 pr2.BecomeReplicate()
71
72 for i := 0; i < r.prs.MaxInflight; i++ {
73 r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
74 r.readMessages()
75 }
76
77
78
79 for tt := 2; tt < r.prs.MaxInflight; tt++ {
80
81 r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: uint64(tt)})
82 r.readMessages()
83
84
85 r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
86 ms := r.readMessages()
87 if len(ms) != 1 {
88 t.Fatalf("#%d: len(ms) = %d, want 1", tt, len(ms))
89 }
90
91
92 if !pr2.Inflights.Full() {
93 t.Fatalf("inflights.full = %t, want %t", pr2.Inflights.Full(), true)
94 }
95
96
97 for i := 0; i < tt; i++ {
98 r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: uint64(i)})
99 if !pr2.Inflights.Full() {
100 t.Fatalf("#%d: inflights.full = %t, want %t", tt, pr2.Inflights.Full(), true)
101 }
102 }
103 }
104 }
105
106
107
108 func TestMsgAppFlowControlRecvHeartbeat(t *testing.T) {
109 r := newTestRaft(1, 5, 1, newTestMemoryStorage(withPeers(1, 2)))
110 r.becomeCandidate()
111 r.becomeLeader()
112
113 pr2 := r.prs.Progress[2]
114
115 pr2.BecomeReplicate()
116
117 for i := 0; i < r.prs.MaxInflight; i++ {
118 r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
119 r.readMessages()
120 }
121
122 for tt := 1; tt < 5; tt++ {
123 if !pr2.Inflights.Full() {
124 t.Fatalf("#%d: inflights.full = %t, want %t", tt, pr2.Inflights.Full(), true)
125 }
126
127
128 for i := 0; i < tt; i++ {
129 r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeatResp})
130 r.readMessages()
131 if pr2.Inflights.Full() {
132 t.Fatalf("#%d.%d: inflights.full = %t, want %t", tt, i, pr2.Inflights.Full(), false)
133 }
134 }
135
136
137 r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
138 ms := r.readMessages()
139 if len(ms) != 1 {
140 t.Fatalf("#%d: free slot = 0, want 1", tt)
141 }
142
143
144 for i := 0; i < 10; i++ {
145 r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
146 ms1 := r.readMessages()
147 if len(ms1) != 0 {
148 t.Fatalf("#%d.%d: len(ms) = %d, want 0", tt, i, len(ms1))
149 }
150 }
151
152
153 r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeatResp})
154 r.readMessages()
155 }
156 }
157
View as plain text