...

Source file src/go.mongodb.org/mongo-driver/mongo/integration/unified/database_operation_execution.go

Documentation: go.mongodb.org/mongo-driver/mongo/integration/unified

     1  // Copyright (C) MongoDB, Inc. 2017-present.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License"); you may
     4  // not use this file except in compliance with the License. You may obtain
     5  // a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
     6  
     7  package unified
     8  
     9  import (
    10  	"context"
    11  	"fmt"
    12  	"time"
    13  
    14  	"go.mongodb.org/mongo-driver/bson"
    15  	"go.mongodb.org/mongo-driver/bson/bsontype"
    16  	"go.mongodb.org/mongo-driver/internal/bsonutil"
    17  	"go.mongodb.org/mongo-driver/mongo/options"
    18  )
    19  
    20  // This file contains helpers to execute database operations.
    21  
    22  func executeCreateView(ctx context.Context, operation *operation) (*operationResult, error) {
    23  	db, err := entities(ctx).database(operation.Object)
    24  	if err != nil {
    25  		return nil, err
    26  	}
    27  
    28  	var collName string
    29  	var cvo options.CreateViewOptions
    30  	var viewOn string
    31  	pipeline := make([]interface{}, 0)
    32  
    33  	elems, err := operation.Arguments.Elements()
    34  	if err != nil {
    35  		return nil, err
    36  	}
    37  
    38  	for _, elem := range elems {
    39  		key := elem.Key()
    40  		val := elem.Value()
    41  
    42  		switch key {
    43  		case "collection":
    44  			collName = val.StringValue()
    45  		case "pipeline":
    46  			pipeline = bsonutil.RawToInterfaces(bsonutil.RawToDocuments(val.Array())...)
    47  		case "viewOn":
    48  			viewOn = val.StringValue()
    49  		default:
    50  			return nil, fmt.Errorf("unrecognized createView option %q", key)
    51  		}
    52  	}
    53  	if collName == "" {
    54  		return nil, newMissingArgumentError("collection")
    55  	}
    56  	if viewOn == "" {
    57  		return nil, newMissingArgumentError("viewOn")
    58  	}
    59  
    60  	err = db.CreateView(ctx, collName, viewOn, pipeline, &cvo)
    61  	return newErrorResult(err), nil
    62  }
    63  
    64  func executeCreateCollection(ctx context.Context, operation *operation) (*operationResult, error) {
    65  	// In the Go driver there is a separate method for creating views.  However, the unified test CRUD format does not
    66  	// make this distinction.  If necessary, here we branch to create a view.
    67  	createView, err := operation.isCreateView()
    68  	if err != nil {
    69  		return nil, err
    70  	}
    71  	if createView {
    72  		return executeCreateView(ctx, operation)
    73  	}
    74  
    75  	db, err := entities(ctx).database(operation.Object)
    76  	if err != nil {
    77  		return nil, err
    78  	}
    79  
    80  	var collName string
    81  	var cco options.CreateCollectionOptions
    82  	elems, _ := operation.Arguments.Elements()
    83  	for _, elem := range elems {
    84  		key := elem.Key()
    85  		val := elem.Value()
    86  
    87  		switch key {
    88  		case "collection":
    89  			collName = val.StringValue()
    90  		case "changeStreamPreAndPostImages":
    91  			cco.SetChangeStreamPreAndPostImages(val.Document())
    92  		case "expireAfterSeconds":
    93  			cco.SetExpireAfterSeconds(int64(val.Int32()))
    94  		case "capped":
    95  			cco.SetCapped(val.Boolean())
    96  		case "size":
    97  			cco.SetSizeInBytes(val.AsInt64())
    98  		case "max":
    99  			cco.SetMaxDocuments(val.AsInt64())
   100  		case "timeseries":
   101  			tsElems, err := elem.Value().Document().Elements()
   102  			if err != nil {
   103  				return nil, err
   104  			}
   105  
   106  			tso := options.TimeSeries()
   107  			for _, elem := range tsElems {
   108  				key := elem.Key()
   109  				val := elem.Value()
   110  
   111  				switch key {
   112  				case "timeField":
   113  					tso.SetTimeField(val.StringValue())
   114  				case "metaField":
   115  					tso.SetMetaField(val.StringValue())
   116  				case "granularity":
   117  					tso.SetGranularity(val.StringValue())
   118  				case "bucketMaxSpanSeconds":
   119  					tso.SetBucketMaxSpan(time.Duration(val.Int32()) * time.Second)
   120  				case "bucketRoundingSeconds":
   121  					tso.SetBucketRounding(time.Duration(val.Int32()) * time.Second)
   122  				default:
   123  					return nil, fmt.Errorf("unrecognized timeseries option %q", key)
   124  				}
   125  			}
   126  			cco.SetTimeSeriesOptions(tso)
   127  		case "clusteredIndex":
   128  			cco.SetClusteredIndex(val.Document())
   129  		default:
   130  			return nil, fmt.Errorf("unrecognized createCollection option %q", key)
   131  		}
   132  	}
   133  	if collName == "" {
   134  		return nil, newMissingArgumentError("collection")
   135  	}
   136  
   137  	err = db.CreateCollection(ctx, collName, &cco)
   138  	if err != nil {
   139  		return newErrorResult(err), nil
   140  	}
   141  
   142  	if collID := operation.ResultEntityID; collID != nil {
   143  		collEntityOpts := newCollectionEntityOptions(*collID, operation.Object, collName, nil)
   144  
   145  		err := entities(ctx).addCollectionEntity(collEntityOpts)
   146  		if err != nil {
   147  			return nil, fmt.Errorf("failed to save collection as entity: %w", err)
   148  		}
   149  	}
   150  
   151  	return newEmptyResult(), nil
   152  }
   153  
   154  func executeDropCollection(ctx context.Context, operation *operation) (*operationResult, error) {
   155  	db, err := entities(ctx).database(operation.Object)
   156  	if err != nil {
   157  		return nil, err
   158  	}
   159  
   160  	var collName string
   161  	elems, _ := operation.Arguments.Elements()
   162  	for _, elem := range elems {
   163  		key := elem.Key()
   164  		val := elem.Value()
   165  
   166  		switch key {
   167  		case "collection":
   168  			collName = val.StringValue()
   169  		default:
   170  			return nil, fmt.Errorf("unrecognized dropCollection option %q", key)
   171  		}
   172  	}
   173  	if collName == "" {
   174  		return nil, newMissingArgumentError("collection")
   175  	}
   176  
   177  	err = db.Collection(collName).Drop(ctx)
   178  	return newErrorResult(err), nil
   179  }
   180  
   181  func executeListCollections(ctx context.Context, operation *operation) (*operationResult, error) {
   182  	db, err := entities(ctx).database(operation.Object)
   183  	if err != nil {
   184  		return nil, err
   185  	}
   186  
   187  	listCollArgs, err := createListCollectionsArguments(operation.Arguments)
   188  	if err != nil {
   189  		return nil, err
   190  	}
   191  
   192  	cursor, err := db.ListCollections(ctx, listCollArgs.filter, listCollArgs.opts)
   193  	if err != nil {
   194  		return newErrorResult(err), nil
   195  	}
   196  	defer cursor.Close(ctx)
   197  
   198  	var docs []bson.Raw
   199  	if err := cursor.All(ctx, &docs); err != nil {
   200  		return newErrorResult(err), nil
   201  	}
   202  	return newCursorResult(docs), nil
   203  }
   204  
   205  func executeListCollectionNames(ctx context.Context, operation *operation) (*operationResult, error) {
   206  	db, err := entities(ctx).database(operation.Object)
   207  	if err != nil {
   208  		return nil, err
   209  	}
   210  
   211  	listCollArgs, err := createListCollectionsArguments(operation.Arguments)
   212  	if err != nil {
   213  		return nil, err
   214  	}
   215  
   216  	names, err := db.ListCollectionNames(ctx, listCollArgs.filter, listCollArgs.opts)
   217  	if err != nil {
   218  		return newErrorResult(err), nil
   219  	}
   220  	_, data, err := bson.MarshalValue(names)
   221  	if err != nil {
   222  		return nil, fmt.Errorf("error converting collection names slice to BSON: %w", err)
   223  	}
   224  	return newValueResult(bsontype.Array, data, nil), nil
   225  }
   226  
   227  func executeRunCommand(ctx context.Context, operation *operation) (*operationResult, error) {
   228  	db, err := entities(ctx).database(operation.Object)
   229  	if err != nil {
   230  		return nil, err
   231  	}
   232  
   233  	var command bson.Raw
   234  	opts := options.RunCmd()
   235  
   236  	elems, _ := operation.Arguments.Elements()
   237  	for _, elem := range elems {
   238  		key := elem.Key()
   239  		val := elem.Value()
   240  
   241  		switch key {
   242  		case "command":
   243  			command = val.Document()
   244  		case "commandName":
   245  			// This is only necessary for languages that cannot preserve key order in the command document, so we can
   246  			// ignore it.
   247  		case "readConcern":
   248  			// GODRIVER-1774: We currently don't support overriding read concern for RunCommand.
   249  			return nil, fmt.Errorf("readConcern in runCommand not supported")
   250  		case "readPreference":
   251  			var temp ReadPreference
   252  			if err := bson.Unmarshal(val.Document(), &temp); err != nil {
   253  				return nil, fmt.Errorf("error unmarshalling readPreference option: %w", err)
   254  			}
   255  
   256  			rp, err := temp.ToReadPrefOption()
   257  			if err != nil {
   258  				return nil, fmt.Errorf("error creating readpref.ReadPref object: %w", err)
   259  			}
   260  			opts.SetReadPreference(rp)
   261  		case "writeConcern":
   262  			// GODRIVER-1774: We currently don't support overriding write concern for RunCommand.
   263  			return nil, fmt.Errorf("writeConcern in runCommand not supported")
   264  		default:
   265  			return nil, fmt.Errorf("unrecognized runCommand option %q", key)
   266  		}
   267  	}
   268  	if command == nil {
   269  		return nil, newMissingArgumentError("command")
   270  	}
   271  
   272  	res, err := db.RunCommand(ctx, command, opts).Raw()
   273  	return newDocumentResult(res, err), nil
   274  }
   275  
   276  // executeRunCursorCommand proxies the database's runCursorCommand method and
   277  // supports the same arguments and options.
   278  func executeRunCursorCommand(ctx context.Context, operation *operation) (*operationResult, error) {
   279  	db, err := entities(ctx).database(operation.Object)
   280  	if err != nil {
   281  		return nil, err
   282  	}
   283  
   284  	var (
   285  		batchSize int32
   286  		command   bson.Raw
   287  		comment   bson.Raw
   288  		maxTime   time.Duration
   289  	)
   290  
   291  	opts := options.RunCmd()
   292  
   293  	elems, _ := operation.Arguments.Elements()
   294  	for _, elem := range elems {
   295  		key := elem.Key()
   296  		val := elem.Value()
   297  
   298  		switch key {
   299  		case "batchSize":
   300  			batchSize = val.Int32()
   301  		case "command":
   302  			command = val.Document()
   303  		case "commandName":
   304  			// This is only necessary for languages that cannot
   305  			// preserve key order in the command document, so we can
   306  			// ignore it.
   307  		case "comment":
   308  			comment = val.Document()
   309  		case "maxTimeMS":
   310  			maxTime = time.Duration(val.AsInt64()) * time.Millisecond
   311  		case "cursorTimeout":
   312  			return nil, newSkipTestError("cursorTimeout not supported")
   313  		case "timeoutMode":
   314  			return nil, newSkipTestError("timeoutMode not supported")
   315  		default:
   316  			return nil, fmt.Errorf("unrecognized runCursorCommand option: %q", key)
   317  		}
   318  	}
   319  
   320  	if command == nil {
   321  		return nil, newMissingArgumentError("command")
   322  	}
   323  
   324  	cursor, err := db.RunCommandCursor(ctx, command, opts)
   325  	if err != nil {
   326  		return newErrorResult(err), nil
   327  	}
   328  
   329  	if batchSize > 0 {
   330  		cursor.SetBatchSize(batchSize)
   331  	}
   332  
   333  	if maxTime > 0 {
   334  		cursor.SetMaxTime(maxTime)
   335  	}
   336  
   337  	if len(comment) > 0 {
   338  		cursor.SetComment(comment)
   339  	}
   340  
   341  	// When executing the provided command, the test runner MUST fully
   342  	// iterate the cursor. This will ensure consistent behavior between
   343  	// drivers that eagerly create a server-side cursor and those that do
   344  	// so lazily when iteration begins.
   345  	var docs []bson.Raw
   346  	if err := cursor.All(ctx, &docs); err != nil {
   347  		return newErrorResult(err), nil
   348  	}
   349  
   350  	return newCursorResult(docs), nil
   351  }
   352  
   353  // executeCreateRunCursorCommand proxies the database's runCursorCommand method
   354  // and supports the same arguments and options.
   355  func executeCreateRunCursorCommand(ctx context.Context, operation *operation) (*operationResult, error) {
   356  	db, err := entities(ctx).database(operation.Object)
   357  	if err != nil {
   358  		return nil, err
   359  	}
   360  
   361  	var (
   362  		batchSize int32
   363  		command   bson.Raw
   364  	)
   365  
   366  	opts := options.RunCmd()
   367  
   368  	elems, _ := operation.Arguments.Elements()
   369  	for _, elem := range elems {
   370  		key := elem.Key()
   371  		val := elem.Value()
   372  
   373  		switch key {
   374  		case "batchSize":
   375  			batchSize = val.Int32()
   376  		case "command":
   377  			command = val.Document()
   378  		case "commandName":
   379  			// This is only necessary for languages that cannot
   380  			// preserve key order in the command document, so we can
   381  			// ignore it.
   382  		case "cursorType":
   383  			return nil, newSkipTestError("cursorType not supported")
   384  		case "timeoutMode":
   385  			return nil, newSkipTestError("timeoutMode not supported")
   386  		default:
   387  			return nil, fmt.Errorf("unrecognized createRunCursorCommand option: %q", key)
   388  		}
   389  	}
   390  
   391  	if command == nil {
   392  		return nil, newMissingArgumentError("command")
   393  	}
   394  
   395  	// Test runners MUST ensure that the server-side cursor is created (i.e.
   396  	// the command document has executed) as part of this operation.
   397  	cursor, err := db.RunCommandCursor(ctx, command, opts)
   398  	if err != nil {
   399  		return newErrorResult(err), nil
   400  	}
   401  
   402  	if batchSize > 0 {
   403  		cursor.SetBatchSize(batchSize)
   404  	}
   405  
   406  	if cursorID := operation.ResultEntityID; cursorID != nil {
   407  		err := entities(ctx).addCursorEntity(*cursorID, cursor)
   408  		if err != nil {
   409  			return nil, fmt.Errorf("failed to store result as cursor entity: %w", err)
   410  		}
   411  	}
   412  
   413  	return newEmptyResult(), nil
   414  }
   415  

View as plain text