...

Source file src/go.mongodb.org/mongo-driver/mongo/database.go

Documentation: go.mongodb.org/mongo-driver/mongo

     1  // Copyright (C) MongoDB, Inc. 2017-present.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License"); you may
     4  // not use this file except in compliance with the License. You may obtain
     5  // a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
     6  
     7  package mongo
     8  
     9  import (
    10  	"context"
    11  	"errors"
    12  	"fmt"
    13  	"time"
    14  
    15  	"go.mongodb.org/mongo-driver/bson"
    16  	"go.mongodb.org/mongo-driver/bson/bsoncodec"
    17  	"go.mongodb.org/mongo-driver/internal/csfle"
    18  	"go.mongodb.org/mongo-driver/mongo/description"
    19  	"go.mongodb.org/mongo-driver/mongo/options"
    20  	"go.mongodb.org/mongo-driver/mongo/readconcern"
    21  	"go.mongodb.org/mongo-driver/mongo/readpref"
    22  	"go.mongodb.org/mongo-driver/mongo/writeconcern"
    23  	"go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
    24  	"go.mongodb.org/mongo-driver/x/mongo/driver"
    25  	"go.mongodb.org/mongo-driver/x/mongo/driver/operation"
    26  	"go.mongodb.org/mongo-driver/x/mongo/driver/session"
    27  )
    28  
    29  var (
    30  	defaultRunCmdOpts = []*options.RunCmdOptions{options.RunCmd().SetReadPreference(readpref.Primary())}
    31  )
    32  
    33  // Database is a handle to a MongoDB database. It is safe for concurrent use by multiple goroutines.
    34  type Database struct {
    35  	client         *Client
    36  	name           string
    37  	readConcern    *readconcern.ReadConcern
    38  	writeConcern   *writeconcern.WriteConcern
    39  	readPreference *readpref.ReadPref
    40  	readSelector   description.ServerSelector
    41  	writeSelector  description.ServerSelector
    42  	bsonOpts       *options.BSONOptions
    43  	registry       *bsoncodec.Registry
    44  }
    45  
    46  func newDatabase(client *Client, name string, opts ...*options.DatabaseOptions) *Database {
    47  	dbOpt := options.MergeDatabaseOptions(opts...)
    48  
    49  	rc := client.readConcern
    50  	if dbOpt.ReadConcern != nil {
    51  		rc = dbOpt.ReadConcern
    52  	}
    53  
    54  	rp := client.readPreference
    55  	if dbOpt.ReadPreference != nil {
    56  		rp = dbOpt.ReadPreference
    57  	}
    58  
    59  	wc := client.writeConcern
    60  	if dbOpt.WriteConcern != nil {
    61  		wc = dbOpt.WriteConcern
    62  	}
    63  
    64  	bsonOpts := client.bsonOpts
    65  	if dbOpt.BSONOptions != nil {
    66  		bsonOpts = dbOpt.BSONOptions
    67  	}
    68  
    69  	reg := client.registry
    70  	if dbOpt.Registry != nil {
    71  		reg = dbOpt.Registry
    72  	}
    73  
    74  	db := &Database{
    75  		client:         client,
    76  		name:           name,
    77  		readPreference: rp,
    78  		readConcern:    rc,
    79  		writeConcern:   wc,
    80  		bsonOpts:       bsonOpts,
    81  		registry:       reg,
    82  	}
    83  
    84  	db.readSelector = description.CompositeSelector([]description.ServerSelector{
    85  		description.ReadPrefSelector(db.readPreference),
    86  		description.LatencySelector(db.client.localThreshold),
    87  	})
    88  
    89  	db.writeSelector = description.CompositeSelector([]description.ServerSelector{
    90  		description.WriteSelector(),
    91  		description.LatencySelector(db.client.localThreshold),
    92  	})
    93  
    94  	return db
    95  }
    96  
    97  // Client returns the Client the Database was created from.
    98  func (db *Database) Client() *Client {
    99  	return db.client
   100  }
   101  
   102  // Name returns the name of the database.
   103  func (db *Database) Name() string {
   104  	return db.name
   105  }
   106  
   107  // Collection gets a handle for a collection with the given name configured with the given CollectionOptions.
   108  func (db *Database) Collection(name string, opts ...*options.CollectionOptions) *Collection {
   109  	return newCollection(db, name, opts...)
   110  }
   111  
   112  // Aggregate executes an aggregate command the database. This requires MongoDB version >= 3.6 and driver version >=
   113  // 1.1.0.
   114  //
   115  // The pipeline parameter must be a slice of documents, each representing an aggregation stage. The pipeline
   116  // cannot be nil but can be empty. The stage documents must all be non-nil. For a pipeline of bson.D documents, the
   117  // mongo.Pipeline type can be used. See
   118  // https://www.mongodb.com/docs/manual/reference/operator/aggregation-pipeline/#db-aggregate-stages for a list of valid
   119  // stages in database-level aggregations.
   120  //
   121  // The opts parameter can be used to specify options for this operation (see the options.AggregateOptions documentation).
   122  //
   123  // For more information about the command, see https://www.mongodb.com/docs/manual/reference/command/aggregate/.
   124  func (db *Database) Aggregate(ctx context.Context, pipeline interface{},
   125  	opts ...*options.AggregateOptions) (*Cursor, error) {
   126  	a := aggregateParams{
   127  		ctx:            ctx,
   128  		pipeline:       pipeline,
   129  		client:         db.client,
   130  		registry:       db.registry,
   131  		readConcern:    db.readConcern,
   132  		writeConcern:   db.writeConcern,
   133  		retryRead:      db.client.retryReads,
   134  		db:             db.name,
   135  		readSelector:   db.readSelector,
   136  		writeSelector:  db.writeSelector,
   137  		readPreference: db.readPreference,
   138  		opts:           opts,
   139  	}
   140  	return aggregate(a)
   141  }
   142  
   143  func (db *Database) processRunCommand(ctx context.Context, cmd interface{},
   144  	cursorCommand bool, opts ...*options.RunCmdOptions) (*operation.Command, *session.Client, error) {
   145  	sess := sessionFromContext(ctx)
   146  	if sess == nil && db.client.sessionPool != nil {
   147  		sess = session.NewImplicitClientSession(db.client.sessionPool, db.client.id)
   148  	}
   149  
   150  	err := db.client.validSession(sess)
   151  	if err != nil {
   152  		return nil, sess, err
   153  	}
   154  
   155  	ro := options.MergeRunCmdOptions(append(defaultRunCmdOpts, opts...)...)
   156  	if sess != nil && sess.TransactionRunning() && ro.ReadPreference != nil && ro.ReadPreference.Mode() != readpref.PrimaryMode {
   157  		return nil, sess, errors.New("read preference in a transaction must be primary")
   158  	}
   159  
   160  	if isUnorderedMap(cmd) {
   161  		return nil, sess, ErrMapForOrderedArgument{"cmd"}
   162  	}
   163  
   164  	runCmdDoc, err := marshal(cmd, db.bsonOpts, db.registry)
   165  	if err != nil {
   166  		return nil, sess, err
   167  	}
   168  	readSelect := description.CompositeSelector([]description.ServerSelector{
   169  		description.ReadPrefSelector(ro.ReadPreference),
   170  		description.LatencySelector(db.client.localThreshold),
   171  	})
   172  	if sess != nil && sess.PinnedServer != nil {
   173  		readSelect = makePinnedSelector(sess, readSelect)
   174  	}
   175  
   176  	var op *operation.Command
   177  	switch cursorCommand {
   178  	case true:
   179  		cursorOpts := db.client.createBaseCursorOptions()
   180  
   181  		cursorOpts.MarshalValueEncoderFn = newEncoderFn(db.bsonOpts, db.registry)
   182  
   183  		op = operation.NewCursorCommand(runCmdDoc, cursorOpts)
   184  	default:
   185  		op = operation.NewCommand(runCmdDoc)
   186  	}
   187  
   188  	return op.Session(sess).CommandMonitor(db.client.monitor).
   189  		ServerSelector(readSelect).ClusterClock(db.client.clock).
   190  		Database(db.name).Deployment(db.client.deployment).
   191  		Crypt(db.client.cryptFLE).ReadPreference(ro.ReadPreference).ServerAPI(db.client.serverAPI).
   192  		Timeout(db.client.timeout).Logger(db.client.logger), sess, nil
   193  }
   194  
   195  // RunCommand executes the given command against the database.
   196  //
   197  // This function does not obey the Database's readPreference. To specify a read
   198  // preference, the RunCmdOptions.ReadPreference option must be used.
   199  //
   200  // This function does not obey the Database's readConcern or writeConcern. A
   201  // user must supply these values manually in the user-provided runCommand
   202  // parameter.
   203  //
   204  // The runCommand parameter must be a document for the command to be executed. It cannot be nil.
   205  // This must be an order-preserving type such as bson.D. Map types such as bson.M are not valid.
   206  //
   207  // The opts parameter can be used to specify options for this operation (see the options.RunCmdOptions documentation).
   208  //
   209  // The behavior of RunCommand is undefined if the command document contains any of the following:
   210  // - A session ID or any transaction-specific fields
   211  // - API versioning options when an API version is already declared on the Client
   212  // - maxTimeMS when Timeout is set on the Client
   213  func (db *Database) RunCommand(ctx context.Context, runCommand interface{}, opts ...*options.RunCmdOptions) *SingleResult {
   214  	if ctx == nil {
   215  		ctx = context.Background()
   216  	}
   217  
   218  	op, sess, err := db.processRunCommand(ctx, runCommand, false, opts...)
   219  	defer closeImplicitSession(sess)
   220  	if err != nil {
   221  		return &SingleResult{err: err}
   222  	}
   223  
   224  	err = op.Execute(ctx)
   225  	// RunCommand can be used to run a write, thus execute may return a write error
   226  	_, convErr := processWriteError(err)
   227  	return &SingleResult{
   228  		ctx:      ctx,
   229  		err:      convErr,
   230  		rdr:      bson.Raw(op.Result()),
   231  		bsonOpts: db.bsonOpts,
   232  		reg:      db.registry,
   233  	}
   234  }
   235  
   236  // RunCommandCursor executes the given command against the database and parses the response as a cursor. If the command
   237  // being executed does not return a cursor (e.g. insert), the command will be executed on the server and an error will
   238  // be returned because the server response cannot be parsed as a cursor. This function does not obey the Database's read
   239  // preference. To specify a read preference, the RunCmdOptions.ReadPreference option must be used.
   240  //
   241  // The runCommand parameter must be a document for the command to be executed. It cannot be nil.
   242  // This must be an order-preserving type such as bson.D. Map types such as bson.M are not valid.
   243  //
   244  // The opts parameter can be used to specify options for this operation (see the options.RunCmdOptions documentation).
   245  //
   246  // The behavior of RunCommandCursor is undefined if the command document contains any of the following:
   247  // - A session ID or any transaction-specific fields
   248  // - API versioning options when an API version is already declared on the Client
   249  // - maxTimeMS when Timeout is set on the Client
   250  func (db *Database) RunCommandCursor(ctx context.Context, runCommand interface{}, opts ...*options.RunCmdOptions) (*Cursor, error) {
   251  	if ctx == nil {
   252  		ctx = context.Background()
   253  	}
   254  
   255  	op, sess, err := db.processRunCommand(ctx, runCommand, true, opts...)
   256  	if err != nil {
   257  		closeImplicitSession(sess)
   258  		return nil, replaceErrors(err)
   259  	}
   260  
   261  	if err = op.Execute(ctx); err != nil {
   262  		closeImplicitSession(sess)
   263  		if errors.Is(err, driver.ErrNoCursor) {
   264  			return nil, errors.New(
   265  				"database response does not contain a cursor; try using RunCommand instead")
   266  		}
   267  		return nil, replaceErrors(err)
   268  	}
   269  
   270  	bc, err := op.ResultCursor()
   271  	if err != nil {
   272  		closeImplicitSession(sess)
   273  		return nil, replaceErrors(err)
   274  	}
   275  	cursor, err := newCursorWithSession(bc, db.bsonOpts, db.registry, sess)
   276  	return cursor, replaceErrors(err)
   277  }
   278  
   279  // Drop drops the database on the server. This method ignores "namespace not found" errors so it is safe to drop
   280  // a database that does not exist on the server.
   281  func (db *Database) Drop(ctx context.Context) error {
   282  	if ctx == nil {
   283  		ctx = context.Background()
   284  	}
   285  
   286  	sess := sessionFromContext(ctx)
   287  	if sess == nil && db.client.sessionPool != nil {
   288  		sess = session.NewImplicitClientSession(db.client.sessionPool, db.client.id)
   289  		defer sess.EndSession()
   290  	}
   291  
   292  	err := db.client.validSession(sess)
   293  	if err != nil {
   294  		return err
   295  	}
   296  
   297  	wc := db.writeConcern
   298  	if sess.TransactionRunning() {
   299  		wc = nil
   300  	}
   301  	if !writeconcern.AckWrite(wc) {
   302  		sess = nil
   303  	}
   304  
   305  	selector := makePinnedSelector(sess, db.writeSelector)
   306  
   307  	op := operation.NewDropDatabase().
   308  		Session(sess).WriteConcern(wc).CommandMonitor(db.client.monitor).
   309  		ServerSelector(selector).ClusterClock(db.client.clock).
   310  		Database(db.name).Deployment(db.client.deployment).Crypt(db.client.cryptFLE).
   311  		ServerAPI(db.client.serverAPI)
   312  
   313  	err = op.Execute(ctx)
   314  
   315  	driverErr, ok := err.(driver.Error)
   316  	if err != nil && (!ok || !driverErr.NamespaceNotFound()) {
   317  		return replaceErrors(err)
   318  	}
   319  	return nil
   320  }
   321  
   322  // ListCollectionSpecifications executes a listCollections command and returns a slice of CollectionSpecification
   323  // instances representing the collections in the database.
   324  //
   325  // The filter parameter must be a document containing query operators and can be used to select which collections
   326  // are included in the result. It cannot be nil. An empty document (e.g. bson.D{}) should be used to include all
   327  // collections.
   328  //
   329  // The opts parameter can be used to specify options for the operation (see the options.ListCollectionsOptions
   330  // documentation).
   331  //
   332  // For more information about the command, see https://www.mongodb.com/docs/manual/reference/command/listCollections/.
   333  //
   334  // BUG(benjirewis): ListCollectionSpecifications prevents listing more than 100 collections per database when running
   335  // against MongoDB version 2.6.
   336  func (db *Database) ListCollectionSpecifications(ctx context.Context, filter interface{},
   337  	opts ...*options.ListCollectionsOptions) ([]*CollectionSpecification, error) {
   338  
   339  	cursor, err := db.ListCollections(ctx, filter, opts...)
   340  	if err != nil {
   341  		return nil, err
   342  	}
   343  
   344  	var specs []*CollectionSpecification
   345  	err = cursor.All(ctx, &specs)
   346  	if err != nil {
   347  		return nil, err
   348  	}
   349  
   350  	for _, spec := range specs {
   351  		// Pre-4.4 servers report a namespace in their responses, so we only set Namespace manually if it was not in
   352  		// the response.
   353  		if spec.IDIndex != nil && spec.IDIndex.Namespace == "" {
   354  			spec.IDIndex.Namespace = db.name + "." + spec.Name
   355  		}
   356  	}
   357  	return specs, nil
   358  }
   359  
   360  // ListCollections executes a listCollections command and returns a cursor over the collections in the database.
   361  //
   362  // The filter parameter must be a document containing query operators and can be used to select which collections
   363  // are included in the result. It cannot be nil. An empty document (e.g. bson.D{}) should be used to include all
   364  // collections.
   365  //
   366  // The opts parameter can be used to specify options for the operation (see the options.ListCollectionsOptions
   367  // documentation).
   368  //
   369  // For more information about the command, see https://www.mongodb.com/docs/manual/reference/command/listCollections/.
   370  //
   371  // BUG(benjirewis): ListCollections prevents listing more than 100 collections per database when running against
   372  // MongoDB version 2.6.
   373  func (db *Database) ListCollections(ctx context.Context, filter interface{}, opts ...*options.ListCollectionsOptions) (*Cursor, error) {
   374  	if ctx == nil {
   375  		ctx = context.Background()
   376  	}
   377  
   378  	filterDoc, err := marshal(filter, db.bsonOpts, db.registry)
   379  	if err != nil {
   380  		return nil, err
   381  	}
   382  
   383  	sess := sessionFromContext(ctx)
   384  	if sess == nil && db.client.sessionPool != nil {
   385  		sess = session.NewImplicitClientSession(db.client.sessionPool, db.client.id)
   386  	}
   387  
   388  	err = db.client.validSession(sess)
   389  	if err != nil {
   390  		closeImplicitSession(sess)
   391  		return nil, err
   392  	}
   393  
   394  	selector := description.CompositeSelector([]description.ServerSelector{
   395  		description.ReadPrefSelector(readpref.Primary()),
   396  		description.LatencySelector(db.client.localThreshold),
   397  	})
   398  	selector = makeReadPrefSelector(sess, selector, db.client.localThreshold)
   399  
   400  	lco := options.MergeListCollectionsOptions(opts...)
   401  	op := operation.NewListCollections(filterDoc).
   402  		Session(sess).ReadPreference(db.readPreference).CommandMonitor(db.client.monitor).
   403  		ServerSelector(selector).ClusterClock(db.client.clock).
   404  		Database(db.name).Deployment(db.client.deployment).Crypt(db.client.cryptFLE).
   405  		ServerAPI(db.client.serverAPI).Timeout(db.client.timeout)
   406  
   407  	cursorOpts := db.client.createBaseCursorOptions()
   408  
   409  	cursorOpts.MarshalValueEncoderFn = newEncoderFn(db.bsonOpts, db.registry)
   410  
   411  	if lco.NameOnly != nil {
   412  		op = op.NameOnly(*lco.NameOnly)
   413  	}
   414  	if lco.BatchSize != nil {
   415  		cursorOpts.BatchSize = *lco.BatchSize
   416  		op = op.BatchSize(*lco.BatchSize)
   417  	}
   418  	if lco.AuthorizedCollections != nil {
   419  		op = op.AuthorizedCollections(*lco.AuthorizedCollections)
   420  	}
   421  
   422  	retry := driver.RetryNone
   423  	if db.client.retryReads {
   424  		retry = driver.RetryOncePerCommand
   425  	}
   426  	op = op.Retry(retry)
   427  
   428  	err = op.Execute(ctx)
   429  	if err != nil {
   430  		closeImplicitSession(sess)
   431  		return nil, replaceErrors(err)
   432  	}
   433  
   434  	bc, err := op.Result(cursorOpts)
   435  	if err != nil {
   436  		closeImplicitSession(sess)
   437  		return nil, replaceErrors(err)
   438  	}
   439  	cursor, err := newCursorWithSession(bc, db.bsonOpts, db.registry, sess)
   440  	return cursor, replaceErrors(err)
   441  }
   442  
   443  // ListCollectionNames executes a listCollections command and returns a slice containing the names of the collections
   444  // in the database. This method requires driver version >= 1.1.0.
   445  //
   446  // The filter parameter must be a document containing query operators and can be used to select which collections
   447  // are included in the result. It cannot be nil. An empty document (e.g. bson.D{}) should be used to include all
   448  // collections.
   449  //
   450  // The opts parameter can be used to specify options for the operation (see the options.ListCollectionsOptions
   451  // documentation).
   452  //
   453  // For more information about the command, see https://www.mongodb.com/docs/manual/reference/command/listCollections/.
   454  //
   455  // BUG(benjirewis): ListCollectionNames prevents listing more than 100 collections per database when running against
   456  // MongoDB version 2.6.
   457  func (db *Database) ListCollectionNames(ctx context.Context, filter interface{}, opts ...*options.ListCollectionsOptions) ([]string, error) {
   458  	opts = append(opts, options.ListCollections().SetNameOnly(true))
   459  
   460  	res, err := db.ListCollections(ctx, filter, opts...)
   461  	if err != nil {
   462  		return nil, err
   463  	}
   464  
   465  	defer res.Close(ctx)
   466  
   467  	names := make([]string, 0)
   468  	for res.Next(ctx) {
   469  		elem, err := res.Current.LookupErr("name")
   470  		if err != nil {
   471  			return nil, err
   472  		}
   473  
   474  		if elem.Type != bson.TypeString {
   475  			return nil, fmt.Errorf("incorrect type for 'name'. got %v. want %v", elem.Type, bson.TypeString)
   476  		}
   477  
   478  		elemName := elem.StringValue()
   479  		names = append(names, elemName)
   480  	}
   481  
   482  	res.Close(ctx)
   483  	return names, nil
   484  }
   485  
   486  // ReadConcern returns the read concern used to configure the Database object.
   487  func (db *Database) ReadConcern() *readconcern.ReadConcern {
   488  	return db.readConcern
   489  }
   490  
   491  // ReadPreference returns the read preference used to configure the Database object.
   492  func (db *Database) ReadPreference() *readpref.ReadPref {
   493  	return db.readPreference
   494  }
   495  
   496  // WriteConcern returns the write concern used to configure the Database object.
   497  func (db *Database) WriteConcern() *writeconcern.WriteConcern {
   498  	return db.writeConcern
   499  }
   500  
   501  // Watch returns a change stream for all changes to the corresponding database. See
   502  // https://www.mongodb.com/docs/manual/changeStreams/ for more information about change streams.
   503  //
   504  // The Database must be configured with read concern majority or no read concern for a change stream to be created
   505  // successfully.
   506  //
   507  // The pipeline parameter must be a slice of documents, each representing a pipeline stage. The pipeline cannot be
   508  // nil but can be empty. The stage documents must all be non-nil. See https://www.mongodb.com/docs/manual/changeStreams/ for
   509  // a list of pipeline stages that can be used with change streams. For a pipeline of bson.D documents, the
   510  // mongo.Pipeline{} type can be used.
   511  //
   512  // The opts parameter can be used to specify options for change stream creation (see the options.ChangeStreamOptions
   513  // documentation).
   514  func (db *Database) Watch(ctx context.Context, pipeline interface{},
   515  	opts ...*options.ChangeStreamOptions) (*ChangeStream, error) {
   516  
   517  	csConfig := changeStreamConfig{
   518  		readConcern:    db.readConcern,
   519  		readPreference: db.readPreference,
   520  		client:         db.client,
   521  		registry:       db.registry,
   522  		streamType:     DatabaseStream,
   523  		databaseName:   db.Name(),
   524  		crypt:          db.client.cryptFLE,
   525  	}
   526  	return newChangeStream(ctx, csConfig, pipeline, opts...)
   527  }
   528  
   529  // CreateCollection executes a create command to explicitly create a new collection with the specified name on the
   530  // server. If the collection being created already exists, this method will return a mongo.CommandError. This method
   531  // requires driver version 1.4.0 or higher.
   532  //
   533  // The opts parameter can be used to specify options for the operation (see the options.CreateCollectionOptions
   534  // documentation).
   535  //
   536  // For more information about the command, see https://www.mongodb.com/docs/manual/reference/command/create/.
   537  func (db *Database) CreateCollection(ctx context.Context, name string, opts ...*options.CreateCollectionOptions) error {
   538  	cco := options.MergeCreateCollectionOptions(opts...)
   539  	// Follow Client-Side Encryption specification to check for encryptedFields.
   540  	// Check for encryptedFields from create options.
   541  	ef := cco.EncryptedFields
   542  	// Check for encryptedFields from the client EncryptedFieldsMap.
   543  	if ef == nil {
   544  		ef = db.getEncryptedFieldsFromMap(name)
   545  	}
   546  	if ef != nil {
   547  		return db.createCollectionWithEncryptedFields(ctx, name, ef, opts...)
   548  	}
   549  
   550  	return db.createCollection(ctx, name, opts...)
   551  }
   552  
   553  // getEncryptedFieldsFromServer tries to get an "encryptedFields" document associated with collectionName by running the "listCollections" command.
   554  // Returns nil and no error if the listCollections command succeeds, but "encryptedFields" is not present.
   555  func (db *Database) getEncryptedFieldsFromServer(ctx context.Context, collectionName string) (interface{}, error) {
   556  	// Check if collection has an EncryptedFields configured server-side.
   557  	collSpecs, err := db.ListCollectionSpecifications(ctx, bson.D{{"name", collectionName}})
   558  	if err != nil {
   559  		return nil, err
   560  	}
   561  	if len(collSpecs) == 0 {
   562  		return nil, nil
   563  	}
   564  	if len(collSpecs) > 1 {
   565  		return nil, fmt.Errorf("expected 1 or 0 results from listCollections, got %v", len(collSpecs))
   566  	}
   567  	collSpec := collSpecs[0]
   568  	rawValue, err := collSpec.Options.LookupErr("encryptedFields")
   569  	if errors.Is(err, bsoncore.ErrElementNotFound) {
   570  		return nil, nil
   571  	} else if err != nil {
   572  		return nil, err
   573  	}
   574  
   575  	encryptedFields, ok := rawValue.DocumentOK()
   576  	if !ok {
   577  		return nil, fmt.Errorf("expected encryptedFields of %v to be document, got %v", collectionName, rawValue.Type)
   578  	}
   579  
   580  	return encryptedFields, nil
   581  }
   582  
   583  // getEncryptedFieldsFromServer tries to get an "encryptedFields" document associated with collectionName by checking the client EncryptedFieldsMap.
   584  // Returns nil and no error if an EncryptedFieldsMap is not configured, or does not contain an entry for collectionName.
   585  func (db *Database) getEncryptedFieldsFromMap(collectionName string) interface{} {
   586  	// Check the EncryptedFieldsMap
   587  	efMap := db.client.encryptedFieldsMap
   588  	if efMap == nil {
   589  		return nil
   590  	}
   591  
   592  	namespace := db.name + "." + collectionName
   593  
   594  	ef, ok := efMap[namespace]
   595  	if ok {
   596  		return ef
   597  	}
   598  	return nil
   599  }
   600  
   601  // createCollectionWithEncryptedFields creates a collection with an EncryptedFields.
   602  func (db *Database) createCollectionWithEncryptedFields(ctx context.Context, name string, ef interface{}, opts ...*options.CreateCollectionOptions) error {
   603  	efBSON, err := marshal(ef, db.bsonOpts, db.registry)
   604  	if err != nil {
   605  		return fmt.Errorf("error transforming document: %w", err)
   606  	}
   607  
   608  	// Check the wire version to ensure server is 7.0.0 or newer.
   609  	// After the wire version check, and before creating the collections, it is possible the server state changes.
   610  	// That is OK. This wire version check is a best effort to inform users earlier if using a QEv2 driver with a QEv1 server.
   611  	{
   612  		const QEv2WireVersion = 21
   613  		server, err := db.client.deployment.SelectServer(ctx, description.WriteSelector())
   614  		if err != nil {
   615  			return fmt.Errorf("error selecting server to check maxWireVersion: %w", err)
   616  		}
   617  		conn, err := server.Connection(ctx)
   618  		if err != nil {
   619  			return fmt.Errorf("error getting connection to check maxWireVersion: %w", err)
   620  		}
   621  		defer conn.Close()
   622  		wireVersionRange := conn.Description().WireVersion
   623  		if wireVersionRange.Max < QEv2WireVersion {
   624  			return fmt.Errorf("Driver support of Queryable Encryption is incompatible with server. Upgrade server to use Queryable Encryption. Got maxWireVersion %v but need maxWireVersion >= %v", wireVersionRange.Max, QEv2WireVersion)
   625  		}
   626  	}
   627  
   628  	// Create the two encryption-related, associated collections: `escCollection` and `ecocCollection`.
   629  
   630  	stateCollectionOpts := options.CreateCollection().
   631  		SetClusteredIndex(bson.D{{"key", bson.D{{"_id", 1}}}, {"unique", true}})
   632  	// Create ESCCollection.
   633  	escCollection, err := csfle.GetEncryptedStateCollectionName(efBSON, name, csfle.EncryptedStateCollection)
   634  	if err != nil {
   635  		return err
   636  	}
   637  
   638  	if err := db.createCollection(ctx, escCollection, stateCollectionOpts); err != nil {
   639  		return err
   640  	}
   641  
   642  	// Create ECOCCollection.
   643  	ecocCollection, err := csfle.GetEncryptedStateCollectionName(efBSON, name, csfle.EncryptedCompactionCollection)
   644  	if err != nil {
   645  		return err
   646  	}
   647  
   648  	if err := db.createCollection(ctx, ecocCollection, stateCollectionOpts); err != nil {
   649  		return err
   650  	}
   651  
   652  	// Create a data collection with the 'encryptedFields' option.
   653  	op, err := db.createCollectionOperation(name, opts...)
   654  	if err != nil {
   655  		return err
   656  	}
   657  
   658  	op.EncryptedFields(efBSON)
   659  	if err := db.executeCreateOperation(ctx, op); err != nil {
   660  		return err
   661  	}
   662  
   663  	// Create an index on the __safeContent__ field in the collection @collectionName.
   664  	if _, err := db.Collection(name).Indexes().CreateOne(ctx, IndexModel{Keys: bson.D{{"__safeContent__", 1}}}); err != nil {
   665  		return fmt.Errorf("error creating safeContent index: %w", err)
   666  	}
   667  
   668  	return nil
   669  }
   670  
   671  // createCollection creates a collection without EncryptedFields.
   672  func (db *Database) createCollection(ctx context.Context, name string, opts ...*options.CreateCollectionOptions) error {
   673  	op, err := db.createCollectionOperation(name, opts...)
   674  	if err != nil {
   675  		return err
   676  	}
   677  	return db.executeCreateOperation(ctx, op)
   678  }
   679  
   680  func (db *Database) createCollectionOperation(name string, opts ...*options.CreateCollectionOptions) (*operation.Create, error) {
   681  	cco := options.MergeCreateCollectionOptions(opts...)
   682  	op := operation.NewCreate(name).ServerAPI(db.client.serverAPI)
   683  
   684  	if cco.Capped != nil {
   685  		op.Capped(*cco.Capped)
   686  	}
   687  	if cco.Collation != nil {
   688  		op.Collation(bsoncore.Document(cco.Collation.ToDocument()))
   689  	}
   690  	if cco.ChangeStreamPreAndPostImages != nil {
   691  		csppi, err := marshal(cco.ChangeStreamPreAndPostImages, db.bsonOpts, db.registry)
   692  		if err != nil {
   693  			return nil, err
   694  		}
   695  		op.ChangeStreamPreAndPostImages(csppi)
   696  	}
   697  	if cco.DefaultIndexOptions != nil {
   698  		idx, doc := bsoncore.AppendDocumentStart(nil)
   699  		if cco.DefaultIndexOptions.StorageEngine != nil {
   700  			storageEngine, err := marshal(cco.DefaultIndexOptions.StorageEngine, db.bsonOpts, db.registry)
   701  			if err != nil {
   702  				return nil, err
   703  			}
   704  
   705  			doc = bsoncore.AppendDocumentElement(doc, "storageEngine", storageEngine)
   706  		}
   707  		doc, err := bsoncore.AppendDocumentEnd(doc, idx)
   708  		if err != nil {
   709  			return nil, err
   710  		}
   711  
   712  		op.IndexOptionDefaults(doc)
   713  	}
   714  	if cco.MaxDocuments != nil {
   715  		op.Max(*cco.MaxDocuments)
   716  	}
   717  	if cco.SizeInBytes != nil {
   718  		op.Size(*cco.SizeInBytes)
   719  	}
   720  	if cco.StorageEngine != nil {
   721  		storageEngine, err := marshal(cco.StorageEngine, db.bsonOpts, db.registry)
   722  		if err != nil {
   723  			return nil, err
   724  		}
   725  		op.StorageEngine(storageEngine)
   726  	}
   727  	if cco.ValidationAction != nil {
   728  		op.ValidationAction(*cco.ValidationAction)
   729  	}
   730  	if cco.ValidationLevel != nil {
   731  		op.ValidationLevel(*cco.ValidationLevel)
   732  	}
   733  	if cco.Validator != nil {
   734  		validator, err := marshal(cco.Validator, db.bsonOpts, db.registry)
   735  		if err != nil {
   736  			return nil, err
   737  		}
   738  		op.Validator(validator)
   739  	}
   740  	if cco.ExpireAfterSeconds != nil {
   741  		op.ExpireAfterSeconds(*cco.ExpireAfterSeconds)
   742  	}
   743  	if cco.TimeSeriesOptions != nil {
   744  		idx, doc := bsoncore.AppendDocumentStart(nil)
   745  		doc = bsoncore.AppendStringElement(doc, "timeField", cco.TimeSeriesOptions.TimeField)
   746  
   747  		if cco.TimeSeriesOptions.MetaField != nil {
   748  			doc = bsoncore.AppendStringElement(doc, "metaField", *cco.TimeSeriesOptions.MetaField)
   749  		}
   750  		if cco.TimeSeriesOptions.Granularity != nil {
   751  			doc = bsoncore.AppendStringElement(doc, "granularity", *cco.TimeSeriesOptions.Granularity)
   752  		}
   753  
   754  		if cco.TimeSeriesOptions.BucketMaxSpan != nil {
   755  			bmss := int64(*cco.TimeSeriesOptions.BucketMaxSpan / time.Second)
   756  
   757  			doc = bsoncore.AppendInt64Element(doc, "bucketMaxSpanSeconds", bmss)
   758  		}
   759  
   760  		if cco.TimeSeriesOptions.BucketRounding != nil {
   761  			brs := int64(*cco.TimeSeriesOptions.BucketRounding / time.Second)
   762  
   763  			doc = bsoncore.AppendInt64Element(doc, "bucketRoundingSeconds", brs)
   764  		}
   765  
   766  		doc, err := bsoncore.AppendDocumentEnd(doc, idx)
   767  		if err != nil {
   768  			return nil, err
   769  		}
   770  
   771  		op.TimeSeries(doc)
   772  	}
   773  	if cco.ClusteredIndex != nil {
   774  		clusteredIndex, err := marshal(cco.ClusteredIndex, db.bsonOpts, db.registry)
   775  		if err != nil {
   776  			return nil, err
   777  		}
   778  		op.ClusteredIndex(clusteredIndex)
   779  	}
   780  
   781  	return op, nil
   782  }
   783  
   784  // CreateView executes a create command to explicitly create a view on the server. See
   785  // https://www.mongodb.com/docs/manual/core/views/ for more information about views. This method requires driver version >=
   786  // 1.4.0 and MongoDB version >= 3.4.
   787  //
   788  // The viewName parameter specifies the name of the view to create.
   789  //
   790  // # The viewOn parameter specifies the name of the collection or view on which this view will be created
   791  //
   792  // The pipeline parameter specifies an aggregation pipeline that will be exececuted against the source collection or
   793  // view to create this view.
   794  //
   795  // The opts parameter can be used to specify options for the operation (see the options.CreateViewOptions
   796  // documentation).
   797  func (db *Database) CreateView(ctx context.Context, viewName, viewOn string, pipeline interface{},
   798  	opts ...*options.CreateViewOptions) error {
   799  
   800  	pipelineArray, _, err := marshalAggregatePipeline(pipeline, db.bsonOpts, db.registry)
   801  	if err != nil {
   802  		return err
   803  	}
   804  
   805  	op := operation.NewCreate(viewName).
   806  		ViewOn(viewOn).
   807  		Pipeline(pipelineArray).
   808  		ServerAPI(db.client.serverAPI)
   809  	cvo := options.MergeCreateViewOptions(opts...)
   810  	if cvo.Collation != nil {
   811  		op.Collation(bsoncore.Document(cvo.Collation.ToDocument()))
   812  	}
   813  
   814  	return db.executeCreateOperation(ctx, op)
   815  }
   816  
   817  func (db *Database) executeCreateOperation(ctx context.Context, op *operation.Create) error {
   818  	sess := sessionFromContext(ctx)
   819  	if sess == nil && db.client.sessionPool != nil {
   820  		sess = session.NewImplicitClientSession(db.client.sessionPool, db.client.id)
   821  		defer sess.EndSession()
   822  	}
   823  
   824  	err := db.client.validSession(sess)
   825  	if err != nil {
   826  		return err
   827  	}
   828  
   829  	wc := db.writeConcern
   830  	if sess.TransactionRunning() {
   831  		wc = nil
   832  	}
   833  	if !writeconcern.AckWrite(wc) {
   834  		sess = nil
   835  	}
   836  
   837  	selector := makePinnedSelector(sess, db.writeSelector)
   838  	op = op.Session(sess).
   839  		WriteConcern(wc).
   840  		CommandMonitor(db.client.monitor).
   841  		ServerSelector(selector).
   842  		ClusterClock(db.client.clock).
   843  		Database(db.name).
   844  		Deployment(db.client.deployment).
   845  		Crypt(db.client.cryptFLE)
   846  
   847  	return replaceErrors(op.Execute(ctx))
   848  }
   849  

View as plain text