...

Source file src/go.mongodb.org/mongo-driver/mongo/integration/crud_helpers_test.go

Documentation: go.mongodb.org/mongo-driver/mongo/integration

     1  // Copyright (C) MongoDB, Inc. 2017-present.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License"); you may
     4  // not use this file except in compliance with the License. You may obtain
     5  // a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
     6  
     7  package integration
     8  
     9  import (
    10  	"bytes"
    11  	"context"
    12  	"fmt"
    13  	"math"
    14  	"strconv"
    15  	"strings"
    16  	"time"
    17  
    18  	"go.mongodb.org/mongo-driver/bson"
    19  	"go.mongodb.org/mongo-driver/bson/bsontype"
    20  	"go.mongodb.org/mongo-driver/bson/primitive"
    21  	"go.mongodb.org/mongo-driver/internal/assert"
    22  	"go.mongodb.org/mongo-driver/internal/bsonutil"
    23  	"go.mongodb.org/mongo-driver/internal/integtest"
    24  	"go.mongodb.org/mongo-driver/mongo"
    25  	"go.mongodb.org/mongo-driver/mongo/gridfs"
    26  	"go.mongodb.org/mongo-driver/mongo/integration/mtest"
    27  	"go.mongodb.org/mongo-driver/mongo/integration/unified"
    28  	"go.mongodb.org/mongo-driver/mongo/options"
    29  	"go.mongodb.org/mongo-driver/mongo/readconcern"
    30  	"go.mongodb.org/mongo-driver/mongo/readpref"
    31  )
    32  
    33  // Helper functions to execute and verify results from CRUD methods.
    34  
    35  var (
    36  	emptyDoc                        = []byte{5, 0, 0, 0, 0}
    37  	errorCommandNotFound      int32 = 59
    38  	errorLockTimeout          int32 = 24
    39  	errorCommandNotSupported  int32 = 115
    40  	killAllSessionsErrorCodes       = map[int32]struct{}{
    41  		errorInterrupted:         {}, // the command interrupts itself
    42  		errorCommandNotFound:     {}, // the killAllSessions command does not exist on server versions < 3.6
    43  		errorCommandNotSupported: {}, // the command is not supported on Atlas Data Lake
    44  	}
    45  )
    46  
    47  // create an update document or pipeline from a bson.RawValue
    48  func createUpdate(mt *mtest.T, updateVal bson.RawValue) interface{} {
    49  	switch updateVal.Type {
    50  	case bson.TypeEmbeddedDocument:
    51  		return updateVal.Document()
    52  	case bson.TypeArray:
    53  		var updateDocs []bson.Raw
    54  		docs, _ := updateVal.Array().Values()
    55  		for _, doc := range docs {
    56  			updateDocs = append(updateDocs, doc.Document())
    57  		}
    58  
    59  		return updateDocs
    60  	default:
    61  		mt.Fatalf("unrecognized update type: %v", updateVal.Type)
    62  	}
    63  
    64  	return nil
    65  }
    66  
    67  // create a hint string or document from a bson.RawValue
    68  func createHint(mt *mtest.T, val bson.RawValue) interface{} {
    69  	mt.Helper()
    70  
    71  	var hint interface{}
    72  	switch val.Type {
    73  	case bsontype.String:
    74  		hint = val.StringValue()
    75  	case bsontype.EmbeddedDocument:
    76  		hint = val.Document()
    77  	default:
    78  		mt.Fatalf("unrecognized hint value type: %s\n", val.Type)
    79  	}
    80  	return hint
    81  }
    82  
    83  // returns true if err is a mongo.CommandError containing a code that is expected from a killAllSessions command.
    84  func isExpectedKillAllSessionsError(err error) bool {
    85  	cmdErr, ok := err.(mongo.CommandError)
    86  	if !ok {
    87  		return false
    88  	}
    89  
    90  	_, ok = killAllSessionsErrorCodes[cmdErr.Code]
    91  	// for SERVER-54216 on atlas
    92  	atlasUnauthorized := strings.Contains(err.Error(), "(AtlasError) (Unauthorized)")
    93  	return ok || atlasUnauthorized
    94  }
    95  
    96  // kill all open sessions on the server. This function uses mt.GlobalClient() because killAllSessions is not allowed
    97  // for clients configured with specific options (e.g. client side encryption).
    98  func killSessions(mt *mtest.T) {
    99  	mt.Helper()
   100  
   101  	cmd := bson.D{
   102  		{"killAllSessions", bson.A{}},
   103  	}
   104  	runCmdOpts := options.RunCmd().SetReadPreference(mtest.PrimaryRp)
   105  
   106  	// killAllSessions has to be run against each mongos in a sharded cluster, so we use the runCommandOnAllServers
   107  	// helper.
   108  	err := runCommandOnAllServers(func(client *mongo.Client) error {
   109  		return client.Database("admin").RunCommand(context.Background(), cmd, runCmdOpts).Err()
   110  	})
   111  
   112  	if err == nil {
   113  		return
   114  	}
   115  	if !isExpectedKillAllSessionsError(err) {
   116  		mt.Fatalf("killAllSessions error: %v", err)
   117  	}
   118  }
   119  
   120  // Utility function to run a command on all servers. For standalones, the command is run against the one server. For
   121  // replica sets, the command is run against the primary. sharded clusters, the command is run against each mongos.
   122  func runCommandOnAllServers(commandFn func(client *mongo.Client) error) error {
   123  	opts := options.Client().ApplyURI(mtest.ClusterURI())
   124  	integtest.AddTestServerAPIVersion(opts)
   125  
   126  	if mtest.ClusterTopologyKind() != mtest.Sharded {
   127  		client, err := mongo.Connect(context.Background(), opts)
   128  		if err != nil {
   129  			return fmt.Errorf("error creating replica set client: %w", err)
   130  		}
   131  		defer func() { _ = client.Disconnect(context.Background()) }()
   132  
   133  		return commandFn(client)
   134  	}
   135  
   136  	for _, host := range opts.Hosts {
   137  		shardClient, err := mongo.Connect(context.Background(), opts.SetHosts([]string{host}))
   138  		if err != nil {
   139  			return fmt.Errorf("error creating client for mongos %v: %w", host, err)
   140  		}
   141  
   142  		err = commandFn(shardClient)
   143  		_ = shardClient.Disconnect(context.Background())
   144  		if err != nil {
   145  			return err
   146  		}
   147  	}
   148  
   149  	return nil
   150  }
   151  
   152  // aggregator is an interface used to run collection and database-level aggregations
   153  type aggregator interface {
   154  	Aggregate(context.Context, interface{}, ...*options.AggregateOptions) (*mongo.Cursor, error)
   155  }
   156  
   157  // watcher is an interface used to create client, db, and collection-level change streams
   158  type watcher interface {
   159  	Watch(context.Context, interface{}, ...*options.ChangeStreamOptions) (*mongo.ChangeStream, error)
   160  }
   161  
   162  func executeAggregate(mt *mtest.T, agg aggregator, sess mongo.Session, args bson.Raw) (*mongo.Cursor, error) {
   163  	mt.Helper()
   164  
   165  	var pipeline []interface{}
   166  	opts := options.Aggregate()
   167  
   168  	elems, _ := args.Elements()
   169  	for _, elem := range elems {
   170  		key := elem.Key()
   171  		val := elem.Value()
   172  
   173  		switch key {
   174  		case "pipeline":
   175  			pipeline = bsonutil.RawToInterfaces(bsonutil.RawToDocuments(val.Array())...)
   176  		case "batchSize":
   177  			opts.SetBatchSize(val.Int32())
   178  		case "collation":
   179  			opts.SetCollation(createCollation(mt, val.Document()))
   180  		case "maxTimeMS":
   181  			opts.SetMaxTime(time.Duration(val.Int32()) * time.Millisecond)
   182  		case "allowDiskUse":
   183  			opts.SetAllowDiskUse(val.Boolean())
   184  		case "session":
   185  		default:
   186  			mt.Fatalf("unrecognized aggregate option: %v", key)
   187  		}
   188  	}
   189  
   190  	if sess != nil {
   191  		var cur *mongo.Cursor
   192  		err := mongo.WithSession(context.Background(), sess, func(sc mongo.SessionContext) error {
   193  			var aerr error
   194  			cur, aerr = agg.Aggregate(sc, pipeline, opts)
   195  			return aerr
   196  		})
   197  		return cur, err
   198  	}
   199  	return agg.Aggregate(context.Background(), pipeline, opts)
   200  }
   201  
   202  func executeWatch(mt *mtest.T, w watcher, sess mongo.Session, args bson.Raw) (*mongo.ChangeStream, error) {
   203  	mt.Helper()
   204  
   205  	pipeline := []interface{}{}
   206  	elems, _ := args.Elements()
   207  	for _, elem := range elems {
   208  		key := elem.Key()
   209  		val := elem.Value()
   210  
   211  		switch key {
   212  		case "pipeline":
   213  			pipeline = bsonutil.RawToInterfaces(bsonutil.RawToDocuments(val.Array())...)
   214  		default:
   215  			mt.Fatalf("unrecognized watch option: %v", key)
   216  		}
   217  	}
   218  
   219  	if sess != nil {
   220  		var stream *mongo.ChangeStream
   221  		err := mongo.WithSession(context.Background(), sess, func(sc mongo.SessionContext) error {
   222  			var csErr error
   223  			stream, csErr = w.Watch(sc, pipeline)
   224  			return csErr
   225  		})
   226  		return stream, err
   227  	}
   228  	return w.Watch(context.Background(), pipeline)
   229  }
   230  
   231  func executeCountDocuments(mt *mtest.T, sess mongo.Session, args bson.Raw) (int64, error) {
   232  	mt.Helper()
   233  
   234  	filter := emptyDoc
   235  	opts := options.Count()
   236  
   237  	elems, _ := args.Elements()
   238  	for _, elem := range elems {
   239  		name := elem.Key()
   240  		opt := elem.Value()
   241  
   242  		switch name {
   243  		case "filter":
   244  			filter = opt.Document()
   245  		case "skip":
   246  			opts = opts.SetSkip(int64(opt.Int32()))
   247  		case "limit":
   248  			opts = opts.SetLimit(int64(opt.Int32()))
   249  		case "collation":
   250  			opts = opts.SetCollation(createCollation(mt, opt.Document()))
   251  		case "session":
   252  		default:
   253  			mt.Fatalf("unrecognized count option: %v", name)
   254  		}
   255  	}
   256  
   257  	if sess != nil {
   258  		var count int64
   259  		err := mongo.WithSession(context.Background(), sess, func(sc mongo.SessionContext) error {
   260  			var countErr error
   261  			count, countErr = mt.Coll.CountDocuments(sc, filter, opts)
   262  			return countErr
   263  		})
   264  		return count, err
   265  	}
   266  	return mt.Coll.CountDocuments(context.Background(), filter, opts)
   267  }
   268  
   269  func executeInsertOne(mt *mtest.T, sess mongo.Session, args bson.Raw) (*mongo.InsertOneResult, error) {
   270  	mt.Helper()
   271  
   272  	doc := emptyDoc
   273  	opts := options.InsertOne()
   274  
   275  	elems, _ := args.Elements()
   276  	for _, elem := range elems {
   277  		key := elem.Key()
   278  		val := elem.Value()
   279  
   280  		switch key {
   281  		case "document":
   282  			doc = val.Document()
   283  		case "bypassDocumentValidation":
   284  			opts.SetBypassDocumentValidation(val.Boolean())
   285  		case "session":
   286  		default:
   287  			mt.Fatalf("unrecognized insertOne option: %v", key)
   288  		}
   289  	}
   290  
   291  	if sess != nil {
   292  		var res *mongo.InsertOneResult
   293  		err := mongo.WithSession(context.Background(), sess, func(sc mongo.SessionContext) error {
   294  			var insertErr error
   295  			res, insertErr = mt.Coll.InsertOne(sc, doc, opts)
   296  			return insertErr
   297  		})
   298  		return res, err
   299  	}
   300  	return mt.Coll.InsertOne(context.Background(), doc, opts)
   301  }
   302  
   303  func executeInsertMany(mt *mtest.T, sess mongo.Session, args bson.Raw) (*mongo.InsertManyResult, error) {
   304  	mt.Helper()
   305  
   306  	var docs []interface{}
   307  	opts := options.InsertMany()
   308  
   309  	elems, _ := args.Elements()
   310  	for _, elem := range elems {
   311  		key := elem.Key()
   312  		val := elem.Value()
   313  
   314  		switch key {
   315  		case "documents":
   316  			docs = bsonutil.RawToInterfaces(bsonutil.RawToDocuments(val.Array())...)
   317  		case "options":
   318  			// Some of the older tests use this to set the "ordered" option
   319  			optsDoc := val.Document()
   320  			optsElems, _ := optsDoc.Elements()
   321  			assert.Equal(mt, 1, len(optsElems), "expected 1 options element, got %v", len(optsElems))
   322  			opts.SetOrdered(optsDoc.Lookup("ordered").Boolean())
   323  		case "session":
   324  		default:
   325  			mt.Fatalf("unrecognized insertMany option: %v", key)
   326  		}
   327  	}
   328  
   329  	if sess != nil {
   330  		var res *mongo.InsertManyResult
   331  		err := mongo.WithSession(context.Background(), sess, func(sc mongo.SessionContext) error {
   332  			var insertErr error
   333  			res, insertErr = mt.Coll.InsertMany(sc, docs, opts)
   334  			return insertErr
   335  		})
   336  		return res, err
   337  	}
   338  	return mt.Coll.InsertMany(context.Background(), docs, opts)
   339  }
   340  
   341  func setFindModifiers(modifiersDoc bson.Raw, opts *options.FindOptions) {
   342  	elems, _ := modifiersDoc.Elements()
   343  	for _, elem := range elems {
   344  		key := elem.Key()
   345  		val := elem.Value()
   346  
   347  		switch key {
   348  		case "$comment":
   349  			opts.SetComment(val.StringValue())
   350  		case "$hint":
   351  			opts.SetHint(val.Document())
   352  		case "$max":
   353  			opts.SetMax(val.Document())
   354  		case "$maxTimeMS":
   355  			opts.SetMaxTime(time.Duration(val.Int32()) * time.Millisecond)
   356  		case "$min":
   357  			opts.SetMin(val.Document())
   358  		case "$returnKey":
   359  			opts.SetReturnKey(val.Boolean())
   360  		case "$showDiskLoc":
   361  			opts.SetShowRecordID(val.Boolean())
   362  		}
   363  	}
   364  }
   365  
   366  func executeFind(mt *mtest.T, sess mongo.Session, args bson.Raw) (*mongo.Cursor, error) {
   367  	mt.Helper()
   368  
   369  	filter := emptyDoc
   370  	opts := options.Find()
   371  
   372  	elems, _ := args.Elements()
   373  	for _, elem := range elems {
   374  		key := elem.Key()
   375  		val := elem.Value()
   376  
   377  		switch key {
   378  		case "filter":
   379  			filter = val.Document()
   380  		case "sort":
   381  			opts = opts.SetSort(val.Document())
   382  		case "skip":
   383  			opts = opts.SetSkip(numberFromValue(mt, val))
   384  		case "limit":
   385  			opts = opts.SetLimit(numberFromValue(mt, val))
   386  		case "batchSize":
   387  			opts = opts.SetBatchSize(int32(numberFromValue(mt, val)))
   388  		case "collation":
   389  			opts = opts.SetCollation(createCollation(mt, val.Document()))
   390  		case "modifiers":
   391  			setFindModifiers(val.Document(), opts)
   392  		case "allowDiskUse":
   393  			opts = opts.SetAllowDiskUse(val.Boolean())
   394  		case "projection":
   395  			opts = opts.SetProjection(val.Document())
   396  		case "session":
   397  		default:
   398  			mt.Fatalf("unrecognized find option: %v", key)
   399  		}
   400  	}
   401  
   402  	if sess != nil {
   403  		var c *mongo.Cursor
   404  		err := mongo.WithSession(context.Background(), sess, func(sc mongo.SessionContext) error {
   405  			var findErr error
   406  			c, findErr = mt.Coll.Find(sc, filter, opts)
   407  			return findErr
   408  		})
   409  		return c, err
   410  	}
   411  	return mt.Coll.Find(context.Background(), filter, opts)
   412  }
   413  
   414  func executeRunCommand(mt *mtest.T, sess mongo.Session, args bson.Raw) *mongo.SingleResult {
   415  	mt.Helper()
   416  
   417  	cmd := emptyDoc
   418  	opts := options.RunCmd()
   419  
   420  	elems, _ := args.Elements()
   421  	for _, elem := range elems {
   422  		key := elem.Key()
   423  		val := elem.Value()
   424  
   425  		switch key {
   426  		case "command":
   427  			cmd = val.Document()
   428  		case "readPreference":
   429  			opts.SetReadPreference(createReadPref(val))
   430  		case "session":
   431  		default:
   432  			mt.Fatalf("unrecognized runCommand option: %v", key)
   433  		}
   434  	}
   435  
   436  	if sess != nil {
   437  		var sr *mongo.SingleResult
   438  		_ = mongo.WithSession(context.Background(), sess, func(sc mongo.SessionContext) error {
   439  			sr = mt.DB.RunCommand(sc, cmd, opts)
   440  			return nil
   441  		})
   442  		return sr
   443  	}
   444  	return mt.DB.RunCommand(context.Background(), cmd, opts)
   445  }
   446  
   447  func executeListCollections(mt *mtest.T, sess mongo.Session, args bson.Raw) (*mongo.Cursor, error) {
   448  	mt.Helper()
   449  
   450  	filter := emptyDoc
   451  	elems, _ := args.Elements()
   452  	for _, elem := range elems {
   453  		key := elem.Key()
   454  		val := elem.Value()
   455  
   456  		switch key {
   457  		case "filter":
   458  			filter = val.Document()
   459  		default:
   460  			mt.Fatalf("unrecognized listCollectionNames option: %v", key)
   461  		}
   462  	}
   463  
   464  	if sess != nil {
   465  		var c *mongo.Cursor
   466  		err := mongo.WithSession(context.Background(), sess, func(sc mongo.SessionContext) error {
   467  			var lcErr error
   468  			c, lcErr = mt.DB.ListCollections(sc, filter)
   469  			return lcErr
   470  		})
   471  		return c, err
   472  	}
   473  	return mt.DB.ListCollections(context.Background(), filter)
   474  }
   475  
   476  func executeListCollectionNames(mt *mtest.T, sess mongo.Session, args bson.Raw) ([]string, error) {
   477  	mt.Helper()
   478  
   479  	filter := emptyDoc
   480  	elems, _ := args.Elements()
   481  	for _, elem := range elems {
   482  		key := elem.Key()
   483  		val := elem.Value()
   484  
   485  		switch key {
   486  		case "filter":
   487  			filter = val.Document()
   488  		default:
   489  			mt.Fatalf("unrecognized listCollectionNames option: %v", key)
   490  		}
   491  	}
   492  
   493  	if sess != nil {
   494  		var res []string
   495  		err := mongo.WithSession(context.Background(), sess, func(sc mongo.SessionContext) error {
   496  			var lcErr error
   497  			res, lcErr = mt.DB.ListCollectionNames(sc, filter)
   498  			return lcErr
   499  		})
   500  		return res, err
   501  	}
   502  	return mt.DB.ListCollectionNames(context.Background(), filter)
   503  }
   504  
   505  func executeListDatabaseNames(mt *mtest.T, sess mongo.Session, args bson.Raw) ([]string, error) {
   506  	mt.Helper()
   507  
   508  	filter := emptyDoc
   509  	elems, _ := args.Elements()
   510  	for _, elem := range elems {
   511  		key := elem.Key()
   512  		val := elem.Value()
   513  
   514  		switch key {
   515  		case "filter":
   516  			filter = val.Document()
   517  		default:
   518  			mt.Fatalf("unrecognized listCollectionNames option: %v", key)
   519  		}
   520  	}
   521  
   522  	if sess != nil {
   523  		var res []string
   524  		err := mongo.WithSession(context.Background(), sess, func(sc mongo.SessionContext) error {
   525  			var ldErr error
   526  			res, ldErr = mt.Client.ListDatabaseNames(sc, filter)
   527  			return ldErr
   528  		})
   529  		return res, err
   530  	}
   531  	return mt.Client.ListDatabaseNames(context.Background(), filter)
   532  }
   533  
   534  func executeListDatabases(mt *mtest.T, sess mongo.Session, args bson.Raw) (mongo.ListDatabasesResult, error) {
   535  	mt.Helper()
   536  
   537  	filter := emptyDoc
   538  	elems, _ := args.Elements()
   539  	for _, elem := range elems {
   540  		key := elem.Key()
   541  		val := elem.Value()
   542  
   543  		switch key {
   544  		case "filter":
   545  			filter = val.Document()
   546  		default:
   547  			mt.Fatalf("unrecognized listCollectionNames option: %v", key)
   548  		}
   549  	}
   550  
   551  	if sess != nil {
   552  		var res mongo.ListDatabasesResult
   553  		err := mongo.WithSession(context.Background(), sess, func(sc mongo.SessionContext) error {
   554  			var ldErr error
   555  			res, ldErr = mt.Client.ListDatabases(sc, filter)
   556  			return ldErr
   557  		})
   558  		return res, err
   559  	}
   560  	return mt.Client.ListDatabases(context.Background(), filter)
   561  }
   562  
   563  func executeFindOne(mt *mtest.T, sess mongo.Session, args bson.Raw) *mongo.SingleResult {
   564  	mt.Helper()
   565  
   566  	filter := emptyDoc
   567  	elems, _ := args.Elements()
   568  	for _, elem := range elems {
   569  		key := elem.Key()
   570  		val := elem.Value()
   571  
   572  		switch key {
   573  		case "filter":
   574  			filter = val.Document()
   575  		default:
   576  			mt.Fatalf("unrecognized findOne option: %v", key)
   577  		}
   578  	}
   579  
   580  	if sess != nil {
   581  		var res *mongo.SingleResult
   582  		_ = mongo.WithSession(context.Background(), sess, func(sc mongo.SessionContext) error {
   583  			res = mt.Coll.FindOne(sc, filter)
   584  			return nil
   585  		})
   586  		return res
   587  	}
   588  	return mt.Coll.FindOne(context.Background(), filter)
   589  }
   590  
   591  func executeListIndexes(mt *mtest.T, sess mongo.Session, args bson.Raw) (*mongo.Cursor, error) {
   592  	mt.Helper()
   593  
   594  	// no arguments expected. add a Fatal in case arguments are added in the future
   595  	assert.Equal(mt, 0, len(args), "unexpected listIndexes arguments: %v", args)
   596  	if sess != nil {
   597  		var cursor *mongo.Cursor
   598  		err := mongo.WithSession(context.Background(), sess, func(sc mongo.SessionContext) error {
   599  			var listErr error
   600  			cursor, listErr = mt.Coll.Indexes().List(sc)
   601  			return listErr
   602  		})
   603  		return cursor, err
   604  	}
   605  	return mt.Coll.Indexes().List(context.Background())
   606  }
   607  
   608  func executeDistinct(mt *mtest.T, sess mongo.Session, args bson.Raw) ([]interface{}, error) {
   609  	mt.Helper()
   610  
   611  	var fieldName string
   612  	filter := emptyDoc
   613  	opts := options.Distinct()
   614  
   615  	elems, _ := args.Elements()
   616  	for _, elem := range elems {
   617  		key := elem.Key()
   618  		val := elem.Value()
   619  
   620  		switch key {
   621  		case "filter":
   622  			filter = val.Document()
   623  		case "fieldName":
   624  			fieldName = val.StringValue()
   625  		case "collation":
   626  			opts = opts.SetCollation(createCollation(mt, val.Document()))
   627  		case "session":
   628  		default:
   629  			mt.Fatalf("unrecognized distinct option: %v", key)
   630  		}
   631  	}
   632  
   633  	if sess != nil {
   634  		var res []interface{}
   635  		err := mongo.WithSession(context.Background(), sess, func(sc mongo.SessionContext) error {
   636  			var derr error
   637  			res, derr = mt.Coll.Distinct(sc, fieldName, filter, opts)
   638  			return derr
   639  		})
   640  		return res, err
   641  	}
   642  	return mt.Coll.Distinct(context.Background(), fieldName, filter, opts)
   643  }
   644  
   645  func executeFindOneAndDelete(mt *mtest.T, sess mongo.Session, args bson.Raw) *mongo.SingleResult {
   646  	mt.Helper()
   647  
   648  	filter := emptyDoc
   649  	opts := options.FindOneAndDelete()
   650  
   651  	elems, _ := args.Elements()
   652  	for _, elem := range elems {
   653  		key := elem.Key()
   654  		val := elem.Value()
   655  
   656  		switch key {
   657  		case "filter":
   658  			filter = val.Document()
   659  		case "sort":
   660  			opts = opts.SetSort(val.Document())
   661  		case "projection":
   662  			opts = opts.SetProjection(val.Document())
   663  		case "collation":
   664  			opts = opts.SetCollation(createCollation(mt, val.Document()))
   665  		case "hint":
   666  			opts = opts.SetHint(createHint(mt, val))
   667  		case "session":
   668  		default:
   669  			mt.Fatalf("unrecognized findOneAndDelete option: %v", key)
   670  		}
   671  	}
   672  
   673  	if sess != nil {
   674  		var res *mongo.SingleResult
   675  		_ = mongo.WithSession(context.Background(), sess, func(sc mongo.SessionContext) error {
   676  			res = mt.Coll.FindOneAndDelete(sc, filter, opts)
   677  			return nil
   678  		})
   679  		return res
   680  	}
   681  	return mt.Coll.FindOneAndDelete(context.Background(), filter, opts)
   682  }
   683  
   684  func executeFindOneAndUpdate(mt *mtest.T, sess mongo.Session, args bson.Raw) *mongo.SingleResult {
   685  	mt.Helper()
   686  
   687  	filter := emptyDoc
   688  	var update interface{} = emptyDoc
   689  	opts := options.FindOneAndUpdate()
   690  
   691  	elems, _ := args.Elements()
   692  	for _, elem := range elems {
   693  		key := elem.Key()
   694  		val := elem.Value()
   695  
   696  		switch key {
   697  		case "filter":
   698  			filter = val.Document()
   699  		case "update":
   700  			update = createUpdate(mt, val)
   701  		case "arrayFilters":
   702  			opts = opts.SetArrayFilters(options.ArrayFilters{
   703  				Filters: bsonutil.RawToInterfaces(bsonutil.RawToDocuments(val.Array())...),
   704  			})
   705  		case "sort":
   706  			opts = opts.SetSort(val.Document())
   707  		case "projection":
   708  			opts = opts.SetProjection(val.Document())
   709  		case "upsert":
   710  			opts = opts.SetUpsert(val.Boolean())
   711  		case "returnDocument":
   712  			switch vstr := val.StringValue(); vstr {
   713  			case "After":
   714  				opts = opts.SetReturnDocument(options.After)
   715  			case "Before":
   716  				opts = opts.SetReturnDocument(options.Before)
   717  			default:
   718  				mt.Fatalf("unrecognized returnDocument value: %v", vstr)
   719  			}
   720  		case "collation":
   721  			opts = opts.SetCollation(createCollation(mt, val.Document()))
   722  		case "hint":
   723  			opts = opts.SetHint(createHint(mt, val))
   724  		case "session":
   725  		default:
   726  			mt.Fatalf("unrecognized findOneAndUpdate option: %v", key)
   727  		}
   728  	}
   729  
   730  	if sess != nil {
   731  		var res *mongo.SingleResult
   732  		_ = mongo.WithSession(context.Background(), sess, func(sc mongo.SessionContext) error {
   733  			res = mt.Coll.FindOneAndUpdate(sc, filter, update, opts)
   734  			return nil
   735  		})
   736  		return res
   737  	}
   738  	return mt.Coll.FindOneAndUpdate(context.Background(), filter, update, opts)
   739  }
   740  
   741  func executeFindOneAndReplace(mt *mtest.T, sess mongo.Session, args bson.Raw) *mongo.SingleResult {
   742  	mt.Helper()
   743  
   744  	filter := emptyDoc
   745  	replacement := emptyDoc
   746  	opts := options.FindOneAndReplace()
   747  
   748  	elems, _ := args.Elements()
   749  	for _, elem := range elems {
   750  		key := elem.Key()
   751  		val := elem.Value()
   752  
   753  		switch key {
   754  		case "filter":
   755  			filter = val.Document()
   756  		case "replacement":
   757  			replacement = val.Document()
   758  		case "sort":
   759  			opts = opts.SetSort(val.Document())
   760  		case "projection":
   761  			opts = opts.SetProjection(val.Document())
   762  		case "upsert":
   763  			opts = opts.SetUpsert(val.Boolean())
   764  		case "returnDocument":
   765  			switch vstr := val.StringValue(); vstr {
   766  			case "After":
   767  				opts = opts.SetReturnDocument(options.After)
   768  			case "Before":
   769  				opts = opts.SetReturnDocument(options.Before)
   770  			default:
   771  				mt.Fatalf("unrecognized returnDocument value: %v", vstr)
   772  			}
   773  		case "collation":
   774  			opts = opts.SetCollation(createCollation(mt, val.Document()))
   775  		case "hint":
   776  			opts = opts.SetHint(createHint(mt, val))
   777  		case "session":
   778  		default:
   779  			mt.Fatalf("unrecognized findOneAndReplace option: %v", key)
   780  		}
   781  	}
   782  
   783  	if sess != nil {
   784  		var res *mongo.SingleResult
   785  		_ = mongo.WithSession(context.Background(), sess, func(sc mongo.SessionContext) error {
   786  			res = mt.Coll.FindOneAndReplace(sc, filter, replacement, opts)
   787  			return nil
   788  		})
   789  		return res
   790  	}
   791  	return mt.Coll.FindOneAndReplace(context.Background(), filter, replacement, opts)
   792  }
   793  
   794  func executeDeleteOne(mt *mtest.T, sess mongo.Session, args bson.Raw) (*mongo.DeleteResult, error) {
   795  	mt.Helper()
   796  
   797  	filter := emptyDoc
   798  	opts := options.Delete()
   799  
   800  	elems, _ := args.Elements()
   801  	for _, elem := range elems {
   802  		key := elem.Key()
   803  		val := elem.Value()
   804  
   805  		switch key {
   806  		case "filter":
   807  			filter = val.Document()
   808  		case "collation":
   809  			opts = opts.SetCollation(createCollation(mt, val.Document()))
   810  		case "hint":
   811  			opts = opts.SetHint(createHint(mt, val))
   812  		case "session":
   813  		default:
   814  			mt.Fatalf("unrecognized deleteOne option: %v", key)
   815  		}
   816  	}
   817  
   818  	if sess != nil {
   819  		var res *mongo.DeleteResult
   820  		err := mongo.WithSession(context.Background(), sess, func(sc mongo.SessionContext) error {
   821  			var derr error
   822  			res, derr = mt.Coll.DeleteOne(sc, filter, opts)
   823  			return derr
   824  		})
   825  		return res, err
   826  	}
   827  	return mt.Coll.DeleteOne(context.Background(), filter, opts)
   828  }
   829  
   830  func executeDeleteMany(mt *mtest.T, sess mongo.Session, args bson.Raw) (*mongo.DeleteResult, error) {
   831  	mt.Helper()
   832  
   833  	filter := emptyDoc
   834  	opts := options.Delete()
   835  
   836  	elems, _ := args.Elements()
   837  	for _, elem := range elems {
   838  		key := elem.Key()
   839  		val := elem.Value()
   840  
   841  		switch key {
   842  		case "filter":
   843  			filter = val.Document()
   844  		case "collation":
   845  			opts = opts.SetCollation(createCollation(mt, val.Document()))
   846  		case "hint":
   847  			opts = opts.SetHint(createHint(mt, val))
   848  		case "session":
   849  		default:
   850  			mt.Fatalf("unrecognized deleteMany option: %v", key)
   851  		}
   852  	}
   853  
   854  	if sess != nil {
   855  		var res *mongo.DeleteResult
   856  		err := mongo.WithSession(context.Background(), sess, func(sc mongo.SessionContext) error {
   857  			var derr error
   858  			res, derr = mt.Coll.DeleteMany(sc, filter, opts)
   859  			return derr
   860  		})
   861  		return res, err
   862  	}
   863  	return mt.Coll.DeleteMany(context.Background(), filter, opts)
   864  }
   865  
   866  func executeUpdateOne(mt *mtest.T, sess mongo.Session, args bson.Raw) (*mongo.UpdateResult, error) {
   867  	mt.Helper()
   868  
   869  	filter := emptyDoc
   870  	var update interface{} = emptyDoc
   871  	opts := options.Update()
   872  
   873  	elems, _ := args.Elements()
   874  	for _, elem := range elems {
   875  		key := elem.Key()
   876  		val := elem.Value()
   877  
   878  		switch key {
   879  		case "filter":
   880  			filter = val.Document()
   881  		case "update":
   882  			update = createUpdate(mt, val)
   883  		case "arrayFilters":
   884  			opts = opts.SetArrayFilters(options.ArrayFilters{
   885  				Filters: bsonutil.RawToInterfaces(bsonutil.RawToDocuments(val.Array())...),
   886  			})
   887  		case "upsert":
   888  			opts = opts.SetUpsert(val.Boolean())
   889  		case "collation":
   890  			opts = opts.SetCollation(createCollation(mt, val.Document()))
   891  		case "hint":
   892  			opts = opts.SetHint(createHint(mt, val))
   893  		case "session":
   894  		default:
   895  			mt.Fatalf("unrecognized updateOne option: %v", key)
   896  		}
   897  	}
   898  	if opts.Upsert == nil {
   899  		opts = opts.SetUpsert(false)
   900  	}
   901  
   902  	if sess != nil {
   903  		var res *mongo.UpdateResult
   904  		err := mongo.WithSession(context.Background(), sess, func(sc mongo.SessionContext) error {
   905  			var uerr error
   906  			res, uerr = mt.Coll.UpdateOne(sc, filter, update, opts)
   907  			return uerr
   908  		})
   909  		return res, err
   910  	}
   911  	return mt.Coll.UpdateOne(context.Background(), filter, update, opts)
   912  }
   913  
   914  func executeUpdateMany(mt *mtest.T, sess mongo.Session, args bson.Raw) (*mongo.UpdateResult, error) {
   915  	mt.Helper()
   916  
   917  	filter := emptyDoc
   918  	var update interface{} = emptyDoc
   919  	opts := options.Update()
   920  
   921  	elems, _ := args.Elements()
   922  	for _, elem := range elems {
   923  		key := elem.Key()
   924  		val := elem.Value()
   925  
   926  		switch key {
   927  		case "filter":
   928  			filter = val.Document()
   929  		case "update":
   930  			update = createUpdate(mt, val)
   931  		case "arrayFilters":
   932  			opts = opts.SetArrayFilters(options.ArrayFilters{
   933  				Filters: bsonutil.RawToInterfaces(bsonutil.RawToDocuments(val.Array())...),
   934  			})
   935  		case "upsert":
   936  			opts = opts.SetUpsert(val.Boolean())
   937  		case "collation":
   938  			opts = opts.SetCollation(createCollation(mt, val.Document()))
   939  		case "hint":
   940  			opts = opts.SetHint(createHint(mt, val))
   941  		case "session":
   942  		default:
   943  			mt.Fatalf("unrecognized updateMany option: %v", key)
   944  		}
   945  	}
   946  	if opts.Upsert == nil {
   947  		opts = opts.SetUpsert(false)
   948  	}
   949  
   950  	if sess != nil {
   951  		var res *mongo.UpdateResult
   952  		err := mongo.WithSession(context.Background(), sess, func(sc mongo.SessionContext) error {
   953  			var uerr error
   954  			res, uerr = mt.Coll.UpdateMany(sc, filter, update, opts)
   955  			return uerr
   956  		})
   957  		return res, err
   958  	}
   959  	return mt.Coll.UpdateMany(context.Background(), filter, update, opts)
   960  }
   961  
   962  func executeReplaceOne(mt *mtest.T, sess mongo.Session, args bson.Raw) (*mongo.UpdateResult, error) {
   963  	mt.Helper()
   964  
   965  	filter := emptyDoc
   966  	replacement := emptyDoc
   967  	opts := options.Replace()
   968  
   969  	elems, _ := args.Elements()
   970  	for _, elem := range elems {
   971  		key := elem.Key()
   972  		val := elem.Value()
   973  
   974  		switch key {
   975  		case "filter":
   976  			filter = val.Document()
   977  		case "replacement":
   978  			replacement = val.Document()
   979  		case "upsert":
   980  			opts = opts.SetUpsert(val.Boolean())
   981  		case "collation":
   982  			opts = opts.SetCollation(createCollation(mt, val.Document()))
   983  		case "hint":
   984  			opts = opts.SetHint(createHint(mt, val))
   985  		case "session":
   986  		default:
   987  			mt.Fatalf("unrecognized replaceOne option: %v", key)
   988  		}
   989  	}
   990  	if opts.Upsert == nil {
   991  		opts = opts.SetUpsert(false)
   992  	}
   993  
   994  	if sess != nil {
   995  		var res *mongo.UpdateResult
   996  		err := mongo.WithSession(context.Background(), sess, func(sc mongo.SessionContext) error {
   997  			var uerr error
   998  			res, uerr = mt.Coll.ReplaceOne(sc, filter, replacement, opts)
   999  			return uerr
  1000  		})
  1001  		return res, err
  1002  	}
  1003  	return mt.Coll.ReplaceOne(context.Background(), filter, replacement, opts)
  1004  }
  1005  
  1006  type withTransactionArgs struct {
  1007  	Callback *struct {
  1008  		Operations []*operation `bson:"operations"`
  1009  	} `bson:"callback"`
  1010  	Options bson.Raw `bson:"options"`
  1011  }
  1012  
  1013  func runWithTransactionOperations(mt *mtest.T, operations []*operation, sess mongo.Session) error {
  1014  	mt.Helper()
  1015  
  1016  	for _, op := range operations {
  1017  		if op.Name == "count" {
  1018  			mt.Skip("count has been deprecated")
  1019  		}
  1020  
  1021  		// create collection with default read preference Primary (needed to prevent server selection fail)
  1022  		mt.CloneCollection(options.Collection().SetReadPreference(readpref.Primary()).SetReadConcern(readconcern.Local()))
  1023  
  1024  		// execute the command on given object
  1025  		var err error
  1026  		switch op.Object {
  1027  		case "session0":
  1028  			err = executeSessionOperation(mt, op, sess)
  1029  		case "collection":
  1030  			err = executeCollectionOperation(mt, op, sess)
  1031  		default:
  1032  			mt.Fatalf("unrecognized withTransaction operation object: %v", op.Object)
  1033  		}
  1034  		if err != nil {
  1035  			return err
  1036  		}
  1037  	}
  1038  	return nil
  1039  }
  1040  
  1041  func executeWithTransaction(mt *mtest.T, sess mongo.Session, args bson.Raw) error {
  1042  	mt.Helper()
  1043  
  1044  	var testArgs withTransactionArgs
  1045  	err := bson.UnmarshalWithRegistry(specTestRegistry, args, &testArgs)
  1046  	assert.Nil(mt, err, "error creating withTransactionArgs: %v", err)
  1047  	opts := createTransactionOptions(mt, testArgs.Options)
  1048  
  1049  	_, err = sess.WithTransaction(context.Background(), func(sc mongo.SessionContext) (interface{}, error) {
  1050  		err := runWithTransactionOperations(mt, testArgs.Callback.Operations, sess)
  1051  		return nil, err
  1052  	}, opts)
  1053  	return err
  1054  }
  1055  
  1056  func executeBulkWrite(mt *mtest.T, sess mongo.Session, args bson.Raw) (*mongo.BulkWriteResult, error) {
  1057  	mt.Helper()
  1058  
  1059  	models := createBulkWriteModels(mt, args.Lookup("requests").Array())
  1060  	opts := options.BulkWrite()
  1061  
  1062  	rawOpts, err := args.LookupErr("options")
  1063  	if err == nil {
  1064  		elems, _ := rawOpts.Document().Elements()
  1065  		for _, elem := range elems {
  1066  			name := elem.Key()
  1067  			opt := elem.Value()
  1068  
  1069  			switch name {
  1070  			case "ordered":
  1071  				opts.SetOrdered(opt.Boolean())
  1072  			default:
  1073  				mt.Fatalf("unrecognized bulk write option: %v", name)
  1074  			}
  1075  		}
  1076  	}
  1077  
  1078  	if sess != nil {
  1079  		var res *mongo.BulkWriteResult
  1080  		err := mongo.WithSession(context.Background(), sess, func(sc mongo.SessionContext) error {
  1081  			var bwerr error
  1082  			res, bwerr = mt.Coll.BulkWrite(sc, models, opts)
  1083  			return bwerr
  1084  		})
  1085  		return res, err
  1086  	}
  1087  	return mt.Coll.BulkWrite(context.Background(), models, opts)
  1088  }
  1089  
  1090  func createBulkWriteModels(mt *mtest.T, rawModels bson.Raw) []mongo.WriteModel {
  1091  	vals, _ := rawModels.Values()
  1092  	models := make([]mongo.WriteModel, len(vals))
  1093  
  1094  	for i, val := range vals {
  1095  		models[i] = createBulkWriteModel(mt, val.Document())
  1096  	}
  1097  	return models
  1098  }
  1099  
  1100  func createBulkWriteModel(mt *mtest.T, rawModel bson.Raw) mongo.WriteModel {
  1101  	name := rawModel.Lookup("name").StringValue()
  1102  	args := rawModel.Lookup("arguments").Document()
  1103  
  1104  	switch name {
  1105  	case "insertOne":
  1106  		return mongo.NewInsertOneModel().SetDocument(args.Lookup("document").Document())
  1107  	case "updateOne":
  1108  		uom := mongo.NewUpdateOneModel()
  1109  		uom.SetFilter(args.Lookup("filter").Document())
  1110  		uom.SetUpdate(createUpdate(mt, args.Lookup("update")))
  1111  		if upsert, err := args.LookupErr("upsert"); err == nil {
  1112  			uom.SetUpsert(upsert.Boolean())
  1113  		}
  1114  		if collation, err := args.LookupErr("collation"); err == nil {
  1115  			uom.SetCollation(createCollation(mt, collation.Document()))
  1116  		}
  1117  		if arrayFilters, err := args.LookupErr("arrayFilters"); err == nil {
  1118  			uom.SetArrayFilters(options.ArrayFilters{
  1119  				Filters: bsonutil.RawToInterfaces(bsonutil.RawToDocuments(arrayFilters.Array())...),
  1120  			})
  1121  		}
  1122  		if hintVal, err := args.LookupErr("hint"); err == nil {
  1123  			uom.SetHint(createHint(mt, hintVal))
  1124  		}
  1125  		if uom.Upsert == nil {
  1126  			uom.SetUpsert(false)
  1127  		}
  1128  
  1129  		return uom
  1130  	case "updateMany":
  1131  		umm := mongo.NewUpdateManyModel()
  1132  		umm.SetFilter(args.Lookup("filter").Document())
  1133  		umm.SetUpdate(createUpdate(mt, args.Lookup("update")))
  1134  		if upsert, err := args.LookupErr("upsert"); err == nil {
  1135  			umm.SetUpsert(upsert.Boolean())
  1136  		}
  1137  		if collation, err := args.LookupErr("collation"); err == nil {
  1138  			umm.SetCollation(createCollation(mt, collation.Document()))
  1139  		}
  1140  		if arrayFilters, err := args.LookupErr("arrayFilters"); err == nil {
  1141  			umm.SetArrayFilters(options.ArrayFilters{
  1142  				Filters: bsonutil.RawToInterfaces(bsonutil.RawToDocuments(arrayFilters.Array())...),
  1143  			})
  1144  		}
  1145  		if hintVal, err := args.LookupErr("hint"); err == nil {
  1146  			umm.SetHint(createHint(mt, hintVal))
  1147  		}
  1148  		if umm.Upsert == nil {
  1149  			umm.SetUpsert(false)
  1150  		}
  1151  
  1152  		return umm
  1153  	case "deleteOne":
  1154  		dom := mongo.NewDeleteOneModel()
  1155  		dom.SetFilter(args.Lookup("filter").Document())
  1156  		if collation, err := args.LookupErr("collation"); err == nil {
  1157  			dom.SetCollation(createCollation(mt, collation.Document()))
  1158  		}
  1159  		if hint, err := args.LookupErr("hint"); err == nil {
  1160  			dom.SetHint(createHint(mt, hint))
  1161  		}
  1162  
  1163  		return dom
  1164  	case "deleteMany":
  1165  		dmm := mongo.NewDeleteManyModel()
  1166  		dmm.SetFilter(args.Lookup("filter").Document())
  1167  		if collation, err := args.LookupErr("collation"); err == nil {
  1168  			dmm.SetCollation(createCollation(mt, collation.Document()))
  1169  		}
  1170  		if hint, err := args.LookupErr("hint"); err == nil {
  1171  			dmm.SetHint(createHint(mt, hint))
  1172  		}
  1173  
  1174  		return dmm
  1175  	case "replaceOne":
  1176  		rom := mongo.NewReplaceOneModel()
  1177  		rom.SetFilter(args.Lookup("filter").Document())
  1178  		rom.SetReplacement(args.Lookup("replacement").Document())
  1179  		if upsert, err := args.LookupErr("upsert"); err == nil {
  1180  			rom.SetUpsert(upsert.Boolean())
  1181  		}
  1182  		if collation, err := args.LookupErr("collation"); err == nil {
  1183  			rom.SetCollation(createCollation(mt, collation.Document()))
  1184  		}
  1185  		if hintVal, err := args.LookupErr("hint"); err == nil {
  1186  			rom.SetHint(createHint(mt, hintVal))
  1187  		}
  1188  		if rom.Upsert == nil {
  1189  			rom.SetUpsert(false)
  1190  		}
  1191  
  1192  		return rom
  1193  	default:
  1194  		mt.Fatalf("unrecognized model type: %v", name)
  1195  	}
  1196  
  1197  	return nil
  1198  }
  1199  
  1200  func executeEstimatedDocumentCount(mt *mtest.T, sess mongo.Session, args bson.Raw) (int64, error) {
  1201  	mt.Helper()
  1202  
  1203  	// no arguments expected. add a Fatal in case arguments are added in the future
  1204  	elems, _ := args.Elements()
  1205  	assert.Equal(mt, 0, len(elems), "unexpected estimatedDocumentCount arguments %v", args)
  1206  
  1207  	if sess != nil {
  1208  		var res int64
  1209  		err := mongo.WithSession(context.Background(), sess, func(sc mongo.SessionContext) error {
  1210  			var countErr error
  1211  			res, countErr = mt.Coll.EstimatedDocumentCount(sc)
  1212  			return countErr
  1213  		})
  1214  		return res, err
  1215  	}
  1216  	return mt.Coll.EstimatedDocumentCount(context.Background())
  1217  }
  1218  
  1219  func executeGridFSDownload(mt *mtest.T, bucket *gridfs.Bucket, args bson.Raw) (int64, error) {
  1220  	mt.Helper()
  1221  
  1222  	var fileID primitive.ObjectID
  1223  	elems, _ := args.Elements()
  1224  	for _, elem := range elems {
  1225  		key := elem.Key()
  1226  		val := elem.Value()
  1227  
  1228  		switch key {
  1229  		case "id":
  1230  			fileID = val.ObjectID()
  1231  		default:
  1232  			mt.Fatalf("unrecognized download option: %v", key)
  1233  		}
  1234  	}
  1235  
  1236  	return bucket.DownloadToStream(fileID, new(bytes.Buffer))
  1237  }
  1238  
  1239  func executeGridFSDownloadByName(mt *mtest.T, bucket *gridfs.Bucket, args bson.Raw) (int64, error) {
  1240  	mt.Helper()
  1241  
  1242  	var file string
  1243  	elems, _ := args.Elements()
  1244  	for _, elem := range elems {
  1245  		key := elem.Key()
  1246  		val := elem.Value()
  1247  
  1248  		switch key {
  1249  		case "filename":
  1250  			file = val.StringValue()
  1251  		default:
  1252  			mt.Fatalf("unrecognized download by name option: %v", key)
  1253  		}
  1254  	}
  1255  
  1256  	return bucket.DownloadToStreamByName(file, new(bytes.Buffer))
  1257  }
  1258  
  1259  func executeCreateIndex(mt *mtest.T, sess mongo.Session, args bson.Raw) (string, error) {
  1260  	mt.Helper()
  1261  
  1262  	model := mongo.IndexModel{
  1263  		Options: options.Index(),
  1264  	}
  1265  	elems, _ := args.Elements()
  1266  	for _, elem := range elems {
  1267  		key := elem.Key()
  1268  		val := elem.Value()
  1269  
  1270  		switch key {
  1271  		case "keys":
  1272  			model.Keys = val.Document()
  1273  		case "name":
  1274  			model.Options.SetName(val.StringValue())
  1275  		case "session":
  1276  		default:
  1277  			mt.Fatalf("unrecognized createIndex option %v", key)
  1278  		}
  1279  	}
  1280  
  1281  	if sess != nil {
  1282  		var indexName string
  1283  		err := mongo.WithSession(context.Background(), sess, func(sc mongo.SessionContext) error {
  1284  			var indexErr error
  1285  			indexName, indexErr = mt.Coll.Indexes().CreateOne(sc, model)
  1286  			return indexErr
  1287  		})
  1288  		return indexName, err
  1289  	}
  1290  	return mt.Coll.Indexes().CreateOne(context.Background(), model)
  1291  }
  1292  
  1293  func executeDropIndex(mt *mtest.T, sess mongo.Session, args bson.Raw) (bson.Raw, error) {
  1294  	mt.Helper()
  1295  
  1296  	var name string
  1297  	elems, _ := args.Elements()
  1298  	for _, elem := range elems {
  1299  		key := elem.Key()
  1300  		val := elem.Value()
  1301  
  1302  		switch key {
  1303  		case "name":
  1304  			name = val.StringValue()
  1305  		default:
  1306  			mt.Fatalf("unrecognized dropIndex option %v", key)
  1307  		}
  1308  	}
  1309  
  1310  	if sess != nil {
  1311  		var res bson.Raw
  1312  		err := mongo.WithSession(context.Background(), sess, func(sc mongo.SessionContext) error {
  1313  			var indexErr error
  1314  			res, indexErr = mt.Coll.Indexes().DropOne(sc, name)
  1315  			return indexErr
  1316  		})
  1317  		return res, err
  1318  	}
  1319  	return mt.Coll.Indexes().DropOne(context.Background(), name)
  1320  }
  1321  
  1322  func executeDropCollection(mt *mtest.T, sess mongo.Session, args bson.Raw) error {
  1323  	mt.Helper()
  1324  
  1325  	var collName string
  1326  	elems, _ := args.Elements()
  1327  	for _, elem := range elems {
  1328  		key := elem.Key()
  1329  		val := elem.Value()
  1330  
  1331  		switch key {
  1332  		case "encryptedFields":
  1333  			mt.Fatalf("unsupported field: encryptedFields")
  1334  		case "collection":
  1335  			collName = val.StringValue()
  1336  		default:
  1337  			mt.Fatalf("unrecognized dropCollection option %v", key)
  1338  		}
  1339  	}
  1340  
  1341  	coll := mt.DB.Collection(collName)
  1342  	if sess != nil {
  1343  		err := mongo.WithSession(context.Background(), sess, func(sc mongo.SessionContext) error {
  1344  			return coll.Drop(sc)
  1345  		})
  1346  		return err
  1347  	}
  1348  	return coll.Drop(context.Background())
  1349  }
  1350  
  1351  func executeCreateCollection(mt *mtest.T, sess mongo.Session, args bson.Raw) error {
  1352  	mt.Helper()
  1353  
  1354  	cco := options.CreateCollection()
  1355  
  1356  	var collName string
  1357  	elems, _ := args.Elements()
  1358  	for _, elem := range elems {
  1359  		key := elem.Key()
  1360  		val := elem.Value()
  1361  
  1362  		switch key {
  1363  		case "encryptedFields":
  1364  			cco.SetEncryptedFields(val.Document())
  1365  		case "collection":
  1366  			collName = val.StringValue()
  1367  		case "validator":
  1368  			cco.SetValidator(val.Document())
  1369  		case "session":
  1370  		default:
  1371  			mt.Fatalf("unrecognized createCollection option %v", key)
  1372  		}
  1373  	}
  1374  
  1375  	if sess != nil {
  1376  		err := mongo.WithSession(context.Background(), sess, func(sc mongo.SessionContext) error {
  1377  			return mt.DB.CreateCollection(sc, collName, cco)
  1378  		})
  1379  		return err
  1380  	}
  1381  	return mt.DB.CreateCollection(context.Background(), collName, cco)
  1382  }
  1383  
  1384  func executeAdminCommand(mt *mtest.T, op *operation) {
  1385  	// Per the streamable hello test format description, a separate client must be used to execute this operation.
  1386  	clientOpts := options.Client().ApplyURI(mtest.ClusterURI())
  1387  	integtest.AddTestServerAPIVersion(clientOpts)
  1388  	client, err := mongo.Connect(context.Background(), clientOpts)
  1389  	assert.Nil(mt, err, "Connect error: %v", err)
  1390  	defer func() {
  1391  		_ = client.Disconnect(context.Background())
  1392  	}()
  1393  
  1394  	cmd := op.Arguments.Lookup("command").Document()
  1395  	if op.CommandName == "replSetStepDown" {
  1396  		// replSetStepDown can fail with transient errors, so we use executeAdminCommandWithRetry to handle them and
  1397  		// retry until a timeout is hit.
  1398  		executeAdminCommandWithRetry(mt, client, cmd)
  1399  		return
  1400  	}
  1401  
  1402  	rco := options.RunCmd()
  1403  	rpVal, err := op.Arguments.LookupErr("readPreference")
  1404  	if err == nil {
  1405  		var temp unified.ReadPreference
  1406  		err = bson.Unmarshal(rpVal.Document(), &temp)
  1407  		assert.Nil(mt, err, "error unmarshalling readPreference option: %v", err)
  1408  
  1409  		rp, err := temp.ToReadPrefOption()
  1410  		assert.Nil(mt, err, "error creating readpref.ReadPref object: %v", err)
  1411  		rco.SetReadPreference(rp)
  1412  	}
  1413  
  1414  	db := client.Database("admin")
  1415  	err = db.RunCommand(context.Background(), cmd, rco).Err()
  1416  	assert.Nil(mt, err, "RunCommand error for command %q: %v", op.CommandName, err)
  1417  }
  1418  
  1419  func executeAdminCommandWithRetry(mt *mtest.T, client *mongo.Client, cmd interface{}, opts ...*options.RunCmdOptions) {
  1420  	mt.Helper()
  1421  
  1422  	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
  1423  	defer cancel()
  1424  
  1425  	for {
  1426  		err := client.Database("admin").RunCommand(ctx, cmd, opts...).Err()
  1427  		if err == nil {
  1428  			return
  1429  		}
  1430  
  1431  		if ce, ok := err.(mongo.CommandError); ok && ce.Code == errorLockTimeout {
  1432  			continue
  1433  		}
  1434  		mt.Fatalf("error executing command: %v", err)
  1435  	}
  1436  }
  1437  
  1438  // verification function to use for all count operations
  1439  func verifyCountResult(mt *mtest.T, actualResult int64, expectedResult interface{}) {
  1440  	mt.Helper()
  1441  	if expectedResult == nil {
  1442  		return
  1443  	}
  1444  
  1445  	expected := getIntFromInterface(expectedResult)
  1446  	assert.NotNil(mt, expected, "unexpected type for estimatedDocumentCount result: %T", expectedResult)
  1447  	assert.Equal(mt, *expected, actualResult, "count mismatch; expected %v, got %v", *expected, actualResult)
  1448  }
  1449  
  1450  func verifyBulkWriteResult(mt *mtest.T, actualResult *mongo.BulkWriteResult, expectedResult interface{}) {
  1451  	mt.Helper()
  1452  
  1453  	if expectedResult == nil {
  1454  		return
  1455  	}
  1456  
  1457  	var expected struct {
  1458  		InsertedCount int64                  `bson:"insertedCount"`
  1459  		MatchedCount  int64                  `bson:"matchedCount"`
  1460  		ModifiedCount int64                  `bson:"modifiedCount"`
  1461  		DeletedCount  int64                  `bson:"deletedCount"`
  1462  		UpsertedCount int64                  `bson:"upsertedCount"`
  1463  		UpsertedIDs   map[string]interface{} `bson:"upsertedIds"`
  1464  	}
  1465  	err := bson.Unmarshal(expectedResult.(bson.Raw), &expected)
  1466  	assert.Nil(mt, err, "error creating BulkWriteResult: %v", err)
  1467  
  1468  	assert.Equal(mt, expected.InsertedCount, actualResult.InsertedCount,
  1469  		"InsertedCount mismatch; expected %v, got %v", expected.InsertedCount, actualResult.InsertedCount)
  1470  	assert.Equal(mt, expected.MatchedCount, actualResult.MatchedCount,
  1471  		"MatchedCount mismatch; expected %v, got %v", expected.MatchedCount, actualResult.MatchedCount)
  1472  	assert.Equal(mt, expected.ModifiedCount, actualResult.ModifiedCount,
  1473  		"ModifiedCount mismatch; expected %v, got %v", expected.ModifiedCount, actualResult.ModifiedCount)
  1474  	assert.Equal(mt, expected.DeletedCount, actualResult.DeletedCount,
  1475  		"DeletedCount mismatch; expected %v, got %v", expected.DeletedCount, actualResult.DeletedCount)
  1476  	assert.Equal(mt, expected.UpsertedCount, actualResult.UpsertedCount,
  1477  		"UpsertedCount mismatch; expected %v, got %v", expected.UpsertedCount, actualResult.UpsertedCount)
  1478  
  1479  	for idxStr, expectedID := range expected.UpsertedIDs {
  1480  		idx, err := strconv.Atoi(idxStr)
  1481  		assert.Nil(mt, err, "error converted index %v to int", idxStr)
  1482  
  1483  		actualID, ok := actualResult.UpsertedIDs[int64(idx)]
  1484  		assert.True(mt, ok, "operation index %v not found in actual upserted IDs map", idx)
  1485  		assert.Equal(mt, expectedID, actualID,
  1486  			"upserted ID mismatch for key %v; expected %v, got %v", idx, expectedID, actualID)
  1487  	}
  1488  }
  1489  
  1490  func verifyUpdateResult(mt *mtest.T, res *mongo.UpdateResult, result interface{}) {
  1491  	mt.Helper()
  1492  
  1493  	if result == nil {
  1494  		return
  1495  	}
  1496  
  1497  	var expected struct {
  1498  		MatchedCount  int64 `bson:"matchedCount"`
  1499  		ModifiedCount int64 `bson:"modifiedCount"`
  1500  		UpsertedCount int64 `bson:"upsertedCount"`
  1501  	}
  1502  	err := bson.Unmarshal(result.(bson.Raw), &expected)
  1503  	assert.Nil(mt, err, "error creating UpdateResult: %v", err)
  1504  
  1505  	assert.Equal(mt, expected.MatchedCount, res.MatchedCount,
  1506  		"matched count mismatch; expected %v, got %v", expected.MatchedCount, res.MatchedCount)
  1507  	assert.Equal(mt, expected.ModifiedCount, res.ModifiedCount,
  1508  		"modified count mismatch; expected %v, got %v", expected.ModifiedCount, res.ModifiedCount)
  1509  
  1510  	actualUpsertedCount := int64(0)
  1511  	if res.UpsertedID != nil {
  1512  		actualUpsertedCount = 1
  1513  	}
  1514  	assert.Equal(mt, expected.UpsertedCount, actualUpsertedCount,
  1515  		"upserted count mismatch; expected %v, got %v", expected.UpsertedCount, actualUpsertedCount)
  1516  }
  1517  
  1518  func verifyDeleteResult(mt *mtest.T, res *mongo.DeleteResult, result interface{}) {
  1519  	mt.Helper()
  1520  
  1521  	if result == nil {
  1522  		return
  1523  	}
  1524  
  1525  	var expected struct {
  1526  		DeletedCount int64 `bson:"deletedCount"`
  1527  	}
  1528  	err := bson.Unmarshal(result.(bson.Raw), &expected)
  1529  	assert.Nil(mt, err, "error creating Delete result: %v", err)
  1530  	assert.Equal(mt, expected.DeletedCount, res.DeletedCount,
  1531  		"deleted count mismatch; expected %v, got %v", expected.DeletedCount, res.DeletedCount)
  1532  }
  1533  
  1534  func verifyDistinctResult(mt *mtest.T, actualResult []interface{}, expectedResult interface{}) {
  1535  	mt.Helper()
  1536  
  1537  	if expectedResult == nil {
  1538  		return
  1539  	}
  1540  
  1541  	for i, expected := range expectedResult.(bson.A) {
  1542  		actual := actualResult[i]
  1543  		iExpected := getIntFromInterface(expected)
  1544  		iActual := getIntFromInterface(actual)
  1545  
  1546  		if iExpected != nil {
  1547  			assert.NotNil(mt, iActual, "expected nil but got %v", iActual)
  1548  			assert.Equal(mt, *iExpected, *iActual, "expected value %v but got %v", *iExpected, *iActual)
  1549  			continue
  1550  		}
  1551  
  1552  		assert.Equal(mt, expected, actual, "expected value %v but got %v", expected, actual)
  1553  	}
  1554  }
  1555  
  1556  func verifyInsertOneResult(mt *mtest.T, actualResult *mongo.InsertOneResult, expectedResult interface{}) {
  1557  	mt.Helper()
  1558  
  1559  	if expectedResult == nil {
  1560  		return
  1561  	}
  1562  
  1563  	var expected mongo.InsertOneResult
  1564  	err := bson.Unmarshal(expectedResult.(bson.Raw), &expected)
  1565  	assert.Nil(mt, err, "error creating InsertOne result: %v", err)
  1566  
  1567  	expectedID := expected.InsertedID
  1568  	if f, ok := expectedID.(float64); ok && f == math.Floor(f) {
  1569  		expectedID = int32(f)
  1570  	}
  1571  
  1572  	if expectedID != nil {
  1573  		assert.NotNil(mt, actualResult, "expected result but got nil")
  1574  		assert.Equal(mt, expectedID, actualResult.InsertedID,
  1575  			"inserted ID mismatch; expected %v, got %v", expectedID, actualResult.InsertedID)
  1576  	}
  1577  }
  1578  
  1579  func verifyInsertManyResult(mt *mtest.T, actualResult *mongo.InsertManyResult, expectedResult interface{}) {
  1580  	mt.Helper()
  1581  
  1582  	if expectedResult == nil {
  1583  		return
  1584  	}
  1585  
  1586  	assert.NotNil(mt, actualResult, "expected InsertMany result %v but got nil", expectedResult)
  1587  	var expected struct{ InsertedIds map[string]interface{} }
  1588  	err := bson.Unmarshal(expectedResult.(bson.Raw), &expected)
  1589  	assert.Nil(mt, err, "error creating expected InsertMany result: %v", err)
  1590  
  1591  	for _, val := range expected.InsertedIds {
  1592  		var found bool
  1593  		for _, inserted := range actualResult.InsertedIDs {
  1594  			if val == inserted {
  1595  				found = true
  1596  				break
  1597  			}
  1598  		}
  1599  
  1600  		assert.True(mt, found, "expected to find ID %v in %v", val, actualResult.InsertedIDs)
  1601  	}
  1602  }
  1603  
  1604  func verifyListDatabasesResult(mt *mtest.T, actualResult mongo.ListDatabasesResult, expectedResult interface{}) {
  1605  	mt.Helper()
  1606  
  1607  	if expectedResult == nil {
  1608  		return
  1609  	}
  1610  
  1611  	var expected mongo.ListDatabasesResult
  1612  	err := bson.Unmarshal(expectedResult.(bson.Raw), &expected)
  1613  	assert.Nil(mt, err, "error creating ListDatabasesResult result: %v", err)
  1614  
  1615  	assert.Equal(mt, expected, actualResult, "ListDatabasesResult mismatch; expected %v, got %v", expected, actualResult)
  1616  }
  1617  
  1618  func verifyCursorResult(mt *mtest.T, cur *mongo.Cursor, result interface{}) {
  1619  	mt.Helper()
  1620  
  1621  	// The Atlas Data Lake tests expect a getMore to be sent even though the operation does not have a Result field.
  1622  	// To account for this, we fetch all documents via cursor.All and then compare them to the result if it's non-nil.
  1623  	assert.NotNil(mt, cur, "expected cursor to not be nil")
  1624  	var actual []bson.Raw
  1625  	err := cur.All(context.Background(), &actual)
  1626  	assert.Nil(mt, err, "All error: %v", err)
  1627  
  1628  	if result == nil {
  1629  		return
  1630  	}
  1631  
  1632  	resultsArray := result.(bson.A)
  1633  	assert.Equal(mt, len(resultsArray), len(actual), "expected %d documents from cursor, got %d", len(resultsArray),
  1634  		len(actual))
  1635  	for i, expected := range resultsArray {
  1636  		err := compareDocs(mt, expected.(bson.Raw), actual[i])
  1637  		assert.Nil(mt, err, "cursor document mismatch at index %d: %v", i, err)
  1638  	}
  1639  }
  1640  
  1641  func verifySingleResult(mt *mtest.T, actualResult *mongo.SingleResult, expectedResult interface{}) {
  1642  	mt.Helper()
  1643  
  1644  	if expectedResult == nil {
  1645  		return
  1646  	}
  1647  
  1648  	expected := expectedResult.(bson.Raw)
  1649  	actual, _ := actualResult.Raw()
  1650  	if err := compareDocs(mt, expected, actual); err != nil {
  1651  		mt.Fatalf("SingleResult document mismatch: %s", err)
  1652  	}
  1653  }
  1654  

View as plain text