
Source file src/go.mongodb.org/mongo-driver/mongo/client.go

Documentation: go.mongodb.org/mongo-driver/mongo

     1  // Copyright (C) MongoDB, Inc. 2017-present.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License"); you may
     4  // not use this file except in compliance with the License. You may obtain
     5  // a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
     7  package mongo
     9  import (
    10  	"context"
    11  	"errors"
    12  	"fmt"
    13  	"net/http"
    14  	"time"
    16  	"go.mongodb.org/mongo-driver/bson"
    17  	"go.mongodb.org/mongo-driver/bson/bsoncodec"
    18  	"go.mongodb.org/mongo-driver/event"
    19  	"go.mongodb.org/mongo-driver/internal/httputil"
    20  	"go.mongodb.org/mongo-driver/internal/logger"
    21  	"go.mongodb.org/mongo-driver/internal/uuid"
    22  	"go.mongodb.org/mongo-driver/mongo/description"
    23  	"go.mongodb.org/mongo-driver/mongo/options"
    24  	"go.mongodb.org/mongo-driver/mongo/readconcern"
    25  	"go.mongodb.org/mongo-driver/mongo/readpref"
    26  	"go.mongodb.org/mongo-driver/mongo/writeconcern"
    27  	"go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
    28  	"go.mongodb.org/mongo-driver/x/mongo/driver"
    29  	"go.mongodb.org/mongo-driver/x/mongo/driver/mongocrypt"
    30  	mcopts "go.mongodb.org/mongo-driver/x/mongo/driver/mongocrypt/options"
    31  	"go.mongodb.org/mongo-driver/x/mongo/driver/operation"
    32  	"go.mongodb.org/mongo-driver/x/mongo/driver/session"
    33  	"go.mongodb.org/mongo-driver/x/mongo/driver/topology"
    34  )
