1
2
3
4
5
6
7 package topology
8
9 import (
10 "context"
11 "errors"
12 "net"
13 "sync"
14 "testing"
15 "time"
16
17 "go.mongodb.org/mongo-driver/event"
18 "go.mongodb.org/mongo-driver/internal/assert"
19 "go.mongodb.org/mongo-driver/internal/eventtest"
20 "go.mongodb.org/mongo-driver/internal/require"
21 "go.mongodb.org/mongo-driver/mongo/address"
22 "go.mongodb.org/mongo-driver/x/mongo/driver"
23 "go.mongodb.org/mongo-driver/x/mongo/driver/operation"
24 )
25
26 func TestPool(t *testing.T) {
27 t.Run("newPool", func(t *testing.T) {
28 t.Parallel()
29
30 t.Run("minPoolSize should not exceed maxPoolSize", func(t *testing.T) {
31 t.Parallel()
32
33 p := newPool(poolConfig{MinPoolSize: 100, MaxPoolSize: 10})
34 assert.Equalf(t, uint64(10), p.minSize, "expected minSize of a pool not to be greater than maxSize")
35
36 p.close(context.Background())
37 })
38 t.Run("minPoolSize may exceed maxPoolSize of 0", func(t *testing.T) {
39 t.Parallel()
40
41 p := newPool(poolConfig{MinPoolSize: 10, MaxPoolSize: 0})
42 assert.Equalf(t, uint64(10), p.minSize, "expected minSize of a pool to be greater than maxSize of 0")
43
44 p.close(context.Background())
45 })
46 t.Run("should be paused", func(t *testing.T) {
47 t.Parallel()
48
49 p := newPool(poolConfig{})
50 assert.Equalf(t, poolPaused, p.getState(), "expected new pool to be paused")
51
52 p.close(context.Background())
53 })
54 })
55 t.Run("closeConnection", func(t *testing.T) {
56 t.Parallel()
57
58 t.Run("can't close connection from different pool", func(t *testing.T) {
59 t.Parallel()
60
61 cleanup := make(chan struct{})
62 defer close(cleanup)
63 addr := bootstrapConnections(t, 1, func(nc net.Conn) {
64 <-cleanup
65 _ = nc.Close()
66 })
67
68 p1 := newPool(poolConfig{
69 Address: address.Address(addr.String()),
70 })
71 err := p1.ready()
72 noerr(t, err)
73
74 c, err := p1.checkOut(context.Background())
75 noerr(t, err)
76
77 p2 := newPool(poolConfig{})
78 err = p2.ready()
79 noerr(t, err)
80
81 err = p2.closeConnection(c)
82 assert.Equalf(t, ErrWrongPool, err, "expected ErrWrongPool error")
83
84 p1.close(context.Background())
85 p2.close(context.Background())
86 })
87 })
88 t.Run("close", func(t *testing.T) {
89 t.Parallel()
90
91 t.Run("calling close multiple times does not panic", func(t *testing.T) {
92 t.Parallel()
93
94 p := newPool(poolConfig{})
95 err := p.ready()
96 noerr(t, err)
97
98 for i := 0; i < 5; i++ {
99 p.close(context.Background())
100 }
101 })
102 t.Run("closes idle connections", func(t *testing.T) {
103 t.Parallel()
104
105 cleanup := make(chan struct{})
106 defer close(cleanup)
107 addr := bootstrapConnections(t, 3, func(nc net.Conn) {
108 <-cleanup
109 _ = nc.Close()
110 })
111
112 d := newdialer(&net.Dialer{})
113 p := newPool(poolConfig{
114 Address: address.Address(addr.String()),
115 }, WithDialer(func(Dialer) Dialer { return d }))
116 err := p.ready()
117 noerr(t, err)
118
119 conns := make([]*connection, 3)
120 for i := range conns {
121 conns[i], err = p.checkOut(context.Background())
122 noerr(t, err)
123 }
124 for i := range conns {
125 err = p.checkIn(conns[i])
126 noerr(t, err)
127 }
128 assert.Equalf(t, 3, d.lenopened(), "should have opened 3 connections")
129 assert.Equalf(t, 0, d.lenclosed(), "should have closed 0 connections")
130 assert.Equalf(t, 3, p.availableConnectionCount(), "should have 3 available connections")
131 assert.Equalf(t, 3, p.totalConnectionCount(), "should have 3 total connections")
132
133 p.close(context.Background())
134 assertConnectionsClosed(t, d, 3)
135 assert.Equalf(t, 0, p.availableConnectionCount(), "should have 0 available connections")
136 assert.Equalf(t, 0, p.availableConnectionCount(), "should have 0 total connections")
137 })
138 t.Run("closes all open connections", func(t *testing.T) {
139 t.Parallel()
140
141 cleanup := make(chan struct{})
142 defer close(cleanup)
143 addr := bootstrapConnections(t, 3, func(nc net.Conn) {
144 <-cleanup
145 _ = nc.Close()
146 })
147
148 d := newdialer(&net.Dialer{})
149 p := newPool(poolConfig{
150 Address: address.Address(addr.String()),
151 }, WithDialer(func(Dialer) Dialer { return d }))
152 err := p.ready()
153 noerr(t, err)
154
155 conns := make([]*connection, 3)
156 for i := range conns {
157 conns[i], err = p.checkOut(context.Background())
158 noerr(t, err)
159 }
160 for i := 0; i < 2; i++ {
161 err = p.checkIn(conns[i])
162 noerr(t, err)
163 }
164 assert.Equalf(t, 3, d.lenopened(), "should have opened 3 connections")
165 assert.Equalf(t, 0, d.lenclosed(), "should have closed 0 connections")
166 assert.Equalf(t, 2, p.availableConnectionCount(), "should have 2 available connections")
167 assert.Equalf(t, 3, p.totalConnectionCount(), "should have 3 total connections")
168
169 p.close(context.Background())
170 assertConnectionsClosed(t, d, 3)
171 assert.Equalf(t, 0, p.availableConnectionCount(), "should have 0 available connections")
172 assert.Equalf(t, 0, p.totalConnectionCount(), "should have 0 total connections")
173 })
174 t.Run("no race if connections are also connecting", func(t *testing.T) {
175 t.Parallel()
176
177 cleanup := make(chan struct{})
178 defer close(cleanup)
179 addr := bootstrapConnections(t, 3, func(nc net.Conn) {
180 <-cleanup
181 _ = nc.Close()
182 })
183
184 p := newPool(poolConfig{
185 Address: address.Address(addr.String()),
186 })
187 err := p.ready()
188 noerr(t, err)
189
190 _, err = p.checkOut(context.Background())
191 noerr(t, err)
192
193 closed := make(chan struct{})
194 started := make(chan struct{})
195 go func() {
196 close(started)
197
198 for {
199 select {
200 case <-closed:
201 return
202 default:
203 c, _ := p.checkOut(context.Background())
204 _ = p.checkIn(c)
205 time.Sleep(time.Millisecond)
206 }
207 }
208 }()
209
210
211
212 <-started
213 _, err = p.checkOut(context.Background())
214 noerr(t, err)
215
216 p.close(context.Background())
217
218 close(closed)
219 })
220 t.Run("shuts down gracefully if Context has a deadline", func(t *testing.T) {
221 t.Parallel()
222
223 cleanup := make(chan struct{})
224 defer close(cleanup)
225 addr := bootstrapConnections(t, 3, func(nc net.Conn) {
226 <-cleanup
227 _ = nc.Close()
228 })
229
230 p := newPool(poolConfig{
231 Address: address.Address(addr.String()),
232 })
233 err := p.ready()
234 noerr(t, err)
235
236
237 conns := make([]*connection, 2)
238 for i := 0; i < 2; i++ {
239 c, err := p.checkOut(context.Background())
240 noerr(t, err)
241
242 conns[i] = c
243 }
244
245
246
247 c, err := p.checkOut(context.Background())
248 noerr(t, err)
249
250 err = p.checkIn(c)
251 noerr(t, err)
252
253
254
255
256 go func() {
257 for p.getState() == poolReady {
258 time.Sleep(time.Millisecond)
259 }
260 for _, c := range conns {
261 assert.Equalf(t, connConnected, c.state, "expected conn to still be connected")
262
263 err := p.checkIn(c)
264 noerr(t, err)
265 }
266 }()
267
268
269
270
271 ctx, cancel := context.WithTimeout(context.Background(), 1*time.Hour)
272 defer cancel()
273 p.close(ctx)
274 })
275 t.Run("closing a Connection does not cause an error after pool is closed", func(t *testing.T) {
276 t.Parallel()
277
278 cleanup := make(chan struct{})
279 defer close(cleanup)
280 addr := bootstrapConnections(t, 3, func(nc net.Conn) {
281 <-cleanup
282 _ = nc.Close()
283 })
284
285 p := newPool(poolConfig{
286 Address: address.Address(addr.String()),
287 })
288 err := p.ready()
289 noerr(t, err)
290
291 c, err := p.checkOut(context.Background())
292 noerr(t, err)
293
294 p.close(context.Background())
295
296 c1 := &Connection{connection: c}
297 err = c1.Close()
298 noerr(t, err)
299 })
300 })
301 t.Run("ready", func(t *testing.T) {
302 t.Parallel()
303
304 t.Run("can ready a paused pool", func(t *testing.T) {
305 t.Parallel()
306
307 cleanup := make(chan struct{})
308 defer close(cleanup)
309 addr := bootstrapConnections(t, 6, func(nc net.Conn) {
310 <-cleanup
311 _ = nc.Close()
312 })
313
314 p := newPool(poolConfig{
315 Address: address.Address(addr.String()),
316 })
317 err := p.ready()
318 noerr(t, err)
319
320 conns := make([]*connection, 3)
321 for i := range conns {
322 conn, err := p.checkOut(context.Background())
323 noerr(t, err)
324 conns[i] = conn
325 }
326 assert.Equalf(t, 0, p.availableConnectionCount(), "should have 0 available connections")
327 assert.Equalf(t, 3, p.totalConnectionCount(), "should have 3 total connections")
328
329 p.clear(nil, nil)
330 for _, conn := range conns {
331 err = p.checkIn(conn)
332 noerr(t, err)
333 }
334 assert.Equalf(t, 0, p.availableConnectionCount(), "should have 0 available connections")
335 assert.Equalf(t, 0, p.totalConnectionCount(), "should have 0 total connections")
336
337 err = p.ready()
338 noerr(t, err)
339
340 for i := 0; i < 3; i++ {
341 _, err := p.checkOut(context.Background())
342 noerr(t, err)
343 }
344 assert.Equalf(t, 0, p.availableConnectionCount(), "should have 0 available connections")
345 assert.Equalf(t, 3, p.totalConnectionCount(), "should have 3 total connections")
346
347 p.close(context.Background())
348 })
349 t.Run("calling ready multiple times does not return an error", func(t *testing.T) {
350 t.Parallel()
351
352 p := newPool(poolConfig{})
353 for i := 0; i < 5; i++ {
354 err := p.ready()
355 noerr(t, err)
356 }
357
358 p.close(context.Background())
359 })
360 t.Run("can clear and ready multiple times", func(t *testing.T) {
361 t.Parallel()
362
363 cleanup := make(chan struct{})
364 defer close(cleanup)
365 addr := bootstrapConnections(t, 2, func(nc net.Conn) {
366 <-cleanup
367 _ = nc.Close()
368 })
369
370 p := newPool(poolConfig{
371 Address: address.Address(addr.String()),
372 })
373 err := p.ready()
374 noerr(t, err)
375
376 c, err := p.checkOut(context.Background())
377 noerr(t, err)
378 err = p.checkIn(c)
379 noerr(t, err)
380
381 for i := 0; i < 100; i++ {
382 err = p.ready()
383 noerr(t, err)
384
385 p.clear(nil, nil)
386 }
387
388 err = p.ready()
389 noerr(t, err)
390
391 c, err = p.checkOut(context.Background())
392 noerr(t, err)
393 err = p.checkIn(c)
394 noerr(t, err)
395
396 p.close(context.Background())
397 })
398 t.Run("can clear and ready multiple times concurrently", func(t *testing.T) {
399 t.Parallel()
400
401 cleanup := make(chan struct{})
402 defer close(cleanup)
403 addr := bootstrapConnections(t, 2, func(nc net.Conn) {
404 <-cleanup
405 _ = nc.Close()
406 })
407
408 p := newPool(poolConfig{
409 Address: address.Address(addr.String()),
410 })
411 err := p.ready()
412 noerr(t, err)
413
414 c, err := p.checkOut(context.Background())
415 noerr(t, err)
416 err = p.checkIn(c)
417 noerr(t, err)
418
419 var wg sync.WaitGroup
420 for i := 0; i < 10; i++ {
421 wg.Add(1)
422 go func() {
423 defer wg.Done()
424 for i := 0; i < 1000; i++ {
425 err := p.ready()
426 noerr(t, err)
427 }
428 }()
429
430 wg.Add(1)
431 go func() {
432 defer wg.Done()
433 for i := 0; i < 1000; i++ {
434 p.clear(errors.New("test error"), nil)
435 }
436 }()
437 }
438
439 wg.Wait()
440 err = p.ready()
441 noerr(t, err)
442
443 c, err = p.checkOut(context.Background())
444 noerr(t, err)
445 err = p.checkIn(c)
446 noerr(t, err)
447
448 p.close(context.Background())
449 })
450 })
451 t.Run("checkOut", func(t *testing.T) {
452 t.Parallel()
453
454 t.Run("return error when attempting to create new connection", func(t *testing.T) {
455 t.Parallel()
456
457 dialErr := errors.New("create new connection error")
458 p := newPool(poolConfig{}, WithDialer(func(Dialer) Dialer {
459 return DialerFunc(func(context.Context, string, string) (net.Conn, error) {
460 return nil, dialErr
461 })
462 }))
463 err := p.ready()
464 noerr(t, err)
465
466 _, err = p.checkOut(context.Background())
467 var want error = ConnectionError{Wrapped: dialErr, init: true}
468 assert.Equalf(t, want, err, "should return error from calling checkOut()")
469
470
471
472 assert.Eventuallyf(t,
473 func() bool {
474 return p.totalConnectionCount() == 0
475 },
476 2*time.Second,
477 100*time.Millisecond,
478 "expected pool to have 0 total connections within 10s")
479
480 p.close(context.Background())
481 })
482 t.Run("closes perished connections", func(t *testing.T) {
483 t.Parallel()
484
485 cleanup := make(chan struct{})
486 defer close(cleanup)
487 addr := bootstrapConnections(t, 2, func(nc net.Conn) {
488 <-cleanup
489 _ = nc.Close()
490 })
491
492 d := newdialer(&net.Dialer{})
493 p := newPool(
494 poolConfig{
495 Address: address.Address(addr.String()),
496 MaxIdleTime: time.Millisecond,
497 },
498 WithDialer(func(Dialer) Dialer { return d }),
499 )
500 err := p.ready()
501 noerr(t, err)
502
503
504
505 c1, err := p.checkOut(context.Background())
506 noerr(t, err)
507 assert.Equalf(t, 1, d.lenopened(), "should have opened 1 connection")
508 assert.Equalf(t, 1, p.totalConnectionCount(), "pool should have 1 total connection")
509 assert.Equalf(t, time.Millisecond, c1.idleTimeout, "connection should have a 1ms idle timeout")
510
511 err = p.checkIn(c1)
512 noerr(t, err)
513
514
515
516
517 time.Sleep(50 * time.Millisecond)
518 c2, err := p.checkOut(context.Background())
519 noerr(t, err)
520
521
522 assert.True(t, c1 != c2, "expected a new connection on 2nd check out after idle timeout expires")
523 assert.Equalf(t, 2, d.lenopened(), "should have opened 2 connections")
524 assert.Equalf(t, 1, p.totalConnectionCount(), "pool should have 1 total connection")
525
526 p.close(context.Background())
527 })
528 t.Run("recycles connections", func(t *testing.T) {
529 t.Parallel()
530
531 cleanup := make(chan struct{})
532 defer close(cleanup)
533 addr := bootstrapConnections(t, 1, func(nc net.Conn) {
534 <-cleanup
535 _ = nc.Close()
536 })
537
538 d := newdialer(&net.Dialer{})
539 p := newPool(poolConfig{
540 Address: address.Address(addr.String()),
541 }, WithDialer(func(Dialer) Dialer { return d }))
542 err := p.ready()
543 noerr(t, err)
544
545 for i := 0; i < 100; i++ {
546 c, err := p.checkOut(context.Background())
547 noerr(t, err)
548
549 err = p.checkIn(c)
550 noerr(t, err)
551 }
552 assert.Equalf(t, 1, d.lenopened(), "should have opened 1 connection")
553
554 p.close(context.Background())
555 })
556 t.Run("cannot checkOut from closed pool", func(t *testing.T) {
557 t.Parallel()
558
559 cleanup := make(chan struct{})
560 defer close(cleanup)
561 addr := bootstrapConnections(t, 3, func(nc net.Conn) {
562 <-cleanup
563 _ = nc.Close()
564 })
565
566 p := newPool(poolConfig{
567 Address: address.Address(addr.String()),
568 })
569 err := p.ready()
570 noerr(t, err)
571
572 p.close(context.Background())
573
574 _, err = p.checkOut(context.Background())
575 assert.Equalf(
576 t,
577 ErrPoolClosed,
578 err,
579 "expected an error from checkOut() from a closed pool")
580 })
581 t.Run("handshaker i/o fails", func(t *testing.T) {
582 t.Parallel()
583
584 p := newPool(
585 poolConfig{},
586 WithHandshaker(func(Handshaker) Handshaker {
587 return operation.NewHello()
588 }),
589 WithDialer(func(Dialer) Dialer {
590 return DialerFunc(func(context.Context, string, string) (net.Conn, error) {
591 return &writeFailConn{&net.TCPConn{}}, nil
592 })
593 }),
594 )
595 err := p.ready()
596 noerr(t, err)
597
598 _, err = p.checkOut(context.Background())
599 assert.IsTypef(t, ConnectionError{}, err, "expected a ConnectionError")
600 if err, ok := err.(ConnectionError); ok {
601 assert.Containsf(
602 t,
603 err.Unwrap().Error(),
604 "unable to write wire message to network: Write error",
605 "expected error to contain string")
606 }
607 assert.Equalf(t, 0, p.availableConnectionCount(), "pool should have 0 available connections")
608
609
610
611 assert.Eventually(t,
612 func() bool {
613 return p.totalConnectionCount() == 0
614 },
615 100*time.Millisecond,
616 1*time.Millisecond,
617 "expected pool to have 0 total connections within 100ms")
618
619 p.close(context.Background())
620 })
621
622
623 t.Run("wait queue timeout error", func(t *testing.T) {
624 t.Parallel()
625
626 cleanup := make(chan struct{})
627 defer close(cleanup)
628 addr := bootstrapConnections(t, 1, func(nc net.Conn) {
629 <-cleanup
630 _ = nc.Close()
631 })
632
633 p := newPool(poolConfig{
634 Address: address.Address(addr.String()),
635 MaxPoolSize: 1,
636 })
637 err := p.ready()
638 noerr(t, err)
639
640
641 _, err = p.checkOut(context.Background())
642 noerr(t, err)
643
644
645 ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
646 defer cancel()
647 _, err = p.checkOut(ctx)
648 assert.NotNilf(t, err, "expected a WaitQueueTimeout error")
649
650
651 assert.IsTypef(t, WaitQueueTimeoutError{}, err, "expected a WaitQueueTimeoutError")
652 if err, ok := err.(WaitQueueTimeoutError); ok {
653 assert.Equalf(t, context.DeadlineExceeded, err.Unwrap(), "expected wrapped error to be a context.Timeout")
654 assert.Containsf(t, err.Error(), "timed out", `expected error message to contain "timed out"`)
655 }
656
657 p.close(context.Background())
658 })
659
660
661
662
663 t.Run("wait queue doesn't overflow", func(t *testing.T) {
664 t.Parallel()
665
666 cleanup := make(chan struct{})
667 defer close(cleanup)
668 addr := bootstrapConnections(t, 1, func(nc net.Conn) {
669 <-cleanup
670 _ = nc.Close()
671 })
672
673 p := newPool(poolConfig{
674 Address: address.Address(addr.String()),
675 MaxPoolSize: 1,
676 })
677 err := p.ready()
678 noerr(t, err)
679
680
681 c, err := p.checkOut(context.Background())
682 noerr(t, err)
683
684
685
686
687 var wg sync.WaitGroup
688 wg.Add(1)
689 go func() {
690 defer wg.Done()
691
692 _, err := p.checkOut(context.Background())
693 noerr(t, err)
694 }()
695
696
697
698 for i := 0; i < 50000; i++ {
699 ctx, cancel := context.WithTimeout(context.Background(), 1*time.Microsecond)
700 _, err := p.checkOut(ctx)
701 cancel()
702 assert.NotNilf(t, err, "expected a WaitQueueTimeout error")
703 assert.IsTypef(t, WaitQueueTimeoutError{}, err, "expected a WaitQueueTimeoutError")
704 }
705
706
707
708 err = p.checkIn(c)
709 noerr(t, err)
710 wg.Wait()
711
712 p.close(context.Background())
713 })
714
715
716 t.Run("should return a new connection as soon as the pool isn't full", func(t *testing.T) {
717 t.Parallel()
718
719 cleanup := make(chan struct{})
720 defer close(cleanup)
721 addr := bootstrapConnections(t, 3, func(nc net.Conn) {
722 <-cleanup
723 _ = nc.Close()
724 })
725
726 d := newdialer(&net.Dialer{})
727 p := newPool(
728 poolConfig{
729 Address: address.Address(addr.String()),
730 MaxPoolSize: 2,
731 },
732 WithDialer(func(Dialer) Dialer { return d }),
733 )
734 err := p.ready()
735 noerr(t, err)
736
737
738
739 c, err := p.checkOut(context.Background())
740 noerr(t, err)
741 _, err = p.checkOut(context.Background())
742 noerr(t, err)
743 assert.Equalf(t, 2, d.lenopened(), "should have opened 2 connection")
744 assert.Equalf(t, 2, p.totalConnectionCount(), "pool should have 2 total connection")
745 assert.Equalf(t, 0, p.availableConnectionCount(), "pool should have 0 idle connection")
746
747
748
749 ctx, cancel := context.WithTimeout(context.Background(), 1*time.Millisecond)
750 defer cancel()
751 _, err = p.checkOut(ctx)
752 assert.Equalf(
753 t,
754 context.DeadlineExceeded,
755 err.(WaitQueueTimeoutError).Wrapped,
756 "expected wrapped error to be a context.DeadlineExceeded")
757
758
759
760
761
762 var start time.Time
763 go func() {
764 c.close()
765 start = time.Now()
766 err := p.checkIn(c)
767 noerr(t, err)
768 }()
769 _, err = p.checkOut(context.Background())
770 noerr(t, err)
771 assert.WithinDurationf(
772 t,
773 time.Now(),
774 start,
775 100*time.Millisecond,
776 "expected checkOut to complete within 100ms of checking in a closed connection")
777
778 assert.Equalf(t, 1, d.lenclosed(), "should have closed 1 connection")
779 assert.Equalf(t, 3, d.lenopened(), "should have opened 3 connection")
780 assert.Equalf(t, 2, p.totalConnectionCount(), "pool should have 2 total connection")
781 assert.Equalf(t, 0, p.availableConnectionCount(), "pool should have 0 idle connection")
782
783 p.close(context.Background())
784 })
785 t.Run("canceled context in wait queue", func(t *testing.T) {
786 t.Parallel()
787
788 cleanup := make(chan struct{})
789 defer close(cleanup)
790 addr := bootstrapConnections(t, 1, func(nc net.Conn) {
791 <-cleanup
792 _ = nc.Close()
793 })
794
795 p := newPool(poolConfig{
796 Address: address.Address(addr.String()),
797 MaxPoolSize: 1,
798 })
799 err := p.ready()
800 noerr(t, err)
801
802
803 _, err = p.checkOut(context.Background())
804 noerr(t, err)
805
806
807 cancelCtx, cancel := context.WithCancel(context.Background())
808 cancel()
809 _, err = p.checkOut(cancelCtx)
810 assert.NotNilf(t, err, "expected a non-nil error")
811
812
813 assert.IsTypef(t, WaitQueueTimeoutError{}, err, "expected a WaitQueueTimeoutError")
814 if err, ok := err.(WaitQueueTimeoutError); ok {
815 assert.Equalf(t, context.Canceled, err.Unwrap(), "expected wrapped error to be a context.Canceled")
816 assert.Containsf(t, err.Error(), "canceled", `expected error message to contain "canceled"`)
817 }
818
819 p.close(context.Background())
820 })
821 })
822 t.Run("checkIn", func(t *testing.T) {
823 t.Parallel()
824
825 t.Run("cannot return same connection to pool twice", func(t *testing.T) {
826 t.Parallel()
827
828 cleanup := make(chan struct{})
829 defer close(cleanup)
830 addr := bootstrapConnections(t, 1, func(nc net.Conn) {
831 <-cleanup
832 _ = nc.Close()
833 })
834
835 p := newPool(poolConfig{
836 Address: address.Address(addr.String()),
837 })
838 err := p.ready()
839 noerr(t, err)
840
841 c, err := p.checkOut(context.Background())
842 noerr(t, err)
843 assert.Equalf(t, 0, p.availableConnectionCount(), "should be no idle connections in pool")
844 assert.Equalf(t, 1, p.totalConnectionCount(), "should be 1 total connection in pool")
845
846 err = p.checkIn(c)
847 noerr(t, err)
848
849 err = p.checkIn(c)
850 assert.NotNilf(t, err, "expected an error trying to return the same conn to the pool twice")
851
852 assert.Equalf(t, 1, p.availableConnectionCount(), "should have returned 1 idle connection to the pool")
853 assert.Equalf(t, 1, p.totalConnectionCount(), "should have 1 total connection in pool")
854
855 p.close(context.Background())
856 })
857 t.Run("closes connections if the pool is closed", func(t *testing.T) {
858 t.Parallel()
859
860 cleanup := make(chan struct{})
861 defer close(cleanup)
862 addr := bootstrapConnections(t, 1, func(nc net.Conn) {
863 <-cleanup
864 _ = nc.Close()
865 })
866
867 d := newdialer(&net.Dialer{})
868 p := newPool(poolConfig{
869 Address: address.Address(addr.String()),
870 }, WithDialer(func(Dialer) Dialer { return d }))
871 err := p.ready()
872 noerr(t, err)
873
874 c, err := p.checkOut(context.Background())
875 noerr(t, err)
876 assert.Equalf(t, 0, d.lenclosed(), "should have closed 0 connections")
877 assert.Equalf(t, 0, p.availableConnectionCount(), "should have 0 idle connections in pool")
878 assert.Equalf(t, 1, p.totalConnectionCount(), "should have 1 total connection in pool")
879
880 p.close(context.Background())
881
882 err = p.checkIn(c)
883 noerr(t, err)
884 assert.Equalf(t, 1, d.lenclosed(), "should have closed 1 connection")
885 assert.Equalf(t, 0, p.availableConnectionCount(), "should have 0 idle connections in pool")
886 assert.Equalf(t, 0, p.totalConnectionCount(), "should have 0 total connection in pool")
887 })
888 t.Run("can't checkIn a connection from different pool", func(t *testing.T) {
889 t.Parallel()
890
891 cleanup := make(chan struct{})
892 defer close(cleanup)
893 addr := bootstrapConnections(t, 1, func(nc net.Conn) {
894 <-cleanup
895 _ = nc.Close()
896 })
897
898 p1 := newPool(poolConfig{
899 Address: address.Address(addr.String()),
900 })
901 err := p1.ready()
902 noerr(t, err)
903
904 c, err := p1.checkOut(context.Background())
905 noerr(t, err)
906
907 p2 := newPool(poolConfig{})
908 err = p2.ready()
909 noerr(t, err)
910
911 err = p2.checkIn(c)
912 assert.Equalf(t, ErrWrongPool, err, "expected ErrWrongPool error")
913
914 p1.close(context.Background())
915 p2.close(context.Background())
916 })
917 t.Run("bumps the connection idle deadline", func(t *testing.T) {
918 t.Parallel()
919
920 cleanup := make(chan struct{})
921 defer close(cleanup)
922 addr := bootstrapConnections(t, 1, func(nc net.Conn) {
923 <-cleanup
924 _ = nc.Close()
925 })
926
927 d := newdialer(&net.Dialer{})
928 p := newPool(poolConfig{
929 Address: address.Address(addr.String()),
930 MaxIdleTime: 100 * time.Millisecond,
931 }, WithDialer(func(Dialer) Dialer { return d }))
932 err := p.ready()
933 noerr(t, err)
934 defer p.close(context.Background())
935
936 c, err := p.checkOut(context.Background())
937 noerr(t, err)
938
939
940
941
942 time.Sleep(110 * time.Millisecond)
943 err = p.checkIn(c)
944 noerr(t, err)
945
946 assert.Equalf(t, 0, d.lenclosed(), "should have closed 0 connections")
947 assert.Equalf(t, 1, p.availableConnectionCount(), "should have 1 idle connections in pool")
948 assert.Equalf(t, 1, p.totalConnectionCount(), "should have 1 total connection in pool")
949 })
950 t.Run("sets minPoolSize connection idle deadline", func(t *testing.T) {
951 t.Parallel()
952
953 cleanup := make(chan struct{})
954 defer close(cleanup)
955 addr := bootstrapConnections(t, 4, func(nc net.Conn) {
956 <-cleanup
957 _ = nc.Close()
958 })
959
960 d := newdialer(&net.Dialer{})
961 p := newPool(poolConfig{
962 Address: address.Address(addr.String()),
963 MinPoolSize: 3,
964 MaxIdleTime: 10 * time.Millisecond,
965 }, WithDialer(func(Dialer) Dialer { return d }))
966 err := p.ready()
967 noerr(t, err)
968 defer p.close(context.Background())
969
970
971 assertConnectionsOpened(t, d, 3)
972
973
974
975
976
977 time.Sleep(100 * time.Millisecond)
978 _, err = p.checkOut(context.Background())
979 noerr(t, err)
980
981 assertConnectionsClosed(t, d, 3)
982 assert.Equalf(t, 4, d.lenopened(), "should have opened 4 connections")
983 assert.Equalf(t, 0, p.availableConnectionCount(), "should have 0 idle connections in pool")
984 assert.Equalf(t, 1, p.totalConnectionCount(), "should have 1 total connection in pool")
985 })
986 })
987 t.Run("maintain", func(t *testing.T) {
988 t.Parallel()
989
990 t.Run("creates MinPoolSize connections shortly after calling ready", func(t *testing.T) {
991 t.Parallel()
992
993 cleanup := make(chan struct{})
994 defer close(cleanup)
995 addr := bootstrapConnections(t, 3, func(nc net.Conn) {
996 <-cleanup
997 _ = nc.Close()
998 })
999
1000 d := newdialer(&net.Dialer{})
1001 p := newPool(poolConfig{
1002 Address: address.Address(addr.String()),
1003 MinPoolSize: 3,
1004 }, WithDialer(func(Dialer) Dialer { return d }))
1005 err := p.ready()
1006 noerr(t, err)
1007
1008 assertConnectionsOpened(t, d, 3)
1009 assert.Equalf(t, 3, p.availableConnectionCount(), "should be 3 idle connections in pool")
1010 assert.Equalf(t, 3, p.totalConnectionCount(), "should be 3 total connection in pool")
1011
1012 p.close(context.Background())
1013 })
1014 t.Run("when MinPoolSize > MaxPoolSize should not exceed MaxPoolSize connections", func(t *testing.T) {
1015 t.Parallel()
1016
1017 cleanup := make(chan struct{})
1018 defer close(cleanup)
1019 addr := bootstrapConnections(t, 20, func(nc net.Conn) {
1020 <-cleanup
1021 _ = nc.Close()
1022 })
1023
1024 d := newdialer(&net.Dialer{})
1025 p := newPool(poolConfig{
1026 Address: address.Address(addr.String()),
1027 MinPoolSize: 20,
1028 MaxPoolSize: 2,
1029 }, WithDialer(func(Dialer) Dialer { return d }))
1030 err := p.ready()
1031 noerr(t, err)
1032
1033 assertConnectionsOpened(t, d, 2)
1034 assert.Equalf(t, 2, p.availableConnectionCount(), "should be 2 idle connections in pool")
1035 assert.Equalf(t, 2, p.totalConnectionCount(), "should be 2 total connection in pool")
1036
1037 p.close(context.Background())
1038 })
1039 t.Run("removes perished connections", func(t *testing.T) {
1040 t.Parallel()
1041
1042 cleanup := make(chan struct{})
1043 defer close(cleanup)
1044 addr := bootstrapConnections(t, 5, func(nc net.Conn) {
1045 <-cleanup
1046 _ = nc.Close()
1047 })
1048
1049 d := newdialer(&net.Dialer{})
1050 p := newPool(poolConfig{
1051 Address: address.Address(addr.String()),
1052
1053 MaintainInterval: 10 * time.Millisecond,
1054 }, WithDialer(func(Dialer) Dialer { return d }))
1055 err := p.ready()
1056 noerr(t, err)
1057
1058
1059
1060 conns := make([]*connection, 3)
1061 for i := range conns {
1062 conns[i], err = p.checkOut(context.Background())
1063 noerr(t, err)
1064 }
1065 for _, c := range conns {
1066 err = p.checkIn(c)
1067 noerr(t, err)
1068 }
1069 assert.Equalf(t, 3, d.lenopened(), "should have opened 3 connections")
1070 assert.Equalf(t, 3, p.availableConnectionCount(), "should be 3 idle connections in pool")
1071 assert.Equalf(t, 3, p.totalConnectionCount(), "should be 3 total connection in pool")
1072
1073
1074
1075
1076 p.idleMu.Lock()
1077 for i := 0; i < 2; i++ {
1078 p.idleConns[i].idleTimeout = time.Millisecond
1079 p.idleConns[i].idleDeadline.Store(time.Now().Add(-1 * time.Hour))
1080 }
1081 p.idleMu.Unlock()
1082 assertConnectionsClosed(t, d, 2)
1083 assert.Equalf(t, 1, p.availableConnectionCount(), "should be 1 idle connections in pool")
1084 assert.Equalf(t, 1, p.totalConnectionCount(), "should be 1 total connection in pool")
1085
1086 p.close(context.Background())
1087 })
1088 t.Run("removes perished connections and replaces them to maintain MinPoolSize", func(t *testing.T) {
1089 t.Parallel()
1090
1091 cleanup := make(chan struct{})
1092 defer close(cleanup)
1093 addr := bootstrapConnections(t, 5, func(nc net.Conn) {
1094 <-cleanup
1095 _ = nc.Close()
1096 })
1097
1098 d := newdialer(&net.Dialer{})
1099 p := newPool(poolConfig{
1100 Address: address.Address(addr.String()),
1101 MinPoolSize: 3,
1102
1103 MaintainInterval: 10 * time.Millisecond,
1104 }, WithDialer(func(Dialer) Dialer { return d }))
1105 err := p.ready()
1106 noerr(t, err)
1107 assertConnectionsOpened(t, d, 3)
1108 assert.Equalf(t, 3, p.availableConnectionCount(), "should be 3 idle connections in pool")
1109 assert.Equalf(t, 3, p.totalConnectionCount(), "should be 3 total connection in pool")
1110
1111 p.idleMu.Lock()
1112 for i := 0; i < 2; i++ {
1113 p.idleConns[i].idleTimeout = time.Millisecond
1114 p.idleConns[i].idleDeadline.Store(time.Now().Add(-1 * time.Hour))
1115 }
1116 p.idleMu.Unlock()
1117 assertConnectionsClosed(t, d, 2)
1118 assertConnectionsOpened(t, d, 5)
1119 assert.Equalf(t, 3, p.availableConnectionCount(), "should be 3 idle connections in pool")
1120 assert.Equalf(t, 3, p.totalConnectionCount(), "should be 3 total connection in pool")
1121
1122 p.close(context.Background())
1123 })
1124 })
1125 }
1126
1127 func assertConnectionsClosed(t *testing.T, dialer *dialer, count int) {
1128 t.Helper()
1129
1130 start := time.Now()
1131 for {
1132 if dialer.lenclosed() == count {
1133 return
1134 }
1135 if time.Since(start) > 3*time.Second {
1136 t.Errorf(
1137 "Waited for 3 seconds for %d connections to be closed, but got %d",
1138 count,
1139 dialer.lenclosed())
1140 return
1141 }
1142 time.Sleep(100 * time.Millisecond)
1143 }
1144 }
1145
1146 func assertConnectionsOpened(t *testing.T, dialer *dialer, count int) {
1147 t.Helper()
1148
1149 start := time.Now()
1150 for {
1151 if dialer.lenopened() == count {
1152 return
1153 }
1154 if time.Since(start) > 3*time.Second {
1155 t.Errorf(
1156 "Waited for 3 seconds for %d connections to be opened, but got %d",
1157 count,
1158 dialer.lenopened())
1159 return
1160 }
1161 time.Sleep(100 * time.Millisecond)
1162 }
1163 }
1164
1165 func TestPool_PoolMonitor(t *testing.T) {
1166 t.Parallel()
1167
1168 t.Run("records durations", func(t *testing.T) {
1169 t.Parallel()
1170
1171 cleanup := make(chan struct{})
1172 defer close(cleanup)
1173
1174
1175
1176 addr := bootstrapConnections(t, 1, func(nc net.Conn) {
1177 <-cleanup
1178 _ = nc.Close()
1179 })
1180
1181 tpm := eventtest.NewTestPoolMonitor()
1182 p := newPool(
1183 poolConfig{
1184 Address: address.Address(addr.String()),
1185 PoolMonitor: tpm.PoolMonitor,
1186 },
1187
1188
1189
1190 WithHandshaker(func(Handshaker) Handshaker {
1191 return &testHandshaker{
1192 getHandshakeInformation: func(context.Context, address.Address, driver.Connection) (driver.HandshakeInformation, error) {
1193 time.Sleep(10 * time.Millisecond)
1194 return driver.HandshakeInformation{}, nil
1195 },
1196 }
1197 }))
1198
1199 err := p.ready()
1200 require.NoError(t, err, "ready error")
1201
1202
1203
1204 conn, err := p.checkOut(context.Background())
1205 require.NoError(t, err, "checkOut error")
1206
1207
1208
1209 err = conn.close()
1210 require.NoError(t, err, "close error")
1211
1212 err = p.checkIn(conn)
1213 require.NoError(t, err, "checkIn error")
1214
1215
1216
1217 _, err = p.checkOut(context.Background())
1218 require.Error(t, err, "expected a checkOut error")
1219
1220 p.close(context.Background())
1221
1222 events := tpm.Events(func(evt *event.PoolEvent) bool {
1223 switch evt.Type {
1224 case "ConnectionReady", "ConnectionCheckedOut", "ConnectionCheckOutFailed":
1225 return true
1226 }
1227 return false
1228 })
1229
1230 require.Lenf(t, events, 3, "expected there to be 3 pool events")
1231
1232 assert.Equal(t, events[0].Type, "ConnectionReady")
1233 assert.Positive(t,
1234 events[0].Duration,
1235 "expected ConnectionReady Duration to be set")
1236
1237 assert.Equal(t, events[1].Type, "ConnectionCheckedOut")
1238 assert.Positive(t,
1239 events[1].Duration,
1240 "expected ConnectionCheckedOut Duration to be set")
1241
1242 assert.Equal(t, events[2].Type, "ConnectionCheckOutFailed")
1243 assert.Positive(t,
1244 events[2].Duration,
1245 "expected ConnectionCheckOutFailed Duration to be set")
1246 })
1247 }
1248
View as plain text