1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package rafthttp
16
17 import (
18 "bytes"
19 "errors"
20 "fmt"
21 "io"
22 "net/http"
23 "net/http/httptest"
24 "net/url"
25 "strings"
26 "testing"
27 "time"
28
29 "go.etcd.io/etcd/api/v3/version"
30 "go.etcd.io/etcd/client/pkg/v3/types"
31 "go.etcd.io/etcd/pkg/v3/pbutil"
32 "go.etcd.io/etcd/raft/v3/raftpb"
33 "go.etcd.io/etcd/server/v3/etcdserver/api/snap"
34
35 "go.uber.org/zap"
36 )
37
38 func TestServeRaftPrefix(t *testing.T) {
39 testCases := []struct {
40 method string
41 body io.Reader
42 p Raft
43 clusterID string
44
45 wcode int
46 }{
47 {
48
49 "GET",
50 bytes.NewReader(
51 pbutil.MustMarshal(&raftpb.Message{}),
52 ),
53 &fakeRaft{},
54 "0",
55 http.StatusMethodNotAllowed,
56 },
57 {
58
59 "PUT",
60 bytes.NewReader(
61 pbutil.MustMarshal(&raftpb.Message{}),
62 ),
63 &fakeRaft{},
64 "0",
65 http.StatusMethodNotAllowed,
66 },
67 {
68
69 "DELETE",
70 bytes.NewReader(
71 pbutil.MustMarshal(&raftpb.Message{}),
72 ),
73 &fakeRaft{},
74 "0",
75 http.StatusMethodNotAllowed,
76 },
77 {
78
79 "POST",
80 &errReader{},
81 &fakeRaft{},
82 "0",
83 http.StatusBadRequest,
84 },
85 {
86
87 "POST",
88 strings.NewReader("malformed garbage"),
89 &fakeRaft{},
90 "0",
91 http.StatusBadRequest,
92 },
93 {
94
95 "POST",
96 bytes.NewReader(
97 pbutil.MustMarshal(&raftpb.Message{}),
98 ),
99 &fakeRaft{},
100 "1",
101 http.StatusPreconditionFailed,
102 },
103 {
104
105 "POST",
106 bytes.NewReader(
107 pbutil.MustMarshal(&raftpb.Message{}),
108 ),
109 &fakeRaft{
110 err: &resWriterToError{code: http.StatusForbidden},
111 },
112 "0",
113 http.StatusForbidden,
114 },
115 {
116
117 "POST",
118 bytes.NewReader(
119 pbutil.MustMarshal(&raftpb.Message{}),
120 ),
121 &fakeRaft{
122 err: &resWriterToError{code: http.StatusInternalServerError},
123 },
124 "0",
125 http.StatusInternalServerError,
126 },
127 {
128
129 "POST",
130 bytes.NewReader(
131 pbutil.MustMarshal(&raftpb.Message{}),
132 ),
133 &fakeRaft{err: errors.New("blah")},
134 "0",
135 http.StatusInternalServerError,
136 },
137 {
138
139 "POST",
140 bytes.NewReader(
141 pbutil.MustMarshal(&raftpb.Message{}),
142 ),
143 &fakeRaft{},
144 "0",
145 http.StatusNoContent,
146 },
147 }
148 for i, tt := range testCases {
149 req, err := http.NewRequest(tt.method, "foo", tt.body)
150 if err != nil {
151 t.Fatalf("#%d: could not create request: %#v", i, err)
152 }
153 req.Header.Set("X-Etcd-Cluster-ID", tt.clusterID)
154 req.Header.Set("X-Server-Version", version.Version)
155 rw := httptest.NewRecorder()
156 h := newPipelineHandler(&Transport{Logger: zap.NewExample()}, tt.p, types.ID(0))
157
158
159 donec := make(chan struct{})
160 go func() {
161 defer func() {
162 recover()
163 close(donec)
164 }()
165 h.ServeHTTP(rw, req)
166 }()
167 <-donec
168
169 if rw.Code != tt.wcode {
170 t.Errorf("#%d: got code=%d, want %d", i, rw.Code, tt.wcode)
171 }
172 }
173 }
174
175 func TestServeRaftStreamPrefix(t *testing.T) {
176 tests := []struct {
177 path string
178 wtype streamType
179 }{
180 {
181 RaftStreamPrefix + "/message/1",
182 streamTypeMessage,
183 },
184 {
185 RaftStreamPrefix + "/msgapp/1",
186 streamTypeMsgAppV2,
187 },
188 }
189 for i, tt := range tests {
190 req, err := http.NewRequest("GET", "http://localhost:2380"+tt.path, nil)
191 if err != nil {
192 t.Fatalf("#%d: could not create request: %#v", i, err)
193 }
194 req.Header.Set("X-Etcd-Cluster-ID", "1")
195 req.Header.Set("X-Server-Version", version.Version)
196 req.Header.Set("X-Raft-To", "2")
197
198 peer := newFakePeer()
199 peerGetter := &fakePeerGetter{peers: map[types.ID]Peer{types.ID(1): peer}}
200 tr := &Transport{}
201 h := newStreamHandler(tr, peerGetter, &fakeRaft{}, types.ID(2), types.ID(1))
202
203 rw := httptest.NewRecorder()
204 go h.ServeHTTP(rw, req)
205
206 var conn *outgoingConn
207 select {
208 case conn = <-peer.connc:
209 case <-time.After(time.Second):
210 t.Fatalf("#%d: failed to attach outgoingConn", i)
211 }
212 if g := rw.Header().Get("X-Server-Version"); g != version.Version {
213 t.Errorf("#%d: X-Server-Version = %s, want %s", i, g, version.Version)
214 }
215 if conn.t != tt.wtype {
216 t.Errorf("#%d: type = %s, want %s", i, conn.t, tt.wtype)
217 }
218 conn.Close()
219 }
220 }
221
222 func TestServeRaftStreamPrefixBad(t *testing.T) {
223 removedID := uint64(5)
224 tests := []struct {
225 method string
226 path string
227 clusterID string
228 remote string
229
230 wcode int
231 }{
232
233 {
234 "PUT",
235 RaftStreamPrefix + "/message/1",
236 "1",
237 "1",
238 http.StatusMethodNotAllowed,
239 },
240
241 {
242 "POST",
243 RaftStreamPrefix + "/message/1",
244 "1",
245 "1",
246 http.StatusMethodNotAllowed,
247 },
248
249 {
250 "DELETE",
251 RaftStreamPrefix + "/message/1",
252 "1",
253 "1",
254 http.StatusMethodNotAllowed,
255 },
256
257 {
258 "GET",
259 RaftStreamPrefix + "/strange/1",
260 "1",
261 "1",
262 http.StatusNotFound,
263 },
264
265 {
266 "GET",
267 RaftStreamPrefix + "/strange",
268 "1",
269 "1",
270 http.StatusNotFound,
271 },
272
273 {
274 "GET",
275 RaftStreamPrefix + "/message/2",
276 "1",
277 "1",
278 http.StatusNotFound,
279 },
280
281 {
282 "GET",
283 RaftStreamPrefix + "/message/" + fmt.Sprint(removedID),
284 "1",
285 "1",
286 http.StatusGone,
287 },
288
289 {
290 "GET",
291 RaftStreamPrefix + "/message/1",
292 "2",
293 "1",
294 http.StatusPreconditionFailed,
295 },
296
297 {
298 "GET",
299 RaftStreamPrefix + "/message/1",
300 "1",
301 "2",
302 http.StatusPreconditionFailed,
303 },
304 }
305 for i, tt := range tests {
306 req, err := http.NewRequest(tt.method, "http://localhost:2380"+tt.path, nil)
307 if err != nil {
308 t.Fatalf("#%d: could not create request: %#v", i, err)
309 }
310 req.Header.Set("X-Etcd-Cluster-ID", tt.clusterID)
311 req.Header.Set("X-Server-Version", version.Version)
312 req.Header.Set("X-Raft-To", tt.remote)
313 rw := httptest.NewRecorder()
314 tr := &Transport{}
315 peerGetter := &fakePeerGetter{peers: map[types.ID]Peer{types.ID(1): newFakePeer()}}
316 r := &fakeRaft{removedID: removedID}
317 h := newStreamHandler(tr, peerGetter, r, types.ID(1), types.ID(1))
318 h.ServeHTTP(rw, req)
319
320 if rw.Code != tt.wcode {
321 t.Errorf("#%d: code = %d, want %d", i, rw.Code, tt.wcode)
322 }
323 }
324 }
325
326 func TestCloseNotifier(t *testing.T) {
327 c := newCloseNotifier()
328 select {
329 case <-c.closeNotify():
330 t.Fatalf("received unexpected close notification")
331 default:
332 }
333 c.Close()
334 select {
335 case <-c.closeNotify():
336 default:
337 t.Fatalf("failed to get close notification")
338 }
339 }
340
341
// errReader is a reader stub whose Read method always fails.
type errReader struct{}

