...

Source file src/github.com/go-redis/redis/tx.go

Documentation: github.com/go-redis/redis

     1  package redis
     2  
     3  import (
     4  	"github.com/go-redis/redis/internal/pool"
     5  	"github.com/go-redis/redis/internal/proto"
     6  )
     7  
     8  // TxFailedErr transaction redis failed.
     9  const TxFailedErr = proto.RedisError("redis: transaction failed")
    10  
    11  // Tx implements Redis transactions as described in
    12  // http://redis.io/topics/transactions. It's NOT safe for concurrent use
    13  // by multiple goroutines, because Exec resets list of watched keys.
    14  // If you don't need WATCH it is better to use Pipeline.
    15  type Tx struct {
    16  	statefulCmdable
    17  	baseClient
    18  }
    19  
    20  func (c *Client) newTx() *Tx {
    21  	tx := Tx{
    22  		baseClient: baseClient{
    23  			opt:      c.opt,
    24  			connPool: pool.NewStickyConnPool(c.connPool.(*pool.ConnPool), true),
    25  		},
    26  	}
    27  	tx.baseClient.init()
    28  	tx.statefulCmdable.setProcessor(tx.Process)
    29  	return &tx
    30  }
    31  
    32  // Watch prepares a transaction and marks the keys to be watched
    33  // for conditional execution if there are any keys.
    34  //
    35  // The transaction is automatically closed when fn exits.
    36  func (c *Client) Watch(fn func(*Tx) error, keys ...string) error {
    37  	tx := c.newTx()
    38  	if len(keys) > 0 {
    39  		if err := tx.Watch(keys...).Err(); err != nil {
    40  			_ = tx.Close()
    41  			return err
    42  		}
    43  	}
    44  
    45  	err := fn(tx)
    46  	_ = tx.Close()
    47  	return err
    48  }
    49  
    50  // Close closes the transaction, releasing any open resources.
    51  func (c *Tx) Close() error {
    52  	_ = c.Unwatch().Err()
    53  	return c.baseClient.Close()
    54  }
    55  
    56  // Watch marks the keys to be watched for conditional execution
    57  // of a transaction.
    58  func (c *Tx) Watch(keys ...string) *StatusCmd {
    59  	args := make([]interface{}, 1+len(keys))
    60  	args[0] = "watch"
    61  	for i, key := range keys {
    62  		args[1+i] = key
    63  	}
    64  	cmd := NewStatusCmd(args...)
    65  	c.Process(cmd)
    66  	return cmd
    67  }
    68  
    69  // Unwatch flushes all the previously watched keys for a transaction.
    70  func (c *Tx) Unwatch(keys ...string) *StatusCmd {
    71  	args := make([]interface{}, 1+len(keys))
    72  	args[0] = "unwatch"
    73  	for i, key := range keys {
    74  		args[1+i] = key
    75  	}
    76  	cmd := NewStatusCmd(args...)
    77  	c.Process(cmd)
    78  	return cmd
    79  }
    80  
    81  // Pipeline creates a new pipeline. It is more convenient to use Pipelined.
    82  func (c *Tx) Pipeline() Pipeliner {
    83  	pipe := Pipeline{
    84  		exec: c.processTxPipeline,
    85  	}
    86  	pipe.statefulCmdable.setProcessor(pipe.Process)
    87  	return &pipe
    88  }
    89  
    90  // Pipelined executes commands queued in the fn in a transaction.
    91  //
    92  // When using WATCH, EXEC will execute commands only if the watched keys
    93  // were not modified, allowing for a check-and-set mechanism.
    94  //
    95  // Exec always returns list of commands. If transaction fails
    96  // TxFailedErr is returned. Otherwise Exec returns an error of the first
    97  // failed command or nil.
    98  func (c *Tx) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) {
    99  	return c.Pipeline().Pipelined(fn)
   100  }
   101  
   102  // TxPipelined is an alias for Pipelined.
   103  func (c *Tx) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) {
   104  	return c.Pipelined(fn)
   105  }
   106  
   107  // TxPipeline is an alias for Pipeline.
   108  func (c *Tx) TxPipeline() Pipeliner {
   109  	return c.Pipeline()
   110  }
   111  

View as plain text