1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package rafthttp
16
17 import (
18 "context"
19 "errors"
20 "fmt"
21 "io"
22 "net/http"
23 "net/http/httptest"
24 "reflect"
25 "sync"
26 "testing"
27 "time"
28
29 "go.etcd.io/etcd/api/v3/version"
30 "go.etcd.io/etcd/client/pkg/v3/testutil"
31 "go.etcd.io/etcd/client/pkg/v3/types"
32 "go.etcd.io/etcd/raft/v3/raftpb"
33 stats "go.etcd.io/etcd/server/v3/etcdserver/api/v2stats"
34
35 "github.com/coreos/go-semver/semver"
36 "go.uber.org/zap"
37 "golang.org/x/time/rate"
38 )
39
40
41
42
43 func TestStreamWriterAttachOutgoingConn(t *testing.T) {
44 sw := startStreamWriter(zap.NewExample(), types.ID(0), types.ID(1), newPeerStatus(zap.NewExample(), types.ID(0), types.ID(1)), &stats.FollowerStats{}, &fakeRaft{})
45
46 if _, ok := sw.writec(); ok {
47 t.Errorf("initial working status = %v, want false", ok)
48 }
49
50
51 var wfc *fakeWriteFlushCloser
52 for i := 0; i < 3; i++ {
53 prevwfc := wfc
54 wfc = newFakeWriteFlushCloser(nil)
55 sw.attach(&outgoingConn{t: streamTypeMessage, Writer: wfc, Flusher: wfc, Closer: wfc})
56
57
58 if prevwfc != nil {
59 select {
60 case <-prevwfc.closed:
61 case <-time.After(time.Second):
62 t.Errorf("#%d: close of previous connection timed out", i)
63 }
64 }
65
66
67
68
69 msgc, _ := sw.writec()
70 msgc <- raftpb.Message{}
71
72 select {
73 case <-wfc.writec:
74 case <-time.After(time.Second):
75 t.Errorf("#%d: failed to write to the underlying connection", i)
76 }
77
78 if _, ok := sw.writec(); !ok {
79 t.Errorf("#%d: working status = %v, want true", i, ok)
80 }
81 }
82
83 sw.stop()
84
85 if _, ok := sw.writec(); ok {
86 t.Errorf("working status after stop = %v, want false", ok)
87 }
88 if !wfc.Closed() {
89 t.Errorf("failed to close the underlying connection")
90 }
91 }
92
93
94
95 func TestStreamWriterAttachBadOutgoingConn(t *testing.T) {
96 sw := startStreamWriter(zap.NewExample(), types.ID(0), types.ID(1), newPeerStatus(zap.NewExample(), types.ID(0), types.ID(1)), &stats.FollowerStats{}, &fakeRaft{})
97 defer sw.stop()
98 wfc := newFakeWriteFlushCloser(errors.New("blah"))
99 sw.attach(&outgoingConn{t: streamTypeMessage, Writer: wfc, Flusher: wfc, Closer: wfc})
100
101 sw.msgc <- raftpb.Message{}
102 select {
103 case <-wfc.closed:
104 case <-time.After(time.Second):
105 t.Errorf("failed to close the underlying connection in time")
106 }
107
108 if _, ok := sw.writec(); ok {
109 t.Errorf("working = %v, want false", ok)
110 }
111 }
112
113 func TestStreamReaderDialRequest(t *testing.T) {
114 for i, tt := range []streamType{streamTypeMessage, streamTypeMsgAppV2} {
115 tr := &roundTripperRecorder{rec: &testutil.RecorderBuffered{}}
116 sr := &streamReader{
117 peerID: types.ID(2),
118 tr: &Transport{streamRt: tr, ClusterID: types.ID(1), ID: types.ID(1)},
119 picker: mustNewURLPicker(t, []string{"http://localhost:2380"}),
120 ctx: context.Background(),
121 }
122 sr.dial(tt)
123
124 act, err := tr.rec.Wait(1)
125 if err != nil {
126 t.Fatal(err)
127 }
128 req := act[0].Params[0].(*http.Request)
129
130 wurl := fmt.Sprintf("http://localhost:2380" + tt.endpoint(zap.NewExample()) + "/1")
131 if req.URL.String() != wurl {
132 t.Errorf("#%d: url = %s, want %s", i, req.URL.String(), wurl)
133 }
134 if w := "GET"; req.Method != w {
135 t.Errorf("#%d: method = %s, want %s", i, req.Method, w)
136 }
137 if g := req.Header.Get("X-Etcd-Cluster-ID"); g != "1" {
138 t.Errorf("#%d: header X-Etcd-Cluster-ID = %s, want 1", i, g)
139 }
140 if g := req.Header.Get("X-Raft-To"); g != "2" {
141 t.Errorf("#%d: header X-Raft-To = %s, want 2", i, g)
142 }
143 }
144 }
145
146
147
148 func TestStreamReaderDialResult(t *testing.T) {
149 tests := []struct {
150 code int
151 err error
152 wok bool
153 whalt bool
154 }{
155 {0, errors.New("blah"), false, false},
156 {http.StatusOK, nil, true, false},
157 {http.StatusMethodNotAllowed, nil, false, false},
158 {http.StatusNotFound, nil, false, false},
159 {http.StatusPreconditionFailed, nil, false, false},
160 {http.StatusGone, nil, false, true},
161 }
162 for i, tt := range tests {
163 h := http.Header{}
164 h.Add("X-Server-Version", version.Version)
165 tr := &respRoundTripper{
166 code: tt.code,
167 header: h,
168 err: tt.err,
169 }
170 sr := &streamReader{
171 peerID: types.ID(2),
172 tr: &Transport{streamRt: tr, ClusterID: types.ID(1)},
173 picker: mustNewURLPicker(t, []string{"http://localhost:2380"}),
174 errorc: make(chan error, 1),
175 ctx: context.Background(),
176 }
177
178 _, err := sr.dial(streamTypeMessage)
179 if ok := err == nil; ok != tt.wok {
180 t.Errorf("#%d: ok = %v, want %v", i, ok, tt.wok)
181 }
182 if halt := len(sr.errorc) > 0; halt != tt.whalt {
183 t.Errorf("#%d: halt = %v, want %v", i, halt, tt.whalt)
184 }
185 }
186 }
187
188
189 func TestStreamReaderStopOnDial(t *testing.T) {
190 testutil.RegisterLeakDetection(t)
191 h := http.Header{}
192 h.Add("X-Server-Version", version.Version)
193 tr := &respWaitRoundTripper{rrt: &respRoundTripper{code: http.StatusOK, header: h}}
194 sr := &streamReader{
195 peerID: types.ID(2),
196 tr: &Transport{streamRt: tr, ClusterID: types.ID(1)},
197 picker: mustNewURLPicker(t, []string{"http://localhost:2380"}),
198 errorc: make(chan error, 1),
199 typ: streamTypeMessage,
200 status: newPeerStatus(zap.NewExample(), types.ID(1), types.ID(2)),
201 rl: rate.NewLimiter(rate.Every(100*time.Millisecond), 1),
202 }
203 tr.onResp = func() {
204
205
206 go sr.stop()
207
208 time.Sleep(10 * time.Millisecond)
209
210 }
211 sr.start()
212 select {
213 case <-sr.done:
214 case <-time.After(time.Second):
215 t.Fatal("streamReader did not stop in time")
216 }
217 }
218
219 type respWaitRoundTripper struct {
220 rrt *respRoundTripper
221 onResp func()
222 }
223
224 func (t *respWaitRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
225 resp, err := t.rrt.RoundTrip(req)
226 resp.Body = newWaitReadCloser()
227 t.onResp()
228 return resp, err
229 }
230
231 type waitReadCloser struct{ closec chan struct{} }
232
233 func newWaitReadCloser() *waitReadCloser { return &waitReadCloser{make(chan struct{})} }
234 func (wrc *waitReadCloser) Read(p []byte) (int, error) {
235 <-wrc.closec
236 return 0, io.EOF
237 }
238 func (wrc *waitReadCloser) Close() error {
239 close(wrc.closec)
240 return nil
241 }
242
243
244
245 func TestStreamReaderDialDetectUnsupport(t *testing.T) {
246 for i, typ := range []streamType{streamTypeMsgAppV2, streamTypeMessage} {
247
248 tr := &respRoundTripper{
249 code: http.StatusNotFound,
250 header: http.Header{},
251 }
252 sr := &streamReader{
253 peerID: types.ID(2),
254 tr: &Transport{streamRt: tr, ClusterID: types.ID(1)},
255 picker: mustNewURLPicker(t, []string{"http://localhost:2380"}),
256 ctx: context.Background(),
257 }
258
259 _, err := sr.dial(typ)
260 if err != errUnsupportedStreamType {
261 t.Errorf("#%d: error = %v, want %v", i, err, errUnsupportedStreamType)
262 }
263 }
264 }
265
266
267
268 func TestStream(t *testing.T) {
269 recvc := make(chan raftpb.Message, streamBufSize)
270 propc := make(chan raftpb.Message, streamBufSize)
271 msgapp := raftpb.Message{
272 Type: raftpb.MsgApp,
273 From: 2,
274 To: 1,
275 Term: 1,
276 LogTerm: 1,
277 Index: 3,
278 Entries: []raftpb.Entry{{Term: 1, Index: 4}},
279 }
280
281 tests := []struct {
282 t streamType
283 m raftpb.Message
284 wc chan raftpb.Message
285 }{
286 {
287 streamTypeMessage,
288 raftpb.Message{Type: raftpb.MsgProp, To: 2},
289 propc,
290 },
291 {
292 streamTypeMessage,
293 msgapp,
294 recvc,
295 },
296 {
297 streamTypeMsgAppV2,
298 msgapp,
299 recvc,
300 },
301 }
302 for i, tt := range tests {
303 h := &fakeStreamHandler{t: tt.t}
304 srv := httptest.NewServer(h)
305 defer srv.Close()
306
307 sw := startStreamWriter(zap.NewExample(), types.ID(0), types.ID(1), newPeerStatus(zap.NewExample(), types.ID(0), types.ID(1)), &stats.FollowerStats{}, &fakeRaft{})
308 defer sw.stop()
309 h.sw = sw
310
311 picker := mustNewURLPicker(t, []string{srv.URL})
312 tr := &Transport{streamRt: &http.Transport{}, ClusterID: types.ID(1)}
313
314 sr := &streamReader{
315 peerID: types.ID(2),
316 typ: tt.t,
317 tr: tr,
318 picker: picker,
319 status: newPeerStatus(zap.NewExample(), types.ID(0), types.ID(2)),
320 recvc: recvc,
321 propc: propc,
322 rl: rate.NewLimiter(rate.Every(100*time.Millisecond), 1),
323 }
324 sr.start()
325
326
327 var writec chan<- raftpb.Message
328 for {
329 var ok bool
330 if writec, ok = sw.writec(); ok {
331 break
332 }
333 time.Sleep(time.Millisecond)
334 }
335
336 writec <- tt.m
337 var m raftpb.Message
338 select {
339 case m = <-tt.wc:
340 case <-time.After(time.Second):
341 t.Fatalf("#%d: failed to receive message from the channel", i)
342 }
343 if !reflect.DeepEqual(m, tt.m) {
344 t.Fatalf("#%d: message = %+v, want %+v", i, m, tt.m)
345 }
346
347 sr.stop()
348 }
349 }
350
351 func TestCheckStreamSupport(t *testing.T) {
352 tests := []struct {
353 v *semver.Version
354 t streamType
355 w bool
356 }{
357
358 {
359 semver.Must(semver.NewVersion("2.1.0")),
360 streamTypeMsgAppV2,
361 true,
362 },
363
364 {
365 semver.Must(semver.NewVersion("2.1.9")),
366 streamTypeMsgAppV2,
367 true,
368 },
369
370 {
371 semver.Must(semver.NewVersion("2.1.0-alpha")),
372 streamTypeMsgAppV2,
373 true,
374 },
375 }
376 for i, tt := range tests {
377 if g := checkStreamSupport(tt.v, tt.t); g != tt.w {
378 t.Errorf("#%d: check = %v, want %v", i, g, tt.w)
379 }
380 }
381 }
382
383 func TestStreamSupportCurrentVersion(t *testing.T) {
384 cv := version.Cluster(version.Version)
385 cv = cv + ".0"
386 if _, ok := supportedStream[cv]; !ok {
387 t.Errorf("Current version does not have stream support.")
388 }
389 }
390
391 type fakeWriteFlushCloser struct {
392 mu sync.Mutex
393 err error
394 written int
395 closed chan struct{}
396 writec chan struct{}
397 }
398
399 func newFakeWriteFlushCloser(err error) *fakeWriteFlushCloser {
400 return &fakeWriteFlushCloser{
401 err: err,
402 closed: make(chan struct{}),
403 writec: make(chan struct{}, 1),
404 }
405 }
406
407 func (wfc *fakeWriteFlushCloser) Write(p []byte) (n int, err error) {
408 wfc.mu.Lock()
409 defer wfc.mu.Unlock()
410 select {
411 case wfc.writec <- struct{}{}:
412 default:
413 }
414 wfc.written += len(p)
415 return len(p), wfc.err
416 }
417
418 func (wfc *fakeWriteFlushCloser) Flush() {}
419
420 func (wfc *fakeWriteFlushCloser) Close() error {
421 close(wfc.closed)
422 return wfc.err
423 }
424
425 func (wfc *fakeWriteFlushCloser) Written() int {
426 wfc.mu.Lock()
427 defer wfc.mu.Unlock()
428 return wfc.written
429 }
430
431 func (wfc *fakeWriteFlushCloser) Closed() bool {
432 select {
433 case <-wfc.closed:
434 return true
435 default:
436 return false
437 }
438 }
439
440 type fakeStreamHandler struct {
441 t streamType
442 sw *streamWriter
443 }
444
445 func (h *fakeStreamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
446 w.Header().Add("X-Server-Version", version.Version)
447 w.(http.Flusher).Flush()
448 c := newCloseNotifier()
449 h.sw.attach(&outgoingConn{
450 t: h.t,
451 Writer: w,
452 Flusher: w.(http.Flusher),
453 Closer: c,
454 })
455 <-c.closeNotify()
456 }
457
View as plain text