// Read ignores the supplied buffer, reports zero bytes read and returns a
// newly created "some error" error.
func (er *errReader) Read(_ []byte) (int, error) {
	return 0, errors.New("some error")
}
345
// resWriterToError is an error stub that can also write itself to an
// http.ResponseWriter using a configurable status code.
type resWriterToError struct {
	code int // status code emitted by WriteTo
}

// Error returns the empty string.
func (e *resWriterToError) Error() string {
	return ""
}

// WriteTo writes e.code as the response status header on w.
func (e *resWriterToError) WriteTo(w http.ResponseWriter) {
	w.WriteHeader(e.code)
}
352
353 type fakePeerGetter struct {
354 peers map[types.ID]Peer
355 }
356
357 func (pg *fakePeerGetter) Get(id types.ID) Peer { return pg.peers[id] }
358
359 type fakePeer struct {
360 msgs []raftpb.Message
361 snapMsgs []snap.Message
362 peerURLs types.URLs
363 connc chan *outgoingConn
364 paused bool
365 }
366
367 func newFakePeer() *fakePeer {
368 fakeURL, _ := url.Parse("http://localhost")
369 return &fakePeer{
370 connc: make(chan *outgoingConn, 1),
371 peerURLs: types.URLs{*fakeURL},
372 }
373 }
374
375 func (pr *fakePeer) send(m raftpb.Message) {
376 if pr.paused {
377 return
378 }
379 pr.msgs = append(pr.msgs, m)
380 }
381
382 func (pr *fakePeer) sendSnap(m snap.Message) {
383 if pr.paused {
384 return
385 }
386 pr.snapMsgs = append(pr.snapMsgs, m)
387 }
388
389 func (pr *fakePeer) update(urls types.URLs) { pr.peerURLs = urls }
390 func (pr *fakePeer) attachOutgoingConn(conn *outgoingConn) { pr.connc <- conn }
391 func (pr *fakePeer) activeSince() time.Time { return time.Time{} }
392 func (pr *fakePeer) stop() {}
393 func (pr *fakePeer) Pause() { pr.paused = true }
394 func (pr *fakePeer) Resume() { pr.paused = false }
395
View as plain text