...

Source file src/go.mongodb.org/mongo-driver/mongo/integration/unified/session_operation_execution.go

Documentation: go.mongodb.org/mongo-driver/mongo/integration/unified

     1  // Copyright (C) MongoDB, Inc. 2017-present.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License"); you may
     4  // not use this file except in compliance with the License. You may obtain
     5  // a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
     6  
     7  package unified
     8  
     9  import (
    10  	"context"
    11  	"fmt"
    12  
    13  	"go.mongodb.org/mongo-driver/bson"
    14  	"go.mongodb.org/mongo-driver/mongo"
    15  	"go.mongodb.org/mongo-driver/mongo/options"
    16  )
    17  
    18  func executeAbortTransaction(ctx context.Context, operation *operation) (*operationResult, error) {
    19  	sess, err := entities(ctx).session(operation.Object)
    20  	if err != nil {
    21  		return nil, err
    22  	}
    23  
    24  	// AbortTransaction takes no options, so the arguments doc must be nil or empty.
    25  	elems, _ := operation.Arguments.Elements()
    26  	if len(elems) > 0 {
    27  		return nil, fmt.Errorf("unrecognized abortTransaction options %v", operation.Arguments)
    28  	}
    29  
    30  	return newErrorResult(sess.AbortTransaction(ctx)), nil
    31  }
    32  
    33  func executeEndSession(ctx context.Context, operation *operation) error {
    34  	sess, err := entities(ctx).session(operation.Object)
    35  	if err != nil {
    36  		return err
    37  	}
    38  
    39  	// EnsSession takes no options, so the arguments doc must be nil or empty.
    40  	elems, _ := operation.Arguments.Elements()
    41  	if len(elems) > 0 {
    42  		return fmt.Errorf("unrecognized endSession options %v", operation.Arguments)
    43  	}
    44  
    45  	sess.EndSession(ctx)
    46  	return nil
    47  }
    48  
    49  func executeCommitTransaction(ctx context.Context, operation *operation) (*operationResult, error) {
    50  	sess, err := entities(ctx).session(operation.Object)
    51  	if err != nil {
    52  		return nil, err
    53  	}
    54  
    55  	// CommitTransaction takes no options, so the arguments doc must be nil or empty.
    56  	elems, _ := operation.Arguments.Elements()
    57  	if len(elems) > 0 {
    58  		return nil, fmt.Errorf("unrecognized commitTransaction options %v", operation.Arguments)
    59  	}
    60  
    61  	return newErrorResult(sess.CommitTransaction(ctx)), nil
    62  }
    63  
    64  func executeStartTransaction(ctx context.Context, operation *operation) (*operationResult, error) {
    65  	sess, err := entities(ctx).session(operation.Object)
    66  	if err != nil {
    67  		return nil, err
    68  	}
    69  
    70  	opts := options.Transaction()
    71  	if operation.Arguments != nil {
    72  		var temp transactionOptions
    73  		if err := bson.Unmarshal(operation.Arguments, &temp); err != nil {
    74  			return nil, fmt.Errorf("error unmarshalling arguments to transactionOptions: %v", err)
    75  		}
    76  
    77  		opts = temp.TransactionOptions
    78  	}
    79  
    80  	return newErrorResult(sess.StartTransaction(opts)), nil
    81  }
    82  
    83  func executeWithTransaction(ctx context.Context, op *operation, loopDone <-chan struct{}) error {
    84  	sess, err := entities(ctx).session(op.Object)
    85  	if err != nil {
    86  		return err
    87  	}
    88  
    89  	// Process the "callback" argument. This is an array of operation objects, each of which should be executed inside
    90  	// the transaction.
    91  	callback, err := op.Arguments.LookupErr("callback")
    92  	if err != nil {
    93  		return newMissingArgumentError("callback")
    94  	}
    95  	var operations []*operation
    96  	if err := callback.Unmarshal(&operations); err != nil {
    97  		return fmt.Errorf("error transforming callback option to slice of operations: %v", err)
    98  	}
    99  
   100  	// Remove the "callback" field and process the other options.
   101  	var temp transactionOptions
   102  	if err := bson.Unmarshal(removeFieldsFromDocument(op.Arguments, "callback"), &temp); err != nil {
   103  		return fmt.Errorf("error unmarshalling arguments to transactionOptions: %v", err)
   104  	}
   105  
   106  	_, err = sess.WithTransaction(ctx, func(ctx mongo.SessionContext) (interface{}, error) {
   107  		for idx, oper := range operations {
   108  			if err := oper.execute(ctx, loopDone); err != nil {
   109  				return nil, fmt.Errorf("error executing operation %q at index %d: %v", oper.Name, idx, err)
   110  			}
   111  		}
   112  		return nil, nil
   113  	}, temp.TransactionOptions)
   114  	return err
   115  }
   116  

View as plain text