1
2
3
4
5
6
7 package topology
8
9 import (
10 "context"
11 "fmt"
12 "net"
13 "sync"
14 "sync/atomic"
15 "time"
16
17 "go.mongodb.org/mongo-driver/bson/primitive"
18 "go.mongodb.org/mongo-driver/event"
19 "go.mongodb.org/mongo-driver/internal/logger"
20 "go.mongodb.org/mongo-driver/mongo/address"
21 "go.mongodb.org/mongo-driver/x/mongo/driver"
22 )
23
24
// Connection pool lifecycle states, stored in pool.state.
const (
	// poolPaused is the state newPool gives a freshly created pool and the state clearImpl moves
	// a ready pool into when it is cleared without a service ID. checkOut fails with a
	// poolClearedError while the pool is paused.
	poolPaused int = iota
	// poolReady is the state set by ready. It is the only state in which checkOut proceeds and
	// in which the maintain goroutine does any work.
	poolReady
	// poolClosed is the state set by close. checkOut fails with ErrPoolClosed once closed.
	poolClosed
)
30
31
32
// ErrPoolNotPaused is returned by ready when the pool is neither "ready" nor "paused", so it
// cannot be marked "ready".
var ErrPoolNotPaused = PoolError("only a paused pool can be marked ready")

// ErrPoolClosed is returned by checkOut when the pool is closed. It is also delivered to
// connection requests that are still queued when the pool closes.
var ErrPoolClosed = PoolError("attempted to check out a connection from closed connection pool")

// ErrConnectionClosed is a ConnectionError describing a closed connection.
// NOTE(review): no use of this value is visible in this file; confirm where it is returned.
var ErrConnectionClosed = ConnectionError{ConnectionID: "<closed>", message: "connection is closed"}

// ErrWrongPool is returned by checkIn, checkInNoEvent, closeConnection and removeConnection when
// the given connection's pool field is not the pool the method was called on.
var ErrWrongPool = PoolError("connection does not belong to this pool")
43
44
// PoolError is a string-based error type used for connection pool errors. The string value is
// the error message.
type PoolError string

// Error implements the error interface by returning the underlying string unchanged.
func (pe PoolError) Error() string {
	msg := string(pe)
	return msg
}
48
49
50
// poolClearedError is the error checkOut returns while the pool is paused, and the error that
// clearImpl delivers to queued connection requests when the pool is cleared.
type poolClearedError struct {
	// err is the error that caused the pool to be cleared; it is only used to build the message.
	err error
	// address is the address of the server the pool connects to.
	address address.Address
}
55
56 func (pce poolClearedError) Error() string {
57 return fmt.Sprintf(
58 "connection pool for %v was cleared because another operation failed with: %v",
59 pce.address,
60 pce.err)
61 }
62
63
// Retryable always returns true: a poolClearedError is reported as retryable.
func (poolClearedError) Retryable() bool { return true }

