...

Source file src/github.com/jackc/pgx/v4/tx.go

Documentation: github.com/jackc/pgx/v4

     1  package pgx
     2  
     3  import (
     4  	"bytes"
     5  	"context"
     6  	"errors"
     7  	"fmt"
     8  	"strconv"
     9  
    10  	"github.com/jackc/pgconn"
    11  )
    12  
    13  // TxIsoLevel is the transaction isolation level (serializable, repeatable read, read committed or read uncommitted)
    14  type TxIsoLevel string
    15  
    16  // Transaction isolation levels
    17  const (
    18  	Serializable    TxIsoLevel = "serializable"
    19  	RepeatableRead  TxIsoLevel = "repeatable read"
    20  	ReadCommitted   TxIsoLevel = "read committed"
    21  	ReadUncommitted TxIsoLevel = "read uncommitted"
    22  )
    23  
    24  // TxAccessMode is the transaction access mode (read write or read only)
    25  type TxAccessMode string
    26  
    27  // Transaction access modes
    28  const (
    29  	ReadWrite TxAccessMode = "read write"
    30  	ReadOnly  TxAccessMode = "read only"
    31  )
    32  
    33  // TxDeferrableMode is the transaction deferrable mode (deferrable or not deferrable)
    34  type TxDeferrableMode string
    35  
    36  // Transaction deferrable modes
    37  const (
    38  	Deferrable    TxDeferrableMode = "deferrable"
    39  	NotDeferrable TxDeferrableMode = "not deferrable"
    40  )
    41  
    42  // TxOptions are transaction modes within a transaction block
    43  type TxOptions struct {
    44  	IsoLevel       TxIsoLevel
    45  	AccessMode     TxAccessMode
    46  	DeferrableMode TxDeferrableMode
    47  }
    48  
    49  var emptyTxOptions TxOptions
    50  
    51  func (txOptions TxOptions) beginSQL() string {
    52  	if txOptions == emptyTxOptions {
    53  		return "begin"
    54  	}
    55  	buf := &bytes.Buffer{}
    56  	buf.WriteString("begin")
    57  	if txOptions.IsoLevel != "" {
    58  		fmt.Fprintf(buf, " isolation level %s", txOptions.IsoLevel)
    59  	}
    60  	if txOptions.AccessMode != "" {
    61  		fmt.Fprintf(buf, " %s", txOptions.AccessMode)
    62  	}
    63  	if txOptions.DeferrableMode != "" {
    64  		fmt.Fprintf(buf, " %s", txOptions.DeferrableMode)
    65  	}
    66  
    67  	return buf.String()
    68  }
    69  
    70  var ErrTxClosed = errors.New("tx is closed")
    71  
    72  // ErrTxCommitRollback occurs when an error has occurred in a transaction and
    73  // Commit() is called. PostgreSQL accepts COMMIT on aborted transactions, but
    74  // it is treated as ROLLBACK.
    75  var ErrTxCommitRollback = errors.New("commit unexpectedly resulted in rollback")
    76  
    77  // Begin starts a transaction. Unlike database/sql, the context only affects the begin command. i.e. there is no
    78  // auto-rollback on context cancellation.
    79  func (c *Conn) Begin(ctx context.Context) (Tx, error) {
    80  	return c.BeginTx(ctx, TxOptions{})
    81  }
    82  
    83  // BeginTx starts a transaction with txOptions determining the transaction mode. Unlike database/sql, the context only
    84  // affects the begin command. i.e. there is no auto-rollback on context cancellation.
    85  func (c *Conn) BeginTx(ctx context.Context, txOptions TxOptions) (Tx, error) {
    86  	_, err := c.Exec(ctx, txOptions.beginSQL())
    87  	if err != nil {
    88  		// begin should never fail unless there is an underlying connection issue or
    89  		// a context timeout. In either case, the connection is possibly broken.
    90  		c.die(errors.New("failed to begin transaction"))
    91  		return nil, err
    92  	}
    93  
    94  	return &dbTx{conn: c}, nil
    95  }
    96  
    97  // BeginFunc starts a transaction and calls f. If f does not return an error the transaction is committed. If f returns
    98  // an error the transaction is rolled back. The context will be used when executing the transaction control statements
    99  // (BEGIN, ROLLBACK, and COMMIT) but does not otherwise affect the execution of f.
   100  func (c *Conn) BeginFunc(ctx context.Context, f func(Tx) error) (err error) {
   101  	return c.BeginTxFunc(ctx, TxOptions{}, f)
   102  }
   103  
   104  // BeginTxFunc starts a transaction with txOptions determining the transaction mode and calls f. If f does not return
   105  // an error the transaction is committed. If f returns an error the transaction is rolled back. The context will be
   106  // used when executing the transaction control statements (BEGIN, ROLLBACK, and COMMIT) but does not otherwise affect
   107  // the execution of f.
   108  func (c *Conn) BeginTxFunc(ctx context.Context, txOptions TxOptions, f func(Tx) error) (err error) {
   109  	var tx Tx
   110  	tx, err = c.BeginTx(ctx, txOptions)
   111  	if err != nil {
   112  		return err
   113  	}
   114  	defer func() {
   115  		rollbackErr := tx.Rollback(ctx)
   116  		if rollbackErr != nil && !errors.Is(rollbackErr, ErrTxClosed) {
   117  			err = rollbackErr
   118  		}
   119  	}()
   120  
   121  	fErr := f(tx)
   122  	if fErr != nil {
   123  		_ = tx.Rollback(ctx) // ignore rollback error as there is already an error to return
   124  		return fErr
   125  	}
   126  
   127  	return tx.Commit(ctx)
   128  }
   129  
