1
2
3
4
5
6
7 package topology
8
9 import (
10 "bytes"
11 "context"
12 "io"
13 "math"
14 "net"
15 "sync"
16 "sync/atomic"
17 "testing"
18 "time"
19
20 "go.mongodb.org/mongo-driver/internal/assert"
21 "go.mongodb.org/mongo-driver/internal/require"
22 "go.mongodb.org/mongo-driver/mongo/address"
23 "go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
24 "go.mongodb.org/mongo-driver/x/mongo/driver"
25 "go.mongodb.org/mongo-driver/x/mongo/driver/drivertest"
26 "go.mongodb.org/mongo-driver/x/mongo/driver/operation"
27 )
28
29 func makeHelloReply() []byte {
30 doc := bsoncore.NewDocumentBuilder().AppendInt32("ok", 1).Build()
31 return drivertest.MakeReply(doc)
32 }
33
// Compile-time check that *mockSlowConn implements net.Conn.
var _ net.Conn = &mockSlowConn{}

// mockSlowConn is an in-memory net.Conn used by the tests in this file. Every
// Read is delayed by a fixed duration and then served from a canned response;
// every Write rewinds that response so the next Read sees it from the start.
type mockSlowConn struct {
	reader *bytes.Reader // canned response returned by Read
	delay  time.Duration // time each Read sleeps before doing anything else
	closed atomic.Value  // always holds a bool; true once Close has been called
}

// newMockSlowConn returns an open mockSlowConn that answers reads with
// response after sleeping for delay.
func newMockSlowConn(response []byte, delay time.Duration) *mockSlowConn {
	msc := &mockSlowConn{
		reader: bytes.NewReader(response),
		delay:  delay,
	}
	// Initialize the flag in place. An atomic.Value must not be copied after
	// its first Store, so it is stored directly into the struct field instead
	// of being populated as a local and then copied into the struct.
	msc.closed.Store(false)

	return msc
}

// Read sleeps for the configured delay and then copies bytes from the canned
// response into b. If the connection has been closed by the time the sleep
// finishes, it returns io.ErrUnexpectedEOF instead.
func (msc *mockSlowConn) Read(b []byte) (int, error) {
	time.Sleep(msc.delay)
	if msc.closed.Load().(bool) {
		return 0, io.ErrUnexpectedEOF
	}
	return msc.reader.Read(b)
}

// Write discards b and reports it as fully written. As a side effect it seeks
// the canned response back to its start, so each written request is followed
// by a complete response on the next reads. It returns io.ErrUnexpectedEOF if
// the connection is closed.
func (msc *mockSlowConn) Write(b []byte) (int, error) {
	if msc.closed.Load().(bool) {
		return 0, io.ErrUnexpectedEOF
	}
	_, err := msc.reader.Seek(0, io.SeekStart)
	return len(b), err
}

// Close marks the connection as closed, which makes subsequent Read and Write
// calls fail. It always returns nil.
func (msc *mockSlowConn) Close() error {
	msc.closed.Store(true)
	return nil
}

// The remaining net.Conn methods are no-op stubs: addresses are nil and
// deadlines are accepted but ignored.

