...

Source file src/go.mongodb.org/mongo-driver/x/mongo/driver/operation/commit_transaction.go

Documentation: go.mongodb.org/mongo-driver/x/mongo/driver/operation

     1  // Copyright (C) MongoDB, Inc. 2019-present.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License"); you may
     4  // not use this file except in compliance with the License. You may obtain
     5  // a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
     6  
     7  package operation
     8  
     9  import (
    10  	"context"
    11  	"errors"
    12  	"time"
    13  
    14  	"go.mongodb.org/mongo-driver/event"
    15  	"go.mongodb.org/mongo-driver/internal/driverutil"
    16  	"go.mongodb.org/mongo-driver/mongo/description"
    17  	"go.mongodb.org/mongo-driver/mongo/writeconcern"
    18  	"go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
    19  	"go.mongodb.org/mongo-driver/x/mongo/driver"
    20  	"go.mongodb.org/mongo-driver/x/mongo/driver/session"
    21  )
    22  
    23  // CommitTransaction attempts to commit a transaction.
    24  type CommitTransaction struct {
    25  	maxTime       *time.Duration
    26  	recoveryToken bsoncore.Document
    27  	session       *session.Client
    28  	clock         *session.ClusterClock
    29  	monitor       *event.CommandMonitor
    30  	crypt         driver.Crypt
    31  	database      string
    32  	deployment    driver.Deployment
    33  	selector      description.ServerSelector
    34  	writeConcern  *writeconcern.WriteConcern
    35  	retry         *driver.RetryMode
    36  	serverAPI     *driver.ServerAPIOptions
    37  }
    38  
    39  // NewCommitTransaction constructs and returns a new CommitTransaction.
    40  func NewCommitTransaction() *CommitTransaction {
    41  	return &CommitTransaction{}
    42  }
    43  
    44  func (ct *CommitTransaction) processResponse(driver.ResponseInfo) error {
    45  	var err error
    46  	return err
    47  }
    48  
    49  // Execute runs this operations and returns an error if the operation did not execute successfully.
    50  func (ct *CommitTransaction) Execute(ctx context.Context) error {
    51  	if ct.deployment == nil {
    52  		return errors.New("the CommitTransaction operation must have a Deployment set before Execute can be called")
    53  	}
    54  
    55  	return driver.Operation{
    56  		CommandFn:         ct.command,
    57  		ProcessResponseFn: ct.processResponse,
    58  		RetryMode:         ct.retry,
    59  		Type:              driver.Write,
    60  		Client:            ct.session,
    61  		Clock:             ct.clock,
    62  		CommandMonitor:    ct.monitor,
    63  		Crypt:             ct.crypt,
    64  		Database:          ct.database,
    65  		Deployment:        ct.deployment,
    66  		MaxTime:           ct.maxTime,
    67  		Selector:          ct.selector,
    68  		WriteConcern:      ct.writeConcern,
    69  		ServerAPI:         ct.serverAPI,
    70  		Name:              driverutil.CommitTransactionOp,
    71  	}.Execute(ctx)
    72  
    73  }
    74  
    75  func (ct *CommitTransaction) command(dst []byte, _ description.SelectedServer) ([]byte, error) {
    76  
    77  	dst = bsoncore.AppendInt32Element(dst, "commitTransaction", 1)
    78  	if ct.recoveryToken != nil {
    79  		dst = bsoncore.AppendDocumentElement(dst, "recoveryToken", ct.recoveryToken)
    80  	}
    81  	return dst, nil
    82  }
    83  
    84  // MaxTime specifies the maximum amount of time to allow the query to run on the server.
    85  func (ct *CommitTransaction) MaxTime(maxTime *time.Duration) *CommitTransaction {
    86  	if ct == nil {
    87  		ct = new(CommitTransaction)
    88  	}
    89  
    90  	ct.maxTime = maxTime
    91  	return ct
    92  }
    93  
    94  // RecoveryToken sets the recovery token to use when committing or aborting a sharded transaction.
    95  func (ct *CommitTransaction) RecoveryToken(recoveryToken bsoncore.Document) *CommitTransaction {
    96  	if ct == nil {
    97  		ct = new(CommitTransaction)
    98  	}
    99  
   100  	ct.recoveryToken = recoveryToken
   101  	return ct
   102  }
   103  
   104  // Session sets the session for this operation.
   105  func (ct *CommitTransaction) Session(session *session.Client) *CommitTransaction {
   106  	if ct == nil {
   107  		ct = new(CommitTransaction)
   108  	}
   109  
   110  	ct.session = session
   111  	return ct
   112  }
   113  
   114  // ClusterClock sets the cluster clock for this operation.
   115  func (ct *CommitTransaction) ClusterClock(clock *session.ClusterClock) *CommitTransaction {
   116  	if ct == nil {
   117  		ct = new(CommitTransaction)
   118  	}
   119  
   120  	ct.clock = clock
   121  	return ct
   122  }
   123  
   124  // CommandMonitor sets the monitor to use for APM events.
   125  func (ct *CommitTransaction) CommandMonitor(monitor *event.CommandMonitor) *CommitTransaction {
   126  	if ct == nil {
   127  		ct = new(CommitTransaction)
   128  	}
   129  
   130  	ct.monitor = monitor
   131  	return ct
   132  }
   133  
   134  // Crypt sets the Crypt object to use for automatic encryption and decryption.
   135  func (ct *CommitTransaction) Crypt(crypt driver.Crypt) *CommitTransaction {
   136  	if ct == nil {
   137  		ct = new(CommitTransaction)
   138  	}
   139  
   140  	ct.crypt = crypt
   141  	return ct
   142  }
   143  
   144  // Database sets the database to run this operation against.
   145  func (ct *CommitTransaction) Database(database string) *CommitTransaction {
   146  	if ct == nil {
   147  		ct = new(CommitTransaction)
   148  	}
   149  
   150  	ct.database = database
   151  	return ct
   152  }
   153  
   154  // Deployment sets the deployment to use for this operation.
   155  func (ct *CommitTransaction) Deployment(deployment driver.Deployment) *CommitTransaction {
   156  	if ct == nil {
   157  		ct = new(CommitTransaction)
   158  	}
   159  
   160  	ct.deployment = deployment
   161  	return ct
   162  }
   163  
   164  // ServerSelector sets the selector used to retrieve a server.
   165  func (ct *CommitTransaction) ServerSelector(selector description.ServerSelector) *CommitTransaction {
   166  	if ct == nil {
   167  		ct = new(CommitTransaction)
   168  	}
   169  
   170  	ct.selector = selector
   171  	return ct
   172  }
   173  
   174  // WriteConcern sets the write concern for this operation.
   175  func (ct *CommitTransaction) WriteConcern(writeConcern *writeconcern.WriteConcern) *CommitTransaction {
   176  	if ct == nil {
   177  		ct = new(CommitTransaction)
   178  	}
   179  
   180  	ct.writeConcern = writeConcern
   181  	return ct
   182  }
   183  
   184  // Retry enables retryable mode for this operation. Retries are handled automatically in driver.Operation.Execute based
   185  // on how the operation is set.
   186  func (ct *CommitTransaction) Retry(retry driver.RetryMode) *CommitTransaction {
   187  	if ct == nil {
   188  		ct = new(CommitTransaction)
   189  	}
   190  
   191  	ct.retry = &retry
   192  	return ct
   193  }
   194  
   195  // ServerAPI sets the server API version for this operation.
   196  func (ct *CommitTransaction) ServerAPI(serverAPI *driver.ServerAPIOptions) *CommitTransaction {
   197  	if ct == nil {
   198  		ct = new(CommitTransaction)
   199  	}
   200  
   201  	ct.serverAPI = serverAPI
   202  	return ct
   203  }
   204  

View as plain text