1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package rafthttp
16
17 import (
18 "errors"
19 "fmt"
20 "io"
21 "io/ioutil"
22 "net/http"
23 "sync"
24 "testing"
25 "time"
26
27 "go.etcd.io/etcd/api/v3/version"
28 "go.etcd.io/etcd/client/pkg/v3/testutil"
29 "go.etcd.io/etcd/client/pkg/v3/types"
30 "go.etcd.io/etcd/raft/v3/raftpb"
31 stats "go.etcd.io/etcd/server/v3/etcdserver/api/v2stats"
32
33 "go.uber.org/zap"
34 )
35
36
37
38 func TestPipelineSend(t *testing.T) {
39 tr := &roundTripperRecorder{rec: testutil.NewRecorderStream()}
40 picker := mustNewURLPicker(t, []string{"http://localhost:2380"})
41 tp := &Transport{pipelineRt: tr}
42 p := startTestPipeline(tp, picker)
43
44 p.msgc <- raftpb.Message{Type: raftpb.MsgApp}
45 tr.rec.Wait(1)
46 p.stop()
47 if p.followerStats.Counts.Success != 1 {
48 t.Errorf("success = %d, want 1", p.followerStats.Counts.Success)
49 }
50 }
51
52
53
54 func TestPipelineKeepSendingWhenPostError(t *testing.T) {
55 tr := &respRoundTripper{rec: testutil.NewRecorderStream(), err: fmt.Errorf("roundtrip error")}
56 picker := mustNewURLPicker(t, []string{"http://localhost:2380"})
57 tp := &Transport{pipelineRt: tr}
58 p := startTestPipeline(tp, picker)
59 defer p.stop()
60
61 for i := 0; i < 50; i++ {
62 p.msgc <- raftpb.Message{Type: raftpb.MsgApp}
63 }
64
65 _, err := tr.rec.Wait(50)
66 if err != nil {
67 t.Errorf("unexpected wait error %v", err)
68 }
69 }
70
71 func TestPipelineExceedMaximumServing(t *testing.T) {
72 rt := newRoundTripperBlocker()
73 picker := mustNewURLPicker(t, []string{"http://localhost:2380"})
74 tp := &Transport{pipelineRt: rt}
75 p := startTestPipeline(tp, picker)
76 defer p.stop()
77
78
79
80 for i := 0; i < connPerPipeline+pipelineBufSize; i++ {
81 select {
82 case p.msgc <- raftpb.Message{}:
83 case <-time.After(time.Second):
84 t.Errorf("failed to send out message")
85 }
86 }
87
88
89 select {
90 case p.msgc <- raftpb.Message{}:
91 t.Errorf("unexpected message sendout")
92 default:
93 }
94
95
96 rt.unblock()
97
98
99 select {
100 case p.msgc <- raftpb.Message{}:
101 case <-time.After(time.Second):
102 t.Errorf("failed to send out message")
103 }
104 }
105
106
107
108 func TestPipelineSendFailed(t *testing.T) {
109 picker := mustNewURLPicker(t, []string{"http://localhost:2380"})
110 rt := newRespRoundTripper(0, errors.New("blah"))
111 rt.rec = testutil.NewRecorderStream()
112 tp := &Transport{pipelineRt: rt}
113 p := startTestPipeline(tp, picker)
114
115 p.msgc <- raftpb.Message{Type: raftpb.MsgApp}
116 if _, err := rt.rec.Wait(1); err != nil {
117 t.Fatal(err)
118 }
119
120 p.stop()
121
122 if p.followerStats.Counts.Fail != 1 {
123 t.Errorf("fail = %d, want 1", p.followerStats.Counts.Fail)
124 }
125 }
126
127 func TestPipelinePost(t *testing.T) {
128 tr := &roundTripperRecorder{rec: &testutil.RecorderBuffered{}}
129 picker := mustNewURLPicker(t, []string{"http://localhost:2380"})
130 tp := &Transport{ClusterID: types.ID(1), pipelineRt: tr}
131 p := startTestPipeline(tp, picker)
132 if err := p.post([]byte("some data")); err != nil {
133 t.Fatalf("unexpected post error: %v", err)
134 }
135 act, err := tr.rec.Wait(1)
136 if err != nil {
137 t.Fatal(err)
138 }
139 p.stop()
140
141 req := act[0].Params[0].(*http.Request)
142
143 if g := req.Method; g != "POST" {
144 t.Errorf("method = %s, want %s", g, "POST")
145 }
146 if g := req.URL.String(); g != "http://localhost:2380/raft" {
147 t.Errorf("url = %s, want %s", g, "http://localhost:2380/raft")
148 }
149 if g := req.Header.Get("Content-Type"); g != "application/protobuf" {
150 t.Errorf("content type = %s, want %s", g, "application/protobuf")
151 }
152 if g := req.Header.Get("X-Server-Version"); g != version.Version {
153 t.Errorf("version = %s, want %s", g, version.Version)
154 }
155 if g := req.Header.Get("X-Min-Cluster-Version"); g != version.MinClusterVersion {
156 t.Errorf("min version = %s, want %s", g, version.MinClusterVersion)
157 }
158 if g := req.Header.Get("X-Etcd-Cluster-ID"); g != "1" {
159 t.Errorf("cluster id = %s, want %s", g, "1")
160 }
161 b, err := ioutil.ReadAll(req.Body)
162 if err != nil {
163 t.Fatalf("unexpected ReadAll error: %v", err)
164 }
165 if string(b) != "some data" {
166 t.Errorf("body = %s, want %s", b, "some data")
167 }
168 }
169
170 func TestPipelinePostBad(t *testing.T) {
171 tests := []struct {
172 u string
173 code int
174 err error
175 }{
176
177 {"http://localhost:2380", 0, errors.New("blah")},
178
179 {"http://localhost:2380", http.StatusOK, nil},
180 {"http://localhost:2380", http.StatusCreated, nil},
181 }
182 for i, tt := range tests {
183 picker := mustNewURLPicker(t, []string{tt.u})
184 tp := &Transport{pipelineRt: newRespRoundTripper(tt.code, tt.err)}
185 p := startTestPipeline(tp, picker)
186 err := p.post([]byte("some data"))
187 p.stop()
188
189 if err == nil {
190 t.Errorf("#%d: err = nil, want not nil", i)
191 }
192 }
193 }
194
195 func TestPipelinePostErrorc(t *testing.T) {
196 tests := []struct {
197 u string
198 code int
199 err error
200 }{
201 {"http://localhost:2380", http.StatusForbidden, nil},
202 }
203 for i, tt := range tests {
204 picker := mustNewURLPicker(t, []string{tt.u})
205 tp := &Transport{pipelineRt: newRespRoundTripper(tt.code, tt.err)}
206 p := startTestPipeline(tp, picker)
207 p.post([]byte("some data"))
208 p.stop()
209 select {
210 case <-p.errorc:
211 default:
212 t.Fatalf("#%d: cannot receive from errorc", i)
213 }
214 }
215 }
216
217 func TestStopBlockedPipeline(t *testing.T) {
218 picker := mustNewURLPicker(t, []string{"http://localhost:2380"})
219 tp := &Transport{pipelineRt: newRoundTripperBlocker()}
220 p := startTestPipeline(tp, picker)
221
222 for i := 0; i < connPerPipeline*10; i++ {
223 p.msgc <- raftpb.Message{}
224 }
225
226 done := make(chan struct{})
227 go func() {
228 p.stop()
229 done <- struct{}{}
230 }()
231 select {
232 case <-done:
233 case <-time.After(time.Second):
234 t.Fatalf("failed to stop pipeline in 1s")
235 }
236 }
237
// roundTripperBlocker is a test double for a pipeline round tripper whose
// progress is controlled by the test through unblock and CancelRequest.
type roundTripperBlocker struct {
	// unblockc is closed by unblock.
	unblockc chan struct{}
	// mu guards cancel.
	mu sync.Mutex
	// cancel maps a request to the channel CancelRequest signals on. Entries
	// are registered by code that is not part of this section.
	cancel map[*http.Request]chan struct{}
}

