...

Source file src/github.com/go-redis/redis/pipeline.go

Documentation: github.com/go-redis/redis

     1  package redis
     2  
     3  import (
     4  	"sync"
     5  
     6  	"github.com/go-redis/redis/internal/pool"
     7  )
     8  
     9  type pipelineExecer func([]Cmder) error
    10  
    11  // Pipeliner is an mechanism to realise Redis Pipeline technique.
    12  //
    13  // Pipelining is a technique to extremely speed up processing by packing
    14  // operations to batches, send them at once to Redis and read a replies in a
    15  // singe step.
    16  // See https://redis.io/topics/pipelining
    17  //
    18  // Pay attention, that Pipeline is not a transaction, so you can get unexpected
    19  // results in case of big pipelines and small read/write timeouts.
    20  // Redis client has retransmission logic in case of timeouts, pipeline
    21  // can be retransmitted and commands can be executed more then once.
    22  // To avoid this: it is good idea to use reasonable bigger read/write timeouts
    23  // depends of your batch size and/or use TxPipeline.
    24  type Pipeliner interface {
    25  	StatefulCmdable
    26  	Do(args ...interface{}) *Cmd
    27  	Process(cmd Cmder) error
    28  	Close() error
    29  	Discard() error
    30  	Exec() ([]Cmder, error)
    31  }
    32  
    33  var _ Pipeliner = (*Pipeline)(nil)
    34  
    35  // Pipeline implements pipelining as described in
    36  // http://redis.io/topics/pipelining. It's safe for concurrent use
    37  // by multiple goroutines.
    38  type Pipeline struct {
    39  	statefulCmdable
    40  
    41  	exec pipelineExecer
    42  
    43  	mu     sync.Mutex
    44  	cmds   []Cmder
    45  	closed bool
    46  }
    47  
    48  func (c *Pipeline) Do(args ...interface{}) *Cmd {
    49  	cmd := NewCmd(args...)
    50  	_ = c.Process(cmd)
    51  	return cmd
    52  }
    53  
    54  // Process queues the cmd for later execution.
    55  func (c *Pipeline) Process(cmd Cmder) error {
    56  	c.mu.Lock()
    57  	c.cmds = append(c.cmds, cmd)
    58  	c.mu.Unlock()
    59  	return nil
    60  }
    61  
    62  // Close closes the pipeline, releasing any open resources.
    63  func (c *Pipeline) Close() error {
    64  	c.mu.Lock()
    65  	c.discard()
    66  	c.closed = true
    67  	c.mu.Unlock()
    68  	return nil
    69  }
    70  
    71  // Discard resets the pipeline and discards queued commands.
    72  func (c *Pipeline) Discard() error {
    73  	c.mu.Lock()
    74  	err := c.discard()
    75  	c.mu.Unlock()
    76  	return err
    77  }
    78  
    79  func (c *Pipeline) discard() error {
    80  	if c.closed {
    81  		return pool.ErrClosed
    82  	}
    83  	c.cmds = c.cmds[:0]
    84  	return nil
    85  }
    86  
    87  // Exec executes all previously queued commands using one
    88  // client-server roundtrip.
    89  //
    90  // Exec always returns list of commands and error of the first failed
    91  // command if any.
    92  func (c *Pipeline) Exec() ([]Cmder, error) {
    93  	c.mu.Lock()
    94  	defer c.mu.Unlock()
    95  
    96  	if c.closed {
    97  		return nil, pool.ErrClosed
    98  	}
    99  
   100  	if len(c.cmds) == 0 {
   101  		return nil, nil
   102  	}
   103  
   104  	cmds := c.cmds
   105  	c.cmds = nil
   106  
   107  	return cmds, c.exec(cmds)
   108  }
   109  
   110  func (c *Pipeline) pipelined(fn func(Pipeliner) error) ([]Cmder, error) {
   111  	if err := fn(c); err != nil {
   112  		return nil, err
   113  	}
   114  	cmds, err := c.Exec()
   115  	_ = c.Close()
   116  	return cmds, err
   117  }
   118  
   119  func (c *Pipeline) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) {
   120  	return c.pipelined(fn)
   121  }
   122  
   123  func (c *Pipeline) Pipeline() Pipeliner {
   124  	return c
   125  }
   126  
   127  func (c *Pipeline) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) {
   128  	return c.pipelined(fn)
   129  }
   130  
   131  func (c *Pipeline) TxPipeline() Pipeliner {
   132  	return c
   133  }
   134  

View as plain text