...

Source file src/go.mongodb.org/mongo-driver/x/mongo/driver/session/client_session.go

Documentation: go.mongodb.org/mongo-driver/x/mongo/driver/session

     1  // Copyright (C) MongoDB, Inc. 2017-present.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License"); you may
     4  // not use this file except in compliance with the License. You may obtain
     5  // a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
     6  
     7  package session // import "go.mongodb.org/mongo-driver/x/mongo/driver/session"
     8  
     9  import (
    10  	"context"
    11  	"errors"
    12  	"time"
    13  
    14  	"go.mongodb.org/mongo-driver/bson"
    15  	"go.mongodb.org/mongo-driver/bson/primitive"
    16  	"go.mongodb.org/mongo-driver/internal/uuid"
    17  	"go.mongodb.org/mongo-driver/mongo/address"
    18  	"go.mongodb.org/mongo-driver/mongo/description"
    19  	"go.mongodb.org/mongo-driver/mongo/readconcern"
    20  	"go.mongodb.org/mongo-driver/mongo/readpref"
    21  	"go.mongodb.org/mongo-driver/mongo/writeconcern"
    22  	"go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
    23  )
    24  
    25  // ErrSessionEnded is returned when a client session is used after a call to endSession().
    26  var ErrSessionEnded = errors.New("ended session was used")
    27  
    28  // ErrNoTransactStarted is returned if a transaction operation is called when no transaction has started.
    29  var ErrNoTransactStarted = errors.New("no transaction started")
    30  
    31  // ErrTransactInProgress is returned if startTransaction() is called when a transaction is in progress.
    32  var ErrTransactInProgress = errors.New("transaction already in progress")
    33  
    34  // ErrAbortAfterCommit is returned when abort is called after a commit.
    35  var ErrAbortAfterCommit = errors.New("cannot call abortTransaction after calling commitTransaction")
    36  
    37  // ErrAbortTwice is returned if abort is called after transaction is already aborted.
    38  var ErrAbortTwice = errors.New("cannot call abortTransaction twice")
    39  
    40  // ErrCommitAfterAbort is returned if commit is called after an abort.
    41  var ErrCommitAfterAbort = errors.New("cannot call commitTransaction after calling abortTransaction")
    42  
    43  // ErrUnackWCUnsupported is returned if an unacknowledged write concern is supported for a transaction.
    44  var ErrUnackWCUnsupported = errors.New("transactions do not support unacknowledged write concerns")
    45  
    46  // ErrSnapshotTransaction is returned if an transaction is started on a snapshot session.
    47  var ErrSnapshotTransaction = errors.New("transactions are not supported in snapshot sessions")
    48  
    49  // TransactionState indicates the state of the transactions FSM.
    50  type TransactionState uint8
    51  
    52  // Client Session states
    53  const (
    54  	None TransactionState = iota
    55  	Starting
    56  	InProgress
    57  	Committed
    58  	Aborted
    59  )
    60  
    61  // String implements the fmt.Stringer interface.
    62  func (s TransactionState) String() string {
    63  	switch s {
    64  	case None:
    65  		return "none"
    66  	case Starting:
    67  		return "starting"
    68  	case InProgress:
    69  		return "in progress"
    70  	case Committed:
    71  		return "committed"
    72  	case Aborted:
    73  		return "aborted"
    74  	default:
    75  		return "unknown"
    76  	}
    77  }
    78  
// LoadBalancedTransactionConnection represents a connection that's pinned by a ClientSession because it's being used
// to execute a transaction when running against a load balancer. This interface is a copy of driver.PinnedConnection
// and exists to be able to pin transactions to a connection without causing an import cycle.
//
// Within this file the session only calls UnpinFromTransaction and Close on a pinned connection, when the
// session's pinned resources are released.
type LoadBalancedTransactionConnection interface {
	// Functions copied over from driver.Connection.
	WriteWireMessage(context.Context, []byte) error
	ReadWireMessage(ctx context.Context) ([]byte, error)
	Description() description.Server
	Close() error
	ID() string
	ServerConnectionID() *int64
	DriverConnectionID() uint64 // TODO(GODRIVER-2824): change type to int64.
	Address() address.Address
	Stale() bool

	// Functions copied over from driver.PinnedConnection that are not part of Connection or Expirable.
	PinToCursor() error
	PinToTransaction() error
	UnpinFromCursor() error
	UnpinFromTransaction() error
}
   100  