const (
	// defaultLocalThreshold is the local threshold NewClient assigns to a Client
	// when ClientOptions.LocalThreshold is nil.
	defaultLocalThreshold = 15 * time.Millisecond
	// defaultMaxPoolSize is the value NewClient sets for MaxPoolSize on the merged
	// options when the caller left it nil.
	defaultMaxPoolSize    = 100
)
var (
	// keyVaultCollOpts specifies options used to communicate with the key vault collection:
	// majority read concern and majority write concern. It is passed when the key vault
	// Collection handle is created in configureKeyVaultClientFLE.
	keyVaultCollOpts = options.Collection().SetReadConcern(readconcern.Majority()).
				SetWriteConcern(writeconcern.New(writeconcern.WMajority()))
	// endSessionsBatchSize is the maximum number of session IDs that endSessions sends
	// in a single endSessions command.
	// NOTE(review): presumably a var rather than a const so tests can shrink it — confirm.
	endSessionsBatchSize = 10000
)
// Client is a handle representing a pool of connections to a MongoDB deployment. It is safe for concurrent use by
// multiple goroutines.
//
// The Client type opens and closes connections automatically and maintains a pool of idle connections. For
// connection pool configuration options, see documentation for the ClientOptions type in the mongo/options package.
type Client struct {
	// The fields below are populated by NewClient from the merged ClientOptions,
	// except sessionPool, which stays nil until Client.Connect runs. StartSession
	// reports ErrClientDisconnected while sessionPool is nil. The id is compared
	// against a session's ClientID by validSession.
	id             uuid.UUID
	deployment     driver.Deployment
	localThreshold time.Duration
	retryWrites    bool
	retryReads     bool
	clock          *session.ClusterClock
	readPreference *readpref.ReadPref
	readConcern    *readconcern.ReadConcern
	writeConcern   *writeconcern.WriteConcern
	bsonOpts       *options.BSONOptions
	registry       *bsoncodec.Registry
	monitor        *event.CommandMonitor
	serverAPI      *driver.ServerAPIOptions
	serverMonitor  *event.ServerMonitor
	sessionPool    *session.Pool
	timeout        *time.Duration
	httpClient     *http.Client
	logger         *logger.Logger
	// client-side encryption fields. keyVaultClientFLE and metadataClientFLE may
	// each point at this Client itself, at internalClientFLE, or (for the key vault)
	// at a separately configured Client; Connect and Disconnect check for that
	// aliasing before touching them.
	keyVaultClientFLE  *Client
	keyVaultCollFLE    *Collection
	mongocryptdFLE     *mongocryptdClient
	cryptFLE           driver.Crypt
	metadataClientFLE  *Client
	internalClientFLE  *Client
	encryptedFieldsMap map[string]interface{}
}
    84  // Connect creates a new Client and then initializes it using the Connect method. This is equivalent to calling
    85  // NewClient followed by Client.Connect.
    86  //
    87  // When creating an options.ClientOptions, the order the methods are called matters. Later Set*
    88  // methods will overwrite the values from previous Set* method invocations. This includes the
    89  // ApplyURI method. This allows callers to determine the order of precedence for option
    90  // application. For instance, if ApplyURI is called before SetAuth, the Credential from
    91  // SetAuth will overwrite the values from the connection string. If ApplyURI is called
    92  // after SetAuth, then its values will overwrite those from SetAuth.
    93  //
    94  // The opts parameter is processed using options.MergeClientOptions, which will overwrite entire
    95  // option fields of previous options, there is no partial overwriting. For example, if Username is
    96  // set in the Auth field for the first option, and Password is set for the second but with no
    97  // Username, after the merge the Username field will be empty.
    98  //
    99  // The NewClient function does not do any I/O and returns an error if the given options are invalid.
   100  // The Client.Connect method starts background goroutines to monitor the state of the deployment and does not do
   101  // any I/O in the main goroutine to prevent the main goroutine from blocking. Therefore, it will not error if the
   102  // deployment is down.
   103  //
   104  // The Client.Ping method can be used to verify that the deployment is successfully connected and the
   105  // Client was correctly configured.
   106  func Connect(ctx context.Context, opts ...*options.ClientOptions) (*Client, error) {
   107  	c, err := NewClient(opts...)
   108  	if err != nil {
   109  		return nil, err
   110  	}
   111  	err = c.Connect(ctx)
   112  	if err != nil {
   113  		return nil, err
   114  	}
   115  	return c, nil
   116  }
   118  // NewClient creates a new client to connect to a deployment specified by the uri.
   119  //
   120  // When creating an options.ClientOptions, the order the methods are called matters. Later Set*
   121  // methods will overwrite the values from previous Set* method invocations. This includes the
   122  // ApplyURI method. This allows callers to determine the order of precedence for option
   123  // application. For instance, if ApplyURI is called before SetAuth, the Credential from
   124  // SetAuth will overwrite the values from the connection string. If ApplyURI is called
   125  // after SetAuth, then its values will overwrite those from SetAuth.
   126  //
   127  // The opts parameter is processed using options.MergeClientOptions, which will overwrite entire
   128  // option fields of previous options, there is no partial overwriting. For example, if Username is
   129  // set in the Auth field for the first option, and Password is set for the second but with no
   130  // Username, after the merge the Username field will be empty.
   131  //
   132  // Deprecated: Use [Connect] instead.
   133  func NewClient(opts ...*options.ClientOptions) (*Client, error) {
   134  	clientOpt := options.MergeClientOptions(opts...)
   136  	id, err := uuid.New()
   137  	if err != nil {
   138  		return nil, err
   139  	}
   140  	client := &Client{id: id}
   142  	// ClusterClock
   143  	client.clock = new(session.ClusterClock)
   145  	// LocalThreshold
   146  	client.localThreshold = defaultLocalThreshold
   147  	if clientOpt.LocalThreshold != nil {
   148  		client.localThreshold = *clientOpt.LocalThreshold
   149  	}
   150  	// Monitor
   151  	if clientOpt.Monitor != nil {
   152  		client.monitor = clientOpt.Monitor
   153  	}
   154  	// ServerMonitor
   155  	if clientOpt.ServerMonitor != nil {
   156  		client.serverMonitor = clientOpt.ServerMonitor
   157  	}
   158  	// ReadConcern
   159  	client.readConcern = readconcern.New()
   160  	if clientOpt.ReadConcern != nil {
   161  		client.readConcern = clientOpt.ReadConcern
   162  	}
   163  	// ReadPreference
   164  	client.readPreference = readpref.Primary()
   165  	if clientOpt.ReadPreference != nil {
   166  		client.readPreference = clientOpt.ReadPreference
   167  	}
   168  	// BSONOptions
   169  	if clientOpt.BSONOptions != nil {
   170  		client.bsonOpts = clientOpt.BSONOptions
   171  	}
   172  	// Registry
   173  	client.registry = bson.DefaultRegistry
   174  	if clientOpt.Registry != nil {
   175  		client.registry = clientOpt.Registry
   176  	}
   177  	// RetryWrites
   178  	client.retryWrites = true // retry writes on by default
   179  	if clientOpt.RetryWrites != nil {
   180  		client.retryWrites = *clientOpt.RetryWrites
   181  	}
   182  	client.retryReads = true
   183  	if clientOpt.RetryReads != nil {
   184  		client.retryReads = *clientOpt.RetryReads
   185  	}
   186  	// Timeout
   187  	client.timeout = clientOpt.Timeout
   188  	client.httpClient = clientOpt.HTTPClient
   189  	// WriteConcern
   190  	if clientOpt.WriteConcern != nil {
   191  		client.writeConcern = clientOpt.WriteConcern
   192  	}
   193  	// AutoEncryptionOptions
   194  	if clientOpt.AutoEncryptionOptions != nil {
   195  		if err := client.configureAutoEncryption(clientOpt); err != nil {
   196  			return nil, err
   197  		}
   198  	} else {
   199  		client.cryptFLE = clientOpt.Crypt
   200  	}
   202  	// Deployment
   203  	if clientOpt.Deployment != nil {
   204  		client.deployment = clientOpt.Deployment
   205  	}
   207  	// Set default options
   208  	if clientOpt.MaxPoolSize == nil {
   209  		clientOpt.SetMaxPoolSize(defaultMaxPoolSize)
   210  	}
   212  	if err != nil {
   213  		return nil, err
   214  	}
   216  	cfg, err := topology.NewConfig(clientOpt, client.clock)
   217  	if err != nil {
   218  		return nil, err
   219  	}
   220  	client.serverAPI = topology.ServerAPIFromServerOptions(cfg.ServerOpts)
   222  	if client.deployment == nil {
   223  		client.deployment, err = topology.New(cfg)
   224  		if err != nil {
   225  			return nil, replaceErrors(err)
   226  		}
   227  	}
   229  	// Create a logger for the client.
   230  	client.logger, err = newLogger(clientOpt.LoggerOptions)
   231  	if err != nil {
   232  		return nil, fmt.Errorf("invalid logger options: %w", err)
   233  	}
   235  	return client, nil
   236  }
   238  // Connect initializes the Client by starting background monitoring goroutines.
   239  // If the Client was created using the NewClient function, this method must be called before a Client can be used.
   240  //
   241  // Connect starts background goroutines to monitor the state of the deployment and does not do any I/O in the main
   242  // goroutine. The Client.Ping method can be used to verify that the connection was created successfully.
   243  //
   244  // Deprecated: Use [mongo.Connect] instead.
   245  func (c *Client) Connect(ctx context.Context) error {
   246  	if connector, ok := c.deployment.(driver.Connector); ok {
   247  		err := connector.Connect()
   248  		if err != nil {
   249  			return replaceErrors(err)
   250  		}
   251  	}
   253  	if c.mongocryptdFLE != nil {
   254  		if err := c.mongocryptdFLE.connect(ctx); err != nil {
   255  			return err
   256  		}
   257  	}
   259  	if c.internalClientFLE != nil {
   260  		if err := c.internalClientFLE.Connect(ctx); err != nil {
   261  			return err
   262  		}
   263  	}
   265  	if c.keyVaultClientFLE != nil && c.keyVaultClientFLE != c.internalClientFLE && c.keyVaultClientFLE != c {
   266  		if err := c.keyVaultClientFLE.Connect(ctx); err != nil {
   267  			return err
   268  		}
   269  	}
   271  	if c.metadataClientFLE != nil && c.metadataClientFLE != c.internalClientFLE && c.metadataClientFLE != c {
   272  		if err := c.metadataClientFLE.Connect(ctx); err != nil {
   273  			return err
   274  		}
   275  	}
   277  	var updateChan <-chan description.Topology
   278  	if subscriber, ok := c.deployment.(driver.Subscriber); ok {
   279  		sub, err := subscriber.Subscribe()
   280  		if err != nil {
   281  			return replaceErrors(err)
   282  		}
   283  		updateChan = sub.Updates
   284  	}
   285  	c.sessionPool = session.NewPool(updateChan)
   286  	return nil
   287  }
   289  // Disconnect closes sockets to the topology referenced by this Client. It will
   290  // shut down any monitoring goroutines, close the idle connection pool, and will
   291  // wait until all the in use connections have been returned to the connection
   292  // pool and closed before returning. If the context expires via cancellation,
   293  // deadline, or timeout before the in use connections have returned, the in use
   294  // connections will be closed, resulting in the failure of any in flight read
   295  // or write operations. If this method returns with no errors, all connections
   296  // associated with this Client have been closed.
   297  func (c *Client) Disconnect(ctx context.Context) error {
   298  	if c.logger != nil {
   299  		defer c.logger.Close()
   300  	}
   302  	if ctx == nil {
   303  		ctx = context.Background()
   304  	}
   306  	if c.httpClient == httputil.DefaultHTTPClient {
   307  		defer httputil.CloseIdleHTTPConnections(c.httpClient)
   308  	}
   310  	c.endSessions(ctx)
   311  	if c.mongocryptdFLE != nil {
   312  		if err := c.mongocryptdFLE.disconnect(ctx); err != nil {
   313  			return err
   314  		}
   315  	}
   317  	if c.internalClientFLE != nil {
   318  		if err := c.internalClientFLE.Disconnect(ctx); err != nil {
   319  			return err
   320  		}
   321  	}
   323  	if c.keyVaultClientFLE != nil && c.keyVaultClientFLE != c.internalClientFLE && c.keyVaultClientFLE != c {
   324  		if err := c.keyVaultClientFLE.Disconnect(ctx); err != nil {
   325  			return err
   326  		}
   327  	}
   328  	if c.metadataClientFLE != nil && c.metadataClientFLE != c.internalClientFLE && c.metadataClientFLE != c {
   329  		if err := c.metadataClientFLE.Disconnect(ctx); err != nil {
   330  			return err
   331  		}
   332  	}
   333  	if c.cryptFLE != nil {
   334  		c.cryptFLE.Close()
   335  	}
   337  	if disconnector, ok := c.deployment.(driver.Disconnector); ok {
   338  		return replaceErrors(disconnector.Disconnect(ctx))
   339  	}
   341  	return nil
   342  }
   344  // Ping sends a ping command to verify that the client can connect to the deployment.
   345  //
   346  // The rp parameter is used to determine which server is selected for the operation.
   347  // If it is nil, the client's read preference is used.
   348  //
   349  // If the server is down, Ping will try to select a server until the client's server selection timeout expires.
   350  // This can be configured through the ClientOptions.SetServerSelectionTimeout option when creating a new Client.
   351  // After the timeout expires, a server selection error is returned.
   352  //
   353  // Using Ping reduces application resilience because applications starting up will error if the server is temporarily
   354  // unavailable or is failing over (e.g. during autoscaling due to a load spike).
   355  func (c *Client) Ping(ctx context.Context, rp *readpref.ReadPref) error {
   356  	if ctx == nil {
   357  		ctx = context.Background()
   358  	}
   360  	if rp == nil {
   361  		rp = c.readPreference
   362  	}
   364  	db := c.Database("admin")
   365  	res := db.RunCommand(ctx, bson.D{
   366  		{"ping", 1},
   367  	}, options.RunCmd().SetReadPreference(rp))
   369  	return replaceErrors(res.Err())
   370  }
   372  // StartSession starts a new session configured with the given options.
   373  //
   374  // StartSession does not actually communicate with the server and will not error if the client is
   375  // disconnected.
   376  //
   377  // StartSession is safe to call from multiple goroutines concurrently. However, Sessions returned by StartSession are
   378  // not safe for concurrent use by multiple goroutines.
   379  //
   380  // If the DefaultReadConcern, DefaultWriteConcern, or DefaultReadPreference options are not set, the client's read
   381  // concern, write concern, or read preference will be used, respectively.
   382  func (c *Client) StartSession(opts ...*options.SessionOptions) (Session, error) {
   383  	if c.sessionPool == nil {
   384  		return nil, ErrClientDisconnected
   385  	}
   387  	sopts := options.MergeSessionOptions(opts...)
   388  	coreOpts := &session.ClientOptions{
   389  		DefaultReadConcern:    c.readConcern,
   390  		DefaultReadPreference: c.readPreference,
   391  		DefaultWriteConcern:   c.writeConcern,
   392  	}
   393  	if sopts.CausalConsistency != nil {
   394  		coreOpts.CausalConsistency = sopts.CausalConsistency
   395  	}
   396  	if sopts.DefaultReadConcern != nil {
   397  		coreOpts.DefaultReadConcern = sopts.DefaultReadConcern
   398  	}
   399  	if sopts.DefaultWriteConcern != nil {
   400  		coreOpts.DefaultWriteConcern = sopts.DefaultWriteConcern
   401  	}
   402  	if sopts.DefaultReadPreference != nil {
   403  		coreOpts.DefaultReadPreference = sopts.DefaultReadPreference
   404  	}
   405  	if sopts.DefaultMaxCommitTime != nil {
   406  		coreOpts.DefaultMaxCommitTime = sopts.DefaultMaxCommitTime
   407  	}
   408  	if sopts.Snapshot != nil {
   409  		coreOpts.Snapshot = sopts.Snapshot
   410  	}
   412  	sess, err := session.NewClientSession(c.sessionPool, c.id, coreOpts)
   413  	if err != nil {
   414  		return nil, replaceErrors(err)
   415  	}
   417  	// Writes are not retryable on standalones, so let operation determine whether to retry
   418  	sess.RetryWrite = false
   419  	sess.RetryRead = c.retryReads
   421  	return &sessionImpl{
   422  		clientSession: sess,
   423  		client:        c,
   424  		deployment:    c.deployment,
   425  	}, nil
   426  }
   428  func (c *Client) endSessions(ctx context.Context) {
   429  	if c.sessionPool == nil {
   430  		return
   431  	}
   433  	sessionIDs := c.sessionPool.IDSlice()
   434  	op := operation.NewEndSessions(nil).ClusterClock(c.clock).Deployment(c.deployment).
   435  		ServerSelector(description.ReadPrefSelector(readpref.PrimaryPreferred())).CommandMonitor(c.monitor).
   436  		Database("admin").Crypt(c.cryptFLE).ServerAPI(c.serverAPI)
   438  	totalNumIDs := len(sessionIDs)
   439  	var currentBatch []bsoncore.Document
   440  	for i := 0; i < totalNumIDs; i++ {
   441  		currentBatch = append(currentBatch, sessionIDs[i])
   443  		// If we are at the end of a batch or the end of the overall IDs array, execute the operation.
   444  		if ((i+1)%endSessionsBatchSize) == 0 || i == totalNumIDs-1 {
   445  			// Ignore all errors when ending sessions.
   446  			_, marshalVal, err := bson.MarshalValue(currentBatch)
   447  			if err == nil {
   448  				_ = op.SessionIDs(marshalVal).Execute(ctx)
   449  			}
   451  			currentBatch = currentBatch[:0]
   452  		}
   453  	}
   454  }
   456  func (c *Client) configureAutoEncryption(clientOpts *options.ClientOptions) error {
   457  	c.encryptedFieldsMap = clientOpts.AutoEncryptionOptions.EncryptedFieldsMap
   458  	if err := c.configureKeyVaultClientFLE(clientOpts); err != nil {
   459  		return err
   460  	}
   461  	if err := c.configureMetadataClientFLE(clientOpts); err != nil {
   462  		return err
   463  	}
   465  	mc, err := c.newMongoCrypt(clientOpts.AutoEncryptionOptions)
   466  	if err != nil {
   467  		return err
   468  	}
   470  	// If the crypt_shared library was not loaded, try to spawn and connect to mongocryptd.
   471  	if mc.CryptSharedLibVersionString() == "" {
   472  		mongocryptdFLE, err := newMongocryptdClient(clientOpts.AutoEncryptionOptions)
   473  		if err != nil {
   474  			return err
   475  		}
   476  		c.mongocryptdFLE = mongocryptdFLE
   477  	}
   479  	c.configureCryptFLE(mc, clientOpts.AutoEncryptionOptions)
   480  	return nil
   481  }
   483  func (c *Client) getOrCreateInternalClient(clientOpts *options.ClientOptions) (*Client, error) {
   484  	if c.internalClientFLE != nil {
   485  		return c.internalClientFLE, nil
   486  	}
   488  	internalClientOpts := options.MergeClientOptions(clientOpts)
   489  	internalClientOpts.AutoEncryptionOptions = nil
   490  	internalClientOpts.SetMinPoolSize(0)
   491  	var err error
   492  	c.internalClientFLE, err = NewClient(internalClientOpts)
   493  	return c.internalClientFLE, err
   494  }
   496  func (c *Client) configureKeyVaultClientFLE(clientOpts *options.ClientOptions) error {
   497  	// parse key vault options and create new key vault client
   498  	var err error
   499  	aeOpts := clientOpts.AutoEncryptionOptions
   500  	switch {
   501  	case aeOpts.KeyVaultClientOptions != nil:
   502  		c.keyVaultClientFLE, err = NewClient(aeOpts.KeyVaultClientOptions)
   503  	case clientOpts.MaxPoolSize != nil && *clientOpts.MaxPoolSize == 0:
   504  		c.keyVaultClientFLE = c
   505  	default:
   506  		c.keyVaultClientFLE, err = c.getOrCreateInternalClient(clientOpts)
   507  	}
   509  	if err != nil {
   510  		return err
   511  	}
   513  	dbName, collName := splitNamespace(aeOpts.KeyVaultNamespace)
   514  	c.keyVaultCollFLE = c.keyVaultClientFLE.Database(dbName).Collection(collName, keyVaultCollOpts)
   515  	return nil
   516  }
   518  func (c *Client) configureMetadataClientFLE(clientOpts *options.ClientOptions) error {
   519  	// parse key vault options and create new key vault client
   520  	aeOpts := clientOpts.AutoEncryptionOptions
   521  	if aeOpts.BypassAutoEncryption != nil && *aeOpts.BypassAutoEncryption {
   522  		// no need for a metadata client.
   523  		return nil
   524  	}
   525  	if clientOpts.MaxPoolSize != nil && *clientOpts.MaxPoolSize == 0 {
   526  		c.metadataClientFLE = c
   527  		return nil
   528  	}
   530  	var err error
   531  	c.metadataClientFLE, err = c.getOrCreateInternalClient(clientOpts)
   532  	return err
   533  }
   535  func (c *Client) newMongoCrypt(opts *options.AutoEncryptionOptions) (*mongocrypt.MongoCrypt, error) {
   536  	// convert schemas in SchemaMap to bsoncore documents
   537  	cryptSchemaMap := make(map[string]bsoncore.Document)
   538  	for k, v := range opts.SchemaMap {
   539  		schema, err := marshal(v, c.bsonOpts, c.registry)
   540  		if err != nil {
   541  			return nil, err
   542  		}
   543  		cryptSchemaMap[k] = schema
   544  	}
   546  	// convert schemas in EncryptedFieldsMap to bsoncore documents
   547  	cryptEncryptedFieldsMap := make(map[string]bsoncore.Document)
   548  	for k, v := range opts.EncryptedFieldsMap {
   549  		encryptedFields, err := marshal(v, c.bsonOpts, c.registry)
   550  		if err != nil {
   551  			return nil, err
   552  		}
   553  		cryptEncryptedFieldsMap[k] = encryptedFields
   554  	}
   556  	kmsProviders, err := marshal(opts.KmsProviders, c.bsonOpts, c.registry)
   557  	if err != nil {
   558  		return nil, fmt.Errorf("error creating KMS providers document: %w", err)
   559  	}
   561  	// Set the crypt_shared library override path from the "cryptSharedLibPath" extra option if one
   562  	// was set.
   563  	cryptSharedLibPath := ""
   564  	if val, ok := opts.ExtraOptions["cryptSharedLibPath"]; ok {
   565  		str, ok := val.(string)
   566  		if !ok {
   567  			return nil, fmt.Errorf(
   568  				`expected AutoEncryption extra option "cryptSharedLibPath" to be a string, but is a %T`, val)
   569  		}
   570  		cryptSharedLibPath = str
   571  	}
   573  	// Explicitly disable loading the crypt_shared library if requested. Note that this is ONLY
   574  	// intended for use from tests; there is no supported public API for explicitly disabling
   575  	// loading the crypt_shared library.
   576  	cryptSharedLibDisabled := false
   577  	if v, ok := opts.ExtraOptions["__cryptSharedLibDisabledForTestOnly"]; ok {
   578  		cryptSharedLibDisabled = v.(bool)
   579  	}
   581  	bypassAutoEncryption := opts.BypassAutoEncryption != nil && *opts.BypassAutoEncryption
   582  	bypassQueryAnalysis := opts.BypassQueryAnalysis != nil && *opts.BypassQueryAnalysis
   584  	mc, err := mongocrypt.NewMongoCrypt(mcopts.MongoCrypt().
   585  		SetKmsProviders(kmsProviders).
   586  		SetLocalSchemaMap(cryptSchemaMap).
   587  		SetBypassQueryAnalysis(bypassQueryAnalysis).
   588  		SetEncryptedFieldsMap(cryptEncryptedFieldsMap).
   589  		SetCryptSharedLibDisabled(cryptSharedLibDisabled || bypassAutoEncryption).
   590  		SetCryptSharedLibOverridePath(cryptSharedLibPath).
   591  		SetHTTPClient(opts.HTTPClient))
   592  	if err != nil {
   593  		return nil, err
   594  	}
   596  	var cryptSharedLibRequired bool
   597  	if val, ok := opts.ExtraOptions["cryptSharedLibRequired"]; ok {
   598  		b, ok := val.(bool)
   599  		if !ok {
   600  			return nil, fmt.Errorf(
   601  				`expected AutoEncryption extra option "cryptSharedLibRequired" to be a bool, but is a %T`, val)
   602  		}
   603  		cryptSharedLibRequired = b
   604  	}
   606  	// If the "cryptSharedLibRequired" extra option is set to true, check the MongoCrypt version
   607  	// string to confirm that the library was successfully loaded. If the version string is empty,
   608  	// return an error indicating that we couldn't load the crypt_shared library.
   609  	if cryptSharedLibRequired && mc.CryptSharedLibVersionString() == "" {
   610  		return nil, errors.New(
   611  			`AutoEncryption extra option "cryptSharedLibRequired" is true, but we failed to load the crypt_shared library`)
   612  	}
   614  	return mc, nil
   615  }
   617  //nolint:unused // the unused linter thinks that this function is unreachable because "c.newMongoCrypt" always panics without the "cse" build tag set.
   618  func (c *Client) configureCryptFLE(mc *mongocrypt.MongoCrypt, opts *options.AutoEncryptionOptions) {
   619  	bypass := opts.BypassAutoEncryption != nil && *opts.BypassAutoEncryption
   620  	kr := keyRetriever{coll: c.keyVaultCollFLE}
   621  	var cir collInfoRetriever
   622  	// If bypass is true, c.metadataClientFLE is nil and the collInfoRetriever
   623  	// will not be used. If bypass is false, to the parent client or the
   624  	// internal client.
   625  	if !bypass {
   626  		cir = collInfoRetriever{client: c.metadataClientFLE}
   627  	}
   629  	c.cryptFLE = driver.NewCrypt(&driver.CryptOptions{
   630  		MongoCrypt:           mc,
   631  		CollInfoFn:           cir.cryptCollInfo,
   632  		KeyFn:                kr.cryptKeys,
   633  		MarkFn:               c.mongocryptdFLE.markCommand,
   634  		TLSConfig:            opts.TLSConfig,
   635  		BypassAutoEncryption: bypass,
   636  	})
   637  }
   639  // validSession returns an error if the session doesn't belong to the client
   640  func (c *Client) validSession(sess *session.Client) error {
   641  	if sess != nil && sess.ClientID != c.id {
   642  		return ErrWrongClient
   643  	}
   644  	return nil
   645  }
   647  // Database returns a handle for a database with the given name configured with the given DatabaseOptions.
   648  func (c *Client) Database(name string, opts ...*options.DatabaseOptions) *Database {
   649  	return newDatabase(c, name, opts...)
   650  }
   652  // ListDatabases executes a listDatabases command and returns the result.
   653  //
   654  // The filter parameter must be a document containing query operators and can be used to select which
   655  // databases are included in the result. It cannot be nil. An empty document (e.g. bson.D{}) should be used to include
   656  // all databases.
   657  //
   658  // The opts parameter can be used to specify options for this operation (see the options.ListDatabasesOptions documentation).
   659  //
   660  // For more information about the command, see https://www.mongodb.com/docs/manual/reference/command/listDatabases/.
   661  func (c *Client) ListDatabases(ctx context.Context, filter interface{}, opts ...*options.ListDatabasesOptions) (ListDatabasesResult, error) {
   662  	if ctx == nil {
   663  		ctx = context.Background()
   664  	}
   666  	sess := sessionFromContext(ctx)
   668  	err := c.validSession(sess)
   669  	if err != nil {
   670  		return ListDatabasesResult{}, err
   671  	}
   672  	if sess == nil && c.sessionPool != nil {
   673  		sess = session.NewImplicitClientSession(c.sessionPool, c.id)
   674  		defer sess.EndSession()
   675  	}
   677  	err = c.validSession(sess)
   678  	if err != nil {
   679  		return ListDatabasesResult{}, err
   680  	}
   682  	filterDoc, err := marshal(filter, c.bsonOpts, c.registry)
   683  	if err != nil {
   684  		return ListDatabasesResult{}, err
   685  	}
   687  	selector := description.CompositeSelector([]description.ServerSelector{
   688  		description.ReadPrefSelector(readpref.Primary()),
   689  		description.LatencySelector(c.localThreshold),
   690  	})
   691  	selector = makeReadPrefSelector(sess, selector, c.localThreshold)
   693  	ldo := options.MergeListDatabasesOptions(opts...)
   694  	op := operation.NewListDatabases(filterDoc).
   695  		Session(sess).ReadPreference(c.readPreference).CommandMonitor(c.monitor).
   696  		ServerSelector(selector).ClusterClock(c.clock).Database("admin").Deployment(c.deployment).Crypt(c.cryptFLE).
   697  		ServerAPI(c.serverAPI).Timeout(c.timeout)
   699  	if ldo.NameOnly != nil {
   700  		op = op.NameOnly(*ldo.NameOnly)
   701  	}
   702  	if ldo.AuthorizedDatabases != nil {
   703  		op = op.AuthorizedDatabases(*ldo.AuthorizedDatabases)
   704  	}
   706  	retry := driver.RetryNone
   707  	if c.retryReads {
   708  		retry = driver.RetryOncePerCommand
   709  	}
   710  	op.Retry(retry)
   712  	err = op.Execute(ctx)
   713  	if err != nil {
   714  		return ListDatabasesResult{}, replaceErrors(err)
   715  	}
   717  	return newListDatabasesResultFromOperation(op.Result()), nil
   718  }
   720  // ListDatabaseNames executes a listDatabases command and returns a slice containing the names of all of the databases
   721  // on the server.
   722  //
   723  // The filter parameter must be a document containing query operators and can be used to select which databases
   724  // are included in the result. It cannot be nil. An empty document (e.g. bson.D{}) should be used to include all
   725  // databases.
   726  //
   727  // The opts parameter can be used to specify options for this operation (see the options.ListDatabasesOptions
   728  // documentation.)
   729  //
   730  // For more information about the command, see https://www.mongodb.com/docs/manual/reference/command/listDatabases/.
   731  func (c *Client) ListDatabaseNames(ctx context.Context, filter interface{}, opts ...*options.ListDatabasesOptions) ([]string, error) {
   732  	opts = append(opts, options.ListDatabases().SetNameOnly(true))
   734  	res, err := c.ListDatabases(ctx, filter, opts...)
   735  	if err != nil {
   736  		return nil, err
   737  	}
   739  	names := make([]string, 0)
   740  	for _, spec := range res.Databases {
   741  		names = append(names, spec.Name)
   742  	}
   744  	return names, nil
   745  }
   747  // WithSession creates a new SessionContext from the ctx and sess parameters and uses it to call the fn callback. The
   748  // SessionContext must be used as the Context parameter for any operations in the fn callback that should be executed
   749  // under the session.
   750  //
   751  // WithSession is safe to call from multiple goroutines concurrently. However, the SessionContext passed to the
   752  // WithSession callback function is not safe for concurrent use by multiple goroutines.
   753  //
   754  // If the ctx parameter already contains a Session, that Session will be replaced with the one provided.
   755  //
   756  // Any error returned by the fn callback will be returned without any modifications.
   757  func WithSession(ctx context.Context, sess Session, fn func(SessionContext) error) error {
   758  	return fn(NewSessionContext(ctx, sess))
   759  }
   761  // UseSession creates a new Session and uses it to create a new SessionContext, which is used to call the fn callback.
   762  // The SessionContext parameter must be used as the Context parameter for any operations in the fn callback that should
   763  // be executed under a session. After the callback returns, the created Session is ended, meaning that any in-progress
   764  // transactions started by fn will be aborted even if fn returns an error.
   765  //
   766  // UseSession is safe to call from multiple goroutines concurrently. However, the SessionContext passed to the
   767  // UseSession callback function is not safe for concurrent use by multiple goroutines.
   768  //
   769  // If the ctx parameter already contains a Session, that Session will be replaced with the newly created one.
   770  //
   771  // Any error returned by the fn callback will be returned without any modifications.
   772  func (c *Client) UseSession(ctx context.Context, fn func(SessionContext) error) error {
   773  	return c.UseSessionWithOptions(ctx, options.Session(), fn)
   774  }
   776  // UseSessionWithOptions operates like UseSession but uses the given SessionOptions to create the Session.
   777  //
   778  // UseSessionWithOptions is safe to call from multiple goroutines concurrently. However, the SessionContext passed to
   779  // the UseSessionWithOptions callback function is not safe for concurrent use by multiple goroutines.
   780  func (c *Client) UseSessionWithOptions(ctx context.Context, opts *options.SessionOptions, fn func(SessionContext) error) error {
   781  	defaultSess, err := c.StartSession(opts)
   782  	if err != nil {
   783  		return err
   784  	}
   786  	defer defaultSess.EndSession(ctx)
   787  	return fn(NewSessionContext(ctx, defaultSess))
   788  }
   790  // Watch returns a change stream for all changes on the deployment. See
   791  // https://www.mongodb.com/docs/manual/changeStreams/ for more information about change streams.
   792  //
   793  // The client must be configured with read concern majority or no read concern for a change stream to be created
   794  // successfully.
   795  //
   796  // The pipeline parameter must be an array of documents, each representing a pipeline stage. The pipeline cannot be
   797  // nil or empty. The stage documents must all be non-nil. See https://www.mongodb.com/docs/manual/changeStreams/ for a list
   798  // of pipeline stages that can be used with change streams. For a pipeline of bson.D documents, the mongo.Pipeline{}
   799  // type can be used.
   800  //
   801  // The opts parameter can be used to specify options for change stream creation (see the options.ChangeStreamOptions
   802  // documentation).
   803  func (c *Client) Watch(ctx context.Context, pipeline interface{},
   804  	opts ...*options.ChangeStreamOptions) (*ChangeStream, error) {
   805  	if c.sessionPool == nil {
   806  		return nil, ErrClientDisconnected
   807  	}
   809  	csConfig := changeStreamConfig{
   810  		readConcern:    c.readConcern,
   811  		readPreference: c.readPreference,
   812  		client:         c,
   813  		bsonOpts:       c.bsonOpts,
   814  		registry:       c.registry,
   815  		streamType:     ClientStream,
   816  		crypt:          c.cryptFLE,
   817  	}
   819  	return newChangeStream(ctx, csConfig, pipeline, opts...)
   820  }
   822  // NumberSessionsInProgress returns the number of sessions that have been started for this client but have not been
   823  // closed (i.e. EndSession has not been called).
   824  func (c *Client) NumberSessionsInProgress() int {
   825  	// The underlying session pool uses an int64 for checkedOut to allow atomic
   826  	// access. We convert to an int here to maintain backward compatibility with
   827  	// older versions of the driver that did not atomically access checkedOut.
   828  	return int(c.sessionPool.CheckedOut())
   829  }
   831  // Timeout returns the timeout set for this client.
   832  func (c *Client) Timeout() *time.Duration {
   833  	return c.timeout
   834  }
   836  func (c *Client) createBaseCursorOptions() driver.CursorOptions {
   837  	return driver.CursorOptions{
   838  		CommandMonitor: c.monitor,
   839  		Crypt:          c.cryptFLE,
   840  		ServerAPI:      c.serverAPI,
   841  	}
   842  }
   844  // newLogger will use the LoggerOptions to create an internal logger and publish
   845  // messages using a LogSink.
   846  func newLogger(opts *options.LoggerOptions) (*logger.Logger, error) {
   847  	// If there are no logger options, then create a default logger.
   848  	if opts == nil {
   849  		opts = options.Logger()
   850  	}
   852  	// If there are no component-level options and the environment does not
   853  	// contain component variables, then do nothing.
   854  	if (opts.ComponentLevels == nil || len(opts.ComponentLevels) == 0) &&
   855  		!logger.EnvHasComponentVariables() {
   857  		return nil, nil
   858  	}
   860  	// Otherwise, collect the component-level options and create a logger.
   861  	componentLevels := make(map[logger.Component]logger.Level)
   862  	for component, level := range opts.ComponentLevels {
   863  		componentLevels[logger.Component(component)] = logger.Level(level)
   864  	}
   866  	return logger.New(opts.Sink, opts.MaxDocumentLength, componentLevels)
   867  }

View as plain text