// newRoundTripperBlocker returns a blocker with an open unblockc channel and
// an empty cancel map.
func newRoundTripperBlocker() *roundTripperBlocker {
	b := new(roundTripperBlocker)
	b.unblockc = make(chan struct{})
	b.cancel = make(map[*http.Request]chan struct{})
	return b
}

// unblock closes unblockc. It must be called at most once, since closing an
// already closed channel panics.
func (t *roundTripperBlocker) unblock() {
	close(t.unblockc)
}

// CancelRequest signals the channel registered for req, if there is one, and
// removes the registration. Requests without a registration are ignored.
func (t *roundTripperBlocker) CancelRequest(req *http.Request) {
	t.mu.Lock()
	defer t.mu.Unlock()

	c, registered := t.cancel[req]
	if !registered {
		return
	}
	// The send happens while mu is held.
	c <- struct{}{}
	delete(t.cancel, req)
}
263
264 type respRoundTripper struct {
265 mu sync.Mutex
266 rec testutil.Recorder
267
268 code int
269 header http.Header
270 err error
271 }
272
273 func newRespRoundTripper(code int, err error) *respRoundTripper {
274 return &respRoundTripper{code: code, err: err}
275 }
276 func (t *respRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
277 t.mu.Lock()
278 defer t.mu.Unlock()
279 if t.rec != nil {
280 t.rec.Record(testutil.Action{Name: "req", Params: []interface{}{req}})
281 }
282 return &http.Response{StatusCode: t.code, Header: t.header, Body: &nopReadCloser{}}, t.err
283 }
284
285 type roundTripperRecorder struct {
286 rec testutil.Recorder
287 }
288
289 func (t *roundTripperRecorder) RoundTrip(req *http.Request) (*http.Response, error) {
290 if t.rec != nil {
291 t.rec.Record(testutil.Action{Name: "req", Params: []interface{}{req}})
292 }
293 return &http.Response{StatusCode: http.StatusNoContent, Body: &nopReadCloser{}}, nil
294 }
295
// nopReadCloser is an always-empty body: reads hit end-of-file immediately and
// closing it does nothing.
type nopReadCloser struct{}

// Read never writes into p; it returns 0 and io.EOF.
func (n *nopReadCloser) Read(p []byte) (int, error) {
	return 0, io.EOF
}

// Close always succeeds.
func (n *nopReadCloser) Close() error {
	return nil
}
300
301 func startTestPipeline(tr *Transport, picker *urlPicker) *pipeline {
302 p := &pipeline{
303 peerID: types.ID(1),
304 tr: tr,
305 picker: picker,
306 status: newPeerStatus(zap.NewExample(), tr.ID, types.ID(1)),
307 raft: &fakeRaft{},
308 followerStats: &stats.FollowerStats{},
309 errorc: make(chan error, 1),
310 }
311 p.start()
312 return p
313 }
314
View as plain text