...

Source file src/github.com/go-redis/redis/redis.go

Documentation: github.com/go-redis/redis

     1  package redis
     2  
     3  import (
     4  	"context"
     5  	"fmt"
     6  	"log"
     7  	"os"
     8  	"time"
     9  
    10  	"github.com/go-redis/redis/internal"
    11  	"github.com/go-redis/redis/internal/pool"
    12  	"github.com/go-redis/redis/internal/proto"
    13  )
    14  
    15  // Nil reply Redis returns when key does not exist.
    16  const Nil = proto.Nil
    17  
    18  func init() {
    19  	SetLogger(log.New(os.Stderr, "redis: ", log.LstdFlags|log.Lshortfile))
    20  }
    21  
    22  func SetLogger(logger *log.Logger) {
    23  	internal.Logger = logger
    24  }
    25  
    26  type baseClient struct {
    27  	opt      *Options
    28  	connPool pool.Pooler
    29  	limiter  Limiter
    30  
    31  	process           func(Cmder) error
    32  	processPipeline   func([]Cmder) error
    33  	processTxPipeline func([]Cmder) error
    34  
    35  	onClose func() error // hook called when client is closed
    36  }
    37  
    38  func (c *baseClient) init() {
    39  	c.process = c.defaultProcess
    40  	c.processPipeline = c.defaultProcessPipeline
    41  	c.processTxPipeline = c.defaultProcessTxPipeline
    42  }
    43  
    44  func (c *baseClient) String() string {
    45  	return fmt.Sprintf("Redis<%s db:%d>", c.getAddr(), c.opt.DB)
    46  }
    47  
    48  func (c *baseClient) newConn() (*pool.Conn, error) {
    49  	cn, err := c.connPool.NewConn()
    50  	if err != nil {
    51  		return nil, err
    52  	}
    53  
    54  	err = c.initConn(cn)
    55  	if err != nil {
    56  		_ = c.connPool.CloseConn(cn)
    57  		return nil, err
    58  	}
    59  
    60  	return cn, nil
    61  }
    62  
    63  func (c *baseClient) getConn() (*pool.Conn, error) {
    64  	if c.limiter != nil {
    65  		err := c.limiter.Allow()
    66  		if err != nil {
    67  			return nil, err
    68  		}
    69  	}
    70  
    71  	cn, err := c._getConn()
    72  	if err != nil {
    73  		if c.limiter != nil {
    74  			c.limiter.ReportResult(err)
    75  		}
    76  		return nil, err
    77  	}
    78  	return cn, nil
    79  }
    80  
    81  func (c *baseClient) _getConn() (*pool.Conn, error) {
    82  	cn, err := c.connPool.Get()
    83  	if err != nil {
    84  		return nil, err
    85  	}
    86  
    87  	err = c.initConn(cn)
    88  	if err != nil {
    89  		c.connPool.Remove(cn, err)
    90  		if err := internal.Unwrap(err); err != nil {
    91  			return nil, err
    92  		}
    93  		return nil, err
    94  	}
    95  
    96  	return cn, nil
    97  }
    98  
    99  func (c *baseClient) releaseConn(cn *pool.Conn, err error) {
   100  	if c.limiter != nil {
   101  		c.limiter.ReportResult(err)
   102  	}
   103  
   104  	if internal.IsBadConn(err, false) {
   105  		c.connPool.Remove(cn, err)
   106  	} else {
   107  		c.connPool.Put(cn)
   108  	}
   109  }
   110  
   111  func (c *baseClient) releaseConnStrict(cn *pool.Conn, err error) {
   112  	if c.limiter != nil {
   113  		c.limiter.ReportResult(err)
   114  	}
   115  
   116  	if err == nil || internal.IsRedisError(err) {
   117  		c.connPool.Put(cn)
   118  	} else {
   119  		c.connPool.Remove(cn, err)
   120  	}
   121  }
   122  
   123  func (c *baseClient) initConn(cn *pool.Conn) error {
   124  	if cn.Inited {
   125  		return nil
   126  	}
   127  	cn.Inited = true
   128  
   129  	if c.opt.Password == "" &&
   130  		c.opt.DB == 0 &&
   131  		!c.opt.readOnly &&
   132  		c.opt.OnConnect == nil {
   133  		return nil
   134  	}
   135  
   136  	conn := newConn(c.opt, cn)
   137  	_, err := conn.Pipelined(func(pipe Pipeliner) error {
   138  		if c.opt.Password != "" {
   139  			pipe.Auth(c.opt.Password)
   140  		}
   141  
   142  		if c.opt.DB > 0 {
   143  			pipe.Select(c.opt.DB)
   144  		}
   145  
   146  		if c.opt.readOnly {
   147  			pipe.ReadOnly()
   148  		}
   149  
   150  		return nil
   151  	})
   152  	if err != nil {
   153  		return err
   154  	}
   155  
   156  	if c.opt.OnConnect != nil {
   157  		return c.opt.OnConnect(conn)
   158  	}
   159  	return nil
   160  }
   161  
   162  // Do creates a Cmd from the args and processes the cmd.
   163  func (c *baseClient) Do(args ...interface{}) *Cmd {
   164  	cmd := NewCmd(args...)
   165  	_ = c.Process(cmd)
   166  	return cmd
   167  }
   168  
   169  // WrapProcess wraps function that processes Redis commands.
   170  func (c *baseClient) WrapProcess(
   171  	fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error,
   172  ) {
   173  	c.process = fn(c.process)
   174  }
   175  
   176  func (c *baseClient) Process(cmd Cmder) error {
   177  	return c.process(cmd)
   178  }
   179  
   180  func (c *baseClient) defaultProcess(cmd Cmder) error {
   181  	for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ {
   182  		if attempt > 0 {
   183  			time.Sleep(c.retryBackoff(attempt))
   184  		}
   185  
   186  		cn, err := c.getConn()
   187  		if err != nil {
   188  			cmd.setErr(err)
   189  			if internal.IsRetryableError(err, true) {
   190  				continue
   191  			}
   192  			return err
   193  		}
   194  
   195  		err = cn.WithWriter(c.opt.WriteTimeout, func(wr *proto.Writer) error {
   196  			return writeCmd(wr, cmd)
   197  		})
   198  		if err != nil {
   199  			c.releaseConn(cn, err)
   200  			cmd.setErr(err)
   201  			if internal.IsRetryableError(err, true) {
   202  				continue
   203  			}
   204  			return err
   205  		}
   206  
   207  		err = cn.WithReader(c.cmdTimeout(cmd), cmd.readReply)
   208  		c.releaseConn(cn, err)
   209  		if err != nil && internal.IsRetryableError(err, cmd.readTimeout() == nil) {
   210  			continue
   211  		}
   212  
   213  		return err
   214  	}
   215  
   216  	return cmd.Err()
   217  }
   218  
   219  func (c *baseClient) retryBackoff(attempt int) time.Duration {
   220  	return internal.RetryBackoff(attempt, c.opt.MinRetryBackoff, c.opt.MaxRetryBackoff)
   221  }
   222  
   223  func (c *baseClient) cmdTimeout(cmd Cmder) time.Duration {
   224  	if timeout := cmd.readTimeout(); timeout != nil {
   225  		t := *timeout
   226  		if t == 0 {
   227  			return 0
   228  		}
   229  		return t + 10*time.Second
   230  	}
   231  	return c.opt.ReadTimeout
   232  }
   233  
   234  // Close closes the client, releasing any open resources.
   235  //
   236  // It is rare to Close a Client, as the Client is meant to be
   237  // long-lived and shared between many goroutines.
   238  func (c *baseClient) Close() error {
   239  	var firstErr error
   240  	if c.onClose != nil {
   241  		if err := c.onClose(); err != nil {
   242  			firstErr = err
   243  		}
   244  	}
   245  	if err := c.connPool.Close(); err != nil && firstErr == nil {
   246  		firstErr = err
   247  	}
   248  	return firstErr
   249  }
   250  
   251  func (c *baseClient) getAddr() string {
   252  	return c.opt.Addr
   253  }
   254  
   255  func (c *baseClient) WrapProcessPipeline(
   256  	fn func(oldProcess func([]Cmder) error) func([]Cmder) error,
   257  ) {
   258  	c.processPipeline = fn(c.processPipeline)
   259  	c.processTxPipeline = fn(c.processTxPipeline)
   260  }
   261  
   262  func (c *baseClient) defaultProcessPipeline(cmds []Cmder) error {
   263  	return c.generalProcessPipeline(cmds, c.pipelineProcessCmds)
   264  }
   265  
   266  func (c *baseClient) defaultProcessTxPipeline(cmds []Cmder) error {
   267  	return c.generalProcessPipeline(cmds, c.txPipelineProcessCmds)
   268  }
   269  
   270  type pipelineProcessor func(*pool.Conn, []Cmder) (bool, error)
   271  
   272  func (c *baseClient) generalProcessPipeline(cmds []Cmder, p pipelineProcessor) error {
   273  	for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ {
   274  		if attempt > 0 {
   275  			time.Sleep(c.retryBackoff(attempt))
   276  		}
   277  
   278  		cn, err := c.getConn()
   279  		if err != nil {
   280  			setCmdsErr(cmds, err)
   281  			return err
   282  		}
   283  
   284  		canRetry, err := p(cn, cmds)
   285  		c.releaseConnStrict(cn, err)
   286  
   287  		if !canRetry || !internal.IsRetryableError(err, true) {
   288  			break
   289  		}
   290  	}
   291  	return cmdsFirstErr(cmds)
   292  }
   293  
   294  func (c *baseClient) pipelineProcessCmds(cn *pool.Conn, cmds []Cmder) (bool, error) {
   295  	err := cn.WithWriter(c.opt.WriteTimeout, func(wr *proto.Writer) error {
   296  		return writeCmd(wr, cmds...)
   297  	})
   298  	if err != nil {
   299  		setCmdsErr(cmds, err)
   300  		return true, err
   301  	}
   302  
   303  	err = cn.WithReader(c.opt.ReadTimeout, func(rd *proto.Reader) error {
   304  		return pipelineReadCmds(rd, cmds)
   305  	})
   306  	return true, err
   307  }
   308  
   309  func pipelineReadCmds(rd *proto.Reader, cmds []Cmder) error {
   310  	for _, cmd := range cmds {
   311  		err := cmd.readReply(rd)
   312  		if err != nil && !internal.IsRedisError(err) {
   313  			return err
   314  		}
   315  	}
   316  	return nil
   317  }
   318  
   319  func (c *baseClient) txPipelineProcessCmds(cn *pool.Conn, cmds []Cmder) (bool, error) {
   320  	err := cn.WithWriter(c.opt.WriteTimeout, func(wr *proto.Writer) error {
   321  		return txPipelineWriteMulti(wr, cmds)
   322  	})
   323  	if err != nil {
   324  		setCmdsErr(cmds, err)
   325  		return true, err
   326  	}
   327  
   328  	err = cn.WithReader(c.opt.ReadTimeout, func(rd *proto.Reader) error {
   329  		err := txPipelineReadQueued(rd, cmds)
   330  		if err != nil {
   331  			setCmdsErr(cmds, err)
   332  			return err
   333  		}
   334  		return pipelineReadCmds(rd, cmds)
   335  	})
   336  	return false, err
   337  }
   338  
   339  func txPipelineWriteMulti(wr *proto.Writer, cmds []Cmder) error {
   340  	multiExec := make([]Cmder, 0, len(cmds)+2)
   341  	multiExec = append(multiExec, NewStatusCmd("MULTI"))
   342  	multiExec = append(multiExec, cmds...)
   343  	multiExec = append(multiExec, NewSliceCmd("EXEC"))
   344  	return writeCmd(wr, multiExec...)
   345  }
   346  
   347  func txPipelineReadQueued(rd *proto.Reader, cmds []Cmder) error {
   348  	// Parse queued replies.
   349  	var statusCmd StatusCmd
   350  	err := statusCmd.readReply(rd)
   351  	if err != nil {
   352  		return err
   353  	}
   354  
   355  	for range cmds {
   356  		err = statusCmd.readReply(rd)
   357  		if err != nil && !internal.IsRedisError(err) {
   358  			return err
   359  		}
   360  	}
   361  
   362  	// Parse number of replies.
   363  	line, err := rd.ReadLine()
   364  	if err != nil {
   365  		if err == Nil {
   366  			err = TxFailedErr
   367  		}
   368  		return err
   369  	}
   370  
   371  	switch line[0] {
   372  	case proto.ErrorReply:
   373  		return proto.ParseErrorReply(line)
   374  	case proto.ArrayReply:
   375  		// ok
   376  	default:
   377  		err := fmt.Errorf("redis: expected '*', but got line %q", line)
   378  		return err
   379  	}
   380  
   381  	return nil
   382  }
   383  
   384  //------------------------------------------------------------------------------
   385  
   386  // Client is a Redis client representing a pool of zero or more
   387  // underlying connections. It's safe for concurrent use by multiple
   388  // goroutines.
   389  type Client struct {
   390  	baseClient
   391  	cmdable
   392  
   393  	ctx context.Context
   394  }
   395  
   396  // NewClient returns a client to the Redis Server specified by Options.
   397  func NewClient(opt *Options) *Client {
   398  	opt.init()
   399  
   400  	c := Client{
   401  		baseClient: baseClient{
   402  			opt:      opt,
   403  			connPool: newConnPool(opt),
   404  		},
   405  	}
   406  	c.baseClient.init()
   407  	c.init()
   408  
   409  	return &c
   410  }
   411  
   412  func (c *Client) init() {
   413  	c.cmdable.setProcessor(c.Process)
   414  }
   415  
   416  func (c *Client) Context() context.Context {
   417  	if c.ctx != nil {
   418  		return c.ctx
   419  	}
   420  	return context.Background()
   421  }
   422  
   423  func (c *Client) WithContext(ctx context.Context) *Client {
   424  	if ctx == nil {
   425  		panic("nil context")
   426  	}
   427  	c2 := c.clone()
   428  	c2.ctx = ctx
   429  	return c2
   430  }
   431  
   432  func (c *Client) clone() *Client {
   433  	cp := *c
   434  	cp.init()
   435  	return &cp
   436  }
   437  
   438  // Options returns read-only Options that were used to create the client.
   439  func (c *Client) Options() *Options {
   440  	return c.opt
   441  }
   442  
   443  func (c *Client) SetLimiter(l Limiter) *Client {
   444  	c.limiter = l
   445  	return c
   446  }
   447  
   448  type PoolStats pool.Stats
   449  
   450  // PoolStats returns connection pool stats.
   451  func (c *Client) PoolStats() *PoolStats {
   452  	stats := c.connPool.Stats()
   453  	return (*PoolStats)(stats)
   454  }
   455  
   456  func (c *Client) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) {
   457  	return c.Pipeline().Pipelined(fn)
   458  }
   459  
   460  func (c *Client) Pipeline() Pipeliner {
   461  	pipe := Pipeline{
   462  		exec: c.processPipeline,
   463  	}
   464  	pipe.statefulCmdable.setProcessor(pipe.Process)
   465  	return &pipe
   466  }
   467  
   468  func (c *Client) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) {
   469  	return c.TxPipeline().Pipelined(fn)
   470  }
   471  
   472  // TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC.
   473  func (c *Client) TxPipeline() Pipeliner {
   474  	pipe := Pipeline{
   475  		exec: c.processTxPipeline,
   476  	}
   477  	pipe.statefulCmdable.setProcessor(pipe.Process)
   478  	return &pipe
   479  }
   480  
   481  func (c *Client) pubSub() *PubSub {
   482  	pubsub := &PubSub{
   483  		opt: c.opt,
   484  
   485  		newConn: func(channels []string) (*pool.Conn, error) {
   486  			return c.newConn()
   487  		},
   488  		closeConn: c.connPool.CloseConn,
   489  	}
   490  	pubsub.init()
   491  	return pubsub
   492  }
   493  
   494  // Subscribe subscribes the client to the specified channels.
   495  // Channels can be omitted to create empty subscription.
   496  // Note that this method does not wait on a response from Redis, so the
   497  // subscription may not be active immediately. To force the connection to wait,
   498  // you may call the Receive() method on the returned *PubSub like so:
   499  //
   500  //    sub := client.Subscribe(queryResp)
   501  //    iface, err := sub.Receive()
   502  //    if err != nil {
   503  //        // handle error
   504  //    }
   505  //
   506  //    // Should be *Subscription, but others are possible if other actions have been
   507  //    // taken on sub since it was created.
   508  //    switch iface.(type) {
   509  //    case *Subscription:
   510  //        // subscribe succeeded
   511  //    case *Message:
   512  //        // received first message
   513  //    case *Pong:
   514  //        // pong received
   515  //    default:
   516  //        // handle error
   517  //    }
   518  //
   519  //    ch := sub.Channel()
   520  func (c *Client) Subscribe(channels ...string) *PubSub {
   521  	pubsub := c.pubSub()
   522  	if len(channels) > 0 {
   523  		_ = pubsub.Subscribe(channels...)
   524  	}
   525  	return pubsub
   526  }
   527  
   528  // PSubscribe subscribes the client to the given patterns.
   529  // Patterns can be omitted to create empty subscription.
   530  func (c *Client) PSubscribe(channels ...string) *PubSub {
   531  	pubsub := c.pubSub()
   532  	if len(channels) > 0 {
   533  		_ = pubsub.PSubscribe(channels...)
   534  	}
   535  	return pubsub
   536  }
   537  
   538  //------------------------------------------------------------------------------
   539  
   540  // Conn is like Client, but its pool contains single connection.
   541  type Conn struct {
   542  	baseClient
   543  	statefulCmdable
   544  }
   545  
   546  func newConn(opt *Options, cn *pool.Conn) *Conn {
   547  	connPool := pool.NewSingleConnPool(nil)
   548  	connPool.SetConn(cn)
   549  	c := Conn{
   550  		baseClient: baseClient{
   551  			opt:      opt,
   552  			connPool: connPool,
   553  		},
   554  	}
   555  	c.baseClient.init()
   556  	c.statefulCmdable.setProcessor(c.Process)
   557  	return &c
   558  }
   559  
   560  func (c *Conn) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) {
   561  	return c.Pipeline().Pipelined(fn)
   562  }
   563  
   564  func (c *Conn) Pipeline() Pipeliner {
   565  	pipe := Pipeline{
   566  		exec: c.processPipeline,
   567  	}
   568  	pipe.statefulCmdable.setProcessor(pipe.Process)
   569  	return &pipe
   570  }
   571  
   572  func (c *Conn) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) {
   573  	return c.TxPipeline().Pipelined(fn)
   574  }
   575  
   576  // TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC.
   577  func (c *Conn) TxPipeline() Pipeliner {
   578  	pipe := Pipeline{
   579  		exec: c.processTxPipeline,
   580  	}
   581  	pipe.statefulCmdable.setProcessor(pipe.Process)
   582  	return &pipe
   583  }
   584  

View as plain text