// Client is a session for clients to run commands.
//
// It embeds the *Server that SetServer checks out of the session pool and that EndSession hands back,
// and carries the client-side state tracked for the session:
//
//   - ClusterTime and OperationTime are only ever moved forward, through AdvanceClusterTime and
//     AdvanceOperationTime.
//   - Terminated is set by EndSession; afterwards AdvanceClusterTime, AdvanceOperationTime and
//     UpdateUseTime return ErrSessionEnded.
//   - IsImplicit is set for sessions built by NewImplicitClientSession.
//   - Consistent and Snapshot are mutually exclusive (NewClientSession rejects the combination), and
//     a session with Snapshot set cannot start a transaction.
//   - While Committing or Aborting is set, StartCommand leaves pinned resources in place; while
//     Committing is set, ApplyCommand does not change TransactionState.
//   - PinnedServer is set by ApplyCommand when the first command of a transaction targets a mongos.
//     PinnedConnection is unpinned and closed by ClearPinnedResources.
//   - RecoveryToken and SnapshotTime are copied out of server responses by UpdateRecoveryToken and
//     UpdateSnapshotTime.
//   - RetryingCommit, RetryWrite and RetryRead are not interpreted in this file beyond being reset;
//     presumably they are consumed by the operation layer (TODO confirm).
//
// NOTE(review): no locking is visible in this file; presumably callers serialize use of a session —
// confirm before sharing a Client between goroutines.
type Client struct {
	*Server
	ClientID       uuid.UUID
	ClusterTime    bson.Raw
	Consistent     bool // causal consistency
	OperationTime  *primitive.Timestamp
	IsImplicit     bool
	Terminated     bool
	RetryingCommit bool
	Committing     bool
	Aborting       bool
	RetryWrite     bool
	RetryRead      bool
	Snapshot       bool

	// options for the current transaction
	// most recently set by transactionopt
	CurrentRc  *readconcern.ReadConcern
	CurrentRp  *readpref.ReadPref
	CurrentWc  *writeconcern.WriteConcern
	CurrentMct *time.Duration

	// default transaction options
	transactionRc            *readconcern.ReadConcern
	transactionRp            *readpref.ReadPref
	transactionWc            *writeconcern.WriteConcern
	transactionMaxCommitTime *time.Duration

	pool             *Pool
	TransactionState TransactionState
	PinnedServer     *description.Server
	RecoveryToken    bson.Raw
	PinnedConnection LoadBalancedTransactionConnection
	SnapshotTime     *primitive.Timestamp
}
   137  
   138  func getClusterTime(clusterTime bson.Raw) (uint32, uint32) {
   139  	if clusterTime == nil {
   140  		return 0, 0
   141  	}
   142  
   143  	clusterTimeVal, err := clusterTime.LookupErr("$clusterTime")
   144  	if err != nil {
   145  		return 0, 0
   146  	}
   147  
   148  	timestampVal, err := bson.Raw(clusterTimeVal.Value).LookupErr("clusterTime")
   149  	if err != nil {
   150  		return 0, 0
   151  	}
   152  
   153  	return timestampVal.Timestamp()
   154  }
   155  
   156  // MaxClusterTime compares 2 clusterTime documents and returns the document representing the highest cluster time.
   157  func MaxClusterTime(ct1, ct2 bson.Raw) bson.Raw {
   158  	epoch1, ord1 := getClusterTime(ct1)
   159  	epoch2, ord2 := getClusterTime(ct2)
   160  
   161  	if epoch1 > epoch2 {
   162  		return ct1
   163  	} else if epoch1 < epoch2 {
   164  		return ct2
   165  	} else if ord1 > ord2 {
   166  		return ct1
   167  	} else if ord1 < ord2 {
   168  		return ct2
   169  	}
   170  
   171  	return ct1
   172  }
   173  
   174  // NewImplicitClientSession creates a new implicit client-side session.
   175  func NewImplicitClientSession(pool *Pool, clientID uuid.UUID) *Client {
   176  	// Server-side session checkout for implicit sessions is deferred until after checking out a
   177  	// connection, so don't check out a server-side session right now. This will limit the number of
   178  	// implicit sessions to no greater than an application's maxPoolSize.
   179  
   180  	return &Client{
   181  		pool:       pool,
   182  		ClientID:   clientID,
   183  		IsImplicit: true,
   184  	}
   185  }
   186  
   187  // NewClientSession creates a new explicit client-side session.
   188  func NewClientSession(pool *Pool, clientID uuid.UUID, opts ...*ClientOptions) (*Client, error) {
   189  	c := &Client{
   190  		pool:     pool,
   191  		ClientID: clientID,
   192  	}
   193  
   194  	mergedOpts := mergeClientOptions(opts...)
   195  	if mergedOpts.DefaultReadPreference != nil {
   196  		c.transactionRp = mergedOpts.DefaultReadPreference
   197  	}
   198  	if mergedOpts.DefaultReadConcern != nil {
   199  		c.transactionRc = mergedOpts.DefaultReadConcern
   200  	}
   201  	if mergedOpts.DefaultWriteConcern != nil {
   202  		c.transactionWc = mergedOpts.DefaultWriteConcern
   203  	}
   204  	if mergedOpts.DefaultMaxCommitTime != nil {
   205  		c.transactionMaxCommitTime = mergedOpts.DefaultMaxCommitTime
   206  	}
   207  	if mergedOpts.Snapshot != nil {
   208  		c.Snapshot = *mergedOpts.Snapshot
   209  	}
   210  
   211  	// For explicit sessions, the default for causalConsistency is true, unless Snapshot is
   212  	// enabled, then it's false. Set the default and then allow any explicit causalConsistency
   213  	// setting to override it.
   214  	c.Consistent = !c.Snapshot
   215  	if mergedOpts.CausalConsistency != nil {
   216  		c.Consistent = *mergedOpts.CausalConsistency
   217  	}
   218  
   219  	if c.Consistent && c.Snapshot {
   220  		return nil, errors.New("causal consistency and snapshot cannot both be set for a session")
   221  	}
   222  
   223  	if err := c.SetServer(); err != nil {
   224  		return nil, err
   225  	}
   226  
   227  	return c, nil
   228  }
   229  
   230  // SetServer will check out a session from the client session pool.
   231  func (c *Client) SetServer() error {
   232  	var err error
   233  	c.Server, err = c.pool.GetSession()
   234  	return err
   235  }
   236  
   237  // AdvanceClusterTime updates the session's cluster time.
   238  func (c *Client) AdvanceClusterTime(clusterTime bson.Raw) error {
   239  	if c.Terminated {
   240  		return ErrSessionEnded
   241  	}
   242  	c.ClusterTime = MaxClusterTime(c.ClusterTime, clusterTime)
   243  	return nil
   244  }
   245  
   246  // AdvanceOperationTime updates the session's operation time.
   247  func (c *Client) AdvanceOperationTime(opTime *primitive.Timestamp) error {
   248  	if c.Terminated {
   249  		return ErrSessionEnded
   250  	}
   251  
   252  	if c.OperationTime == nil {
   253  		c.OperationTime = opTime
   254  		return nil
   255  	}
   256  
   257  	if opTime.T > c.OperationTime.T {
   258  		c.OperationTime = opTime
   259  	} else if (opTime.T == c.OperationTime.T) && (opTime.I > c.OperationTime.I) {
   260  		c.OperationTime = opTime
   261  	}
   262  
   263  	return nil
   264  }
   265  
   266  // UpdateUseTime sets the session's last used time to the current time. This must be called whenever the session is
   267  // used to send a command to the server to ensure that the session is not prematurely marked expired in the driver's
   268  // session pool. If the session has already been ended, this method will return ErrSessionEnded.
   269  func (c *Client) UpdateUseTime() error {
   270  	if c.Terminated {
   271  		return ErrSessionEnded
   272  	}
   273  	c.updateUseTime()
   274  	return nil
   275  }
   276  
   277  // UpdateRecoveryToken updates the session's recovery token from the server response.
   278  func (c *Client) UpdateRecoveryToken(response bson.Raw) {
   279  	if c == nil {
   280  		return
   281  	}
   282  
   283  	token, err := response.LookupErr("recoveryToken")
   284  	if err != nil {
   285  		return
   286  	}
   287  
   288  	c.RecoveryToken = token.Document()
   289  }
   290  
   291  // UpdateSnapshotTime updates the session's value for the atClusterTime field of ReadConcern.
   292  func (c *Client) UpdateSnapshotTime(response bsoncore.Document) {
   293  	if c == nil {
   294  		return
   295  	}
   296  
   297  	subDoc := response
   298  	if cur, ok := response.Lookup("cursor").DocumentOK(); ok {
   299  		subDoc = cur
   300  	}
   301  
   302  	ssTimeElem, err := subDoc.LookupErr("atClusterTime")
   303  	if err != nil {
   304  		// atClusterTime not included by the server
   305  		return
   306  	}
   307  
   308  	t, i := ssTimeElem.Timestamp()
   309  	c.SnapshotTime = &primitive.Timestamp{
   310  		T: t,
   311  		I: i,
   312  	}
   313  }
   314  
   315  // ClearPinnedResources clears the pinned server and/or connection associated with the session.
   316  func (c *Client) ClearPinnedResources() error {
   317  	if c == nil {
   318  		return nil
   319  	}
   320  
   321  	c.PinnedServer = nil
   322  	if c.PinnedConnection != nil {
   323  		if err := c.PinnedConnection.UnpinFromTransaction(); err != nil {
   324  			return err
   325  		}
   326  		if err := c.PinnedConnection.Close(); err != nil {
   327  			return err
   328  		}
   329  	}
   330  	c.PinnedConnection = nil
   331  	return nil
   332  }
   333  
   334  // unpinConnection gracefully unpins the connection associated with the session
   335  // if there is one. This is done via the pinned connection's
   336  // UnpinFromTransaction function.
   337  func (c *Client) unpinConnection() error {
   338  	if c == nil || c.PinnedConnection == nil {
   339  		return nil
   340  	}
   341  
   342  	err := c.PinnedConnection.UnpinFromTransaction()
   343  	closeErr := c.PinnedConnection.Close()
   344  	if err == nil && closeErr != nil {
   345  		err = closeErr
   346  	}
   347  	c.PinnedConnection = nil
   348  	return err
   349  }
   350  
   351  // EndSession ends the session.
   352  func (c *Client) EndSession() {
   353  	if c.Terminated {
   354  		return
   355  	}
   356  	c.Terminated = true
   357  
   358  	// Ignore the error when unpinning the connection because we can't do
   359  	// anything about it if it doesn't work. Typically the only errors that can
   360  	// happen here indicate that something went wrong with the connection state,
   361  	// like it wasn't marked as pinned or attempted to return to the wrong pool.
   362  	_ = c.unpinConnection()
   363  	c.pool.ReturnSession(c.Server)
   364  }
   365  
   366  // TransactionInProgress returns true if the client session is in an active transaction.
   367  func (c *Client) TransactionInProgress() bool {
   368  	return c.TransactionState == InProgress
   369  }
   370  
   371  // TransactionStarting returns true if the client session is starting a transaction.
   372  func (c *Client) TransactionStarting() bool {
   373  	return c.TransactionState == Starting
   374  }
   375  
   376  // TransactionRunning returns true if the client session has started the transaction
   377  // and it hasn't been committed or aborted
   378  func (c *Client) TransactionRunning() bool {
   379  	return c != nil && (c.TransactionState == Starting || c.TransactionState == InProgress)
   380  }
   381  
   382  // TransactionCommitted returns true of the client session just committed a transaction.
   383  func (c *Client) TransactionCommitted() bool {
   384  	return c.TransactionState == Committed
   385  }
   386  
   387  // CheckStartTransaction checks to see if allowed to start transaction and returns
   388  // an error if not allowed
   389  func (c *Client) CheckStartTransaction() error {
   390  	if c.TransactionState == InProgress || c.TransactionState == Starting {
   391  		return ErrTransactInProgress
   392  	}
   393  	if c.Snapshot {
   394  		return ErrSnapshotTransaction
   395  	}
   396  	return nil
   397  }
   398  
   399  // StartTransaction initializes the transaction options and advances the state machine.
   400  // It does not contact the server to start the transaction.
   401  func (c *Client) StartTransaction(opts *TransactionOptions) error {
   402  	err := c.CheckStartTransaction()
   403  	if err != nil {
   404  		return err
   405  	}
   406  
   407  	c.IncrementTxnNumber()
   408  	c.RetryingCommit = false
   409  
   410  	if opts != nil {
   411  		c.CurrentRc = opts.ReadConcern
   412  		c.CurrentRp = opts.ReadPreference
   413  		c.CurrentWc = opts.WriteConcern
   414  		c.CurrentMct = opts.MaxCommitTime
   415  	}
   416  
   417  	if c.CurrentRc == nil {
   418  		c.CurrentRc = c.transactionRc
   419  	}
   420  
   421  	if c.CurrentRp == nil {
   422  		c.CurrentRp = c.transactionRp
   423  	}
   424  
   425  	if c.CurrentWc == nil {
   426  		c.CurrentWc = c.transactionWc
   427  	}
   428  
   429  	if c.CurrentMct == nil {
   430  		c.CurrentMct = c.transactionMaxCommitTime
   431  	}
   432  
   433  	if !writeconcern.AckWrite(c.CurrentWc) {
   434  		_ = c.clearTransactionOpts()
   435  		return ErrUnackWCUnsupported
   436  	}
   437  
   438  	c.TransactionState = Starting
   439  	return c.ClearPinnedResources()
   440  }
   441  
   442  // CheckCommitTransaction checks to see if allowed to commit transaction and returns
   443  // an error if not allowed.
   444  func (c *Client) CheckCommitTransaction() error {
   445  	if c.TransactionState == None {
   446  		return ErrNoTransactStarted
   447  	} else if c.TransactionState == Aborted {
   448  		return ErrCommitAfterAbort
   449  	}
   450  	return nil
   451  }
   452  
   453  // CommitTransaction updates the state for a successfully committed transaction and returns
   454  // an error if not permissible.  It does not actually perform the commit.
   455  func (c *Client) CommitTransaction() error {
   456  	err := c.CheckCommitTransaction()
   457  	if err != nil {
   458  		return err
   459  	}
   460  	c.TransactionState = Committed
   461  	return nil
   462  }
   463  
   464  // UpdateCommitTransactionWriteConcern will set the write concern to majority and potentially set  a
   465  // w timeout of 10 seconds. This should be called after a commit transaction operation fails with a
   466  // retryable error or after a successful commit transaction operation.
   467  func (c *Client) UpdateCommitTransactionWriteConcern() {
   468  	wc := c.CurrentWc
   469  	timeout := 10 * time.Second
   470  	if wc != nil && wc.GetWTimeout() != 0 {
   471  		timeout = wc.GetWTimeout()
   472  	}
   473  	c.CurrentWc = wc.WithOptions(writeconcern.WMajority(), writeconcern.WTimeout(timeout))
   474  }
   475  
   476  // CheckAbortTransaction checks to see if allowed to abort transaction and returns
   477  // an error if not allowed.
   478  func (c *Client) CheckAbortTransaction() error {
   479  	if c.TransactionState == None {
   480  		return ErrNoTransactStarted
   481  	} else if c.TransactionState == Committed {
   482  		return ErrAbortAfterCommit
   483  	} else if c.TransactionState == Aborted {
   484  		return ErrAbortTwice
   485  	}
   486  	return nil
   487  }
   488  
   489  // AbortTransaction updates the state for a successfully aborted transaction and returns
   490  // an error if not permissible.  It does not actually perform the abort.
   491  func (c *Client) AbortTransaction() error {
   492  	err := c.CheckAbortTransaction()
   493  	if err != nil {
   494  		return err
   495  	}
   496  	c.TransactionState = Aborted
   497  	return c.clearTransactionOpts()
   498  }
   499  
   500  // StartCommand updates the session's internal state at the beginning of an operation. This must be called before
   501  // server selection is done for the operation as the session's state can impact the result of that process.
   502  func (c *Client) StartCommand() error {
   503  	if c == nil {
   504  		return nil
   505  	}
   506  
   507  	// If we're executing the first operation using this session after a transaction, we must ensure that the session
   508  	// is not pinned to any resources.
   509  	if !c.TransactionRunning() && !c.Committing && !c.Aborting {
   510  		return c.ClearPinnedResources()
   511  	}
   512  	return nil
   513  }
   514  
   515  // ApplyCommand advances the state machine upon command execution. This must be called after server selection is
   516  // complete.
   517  func (c *Client) ApplyCommand(desc description.Server) error {
   518  	if c.Committing {
   519  		// Do not change state if committing after already committed
   520  		return nil
   521  	}
   522  	if c.TransactionState == Starting {
   523  		c.TransactionState = InProgress
   524  		// If this is in a transaction and the server is a mongos, pin it
   525  		if desc.Kind == description.Mongos {
   526  			c.PinnedServer = &desc
   527  		}
   528  	} else if c.TransactionState == Committed || c.TransactionState == Aborted {
   529  		c.TransactionState = None
   530  		return c.clearTransactionOpts()
   531  	}
   532  
   533  	return nil
   534  }
   535  
   536  func (c *Client) clearTransactionOpts() error {
   537  	c.RetryingCommit = false
   538  	c.Aborting = false
   539  	c.Committing = false
   540  	c.CurrentWc = nil
   541  	c.CurrentRp = nil
   542  	c.CurrentRc = nil
   543  	c.RecoveryToken = nil
   544  
   545  	return c.ClearPinnedResources()
   546  }
   547  

View as plain text