// Tx represents a database transaction.
//
// Tx is an interface instead of a struct to enable connection pools to be implemented without relying on internal pgx
// state, to support pseudo-nested transactions with savepoints, and to allow tests to mock transactions. However,
// adding a method to an interface is technically a breaking change. If new methods are added to Conn it may be
// desirable to add them to Tx as well. Because of this the Tx interface is partially excluded from semantic version
// requirements. Methods will not be removed or changed, but new methods may be added.
type Tx interface {
	// Begin starts a pseudo nested transaction.
	Begin(ctx context.Context) (Tx, error)

	// BeginFunc starts a pseudo nested transaction and executes f. If f does not return an err the pseudo nested
	// transaction will be committed. If it does then it will be rolled back.
	BeginFunc(ctx context.Context, f func(Tx) error) (err error)

	// Commit commits the transaction if this is a real transaction or releases the savepoint if this is a pseudo nested
	// transaction. Commit will return ErrTxClosed if the Tx is already closed, but is otherwise safe to call multiple
	// times. If the commit fails with a rollback status (e.g. the transaction was already in a broken state) then
	// ErrTxCommitRollback will be returned.
	Commit(ctx context.Context) error

	// Rollback rolls back the transaction if this is a real transaction or rolls back to the savepoint if this is a
	// pseudo nested transaction. Rollback will return ErrTxClosed if the Tx is already closed, but is otherwise safe to
	// call multiple times. Hence, a defer tx.Rollback() is safe even if tx.Commit() will be called first in a non-error
	// condition. Any other failure of a real transaction will result in the connection being closed.
	Rollback(ctx context.Context) error

	// CopyFrom, SendBatch, Prepare, Exec, Query, QueryRow and QueryFunc mirror the *Conn methods of the same name.
	// The implementations in this file report ErrTxClosed once the transaction has been closed and otherwise
	// delegate to the underlying connection (directly, or via the parent Tx for pseudo nested transactions).
	CopyFrom(ctx context.Context, tableName Identifier, columnNames []string, rowSrc CopyFromSource) (int64, error)
	SendBatch(ctx context.Context, b *Batch) BatchResults

	// LargeObjects returns a LargeObjects instance bound to this transaction.
	LargeObjects() LargeObjects

	Prepare(ctx context.Context, name, sql string) (*pgconn.StatementDescription, error)

	Exec(ctx context.Context, sql string, arguments ...interface{}) (commandTag pgconn.CommandTag, err error)
	Query(ctx context.Context, sql string, args ...interface{}) (Rows, error)
	QueryRow(ctx context.Context, sql string, args ...interface{}) Row
	QueryFunc(ctx context.Context, sql string, args []interface{}, scans []interface{}, f func(QueryFuncRow) error) (pgconn.CommandTag, error)

	// Conn returns the underlying *Conn on which this transaction is executing.
	Conn() *Conn
}
   171  
