...

Source file src/go.mongodb.org/mongo-driver/mongo/integration/unified/operation.go

Documentation: go.mongodb.org/mongo-driver/mongo/integration/unified

     1  // Copyright (C) MongoDB, Inc. 2017-present.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License"); you may
     4  // not use this file except in compliance with the License. You may obtain
     5  // a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
     6  
     7  package unified
     8  
     9  import (
    10  	"context"
    11  	"fmt"
    12  	"time"
    13  
    14  	"go.mongodb.org/mongo-driver/bson"
    15  	"go.mongodb.org/mongo-driver/internal/csot"
    16  	"go.mongodb.org/mongo-driver/mongo"
    17  )
    18  
    19  type operation struct {
    20  	Name                 string         `bson:"name"`
    21  	Object               string         `bson:"object"`
    22  	Arguments            bson.Raw       `bson:"arguments"`
    23  	IgnoreResultAndError bool           `bson:"ignoreResultAndError"`
    24  	ExpectedError        *expectedError `bson:"expectError"`
    25  	ExpectedResult       *bson.RawValue `bson:"expectResult"`
    26  	ResultEntityID       *string        `bson:"saveResultAsEntity"`
    27  }
    28  
    29  // execute runs the operation and verifies the returned result and/or error. If the result needs to be saved as
    30  // an entity, it also updates the entityMap associated with ctx to do so.
    31  func (op *operation) execute(ctx context.Context, loopDone <-chan struct{}) error {
    32  	res, err := op.run(ctx, loopDone)
    33  	if err != nil {
    34  		return fmt.Errorf("execution failed: %v", err)
    35  	}
    36  
    37  	if op.IgnoreResultAndError {
    38  		return nil
    39  	}
    40  
    41  	if err := verifyOperationError(ctx, op.ExpectedError, res); err != nil {
    42  		return fmt.Errorf("error verification failed: %v", err)
    43  	}
    44  
    45  	if op.ExpectedResult != nil {
    46  		if err := verifyOperationResult(ctx, *op.ExpectedResult, res); err != nil {
    47  			return fmt.Errorf("result verification failed: %v", err)
    48  		}
    49  	}
    50  	return nil
    51  }
    52  
    53  // isCreateView will return true if the operation is to create a collection with a view.
    54  func (op *operation) isCreateView() (bool, error) {
    55  	if op.Name != "createCollection" {
    56  		return false, nil
    57  	}
    58  
    59  	elements, err := op.Arguments.Elements()
    60  	if err != nil {
    61  		return false, err
    62  	}
    63  
    64  	var has bool
    65  	for _, elem := range elements {
    66  		if elem.Key() == "viewOn" {
    67  			has = true
    68  			break
    69  		}
    70  	}
    71  	return has, nil
    72  }
    73  
    74  func (op *operation) run(ctx context.Context, loopDone <-chan struct{}) (*operationResult, error) {
    75  	if op.Object == "testRunner" {
    76  		// testRunner operations don't have results or expected errors, so we use newEmptyResult to fake a result.
    77  		return newEmptyResult(), executeTestRunnerOperation(ctx, op, loopDone)
    78  	}
    79  
    80  	// Special handling for the "session" field because it applies to all operations.
    81  	if id, ok := op.Arguments.Lookup("session").StringValueOK(); ok {
    82  		sess, err := entities(ctx).session(id)
    83  		if err != nil {
    84  			return nil, err
    85  		}
    86  		ctx = mongo.NewSessionContext(ctx, sess)
    87  
    88  		// Set op.Arguments to a new document that has the "session" field removed so individual operations do
    89  		// not have to account for it.
    90  		op.Arguments = removeFieldsFromDocument(op.Arguments, "session")
    91  	}
    92  
    93  	// Special handling for the "timeoutMS" field because it applies to (almost) all operations.
    94  	if tms, ok := op.Arguments.Lookup("timeoutMS").Int32OK(); ok {
    95  		timeout := time.Duration(tms) * time.Millisecond
    96  		newCtx, cancelFunc := csot.MakeTimeoutContext(ctx, timeout)
    97  		// Redefine ctx to be the new timeout-derived context.
    98  		ctx = newCtx
    99  		// Cancel the timeout-derived context at the end of run to avoid a context leak.
   100  		defer cancelFunc()
   101  
   102  		// Set op.Arguments to a new document that has the "timeoutMS" field removed
   103  		// so individual operations do not have to account for it.
   104  		op.Arguments = removeFieldsFromDocument(op.Arguments, "timeoutMS")
   105  	}
   106  
   107  	switch op.Name {
   108  	// Session operations
   109  	case "abortTransaction":
   110  		return executeAbortTransaction(ctx, op)
   111  	case "commitTransaction":
   112  		return executeCommitTransaction(ctx, op)
   113  	case "endSession":
   114  		// The EndSession() method doesn't return a result, so we return a non-nil empty result.
   115  		return newEmptyResult(), executeEndSession(ctx, op)
   116  	case "startTransaction":
   117  		return executeStartTransaction(ctx, op)
   118  	case "withTransaction":
   119  		// executeWithTransaction internally verifies results/errors for each operation, so it doesn't return a result.
   120  		return newEmptyResult(), executeWithTransaction(ctx, op, loopDone)
   121  
   122  	// Client operations
   123  	case "createChangeStream":
   124  		return executeCreateChangeStream(ctx, op)
   125  	case "listDatabases":
   126  		return executeListDatabases(ctx, op, false)
   127  	case "listDatabaseNames":
   128  		return executeListDatabases(ctx, op, true)
   129  
   130  	// Database operations
   131  	case "createCollection":
   132  		return executeCreateCollection(ctx, op)
   133  	case "dropCollection":
   134  		return executeDropCollection(ctx, op)
   135  	case "listCollections":
   136  		return executeListCollections(ctx, op)
   137  	case "listCollectionNames":
   138  		return executeListCollectionNames(ctx, op)
   139  	case "runCommand":
   140  		return executeRunCommand(ctx, op)
   141  	case "runCursorCommand":
   142  		return executeRunCursorCommand(ctx, op)
   143  	case "createCommandCursor":
   144  		return executeCreateRunCursorCommand(ctx, op)
   145  
   146  	// Collection operations
   147  	case "aggregate":
   148  		return executeAggregate(ctx, op)
   149  	case "bulkWrite":
   150  		return executeBulkWrite(ctx, op)
   151  	case "countDocuments":
   152  		return executeCountDocuments(ctx, op)
   153  	case "createFindCursor":
   154  		return executeCreateFindCursor(ctx, op)
   155  	case "createIndex":
   156  		return executeCreateIndex(ctx, op)
   157  	case "createSearchIndex":
   158  		return executeCreateSearchIndex(ctx, op)
   159  	case "createSearchIndexes":
   160  		return executeCreateSearchIndexes(ctx, op)
   161  	case "deleteOne":
   162  		return executeDeleteOne(ctx, op)
   163  	case "deleteMany":
   164  		return executeDeleteMany(ctx, op)
   165  	case "distinct":
   166  		return executeDistinct(ctx, op)
   167  	case "dropIndex":
   168  		return executeDropIndex(ctx, op)
   169  	case "dropIndexes":
   170  		return executeDropIndexes(ctx, op)
   171  	case "dropSearchIndex":
   172  		return executeDropSearchIndex(ctx, op)
   173  	case "estimatedDocumentCount":
   174  		return executeEstimatedDocumentCount(ctx, op)
   175  	case "find":
   176  		return executeFind(ctx, op) // Can also be a GridFS operation
   177  	case "findOne":
   178  		return executeFindOne(ctx, op)
   179  	case "findOneAndDelete":
   180  		return executeFindOneAndDelete(ctx, op)
   181  	case "findOneAndReplace":
   182  		return executeFindOneAndReplace(ctx, op)
   183  	case "findOneAndUpdate":
   184  		return executeFindOneAndUpdate(ctx, op)
   185  	case "insertMany":
   186  		return executeInsertMany(ctx, op)
   187  	case "insertOne":
   188  		return executeInsertOne(ctx, op)
   189  	case "listIndexes":
   190  		return executeListIndexes(ctx, op)
   191  	case "listSearchIndexes":
   192  		return executeListSearchIndexes(ctx, op)
   193  	case "rename":
   194  		// "rename" can either target a collection or a GridFS bucket.
   195  		if _, err := entities(ctx).collection(op.Object); err == nil {
   196  			return executeRenameCollection(ctx, op)
   197  		}
   198  		if _, err := entities(ctx).gridFSBucket(op.Object); err == nil {
   199  			return executeBucketRename(ctx, op)
   200  		}
   201  		return nil, fmt.Errorf("failed to find a collection or GridFS bucket named %q", op.Object)
   202  	case "replaceOne":
   203  		return executeReplaceOne(ctx, op)
   204  	case "updateOne":
   205  		return executeUpdateOne(ctx, op)
   206  	case "updateMany":
   207  		return executeUpdateMany(ctx, op)
   208  	case "updateSearchIndex":
   209  		return executeUpdateSearchIndex(ctx, op)
   210  
   211  	// GridFS operations
   212  	case "delete":
   213  		return executeBucketDelete(ctx, op)
   214  	case "downloadByName":
   215  		return executeBucketDownloadByName(ctx, op)
   216  	case "download":
   217  		return executeBucketDownload(ctx, op)
   218  	case "drop":
   219  		return executeBucketDrop(ctx, op)
   220  	case "upload":
   221  		return executeBucketUpload(ctx, op)
   222  
   223  	// Cursor operations
   224  	case "close":
   225  		if cursor, err := entities(ctx).cursor(op.Object); err == nil {
   226  			_ = cursor.Close(ctx)
   227  
   228  			return newEmptyResult(), nil
   229  		}
   230  
   231  		if clientEntity, err := entities(ctx).client(op.Object); err == nil {
   232  			_ = clientEntity.disconnect(context.Background())
   233  
   234  			return newEmptyResult(), nil
   235  		}
   236  
   237  		return nil, fmt.Errorf("failed to find a cursor or client named %q", op.Object)
   238  	case "iterateOnce":
   239  		return executeIterateOnce(ctx, op)
   240  	case "iterateUntilDocumentOrError":
   241  		return executeIterateUntilDocumentOrError(ctx, op)
   242  
   243  	// CSFLE operations
   244  	case "createDataKey":
   245  		return executeCreateDataKey(ctx, op)
   246  	case "rewrapManyDataKey":
   247  		return executeRewrapManyDataKey(ctx, op)
   248  	case "removeKeyAltName":
   249  		return executeRemoveKeyAltName(ctx, op)
   250  	case "getKeys":
   251  		return executeGetKeys(ctx, op)
   252  	case "getKeyByAltName":
   253  		return executeGetKeyByAltName(ctx, op)
   254  	case "getKey":
   255  		return executeGetKey(ctx, op)
   256  	case "deleteKey":
   257  		return executeDeleteKey(ctx, op)
   258  	case "addKeyAltName":
   259  		return executeAddKeyAltName(ctx, op)
   260  
   261  	// Unsupported operations
   262  	case "count", "listIndexNames", "modifyCollection":
   263  		return nil, newSkipTestError(fmt.Sprintf("the %q operation is not supported", op.Name))
   264  	default:
   265  		return nil, fmt.Errorf("unrecognized entity operation %q", op.Name)
   266  	}
   267  }
   268  

View as plain text