1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package rafthttp
16
17 import (
18 "context"
19 "net/http/httptest"
20 "reflect"
21 "testing"
22 "time"
23
24 "go.etcd.io/etcd/client/pkg/v3/types"
25 "go.etcd.io/etcd/raft/v3"
26 "go.etcd.io/etcd/raft/v3/raftpb"
27 stats "go.etcd.io/etcd/server/v3/etcdserver/api/v2stats"
28
29 "go.uber.org/zap"
30 )
31
32 func TestSendMessage(t *testing.T) {
33
34 tr := &Transport{
35 ID: types.ID(1),
36 ClusterID: types.ID(1),
37 Raft: &fakeRaft{},
38 ServerStats: newServerStats(),
39 LeaderStats: stats.NewLeaderStats(zap.NewExample(), "1"),
40 }
41 tr.Start()
42 srv := httptest.NewServer(tr.Handler())
43 defer srv.Close()
44
45
46 recvc := make(chan raftpb.Message, 1)
47 p := &fakeRaft{recvc: recvc}
48 tr2 := &Transport{
49 ID: types.ID(2),
50 ClusterID: types.ID(1),
51 Raft: p,
52 ServerStats: newServerStats(),
53 LeaderStats: stats.NewLeaderStats(zap.NewExample(), "2"),
54 }
55 tr2.Start()
56 srv2 := httptest.NewServer(tr2.Handler())
57 defer srv2.Close()
58
59 tr.AddPeer(types.ID(2), []string{srv2.URL})
60 defer tr.Stop()
61 tr2.AddPeer(types.ID(1), []string{srv.URL})
62 defer tr2.Stop()
63 if !waitStreamWorking(tr.Get(types.ID(2)).(*peer)) {
64 t.Fatalf("stream from 1 to 2 is not in work as expected")
65 }
66
67 data := []byte("some data")
68 tests := []raftpb.Message{
69
70 {Type: raftpb.MsgProp, From: 1, To: 2, Entries: []raftpb.Entry{{Data: data}}},
71 {Type: raftpb.MsgApp, From: 1, To: 2, Term: 1, Index: 3, LogTerm: 0, Entries: []raftpb.Entry{{Index: 4, Term: 1, Data: data}}, Commit: 3},
72 {Type: raftpb.MsgAppResp, From: 1, To: 2, Term: 1, Index: 3},
73 {Type: raftpb.MsgVote, From: 1, To: 2, Term: 1, Index: 3, LogTerm: 0},
74 {Type: raftpb.MsgVoteResp, From: 1, To: 2, Term: 1},
75 {Type: raftpb.MsgSnap, From: 1, To: 2, Term: 1, Snapshot: raftpb.Snapshot{Metadata: raftpb.SnapshotMetadata{Index: 1000, Term: 1}, Data: data}},
76 {Type: raftpb.MsgHeartbeat, From: 1, To: 2, Term: 1, Commit: 3},
77 {Type: raftpb.MsgHeartbeatResp, From: 1, To: 2, Term: 1},
78 }
79 for i, tt := range tests {
80 tr.Send([]raftpb.Message{tt})
81 msg := <-recvc
82 if !reflect.DeepEqual(msg, tt) {
83 t.Errorf("#%d: msg = %+v, want %+v", i, msg, tt)
84 }
85 }
86 }
87
88
89
90 func TestSendMessageWhenStreamIsBroken(t *testing.T) {
91
92 tr := &Transport{
93 ID: types.ID(1),
94 ClusterID: types.ID(1),
95 Raft: &fakeRaft{},
96 ServerStats: newServerStats(),
97 LeaderStats: stats.NewLeaderStats(zap.NewExample(), "1"),
98 }
99 tr.Start()
100 srv := httptest.NewServer(tr.Handler())
101 defer srv.Close()
102
103
104 recvc := make(chan raftpb.Message, 1)
105 p := &fakeRaft{recvc: recvc}
106 tr2 := &Transport{
107 ID: types.ID(2),
108 ClusterID: types.ID(1),
109 Raft: p,
110 ServerStats: newServerStats(),
111 LeaderStats: stats.NewLeaderStats(zap.NewExample(), "2"),
112 }
113 tr2.Start()
114 srv2 := httptest.NewServer(tr2.Handler())
115 defer srv2.Close()
116
117 tr.AddPeer(types.ID(2), []string{srv2.URL})
118 defer tr.Stop()
119 tr2.AddPeer(types.ID(1), []string{srv.URL})
120 defer tr2.Stop()
121 if !waitStreamWorking(tr.Get(types.ID(2)).(*peer)) {
122 t.Fatalf("stream from 1 to 2 is not in work as expected")
123 }
124
125
126 srv.CloseClientConnections()
127 srv2.CloseClientConnections()
128 var n int
129 for {
130 select {
131
132 case <-time.After(time.Millisecond):
133 n++
134 tr.Send([]raftpb.Message{{Type: raftpb.MsgHeartbeat, From: 1, To: 2, Term: 1, Commit: 3}})
135 case <-recvc:
136 if n > 50 {
137 t.Errorf("disconnection time = %dms, want < 50ms", n)
138 }
139 return
140 }
141 }
142 }
143
144 func newServerStats() *stats.ServerStats {
145 return stats.NewServerStats("", "")
146 }
147
148 func waitStreamWorking(p *peer) bool {
149 for i := 0; i < 1000; i++ {
150 time.Sleep(time.Millisecond)
151 if _, ok := p.msgAppV2Writer.writec(); !ok {
152 continue
153 }
154 if _, ok := p.writer.writec(); !ok {
155 continue
156 }
157 return true
158 }
159 return false
160 }
161
162 type fakeRaft struct {
163 recvc chan<- raftpb.Message
164 err error
165 removedID uint64
166 }
167
168 func (p *fakeRaft) Process(ctx context.Context, m raftpb.Message) error {
169 select {
170 case p.recvc <- m:
171 default:
172 }
173 return p.err
174 }
175
176 func (p *fakeRaft) IsIDRemoved(id uint64) bool { return id == p.removedID }
177
178 func (p *fakeRaft) ReportUnreachable(id uint64) {}
179
180 func (p *fakeRaft) ReportSnapshot(id uint64, status raft.SnapshotStatus) {}
181
View as plain text