...

Source file src/go.mongodb.org/mongo-driver/x/mongo/driver/operation/abort_transaction.go

Documentation: go.mongodb.org/mongo-driver/x/mongo/driver/operation

     1  // Copyright (C) MongoDB, Inc. 2019-present.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License"); you may
     4  // not use this file except in compliance with the License. You may obtain
     5  // a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
     6  
     7  package operation
     8  
     9  import (
    10  	"context"
    11  	"errors"
    12  
    13  	"go.mongodb.org/mongo-driver/event"
    14  	"go.mongodb.org/mongo-driver/internal/driverutil"
    15  	"go.mongodb.org/mongo-driver/mongo/description"
    16  	"go.mongodb.org/mongo-driver/mongo/writeconcern"
    17  	"go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
    18  	"go.mongodb.org/mongo-driver/x/mongo/driver"
    19  	"go.mongodb.org/mongo-driver/x/mongo/driver/session"
    20  )
    21  
    22  // AbortTransaction performs an abortTransaction operation.
    23  type AbortTransaction struct {
    24  	recoveryToken bsoncore.Document
    25  	session       *session.Client
    26  	clock         *session.ClusterClock
    27  	collection    string
    28  	monitor       *event.CommandMonitor
    29  	crypt         driver.Crypt
    30  	database      string
    31  	deployment    driver.Deployment
    32  	selector      description.ServerSelector
    33  	writeConcern  *writeconcern.WriteConcern
    34  	retry         *driver.RetryMode
    35  	serverAPI     *driver.ServerAPIOptions
    36  }
    37  
    38  // NewAbortTransaction constructs and returns a new AbortTransaction.
    39  func NewAbortTransaction() *AbortTransaction {
    40  	return &AbortTransaction{}
    41  }
    42  
    43  func (at *AbortTransaction) processResponse(driver.ResponseInfo) error {
    44  	var err error
    45  	return err
    46  }
    47  
    48  // Execute runs this operations and returns an error if the operation did not execute successfully.
    49  func (at *AbortTransaction) Execute(ctx context.Context) error {
    50  	if at.deployment == nil {
    51  		return errors.New("the AbortTransaction operation must have a Deployment set before Execute can be called")
    52  	}
    53  
    54  	return driver.Operation{
    55  		CommandFn:         at.command,
    56  		ProcessResponseFn: at.processResponse,
    57  		RetryMode:         at.retry,
    58  		Type:              driver.Write,
    59  		Client:            at.session,
    60  		Clock:             at.clock,
    61  		CommandMonitor:    at.monitor,
    62  		Crypt:             at.crypt,
    63  		Database:          at.database,
    64  		Deployment:        at.deployment,
    65  		Selector:          at.selector,
    66  		WriteConcern:      at.writeConcern,
    67  		ServerAPI:         at.serverAPI,
    68  		Name:              driverutil.AbortTransactionOp,
    69  	}.Execute(ctx)
    70  
    71  }
    72  
    73  func (at *AbortTransaction) command(dst []byte, _ description.SelectedServer) ([]byte, error) {
    74  
    75  	dst = bsoncore.AppendInt32Element(dst, "abortTransaction", 1)
    76  	if at.recoveryToken != nil {
    77  		dst = bsoncore.AppendDocumentElement(dst, "recoveryToken", at.recoveryToken)
    78  	}
    79  	return dst, nil
    80  }
    81  
    82  // RecoveryToken sets the recovery token to use when committing or aborting a sharded transaction.
    83  func (at *AbortTransaction) RecoveryToken(recoveryToken bsoncore.Document) *AbortTransaction {
    84  	if at == nil {
    85  		at = new(AbortTransaction)
    86  	}
    87  
    88  	at.recoveryToken = recoveryToken
    89  	return at
    90  }
    91  
    92  // Session sets the session for this operation.
    93  func (at *AbortTransaction) Session(session *session.Client) *AbortTransaction {
    94  	if at == nil {
    95  		at = new(AbortTransaction)
    96  	}
    97  
    98  	at.session = session
    99  	return at
   100  }
   101  
   102  // ClusterClock sets the cluster clock for this operation.
   103  func (at *AbortTransaction) ClusterClock(clock *session.ClusterClock) *AbortTransaction {
   104  	if at == nil {
   105  		at = new(AbortTransaction)
   106  	}
   107  
   108  	at.clock = clock
   109  	return at
   110  }
   111  
   112  // Collection sets the collection that this command will run against.
   113  func (at *AbortTransaction) Collection(collection string) *AbortTransaction {
   114  	if at == nil {
   115  		at = new(AbortTransaction)
   116  	}
   117  
   118  	at.collection = collection
   119  	return at
   120  }
   121  
   122  // CommandMonitor sets the monitor to use for APM events.
   123  func (at *AbortTransaction) CommandMonitor(monitor *event.CommandMonitor) *AbortTransaction {
   124  	if at == nil {
   125  		at = new(AbortTransaction)
   126  	}
   127  
   128  	at.monitor = monitor
   129  	return at
   130  }
   131  
   132  // Crypt sets the Crypt object to use for automatic encryption and decryption.
   133  func (at *AbortTransaction) Crypt(crypt driver.Crypt) *AbortTransaction {
   134  	if at == nil {
   135  		at = new(AbortTransaction)
   136  	}
   137  
   138  	at.crypt = crypt
   139  	return at
   140  }
   141  
   142  // Database sets the database to run this operation against.
   143  func (at *AbortTransaction) Database(database string) *AbortTransaction {
   144  	if at == nil {
   145  		at = new(AbortTransaction)
   146  	}
   147  
   148  	at.database = database
   149  	return at
   150  }
   151  
   152  // Deployment sets the deployment to use for this operation.
   153  func (at *AbortTransaction) Deployment(deployment driver.Deployment) *AbortTransaction {
   154  	if at == nil {
   155  		at = new(AbortTransaction)
   156  	}
   157  
   158  	at.deployment = deployment
   159  	return at
   160  }
   161  
   162  // ServerSelector sets the selector used to retrieve a server.
   163  func (at *AbortTransaction) ServerSelector(selector description.ServerSelector) *AbortTransaction {
   164  	if at == nil {
   165  		at = new(AbortTransaction)
   166  	}
   167  
   168  	at.selector = selector
   169  	return at
   170  }
   171  
   172  // WriteConcern sets the write concern for this operation.
   173  func (at *AbortTransaction) WriteConcern(writeConcern *writeconcern.WriteConcern) *AbortTransaction {
   174  	if at == nil {
   175  		at = new(AbortTransaction)
   176  	}
   177  
   178  	at.writeConcern = writeConcern
   179  	return at
   180  }
   181  
   182  // Retry enables retryable mode for this operation. Retries are handled automatically in driver.Operation.Execute based
   183  // on how the operation is set.
   184  func (at *AbortTransaction) Retry(retry driver.RetryMode) *AbortTransaction {
   185  	if at == nil {
   186  		at = new(AbortTransaction)
   187  	}
   188  
   189  	at.retry = &retry
   190  	return at
   191  }
   192  
   193  // ServerAPI sets the server API version for this operation.
   194  func (at *AbortTransaction) ServerAPI(serverAPI *driver.ServerAPIOptions) *AbortTransaction {
   195  	if at == nil {
   196  		at = new(AbortTransaction)
   197  	}
   198  
   199  	at.serverAPI = serverAPI
   200  	return at
   201  }
   202  

View as plain text