// dbTx represents a database transaction.
//
// Once Commit or Rollback has been called on the dbTx, its statement-executing
// methods report ErrTxClosed. LargeObjects and Conn perform no such check.
type dbTx struct {
	conn         *Conn // connection the transaction runs on
	err          error // not referenced by the code in this file
	savepointNum int64 // incremented by Begin to number savepoints (sp_1, sp_2, ...)
	closed       bool  // set after Commit or Rollback has issued its statement
}
   182  
   183  // Begin starts a pseudo nested transaction implemented with a savepoint.
   184  func (tx *dbTx) Begin(ctx context.Context) (Tx, error) {
   185  	if tx.closed {
   186  		return nil, ErrTxClosed
   187  	}
   188  
   189  	tx.savepointNum++
   190  	_, err := tx.conn.Exec(ctx, "savepoint sp_"+strconv.FormatInt(tx.savepointNum, 10))
   191  	if err != nil {
   192  		return nil, err
   193  	}
   194  
   195  	return &dbSimulatedNestedTx{tx: tx, savepointNum: tx.savepointNum}, nil
   196  }
   197  
   198  func (tx *dbTx) BeginFunc(ctx context.Context, f func(Tx) error) (err error) {
   199  	if tx.closed {
   200  		return ErrTxClosed
   201  	}
   202  
   203  	var savepoint Tx
   204  	savepoint, err = tx.Begin(ctx)
   205  	if err != nil {
   206  		return err
   207  	}
   208  	defer func() {
   209  		rollbackErr := savepoint.Rollback(ctx)
   210  		if rollbackErr != nil && !errors.Is(rollbackErr, ErrTxClosed) {
   211  			err = rollbackErr
   212  		}
   213  	}()
   214  
   215  	fErr := f(savepoint)
   216  	if fErr != nil {
   217  		_ = savepoint.Rollback(ctx) // ignore rollback error as there is already an error to return
   218  		return fErr
   219  	}
   220  
   221  	return savepoint.Commit(ctx)
   222  }
   223  
   224  // Commit commits the transaction.
   225  func (tx *dbTx) Commit(ctx context.Context) error {
   226  	if tx.closed {
   227  		return ErrTxClosed
   228  	}
   229  
   230  	commandTag, err := tx.conn.Exec(ctx, "commit")
   231  	tx.closed = true
   232  	if err != nil {
   233  		if tx.conn.PgConn().TxStatus() != 'I' {
   234  			_ = tx.conn.Close(ctx) // already have error to return
   235  		}
   236  		return err
   237  	}
   238  	if string(commandTag) == "ROLLBACK" {
   239  		return ErrTxCommitRollback
   240  	}
   241  
   242  	return nil
   243  }
   244  
   245  // Rollback rolls back the transaction. Rollback will return ErrTxClosed if the
   246  // Tx is already closed, but is otherwise safe to call multiple times. Hence, a
   247  // defer tx.Rollback() is safe even if tx.Commit() will be called first in a
   248  // non-error condition.
   249  func (tx *dbTx) Rollback(ctx context.Context) error {
   250  	if tx.closed {
   251  		return ErrTxClosed
   252  	}
   253  
   254  	_, err := tx.conn.Exec(ctx, "rollback")
   255  	tx.closed = true
   256  	if err != nil {
   257  		// A rollback failure leaves the connection in an undefined state
   258  		tx.conn.die(fmt.Errorf("rollback failed: %w", err))
   259  		return err
   260  	}
   261  
   262  	return nil
   263  }
   264  
   265  // Exec delegates to the underlying *Conn
   266  func (tx *dbTx) Exec(ctx context.Context, sql string, arguments ...interface{}) (commandTag pgconn.CommandTag, err error) {
   267  	if tx.closed {
   268  		return pgconn.CommandTag{}, ErrTxClosed
   269  	}
   270  
   271  	return tx.conn.Exec(ctx, sql, arguments...)
   272  }
   273  
   274  // Prepare delegates to the underlying *Conn
   275  func (tx *dbTx) Prepare(ctx context.Context, name, sql string) (*pgconn.StatementDescription, error) {
   276  	if tx.closed {
   277  		return nil, ErrTxClosed
   278  	}
   279  
   280  	return tx.conn.Prepare(ctx, name, sql)
   281  }
   282  
   283  // Query delegates to the underlying *Conn
   284  func (tx *dbTx) Query(ctx context.Context, sql string, args ...interface{}) (Rows, error) {
   285  	if tx.closed {
   286  		// Because checking for errors can be deferred to the *Rows, build one with the error
   287  		err := ErrTxClosed
   288  		return &connRows{closed: true, err: err}, err
   289  	}
   290  
   291  	return tx.conn.Query(ctx, sql, args...)
   292  }
   293  
   294  // QueryRow delegates to the underlying *Conn
   295  func (tx *dbTx) QueryRow(ctx context.Context, sql string, args ...interface{}) Row {
   296  	rows, _ := tx.Query(ctx, sql, args...)
   297  	return (*connRow)(rows.(*connRows))
   298  }
   299  
   300  // QueryFunc delegates to the underlying *Conn.
   301  func (tx *dbTx) QueryFunc(ctx context.Context, sql string, args []interface{}, scans []interface{}, f func(QueryFuncRow) error) (pgconn.CommandTag, error) {
   302  	if tx.closed {
   303  		return nil, ErrTxClosed
   304  	}
   305  
   306  	return tx.conn.QueryFunc(ctx, sql, args, scans, f)
   307  }
   308  
   309  // CopyFrom delegates to the underlying *Conn
   310  func (tx *dbTx) CopyFrom(ctx context.Context, tableName Identifier, columnNames []string, rowSrc CopyFromSource) (int64, error) {
   311  	if tx.closed {
   312  		return 0, ErrTxClosed
   313  	}
   314  
   315  	return tx.conn.CopyFrom(ctx, tableName, columnNames, rowSrc)
   316  }
   317  
   318  // SendBatch delegates to the underlying *Conn
   319  func (tx *dbTx) SendBatch(ctx context.Context, b *Batch) BatchResults {
   320  	if tx.closed {
   321  		return &batchResults{err: ErrTxClosed}
   322  	}
   323  
   324  	return tx.conn.SendBatch(ctx, b)
   325  }
   326  