func (*mockSlowConn) LocalAddr() net.Addr                { return nil }
func (*mockSlowConn) RemoteAddr() net.Addr               { return nil }
func (*mockSlowConn) SetDeadline(_ time.Time) error      { return nil }
func (*mockSlowConn) SetReadDeadline(_ time.Time) error  { return nil }
func (*mockSlowConn) SetWriteDeadline(_ time.Time) error { return nil }
84
85 func TestRTTMonitor(t *testing.T) {
86 t.Run("measures the average, minimum and 90th percentile RTT", func(t *testing.T) {
87 t.Parallel()
88
89 dialer := DialerFunc(func(_ context.Context, _, _ string) (net.Conn, error) {
90 return newMockSlowConn(makeHelloReply(), 10*time.Millisecond), nil
91 })
92 rtt := newRTTMonitor(&rttConfig{
93 interval: 10 * time.Millisecond,
94 createConnectionFn: func() *connection {
95 return newConnection("", WithDialer(func(Dialer) Dialer { return dialer }))
96 },
97 createOperationFn: func(conn driver.Connection) *operation.Hello {
98 return operation.NewHello().Deployment(driver.SingleConnectionDeployment{C: conn})
99 },
100 })
101 rtt.connect()
102 defer rtt.disconnect()
103
104 assert.Eventuallyf(
105 t,
106 func() bool { return rtt.EWMA() > 0 && rtt.Min() > 0 && rtt.P90() > 0 },
107 1*time.Second,
108 10*time.Millisecond,
109 "expected EWMA(), Min() and P90() to return positive durations within 1 second")
110 assert.True(
111 t,
112 rtt.EWMA() > 0,
113 "expected EWMA() to return a positive duration, got %v",
114 rtt.EWMA())
115 assert.True(
116 t,
117 rtt.Min() > 0,
118 "expected Min() to return a positive duration, got %v",
119 rtt.Min())
120 assert.True(
121 t,
122 rtt.P90() > 0,
123 "expected P90() to return a positive duration, got %v",
124 rtt.P90())
125 })
126
127 t.Run("creates the correct size samples slice", func(t *testing.T) {
128 t.Parallel()
129
130 cases := []struct {
131 desc string
132 interval time.Duration
133 wantSamplesLen int
134 }{
135 {
136 desc: "default",
137 interval: 10 * time.Second,
138 wantSamplesLen: 30,
139 },
140 {
141 desc: "min",
142 interval: 10 * time.Minute,
143 wantSamplesLen: 10,
144 },
145 {
146 desc: "max",
147 interval: 1 * time.Millisecond,
148 wantSamplesLen: 500,
149 },
150 }
151 for _, tc := range cases {
152 t.Run(tc.desc, func(t *testing.T) {
153 rtt := newRTTMonitor(&rttConfig{
154 interval: tc.interval,
155 minRTTWindow: 5 * time.Minute,
156 })
157 assert.Equal(t, tc.wantSamplesLen, len(rtt.samples), "expected samples length to match")
158 })
159 }
160 })
161
162 t.Run("can connect and disconnect repeatedly", func(t *testing.T) {
163 t.Parallel()
164
165 dialer := DialerFunc(func(_ context.Context, _, _ string) (net.Conn, error) {
166 return newMockSlowConn(makeHelloReply(), 10*time.Millisecond), nil
167 })
168 rtt := newRTTMonitor(&rttConfig{
169 interval: 10 * time.Second,
170 createConnectionFn: func() *connection {
171 return newConnection("", WithDialer(func(Dialer) Dialer {
172 return dialer
173 }))
174 },
175 createOperationFn: func(conn driver.Connection) *operation.Hello {
176 return operation.NewHello().Deployment(driver.SingleConnectionDeployment{C: conn})
177 },
178 })
179 for i := 0; i < 100; i++ {
180 rtt.connect()
181 rtt.disconnect()
182 }
183 })
184
185 t.Run("works after reset", func(t *testing.T) {
186 t.Parallel()
187
188 dialer := DialerFunc(func(_ context.Context, _, _ string) (net.Conn, error) {
189 return newMockSlowConn(makeHelloReply(), 10*time.Millisecond), nil
190 })
191 rtt := newRTTMonitor(&rttConfig{
192 interval: 10 * time.Millisecond,
193 createConnectionFn: func() *connection {
194 return newConnection("", WithDialer(func(Dialer) Dialer { return dialer }))
195 },
196 createOperationFn: func(conn driver.Connection) *operation.Hello {
197 return operation.NewHello().Deployment(driver.SingleConnectionDeployment{C: conn})
198 },
199 })
200 rtt.connect()
201 defer rtt.disconnect()
202
203 for i := 0; i < 3; i++ {
204 assert.Eventuallyf(
205 t,
206 func() bool { return rtt.EWMA() > 0 },
207 1*time.Second,
208 10*time.Millisecond,
209 "expected EWMA() to return a positive duration within 1 second")
210 assert.Eventuallyf(
211 t,
212 func() bool { return rtt.Min() > 0 },
213 1*time.Second,
214 10*time.Millisecond,
215 "expected Min() to return a positive duration within 1 second")
216 assert.Eventuallyf(
217 t,
218 func() bool { return rtt.P90() > 0 },
219 1*time.Second,
220 10*time.Millisecond,
221 "expected P90() to return a positive duration within 1 second")
222 rtt.reset()
223 }
224 })
225
226
227
228
229
230
231 t.Run("stuck operations time out", func(t *testing.T) {
232 t.Parallel()
233
234
235
236
237 l, err := net.Listen("tcp", "localhost:0")
238 require.NoError(t, err)
239 var wg sync.WaitGroup
240 wg.Add(1)
241 go func() {
242 defer wg.Done()
243
244 for i := 0; ; i++ {
245 conn, err := l.Accept()
246 if err != nil {
247
248
249
250 t.Logf("Accept error: %v", err)
251 return
252 }
253
254
255
256 defer conn.Close()
257
258 wg.Add(1)
259 go func(i int) {
260 defer wg.Done()
261
262 buf := make([]byte, 256)
263 for {
264 if _, err := conn.Read(buf); err != nil {
265
266
267
268 t.Logf("Read error: %v", err)
269 return
270 }
271
272
273
274 if i < 2 {
275 return
276 }
277
278
279
280 time.Sleep(10 * time.Millisecond)
281
282 if _, err := conn.Write(makeHelloReply()); err != nil {
283
284
285
286 t.Logf("Write error: %v", err)
287 return
288 }
289 }
290 }(i)
291 }
292 }()
293
294 rtt := newRTTMonitor(&rttConfig{
295 interval: 10 * time.Millisecond,
296 timeout: 100 * time.Millisecond,
297 createConnectionFn: func() *connection {
298 return newConnection(address.Address(l.Addr().String()))
299 },
300 createOperationFn: func(conn driver.Connection) *operation.Hello {
301 return operation.NewHello().Deployment(driver.SingleConnectionDeployment{C: conn})
302 },
303 })
304 rtt.connect()
305
306 assert.Eventuallyf(
307 t,
308 func() bool { return rtt.EWMA() > 0 },
309 1*time.Second,
310 10*time.Millisecond,
311 "expected EWMA() to return a positive duration within 1 second")
312 assert.Eventuallyf(
313 t,
314 func() bool { return rtt.Min() > 0 },
315 1*time.Second,
316 10*time.Millisecond,
317 "expected Min() to return a positive duration within 1 second")
318 assert.Eventuallyf(
319 t,
320 func() bool { return rtt.P90() > 0 },
321 1*time.Second,
322 10*time.Millisecond,
323 "expected P90() to return a positive duration within 1 second")
324
325 rtt.disconnect()
326 l.Close()
327 wg.Wait()
328 })
329 }
330
331 func TestMin(t *testing.T) {
332 cases := []struct {
333 desc string
334 samples []time.Duration
335 minSamples int
336 want time.Duration
337 }{
338 {
339 desc: "Should return the min for minSamples = 0",
340 samples: []time.Duration{1, 0, 0, 0},
341 minSamples: 0,
342 want: 1,
343 },
344 {
345 desc: "Should return 0 for fewer than minSamples samples",
346 samples: []time.Duration{1, 0, 0, 0},
347 minSamples: 2,
348 want: 0,
349 },
350 {
351 desc: "Should return 0 for empty samples slice",
352 samples: []time.Duration{},
353 minSamples: 0,
354 want: 0,
355 },
356 {
357 desc: "Should return 0 for no valid samples",
358 samples: []time.Duration{0, 0, 0},
359 minSamples: 0,
360 want: 0,
361 },
362 {
363 desc: "Should return max int64 if all samples are max int64",
364 samples: []time.Duration{math.MaxInt64, math.MaxInt64, math.MaxInt64},
365 minSamples: 0,
366 want: math.MaxInt64,
367 },
368 {
369 desc: "Should return the minimum if there are enough samples",
370 samples: []time.Duration{1 * time.Second, 100 * time.Millisecond, 150 * time.Millisecond, 0, 0, 0},
371 minSamples: 3,
372 want: 100 * time.Millisecond,
373 },
374 {
375 desc: "Should return 0 if there are are not enough samples",
376 samples: []time.Duration{1 * time.Second, 100 * time.Millisecond, 0, 0, 0, 0},
377 minSamples: 3,
378 want: 0,
379 },
380 }
381
382 for _, tc := range cases {
383 tc := tc
384 t.Run(tc.desc, func(t *testing.T) {
385 t.Parallel()
386
387 got := min(tc.samples, tc.minSamples)
388 assert.Equal(t, tc.want, got, "unexpected result from min()")
389 })
390 }
391 }
392
393 func TestPercentile(t *testing.T) {
394 cases := []struct {
395 desc string
396 samples []time.Duration
397 minSamples int
398 percentile float64
399 want time.Duration
400 }{
401 {
402 desc: "Should return 0 for fewer than minSamples samples",
403 samples: []time.Duration{1, 0, 0, 0},
404 minSamples: 2,
405 percentile: 90.0,
406 want: 0,
407 },
408 {
409 desc: "Should return 0 for empty samples slice",
410 samples: []time.Duration{},
411 minSamples: 0,
412 percentile: 90.0,
413 want: 0,
414 },
415 {
416 desc: "Should return 0 for no valid samples",
417 samples: []time.Duration{0, 0, 0},
418 minSamples: 0,
419 percentile: 90.0,
420 want: 0,
421 },
422 {
423 desc: "First tertile when minSamples = 0",
424 samples: []time.Duration{1, 2, 3, 0, 0, 0},
425 minSamples: 0,
426 percentile: 33.34,
427 want: 1,
428 },
429 {
430 desc: "90th percentile when there are enough samples",
431 samples: []time.Duration{
432 100 * time.Millisecond,
433 200 * time.Millisecond,
434 300 * time.Millisecond,
435 400 * time.Millisecond,
436 500 * time.Millisecond,
437 600 * time.Millisecond,
438 700 * time.Millisecond,
439 800 * time.Millisecond,
440 900 * time.Millisecond,
441 1 * time.Second,
442 0, 0, 0},
443 minSamples: 10,
444 percentile: 90.0,
445 want: 900 * time.Millisecond,
446 },
447 {
448 desc: "10th percentile when there are enough samples",
449 samples: []time.Duration{
450 100 * time.Millisecond,
451 200 * time.Millisecond,
452 300 * time.Millisecond,
453 400 * time.Millisecond,
454 500 * time.Millisecond,
455 600 * time.Millisecond,
456 700 * time.Millisecond,
457 800 * time.Millisecond,
458 900 * time.Millisecond,
459 1 * time.Second,
460 0, 0, 0},
461 minSamples: 10,
462 percentile: 10.0,
463 want: 100 * time.Millisecond,
464 },
465 }
466
467 for _, tc := range cases {
468 tc := tc
469 t.Run(tc.desc, func(t *testing.T) {
470 t.Parallel()
471
472 got := percentile(tc.percentile, tc.samples, tc.minSamples)
473 assert.Equal(t, tc.want, got, "unexpected result from percentile()")
474 })
475 }
476 }
477
View as plain text