1
2
3
4
5
6
7
8
9
10
11
12
13
14 package cluster
15
16 import (
17 "context"
18 "testing"
19 "time"
20
21 "github.com/go-kit/log"
22 "github.com/hashicorp/go-sockaddr"
23 "github.com/stretchr/testify/require"
24
25 "github.com/prometheus/client_golang/prometheus"
26 )
27
28 func TestClusterJoinAndReconnect(t *testing.T) {
29 ip, _ := sockaddr.GetPrivateIP()
30 if ip == "" {
31 t.Skipf("skipping tests because no private IP address can be found")
32 return
33 }
34 t.Run("TestJoinLeave", testJoinLeave)
35 t.Run("TestReconnect", testReconnect)
36 t.Run("TestRemoveFailedPeers", testRemoveFailedPeers)
37 t.Run("TestInitiallyFailingPeers", testInitiallyFailingPeers)
38 t.Run("TestTLSConnection", testTLSConnection)
39 }
40
41 func testJoinLeave(t *testing.T) {
42 logger := log.NewNopLogger()
43 p, err := Create(
44 logger,
45 prometheus.NewRegistry(),
46 "127.0.0.1:0",
47 "",
48 []string{},
49 true,
50 DefaultPushPullInterval,
51 DefaultGossipInterval,
52 DefaultTCPTimeout,
53 DefaultProbeTimeout,
54 DefaultProbeInterval,
55 nil,
56 false,
57 )
58 require.NoError(t, err)
59 require.NotNil(t, p)
60 err = p.Join(
61 DefaultReconnectInterval,
62 DefaultReconnectTimeout,
63 )
64 require.NoError(t, err)
65 require.False(t, p.Ready())
66 {
67 ctx, cancel := context.WithCancel(context.Background())
68 cancel()
69 require.Equal(t, context.Canceled, p.WaitReady(ctx))
70 }
71 require.Equal(t, p.Status(), "settling")
72 go p.Settle(context.Background(), 0*time.Second)
73 require.NoError(t, p.WaitReady(context.Background()))
74 require.Equal(t, p.Status(), "ready")
75
76
77 p2, err := Create(
78 logger,
79 prometheus.NewRegistry(),
80 "127.0.0.1:0",
81 "",
82 []string{p.Self().Address()},
83 true,
84 DefaultPushPullInterval,
85 DefaultGossipInterval,
86 DefaultTCPTimeout,
87 DefaultProbeTimeout,
88 DefaultProbeInterval,
89 nil,
90 false,
91 )
92 require.NoError(t, err)
93 require.NotNil(t, p2)
94 err = p2.Join(
95 DefaultReconnectInterval,
96 DefaultReconnectTimeout,
97 )
98 require.NoError(t, err)
99 go p2.Settle(context.Background(), 0*time.Second)
100 require.NoError(t, p2.WaitReady(context.Background()))
101
102 require.Equal(t, 2, p.ClusterSize())
103 p2.Leave(0 * time.Second)
104 require.Equal(t, 1, p.ClusterSize())
105 require.Equal(t, 1, len(p.failedPeers))
106 require.Equal(t, p2.Self().Address(), p.peers[p2.Self().Address()].Node.Address())
107 require.Equal(t, p2.Name(), p.failedPeers[0].Name)
108 }
109
110 func testReconnect(t *testing.T) {
111 logger := log.NewNopLogger()
112 p, err := Create(
113 logger,
114 prometheus.NewRegistry(),
115 "127.0.0.1:0",
116 "",
117 []string{},
118 true,
119 DefaultPushPullInterval,
120 DefaultGossipInterval,
121 DefaultTCPTimeout,
122 DefaultProbeTimeout,
123 DefaultProbeInterval,
124 nil,
125 false,
126 )
127 require.NoError(t, err)
128 require.NotNil(t, p)
129 err = p.Join(
130 DefaultReconnectInterval,
131 DefaultReconnectTimeout,
132 )
133 require.NoError(t, err)
134 go p.Settle(context.Background(), 0*time.Second)
135 require.NoError(t, p.WaitReady(context.Background()))
136
137 p2, err := Create(
138 logger,
139 prometheus.NewRegistry(),
140 "127.0.0.1:0",
141 "",
142 []string{},
143 true,
144 DefaultPushPullInterval,
145 DefaultGossipInterval,
146 DefaultTCPTimeout,
147 DefaultProbeTimeout,
148 DefaultProbeInterval,
149 nil,
150 false,
151 )
152 require.NoError(t, err)
153 require.NotNil(t, p2)
154 err = p2.Join(
155 DefaultReconnectInterval,
156 DefaultReconnectTimeout,
157 )
158 require.NoError(t, err)
159 go p2.Settle(context.Background(), 0*time.Second)
160 require.NoError(t, p2.WaitReady(context.Background()))
161
162 p.peerJoin(p2.Self())
163 p.peerLeave(p2.Self())
164
165 require.Equal(t, 1, p.ClusterSize())
166 require.Equal(t, 1, len(p.failedPeers))
167
168 p.reconnect()
169
170 require.Equal(t, 2, p.ClusterSize())
171 require.Equal(t, 0, len(p.failedPeers))
172 require.Equal(t, StatusAlive, p.peers[p2.Self().Address()].status)
173 }
174
175 func testRemoveFailedPeers(t *testing.T) {
176 logger := log.NewNopLogger()
177 p, err := Create(
178 logger,
179 prometheus.NewRegistry(),
180 "127.0.0.1:0",
181 "",
182 []string{},
183 true,
184 DefaultPushPullInterval,
185 DefaultGossipInterval,
186 DefaultTCPTimeout,
187 DefaultProbeTimeout,
188 DefaultProbeInterval,
189 nil,
190 false,
191 )
192 require.NoError(t, err)
193 require.NotNil(t, p)
194 err = p.Join(
195 DefaultReconnectInterval,
196 DefaultReconnectTimeout,
197 )
198 require.NoError(t, err)
199 n := p.Self()
200
201 now := time.Now()
202 p1 := peer{
203 status: StatusFailed,
204 leaveTime: now,
205 Node: n,
206 }
207 p2 := peer{
208 status: StatusFailed,
209 leaveTime: now.Add(-time.Hour),
210 Node: n,
211 }
212 p3 := peer{
213 status: StatusFailed,
214 leaveTime: now.Add(30 * -time.Minute),
215 Node: n,
216 }
217 p.failedPeers = []peer{p1, p2, p3}
218
219 p.removeFailedPeers(30 * time.Minute)
220 require.Equal(t, 1, len(p.failedPeers))
221 require.Equal(t, p1, p.failedPeers[0])
222 }
223
224 func testInitiallyFailingPeers(t *testing.T) {
225 logger := log.NewNopLogger()
226 myAddr := "1.2.3.4:5000"
227 peerAddrs := []string{myAddr, "2.3.4.5:5000", "3.4.5.6:5000", "foo.example.com:5000"}
228 p, err := Create(
229 logger,
230 prometheus.NewRegistry(),
231 "127.0.0.1:0",
232 "",
233 []string{},
234 true,
235 DefaultPushPullInterval,
236 DefaultGossipInterval,
237 DefaultTCPTimeout,
238 DefaultProbeTimeout,
239 DefaultProbeInterval,
240 nil,
241 false,
242 )
243 require.NoError(t, err)
244 require.NotNil(t, p)
245 err = p.Join(
246 DefaultReconnectInterval,
247 DefaultReconnectTimeout,
248 )
249 require.NoError(t, err)
250
251 p.setInitialFailed(peerAddrs, myAddr)
252
253
254
255 require.Equal(t, len(peerAddrs)-2, len(p.failedPeers))
256 for _, addr := range peerAddrs {
257 if addr == myAddr || addr == "foo.example.com:5000" {
258 continue
259 }
260
261 pr, ok := p.peers[addr]
262 require.True(t, ok)
263 require.Equal(t, StatusFailed.String(), pr.status.String())
264 require.Equal(t, addr, pr.Address())
265 expectedLen := len(p.failedPeers) - 1
266 p.peerJoin(pr.Node)
267 require.Equal(t, expectedLen, len(p.failedPeers))
268 }
269 }
270
271 func testTLSConnection(t *testing.T) {
272 logger := log.NewNopLogger()
273 tlsTransportConfig1, err := GetTLSTransportConfig("./testdata/tls_config_node1.yml")
274 require.NoError(t, err)
275 p1, err := Create(
276 logger,
277 prometheus.NewRegistry(),
278 "127.0.0.1:0",
279 "",
280 []string{},
281 true,
282 DefaultPushPullInterval,
283 DefaultGossipInterval,
284 DefaultTCPTimeout,
285 DefaultProbeTimeout,
286 DefaultProbeInterval,
287 tlsTransportConfig1,
288 false,
289 )
290 require.NoError(t, err)
291 require.NotNil(t, p1)
292 err = p1.Join(
293 DefaultReconnectInterval,
294 DefaultReconnectTimeout,
295 )
296 require.NoError(t, err)
297 require.False(t, p1.Ready())
298 require.Equal(t, p1.Status(), "settling")
299 go p1.Settle(context.Background(), 0*time.Second)
300 p1.WaitReady(context.Background())
301 require.Equal(t, p1.Status(), "ready")
302
303
304 tlsTransportConfig2, err := GetTLSTransportConfig("./testdata/tls_config_node2.yml")
305 require.NoError(t, err)
306 p2, err := Create(
307 logger,
308 prometheus.NewRegistry(),
309 "127.0.0.1:0",
310 "",
311 []string{p1.Self().Address()},
312 true,
313 DefaultPushPullInterval,
314 DefaultGossipInterval,
315 DefaultTCPTimeout,
316 DefaultProbeTimeout,
317 DefaultProbeInterval,
318 tlsTransportConfig2,
319 false,
320 )
321 require.NoError(t, err)
322 require.NotNil(t, p2)
323 err = p2.Join(
324 DefaultReconnectInterval,
325 DefaultReconnectTimeout,
326 )
327 require.NoError(t, err)
328 go p2.Settle(context.Background(), 0*time.Second)
329
330 require.Equal(t, 2, p1.ClusterSize())
331 p2.Leave(0 * time.Second)
332 require.Equal(t, 1, p1.ClusterSize())
333 require.Equal(t, 1, len(p1.failedPeers))
334 require.Equal(t, p2.Self().Address(), p1.peers[p2.Self().Address()].Node.Address())
335 require.Equal(t, p2.Name(), p1.failedPeers[0].Name)
336 }
337
View as plain text