// LargeObjects returns a LargeObjects instance for the transaction. It does
// not check whether the transaction has already been closed.
func (tx *dbTx) LargeObjects() LargeObjects {
	return LargeObjects{tx: tx}
}
   331  
// Conn returns the *Conn on which this transaction is executing. It does not
// check whether the transaction has already been closed.
func (tx *dbTx) Conn() *Conn {
	return tx.conn
}
   335  
// dbSimulatedNestedTx represents a simulated nested transaction implemented by a savepoint.
type dbSimulatedNestedTx struct {
	tx           Tx    // parent transaction that all statements are delegated to
	savepointNum int64 // N in the savepoint name sp_N used by Commit and Rollback
	closed       bool  // set after Commit or Rollback has issued its statement
}
   342  
   343  // Begin starts a pseudo nested transaction implemented with a savepoint.
   344  func (sp *dbSimulatedNestedTx) Begin(ctx context.Context) (Tx, error) {
   345  	if sp.closed {
   346  		return nil, ErrTxClosed
   347  	}
   348  
   349  	return sp.tx.Begin(ctx)
   350  }
   351  
   352  func (sp *dbSimulatedNestedTx) BeginFunc(ctx context.Context, f func(Tx) error) (err error) {
   353  	if sp.closed {
   354  		return ErrTxClosed
   355  	}
   356  
   357  	return sp.tx.BeginFunc(ctx, f)
   358  }
   359  
   360  // Commit releases the savepoint essentially committing the pseudo nested transaction.
   361  func (sp *dbSimulatedNestedTx) Commit(ctx context.Context) error {
   362  	if sp.closed {
   363  		return ErrTxClosed
   364  	}
   365  
   366  	_, err := sp.Exec(ctx, "release savepoint sp_"+strconv.FormatInt(sp.savepointNum, 10))
   367  	sp.closed = true
   368  	return err
   369  }
   370  
   371  // Rollback rolls back to the savepoint essentially rolling back the pseudo nested transaction. Rollback will return
   372  // ErrTxClosed if the dbSavepoint is already closed, but is otherwise safe to call multiple times. Hence, a defer sp.Rollback()
   373  // is safe even if sp.Commit() will be called first in a non-error condition.
   374  func (sp *dbSimulatedNestedTx) Rollback(ctx context.Context) error {
   375  	if sp.closed {
   376  		return ErrTxClosed
   377  	}
   378  
   379  	_, err := sp.Exec(ctx, "rollback to savepoint sp_"+strconv.FormatInt(sp.savepointNum, 10))
   380  	sp.closed = true
   381  	return err
   382  }
   383  
   384  // Exec delegates to the underlying Tx
   385  func (sp *dbSimulatedNestedTx) Exec(ctx context.Context, sql string, arguments ...interface{}) (commandTag pgconn.CommandTag, err error) {
   386  	if sp.closed {
   387  		return nil, ErrTxClosed
   388  	}
   389  
   390  	return sp.tx.Exec(ctx, sql, arguments...)
   391  }
   392  
   393  // Prepare delegates to the underlying Tx
   394  func (sp *dbSimulatedNestedTx) Prepare(ctx context.Context, name, sql string) (*pgconn.StatementDescription, error) {
   395  	if sp.closed {
   396  		return nil, ErrTxClosed
   397  	}
   398  
   399  	return sp.tx.Prepare(ctx, name, sql)
   400  }
   401  
   402  // Query delegates to the underlying Tx
   403  func (sp *dbSimulatedNestedTx) Query(ctx context.Context, sql string, args ...interface{}) (Rows, error) {
   404  	if sp.closed {
   405  		// Because checking for errors can be deferred to the *Rows, build one with the error
   406  		err := ErrTxClosed
   407  		return &connRows{closed: true, err: err}, err
   408  	}
   409  
   410  	return sp.tx.Query(ctx, sql, args...)
   411  }
   412  
   413  // QueryRow delegates to the underlying Tx
   414  func (sp *dbSimulatedNestedTx) QueryRow(ctx context.Context, sql string, args ...interface{}) Row {
   415  	rows, _ := sp.Query(ctx, sql, args...)
   416  	return (*connRow)(rows.(*connRows))
   417  }
   418  
   419  // QueryFunc delegates to the underlying Tx.
   420  func (sp *dbSimulatedNestedTx) QueryFunc(ctx context.Context, sql string, args []interface{}, scans []interface{}, f func(QueryFuncRow) error) (pgconn.CommandTag, error) {
   421  	if sp.closed {
   422  		return nil, ErrTxClosed
   423  	}
   424  
   425  	return sp.tx.QueryFunc(ctx, sql, args, scans, f)
   426  }
   427  
   428  // CopyFrom delegates to the underlying *Conn
   429  func (sp *dbSimulatedNestedTx) CopyFrom(ctx context.Context, tableName Identifier, columnNames []string, rowSrc CopyFromSource) (int64, error) {
   430  	if sp.closed {
   431  		return 0, ErrTxClosed
   432  	}
   433  
   434  	return sp.tx.CopyFrom(ctx, tableName, columnNames, rowSrc)
   435  }
   436  
   437  // SendBatch delegates to the underlying *Conn
   438  func (sp *dbSimulatedNestedTx) SendBatch(ctx context.Context, b *Batch) BatchResults {
   439  	if sp.closed {
   440  		return &batchResults{err: ErrTxClosed}
   441  	}
   442  
   443  	return sp.tx.SendBatch(ctx, b)
   444  }
   445  
// LargeObjects returns a LargeObjects instance bound to this pseudo nested
// transaction. It does not check whether the nested transaction is closed.
func (sp *dbSimulatedNestedTx) LargeObjects() LargeObjects {
	return LargeObjects{tx: sp}
}
   449  
// Conn returns the *Conn reported by the parent Tx. It does not check whether
// the nested transaction is closed.
func (sp *dbSimulatedNestedTx) Conn() *Conn {
	return sp.tx.Conn()
}
   453  

View as plain text