...

Source file src/go.mongodb.org/mongo-driver/x/mongo/driver/topology/pool.go

Documentation: go.mongodb.org/mongo-driver/x/mongo/driver/topology

     1  // Copyright (C) MongoDB, Inc. 2017-present.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License"); you may
     4  // not use this file except in compliance with the License. You may obtain
     5  // a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
     6  
     7  package topology
     8  
     9  import (
    10  	"context"
    11  	"fmt"
    12  	"net"
    13  	"sync"
    14  	"sync/atomic"
    15  	"time"
    16  
    17  	"go.mongodb.org/mongo-driver/bson/primitive"
    18  	"go.mongodb.org/mongo-driver/event"
    19  	"go.mongodb.org/mongo-driver/internal/logger"
    20  	"go.mongodb.org/mongo-driver/mongo/address"
    21  	"go.mongodb.org/mongo-driver/x/mongo/driver"
    22  )
    23  
    24  // Connection pool state constants.
    25  const (
    26  	poolPaused int = iota
    27  	poolReady
    28  	poolClosed
    29  )
    30  
    31  // ErrPoolNotPaused is returned when attempting to mark a connection pool "ready" that is not
    32  // currently "paused".
    33  var ErrPoolNotPaused = PoolError("only a paused pool can be marked ready")
    34  
    35  // ErrPoolClosed is returned when attempting to check out a connection from a closed pool.
    36  var ErrPoolClosed = PoolError("attempted to check out a connection from closed connection pool")
    37  
    38  // ErrConnectionClosed is returned from an attempt to use an already closed connection.
    39  var ErrConnectionClosed = ConnectionError{ConnectionID: "<closed>", message: "connection is closed"}
    40  
    41  // ErrWrongPool is returned when a connection is returned to a pool it doesn't belong to.
    42  var ErrWrongPool = PoolError("connection does not belong to this pool")
    43  
    44  // PoolError is an error returned from a Pool method.
    45  type PoolError string
    46  
    47  func (pe PoolError) Error() string { return string(pe) }
    48  
    49  // poolClearedError is an error returned when the connection pool is cleared or currently paused. It
    50  // is a retryable error.
    51  type poolClearedError struct {
    52  	err     error
    53  	address address.Address
    54  }
    55  
    56  func (pce poolClearedError) Error() string {
    57  	return fmt.Sprintf(
    58  		"connection pool for %v was cleared because another operation failed with: %v",
    59  		pce.address,
    60  		pce.err)
    61  }
    62  
    63  // Retryable returns true. All poolClearedErrors are retryable.
    64  func (poolClearedError) Retryable() bool { return true }
    65  
    66  // Assert that poolClearedError is a driver.RetryablePoolError.
    67  var _ driver.RetryablePoolError = poolClearedError{}
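
        // A minimal sketch of how calling code might branch on retryability,
        // assuming a hypothetical runOp function whose errors can wrap a
        // poolClearedError:
        //
        //	if err := runOp(); err != nil {
        //		var rpe driver.RetryablePoolError
        //		if errors.As(err, &rpe) && rpe.Retryable() {
        //			// The pool was cleared or paused; the operation may be retried.
        //		}
        //	}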
    68  
    69  // poolConfig contains all aspects of the pool that can be configured
    70  type poolConfig struct {
    71  	Address          address.Address
    72  	MinPoolSize      uint64
    73  	MaxPoolSize      uint64
    74  	MaxConnecting    uint64
    75  	MaxIdleTime      time.Duration
    76  	MaintainInterval time.Duration
    77  	LoadBalanced     bool
    78  	PoolMonitor      *event.PoolMonitor
    79  	Logger           *logger.Logger
    80  	handshakeErrFn   func(error, uint64, *primitive.ObjectID)
    81  }
    82  
    83  type pool struct {
    84  	// The following integer fields must be accessed using the atomic package
    85  	// and should be at the beginning of the struct.
    86  	// - atomic bug: https://pkg.go.dev/sync/atomic#pkg-note-BUG
    87  	// - suggested layout: https://go101.org/article/memory-layout.html
    88  
    89  	nextID                       uint64 // nextID is the next pool ID for a new connection.
    90  	pinnedCursorConnections      uint64
    91  	pinnedTransactionConnections uint64
    92  
    93  	address       address.Address
    94  	minSize       uint64
    95  	maxSize       uint64
    96  	maxConnecting uint64
    97  	loadBalanced  bool
    98  	monitor       *event.PoolMonitor
    99  	logger        *logger.Logger
   100  
   101  	// handshakeErrFn is used to handle any errors that happen during connection establishment and
   102  	// handshaking.
   103  	handshakeErrFn func(error, uint64, *primitive.ObjectID)
   104  
   105  	connOpts   []ConnectionOption
   106  	generation *poolGenerationMap
   107  
   108  	maintainInterval time.Duration   // maintainInterval is the maintain() loop interval.
   109  	maintainReady    chan struct{}   // maintainReady is a signal channel that starts the maintain() loop when ready() is called.
   110  	backgroundDone   *sync.WaitGroup // backgroundDone waits for all background goroutines to return.
   111  
   112  	stateMu      sync.RWMutex // stateMu guards state, lastClearErr
   113  	state        int          // state is the current state of the connection pool.
   114  	lastClearErr error        // lastClearErr is the last error that caused the pool to be cleared.
   115  
   116  	// createConnectionsCond is the condition variable that controls when the createConnections()
   117  	// loop runs or waits. Its lock guards cancelBackgroundCtx, conns, and newConnWait. Any changes
   118  	// to the state of the guarded values must be made while holding the lock to prevent undefined
   119  	// behavior in the createConnections() waiting logic.
   120  	createConnectionsCond *sync.Cond
   121  	cancelBackgroundCtx   context.CancelFunc     // cancelBackgroundCtx is called to signal background goroutines to stop.
   122  	conns                 map[uint64]*connection // conns holds all currently open connections.
   123  	newConnWait           wantConnQueue          // newConnWait holds all wantConn requests for new connections.
   124  
   125  	idleMu       sync.Mutex    // idleMu guards idleConns, idleConnWait
   126  	idleConns    []*connection // idleConns holds all idle connections.
   127  	idleConnWait wantConnQueue // idleConnWait holds all wantConn requests for idle connections.
   128  }
   129  
   130  // getState returns the current state of the pool. Callers must not hold the stateMu lock.
   131  func (p *pool) getState() int {
   132  	p.stateMu.RLock()
   133  	defer p.stateMu.RUnlock()
   134  
   135  	return p.state
   136  }
   137  
   138  func mustLogPoolMessage(pool *pool) bool {
   139  	return pool.logger != nil && pool.logger.LevelComponentEnabled(
   140  		logger.LevelDebug, logger.ComponentConnection)
   141  }
   142  
   143  func logPoolMessage(pool *pool, msg string, keysAndValues ...interface{}) {
   144  	host, port, err := net.SplitHostPort(pool.address.String())
   145  	if err != nil {
   146  		host = pool.address.String()
   147  		port = ""
   148  	}
   149  
   150  	pool.logger.Print(logger.LevelDebug,
   151  		logger.ComponentConnection,
   152  		msg,
   153  		logger.SerializeConnection(logger.Connection{
   154  			Message:    msg,
   155  			ServerHost: host,
   156  			ServerPort: port,
   157  		}, keysAndValues...)...)
   158  
   159  }
   160  
   161  type reason struct {
   162  	loggerConn string
   163  	event      string
   164  }
   165  
   166  // connectionPerished checks if a given connection is perished and should be removed from the pool.
   167  func connectionPerished(conn *connection) (reason, bool) {
   168  	switch {
   169  	case conn.closed():
   170  		// A connection would only be closed if it encountered a network error during an operation and closed itself.
   171  		return reason{
   172  			loggerConn: logger.ReasonConnClosedError,
   173  			event:      event.ReasonError,
   174  		}, true
   175  	case conn.idleTimeoutExpired():
   176  		return reason{
   177  			loggerConn: logger.ReasonConnClosedIdle,
   178  			event:      event.ReasonIdle,
   179  		}, true
   180  	case conn.pool.stale(conn):
   181  		return reason{
   182  			loggerConn: logger.ReasonConnClosedStale,
   183  			event:      event.ReasonStale,
   184  		}, true
   185  	}
   186  
   187  	return reason{}, false
   188  }
   189  
   190  // newPool creates a new pool. It will use the provided options when creating connections.
   191  func newPool(config poolConfig, connOpts ...ConnectionOption) *pool {
   192  	if config.MaxIdleTime != time.Duration(0) {
   193  		connOpts = append(connOpts, WithIdleTimeout(func(_ time.Duration) time.Duration { return config.MaxIdleTime }))
   194  	}
   195  
   196  	var maxConnecting uint64 = 2
   197  	if config.MaxConnecting > 0 {
   198  		maxConnecting = config.MaxConnecting
   199  	}
   200  
   201  	maintainInterval := 10 * time.Second
   202  	if config.MaintainInterval != 0 {
   203  		maintainInterval = config.MaintainInterval
   204  	}
   205  
   206  	pool := &pool{
   207  		address:               config.Address,
   208  		minSize:               config.MinPoolSize,
   209  		maxSize:               config.MaxPoolSize,
   210  		maxConnecting:         maxConnecting,
   211  		loadBalanced:          config.LoadBalanced,
   212  		monitor:               config.PoolMonitor,
   213  		logger:                config.Logger,
   214  		handshakeErrFn:        config.handshakeErrFn,
   215  		connOpts:              connOpts,
   216  		generation:            newPoolGenerationMap(),
   217  		state:                 poolPaused,
   218  		maintainInterval:      maintainInterval,
   219  		maintainReady:         make(chan struct{}, 1),
   220  		backgroundDone:        &sync.WaitGroup{},
   221  		createConnectionsCond: sync.NewCond(&sync.Mutex{}),
   222  		conns:                 make(map[uint64]*connection, config.MaxPoolSize),
   223  		idleConns:             make([]*connection, 0, config.MaxPoolSize),
   224  	}
   225  	// minSize must not exceed maxSize if maxSize is not 0
   226  	if pool.maxSize != 0 && pool.minSize > pool.maxSize {
   227  		pool.minSize = pool.maxSize
   228  	}
   229  	pool.connOpts = append(pool.connOpts, withGenerationNumberFn(func(_ generationNumberFn) generationNumberFn { return pool.getGenerationForNewConnection }))
   230  
   231  	pool.generation.connect()
   232  
   233  	// Create a Context with cancellation that's used to signal the createConnections() and
   234  	// maintain() background goroutines to stop. Also create a "backgroundDone" WaitGroup that is
   235  	// used to wait for the background goroutines to return.
   236  	var ctx context.Context
   237  	ctx, pool.cancelBackgroundCtx = context.WithCancel(context.Background())
   238  
   239  	for i := 0; i < int(pool.maxConnecting); i++ {
   240  		pool.backgroundDone.Add(1)
   241  		go pool.createConnections(ctx, pool.backgroundDone)
   242  	}
   243  
   244  	// If maintainInterval is not positive, don't start the maintain() goroutine. Expect that
   245  	// negative values are only used in testing; this config value is not user-configurable.
   246  	if maintainInterval > 0 {
   247  		pool.backgroundDone.Add(1)
   248  		go pool.maintain(ctx, pool.backgroundDone)
   249  	}
   250  
   251  	if mustLogPoolMessage(pool) {
   252  		keysAndValues := logger.KeyValues{
   253  			logger.KeyMaxIdleTimeMS, config.MaxIdleTime.Milliseconds(),
   254  			logger.KeyMinPoolSize, config.MinPoolSize,
   255  			logger.KeyMaxPoolSize, config.MaxPoolSize,
   256  			logger.KeyMaxConnecting, config.MaxConnecting,
   257  		}
   258  
   259  		logPoolMessage(pool, logger.ConnectionPoolCreated, keysAndValues...)
   260  	}
   261  
   262  	if pool.monitor != nil {
   263  		pool.monitor.Event(&event.PoolEvent{
   264  			Type: event.PoolCreated,
   265  			PoolOptions: &event.MonitorPoolOptions{
   266  				MaxPoolSize: config.MaxPoolSize,
   267  				MinPoolSize: config.MinPoolSize,
   268  			},
   269  			Address: pool.address.String(),
   270  		})
   271  	}
   272  
   273  	return pool
   274  }
   275  
   276  // stale checks if a given connection's generation is below the generation of the pool
   277  func (p *pool) stale(conn *connection) bool {
   278  	return conn == nil || p.generation.stale(conn.desc.ServiceID, conn.generation)
   279  }
   280  
   281  // ready puts the pool into the "ready" state and starts the background connection creation and
   282  // monitoring goroutines. ready must be called before connections can be checked out. An unused,
   283  // connected pool must be closed or it will leak goroutines and will not be garbage collected.
   284  func (p *pool) ready() error {
   285  	// While holding the stateMu lock, set the pool to "ready" if it is currently "paused".
   286  	p.stateMu.Lock()
   287  	if p.state == poolReady {
   288  		p.stateMu.Unlock()
   289  		return nil
   290  	}
   291  	if p.state != poolPaused {
   292  		p.stateMu.Unlock()
   293  		return ErrPoolNotPaused
   294  	}
   295  	p.lastClearErr = nil
   296  	p.state = poolReady
   297  	p.stateMu.Unlock()
   298  
   299  	if mustLogPoolMessage(p) {
   300  		logPoolMessage(p, logger.ConnectionPoolReady)
   301  	}
   302  
   303  	// Send event.PoolReady before resuming the maintain() goroutine to guarantee that the
   304  	// "pool ready" event is always sent before maintain() starts creating connections.
   305  	if p.monitor != nil {
   306  		p.monitor.Event(&event.PoolEvent{
   307  			Type:    event.PoolReady,
   308  			Address: p.address.String(),
   309  		})
   310  	}
   311  
   312  	// Signal maintain() to wake up immediately when marking the pool "ready".
   313  	select {
   314  	case p.maintainReady <- struct{}{}:
   315  	default:
   316  	}
   317  
   318  	return nil
   319  }
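
        // A hypothetical lifecycle sketch for this unexported API (the address and
        // configuration values are placeholders): a new pool starts "paused", must be
        // marked ready before connections can be checked out, and must be closed to
        // stop its background goroutines.
        //
        //	p := newPool(poolConfig{Address: address.Address("localhost:27017")})
        //	if err := p.ready(); err != nil {
        //		// handle ErrPoolNotPaused
        //	}
        //	// ... check connections out and back in ...
        //	p.close(context.Background())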
   320  
   321  // close closes the pool, closes all connections associated with the pool, and stops all background
   322  // goroutines. All subsequent checkOut requests will return an error. An unused, ready pool must be
   323  // closed or it will leak goroutines and will not be garbage collected.
   324  func (p *pool) close(ctx context.Context) {
   325  	p.stateMu.Lock()
   326  	if p.state == poolClosed {
   327  		p.stateMu.Unlock()
   328  		return
   329  	}
   330  	p.state = poolClosed
   331  	p.stateMu.Unlock()
   332  
   333  	// Call cancelBackgroundCtx() to exit the maintain() and createConnections() background
   334  	// goroutines. Broadcast to the createConnectionsCond to wake up all createConnections()
   335  	// goroutines. We must hold the createConnectionsCond lock here because we're changing the
   336  	// condition by cancelling the "background goroutine" Context, even though cancelling the Context
   337  	// is also synchronized by a lock. Otherwise, we run into an intermittent bug that prevents the
   338  	// createConnections() goroutines from exiting.
   339  	p.createConnectionsCond.L.Lock()
   340  	p.cancelBackgroundCtx()
   341  	p.createConnectionsCond.Broadcast()
   342  	p.createConnectionsCond.L.Unlock()
   343  
   344  	// Wait for all background goroutines to exit.
   345  	p.backgroundDone.Wait()
   346  
   347  	p.generation.disconnect()
   348  
   349  	if ctx == nil {
   350  		ctx = context.Background()
   351  	}
   352  
   353  	// If we have a deadline then we interpret it as a request to shut down gracefully. We wait until
   354  	// either all the connections have been checked back into the pool (i.e. total open connections
   355  	// equals idle connections) or until the Context deadline is reached.
   356  	if _, ok := ctx.Deadline(); ok {
   357  		ticker := time.NewTicker(100 * time.Millisecond)
   358  		defer ticker.Stop()
   359  
   360  	graceful:
   361  		for {
   362  			if p.totalConnectionCount() == p.availableConnectionCount() {
   363  				break graceful
   364  			}
   365  
   366  			select {
   367  			case <-ticker.C:
   368  			case <-ctx.Done():
   369  				break graceful
   370  			default:
   371  			}
   372  		}
   373  	}
   374  
   375  	// Empty the idle connections stack and try to deliver ErrPoolClosed to any waiting wantConns
   376  	// from idleConnWait while holding the idleMu lock.
   377  	p.idleMu.Lock()
   378  	for _, conn := range p.idleConns {
   379  		_ = p.removeConnection(conn, reason{
   380  			loggerConn: logger.ReasonConnClosedPoolClosed,
   381  			event:      event.ReasonPoolClosed,
   382  		}, nil)
   383  		_ = p.closeConnection(conn) // We don't care about errors while closing the connection.
   384  	}
   385  	p.idleConns = p.idleConns[:0]
   386  	for {
   387  		w := p.idleConnWait.popFront()
   388  		if w == nil {
   389  			break
   390  		}
   391  		w.tryDeliver(nil, ErrPoolClosed)
   392  	}
   393  	p.idleMu.Unlock()
   394  
   395  	// Collect all conns from the pool and try to deliver ErrPoolClosed to any waiting wantConns
   396  	// from newConnWait while holding the createConnectionsCond lock. We can't call removeConnection
   397  	// on the connections while holding any locks, so do that after we release the lock.
   398  	p.createConnectionsCond.L.Lock()
   399  	conns := make([]*connection, 0, len(p.conns))
   400  	for _, conn := range p.conns {
   401  		conns = append(conns, conn)
   402  	}
   403  	for {
   404  		w := p.newConnWait.popFront()
   405  		if w == nil {
   406  			break
   407  		}
   408  		w.tryDeliver(nil, ErrPoolClosed)
   409  	}
   410  	p.createConnectionsCond.L.Unlock()
   411  
   412  	if mustLogPoolMessage(p) {
   413  		logPoolMessage(p, logger.ConnectionPoolClosed)
   414  	}
   415  
   416  	if p.monitor != nil {
   417  		p.monitor.Event(&event.PoolEvent{
   418  			Type:    event.PoolClosedEvent,
   419  			Address: p.address.String(),
   420  		})
   421  	}
   422  
   423  	// Now that we're not holding any locks, remove all of the connections we collected from the
   424  	// pool.
   425  	for _, conn := range conns {
   426  		_ = p.removeConnection(conn, reason{
   427  			loggerConn: logger.ReasonConnClosedPoolClosed,
   428  			event:      event.ReasonPoolClosed,
   429  		}, nil)
   430  		_ = p.closeConnection(conn) // We don't care about errors while closing the connection.
   431  	}
   432  }
   433  
   434  func (p *pool) pinConnectionToCursor() {
   435  	atomic.AddUint64(&p.pinnedCursorConnections, 1)
   436  }
   437  
   438  func (p *pool) unpinConnectionFromCursor() {
   439  	// See https://golang.org/pkg/sync/atomic/#AddUint64 for an explanation of the ^uint64(0) syntax.
   440  	atomic.AddUint64(&p.pinnedCursorConnections, ^uint64(0))
   441  }
   442  
   443  func (p *pool) pinConnectionToTransaction() {
   444  	atomic.AddUint64(&p.pinnedTransactionConnections, 1)
   445  }
   446  
   447  func (p *pool) unpinConnectionFromTransaction() {
   448  	// See https://golang.org/pkg/sync/atomic/#AddUint64 for an explanation of the ^uint64(0) syntax.
   449  	atomic.AddUint64(&p.pinnedTransactionConnections, ^uint64(0))
   450  }
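
        // The unpin helpers above rely on unsigned wraparound: ^uint64(0) is the
        // maximum uint64 value, and adding it modulo 2^64 is equivalent to
        // subtracting 1. A standalone illustration:
        //
        //	var n uint64 = 3
        //	atomic.AddUint64(&n, ^uint64(0)) // n == 2
        //	atomic.AddUint64(&n, ^uint64(0)) // n == 1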
   451  
   452  // checkOut checks out a connection from the pool. If an idle connection is not available, the
   453  // checkOut enters a queue waiting for either the next idle or new connection. If the pool is not
   454  // ready, checkOut returns an error.
   455  // Based partially on https://cs.opensource.google/go/go/+/refs/tags/go1.16.6:src/net/http/transport.go;l=1324
   456  func (p *pool) checkOut(ctx context.Context) (conn *connection, err error) {
   457  	if mustLogPoolMessage(p) {
   458  		logPoolMessage(p, logger.ConnectionCheckoutStarted)
   459  	}
   460  
   461  	// TODO(CSOT): If a Timeout was specified at any level, respect the Timeout in server selection and
   462  	// TODO connection checkout.
   463  	if p.monitor != nil {
   464  		p.monitor.Event(&event.PoolEvent{
   465  			Type:    event.GetStarted,
   466  			Address: p.address.String(),
   467  		})
   468  	}
   469  
   470  	start := time.Now()
   471  	// Check the pool state while holding a stateMu read lock. If the pool state is not "ready",
   472  	// return an error. Do all of this while holding the stateMu read lock to prevent a state change between
   473  	// checking the state and entering the wait queue. Not holding the stateMu read lock here may
   474  	// allow a checkOut() to enter the wait queue after clear() pauses the pool and clears the wait
   475  	// queue, resulting in createConnections() doing work while the pool is "paused".
   476  	p.stateMu.RLock()
   477  	switch p.state {
   478  	case poolClosed:
   479  		p.stateMu.RUnlock()
   480  
   481  		duration := time.Since(start)
   482  		if mustLogPoolMessage(p) {
   483  			keysAndValues := logger.KeyValues{
   484  				logger.KeyDurationMS, duration.Milliseconds(),
   485  				logger.KeyReason, logger.ReasonConnCheckoutFailedPoolClosed,
   486  			}
   487  
   488  			logPoolMessage(p, logger.ConnectionCheckoutFailed, keysAndValues...)
   489  		}
   490  
   491  		if p.monitor != nil {
   492  			p.monitor.Event(&event.PoolEvent{
   493  				Type:     event.GetFailed,
   494  				Address:  p.address.String(),
   495  				Duration: duration,
   496  				Reason:   event.ReasonPoolClosed,
   497  			})
   498  		}
   499  		return nil, ErrPoolClosed
   500  	case poolPaused:
   501  		err := poolClearedError{err: p.lastClearErr, address: p.address}
   502  		p.stateMu.RUnlock()
   503  
   504  		duration := time.Since(start)
   505  		if mustLogPoolMessage(p) {
   506  			keysAndValues := logger.KeyValues{
   507  				logger.KeyDurationMS, duration.Milliseconds(),
   508  				logger.KeyReason, logger.ReasonConnCheckoutFailedError,
   509  			}
   510  
   511  			logPoolMessage(p, logger.ConnectionCheckoutFailed, keysAndValues...)
   512  		}
   513  
   514  		if p.monitor != nil {
   515  			p.monitor.Event(&event.PoolEvent{
   516  				Type:     event.GetFailed,
   517  				Address:  p.address.String(),
   518  				Duration: duration,
   519  				Reason:   event.ReasonConnectionErrored,
   520  				Error:    err,
   521  			})
   522  		}
   523  		return nil, err
   524  	}
   525  
   526  	if ctx == nil {
   527  		ctx = context.Background()
   528  	}
   529  
   530  	// Create a wantConn, which we will use to request an existing idle or new connection. Always
   531  	// cancel the wantConn if checkOut() returned an error to make sure any delivered connections
   532  	// are returned to the pool (e.g. if a connection was delivered immediately after the Context
   533  	// timed out).
   534  	w := newWantConn()
   535  	defer func() {
   536  		if err != nil {
   537  			w.cancel(p, err)
   538  		}
   539  	}()
   540  
   541  	// Get in the queue for an idle connection. If getOrQueueForIdleConn returns true, it was able to
   542  	// immediately deliver an idle connection to the wantConn, so we can return the connection or
   543  	// error from the wantConn without waiting for "ready".
   544  	if delivered := p.getOrQueueForIdleConn(w); delivered {
   545  		// If delivered = true, we didn't enter the wait queue and will return either a connection
   546  		// or an error, so unlock the stateMu lock here.
   547  		p.stateMu.RUnlock()
   548  
   549  		duration := time.Since(start)
   550  		if w.err != nil {
   551  			if mustLogPoolMessage(p) {
   552  				keysAndValues := logger.KeyValues{
   553  					logger.KeyDurationMS, duration.Milliseconds(),
   554  					logger.KeyReason, logger.ReasonConnCheckoutFailedError,
   555  				}
   556  
   557  				logPoolMessage(p, logger.ConnectionCheckoutFailed, keysAndValues...)
   558  			}
   559  
   560  			if p.monitor != nil {
   561  				p.monitor.Event(&event.PoolEvent{
   562  					Type:     event.GetFailed,
   563  					Address:  p.address.String(),
   564  					Duration: duration,
   565  					Reason:   event.ReasonConnectionErrored,
   566  					Error:    w.err,
   567  				})
   568  			}
   569  			return nil, w.err
   570  		}
   571  
   572  		duration = time.Since(start)
   573  		if mustLogPoolMessage(p) {
   574  			keysAndValues := logger.KeyValues{
   575  				logger.KeyDriverConnectionID, w.conn.driverConnectionID,
   576  				logger.KeyDurationMS, duration.Milliseconds(),
   577  			}
   578  
   579  			logPoolMessage(p, logger.ConnectionCheckedOut, keysAndValues...)
   580  		}
   581  
   582  		if p.monitor != nil {
   583  			p.monitor.Event(&event.PoolEvent{
   584  				Type:         event.GetSucceeded,
   585  				Address:      p.address.String(),
   586  				ConnectionID: w.conn.driverConnectionID,
   587  				Duration:     duration,
   588  			})
   589  		}
   590  
   591  		return w.conn, nil
   592  	}
   593  
   594  	// If we didn't get an immediately available idle connection, also get in the queue for a new
   595  	// connection while we're waiting for an idle connection.
   596  	p.queueForNewConn(w)
   597  	p.stateMu.RUnlock()
   598  
   599  	// Wait for either the wantConn to be ready or for the Context to time out.
   600  	waitQueueStart := time.Now()
   601  	select {
   602  	case <-w.ready:
   603  		if w.err != nil {
   604  			duration := time.Since(start)
   605  			if mustLogPoolMessage(p) {
   606  				keysAndValues := logger.KeyValues{
   607  					logger.KeyDurationMS, duration.Milliseconds(),
   608  					logger.KeyReason, logger.ReasonConnCheckoutFailedError,
   609  					logger.KeyError, w.err.Error(),
   610  				}
   611  
   612  				logPoolMessage(p, logger.ConnectionCheckoutFailed, keysAndValues...)
   613  			}
   614  
   615  			if p.monitor != nil {
   616  				p.monitor.Event(&event.PoolEvent{
   617  					Type:     event.GetFailed,
   618  					Address:  p.address.String(),
   619  					Duration: duration,
   620  					Reason:   event.ReasonConnectionErrored,
   621  					Error:    w.err,
   622  				})
   623  			}
   624  
   625  			return nil, w.err
   626  		}
   627  
   628  		duration := time.Since(start)
   629  		if mustLogPoolMessage(p) {
   630  			keysAndValues := logger.KeyValues{
   631  				logger.KeyDriverConnectionID, w.conn.driverConnectionID,
   632  				logger.KeyDurationMS, duration.Milliseconds(),
   633  			}
   634  
   635  			logPoolMessage(p, logger.ConnectionCheckedOut, keysAndValues...)
   636  		}
   637  
   638  		if p.monitor != nil {
   639  			p.monitor.Event(&event.PoolEvent{
   640  				Type:         event.GetSucceeded,
   641  				Address:      p.address.String(),
   642  				ConnectionID: w.conn.driverConnectionID,
   643  				Duration:     duration,
   644  			})
   645  		}
   646  		return w.conn, nil
   647  	case <-ctx.Done():
   648  		waitQueueDuration := time.Since(waitQueueStart)
   649  
   650  		duration := time.Since(start)
   651  		if mustLogPoolMessage(p) {
   652  			keysAndValues := logger.KeyValues{
   653  				logger.KeyDurationMS, duration.Milliseconds(),
   654  				logger.KeyReason, logger.ReasonConnCheckoutFailedTimout,
   655  			}
   656  
   657  			logPoolMessage(p, logger.ConnectionCheckoutFailed, keysAndValues...)
   658  		}
   659  
   660  		if p.monitor != nil {
   661  			p.monitor.Event(&event.PoolEvent{
   662  				Type:     event.GetFailed,
   663  				Address:  p.address.String(),
   664  				Duration: duration,
   665  				Reason:   event.ReasonTimedOut,
   666  				Error:    ctx.Err(),
   667  			})
   668  		}
   669  
   670  		err := WaitQueueTimeoutError{
   671  			Wrapped:              ctx.Err(),
   672  			maxPoolSize:          p.maxSize,
   673  			totalConnections:     p.totalConnectionCount(),
   674  			availableConnections: p.availableConnectionCount(),
   675  			waitDuration:         waitQueueDuration,
   676  		}
   677  		if p.loadBalanced {
   678  			err.pinnedConnections = &pinnedConnections{
   679  				cursorConnections:      atomic.LoadUint64(&p.pinnedCursorConnections),
   680  				transactionConnections: atomic.LoadUint64(&p.pinnedTransactionConnections),
   681  			}
   682  		}
   683  		return nil, err
   684  	}
   685  }
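
        // A hypothetical checkout sketch (the timeout value is arbitrary): pass a
        // Context with a deadline to bound time spent in the wait queue, and always
        // return the connection with checkIn when the operation is done.
        //
        //	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
        //	defer cancel()
        //	conn, err := p.checkOut(ctx)
        //	if err != nil {
        //		// err may be ErrPoolClosed, a poolClearedError, or a WaitQueueTimeoutError.
        //		return err
        //	}
        //	defer func() { _ = p.checkIn(conn) }()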
   686  
   687  // closeConnection closes a connection.
   688  func (p *pool) closeConnection(conn *connection) error {
   689  	if conn.pool != p {
   690  		return ErrWrongPool
   691  	}
   692  
   693  	if atomic.LoadInt64(&conn.state) == connConnected {
   694  		conn.closeConnectContext()
   695  		conn.wait() // Make sure that the connection has finished connecting.
   696  	}
   697  
   698  	err := conn.close()
   699  	if err != nil {
   700  		return ConnectionError{ConnectionID: conn.id, Wrapped: err, message: "failed to close net.Conn"}
   701  	}
   702  
   703  	return nil
   704  }
   705  
   706  func (p *pool) getGenerationForNewConnection(serviceID *primitive.ObjectID) uint64 {
   707  	return p.generation.addConnection(serviceID)
   708  }
   709  
   710  // removeConnection removes a connection from the pool and emits a "ConnectionClosed" event.
   711  func (p *pool) removeConnection(conn *connection, reason reason, err error) error {
   712  	if conn == nil {
   713  		return nil
   714  	}
   715  
   716  	if conn.pool != p {
   717  		return ErrWrongPool
   718  	}
   719  
   720  	p.createConnectionsCond.L.Lock()
   721  	_, ok := p.conns[conn.driverConnectionID]
   722  	if !ok {
   723  		// If the connection has been removed from the pool already, exit without doing any
   724  		// additional state changes.
   725  		p.createConnectionsCond.L.Unlock()
   726  		return nil
   727  	}
   728  	delete(p.conns, conn.driverConnectionID)
   729  	// Signal the createConnectionsCond so any goroutines waiting for a new connection slot in the
   730  	// pool will proceed.
   731  	p.createConnectionsCond.Signal()
   732  	p.createConnectionsCond.L.Unlock()
   733  
   734  	// Only update the generation numbers map if the connection has retrieved its generation number.
   735  	// Otherwise, we'd decrement the count for the generation even though it had never been
   736  	// incremented.
   737  	if conn.hasGenerationNumber() {
   738  		p.generation.removeConnection(conn.desc.ServiceID)
   739  	}
   740  
   741  	if mustLogPoolMessage(p) {
   742  		keysAndValues := logger.KeyValues{
   743  			logger.KeyDriverConnectionID, conn.driverConnectionID,
   744  			logger.KeyReason, reason.loggerConn,
   745  		}
   746  
   747  		if err != nil {
   748  			keysAndValues.Add(logger.KeyError, err.Error())
   749  		}
   750  
   751  		logPoolMessage(p, logger.ConnectionClosed, keysAndValues...)
   752  	}
   753  
   754  	if p.monitor != nil {
   755  		p.monitor.Event(&event.PoolEvent{
   756  			Type:         event.ConnectionClosed,
   757  			Address:      p.address.String(),
   758  			ConnectionID: conn.driverConnectionID,
   759  			Reason:       reason.event,
   760  			Error:        err,
   761  		})
   762  	}
   763  
   764  	return nil
   765  }
   766  
   767  var (
   768  	// BGReadTimeout is the maximum amount of time to wait when trying to read
   769  	// the server reply on a connection after an operation timed out. The
   770  	// default is 1 second.
   771  	//
   772  	// Deprecated: BGReadTimeout is intended for internal use only and may be
   773  	// removed or modified at any time.
   774  	BGReadTimeout = 1 * time.Second
   775  
   776  	// BGReadCallback is a callback for monitoring the behavior of the
   777  	// background-read-on-timeout connection preserving mechanism.
   778  	//
   779  	// Deprecated: BGReadCallback is intended for internal use only and may be
   780  	// removed or modified at any time.
   781  	BGReadCallback func(addr string, start, read time.Time, errs []error, connClosed bool)
   782  )
   783  
   784  // bgRead sets a new read deadline on the provided connection (BGReadTimeout in
   785  // the future) and tries to read any bytes returned by the server. If successful, it
   786  // checks the connection into the provided pool. If there are any errors, it
   787  // closes the connection.
   788  //
   789  // It calls the package-global BGReadCallback function, if set, with the
   790  // address, timings, and any errors that occurred.
   791  func bgRead(pool *pool, conn *connection) {
   792  	var start, read time.Time
   793  	start = time.Now()
   794  	errs := make([]error, 0)
   795  	connClosed := false
   796  
   797  	defer func() {
   798  		// No matter what happens, always check the connection back into the
   799  		// pool, which will either make it available for other operations or
   800  		// remove it from the pool if it was closed.
   801  		err := pool.checkInNoEvent(conn)
   802  		if err != nil {
   803  			errs = append(errs, fmt.Errorf("error checking in: %w", err))
   804  		}
   805  
   806  		if BGReadCallback != nil {
   807  			BGReadCallback(conn.addr.String(), start, read, errs, connClosed)
   808  		}
   809  	}()
   810  
   811  	err := conn.nc.SetReadDeadline(time.Now().Add(BGReadTimeout))
   812  	if err != nil {
   813  		errs = append(errs, fmt.Errorf("error setting a read deadline: %w", err))
   814  
   815  		connClosed = true
   816  		err := conn.close()
   817  		if err != nil {
   818  			errs = append(errs, fmt.Errorf("error closing conn after setting read deadline: %w", err))
   819  		}
   820  
   821  		return
   822  	}
   823  
   824  	// The context here is only used for cancellation, not deadline timeout, so
   825  	// use context.Background(). The read timeout is set by calling
   826  	// SetReadDeadline above.
   827  	_, _, err = conn.read(context.Background())
   828  	read = time.Now()
   829  	if err != nil {
   830  		errs = append(errs, fmt.Errorf("error reading: %w", err))
   831  
   832  		connClosed = true
   833  		err := conn.close()
   834  		if err != nil {
   835  			errs = append(errs, fmt.Errorf("error closing conn after reading: %w", err))
   836  		}
   837  
   838  		return
   839  	}
   840  }
   841  
   842  // checkIn returns an idle connection to the pool. If the connection is perished or the pool is
   843  // closed, it is removed from the connection pool and closed.
   844  func (p *pool) checkIn(conn *connection) error {
   845  	if conn == nil {
   846  		return nil
   847  	}
   848  	if conn.pool != p {
   849  		return ErrWrongPool
   850  	}
   851  
   852  	if mustLogPoolMessage(p) {
   853  		keysAndValues := logger.KeyValues{
   854  			logger.KeyDriverConnectionID, conn.driverConnectionID,
   855  		}
   856  
   857  		logPoolMessage(p, logger.ConnectionCheckedIn, keysAndValues...)
   858  	}
   859  
   860  	if p.monitor != nil {
   861  		p.monitor.Event(&event.PoolEvent{
   862  			Type:         event.ConnectionReturned,
   863  			ConnectionID: conn.driverConnectionID,
   864  			Address:      conn.addr.String(),
   865  		})
   866  	}
   867  
   868  	return p.checkInNoEvent(conn)
   869  }
   870  
   871  // checkInNoEvent returns a connection to the pool. It behaves identically to checkIn except it does
   872  // not publish events. It is only intended for use by pool-internal functions.
   873  func (p *pool) checkInNoEvent(conn *connection) error {
   874  	if conn == nil {
   875  		return nil
   876  	}
   877  	if conn.pool != p {
   878  		return ErrWrongPool
   879  	}
   880  
   881  	// If the connection has an awaiting server response, try to read the
   882  	// response in another goroutine before checking it back into the pool.
   883  	//
   884  	// Do this here because we want to publish checkIn events when the operation
   885  	// is done with the connection, not when it's ready to be used again. That
   886  	// means that connections in "awaiting response" state are checked in but
   887  	// not usable, which is not covered by the current pool events. We may need
   888  	// to add pool event information in the future to communicate that.
   889  	if conn.awaitingResponse {
   890  		conn.awaitingResponse = false
   891  		go bgRead(p, conn)
   892  		return nil
   893  	}
   894  
   895  	// Bump the connection idle deadline here because we're about to make the connection "available".
   896  	// The idle deadline is used to determine when a connection has reached its max idle time and
   897  	// should be closed. A connection reaches its max idle time when it has been "available" in the
   898  	// idle connections stack for more than the configured duration (maxIdleTimeMS). Set it before
   899  	// we call connectionPerished(), which checks the idle deadline, because a newly "available"
   900  	// connection should never be perished due to max idle time.
   901  	conn.bumpIdleDeadline()
   902  
   903  	r, perished := connectionPerished(conn)
   904  	if !perished && conn.pool.getState() == poolClosed {
   905  		perished = true
   906  		r = reason{
   907  			loggerConn: logger.ReasonConnClosedPoolClosed,
   908  			event:      event.ReasonPoolClosed,
   909  		}
   910  	}
   911  	if perished {
   912  		_ = p.removeConnection(conn, r, nil)
   913  		go func() {
   914  			_ = p.closeConnection(conn)
   915  		}()
   916  		return nil
   917  	}
   918  
   919  	p.idleMu.Lock()
   920  	defer p.idleMu.Unlock()
   921  
   922  	for {
   923  		w := p.idleConnWait.popFront()
   924  		if w == nil {
   925  			break
   926  		}
   927  		if w.tryDeliver(conn, nil) {
   928  			return nil
   929  		}
   930  	}
   931  
   932  	for _, idle := range p.idleConns {
   933  		if idle == conn {
   934  			return fmt.Errorf("duplicate idle conn %p in idle connections stack", conn)
   935  		}
   936  	}
   937  
   938  	p.idleConns = append(p.idleConns, conn)
   939  	return nil
   940  }
   941  
   942  // clear calls clearImpl internally with a false interruptAllConnections value.
   943  func (p *pool) clear(err error, serviceID *primitive.ObjectID) {
   944  	p.clearImpl(err, serviceID, false)
   945  }
   946  
   947  // clearAll does the same as the "clear" method but interrupts all connections.
   948  func (p *pool) clearAll(err error, serviceID *primitive.ObjectID) {
   949  	p.clearImpl(err, serviceID, true)
   950  }
   951  
   952  // interruptConnections interrupts the input connections.
   953  func (p *pool) interruptConnections(conns []*connection) {
   954  	for _, conn := range conns {
   955  		_ = p.removeConnection(conn, reason{
   956  			loggerConn: logger.ReasonConnClosedStale,
   957  			event:      event.ReasonStale,
   958  		}, nil)
   959  		go func(c *connection) {
   960  			_ = p.closeConnection(c)
   961  		}(conn)
   962  	}
   963  }
   964  
   965  // clearImpl marks all connections as stale by incrementing the generation number, stops all background
   966  // goroutines, removes all requests from idleConnWait and newConnWait, and sets the pool state to
   967  // "paused". If serviceID is nil, clear marks all connections as stale. If serviceID is not nil,
   968  // clear marks only connections associated with the given serviceID stale (for use in load balancer
   969  // mode).
   970  // If interruptAllConnections is true, this function calls interruptConnections to interrupt all
   971  // non-idle connections.
   972  func (p *pool) clearImpl(err error, serviceID *primitive.ObjectID, interruptAllConnections bool) {
   973  	if p.getState() == poolClosed {
   974  		return
   975  	}
   976  
   977  	p.generation.clear(serviceID)
   978  
   979  	// If serviceID is nil (i.e. not in load balancer mode), transition the pool to a paused state
   980  	// by stopping all background goroutines, clearing the wait queues, and setting the pool state
   981  	// to "paused".
   982  	sendEvent := true
   983  	if serviceID == nil {
   984  		// While holding the stateMu lock, set the pool state to "paused" if it's currently "ready",
   985  		// and set lastClearErr to the error that caused the pool to be cleared. If the pool is
   986  		// already paused, don't send another "ConnectionPoolCleared" event.
   987  		p.stateMu.Lock()
   988  		if p.state == poolPaused {
   989  			sendEvent = false
   990  		}
   991  		if p.state == poolReady {
   992  			p.state = poolPaused
   993  		}
   994  		p.lastClearErr = err
   995  		p.stateMu.Unlock()
   996  	}
   997  
   998  	if mustLogPoolMessage(p) {
   999  		keysAndValues := logger.KeyValues{
  1000  			logger.KeyServiceID, serviceID,
  1001  		}
  1002  
  1003  		logPoolMessage(p, logger.ConnectionPoolCleared, keysAndValues...)
  1004  	}
  1005  
  1006  	if sendEvent && p.monitor != nil {
  1007  		event := &event.PoolEvent{
  1008  			Type:         event.PoolCleared,
  1009  			Address:      p.address.String(),
  1010  			ServiceID:    serviceID,
  1011  			Interruption: interruptAllConnections,
  1012  			Error:        err,
  1013  		}
  1014  		p.monitor.Event(event)
  1015  	}
  1016  
  1017  	p.removePerishedConns()
  1018  	if interruptAllConnections {
  1019  		p.createConnectionsCond.L.Lock()
  1020  		p.idleMu.Lock()
  1021  
  1022  		idleConns := make(map[*connection]bool, len(p.idleConns))
  1023  		for _, idle := range p.idleConns {
  1024  			idleConns[idle] = true
  1025  		}
  1026  
  1027  		conns := make([]*connection, 0, len(p.conns))
  1028  		for _, conn := range p.conns {
  1029  			if _, ok := idleConns[conn]; !ok && p.stale(conn) {
  1030  				conns = append(conns, conn)
  1031  			}
  1032  		}
  1033  
  1034  		p.idleMu.Unlock()
  1035  		p.createConnectionsCond.L.Unlock()
  1036  
  1037  		p.interruptConnections(conns)
  1038  	}
  1039  
  1040  	if serviceID == nil {
  1041  		pcErr := poolClearedError{err: err, address: p.address}
  1042  
  1043  		// Clear the idle connections wait queue.
  1044  		p.idleMu.Lock()
  1045  		for {
  1046  			w := p.idleConnWait.popFront()
  1047  			if w == nil {
  1048  				break
  1049  			}
  1050  			w.tryDeliver(nil, pcErr)
  1051  		}
  1052  		p.idleMu.Unlock()
  1053  
  1054  		// Clear the new connections wait queue. This effectively pauses the createConnections()
  1055  		// background goroutine because newConnWait is empty and checkOut() won't insert any more
  1056  		// wantConns into newConnWait until the pool is marked "ready" again.
  1057  		p.createConnectionsCond.L.Lock()
  1058  		for {
  1059  			w := p.newConnWait.popFront()
  1060  			if w == nil {
  1061  				break
  1062  			}
  1063  			w.tryDeliver(nil, pcErr)
  1064  		}
  1065  		p.createConnectionsCond.L.Unlock()
  1066  	}
  1067  }
  1068  
  1069  // getOrQueueForIdleConn attempts to deliver an idle connection to the given wantConn. If there is
  1070  // an idle connection in the idle connections stack, it pops an idle connection, delivers it to the
  1071  // wantConn, and returns true. If there are no idle connections in the idle connections stack, it
  1072  // adds the wantConn to the idleConnWait queue and returns false.
  1073  func (p *pool) getOrQueueForIdleConn(w *wantConn) bool {
  1074  	p.idleMu.Lock()
  1075  	defer p.idleMu.Unlock()
  1076  
  1077  	// Try to deliver an idle connection from the idleConns stack first.
  1078  	for len(p.idleConns) > 0 {
  1079  		conn := p.idleConns[len(p.idleConns)-1]
  1080  		p.idleConns = p.idleConns[:len(p.idleConns)-1]
  1081  
  1082  		if conn == nil {
  1083  			continue
  1084  		}
  1085  
  1086  		if reason, perished := connectionPerished(conn); perished {
  1087  			_ = conn.pool.removeConnection(conn, reason, nil)
  1088  			go func() {
  1089  				_ = conn.pool.closeConnection(conn)
  1090  			}()
  1091  			continue
  1092  		}
  1093  
  1094  		if !w.tryDeliver(conn, nil) {
  1095  			// If we couldn't deliver the conn to w, put it back in the idleConns stack.
  1096  			p.idleConns = append(p.idleConns, conn)
  1097  		}
  1098  
  1099  		// If we got here, we tried to deliver an idle conn to w. No matter if tryDeliver() returned
  1100  		// true or false, w is no longer waiting and doesn't need to be added to any wait queues, so
  1101  		// return delivered = true.
  1102  		return true
  1103  	}
  1104  
  1105  	p.idleConnWait.cleanFront()
  1106  	p.idleConnWait.pushBack(w)
  1107  	return false
  1108  }
  1109  
  1110  func (p *pool) queueForNewConn(w *wantConn) {
  1111  	p.createConnectionsCond.L.Lock()
  1112  	defer p.createConnectionsCond.L.Unlock()
  1113  
  1114  	p.newConnWait.cleanFront()
  1115  	p.newConnWait.pushBack(w)
  1116  	p.createConnectionsCond.Signal()
  1117  }
  1118  
  1119  func (p *pool) totalConnectionCount() int {
  1120  	p.createConnectionsCond.L.Lock()
  1121  	defer p.createConnectionsCond.L.Unlock()
  1122  
  1123  	return len(p.conns)
  1124  }
  1125  
  1126  func (p *pool) availableConnectionCount() int {
  1127  	p.idleMu.Lock()
  1128  	defer p.idleMu.Unlock()
  1129  
  1130  	return len(p.idleConns)
  1131  }
  1132  
  1133  // createConnections creates connections for wantConn requests on the newConnWait queue.
  1134  func (p *pool) createConnections(ctx context.Context, wg *sync.WaitGroup) {
  1135  	defer wg.Done()
  1136  
  1137  	// condition returns true if the createConnections() loop should continue and false if it should
  1138  	// wait. Note that the condition also listens for Context cancellation, which also causes the
  1139  	// loop to continue, allowing for a subsequent check to return from createConnections().
  1140  	condition := func() bool {
  1141  		checkOutWaiting := p.newConnWait.len() > 0
  1142  		poolHasSpace := p.maxSize == 0 || uint64(len(p.conns)) < p.maxSize
  1143  		cancelled := ctx.Err() != nil
  1144  		return (checkOutWaiting && poolHasSpace) || cancelled
  1145  	}
  1146  
  1147  	// wait waits for there to be an available wantConn and for the pool to have space for a new
  1148  	// connection. When the condition becomes true, it creates a new connection and returns the
  1149  	// waiting wantConn and new connection. If the Context is cancelled or there are any
  1150  	// errors, wait returns with "ok = false".
  1151  	wait := func() (*wantConn, *connection, bool) {
  1152  		p.createConnectionsCond.L.Lock()
  1153  		defer p.createConnectionsCond.L.Unlock()
  1154  
  1155  		for !condition() {
  1156  			p.createConnectionsCond.Wait()
  1157  		}
  1158  
  1159  		if ctx.Err() != nil {
  1160  			return nil, nil, false
  1161  		}
  1162  
  1163  		p.newConnWait.cleanFront()
  1164  		w := p.newConnWait.popFront()
  1165  		if w == nil {
  1166  			return nil, nil, false
  1167  		}
  1168  
  1169  		conn := newConnection(p.address, p.connOpts...)
  1170  		conn.pool = p
  1171  		conn.driverConnectionID = atomic.AddUint64(&p.nextID, 1)
  1172  		p.conns[conn.driverConnectionID] = conn
  1173  
  1174  		return w, conn, true
  1175  	}
  1176  
  1177  	for ctx.Err() == nil {
  1178  		w, conn, ok := wait()
  1179  		if !ok {
  1180  			continue
  1181  		}
  1182  
  1183  		if mustLogPoolMessage(p) {
  1184  			keysAndValues := logger.KeyValues{
  1185  				logger.KeyDriverConnectionID, conn.driverConnectionID,
  1186  			}
  1187  
  1188  			logPoolMessage(p, logger.ConnectionCreated, keysAndValues...)
  1189  		}
  1190  
  1191  		if p.monitor != nil {
  1192  			p.monitor.Event(&event.PoolEvent{
  1193  				Type:         event.ConnectionCreated,
  1194  				Address:      p.address.String(),
  1195  				ConnectionID: conn.driverConnectionID,
  1196  			})
  1197  		}
  1198  
  1199  		start := time.Now()
  1200  		// Pass the createConnections context to connect to allow pool close to cancel connection
  1201  		// establishment so shutdown doesn't block indefinitely if connectTimeout=0.
  1202  		err := conn.connect(ctx)
  1203  		if err != nil {
  1204  			w.tryDeliver(nil, err)
  1205  
  1206  			// If there's an error connecting the new connection, call the handshake error handler
  1207  			// that implements the SDAM handshake error handling logic. This must be called after
  1208  			// delivering the connection error to the waiting wantConn. If it's called before, the
  1209  			// handshake error handler may clear the connection pool, leading to a different error
  1210  			// message being delivered to the same waiting wantConn in idleConnWait when the wait
  1211  			// queues are cleared.
  1212  			if p.handshakeErrFn != nil {
  1213  				p.handshakeErrFn(err, conn.generation, conn.desc.ServiceID)
  1214  			}
  1215  
  1216  			_ = p.removeConnection(conn, reason{
  1217  				loggerConn: logger.ReasonConnClosedError,
  1218  				event:      event.ReasonError,
  1219  			}, err)
  1220  
  1221  			_ = p.closeConnection(conn)
  1222  
  1223  			continue
  1224  		}
  1225  
  1226  		duration := time.Since(start)
  1227  		if mustLogPoolMessage(p) {
  1228  			keysAndValues := logger.KeyValues{
  1229  				logger.KeyDriverConnectionID, conn.driverConnectionID,
  1230  				logger.KeyDurationMS, duration.Milliseconds(),
  1231  			}
  1232  
  1233  			logPoolMessage(p, logger.ConnectionReady, keysAndValues...)
  1234  		}
  1235  
  1236  		if p.monitor != nil {
  1237  			p.monitor.Event(&event.PoolEvent{
  1238  				Type:         event.ConnectionReady,
  1239  				Address:      p.address.String(),
  1240  				ConnectionID: conn.driverConnectionID,
  1241  				Duration:     duration,
  1242  			})
  1243  		}
  1244  
  1245  		if w.tryDeliver(conn, nil) {
  1246  			continue
  1247  		}
  1248  
  1249  		_ = p.checkInNoEvent(conn)
  1250  	}
  1251  }
  1252  
  1253  func (p *pool) maintain(ctx context.Context, wg *sync.WaitGroup) {
  1254  	defer wg.Done()
  1255  
  1256  	ticker := time.NewTicker(p.maintainInterval)
  1257  	defer ticker.Stop()
  1258  
  1259  	// remove removes the *wantConn at index i from the slice and returns the new slice. The order
  1260  	// of the slice is not maintained.
  1261  	remove := func(arr []*wantConn, i int) []*wantConn {
  1262  		end := len(arr) - 1
  1263  		arr[i], arr[end] = arr[end], arr[i]
  1264  		return arr[:end]
  1265  	}
  1266  
  1267  	// removeNotWaiting removes any wantConns that are no longer waiting from the given slice of
  1268  	// wantConns. That allows maintain() to use the size of its wantConns slice as an indication of
  1269  	// how many new connection requests are outstanding and subtract that from the number of
  1270  	// connections to ask for when maintaining minPoolSize.
  1271  	removeNotWaiting := func(arr []*wantConn) []*wantConn {
  1272  		for i := len(arr) - 1; i >= 0; i-- {
  1273  			w := arr[i]
  1274  			if !w.waiting() {
  1275  				arr = remove(arr, i)
  1276  			}
  1277  		}
  1278  
  1279  		return arr
  1280  	}
  1281  
  1282  	wantConns := make([]*wantConn, 0, p.minSize)
  1283  	defer func() {
  1284  		for _, w := range wantConns {
  1285  			w.tryDeliver(nil, ErrPoolClosed)
  1286  		}
  1287  	}()
  1288  
  1289  	for {
  1290  		select {
  1291  		case <-ticker.C:
  1292  		case <-p.maintainReady:
  1293  		case <-ctx.Done():
  1294  			return
  1295  		}
  1296  
  1297  		// Only maintain the pool while it's in the "ready" state. If the pool state is not "ready",
  1298  		// wait for the next tick or "ready" signal. Do all of this while holding the stateMu read
  1299  		// lock to prevent a state change between checking the state and entering the wait queue.
  1300  		// Not holding the stateMu read lock here may allow maintain() to request wantConns after
  1301  		// clear() pauses the pool and clears the wait queue, resulting in createConnections()
  1302  		// doing work while the pool is "paused".
  1303  		p.stateMu.RLock()
  1304  		if p.state != poolReady {
  1305  			p.stateMu.RUnlock()
  1306  			continue
  1307  		}
  1308  
  1309  		p.removePerishedConns()
  1310  
  1311  		// Remove any wantConns that are no longer waiting.
  1312  		wantConns = removeNotWaiting(wantConns)
  1313  
  1314  		// Figure out how many more wantConns we need to satisfy minPoolSize. Assume that the
  1315  		// outstanding wantConns (i.e. the ones that weren't removed from the slice) will all return
  1316  		// connections when they're ready, so only add wantConns to make up the difference. Limit
  1317  		// the number of connections requested to max 10 at a time to prevent overshooting
  1318  		// minPoolSize in case other checkOut() calls are requesting new connections, too.
  1319  		total := p.totalConnectionCount()
  1320  		n := int(p.minSize) - total - len(wantConns)
  1321  		if n > 10 {
  1322  			n = 10
  1323  		}
  1324  
  1325  		for i := 0; i < n; i++ {
  1326  			w := newWantConn()
  1327  			p.queueForNewConn(w)
  1328  			wantConns = append(wantConns, w)
  1329  
  1330  			// Start a goroutine for each new wantConn, waiting for it to be ready.
  1331  			go func() {
  1332  				<-w.ready
  1333  				if w.conn != nil {
  1334  					_ = p.checkInNoEvent(w.conn)
  1335  				}
  1336  			}()
  1337  		}
  1338  		p.stateMu.RUnlock()
  1339  	}
  1340  }
  1341  
  1342  func (p *pool) removePerishedConns() {
  1343  	p.idleMu.Lock()
  1344  	defer p.idleMu.Unlock()
  1345  
  1346  	for i := range p.idleConns {
  1347  		conn := p.idleConns[i]
  1348  		if conn == nil {
  1349  			continue
  1350  		}
  1351  
  1352  		if reason, perished := connectionPerished(conn); perished {
  1353  			p.idleConns[i] = nil
  1354  
  1355  			_ = p.removeConnection(conn, reason, nil)
  1356  			go func() {
  1357  				_ = p.closeConnection(conn)
  1358  			}()
  1359  		}
  1360  	}
  1361  
  1362  	p.idleConns = compact(p.idleConns)
  1363  }
  1364  
  1365  // compact removes any nil pointers from the slice and keeps the non-nil pointers, retaining the
  1366  // order of the non-nil pointers.
  1367  func compact(arr []*connection) []*connection {
  1368  	offset := 0
  1369  	for i := range arr {
  1370  		if arr[i] == nil {
  1371  			continue
  1372  		}
  1373  		arr[offset] = arr[i]
  1374  		offset++
  1375  	}
  1376  	return arr[:offset]
  1377  }
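
        // For example, given hypothetical connections a, b, and c,
        // compact([]*connection{a, nil, b, nil, c}) returns []*connection{a, b, c},
        // reusing the same backing array.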
  1378  
  1379  // A wantConn records state about a wanted connection (that is, an active call to checkOut).
  1380  // The conn may be gotten by creating a new connection or by finding an idle connection, or a
  1381  // cancellation may make the conn no longer wanted. These three options are racing against each
  1382  // other and use wantConn to coordinate and agree about the winning outcome.
  1383  // Based on https://cs.opensource.google/go/go/+/refs/tags/go1.16.6:src/net/http/transport.go;l=1174-1240
  1384  type wantConn struct {
  1385  	ready chan struct{}
  1386  
  1387  	mu   sync.Mutex // Guards conn, err
  1388  	conn *connection
  1389  	err  error
  1390  }
  1391  
  1392  func newWantConn() *wantConn {
  1393  	return &wantConn{
  1394  		ready: make(chan struct{}, 1),
  1395  	}
  1396  }
  1397  
  1398  // waiting reports whether w is still waiting for an answer (connection or error).
  1399  func (w *wantConn) waiting() bool {
  1400  	select {
  1401  	case <-w.ready:
  1402  		return false
  1403  	default:
  1404  		return true
  1405  	}
  1406  }
  1407  
  1408  // tryDeliver attempts to deliver conn, err to w and reports whether it succeeded.
  1409  func (w *wantConn) tryDeliver(conn *connection, err error) bool {
  1410  	w.mu.Lock()
  1411  	defer w.mu.Unlock()
  1412  
  1413  	if w.conn != nil || w.err != nil {
  1414  		return false
  1415  	}
  1416  
  1417  	w.conn = conn
  1418  	w.err = err
  1419  	if w.conn == nil && w.err == nil {
  1420  		panic("x/mongo/driver/topology: internal error: misuse of tryDeliver")
  1421  	}
  1422  
  1423  	close(w.ready)
  1424  
  1425  	return true
  1426  }
  1427  
  1428  // cancel marks w as no longer wanting a result (for example, due to cancellation). If a connection
  1429  // has been delivered already, cancel returns it with p.checkInNoEvent(). Note that the caller must
  1430  // not hold any locks on the pool while calling cancel.
  1431  func (w *wantConn) cancel(p *pool, err error) {
  1432  	if err == nil {
  1433  		panic("x/mongo/driver/topology: internal error: misuse of cancel")
  1434  	}
  1435  
  1436  	w.mu.Lock()
  1437  	if w.conn == nil && w.err == nil {
  1438  		close(w.ready) // catch misbehavior in future delivery
  1439  	}
  1440  	conn := w.conn
  1441  	w.conn = nil
  1442  	w.err = err
  1443  	w.mu.Unlock()
  1444  
  1445  	if conn != nil {
  1446  		_ = p.checkInNoEvent(conn)
  1447  	}
  1448  }
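
        // An illustrative outcome of the delivery race coordinated by wantConn: if
        // tryDeliver(conn, nil) wins, a later cancel finds w.conn non-nil and checks
        // that connection back into the pool; if cancel wins, a later tryDeliver
        // returns false and the caller that created the connection keeps ownership
        // (createConnections then checks it in itself).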
  1449  
  1450  // A wantConnQueue is a queue of wantConns.
  1451  // Based on https://cs.opensource.google/go/go/+/refs/tags/go1.16.6:src/net/http/transport.go;l=1242-1306
  1452  type wantConnQueue struct {
  1453  	// This is a queue, not a deque.
  1454  	// It is split into two stages - head[headPos:] and tail.
  1455  	// popFront is trivial (headPos++) on the first stage, and
  1456  	// pushBack is trivial (append) on the second stage.
  1457  	// If the first stage is empty, popFront can swap the
  1458  	// first and second stages to remedy the situation.
  1459  	//
  1460  	// This two-stage split is analogous to the use of two lists
  1461  	// in Okasaki's purely functional queue but without the
  1462  	// overhead of reversing the list when swapping stages.
  1463  	head    []*wantConn
  1464  	headPos int
  1465  	tail    []*wantConn
  1466  }
  1467  
  1468  // len returns the number of items in the queue.
  1469  func (q *wantConnQueue) len() int {
  1470  	return len(q.head) - q.headPos + len(q.tail)
  1471  }
  1472  
  1473  // pushBack adds w to the back of the queue.
  1474  func (q *wantConnQueue) pushBack(w *wantConn) {
  1475  	q.tail = append(q.tail, w)
  1476  }
  1477  
  1478  // popFront removes and returns the wantConn at the front of the queue.
  1479  func (q *wantConnQueue) popFront() *wantConn {
  1480  	if q.headPos >= len(q.head) {
  1481  		if len(q.tail) == 0 {
  1482  			return nil
  1483  		}
  1484  		// Pick up tail as new head, clear tail.
  1485  		q.head, q.headPos, q.tail = q.tail, 0, q.head[:0]
  1486  	}
  1487  	w := q.head[q.headPos]
  1488  	q.head[q.headPos] = nil
  1489  	q.headPos++
  1490  	return w
  1491  }
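
        // An illustrative trace of the two-stage layout, using hypothetical wantConns
        // w1, w2, and w3: after pushBack(w1) and pushBack(w2) the queue is head=[],
        // tail=[w1 w2]. The first popFront swaps the stages (head=[w1 w2], tail=[])
        // and returns w1; a subsequent pushBack(w3) appends to the now-empty tail, so
        // later popFront calls return w2 and then w3.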
  1492  
  1493  // peekFront returns the wantConn at the front of the queue without removing it.
  1494  func (q *wantConnQueue) peekFront() *wantConn {
  1495  	if q.headPos < len(q.head) {
  1496  		return q.head[q.headPos]
  1497  	}
  1498  	if len(q.tail) > 0 {
  1499  		return q.tail[0]
  1500  	}
  1501  	return nil
  1502  }
  1503  
  1504  // cleanFront pops any wantConns that are no longer waiting from the head of the queue.
  1505  func (q *wantConnQueue) cleanFront() {
  1506  	for {
  1507  		w := q.peekFront()
  1508  		if w == nil || w.waiting() {
  1509  			return
  1510  		}
  1511  		q.popFront()
  1512  	}
  1513  }
  1514  
