...

Source file src/go.mongodb.org/mongo-driver/mongo/session.go

Documentation: go.mongodb.org/mongo-driver/mongo

     1  // Copyright (C) MongoDB, Inc. 2017-present.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License"); you may
     4  // not use this file except in compliance with the License. You may obtain
     5  // a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
     6  
     7  package mongo
     8  
     9  import (
    10  	"context"
    11  	"errors"
    12  	"time"
    13  
    14  	"go.mongodb.org/mongo-driver/bson"
    15  	"go.mongodb.org/mongo-driver/bson/primitive"
    16  	"go.mongodb.org/mongo-driver/mongo/description"
    17  	"go.mongodb.org/mongo-driver/mongo/options"
    18  	"go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
    19  	"go.mongodb.org/mongo-driver/x/mongo/driver"
    20  	"go.mongodb.org/mongo-driver/x/mongo/driver/operation"
    21  	"go.mongodb.org/mongo-driver/x/mongo/driver/session"
    22  )
    23  
    24  // ErrWrongClient is returned when a user attempts to pass in a session created by a different client than
    25  // the method call is using.
    26  var ErrWrongClient = errors.New("session was not created by this client")
    27  
    28  var withTransactionTimeout = 120 * time.Second
    29  
    30  // SessionContext combines the context.Context and mongo.Session interfaces. It should be used as the Context arguments
    31  // to operations that should be executed in a session.
    32  //
    33  // Implementations of SessionContext are not safe for concurrent use by multiple goroutines.
    34  //
    35  // There are two ways to create a SessionContext and use it in a session/transaction. The first is to use one of the
    36  // callback-based functions such as WithSession and UseSession. These functions create a SessionContext and pass it to
    37  // the provided callback. The other is to use NewSessionContext to explicitly create a SessionContext.
    38  type SessionContext interface {
    39  	context.Context
    40  	Session
    41  }
    42  
    43  type sessionContext struct {
    44  	context.Context
    45  	Session
    46  }
    47  
    48  type sessionKey struct {
    49  }
    50  
    51  // NewSessionContext creates a new SessionContext associated with the given Context and Session parameters.
    52  func NewSessionContext(ctx context.Context, sess Session) SessionContext {
    53  	return &sessionContext{
    54  		Context: context.WithValue(ctx, sessionKey{}, sess),
    55  		Session: sess,
    56  	}
    57  }
    58  
    59  // SessionFromContext extracts the mongo.Session object stored in a Context. This can be used on a SessionContext that
    60  // was created implicitly through one of the callback-based session APIs or explicitly by calling NewSessionContext. If
    61  // there is no Session stored in the provided Context, nil is returned.
    62  func SessionFromContext(ctx context.Context) Session {
    63  	val := ctx.Value(sessionKey{})
    64  	if val == nil {
    65  		return nil
    66  	}
    67  
    68  	sess, ok := val.(Session)
    69  	if !ok {
    70  		return nil
    71  	}
    72  
    73  	return sess
    74  }
    75  
    76  // Session is an interface that represents a MongoDB logical session. Sessions can be used to enable causal consistency
    77  // for a group of operations or to execute operations in an ACID transaction. A new Session can be created from a Client
    78  // instance. A Session created from a Client must only be used to execute operations using that Client or a Database or
    79  // Collection created from that Client. Custom implementations of this interface should not be used in production. For
    80  // more information about sessions, and their use cases, see
    81  // https://www.mongodb.com/docs/manual/reference/server-sessions/,
    82  // https://www.mongodb.com/docs/manual/core/read-isolation-consistency-recency/#causal-consistency, and
    83  // https://www.mongodb.com/docs/manual/core/transactions/.
    84  //
    85  // Implementations of Session are not safe for concurrent use by multiple goroutines.
    86  type Session interface {
    87  	// StartTransaction starts a new transaction, configured with the given options, on this
    88  	// session. This method returns an error if there is already a transaction in-progress for this
    89  	// session.
    90  	StartTransaction(...*options.TransactionOptions) error
    91  
    92  	// AbortTransaction aborts the active transaction for this session. This method returns an error
    93  	// if there is no active transaction for this session or if the transaction has been committed
    94  	// or aborted.
    95  	AbortTransaction(context.Context) error
    96  
    97  	// CommitTransaction commits the active transaction for this session. This method returns an
    98  	// error if there is no active transaction for this session or if the transaction has been
    99  	// aborted.
   100  	CommitTransaction(context.Context) error
   101  
   102  	// WithTransaction starts a transaction on this session and runs the fn callback. Errors with
   103  	// the TransientTransactionError and UnknownTransactionCommitResult labels are retried for up to
   104  	// 120 seconds. Inside the callback, the SessionContext must be used as the Context parameter
   105  	// for any operations that should be part of the transaction. If the ctx parameter already has a
   106  	// Session attached to it, it will be replaced by this session. The fn callback may be run
   107  	// multiple times during WithTransaction due to retry attempts, so it must be idempotent.
   108  	// Non-retryable operation errors or any operation errors that occur after the timeout expires
   109  	// will be returned without retrying. If the callback fails, the driver will call
   110  	// AbortTransaction. Because this method must succeed to ensure that server-side resources are
   111  	// properly cleaned up, context deadlines and cancellations will not be respected during this
   112  	// call. For a usage example, see the Client.StartSession method documentation.
   113  	WithTransaction(ctx context.Context, fn func(ctx SessionContext) (interface{}, error),
   114  		opts ...*options.TransactionOptions) (interface{}, error)
   115  
   116  	// EndSession aborts any existing transactions and close the session.
   117  	EndSession(context.Context)
   118  
   119  	// ClusterTime returns the current cluster time document associated with the session.
   120  	ClusterTime() bson.Raw
   121  
   122  	// OperationTime returns the current operation time document associated with the session.
   123  	OperationTime() *primitive.Timestamp
   124  
   125  	// Client the Client associated with the session.
   126  	Client() *Client
   127  
   128  	// ID returns the current ID document associated with the session. The ID document is in the
   129  	// form {"id": <BSON binary value>}.
   130  	ID() bson.Raw
   131  
   132  	// AdvanceClusterTime advances the cluster time for a session. This method returns an error if
   133  	// the session has ended.
   134  	AdvanceClusterTime(bson.Raw) error
   135  
   136  	// AdvanceOperationTime advances the operation time for a session. This method returns an error
   137  	// if the session has ended.
   138  	AdvanceOperationTime(*primitive.Timestamp) error
   139  
   140  	session()
   141  }
   142  
   143  // XSession is an unstable interface for internal use only.
   144  //
   145  // Deprecated: This interface is unstable because it provides access to a session.Client object, which exists in the
   146  // "x" package. It should not be used by applications and may be changed or removed in any release.
   147  type XSession interface {
   148  	ClientSession() *session.Client
   149  }
   150  
   151  // sessionImpl represents a set of sequential operations executed by an application that are related in some way.
   152  type sessionImpl struct {
   153  	clientSession       *session.Client
   154  	client              *Client
   155  	deployment          driver.Deployment
   156  	didCommitAfterStart bool // true if commit was called after start with no other operations
   157  }
   158  
   159  var _ Session = &sessionImpl{}
   160  var _ XSession = &sessionImpl{}
   161  
   162  // ClientSession implements the XSession interface.
   163  func (s *sessionImpl) ClientSession() *session.Client {
   164  	return s.clientSession
   165  }
   166  
   167  // ID implements the Session interface.
   168  func (s *sessionImpl) ID() bson.Raw {
   169  	return bson.Raw(s.clientSession.SessionID)
   170  }
   171  
   172  // EndSession implements the Session interface.
   173  func (s *sessionImpl) EndSession(ctx context.Context) {
   174  	if s.clientSession.TransactionInProgress() {
   175  		// ignore all errors aborting during an end session
   176  		_ = s.AbortTransaction(ctx)
   177  	}
   178  	s.clientSession.EndSession()
   179  }
   180  
   181  // WithTransaction implements the Session interface.
   182  func (s *sessionImpl) WithTransaction(ctx context.Context, fn func(ctx SessionContext) (interface{}, error),
   183  	opts ...*options.TransactionOptions) (interface{}, error) {
   184  	timeout := time.NewTimer(withTransactionTimeout)
   185  	defer timeout.Stop()
   186  	var err error
   187  	for {
   188  		err = s.StartTransaction(opts...)
   189  		if err != nil {
   190  			return nil, err
   191  		}
   192  
   193  		res, err := fn(NewSessionContext(ctx, s))
   194  		if err != nil {
   195  			if s.clientSession.TransactionRunning() {
   196  				// Wrap the user-provided Context in a new one that behaves like context.Background() for deadlines and
   197  				// cancellations, but forwards Value requests to the original one.
   198  				_ = s.AbortTransaction(newBackgroundContext(ctx))
   199  			}
   200  
   201  			select {
   202  			case <-timeout.C:
   203  				return nil, err
   204  			default:
   205  			}
   206  
   207  			if errorHasLabel(err, driver.TransientTransactionError) {
   208  				continue
   209  			}
   210  			return res, err
   211  		}
   212  
   213  		// Check if callback intentionally aborted and, if so, return immediately
   214  		// with no error.
   215  		err = s.clientSession.CheckAbortTransaction()
   216  		if err != nil {
   217  			return res, nil
   218  		}
   219  
   220  		// If context has errored, run AbortTransaction and return, as the CommitLoop
   221  		// has no chance of succeeding.
   222  		//
   223  		// Aborting after a failed CommitTransaction is dangerous. Failed transaction
   224  		// commits may unpin the session server-side, and subsequent transaction aborts
   225  		// may run on a new mongos which could end up with commit and abort being executed
   226  		// simultaneously.
   227  		if ctx.Err() != nil {
   228  			// Wrap the user-provided Context in a new one that behaves like context.Background() for deadlines and
   229  			// cancellations, but forwards Value requests to the original one.
   230  			_ = s.AbortTransaction(newBackgroundContext(ctx))
   231  			return nil, ctx.Err()
   232  		}
   233  
   234  	CommitLoop:
   235  		for {
   236  			err = s.CommitTransaction(newBackgroundContext(ctx))
   237  			// End when error is nil, as transaction has been committed.
   238  			if err == nil {
   239  				return res, nil
   240  			}
   241  
   242  			select {
   243  			case <-timeout.C:
   244  				return res, err
   245  			default:
   246  			}
   247  
   248  			if cerr, ok := err.(CommandError); ok {
   249  				if cerr.HasErrorLabel(driver.UnknownTransactionCommitResult) && !cerr.IsMaxTimeMSExpiredError() {
   250  					continue
   251  				}
   252  				if cerr.HasErrorLabel(driver.TransientTransactionError) {
   253  					break CommitLoop
   254  				}
   255  			}
   256  			return res, err
   257  		}
   258  	}
   259  }
   260  
   261  // StartTransaction implements the Session interface.
   262  func (s *sessionImpl) StartTransaction(opts ...*options.TransactionOptions) error {
   263  	err := s.clientSession.CheckStartTransaction()
   264  	if err != nil {
   265  		return err
   266  	}
   267  
   268  	s.didCommitAfterStart = false
   269  
   270  	topts := options.MergeTransactionOptions(opts...)
   271  	coreOpts := &session.TransactionOptions{
   272  		ReadConcern:    topts.ReadConcern,
   273  		ReadPreference: topts.ReadPreference,
   274  		WriteConcern:   topts.WriteConcern,
   275  		MaxCommitTime:  topts.MaxCommitTime,
   276  	}
   277  
   278  	return s.clientSession.StartTransaction(coreOpts)
   279  }
   280  
   281  // AbortTransaction implements the Session interface.
   282  func (s *sessionImpl) AbortTransaction(ctx context.Context) error {
   283  	err := s.clientSession.CheckAbortTransaction()
   284  	if err != nil {
   285  		return err
   286  	}
   287  
   288  	// Do not run the abort command if the transaction is in starting state
   289  	if s.clientSession.TransactionStarting() || s.didCommitAfterStart {
   290  		return s.clientSession.AbortTransaction()
   291  	}
   292  
   293  	selector := makePinnedSelector(s.clientSession, description.WriteSelector())
   294  
   295  	s.clientSession.Aborting = true
   296  	_ = operation.NewAbortTransaction().Session(s.clientSession).ClusterClock(s.client.clock).Database("admin").
   297  		Deployment(s.deployment).WriteConcern(s.clientSession.CurrentWc).ServerSelector(selector).
   298  		Retry(driver.RetryOncePerCommand).CommandMonitor(s.client.monitor).
   299  		RecoveryToken(bsoncore.Document(s.clientSession.RecoveryToken)).ServerAPI(s.client.serverAPI).Execute(ctx)
   300  
   301  	s.clientSession.Aborting = false
   302  	_ = s.clientSession.AbortTransaction()
   303  
   304  	return nil
   305  }
   306  
   307  // CommitTransaction implements the Session interface.
   308  func (s *sessionImpl) CommitTransaction(ctx context.Context) error {
   309  	err := s.clientSession.CheckCommitTransaction()
   310  	if err != nil {
   311  		return err
   312  	}
   313  
   314  	// Do not run the commit command if the transaction is in started state
   315  	if s.clientSession.TransactionStarting() || s.didCommitAfterStart {
   316  		s.didCommitAfterStart = true
   317  		return s.clientSession.CommitTransaction()
   318  	}
   319  
   320  	if s.clientSession.TransactionCommitted() {
   321  		s.clientSession.RetryingCommit = true
   322  	}
   323  
   324  	selector := makePinnedSelector(s.clientSession, description.WriteSelector())
   325  
   326  	s.clientSession.Committing = true
   327  	op := operation.NewCommitTransaction().
   328  		Session(s.clientSession).ClusterClock(s.client.clock).Database("admin").Deployment(s.deployment).
   329  		WriteConcern(s.clientSession.CurrentWc).ServerSelector(selector).Retry(driver.RetryOncePerCommand).
   330  		CommandMonitor(s.client.monitor).RecoveryToken(bsoncore.Document(s.clientSession.RecoveryToken)).
   331  		ServerAPI(s.client.serverAPI).MaxTime(s.clientSession.CurrentMct)
   332  
   333  	err = op.Execute(ctx)
   334  	// Return error without updating transaction state if it is a timeout, as the transaction has not
   335  	// actually been committed.
   336  	if IsTimeout(err) {
   337  		return replaceErrors(err)
   338  	}
   339  	s.clientSession.Committing = false
   340  	commitErr := s.clientSession.CommitTransaction()
   341  
   342  	// We set the write concern to majority for subsequent calls to CommitTransaction.
   343  	s.clientSession.UpdateCommitTransactionWriteConcern()
   344  
   345  	if err != nil {
   346  		return replaceErrors(err)
   347  	}
   348  	return commitErr
   349  }
   350  
   351  // ClusterTime implements the Session interface.
   352  func (s *sessionImpl) ClusterTime() bson.Raw {
   353  	return s.clientSession.ClusterTime
   354  }
   355  
   356  // AdvanceClusterTime implements the Session interface.
   357  func (s *sessionImpl) AdvanceClusterTime(d bson.Raw) error {
   358  	return s.clientSession.AdvanceClusterTime(d)
   359  }
   360  
   361  // OperationTime implements the Session interface.
   362  func (s *sessionImpl) OperationTime() *primitive.Timestamp {
   363  	return s.clientSession.OperationTime
   364  }
   365  
   366  // AdvanceOperationTime implements the Session interface.
   367  func (s *sessionImpl) AdvanceOperationTime(ts *primitive.Timestamp) error {
   368  	return s.clientSession.AdvanceOperationTime(ts)
   369  }
   370  
   371  // Client implements the Session interface.
   372  func (s *sessionImpl) Client() *Client {
   373  	return s.client
   374  }
   375  
   376  // session implements the Session interface.
   377  func (*sessionImpl) session() {
   378  }
   379  
   380  // sessionFromContext checks for a sessionImpl in the argued context and returns the session if it
   381  // exists
   382  func sessionFromContext(ctx context.Context) *session.Client {
   383  	s := ctx.Value(sessionKey{})
   384  	if ses, ok := s.(*sessionImpl); ses != nil && ok {
   385  		return ses.clientSession
   386  	}
   387  
   388  	return nil
   389  }
   390  

View as plain text