1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package rafthttp
16
17 import (
18 "fmt"
19 "io"
20 "io/ioutil"
21 "net/http"
22 "net/http/httptest"
23 "os"
24 "strings"
25 "testing"
26 "time"
27
28 "go.etcd.io/etcd/client/pkg/v3/types"
29 "go.etcd.io/etcd/raft/v3/raftpb"
30 "go.etcd.io/etcd/server/v3/etcdserver/api/snap"
31
32 "go.uber.org/zap"
33 )
34
// strReaderCloser embeds a *strings.Reader and adds a Close method, so an
// in-memory string can be supplied where an io.ReadCloser is expected.
type strReaderCloser struct {
	*strings.Reader
}

// Close always returns nil; it does not touch the embedded reader.
func (rc strReaderCloser) Close() error {
	return nil
}
38
39 func TestSnapshotSend(t *testing.T) {
40 tests := []struct {
41 m raftpb.Message
42 rc io.ReadCloser
43 size int64
44
45 wsent bool
46 wfiles int
47 }{
48
49 {
50 m: raftpb.Message{Type: raftpb.MsgSnap, To: 1},
51 rc: strReaderCloser{strings.NewReader("hello")},
52 size: 5,
53
54 wsent: true,
55 wfiles: 1,
56 },
57
58 {
59 m: raftpb.Message{Type: raftpb.MsgSnap, To: 1},
60 rc: &errReadCloser{fmt.Errorf("snapshot error")},
61 size: 1,
62
63 wsent: false,
64 wfiles: 0,
65 },
66
67 {
68 m: raftpb.Message{Type: raftpb.MsgSnap, To: 1},
69 rc: strReaderCloser{strings.NewReader("hello")},
70 size: 10000,
71
72 wsent: false,
73 wfiles: 0,
74 },
75
76 {
77 m: raftpb.Message{Type: raftpb.MsgSnap, To: 1},
78 rc: strReaderCloser{strings.NewReader("hello")},
79 size: 1,
80
81 wsent: false,
82 wfiles: 0,
83 },
84 }
85
86 for i, tt := range tests {
87 sent, files := testSnapshotSend(t, snap.NewMessage(tt.m, tt.rc, tt.size))
88 if tt.wsent != sent {
89 t.Errorf("#%d: snapshot expected %v, got %v", i, tt.wsent, sent)
90 }
91 if tt.wfiles != len(files) {
92 t.Fatalf("#%d: expected %d files, got %d files", i, tt.wfiles, len(files))
93 }
94 }
95 }
96
97 func testSnapshotSend(t *testing.T, sm *snap.Message) (bool, []os.FileInfo) {
98 d, err := ioutil.TempDir(os.TempDir(), "snapdir")
99 if err != nil {
100 t.Fatal(err)
101 }
102 defer os.RemoveAll(d)
103
104 r := &fakeRaft{}
105 tr := &Transport{pipelineRt: &http.Transport{}, ClusterID: types.ID(1), Raft: r}
106 ch := make(chan struct{}, 1)
107 h := &syncHandler{newSnapshotHandler(tr, r, snap.New(zap.NewExample(), d), types.ID(1)), ch}
108 srv := httptest.NewServer(h)
109 defer srv.Close()
110
111 picker := mustNewURLPicker(t, []string{srv.URL})
112 snapsend := newSnapshotSender(tr, picker, types.ID(1), newPeerStatus(zap.NewExample(), types.ID(0), types.ID(1)))
113 defer snapsend.stop()
114
115 snapsend.send(*sm)
116
117 sent := false
118 select {
119 case <-time.After(time.Second):
120 t.Fatalf("timed out sending snapshot")
121 case sent = <-sm.CloseNotify():
122 }
123
124
125 <-ch
126
127 files, rerr := ioutil.ReadDir(d)
128 if rerr != nil {
129 t.Fatal(rerr)
130 }
131 return sent, files
132 }
133
// errReadCloser is a reader/closer stub that fails every operation with the
// error it was constructed with.
type errReadCloser struct {
	err error
}

// Read never fills the buffer: it reports zero bytes and the stored error.
func (e *errReadCloser) Read(_ []byte) (int, error) {
	return 0, e.err
}

// Close returns the stored error.
func (e *errReadCloser) Close() error {
	return e.err
}
138
// syncHandler wraps an http.Handler and reports on ch each time the wrapped
// handler has finished serving a request.
type syncHandler struct {
	h  http.Handler    // handler that does the actual work
	ch chan<- struct{} // receives one value after every served request
}

// ServeHTTP delegates to the wrapped handler and then sends a single value on
// ch. The send blocks until the channel can accept the value.
func (s *syncHandler) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
	s.h.ServeHTTP(rw, req)
	s.ch <- struct{}{}
}
148
View as plain text