// Compile-time assertion that poolClearedError satisfies driver.RetryablePoolError.
var _ driver.RetryablePoolError = poolClearedError{}
68
69
// poolConfig contains all aspects of the pool that can be configured when calling newPool.
type poolConfig struct {
	// Address is the server address new connections are created for.
	Address address.Address
	// MinPoolSize is the number of connections the maintain goroutine tries to keep in the pool.
	// newPool lowers it to MaxPoolSize if it is larger and MaxPoolSize is non-zero.
	MinPoolSize uint64
	// MaxPoolSize is the maximum number of connections in the pool; 0 means no limit.
	MaxPoolSize uint64
	// MaxConnecting is the number of createConnections goroutines started; 0 selects the
	// default of 2.
	MaxConnecting uint64
	// MaxIdleTime, when non-zero, is installed as the idle timeout of every new connection.
	MaxIdleTime time.Duration
	// MaintainInterval is the period of the maintain goroutine; 0 selects the default of
	// 10 seconds and a negative value disables the maintain goroutine.
	MaintainInterval time.Duration
	// LoadBalanced is copied to pool.loadBalanced; when set, wait queue timeout errors include
	// the pinned connection counts.
	LoadBalanced bool
	// PoolMonitor, if non-nil, receives pool events.
	PoolMonitor *event.PoolMonitor
	// Logger, if non-nil, receives debug-level connection log messages.
	Logger *logger.Logger
	// handshakeErrFn, if non-nil, is called by createConnections when connecting a new
	// connection fails.
	handshakeErrFn func(error, uint64, *primitive.ObjectID)
}
82
// pool is a connection pool for a single server address. It tracks every connection it has
// created, the idle connections available for check-out, and the queues of pending check-out
// requests.
type pool struct {
	// The following integer fields are accessed with the sync/atomic package in this file.
	// NOTE(review): they are presumably kept at the start of the struct so they stay 64-bit
	// aligned on 32-bit platforms; do not reorder without confirming.
	nextID                       uint64 // source of driverConnectionID values for new connections
	pinnedCursorConnections      uint64 // number of connections currently pinned to cursors
	pinnedTransactionConnections uint64 // number of connections currently pinned to transactions

	address       address.Address
	minSize       uint64 // target minimum number of connections (see maintain)
	maxSize       uint64 // maximum number of connections; 0 means unlimited
	maxConnecting uint64 // number of createConnections goroutines started by newPool
	loadBalanced  bool
	monitor       *event.PoolMonitor // may be nil
	logger        *logger.Logger     // may be nil

	// handshakeErrFn, if non-nil, is called by createConnections with the error, the
	// connection's generation and its service ID when connecting a new connection fails.
	handshakeErrFn func(error, uint64, *primitive.ObjectID)

	connOpts   []ConnectionOption // options passed to newConnection
	generation *poolGenerationMap // generation bookkeeping used to detect stale connections

	maintainInterval time.Duration
	maintainReady    chan struct{}   // signalled by ready to wake the maintain goroutine
	backgroundDone   *sync.WaitGroup // tracks the createConnections and maintain goroutines

	stateMu      sync.RWMutex // must be held when accessing state and lastClearErr
	state        int          // one of poolPaused, poolReady, poolClosed
	lastClearErr error        // error passed to the most recent clear without a service ID

	// createConnectionsCond is the condition variable the createConnections goroutines wait on.
	// Its lock, createConnectionsCond.L, is held in this file whenever conns or newConnWait is
	// accessed, and when cancelBackgroundCtx is called by close.
	createConnectionsCond *sync.Cond
	cancelBackgroundCtx   context.CancelFunc     // cancels the context of the background goroutines
	conns                 map[uint64]*connection // all connections, keyed by driverConnectionID
	newConnWait           wantConnQueue          // requests waiting for a new connection

	idleMu       sync.Mutex    // must be held when accessing idleConns and idleConnWait
	idleConns    []*connection // stack of idle connections; the most recently returned is last
	idleConnWait wantConnQueue // requests waiting for an idle connection
}
129
130
131 func (p *pool) getState() int {
132 p.stateMu.RLock()
133 defer p.stateMu.RUnlock()
134
135 return p.state
136 }
137
138 func mustLogPoolMessage(pool *pool) bool {
139 return pool.logger != nil && pool.logger.LevelComponentEnabled(
140 logger.LevelDebug, logger.ComponentConnection)
141 }
142
143 func logPoolMessage(pool *pool, msg string, keysAndValues ...interface{}) {
144 host, port, err := net.SplitHostPort(pool.address.String())
145 if err != nil {
146 host = pool.address.String()
147 port = ""
148 }
149
150 pool.logger.Print(logger.LevelDebug,
151 logger.ComponentConnection,
152 msg,
153 logger.SerializeConnection(logger.Connection{
154 Message: msg,
155 ServerHost: host,
156 ServerPort: port,
157 }, keysAndValues...)...)
158
159 }
160
// reason describes why a connection was closed, in the two forms the pool reports it.
type reason struct {
	loggerConn string // value logged under logger.KeyReason
	event      string // value published as the Reason of a pool event
}
165
166
167 func connectionPerished(conn *connection) (reason, bool) {
168 switch {
169 case conn.closed():
170
171 return reason{
172 loggerConn: logger.ReasonConnClosedError,
173 event: event.ReasonError,
174 }, true
175 case conn.idleTimeoutExpired():
176 return reason{
177 loggerConn: logger.ReasonConnClosedIdle,
178 event: event.ReasonIdle,
179 }, true
180 case conn.pool.stale(conn):
181 return reason{
182 loggerConn: logger.ReasonConnClosedStale,
183 event: event.ReasonStale,
184 }, true
185 }
186
187 return reason{}, false
188 }
189
190
// newPool creates a new pool in the "paused" state. It uses the provided options when creating
// connections and immediately starts the pool's background goroutines: config.MaxConnecting
// (default 2) createConnections goroutines and, when the maintain interval is positive, one
// maintain goroutine. It then logs/publishes the pool-created event.
func newPool(config poolConfig, connOpts ...ConnectionOption) *pool {
	// Only install an idle timeout option when a max idle time was configured.
	if config.MaxIdleTime != time.Duration(0) {
		connOpts = append(connOpts, WithIdleTimeout(func(_ time.Duration) time.Duration { return config.MaxIdleTime }))
	}

	// Default to 2 connection-creating goroutines.
	var maxConnecting uint64 = 2
	if config.MaxConnecting > 0 {
		maxConnecting = config.MaxConnecting
	}

	// Default to a 10 second maintain interval. A negative configured value is kept and
	// disables the maintain goroutine below.
	maintainInterval := 10 * time.Second
	if config.MaintainInterval != 0 {
		maintainInterval = config.MaintainInterval
	}

	pool := &pool{
		address:               config.Address,
		minSize:               config.MinPoolSize,
		maxSize:               config.MaxPoolSize,
		maxConnecting:         maxConnecting,
		loadBalanced:          config.LoadBalanced,
		monitor:               config.PoolMonitor,
		logger:                config.Logger,
		handshakeErrFn:        config.handshakeErrFn,
		connOpts:              connOpts,
		generation:            newPoolGenerationMap(),
		state:                 poolPaused,
		maintainInterval:      maintainInterval,
		maintainReady:         make(chan struct{}, 1),
		backgroundDone:        &sync.WaitGroup{},
		createConnectionsCond: sync.NewCond(&sync.Mutex{}),
		conns:                 make(map[uint64]*connection, config.MaxPoolSize),
		idleConns:             make([]*connection, 0, config.MaxPoolSize),
	}
	// minSize must not exceed maxSize if maxSize is not 0 (0 means unlimited).
	if pool.maxSize != 0 && pool.minSize > pool.maxSize {
		pool.minSize = pool.maxSize
	}
	// Every new connection asks this pool for its generation number.
	pool.connOpts = append(pool.connOpts, withGenerationNumberFn(func(_ generationNumberFn) generationNumberFn { return pool.getGenerationForNewConnection }))

	pool.generation.connect()

	// Create a Context with cancellation that's used to signal the createConnections() and
	// maintain() background goroutines to stop. close() calls the stored cancel function.
	var ctx context.Context
	ctx, pool.cancelBackgroundCtx = context.WithCancel(context.Background())

	for i := 0; i < int(pool.maxConnecting); i++ {
		pool.backgroundDone.Add(1)
		go pool.createConnections(ctx, pool.backgroundDone)
	}

	// If maintainInterval is not positive, don't start the maintain() goroutine.
	if maintainInterval > 0 {
		pool.backgroundDone.Add(1)
		go pool.maintain(ctx, pool.backgroundDone)
	}

	if mustLogPoolMessage(pool) {
		// Note: the values logged are the configured ones, not the defaults applied above.
		keysAndValues := logger.KeyValues{
			logger.KeyMaxIdleTimeMS, config.MaxIdleTime.Milliseconds(),
			logger.KeyMinPoolSize, config.MinPoolSize,
			logger.KeyMaxPoolSize, config.MaxPoolSize,
			logger.KeyMaxConnecting, config.MaxConnecting,
		}

		logPoolMessage(pool, logger.ConnectionPoolCreated, keysAndValues...)
	}

	if pool.monitor != nil {
		pool.monitor.Event(&event.PoolEvent{
			Type: event.PoolCreated,
			PoolOptions: &event.MonitorPoolOptions{
				MaxPoolSize: config.MaxPoolSize,
				MinPoolSize: config.MinPoolSize,
			},
			Address: pool.address.String(),
		})
	}

	return pool
}
275
276
277 func (p *pool) stale(conn *connection) bool {
278 return conn == nil || p.generation.stale(conn.desc.ServiceID, conn.generation)
279 }
280
281
282
283
284 func (p *pool) ready() error {
285
286 p.stateMu.Lock()
287 if p.state == poolReady {
288 p.stateMu.Unlock()
289 return nil
290 }
291 if p.state != poolPaused {
292 p.stateMu.Unlock()
293 return ErrPoolNotPaused
294 }
295 p.lastClearErr = nil
296 p.state = poolReady
297 p.stateMu.Unlock()
298
299 if mustLogPoolMessage(p) {
300 logPoolMessage(p, logger.ConnectionPoolReady)
301 }
302
303
304
305 if p.monitor != nil {
306 p.monitor.Event(&event.PoolEvent{
307 Type: event.PoolReady,
308 Address: p.address.String(),
309 })
310 }
311
312
313 select {
314 case p.maintainReady <- struct{}{}:
315 default:
316 }
317
318 return nil
319 }
320
321
322
323
324 func (p *pool) close(ctx context.Context) {
325 p.stateMu.Lock()
326 if p.state == poolClosed {
327 p.stateMu.Unlock()
328 return
329 }
330 p.state = poolClosed
331 p.stateMu.Unlock()
332
333
334
335
336
337
338
339 p.createConnectionsCond.L.Lock()
340 p.cancelBackgroundCtx()
341 p.createConnectionsCond.Broadcast()
342 p.createConnectionsCond.L.Unlock()
343
344
345 p.backgroundDone.Wait()
346
347 p.generation.disconnect()
348
349 if ctx == nil {
350 ctx = context.Background()
351 }
352
353
354
355
356 if _, ok := ctx.Deadline(); ok {
357 ticker := time.NewTicker(100 * time.Millisecond)
358 defer ticker.Stop()
359
360 graceful:
361 for {
362 if p.totalConnectionCount() == p.availableConnectionCount() {
363 break graceful
364 }
365
366 select {
367 case <-ticker.C:
368 case <-ctx.Done():
369 break graceful
370 default:
371 }
372 }
373 }
374
375
376
377 p.idleMu.Lock()
378 for _, conn := range p.idleConns {
379 _ = p.removeConnection(conn, reason{
380 loggerConn: logger.ReasonConnClosedPoolClosed,
381 event: event.ReasonPoolClosed,
382 }, nil)
383 _ = p.closeConnection(conn)
384 }
385 p.idleConns = p.idleConns[:0]
386 for {
387 w := p.idleConnWait.popFront()
388 if w == nil {
389 break
390 }
391 w.tryDeliver(nil, ErrPoolClosed)
392 }
393 p.idleMu.Unlock()
394
395
396
397
398 p.createConnectionsCond.L.Lock()
399 conns := make([]*connection, 0, len(p.conns))
400 for _, conn := range p.conns {
401 conns = append(conns, conn)
402 }
403 for {
404 w := p.newConnWait.popFront()
405 if w == nil {
406 break
407 }
408 w.tryDeliver(nil, ErrPoolClosed)
409 }
410 p.createConnectionsCond.L.Unlock()
411
412 if mustLogPoolMessage(p) {
413 logPoolMessage(p, logger.ConnectionPoolClosed)
414 }
415
416 if p.monitor != nil {
417 p.monitor.Event(&event.PoolEvent{
418 Type: event.PoolClosedEvent,
419 Address: p.address.String(),
420 })
421 }
422
423
424
425 for _, conn := range conns {
426 _ = p.removeConnection(conn, reason{
427 loggerConn: logger.ReasonConnClosedPoolClosed,
428 event: event.ReasonPoolClosed,
429 }, nil)
430 _ = p.closeConnection(conn)
431 }
432 }
433
434 func (p *pool) pinConnectionToCursor() {
435 atomic.AddUint64(&p.pinnedCursorConnections, 1)
436 }
437
438 func (p *pool) unpinConnectionFromCursor() {
439
440 atomic.AddUint64(&p.pinnedCursorConnections, ^uint64(0))
441 }
442
443 func (p *pool) pinConnectionToTransaction() {
444 atomic.AddUint64(&p.pinnedTransactionConnections, 1)
445 }
446
447 func (p *pool) unpinConnectionFromTransaction() {
448
449 atomic.AddUint64(&p.pinnedTransactionConnections, ^uint64(0))
450 }
451
452
453
454
455
// checkOut checks out a connection from the pool. It first tries to take an idle connection;
// if none is available it queues a request both for an idle connection and for a new connection
// and blocks until one is delivered or ctx is done.
//
// It returns ErrPoolClosed if the pool is closed, a poolClearedError if the pool is paused, the
// delivered error if the request was failed, or a WaitQueueTimeoutError wrapping ctx.Err() if
// ctx is done first. A nil ctx is treated as context.Background(). Checkout started/failed/
// succeeded messages and events are emitted along every path.
func (p *pool) checkOut(ctx context.Context) (conn *connection, err error) {
	if mustLogPoolMessage(p) {
		logPoolMessage(p, logger.ConnectionCheckoutStarted)
	}

	if p.monitor != nil {
		p.monitor.Event(&event.PoolEvent{
			Type:    event.GetStarted,
			Address: p.address.String(),
		})
	}

	start := time.Now()

	// Check the pool state while holding a stateMu read lock. If the pool state is "ready",
	// the read lock is kept until the request has been satisfied from the idle stack or queued
	// for a new connection, so the state cannot change (ready, clearImpl and close take the
	// write lock) between the check and the queueing.
	p.stateMu.RLock()
	switch p.state {
	case poolClosed:
		p.stateMu.RUnlock()

		duration := time.Since(start)
		if mustLogPoolMessage(p) {
			keysAndValues := logger.KeyValues{
				logger.KeyDurationMS, duration.Milliseconds(),
				logger.KeyReason, logger.ReasonConnCheckoutFailedPoolClosed,
			}

			logPoolMessage(p, logger.ConnectionCheckoutFailed, keysAndValues...)
		}

		if p.monitor != nil {
			p.monitor.Event(&event.PoolEvent{
				Type:     event.GetFailed,
				Address:  p.address.String(),
				Duration: duration,
				Reason:   event.ReasonPoolClosed,
			})
		}
		return nil, ErrPoolClosed
	case poolPaused:
		// Build the error before unlocking because lastClearErr is guarded by stateMu.
		err := poolClearedError{err: p.lastClearErr, address: p.address}
		p.stateMu.RUnlock()

		duration := time.Since(start)
		if mustLogPoolMessage(p) {
			keysAndValues := logger.KeyValues{
				logger.KeyDurationMS, duration.Milliseconds(),
				logger.KeyReason, logger.ReasonConnCheckoutFailedError,
			}

			logPoolMessage(p, logger.ConnectionCheckoutFailed, keysAndValues...)
		}

		if p.monitor != nil {
			p.monitor.Event(&event.PoolEvent{
				Type:     event.GetFailed,
				Address:  p.address.String(),
				Duration: duration,
				Reason:   event.ReasonConnectionErrored,
				Error:    err,
			})
		}
		return nil, err
	}

	if ctx == nil {
		ctx = context.Background()
	}

	// Create a wantConn, which we will use to request an existing idle or new connection. If
	// checkOut returns an error, cancel the wantConn: cancel checks any connection that was
	// delivered in the meantime back into the pool so it is not lost.
	w := newWantConn()
	defer func() {
		if err != nil {
			w.cancel(p, err)
		}
	}()

	// Get in the queue for an idle connection. If getOrQueueForIdleConn returns true, it was
	// able to immediately deliver an idle connection (or an error) to the wantConn, so we can
	// return the result without waiting.
	if delivered := p.getOrQueueForIdleConn(w); delivered {
		// If delivered = true, we didn't enter the wait queue and will return either a
		// connection or an error, so unlock the stateMu lock here.
		p.stateMu.RUnlock()

		duration := time.Since(start)
		if w.err != nil {
			if mustLogPoolMessage(p) {
				keysAndValues := logger.KeyValues{
					logger.KeyDurationMS, duration.Milliseconds(),
					logger.KeyReason, logger.ReasonConnCheckoutFailedError,
				}

				logPoolMessage(p, logger.ConnectionCheckoutFailed, keysAndValues...)
			}

			if p.monitor != nil {
				p.monitor.Event(&event.PoolEvent{
					Type:     event.GetFailed,
					Address:  p.address.String(),
					Duration: duration,
					Reason:   event.ReasonConnectionErrored,
					Error:    w.err,
				})
			}
			return nil, w.err
		}

		duration = time.Since(start)
		if mustLogPoolMessage(p) {
			keysAndValues := logger.KeyValues{
				logger.KeyDriverConnectionID, w.conn.driverConnectionID,
				logger.KeyDurationMS, duration.Milliseconds(),
			}

			logPoolMessage(p, logger.ConnectionCheckedOut, keysAndValues...)
		}

		if p.monitor != nil {
			p.monitor.Event(&event.PoolEvent{
				Type:         event.GetSucceeded,
				Address:      p.address.String(),
				ConnectionID: w.conn.driverConnectionID,
				Duration:     duration,
			})
		}

		return w.conn, nil
	}

	// If we didn't get an immediately available idle connection, also get in the queue for a
	// new connection while we're waiting for an idle connection. The request is now queued, so
	// the state read lock can be released.
	p.queueForNewConn(w)
	p.stateMu.RUnlock()

	// Wait for either the wantConn to be ready or for the Context to time out.
	waitQueueStart := time.Now()
	select {
	case <-w.ready:
		if w.err != nil {
			duration := time.Since(start)
			if mustLogPoolMessage(p) {
				keysAndValues := logger.KeyValues{
					logger.KeyDurationMS, duration.Milliseconds(),
					logger.KeyReason, logger.ReasonConnCheckoutFailedError,
					logger.KeyError, w.err.Error(),
				}

				logPoolMessage(p, logger.ConnectionCheckoutFailed, keysAndValues...)
			}

			if p.monitor != nil {
				p.monitor.Event(&event.PoolEvent{
					Type:     event.GetFailed,
					Address:  p.address.String(),
					Duration: duration,
					Reason:   event.ReasonConnectionErrored,
					Error:    w.err,
				})
			}

			return nil, w.err
		}

		duration := time.Since(start)
		if mustLogPoolMessage(p) {
			keysAndValues := logger.KeyValues{
				logger.KeyDriverConnectionID, w.conn.driverConnectionID,
				logger.KeyDurationMS, duration.Milliseconds(),
			}

			logPoolMessage(p, logger.ConnectionCheckedOut, keysAndValues...)
		}

		if p.monitor != nil {
			p.monitor.Event(&event.PoolEvent{
				Type:         event.GetSucceeded,
				Address:      p.address.String(),
				ConnectionID: w.conn.driverConnectionID,
				Duration:     duration,
			})
		}
		return w.conn, nil
	case <-ctx.Done():
		// Time spent in the wait queue only; duration below covers the whole checkOut call.
		waitQueueDuration := time.Since(waitQueueStart)

		duration := time.Since(start)
		if mustLogPoolMessage(p) {
			keysAndValues := logger.KeyValues{
				logger.KeyDurationMS, duration.Milliseconds(),
				logger.KeyReason, logger.ReasonConnCheckoutFailedTimout,
			}

			logPoolMessage(p, logger.ConnectionCheckoutFailed, keysAndValues...)
		}

		if p.monitor != nil {
			p.monitor.Event(&event.PoolEvent{
				Type:     event.GetFailed,
				Address:  p.address.String(),
				Duration: duration,
				Reason:   event.ReasonTimedOut,
				Error:    ctx.Err(),
			})
		}

		err := WaitQueueTimeoutError{
			Wrapped:              ctx.Err(),
			maxPoolSize:          p.maxSize,
			totalConnections:     p.totalConnectionCount(),
			availableConnections: p.availableConnectionCount(),
			waitDuration:         waitQueueDuration,
		}
		// Pinned connection counts are only reported for load-balanced pools.
		if p.loadBalanced {
			err.pinnedConnections = &pinnedConnections{
				cursorConnections:      atomic.LoadUint64(&p.pinnedCursorConnections),
				transactionConnections: atomic.LoadUint64(&p.pinnedTransactionConnections),
			}
		}
		return nil, err
	}
}
686
687
688 func (p *pool) closeConnection(conn *connection) error {
689 if conn.pool != p {
690 return ErrWrongPool
691 }
692
693 if atomic.LoadInt64(&conn.state) == connConnected {
694 conn.closeConnectContext()
695 conn.wait()
696 }
697
698 err := conn.close()
699 if err != nil {
700 return ConnectionError{ConnectionID: conn.id, Wrapped: err, message: "failed to close net.Conn"}
701 }
702
703 return nil
704 }
705
706 func (p *pool) getGenerationForNewConnection(serviceID *primitive.ObjectID) uint64 {
707 return p.generation.addConnection(serviceID)
708 }
709
710
711 func (p *pool) removeConnection(conn *connection, reason reason, err error) error {
712 if conn == nil {
713 return nil
714 }
715
716 if conn.pool != p {
717 return ErrWrongPool
718 }
719
720 p.createConnectionsCond.L.Lock()
721 _, ok := p.conns[conn.driverConnectionID]
722 if !ok {
723
724
725 p.createConnectionsCond.L.Unlock()
726 return nil
727 }
728 delete(p.conns, conn.driverConnectionID)
729
730
731 p.createConnectionsCond.Signal()
732 p.createConnectionsCond.L.Unlock()
733
734
735
736
737 if conn.hasGenerationNumber() {
738 p.generation.removeConnection(conn.desc.ServiceID)
739 }
740
741 if mustLogPoolMessage(p) {
742 keysAndValues := logger.KeyValues{
743 logger.KeyDriverConnectionID, conn.driverConnectionID,
744 logger.KeyReason, reason.loggerConn,
745 }
746
747 if err != nil {
748 keysAndValues.Add(logger.KeyError, err.Error())
749 }
750
751 logPoolMessage(p, logger.ConnectionClosed, keysAndValues...)
752 }
753
754 if p.monitor != nil {
755 p.monitor.Event(&event.PoolEvent{
756 Type: event.ConnectionClosed,
757 Address: p.address.String(),
758 ConnectionID: conn.driverConnectionID,
759 Reason: reason.event,
760 Error: err,
761 })
762 }
763
764 return nil
765 }
766
var (
	// BGReadTimeout is the read deadline bgRead sets on a connection before attempting to read
	// a pending server response in the background. The default is 1 second.
	BGReadTimeout = 1 * time.Second

	// BGReadCallback, if non-nil, is called at the end of every bgRead with the connection
	// address, the time the background read started, the time the read returned (the zero
	// time if no read was attempted), any errors encountered, and whether bgRead closed the
	// connection.
	BGReadCallback func(addr string, start, read time.Time, errs []error, connClosed bool)
)
783
784
785
786
787
788
789
790
// bgRead tries to read a pending response from conn, bounded by a read deadline of
// BGReadTimeout, and then checks the connection back into pool. If setting the deadline or
// reading fails, the connection is closed before it is checked in. checkInNoEvent starts bgRead
// in its own goroutine for connections flagged as awaiting a response.
func bgRead(pool *pool, conn *connection) {
	var start, read time.Time
	start = time.Now()
	errs := make([]error, 0)
	connClosed := false

	defer func() {
		// Always check the connection back in, even if it was closed above. The deferred
		// closure reads errs, read and connClosed at exit time, so it reports the values set
		// on whichever path returned.
		err := pool.checkInNoEvent(conn)
		if err != nil {
			errs = append(errs, fmt.Errorf("error checking in: %w", err))
		}

		if BGReadCallback != nil {
			BGReadCallback(conn.addr.String(), start, read, errs, connClosed)
		}
	}()

	err := conn.nc.SetReadDeadline(time.Now().Add(BGReadTimeout))
	if err != nil {
		errs = append(errs, fmt.Errorf("error setting a read deadline: %w", err))

		connClosed = true
		err := conn.close()
		if err != nil {
			errs = append(errs, fmt.Errorf("error closing conn after setting read deadline: %w", err))
		}

		return
	}

	// The read is bounded by the read deadline set above, not by the Context, so
	// context.Background() is passed.
	_, _, err = conn.read(context.Background())
	read = time.Now()
	if err != nil {
		errs = append(errs, fmt.Errorf("error reading: %w", err))

		connClosed = true
		err := conn.close()
		if err != nil {
			errs = append(errs, fmt.Errorf("error closing conn after reading: %w", err))
		}

		return
	}
}
841
842
843
844 func (p *pool) checkIn(conn *connection) error {
845 if conn == nil {
846 return nil
847 }
848 if conn.pool != p {
849 return ErrWrongPool
850 }
851
852 if mustLogPoolMessage(p) {
853 keysAndValues := logger.KeyValues{
854 logger.KeyDriverConnectionID, conn.driverConnectionID,
855 }
856
857 logPoolMessage(p, logger.ConnectionCheckedIn, keysAndValues...)
858 }
859
860 if p.monitor != nil {
861 p.monitor.Event(&event.PoolEvent{
862 Type: event.ConnectionReturned,
863 ConnectionID: conn.driverConnectionID,
864 Address: conn.addr.String(),
865 })
866 }
867
868 return p.checkInNoEvent(conn)
869 }
870
871
872
// checkInNoEvent returns a connection to the pool without emitting the "checked in" message or
// event. It is used by checkIn and by internal paths (bgRead, createConnections, maintain,
// wantConn.cancel). A nil connection is a no-op; a connection belonging to another pool yields
// ErrWrongPool.
func (p *pool) checkInNoEvent(conn *connection) error {
	if conn == nil {
		return nil
	}
	if conn.pool != p {
		return ErrWrongPool
	}

	// If the connection is flagged as awaiting a server response, clear the flag and hand the
	// connection to a background goroutine that tries to read the response. bgRead checks the
	// connection in again when it finishes; the cleared flag makes that second check-in take
	// the normal path below.
	if conn.awaitingResponse {
		conn.awaitingResponse = false
		go bgRead(p, conn)
		return nil
	}

	// Bump the connection idle deadline here because we're about to make the connection
	// available again. This happens before the perished check, which inspects the idle timeout.
	conn.bumpIdleDeadline()

	r, perished := connectionPerished(conn)
	if !perished && conn.pool.getState() == poolClosed {
		// A healthy connection returned to a closed pool is closed as well.
		perished = true
		r = reason{
			loggerConn: logger.ReasonConnClosedPoolClosed,
			event:      event.ReasonPoolClosed,
		}
	}
	if perished {
		_ = p.removeConnection(conn, r, nil)
		// Close the connection in a separate goroutine so check-in does not block on it.
		go func() {
			_ = p.closeConnection(conn)
		}()
		return nil
	}

	p.idleMu.Lock()
	defer p.idleMu.Unlock()

	// Hand the connection directly to the first queued request that is still able to accept it.
	for {
		w := p.idleConnWait.popFront()
		if w == nil {
			break
		}
		if w.tryDeliver(conn, nil) {
			return nil
		}
	}

	// Nobody is waiting. Refuse to add the same connection to the idle stack twice.
	for _, idle := range p.idleConns {
		if idle == conn {
			return fmt.Errorf("duplicate idle conn %p in idle connections stack", conn)
		}
	}

	p.idleConns = append(p.idleConns, conn)
	return nil
}
941
942
943 func (p *pool) clear(err error, serviceID *primitive.ObjectID) {
944 p.clearImpl(err, serviceID, false)
945 }
946
947
948 func (p *pool) clearAll(err error, serviceID *primitive.ObjectID) {
949 p.clearImpl(err, serviceID, true)
950 }
951
952
953 func (p *pool) interruptConnections(conns []*connection) {
954 for _, conn := range conns {
955 _ = p.removeConnection(conn, reason{
956 loggerConn: logger.ReasonConnClosedStale,
957 event: event.ReasonStale,
958 }, nil)
959 go func(c *connection) {
960 _ = p.closeConnection(c)
961 }(conn)
962 }
963 }
964
965
966
967
968
969
970
971
972 func (p *pool) clearImpl(err error, serviceID *primitive.ObjectID, interruptAllConnections bool) {
973 if p.getState() == poolClosed {
974 return
975 }
976
977 p.generation.clear(serviceID)
978
979
980
981
982 sendEvent := true
983 if serviceID == nil {
984
985
986
987 p.stateMu.Lock()
988 if p.state == poolPaused {
989 sendEvent = false
990 }
991 if p.state == poolReady {
992 p.state = poolPaused
993 }
994 p.lastClearErr = err
995 p.stateMu.Unlock()
996 }
997
998 if mustLogPoolMessage(p) {
999 keysAndValues := logger.KeyValues{
1000 logger.KeyServiceID, serviceID,
1001 }
1002
1003 logPoolMessage(p, logger.ConnectionPoolCleared, keysAndValues...)
1004 }
1005
1006 if sendEvent && p.monitor != nil {
1007 event := &event.PoolEvent{
1008 Type: event.PoolCleared,
1009 Address: p.address.String(),
1010 ServiceID: serviceID,
1011 Interruption: interruptAllConnections,
1012 Error: err,
1013 }
1014 p.monitor.Event(event)
1015 }
1016
1017 p.removePerishedConns()
1018 if interruptAllConnections {
1019 p.createConnectionsCond.L.Lock()
1020 p.idleMu.Lock()
1021
1022 idleConns := make(map[*connection]bool, len(p.idleConns))
1023 for _, idle := range p.idleConns {
1024 idleConns[idle] = true
1025 }
1026
1027 conns := make([]*connection, 0, len(p.conns))
1028 for _, conn := range p.conns {
1029 if _, ok := idleConns[conn]; !ok && p.stale(conn) {
1030 conns = append(conns, conn)
1031 }
1032 }
1033
1034 p.idleMu.Unlock()
1035 p.createConnectionsCond.L.Unlock()
1036
1037 p.interruptConnections(conns)
1038 }
1039
1040 if serviceID == nil {
1041 pcErr := poolClearedError{err: err, address: p.address}
1042
1043
1044 p.idleMu.Lock()
1045 for {
1046 w := p.idleConnWait.popFront()
1047 if w == nil {
1048 break
1049 }
1050 w.tryDeliver(nil, pcErr)
1051 }
1052 p.idleMu.Unlock()
1053
1054
1055
1056
1057 p.createConnectionsCond.L.Lock()
1058 for {
1059 w := p.newConnWait.popFront()
1060 if w == nil {
1061 break
1062 }
1063 w.tryDeliver(nil, pcErr)
1064 }
1065 p.createConnectionsCond.L.Unlock()
1066 }
1067 }
1068
1069
1070
1071
1072
// getOrQueueForIdleConn attempts to deliver an idle connection to the given wantConn. If there
// is an idle connection in the idle connections stack, it pops one and tries to deliver it. If
// there are no usable idle connections, it adds the wantConn to the idleConnWait queue.
//
// It returns true if a delivery was attempted (w needs no further queueing) and false if w was
// queued.
func (p *pool) getOrQueueForIdleConn(w *wantConn) bool {
	p.idleMu.Lock()
	defer p.idleMu.Unlock()

	// Try to deliver an idle connection from the idleConns stack first, most recently returned
	// connection first.
	for len(p.idleConns) > 0 {
		conn := p.idleConns[len(p.idleConns)-1]
		p.idleConns = p.idleConns[:len(p.idleConns)-1]

		if conn == nil {
			continue
		}

		// Perished connections are removed from the pool and closed in the background; keep
		// looking for a usable one.
		if reason, perished := connectionPerished(conn); perished {
			_ = conn.pool.removeConnection(conn, reason, nil)
			go func() {
				_ = conn.pool.closeConnection(conn)
			}()
			continue
		}

		if !w.tryDeliver(conn, nil) {
			// If we couldn't deliver the conn to w, put it back in the idleConns stack.
			p.idleConns = append(p.idleConns, conn)
		}

		// If we got here, we tried to deliver an idle conn to w. No matter if tryDeliver()
		// returned true or false, w is no longer waiting and doesn't need to be added to any
		// wait queues, so return delivered = true.
		return true
	}

	// No usable idle connection: drop already-satisfied requests from the front of the queue
	// and enqueue w.
	p.idleConnWait.cleanFront()
	p.idleConnWait.pushBack(w)
	return false
}
1109
1110 func (p *pool) queueForNewConn(w *wantConn) {
1111 p.createConnectionsCond.L.Lock()
1112 defer p.createConnectionsCond.L.Unlock()
1113
1114 p.newConnWait.cleanFront()
1115 p.newConnWait.pushBack(w)
1116 p.createConnectionsCond.Signal()
1117 }
1118
1119 func (p *pool) totalConnectionCount() int {
1120 p.createConnectionsCond.L.Lock()
1121 defer p.createConnectionsCond.L.Unlock()
1122
1123 return len(p.conns)
1124 }
1125
1126 func (p *pool) availableConnectionCount() int {
1127 p.idleMu.Lock()
1128 defer p.idleMu.Unlock()
1129
1130 return len(p.idleConns)
1131 }
1132
1133
// createConnections creates connections for wantConn requests on the newConnWait queue. It is
// run as a background goroutine by newPool and loops until ctx is cancelled, calling wg.Done
// on exit.
func (p *pool) createConnections(ctx context.Context, wg *sync.WaitGroup) {
	defer wg.Done()

	// condition returns true if the createConnections() loop should continue and false if it
	// should wait. Note that the condition also listens for Context cancellation, which also
	// causes the loop to continue, allowing for a subsequent check to return from
	// createConnections(). It must be called with createConnectionsCond.L held.
	condition := func() bool {
		checkOutWaiting := p.newConnWait.len() > 0
		poolHasSpace := p.maxSize == 0 || uint64(len(p.conns)) < p.maxSize
		cancelled := ctx.Err() != nil
		return (checkOutWaiting && poolHasSpace) || cancelled
	}

	// wait waits for there to be an available wantConn and for the pool to have space for a
	// new connection. When the condition becomes true, it creates a new connection, registers
	// it in the conns map, and returns the waiting wantConn and new connection. If the Context
	// is cancelled or there are no wantConns still waiting, the returned boolean is false.
	wait := func() (*wantConn, *connection, bool) {
		p.createConnectionsCond.L.Lock()
		defer p.createConnectionsCond.L.Unlock()

		for !condition() {
			p.createConnectionsCond.Wait()
		}

		if ctx.Err() != nil {
			return nil, nil, false
		}

		// Skip requests at the front that have already been satisfied or cancelled.
		p.newConnWait.cleanFront()
		w := p.newConnWait.popFront()
		if w == nil {
			return nil, nil, false
		}

		conn := newConnection(p.address, p.connOpts...)
		conn.pool = p
		conn.driverConnectionID = atomic.AddUint64(&p.nextID, 1)
		p.conns[conn.driverConnectionID] = conn

		return w, conn, true
	}

	for ctx.Err() == nil {
		w, conn, ok := wait()
		if !ok {
			continue
		}

		if mustLogPoolMessage(p) {
			keysAndValues := logger.KeyValues{
				logger.KeyDriverConnectionID, conn.driverConnectionID,
			}

			logPoolMessage(p, logger.ConnectionCreated, keysAndValues...)
		}

		if p.monitor != nil {
			p.monitor.Event(&event.PoolEvent{
				Type:         event.ConnectionCreated,
				Address:      p.address.String(),
				ConnectionID: conn.driverConnectionID,
			})
		}

		start := time.Now()
		// Pass the createConnections context to connect so that cancelling it (see close)
		// also cancels an in-progress connect.
		err := conn.connect(ctx)
		if err != nil {
			// Fail the waiting request first so the caller is unblocked as soon as possible.
			w.tryDeliver(nil, err)

			// Report the connect failure to the configured handler, if any, together with the
			// connection's generation and service ID.
			if p.handshakeErrFn != nil {
				p.handshakeErrFn(err, conn.generation, conn.desc.ServiceID)
			}

			_ = p.removeConnection(conn, reason{
				loggerConn: logger.ReasonConnClosedError,
				event:      event.ReasonError,
			}, err)

			_ = p.closeConnection(conn)

			continue
		}

		duration := time.Since(start)
		if mustLogPoolMessage(p) {
			keysAndValues := logger.KeyValues{
				logger.KeyDriverConnectionID, conn.driverConnectionID,
				logger.KeyDurationMS, duration.Milliseconds(),
			}

			logPoolMessage(p, logger.ConnectionReady, keysAndValues...)
		}

		if p.monitor != nil {
			p.monitor.Event(&event.PoolEvent{
				Type:         event.ConnectionReady,
				Address:      p.address.String(),
				ConnectionID: conn.driverConnectionID,
				Duration:     duration,
			})
		}

		if w.tryDeliver(conn, nil) {
			continue
		}

		// The request was already satisfied or cancelled while we were connecting, so return
		// the new connection to the pool instead of discarding it.
		_ = p.checkInNoEvent(conn)
	}
}
1252
// maintain is the pool's background maintenance loop. On every tick of maintainInterval, and
// whenever ready signals maintainReady, it removes perished idle connections and requests new
// connections so the pool grows towards minSize. It only acts while the pool is "ready" and
// returns when ctx is cancelled, calling wg.Done on exit.
func (p *pool) maintain(ctx context.Context, wg *sync.WaitGroup) {
	defer wg.Done()

	ticker := time.NewTicker(p.maintainInterval)
	defer ticker.Stop()

	// remove removes the *wantConn at index i from the slice and returns the new slice. It
	// swaps the element with the last one and truncates, so order is not preserved.
	remove := func(arr []*wantConn, i int) []*wantConn {
		end := len(arr) - 1
		arr[i], arr[end] = arr[end], arr[i]
		return arr[:end]
	}

	// removeNotWaiting removes any wantConns that are no longer waiting from the given slice.
	// It iterates backwards so that the swap performed by remove never moves an unchecked
	// element past the cursor.
	removeNotWaiting := func(arr []*wantConn) []*wantConn {
		for i := len(arr) - 1; i >= 0; i-- {
			w := arr[i]
			if !w.waiting() {
				arr = remove(arr, i)
			}
		}

		return arr
	}

	// wantConns tracks the requests issued by this goroutine that have not been fulfilled
	// yet. Any that are still pending when maintain exits are failed with ErrPoolClosed.
	wantConns := make([]*wantConn, 0, p.minSize)
	defer func() {
		for _, w := range wantConns {
			w.tryDeliver(nil, ErrPoolClosed)
		}
	}()

	for {
		select {
		case <-ticker.C:
		case <-p.maintainReady:
		case <-ctx.Done():
			return
		}

		// Only maintain the pool while it's in the "ready" state. The state read lock is held
		// for the whole iteration, so the state cannot change (ready, clearImpl and close take
		// the write lock) while connections are being requested.
		p.stateMu.RLock()
		if p.state != poolReady {
			p.stateMu.RUnlock()
			continue
		}

		p.removePerishedConns()

		// Remove any wantConns that are no longer waiting.
		wantConns = removeNotWaiting(wantConns)

		// Figure out how many more wantConns we need to satisfy minPoolSize. Assume that the
		// outstanding wantConns (i.e. the ones that weren't removed from the slice) will all
		// return connections when they're ready, so only add wantConns to make up the
		// difference. Limit the number of connections requested to 10 at a time.
		total := p.totalConnectionCount()
		n := int(p.minSize) - total - len(wantConns)
		if n > 10 {
			n = 10
		}

		for i := 0; i < n; i++ {
			w := newWantConn()
			p.queueForNewConn(w)
			wantConns = append(wantConns, w)

			// Start a goroutine for each new wantConn, waiting for it to be ready, and check
			// the delivered connection (if any) straight into the pool as an idle connection.
			go func() {
				<-w.ready
				if w.conn != nil {
					_ = p.checkInNoEvent(w.conn)
				}
			}()
		}
		p.stateMu.RUnlock()
	}
}
1341
// removePerishedConns removes every perished connection from the idle connections stack,
// removes it from the pool, and closes it in a background goroutine. The idle lock is held for
// the whole scan.
func (p *pool) removePerishedConns() {
	p.idleMu.Lock()
	defer p.idleMu.Unlock()

	for i := range p.idleConns {
		conn := p.idleConns[i]
		if conn == nil {
			continue
		}

		if reason, perished := connectionPerished(conn); perished {
			// Clear the slot now; the nil entries are squeezed out by compact below.
			p.idleConns[i] = nil

			_ = p.removeConnection(conn, reason, nil)
			go func() {
				_ = p.closeConnection(conn)
			}()
		}
	}

	p.idleConns = compact(p.idleConns)
}
1364
1365
1366
1367 func compact(arr []*connection) []*connection {
1368 offset := 0
1369 for i := range arr {
1370 if arr[i] == nil {
1371 continue
1372 }
1373 arr[offset] = arr[i]
1374 offset++
1375 }
1376 return arr[:offset]
1377 }
1378
1379
1380
1381
1382
1383
// A wantConn records state about a wanted connection (that is, an active call to checkOut or a
// request issued by maintain). Exactly one of conn or err is set when the request is settled;
// ready is closed at that moment.
type wantConn struct {
	// ready is closed by tryDeliver or cancel once the request has been settled.
	ready chan struct{}

	mu   sync.Mutex  // protects conn, err, close(ready)
	conn *connection // the delivered connection, if any
	err  error       // the delivered (or cancellation) error, if any
}
1391
1392 func newWantConn() *wantConn {
1393 return &wantConn{
1394 ready: make(chan struct{}, 1),
1395 }
1396 }
1397
1398
1399 func (w *wantConn) waiting() bool {
1400 select {
1401 case <-w.ready:
1402 return false
1403 default:
1404 return true
1405 }
1406 }
1407
1408
1409 func (w *wantConn) tryDeliver(conn *connection, err error) bool {
1410 w.mu.Lock()
1411 defer w.mu.Unlock()
1412
1413 if w.conn != nil || w.err != nil {
1414 return false
1415 }
1416
1417 w.conn = conn
1418 w.err = err
1419 if w.conn == nil && w.err == nil {
1420 panic("x/mongo/driver/topology: internal error: misuse of tryDeliver")
1421 }
1422
1423 close(w.ready)
1424
1425 return true
1426 }
1427
1428
1429
1430
1431 func (w *wantConn) cancel(p *pool, err error) {
1432 if err == nil {
1433 panic("x/mongo/driver/topology: internal error: misuse of cancel")
1434 }
1435
1436 w.mu.Lock()
1437 if w.conn == nil && w.err == nil {
1438 close(w.ready)
1439 }
1440 conn := w.conn
1441 w.conn = nil
1442 w.err = err
1443 w.mu.Unlock()
1444
1445 if conn != nil {
1446 _ = p.checkInNoEvent(conn)
1447 }
1448 }
1449
1450
1451
1452 type wantConnQueue struct {
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463 head []*wantConn
1464 headPos int
1465 tail []*wantConn
1466 }
1467
1468
1469 func (q *wantConnQueue) len() int {
1470 return len(q.head) - q.headPos + len(q.tail)
1471 }
1472
1473
1474 func (q *wantConnQueue) pushBack(w *wantConn) {
1475 q.tail = append(q.tail, w)
1476 }
1477
1478
1479 func (q *wantConnQueue) popFront() *wantConn {
1480 if q.headPos >= len(q.head) {
1481 if len(q.tail) == 0 {
1482 return nil
1483 }
1484
1485 q.head, q.headPos, q.tail = q.tail, 0, q.head[:0]
1486 }
1487 w := q.head[q.headPos]
1488 q.head[q.headPos] = nil
1489 q.headPos++
1490 return w
1491 }
1492
1493
1494 func (q *wantConnQueue) peekFront() *wantConn {
1495 if q.headPos < len(q.head) {
1496 return q.head[q.headPos]
1497 }
1498 if len(q.tail) > 0 {
1499 return q.tail[0]
1500 }
1501 return nil
1502 }
1503
1504
1505 func (q *wantConnQueue) cleanFront() {
1506 for {
1507 w := q.peekFront()
1508 if w == nil || w.waiting() {
1509 return
1510 }
1511 q.popFront()
1512 }
1513 }
1514
View as plain text