...

Source file src/go.mongodb.org/mongo-driver/mongo/integration/unified/collection_operation_execution.go

Documentation: go.mongodb.org/mongo-driver/mongo/integration/unified

     1  // Copyright (C) MongoDB, Inc. 2017-present.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License"); you may
     4  // not use this file except in compliance with the License. You may obtain
     5  // a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
     6  
     7  package unified
     8  
     9  import (
    10  	"context"
    11  	"errors"
    12  	"fmt"
    13  	"time"
    14  
    15  	"go.mongodb.org/mongo-driver/bson"
    16  	"go.mongodb.org/mongo-driver/bson/bsontype"
    17  	"go.mongodb.org/mongo-driver/internal/bsonutil"
    18  	"go.mongodb.org/mongo-driver/mongo"
    19  	"go.mongodb.org/mongo-driver/mongo/options"
    20  	"go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
    21  )
    22  
    23  // This file contains helpers to execute collection operations.
    24  
    25  func executeAggregate(ctx context.Context, operation *operation) (*operationResult, error) {
    26  	var aggregator interface {
    27  		Aggregate(context.Context, interface{}, ...*options.AggregateOptions) (*mongo.Cursor, error)
    28  	}
    29  	var err error
    30  
    31  	aggregator, err = entities(ctx).collection(operation.Object)
    32  	if err != nil {
    33  		aggregator, err = entities(ctx).database(operation.Object)
    34  	}
    35  	if err != nil {
    36  		return nil, fmt.Errorf("no database or collection entity found with ID %q", operation.Object)
    37  	}
    38  
    39  	var pipeline []interface{}
    40  	opts := options.Aggregate()
    41  
    42  	elems, err := operation.Arguments.Elements()
    43  	if err != nil {
    44  		return nil, err
    45  	}
    46  	for _, elem := range elems {
    47  		key := elem.Key()
    48  		val := elem.Value()
    49  
    50  		switch key {
    51  		case "allowDiskUse":
    52  			opts.SetAllowDiskUse(val.Boolean())
    53  		case "batchSize":
    54  			opts.SetBatchSize(val.Int32())
    55  		case "bypassDocumentValidation":
    56  			opts.SetBypassDocumentValidation(val.Boolean())
    57  		case "collation":
    58  			collation, err := createCollation(val.Document())
    59  			if err != nil {
    60  				return nil, fmt.Errorf("error creating collation: %w", err)
    61  			}
    62  			opts.SetCollation(collation)
    63  		case "comment":
    64  			// TODO(GODRIVER-2386): when document support for comments is added, we can replace this switch condition
    65  			// TODO with `opts.SetComment(val)`
    66  			commentString, err := createCommentString(val)
    67  			if err != nil {
    68  				return nil, fmt.Errorf("error creating comment: %w", err)
    69  			}
    70  			opts.SetComment(commentString)
    71  		case "hint":
    72  			hint, err := createHint(val)
    73  			if err != nil {
    74  				return nil, fmt.Errorf("error creating hint: %w", err)
    75  			}
    76  			opts.SetHint(hint)
    77  		case "maxTimeMS":
    78  			opts.SetMaxTime(time.Duration(val.Int32()) * time.Millisecond)
    79  		case "maxAwaitTimeMS":
    80  			opts.SetMaxAwaitTime(time.Duration(val.Int32()) * time.Millisecond)
    81  		case "pipeline":
    82  			pipeline = bsonutil.RawToInterfaces(bsonutil.RawToDocuments(val.Array())...)
    83  		case "let":
    84  			opts.SetLet(val.Document())
    85  		default:
    86  			return nil, fmt.Errorf("unrecognized aggregate option %q", key)
    87  		}
    88  	}
    89  	if pipeline == nil {
    90  		return nil, newMissingArgumentError("pipeline")
    91  	}
    92  
    93  	cursor, err := aggregator.Aggregate(ctx, pipeline, opts)
    94  	if err != nil {
    95  		return newErrorResult(err), nil
    96  	}
    97  	defer cursor.Close(ctx)
    98  
    99  	var docs []bson.Raw
   100  	if err := cursor.All(ctx, &docs); err != nil {
   101  		return newErrorResult(err), nil
   102  	}
   103  	return newCursorResult(docs), nil
   104  }
   105  
   106  func executeBulkWrite(ctx context.Context, operation *operation) (*operationResult, error) {
   107  	coll, err := entities(ctx).collection(operation.Object)
   108  	if err != nil {
   109  		return nil, err
   110  	}
   111  
   112  	var models []mongo.WriteModel
   113  	opts := options.BulkWrite()
   114  
   115  	elems, err := operation.Arguments.Elements()
   116  	if err != nil {
   117  		return nil, err
   118  	}
   119  	for _, elem := range elems {
   120  		key := elem.Key()
   121  		val := elem.Value()
   122  
   123  		switch key {
   124  		case "comment":
   125  			opts.SetComment(val)
   126  		case "ordered":
   127  			opts.SetOrdered(val.Boolean())
   128  		case "requests":
   129  			models, err = createBulkWriteModels(val.Array())
   130  			if err != nil {
   131  				return nil, fmt.Errorf("error creating write models: %w", err)
   132  			}
   133  		case "let":
   134  			opts.SetLet(val.Document())
   135  		default:
   136  			return nil, fmt.Errorf("unrecognized bulkWrite option %q", key)
   137  		}
   138  	}
   139  	if models == nil {
   140  		return nil, newMissingArgumentError("requests")
   141  	}
   142  
   143  	res, err := coll.BulkWrite(ctx, models, opts)
   144  	raw := emptyCoreDocument
   145  	if res != nil {
   146  		rawUpsertedIDs := emptyDocument
   147  		var marshalErr error
   148  		if res.UpsertedIDs != nil {
   149  			rawUpsertedIDs, marshalErr = bson.Marshal(res.UpsertedIDs)
   150  			if marshalErr != nil {
   151  				return nil, fmt.Errorf("error marshalling UpsertedIDs map to BSON: %w", marshalErr)
   152  			}
   153  		}
   154  
   155  		raw = bsoncore.NewDocumentBuilder().
   156  			AppendInt64("insertedCount", res.InsertedCount).
   157  			AppendInt64("deletedCount", res.DeletedCount).
   158  			AppendInt64("matchedCount", res.MatchedCount).
   159  			AppendInt64("modifiedCount", res.ModifiedCount).
   160  			AppendInt64("upsertedCount", res.UpsertedCount).
   161  			AppendDocument("upsertedIds", rawUpsertedIDs).
   162  			Build()
   163  	}
   164  	return newDocumentResult(raw, err), nil
   165  }
   166  
   167  func executeCountDocuments(ctx context.Context, operation *operation) (*operationResult, error) {
   168  	coll, err := entities(ctx).collection(operation.Object)
   169  	if err != nil {
   170  		return nil, err
   171  	}
   172  
   173  	var filter bson.Raw
   174  	opts := options.Count()
   175  
   176  	elems, err := operation.Arguments.Elements()
   177  	if err != nil {
   178  		return nil, err
   179  	}
   180  	for _, elem := range elems {
   181  		key := elem.Key()
   182  		val := elem.Value()
   183  
   184  		switch key {
   185  		case "collation":
   186  			collation, err := createCollation(val.Document())
   187  			if err != nil {
   188  				return nil, fmt.Errorf("error creating collation: %w", err)
   189  			}
   190  			opts.SetCollation(collation)
   191  		case "comment":
   192  			// TODO(GODRIVER-2386): when document support for comments is added, we can replace this switch condition
   193  			// TODO with `opts.SetComment(val)`
   194  			commentString, err := createCommentString(val)
   195  			if err != nil {
   196  				return nil, fmt.Errorf("error creating comment: %w", err)
   197  			}
   198  			opts.SetComment(commentString)
   199  		case "filter":
   200  			filter = val.Document()
   201  		case "hint":
   202  			hint, err := createHint(val)
   203  			if err != nil {
   204  				return nil, fmt.Errorf("error creating hint: %w", err)
   205  			}
   206  			opts.SetHint(hint)
   207  		case "limit":
   208  			opts.SetLimit(val.Int64())
   209  		case "maxTimeMS":
   210  			opts.SetMaxTime(time.Duration(val.Int32()) * time.Millisecond)
   211  		case "skip":
   212  			opts.SetSkip(int64(val.Int32()))
   213  		default:
   214  			return nil, fmt.Errorf("unrecognized countDocuments option %q", key)
   215  		}
   216  	}
   217  	if filter == nil {
   218  		return nil, newMissingArgumentError("filter")
   219  	}
   220  
   221  	count, err := coll.CountDocuments(ctx, filter, opts)
   222  	if err != nil {
   223  		return newErrorResult(err), nil
   224  	}
   225  	return newValueResult(bsontype.Int64, bsoncore.AppendInt64(nil, count), nil), nil
   226  }
   227  
   228  func executeCreateIndex(ctx context.Context, operation *operation) (*operationResult, error) {
   229  	coll, err := entities(ctx).collection(operation.Object)
   230  	if err != nil {
   231  		return nil, err
   232  	}
   233  
   234  	var keys bson.Raw
   235  	indexOpts := options.Index()
   236  
   237  	elems, err := operation.Arguments.Elements()
   238  	if err != nil {
   239  		return nil, err
   240  	}
   241  	for _, elem := range elems {
   242  		key := elem.Key()
   243  		val := elem.Value()
   244  
   245  		switch key {
   246  		case "2dsphereIndexVersion":
   247  			indexOpts.SetSphereVersion(val.Int32())
   248  		case "background":
   249  			indexOpts.SetBackground(val.Boolean())
   250  		case "bits":
   251  			indexOpts.SetBits(val.Int32())
   252  		case "bucketSize":
   253  			indexOpts.SetBucketSize(val.Int32())
   254  		case "collation":
   255  			collation, err := createCollation(val.Document())
   256  			if err != nil {
   257  				return nil, fmt.Errorf("error creating collation: %w", err)
   258  			}
   259  			indexOpts.SetCollation(collation)
   260  		case "defaultLanguage":
   261  			indexOpts.SetDefaultLanguage(val.StringValue())
   262  		case "expireAfterSeconds":
   263  			indexOpts.SetExpireAfterSeconds(val.Int32())
   264  		case "hidden":
   265  			indexOpts.SetHidden(val.Boolean())
   266  		case "keys":
   267  			keys = val.Document()
   268  		case "languageOverride":
   269  			indexOpts.SetLanguageOverride(val.StringValue())
   270  		case "max":
   271  			indexOpts.SetMax(val.Double())
   272  		case "min":
   273  			indexOpts.SetMin(val.Double())
   274  		case "name":
   275  			indexOpts.SetName(val.StringValue())
   276  		case "partialFilterExpression":
   277  			indexOpts.SetPartialFilterExpression(val.Document())
   278  		case "sparse":
   279  			indexOpts.SetSparse(val.Boolean())
   280  		case "storageEngine":
   281  			indexOpts.SetStorageEngine(val.Document())
   282  		case "unique":
   283  			indexOpts.SetUnique(val.Boolean())
   284  		case "version":
   285  			indexOpts.SetVersion(val.Int32())
   286  		case "textIndexVersion":
   287  			indexOpts.SetTextVersion(val.Int32())
   288  		case "weights":
   289  			indexOpts.SetWeights(val.Document())
   290  		case "wildcardProjection":
   291  			indexOpts.SetWildcardProjection(val.Document())
   292  		default:
   293  			return nil, fmt.Errorf("unrecognized createIndex option %q", key)
   294  		}
   295  	}
   296  	if keys == nil {
   297  		return nil, newMissingArgumentError("keys")
   298  	}
   299  
   300  	model := mongo.IndexModel{
   301  		Keys:    keys,
   302  		Options: indexOpts,
   303  	}
   304  	name, err := coll.Indexes().CreateOne(ctx, model)
   305  	return newValueResult(bsontype.String, bsoncore.AppendString(nil, name), err), nil
   306  }
   307  
   308  func executeCreateSearchIndex(ctx context.Context, operation *operation) (*operationResult, error) {
   309  	coll, err := entities(ctx).collection(operation.Object)
   310  	if err != nil {
   311  		return nil, err
   312  	}
   313  
   314  	var model mongo.SearchIndexModel
   315  
   316  	elems, err := operation.Arguments.Elements()
   317  	if err != nil {
   318  		return nil, err
   319  	}
   320  	for _, elem := range elems {
   321  		key := elem.Key()
   322  		val := elem.Value()
   323  
   324  		switch key {
   325  		case "model":
   326  			var m struct {
   327  				Definition interface{}
   328  				Name       *string
   329  			}
   330  			err = bson.Unmarshal(val.Document(), &m)
   331  			if err != nil {
   332  				return nil, err
   333  			}
   334  			model.Definition = m.Definition
   335  			model.Options = options.SearchIndexes()
   336  			model.Options.Name = m.Name
   337  		default:
   338  			return nil, fmt.Errorf("unrecognized createSearchIndex option %q", key)
   339  		}
   340  	}
   341  
   342  	name, err := coll.SearchIndexes().CreateOne(ctx, model)
   343  	return newValueResult(bsontype.String, bsoncore.AppendString(nil, name), err), nil
   344  }
   345  
   346  func executeCreateSearchIndexes(ctx context.Context, operation *operation) (*operationResult, error) {
   347  	coll, err := entities(ctx).collection(operation.Object)
   348  	if err != nil {
   349  		return nil, err
   350  	}
   351  
   352  	var models []mongo.SearchIndexModel
   353  
   354  	elems, err := operation.Arguments.Elements()
   355  	if err != nil {
   356  		return nil, err
   357  	}
   358  	for _, elem := range elems {
   359  		key := elem.Key()
   360  		val := elem.Value()
   361  
   362  		switch key {
   363  		case "models":
   364  			vals, err := val.Array().Values()
   365  			if err != nil {
   366  				return nil, err
   367  			}
   368  			for _, val := range vals {
   369  				var m struct {
   370  					Definition interface{}
   371  					Name       *string
   372  				}
   373  				err = bson.Unmarshal(val.Value, &m)
   374  				if err != nil {
   375  					return nil, err
   376  				}
   377  				model := mongo.SearchIndexModel{
   378  					Definition: m.Definition,
   379  					Options:    options.SearchIndexes(),
   380  				}
   381  				model.Options.Name = m.Name
   382  				models = append(models, model)
   383  			}
   384  		default:
   385  			return nil, fmt.Errorf("unrecognized createSearchIndexes option %q", key)
   386  		}
   387  	}
   388  
   389  	names, err := coll.SearchIndexes().CreateMany(ctx, models)
   390  	builder := bsoncore.NewArrayBuilder()
   391  	for _, name := range names {
   392  		builder.AppendString(name)
   393  	}
   394  	return newValueResult(bsontype.Array, builder.Build(), err), nil
   395  }
   396  
   397  func executeDeleteOne(ctx context.Context, operation *operation) (*operationResult, error) {
   398  	coll, err := entities(ctx).collection(operation.Object)
   399  	if err != nil {
   400  		return nil, err
   401  	}
   402  
   403  	var filter bson.Raw
   404  	opts := options.Delete()
   405  
   406  	elems, err := operation.Arguments.Elements()
   407  	if err != nil {
   408  		return nil, err
   409  	}
   410  	for _, elem := range elems {
   411  		key := elem.Key()
   412  		val := elem.Value()
   413  
   414  		switch key {
   415  		case "collation":
   416  			collation, err := createCollation(val.Document())
   417  			if err != nil {
   418  				return nil, fmt.Errorf("error creating collation: %w", err)
   419  			}
   420  			opts.SetCollation(collation)
   421  		case "comment":
   422  			opts.SetComment(val)
   423  		case "filter":
   424  			filter = val.Document()
   425  		case "hint":
   426  			hint, err := createHint(val)
   427  			if err != nil {
   428  				return nil, fmt.Errorf("error creating hint: %w", err)
   429  			}
   430  			opts.SetHint(hint)
   431  		case "let":
   432  			opts.SetLet(val.Document())
   433  		default:
   434  			return nil, fmt.Errorf("unrecognized deleteOne option %q", key)
   435  		}
   436  	}
   437  	if filter == nil {
   438  		return nil, newMissingArgumentError("filter")
   439  	}
   440  
   441  	res, err := coll.DeleteOne(ctx, filter, opts)
   442  	raw := emptyCoreDocument
   443  	if res != nil {
   444  		raw = bsoncore.NewDocumentBuilder().
   445  			AppendInt64("deletedCount", res.DeletedCount).
   446  			Build()
   447  	}
   448  	return newDocumentResult(raw, err), nil
   449  }
   450  
   451  func executeDeleteMany(ctx context.Context, operation *operation) (*operationResult, error) {
   452  	coll, err := entities(ctx).collection(operation.Object)
   453  	if err != nil {
   454  		return nil, err
   455  	}
   456  
   457  	var filter bson.Raw
   458  	opts := options.Delete()
   459  
   460  	elems, err := operation.Arguments.Elements()
   461  	if err != nil {
   462  		return nil, err
   463  	}
   464  	for _, elem := range elems {
   465  		key := elem.Key()
   466  		val := elem.Value()
   467  
   468  		switch key {
   469  		case "comment":
   470  			opts.SetComment(val)
   471  		case "collation":
   472  			collation, err := createCollation(val.Document())
   473  			if err != nil {
   474  				return nil, fmt.Errorf("error creating collation: %w", err)
   475  			}
   476  			opts.SetCollation(collation)
   477  		case "filter":
   478  			filter = val.Document()
   479  		case "hint":
   480  			hint, err := createHint(val)
   481  			if err != nil {
   482  				return nil, fmt.Errorf("error creating hint: %w", err)
   483  			}
   484  			opts.SetHint(hint)
   485  		case "let":
   486  			opts.SetLet(val.Document())
   487  		default:
   488  			return nil, fmt.Errorf("unrecognized deleteMany option %q", key)
   489  		}
   490  	}
   491  	if filter == nil {
   492  		return nil, newMissingArgumentError("filter")
   493  	}
   494  
   495  	res, err := coll.DeleteMany(ctx, filter, opts)
   496  	raw := emptyCoreDocument
   497  	if res != nil {
   498  		raw = bsoncore.NewDocumentBuilder().
   499  			AppendInt64("deletedCount", res.DeletedCount).
   500  			Build()
   501  	}
   502  	return newDocumentResult(raw, err), nil
   503  }
   504  
   505  func executeDistinct(ctx context.Context, operation *operation) (*operationResult, error) {
   506  	coll, err := entities(ctx).collection(operation.Object)
   507  	if err != nil {
   508  		return nil, err
   509  	}
   510  
   511  	var fieldName string
   512  	var filter bson.Raw
   513  	opts := options.Distinct()
   514  
   515  	elems, err := operation.Arguments.Elements()
   516  	if err != nil {
   517  		return nil, err
   518  	}
   519  	for _, elem := range elems {
   520  		key := elem.Key()
   521  		val := elem.Value()
   522  
   523  		switch key {
   524  		case "collation":
   525  			collation, err := createCollation(val.Document())
   526  			if err != nil {
   527  				return nil, fmt.Errorf("error creating collation: %w", err)
   528  			}
   529  			opts.SetCollation(collation)
   530  		case "comment":
   531  			opts.SetComment(val)
   532  		case "fieldName":
   533  			fieldName = val.StringValue()
   534  		case "filter":
   535  			filter = val.Document()
   536  		case "maxTimeMS":
   537  			opts.SetMaxTime(time.Duration(val.Int32()) * time.Millisecond)
   538  		default:
   539  			return nil, fmt.Errorf("unrecognized distinct option %q", key)
   540  		}
   541  	}
   542  	if fieldName == "" {
   543  		return nil, newMissingArgumentError("fieldName")
   544  	}
   545  	if filter == nil {
   546  		return nil, newMissingArgumentError("filter")
   547  	}
   548  
   549  	res, err := coll.Distinct(ctx, fieldName, filter, opts)
   550  	if err != nil {
   551  		return newErrorResult(err), nil
   552  	}
   553  	_, rawRes, err := bson.MarshalValue(res)
   554  	if err != nil {
   555  		return nil, fmt.Errorf("error converting Distinct result to raw BSON: %w", err)
   556  	}
   557  	return newValueResult(bsontype.Array, rawRes, nil), nil
   558  }
   559  
   560  func executeDropIndex(ctx context.Context, operation *operation) (*operationResult, error) {
   561  	coll, err := entities(ctx).collection(operation.Object)
   562  	if err != nil {
   563  		return nil, err
   564  	}
   565  
   566  	var name string
   567  	dropIndexOpts := options.DropIndexes()
   568  
   569  	elems, _ := operation.Arguments.Elements()
   570  	for _, elem := range elems {
   571  		key := elem.Key()
   572  		val := elem.Value()
   573  
   574  		switch key {
   575  		case "name":
   576  			name = val.StringValue()
   577  		case "maxTimeMS":
   578  			dropIndexOpts.SetMaxTime(time.Duration(val.Int32()) * time.Millisecond)
   579  		default:
   580  			return nil, fmt.Errorf("unrecognized dropIndex option %q", key)
   581  		}
   582  	}
   583  	if name == "" {
   584  		return nil, newMissingArgumentError("name")
   585  	}
   586  
   587  	res, err := coll.Indexes().DropOne(ctx, name, dropIndexOpts)
   588  	return newDocumentResult(res, err), nil
   589  }
   590  
   591  func executeDropIndexes(ctx context.Context, operation *operation) (*operationResult, error) {
   592  	coll, err := entities(ctx).collection(operation.Object)
   593  	if err != nil {
   594  		return nil, err
   595  	}
   596  
   597  	dropIndexOpts := options.DropIndexes()
   598  	elems, _ := operation.Arguments.Elements()
   599  	for _, elem := range elems {
   600  		key := elem.Key()
   601  		val := elem.Value()
   602  
   603  		switch key {
   604  		case "maxTimeMS":
   605  			dropIndexOpts.SetMaxTime(time.Duration(val.Int32()) * time.Millisecond)
   606  		default:
   607  			return nil, fmt.Errorf("unrecognized dropIndexes option %q", key)
   608  		}
   609  	}
   610  
   611  	res, err := coll.Indexes().DropAll(ctx, dropIndexOpts)
   612  	return newDocumentResult(res, err), nil
   613  }
   614  
   615  func executeDropSearchIndex(ctx context.Context, operation *operation) (*operationResult, error) {
   616  	coll, err := entities(ctx).collection(operation.Object)
   617  	if err != nil {
   618  		return nil, err
   619  	}
   620  
   621  	var name string
   622  
   623  	elems, err := operation.Arguments.Elements()
   624  	if err != nil {
   625  		return nil, err
   626  	}
   627  	for _, elem := range elems {
   628  		key := elem.Key()
   629  		val := elem.Value()
   630  
   631  		switch key {
   632  		case "name":
   633  			name = val.StringValue()
   634  		default:
   635  			return nil, fmt.Errorf("unrecognized dropSearchIndex option %q", key)
   636  		}
   637  	}
   638  
   639  	err = coll.SearchIndexes().DropOne(ctx, name)
   640  	return newValueResult(bsontype.Null, nil, err), nil
   641  }
   642  
   643  func executeEstimatedDocumentCount(ctx context.Context, operation *operation) (*operationResult, error) {
   644  	coll, err := entities(ctx).collection(operation.Object)
   645  	if err != nil {
   646  		return nil, err
   647  	}
   648  
   649  	opts := options.EstimatedDocumentCount()
   650  	var elems []bson.RawElement
   651  	// Some estimatedDocumentCount operations in the unified test format have no arguments.
   652  	if operation.Arguments != nil {
   653  		elems, err = operation.Arguments.Elements()
   654  		if err != nil {
   655  			return nil, err
   656  		}
   657  	}
   658  	for _, elem := range elems {
   659  		key := elem.Key()
   660  		val := elem.Value()
   661  
   662  		switch key {
   663  		case "comment":
   664  			opts.SetComment(val)
   665  		case "maxTimeMS":
   666  			opts.SetMaxTime(time.Duration(val.Int32()) * time.Millisecond)
   667  		default:
   668  			return nil, fmt.Errorf("unrecognized estimatedDocumentCount option %q", key)
   669  		}
   670  	}
   671  
   672  	count, err := coll.EstimatedDocumentCount(ctx, opts)
   673  	if err != nil {
   674  		return newErrorResult(err), nil
   675  	}
   676  	return newValueResult(bsontype.Int64, bsoncore.AppendInt64(nil, count), nil), nil
   677  }
   678  
   679  func executeCreateFindCursor(ctx context.Context, operation *operation) (*operationResult, error) {
   680  	result, err := createFindCursor(ctx, operation)
   681  	if err != nil {
   682  		return nil, err
   683  	}
   684  	if result.err != nil {
   685  		return newErrorResult(result.err), nil
   686  	}
   687  
   688  	if operation.ResultEntityID == nil {
   689  		return nil, fmt.Errorf("no entity name provided to store executeCreateFindCursor result")
   690  	}
   691  	if err := entities(ctx).addCursorEntity(*operation.ResultEntityID, result.cursor); err != nil {
   692  		return nil, fmt.Errorf("error storing result as cursor entity: %w", err)
   693  	}
   694  	return newEmptyResult(), nil
   695  }
   696  
   697  func executeFind(ctx context.Context, operation *operation) (*operationResult, error) {
   698  	result, err := createFindCursor(ctx, operation)
   699  	if err != nil {
   700  		return nil, err
   701  	}
   702  	if result.err != nil {
   703  		return newErrorResult(result.err), nil
   704  	}
   705  
   706  	var docs []bson.Raw
   707  	if err := result.cursor.All(ctx, &docs); err != nil {
   708  		return newErrorResult(err), nil
   709  	}
   710  	return newCursorResult(docs), nil
   711  }
   712  
   713  func executeFindOne(ctx context.Context, operation *operation) (*operationResult, error) {
   714  	coll, err := entities(ctx).collection(operation.Object)
   715  	if err != nil {
   716  		return nil, err
   717  	}
   718  
   719  	var filter bson.Raw
   720  	opts := options.FindOne()
   721  
   722  	elems, _ := operation.Arguments.Elements()
   723  	for _, elem := range elems {
   724  		key := elem.Key()
   725  		val := elem.Value()
   726  
   727  		switch key {
   728  		case "collation":
   729  			collation, err := createCollation(val.Document())
   730  			if err != nil {
   731  				return nil, fmt.Errorf("error creating collation: %w", err)
   732  			}
   733  			opts.SetCollation(collation)
   734  		case "filter":
   735  			filter = val.Document()
   736  		case "hint":
   737  			hint, err := createHint(val)
   738  			if err != nil {
   739  				return nil, fmt.Errorf("error creating hint: %w", err)
   740  			}
   741  			opts.SetHint(hint)
   742  		case "maxTimeMS":
   743  			opts.SetMaxTime(time.Duration(val.Int32()) * time.Millisecond)
   744  		case "projection":
   745  			opts.SetProjection(val.Document())
   746  		case "sort":
   747  			opts.SetSort(val.Document())
   748  		default:
   749  			return nil, fmt.Errorf("unrecognized findOne option %q", key)
   750  		}
   751  	}
   752  	if filter == nil {
   753  		return nil, newMissingArgumentError("filter")
   754  	}
   755  
   756  	res, err := coll.FindOne(ctx, filter, opts).Raw()
   757  	// Ignore ErrNoDocuments errors from Raw. In the event that the cursor
   758  	// returned in a find operation has no associated documents, Raw will
   759  	// return ErrNoDocuments.
   760  	if errors.Is(err, mongo.ErrNoDocuments) {
   761  		err = nil
   762  	}
   763  
   764  	return newDocumentResult(res, err), nil
   765  }
   766  
   767  func executeFindOneAndDelete(ctx context.Context, operation *operation) (*operationResult, error) {
   768  	coll, err := entities(ctx).collection(operation.Object)
   769  	if err != nil {
   770  		return nil, err
   771  	}
   772  
   773  	var filter bson.Raw
   774  	opts := options.FindOneAndDelete()
   775  
   776  	elems, err := operation.Arguments.Elements()
   777  	if err != nil {
   778  		return nil, err
   779  	}
   780  	for _, elem := range elems {
   781  		key := elem.Key()
   782  		val := elem.Value()
   783  
   784  		switch key {
   785  		case "collation":
   786  			collation, err := createCollation(val.Document())
   787  			if err != nil {
   788  				return nil, fmt.Errorf("error creating collation: %w", err)
   789  			}
   790  			opts.SetCollation(collation)
   791  		case "comment":
   792  			opts.SetComment(val)
   793  		case "filter":
   794  			filter = val.Document()
   795  		case "hint":
   796  			hint, err := createHint(val)
   797  			if err != nil {
   798  				return nil, fmt.Errorf("error creating hint: %w", err)
   799  			}
   800  			opts.SetHint(hint)
   801  		case "maxTimeMS":
   802  			opts.SetMaxTime(time.Duration(val.Int32()) * time.Millisecond)
   803  		case "projection":
   804  			opts.SetProjection(val.Document())
   805  		case "sort":
   806  			opts.SetSort(val.Document())
   807  		case "let":
   808  			opts.SetLet(val.Document())
   809  		default:
   810  			return nil, fmt.Errorf("unrecognized findOneAndDelete option %q", key)
   811  		}
   812  	}
   813  	if filter == nil {
   814  		return nil, newMissingArgumentError("filter")
   815  	}
   816  
   817  	res, err := coll.FindOneAndDelete(ctx, filter, opts).Raw()
   818  	// Ignore ErrNoDocuments errors from Raw. In the event that the cursor
   819  	// returned in a find operation has no associated documents, Raw will
   820  	// return ErrNoDocuments.
   821  	if errors.Is(err, mongo.ErrNoDocuments) {
   822  		err = nil
   823  	}
   824  
   825  	return newDocumentResult(res, err), nil
   826  }
   827  
   828  func executeFindOneAndReplace(ctx context.Context, operation *operation) (*operationResult, error) {
   829  	coll, err := entities(ctx).collection(operation.Object)
   830  	if err != nil {
   831  		return nil, err
   832  	}
   833  
   834  	var filter bson.Raw
   835  	var replacement bson.Raw
   836  	opts := options.FindOneAndReplace()
   837  
   838  	elems, err := operation.Arguments.Elements()
   839  	if err != nil {
   840  		return nil, err
   841  	}
   842  	for _, elem := range elems {
   843  		key := elem.Key()
   844  		val := elem.Value()
   845  
   846  		switch key {
   847  		case "bypassDocumentValidation":
   848  			opts.SetBypassDocumentValidation(val.Boolean())
   849  		case "collation":
   850  			collation, err := createCollation(val.Document())
   851  			if err != nil {
   852  				return nil, fmt.Errorf("error creating collation: %w", err)
   853  			}
   854  			opts.SetCollation(collation)
   855  		case "comment":
   856  			opts.SetComment(val)
   857  		case "filter":
   858  			filter = val.Document()
   859  		case "hint":
   860  			hint, err := createHint(val)
   861  			if err != nil {
   862  				return nil, fmt.Errorf("error creating hint: %w", err)
   863  			}
   864  			opts.SetHint(hint)
   865  		case "let":
   866  			opts.SetLet(val.Document())
   867  		case "maxTimeMS":
   868  			opts.SetMaxTime(time.Duration(val.Int32()) * time.Millisecond)
   869  		case "projection":
   870  			opts.SetProjection(val.Document())
   871  		case "replacement":
   872  			replacement = val.Document()
   873  		case "returnDocument":
   874  			switch rd := val.StringValue(); rd {
   875  			case "After":
   876  				opts.SetReturnDocument(options.After)
   877  			case "Before":
   878  				opts.SetReturnDocument(options.Before)
   879  			default:
   880  				return nil, fmt.Errorf("unrecognized returnDocument value %q", rd)
   881  			}
   882  		case "sort":
   883  			opts.SetSort(val.Document())
   884  		case "upsert":
   885  			opts.SetUpsert(val.Boolean())
   886  		default:
   887  			return nil, fmt.Errorf("unrecognized findOneAndReplace option %q", key)
   888  		}
   889  	}
   890  	if filter == nil {
   891  		return nil, newMissingArgumentError("filter")
   892  	}
   893  	if replacement == nil {
   894  		return nil, newMissingArgumentError("replacement")
   895  	}
   896  
   897  	res, err := coll.FindOneAndReplace(ctx, filter, replacement, opts).Raw()
   898  	// Ignore ErrNoDocuments errors from Raw. In the event that the cursor
   899  	// returned in a find operation has no associated documents, Raw will
   900  	// return ErrNoDocuments.
   901  	if errors.Is(err, mongo.ErrNoDocuments) {
   902  		err = nil
   903  	}
   904  
   905  	return newDocumentResult(res, err), nil
   906  }
   907  
   908  func executeFindOneAndUpdate(ctx context.Context, operation *operation) (*operationResult, error) {
   909  	coll, err := entities(ctx).collection(operation.Object)
   910  	if err != nil {
   911  		return nil, err
   912  	}
   913  
   914  	var filter bson.Raw
   915  	var update interface{}
   916  	opts := options.FindOneAndUpdate()
   917  
   918  	elems, err := operation.Arguments.Elements()
   919  	if err != nil {
   920  		return nil, err
   921  	}
   922  	for _, elem := range elems {
   923  		key := elem.Key()
   924  		val := elem.Value()
   925  
   926  		switch key {
   927  		case "arrayFilters":
   928  			opts.SetArrayFilters(options.ArrayFilters{
   929  				Filters: bsonutil.RawToInterfaces(bsonutil.RawToDocuments(val.Array())...),
   930  			})
   931  		case "bypassDocumentValidation":
   932  			opts.SetBypassDocumentValidation(val.Boolean())
   933  		case "collation":
   934  			collation, err := createCollation(val.Document())
   935  			if err != nil {
   936  				return nil, fmt.Errorf("error creating collation: %w", err)
   937  			}
   938  			opts.SetCollation(collation)
   939  		case "comment":
   940  			opts.SetComment(val)
   941  		case "filter":
   942  			filter = val.Document()
   943  		case "hint":
   944  			hint, err := createHint(val)
   945  			if err != nil {
   946  				return nil, fmt.Errorf("error creating hint: %w", err)
   947  			}
   948  			opts.SetHint(hint)
   949  		case "let":
   950  			opts.SetLet(val.Document())
   951  		case "maxTimeMS":
   952  			opts.SetMaxTime(time.Duration(val.Int32()) * time.Millisecond)
   953  		case "projection":
   954  			opts.SetProjection(val.Document())
   955  		case "returnDocument":
   956  			switch rd := val.StringValue(); rd {
   957  			case "After":
   958  				opts.SetReturnDocument(options.After)
   959  			case "Before":
   960  				opts.SetReturnDocument(options.Before)
   961  			default:
   962  				return nil, fmt.Errorf("unrecognized returnDocument value %q", rd)
   963  			}
   964  		case "sort":
   965  			opts.SetSort(val.Document())
   966  		case "update":
   967  			update, err = createUpdateValue(val)
   968  			if err != nil {
   969  				return nil, fmt.Errorf("error processing update value: %q", err)
   970  			}
   971  		case "upsert":
   972  			opts.SetUpsert(val.Boolean())
   973  		default:
   974  			return nil, fmt.Errorf("unrecognized findOneAndUpdate option %q", key)
   975  		}
   976  	}
   977  	if filter == nil {
   978  		return nil, newMissingArgumentError("filter")
   979  	}
   980  	if update == nil {
   981  		return nil, newMissingArgumentError("update")
   982  	}
   983  
   984  	res, err := coll.FindOneAndUpdate(ctx, filter, update, opts).Raw()
   985  	// Ignore ErrNoDocuments errors from Raw. In the event that the cursor
   986  	// returned in a find operation has no associated documents, Raw will
   987  	// return ErrNoDocuments.
   988  	if errors.Is(err, mongo.ErrNoDocuments) {
   989  		err = nil
   990  	}
   991  
   992  	return newDocumentResult(res, err), nil
   993  }
   994  
   995  func executeInsertMany(ctx context.Context, operation *operation) (*operationResult, error) {
   996  	coll, err := entities(ctx).collection(operation.Object)
   997  	if err != nil {
   998  		return nil, err
   999  	}
  1000  
  1001  	var documents []interface{}
  1002  	opts := options.InsertMany()
  1003  
  1004  	elems, err := operation.Arguments.Elements()
  1005  	if err != nil {
  1006  		return nil, err
  1007  	}
  1008  	for _, elem := range elems {
  1009  		key := elem.Key()
  1010  		val := elem.Value()
  1011  
  1012  		switch key {
  1013  		case "comment":
  1014  			opts.SetComment(val)
  1015  		case "documents":
  1016  			documents = bsonutil.RawToInterfaces(bsonutil.RawToDocuments(val.Array())...)
  1017  		case "ordered":
  1018  			opts.SetOrdered(val.Boolean())
  1019  		default:
  1020  			return nil, fmt.Errorf("unrecognized insertMany option %q", key)
  1021  		}
  1022  	}
  1023  	if documents == nil {
  1024  		return nil, newMissingArgumentError("documents")
  1025  	}
  1026  
  1027  	res, err := coll.InsertMany(ctx, documents, opts)
  1028  	raw := emptyCoreDocument
  1029  	if res != nil {
  1030  		// We return InsertedIDs as []interface{} but the CRUD spec documents it as a map[int64]interface{}, so
  1031  		// comparisons will fail if we include it in the result document. This is marked as an optional field and is
  1032  		// always surrounded in an $$unsetOrMatches assertion, so we leave it out of the document.
  1033  		raw = bsoncore.NewDocumentBuilder().
  1034  			AppendInt32("insertedCount", int32(len(res.InsertedIDs))).
  1035  			AppendInt32("deletedCount", 0).
  1036  			AppendInt32("matchedCount", 0).
  1037  			AppendInt32("modifiedCount", 0).
  1038  			AppendInt32("upsertedCount", 0).
  1039  			AppendDocument("upsertedIds", bsoncore.NewDocumentBuilder().Build()).
  1040  			Build()
  1041  	}
  1042  	return newDocumentResult(raw, err), nil
  1043  }
  1044  
  1045  func executeInsertOne(ctx context.Context, operation *operation) (*operationResult, error) {
  1046  	coll, err := entities(ctx).collection(operation.Object)
  1047  	if err != nil {
  1048  		return nil, err
  1049  	}
  1050  
  1051  	var document bson.Raw
  1052  	opts := options.InsertOne()
  1053  
  1054  	elems, err := operation.Arguments.Elements()
  1055  	if err != nil {
  1056  		return nil, err
  1057  	}
  1058  	for _, elem := range elems {
  1059  		key := elem.Key()
  1060  		val := elem.Value()
  1061  
  1062  		switch key {
  1063  		case "document":
  1064  			document = val.Document()
  1065  		case "bypassDocumentValidation":
  1066  			opts.SetBypassDocumentValidation(val.Boolean())
  1067  		case "comment":
  1068  			opts.SetComment(val)
  1069  		default:
  1070  			return nil, fmt.Errorf("unrecognized insertOne option %q", key)
  1071  		}
  1072  	}
  1073  	if document == nil {
  1074  		return nil, newMissingArgumentError("documents")
  1075  	}
  1076  
  1077  	res, err := coll.InsertOne(ctx, document, opts)
  1078  	raw := emptyCoreDocument
  1079  	if res != nil {
  1080  		t, data, err := bson.MarshalValue(res.InsertedID)
  1081  		if err != nil {
  1082  			return nil, fmt.Errorf("error converting InsertedID field to BSON: %w", err)
  1083  		}
  1084  		raw = bsoncore.NewDocumentBuilder().
  1085  			AppendValue("insertedId", bsoncore.Value{Type: t, Data: data}).
  1086  			Build()
  1087  	}
  1088  	return newDocumentResult(raw, err), nil
  1089  }
  1090  
  1091  func executeListIndexes(ctx context.Context, operation *operation) (*operationResult, error) {
  1092  	coll, err := entities(ctx).collection(operation.Object)
  1093  	if err != nil {
  1094  		return nil, err
  1095  	}
  1096  
  1097  	var elems []bson.RawElement
  1098  	// Some listIndexes operations in the unified test format have no arguments.
  1099  	if operation.Arguments != nil {
  1100  		elems, err = operation.Arguments.Elements()
  1101  		if err != nil {
  1102  			return nil, err
  1103  		}
  1104  	}
  1105  	opts := options.ListIndexes()
  1106  	for _, elem := range elems {
  1107  		key := elem.Key()
  1108  		val := elem.Value()
  1109  
  1110  		switch key {
  1111  		case "batchSize":
  1112  			opts.SetBatchSize(val.Int32())
  1113  		default:
  1114  			return nil, fmt.Errorf("unrecognized listIndexes option: %q", key)
  1115  		}
  1116  	}
  1117  
  1118  	cursor, err := coll.Indexes().List(ctx, opts)
  1119  	if err != nil {
  1120  		return newErrorResult(err), nil
  1121  	}
  1122  
  1123  	var docs []bson.Raw
  1124  	if err := cursor.All(ctx, &docs); err != nil {
  1125  		return newErrorResult(err), nil
  1126  	}
  1127  	return newCursorResult(docs), nil
  1128  }
  1129  
  1130  func executeListSearchIndexes(ctx context.Context, operation *operation) (*operationResult, error) {
  1131  	coll, err := entities(ctx).collection(operation.Object)
  1132  	if err != nil {
  1133  		return nil, err
  1134  	}
  1135  
  1136  	searchIdxOpts := options.SearchIndexes()
  1137  	var opts []*options.ListSearchIndexesOptions
  1138  
  1139  	elems, err := operation.Arguments.Elements()
  1140  	if err != nil {
  1141  		return nil, err
  1142  	}
  1143  	for _, elem := range elems {
  1144  		key := elem.Key()
  1145  		val := elem.Value()
  1146  
  1147  		switch key {
  1148  		case "name":
  1149  			searchIdxOpts.SetName(val.StringValue())
  1150  		case "aggregationOptions":
  1151  			var opt options.AggregateOptions
  1152  			err = bson.Unmarshal(val.Document(), &opt)
  1153  			if err != nil {
  1154  				return nil, err
  1155  			}
  1156  			opts = append(opts, &options.ListSearchIndexesOptions{
  1157  				AggregateOpts: &opt,
  1158  			})
  1159  		default:
  1160  			return nil, fmt.Errorf("unrecognized listSearchIndexes option %q", key)
  1161  		}
  1162  	}
  1163  
  1164  	_, err = coll.SearchIndexes().List(ctx, searchIdxOpts, opts...)
  1165  	return newValueResult(bsontype.Null, nil, err), nil
  1166  }
  1167  
  1168  func executeRenameCollection(ctx context.Context, operation *operation) (*operationResult, error) {
  1169  	coll, err := entities(ctx).collection(operation.Object)
  1170  	if err != nil {
  1171  		return nil, err
  1172  	}
  1173  
  1174  	var toName string
  1175  	var dropTarget bool
  1176  	elems, err := operation.Arguments.Elements()
  1177  	if err != nil {
  1178  		return nil, err
  1179  	}
  1180  	for _, elem := range elems {
  1181  		key := elem.Key()
  1182  		val := elem.Value()
  1183  
  1184  		switch key {
  1185  		case "dropTarget":
  1186  			dropTarget = val.Boolean()
  1187  		case "to":
  1188  			toName = val.StringValue()
  1189  		default:
  1190  			return nil, fmt.Errorf("unrecognized rename option %q", key)
  1191  		}
  1192  	}
  1193  	if toName == "" {
  1194  		return nil, newMissingArgumentError("to")
  1195  	}
  1196  
  1197  	renameCmd := bson.D{
  1198  		{"renameCollection", coll.Database().Name() + "." + coll.Name()},
  1199  		{"to", coll.Database().Name() + "." + toName},
  1200  	}
  1201  	if dropTarget {
  1202  		renameCmd = append(renameCmd, bson.E{"dropTarget", dropTarget})
  1203  	}
  1204  	// rename can only be run on the 'admin' database.
  1205  	admin := coll.Database().Client().Database("admin")
  1206  	res, err := admin.RunCommand(context.Background(), renameCmd).Raw()
  1207  	return newDocumentResult(res, err), nil
  1208  }
  1209  
  1210  func executeReplaceOne(ctx context.Context, operation *operation) (*operationResult, error) {
  1211  	coll, err := entities(ctx).collection(operation.Object)
  1212  	if err != nil {
  1213  		return nil, err
  1214  	}
  1215  
  1216  	filter := emptyDocument
  1217  	replacement := emptyDocument
  1218  	opts := options.Replace()
  1219  
  1220  	elems, err := operation.Arguments.Elements()
  1221  	if err != nil {
  1222  		return nil, err
  1223  	}
  1224  	for _, elem := range elems {
  1225  		key := elem.Key()
  1226  		val := elem.Value()
  1227  
  1228  		switch key {
  1229  		case "bypassDocumentValidation":
  1230  			opts.SetBypassDocumentValidation(val.Boolean())
  1231  		case "collation":
  1232  			collation, err := createCollation(val.Document())
  1233  			if err != nil {
  1234  				return nil, fmt.Errorf("error creating collation: %w", err)
  1235  			}
  1236  			opts.SetCollation(collation)
  1237  		case "comment":
  1238  			opts.SetComment(val)
  1239  		case "filter":
  1240  			filter = val.Document()
  1241  		case "hint":
  1242  			hint, err := createHint(val)
  1243  			if err != nil {
  1244  				return nil, fmt.Errorf("error creating hint: %w", err)
  1245  			}
  1246  			opts.SetHint(hint)
  1247  		case "replacement":
  1248  			replacement = val.Document()
  1249  		case "upsert":
  1250  			opts.SetUpsert(val.Boolean())
  1251  		case "let":
  1252  			opts.SetLet(val.Document())
  1253  		default:
  1254  			return nil, fmt.Errorf("unrecognized replaceOne option %q", key)
  1255  		}
  1256  	}
  1257  
  1258  	res, err := coll.ReplaceOne(ctx, filter, replacement, opts)
  1259  	raw, buildErr := buildUpdateResultDocument(res)
  1260  	if buildErr != nil {
  1261  		return nil, buildErr
  1262  	}
  1263  	return newDocumentResult(raw, err), nil
  1264  }
  1265  
  1266  func executeUpdateOne(ctx context.Context, operation *operation) (*operationResult, error) {
  1267  	coll, err := entities(ctx).collection(operation.Object)
  1268  	if err != nil {
  1269  		return nil, err
  1270  	}
  1271  
  1272  	updateArgs, err := createUpdateArguments(operation.Arguments)
  1273  	if err != nil {
  1274  		return nil, err
  1275  	}
  1276  
  1277  	res, err := coll.UpdateOne(ctx, updateArgs.filter, updateArgs.update, updateArgs.opts)
  1278  	raw, buildErr := buildUpdateResultDocument(res)
  1279  	if buildErr != nil {
  1280  		return nil, buildErr
  1281  	}
  1282  	return newDocumentResult(raw, err), nil
  1283  }
  1284  
  1285  func executeUpdateMany(ctx context.Context, operation *operation) (*operationResult, error) {
  1286  	coll, err := entities(ctx).collection(operation.Object)
  1287  	if err != nil {
  1288  		return nil, err
  1289  	}
  1290  
  1291  	updateArgs, err := createUpdateArguments(operation.Arguments)
  1292  	if err != nil {
  1293  		return nil, err
  1294  	}
  1295  
  1296  	res, err := coll.UpdateMany(ctx, updateArgs.filter, updateArgs.update, updateArgs.opts)
  1297  	raw, buildErr := buildUpdateResultDocument(res)
  1298  	if buildErr != nil {
  1299  		return nil, buildErr
  1300  	}
  1301  	return newDocumentResult(raw, err), nil
  1302  }
  1303  
  1304  func executeUpdateSearchIndex(ctx context.Context, operation *operation) (*operationResult, error) {
  1305  	coll, err := entities(ctx).collection(operation.Object)
  1306  	if err != nil {
  1307  		return nil, err
  1308  	}
  1309  
  1310  	var name string
  1311  	var definition interface{}
  1312  
  1313  	elems, err := operation.Arguments.Elements()
  1314  	if err != nil {
  1315  		return nil, err
  1316  	}
  1317  	for _, elem := range elems {
  1318  		key := elem.Key()
  1319  		val := elem.Value()
  1320  
  1321  		switch key {
  1322  		case "name":
  1323  			name = val.StringValue()
  1324  		case "definition":
  1325  			err = bson.Unmarshal(val.Value, &definition)
  1326  			if err != nil {
  1327  				return nil, err
  1328  			}
  1329  		default:
  1330  			return nil, fmt.Errorf("unrecognized updateSearchIndex option %q", key)
  1331  		}
  1332  	}
  1333  
  1334  	err = coll.SearchIndexes().UpdateOne(ctx, name, definition)
  1335  	return newValueResult(bsontype.Null, nil, err), nil
  1336  }
  1337  
  1338  func buildUpdateResultDocument(res *mongo.UpdateResult) (bsoncore.Document, error) {
  1339  	if res == nil {
  1340  		return emptyCoreDocument, nil
  1341  	}
  1342  
  1343  	builder := bsoncore.NewDocumentBuilder().
  1344  		AppendInt64("matchedCount", res.MatchedCount).
  1345  		AppendInt64("modifiedCount", res.ModifiedCount).
  1346  		AppendInt64("upsertedCount", res.UpsertedCount)
  1347  
  1348  	if res.UpsertedID != nil {
  1349  		t, data, err := bson.MarshalValue(res.UpsertedID)
  1350  		if err != nil {
  1351  			return nil, fmt.Errorf("error converting UpsertedID to BSON: %w", err)
  1352  		}
  1353  		builder.AppendValue("upsertedId", bsoncore.Value{Type: t, Data: data})
  1354  	}
  1355  	return builder.Build(), nil
  1356  }
  1357  
  1358  type cursorResult struct {
  1359  	cursor *mongo.Cursor
  1360  	err    error
  1361  }
  1362  
  1363  func createFindCursor(ctx context.Context, operation *operation) (*cursorResult, error) {
  1364  	coll, err := entities(ctx).collection(operation.Object)
  1365  	// Find operations can also be run against GridFS buckets. Check for a bucket entity of the
  1366  	// same name and run createBucketFindCursor if an entity is found.
  1367  	if err != nil {
  1368  		if _, bucketOk := entities(ctx).gridFSBucket(operation.Object); bucketOk == nil {
  1369  			return createBucketFindCursor(ctx, operation)
  1370  		}
  1371  		return nil, err
  1372  	}
  1373  
  1374  	var filter bson.Raw
  1375  	opts := options.Find()
  1376  
  1377  	elems, err := operation.Arguments.Elements()
  1378  	if err != nil {
  1379  		return nil, err
  1380  	}
  1381  	for _, elem := range elems {
  1382  		key := elem.Key()
  1383  		val := elem.Value()
  1384  
  1385  		switch key {
  1386  		case "allowDiskUse":
  1387  			opts.SetAllowDiskUse(val.Boolean())
  1388  		case "allowPartialResults":
  1389  			opts.SetAllowPartialResults(val.Boolean())
  1390  		case "batchSize":
  1391  			opts.SetBatchSize(val.Int32())
  1392  		case "collation":
  1393  			collation, err := createCollation(val.Document())
  1394  			if err != nil {
  1395  				return nil, fmt.Errorf("error creating collation: %w", err)
  1396  			}
  1397  			opts.SetCollation(collation)
  1398  		case "comment":
  1399  			// TODO(GODRIVER-2386): when document support for comments is added, we can replace this switch condition
  1400  			// TODO with `opts.SetComment(val)`
  1401  			commentString, err := createCommentString(val)
  1402  			if err != nil {
  1403  				return nil, fmt.Errorf("error creating comment: %w", err)
  1404  			}
  1405  			opts.SetComment(commentString)
  1406  		case "filter":
  1407  			filter = val.Document()
  1408  		case "hint":
  1409  			hint, err := createHint(val)
  1410  			if err != nil {
  1411  				return nil, fmt.Errorf("error creating hint: %w", err)
  1412  			}
  1413  			opts.SetHint(hint)
  1414  		case "let":
  1415  			opts.SetLet(val.Document())
  1416  		case "limit":
  1417  			opts.SetLimit(int64(val.Int32()))
  1418  		case "max":
  1419  			opts.SetMax(val.Document())
  1420  		case "maxTimeMS":
  1421  			opts.SetMaxTime(time.Duration(val.Int32()) * time.Millisecond)
  1422  		case "min":
  1423  			opts.SetMin(val.Document())
  1424  		case "noCursorTimeout":
  1425  			opts.SetNoCursorTimeout(val.Boolean())
  1426  		case "oplogReplay":
  1427  			opts.SetOplogReplay(val.Boolean())
  1428  		case "projection":
  1429  			opts.SetProjection(val.Document())
  1430  		case "returnKey":
  1431  			opts.SetReturnKey(val.Boolean())
  1432  		case "showRecordId":
  1433  			opts.SetShowRecordID(val.Boolean())
  1434  		case "skip":
  1435  			opts.SetSkip(int64(val.Int32()))
  1436  		case "snapshot":
  1437  			opts.SetSnapshot(val.Boolean())
  1438  		case "sort":
  1439  			opts.SetSort(val.Document())
  1440  		default:
  1441  			return nil, fmt.Errorf("unrecognized find option %q", key)
  1442  		}
  1443  	}
  1444  	if filter == nil {
  1445  		return nil, newMissingArgumentError("filter")
  1446  	}
  1447  
  1448  	cursor, err := coll.Find(ctx, filter, opts)
  1449  	res := &cursorResult{
  1450  		cursor: cursor,
  1451  		err:    err,
  1452  	}
  1453  	return res, nil
  1454  }
  1455  

View as plain text