1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package rafthttp
16
17 import (
18 "net/http"
19 "reflect"
20 "testing"
21 "time"
22
23 "go.etcd.io/etcd/client/pkg/v3/testutil"
24 "go.etcd.io/etcd/client/pkg/v3/types"
25 "go.etcd.io/etcd/raft/v3/raftpb"
26 stats "go.etcd.io/etcd/server/v3/etcdserver/api/v2stats"
27
28 "github.com/xiang90/probing"
29 "go.uber.org/zap"
30 )
31
32
33
34 func TestTransportSend(t *testing.T) {
35 peer1 := newFakePeer()
36 peer2 := newFakePeer()
37 tr := &Transport{
38 ServerStats: stats.NewServerStats("", ""),
39 peers: map[types.ID]Peer{types.ID(1): peer1, types.ID(2): peer2},
40 }
41 wmsgsIgnored := []raftpb.Message{
42
43 {Type: raftpb.MsgBeat},
44
45 {Type: raftpb.MsgProp, To: 3},
46 }
47 wmsgsTo1 := []raftpb.Message{
48
49 {Type: raftpb.MsgProp, To: 1},
50 {Type: raftpb.MsgApp, To: 1},
51 }
52 wmsgsTo2 := []raftpb.Message{
53
54 {Type: raftpb.MsgProp, To: 2},
55 {Type: raftpb.MsgApp, To: 2},
56 }
57 tr.Send(wmsgsIgnored)
58 tr.Send(wmsgsTo1)
59 tr.Send(wmsgsTo2)
60
61 if !reflect.DeepEqual(peer1.msgs, wmsgsTo1) {
62 t.Errorf("msgs to peer 1 = %+v, want %+v", peer1.msgs, wmsgsTo1)
63 }
64 if !reflect.DeepEqual(peer2.msgs, wmsgsTo2) {
65 t.Errorf("msgs to peer 2 = %+v, want %+v", peer2.msgs, wmsgsTo2)
66 }
67 }
68
69 func TestTransportCutMend(t *testing.T) {
70 peer1 := newFakePeer()
71 peer2 := newFakePeer()
72 tr := &Transport{
73 ServerStats: stats.NewServerStats("", ""),
74 peers: map[types.ID]Peer{types.ID(1): peer1, types.ID(2): peer2},
75 }
76
77 tr.CutPeer(types.ID(1))
78
79 wmsgsTo := []raftpb.Message{
80
81 {Type: raftpb.MsgProp, To: 1},
82 {Type: raftpb.MsgApp, To: 1},
83 }
84
85 tr.Send(wmsgsTo)
86 if len(peer1.msgs) > 0 {
87 t.Fatalf("msgs expected to be ignored, got %+v", peer1.msgs)
88 }
89
90 tr.MendPeer(types.ID(1))
91
92 tr.Send(wmsgsTo)
93 if !reflect.DeepEqual(peer1.msgs, wmsgsTo) {
94 t.Errorf("msgs to peer 1 = %+v, want %+v", peer1.msgs, wmsgsTo)
95 }
96 }
97
98 func TestTransportAdd(t *testing.T) {
99 ls := stats.NewLeaderStats(zap.NewExample(), "")
100 tr := &Transport{
101 LeaderStats: ls,
102 streamRt: &roundTripperRecorder{},
103 peers: make(map[types.ID]Peer),
104 pipelineProber: probing.NewProber(nil),
105 streamProber: probing.NewProber(nil),
106 }
107 tr.AddPeer(1, []string{"http://localhost:2380"})
108
109 if _, ok := ls.Followers["1"]; !ok {
110 t.Errorf("FollowerStats[1] is nil, want exists")
111 }
112 s, ok := tr.peers[types.ID(1)]
113 if !ok {
114 tr.Stop()
115 t.Fatalf("senders[1] is nil, want exists")
116 }
117
118
119 tr.AddPeer(1, []string{"http://localhost:2380"})
120 ns := tr.peers[types.ID(1)]
121 if s != ns {
122 t.Errorf("sender = %v, want %v", ns, s)
123 }
124
125 tr.Stop()
126 }
127
128 func TestTransportRemove(t *testing.T) {
129 tr := &Transport{
130 LeaderStats: stats.NewLeaderStats(zap.NewExample(), ""),
131 streamRt: &roundTripperRecorder{},
132 peers: make(map[types.ID]Peer),
133 pipelineProber: probing.NewProber(nil),
134 streamProber: probing.NewProber(nil),
135 }
136 tr.AddPeer(1, []string{"http://localhost:2380"})
137 tr.RemovePeer(types.ID(1))
138 defer tr.Stop()
139
140 if _, ok := tr.peers[types.ID(1)]; ok {
141 t.Fatalf("senders[1] exists, want removed")
142 }
143 }
144
145 func TestTransportUpdate(t *testing.T) {
146 peer := newFakePeer()
147 tr := &Transport{
148 peers: map[types.ID]Peer{types.ID(1): peer},
149 pipelineProber: probing.NewProber(nil),
150 streamProber: probing.NewProber(nil),
151 }
152 u := "http://localhost:2380"
153 tr.UpdatePeer(types.ID(1), []string{u})
154 wurls := types.URLs(testutil.MustNewURLs(t, []string{"http://localhost:2380"}))
155 if !reflect.DeepEqual(peer.peerURLs, wurls) {
156 t.Errorf("urls = %+v, want %+v", peer.peerURLs, wurls)
157 }
158 }
159
160 func TestTransportErrorc(t *testing.T) {
161 errorc := make(chan error, 1)
162 tr := &Transport{
163 Raft: &fakeRaft{},
164 LeaderStats: stats.NewLeaderStats(zap.NewExample(), ""),
165 ErrorC: errorc,
166 streamRt: newRespRoundTripper(http.StatusForbidden, nil),
167 pipelineRt: newRespRoundTripper(http.StatusForbidden, nil),
168 peers: make(map[types.ID]Peer),
169 pipelineProber: probing.NewProber(nil),
170 streamProber: probing.NewProber(nil),
171 }
172 tr.AddPeer(1, []string{"http://localhost:2380"})
173 defer tr.Stop()
174
175 select {
176 case <-errorc:
177 t.Fatalf("received unexpected from errorc")
178 case <-time.After(10 * time.Millisecond):
179 }
180 tr.peers[1].send(raftpb.Message{})
181
182 select {
183 case <-errorc:
184 case <-time.After(1 * time.Second):
185 t.Fatalf("cannot receive error from errorc")
186 }
187 }
188
View as plain text