...

Source file src/github.com/gomodule/redigo/redis/pool.go

Documentation: github.com/gomodule/redigo/redis

     1  // Copyright 2012 Gary Burd
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License"): you may
     4  // not use this file except in compliance with the License. You may obtain
     5  // a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
    11  // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
    12  // License for the specific language governing permissions and limitations
    13  // under the License.
    14  
    15  package redis
    16  
    17  import (
    18  	"bytes"
    19  	"crypto/rand"
    20  	"crypto/sha1"
    21  	"errors"
    22  	"io"
    23  	"strconv"
    24  	"sync"
    25  	"sync/atomic"
    26  	"time"
    27  
    28  	"github.com/gomodule/redigo/internal"
    29  )
    30  
// Compile-time checks that both connection wrappers defined in this file
// satisfy the ConnWithTimeout interface.
var (
	_ ConnWithTimeout = (*activeConn)(nil)
	_ ConnWithTimeout = (*errorConn)(nil)
)

// nowFunc is the clock used by the pool for idle and lifetime bookkeeping.
// It is a variable so it can be replaced.
var nowFunc = time.Now // for testing

// ErrPoolExhausted is returned from a pool connection method (Do, Send,
// Receive, Flush, Err) when the maximum number of database connections in the
// pool has been reached.
var ErrPoolExhausted = errors.New("redigo: connection pool exhausted")

var (
	// errPoolClosed describes a closed pool.
	// NOTE(review): not referenced by the code visible in this file — confirm
	// whether it is still used elsewhere in the package.
	errPoolClosed = errors.New("redigo: connection pool closed")
	// errConnClosed is returned by activeConn methods once the connection has
	// been handed back to the pool with Close.
	errConnClosed = errors.New("redigo: connection closed")
)
    47  
    48  // Pool maintains a pool of connections. The application calls the Get method
    49  // to get a connection from the pool and the connection's Close method to
    50  // return the connection's resources to the pool.
    51  //
    52  // The following example shows how to use a pool in a web application. The
    53  // application creates a pool at application startup and makes it available to
    54  // request handlers using a package level variable. The pool configuration used
    55  // here is an example, not a recommendation.
    56  //
    57  //  func newPool(addr string) *redis.Pool {
    58  //    return &redis.Pool{
    59  //      MaxIdle: 3,
    60  //      IdleTimeout: 240 * time.Second,
    61  //      Dial: func () (redis.Conn, error) { return redis.Dial("tcp", addr) },
    62  //    }
    63  //  }
    64  //
    65  //  var (
    66  //    pool *redis.Pool
    67  //    redisServer = flag.String("redisServer", ":6379", "")
    68  //  )
    69  //
    70  //  func main() {
    71  //    flag.Parse()
    72  //    pool = newPool(*redisServer)
    73  //    ...
    74  //  }
    75  //
    76  // A request handler gets a connection from the pool and closes the connection
    77  // when the handler is done:
    78  //
    79  //  func serveHome(w http.ResponseWriter, r *http.Request) {
    80  //      conn := pool.Get()
    81  //      defer conn.Close()
    82  //      ...
    83  //  }
    84  //
    85  // Use the Dial function to authenticate connections with the AUTH command or
    86  // select a database with the SELECT command:
    87  //
    88  //  pool := &redis.Pool{
    89  //    // Other pool configuration not shown in this example.
    90  //    Dial: func () (redis.Conn, error) {
    91  //      c, err := redis.Dial("tcp", server)
    92  //      if err != nil {
    93  //        return nil, err
    94  //      }
    95  //      if _, err := c.Do("AUTH", password); err != nil {
    96  //        c.Close()
    97  //        return nil, err
    98  //      }
    99  //      if _, err := c.Do("SELECT", db); err != nil {
   100  //        c.Close()
   101  //        return nil, err
   102  //      }
   103  //      return c, nil
   104  //    },
   105  //  }
   106  //
   107  // Use the TestOnBorrow function to check the health of an idle connection
   108  // before the connection is returned to the application. This example PINGs
   109  // connections that have been idle more than a minute:
   110  //
   111  //  pool := &redis.Pool{
   112  //    // Other pool configuration not shown in this example.
   113  //    TestOnBorrow: func(c redis.Conn, t time.Time) error {
   114  //      if time.Since(t) < time.Minute {
   115  //        return nil
   116  //      }
   117  //      _, err := c.Do("PING")
   118  //      return err
   119  //    },
   120  //  }
   121  //
type Pool struct {
	// Dial is an application supplied function for creating and configuring a
	// connection.
	//
	// The connection returned from Dial must not be in a special state
	// (subscribed to pubsub channel, transaction started, ...).
	Dial func() (Conn, error)

	// TestOnBorrow is an optional application supplied function for checking
	// the health of an idle connection before the connection is used again by
	// the application. Argument t is the time that the connection was returned
	// to the pool. If the function returns an error, then the connection is
	// closed.
	TestOnBorrow func(c Conn, t time.Time) error

	// Maximum number of idle connections in the pool. When returning a
	// connection would exceed this limit, the connection at the back of the
	// idle list is closed.
	MaxIdle int

	// Maximum number of connections allocated by the pool at a given time.
	// When zero, there is no limit on the number of connections in the pool.
	MaxActive int

	// Close connections after remaining idle for this duration. If the value
	// is zero, then idle connections are not closed. Applications should set
	// the timeout to a value less than the server's timeout.
	IdleTimeout time.Duration

	// If Wait is true and the pool is at the MaxActive limit, then Get() waits
	// for a connection to be returned to the pool before returning.
	Wait bool

	// Close connections older than this duration. If the value is zero, then
	// the pool does not close connections based on age. The age is checked
	// when an idle connection is borrowed.
	MaxConnLifetime time.Duration

	// chInitialized is read atomically on the fast path of lazyInit and
	// written while holding mu.
	chInitialized uint32 // set to 1 when field ch is initialized

	mu     sync.Mutex    // mu protects the following fields
	closed bool          // set to true when the pool is closed.
	active int           // the number of open connections in the pool
	ch     chan struct{} // limits open connections when p.Wait is true
	idle   idleList      // idle connections
}
   165  
   166  // NewPool creates a new pool.
   167  //
   168  // Deprecated: Initialize the Pool directory as shown in the example.
   169  func NewPool(newFn func() (Conn, error), maxIdle int) *Pool {
   170  	return &Pool{Dial: newFn, MaxIdle: maxIdle}
   171  }
   172  
   173  // Get gets a connection. The application must close the returned connection.
   174  // This method always returns a valid connection so that applications can defer
   175  // error handling to the first use of the connection. If there is an error
   176  // getting an underlying connection, then the connection Err, Do, Send, Flush
   177  // and Receive methods return that error.
   178  func (p *Pool) Get() Conn {
   179  	pc, err := p.get(nil)
   180  	if err != nil {
   181  		return errorConn{err}
   182  	}
   183  	return &activeConn{p: p, pc: pc}
   184  }
   185  
// PoolStats contains pool statistics. Both counters are sampled together
// under the pool's mutex by Pool.Stats.
type PoolStats struct {
	// ActiveCount is the number of connections in the pool. The count includes
	// idle connections and connections in use.
	ActiveCount int
	// IdleCount is the number of idle connections in the pool.
	IdleCount int
}
   194  
   195  // Stats returns pool's statistics.
   196  func (p *Pool) Stats() PoolStats {
   197  	p.mu.Lock()
   198  	stats := PoolStats{
   199  		ActiveCount: p.active,
   200  		IdleCount:   p.idle.count,
   201  	}
   202  	p.mu.Unlock()
   203  
   204  	return stats
   205  }
   206  
   207  // ActiveCount returns the number of connections in the pool. The count
   208  // includes idle connections and connections in use.
   209  func (p *Pool) ActiveCount() int {
   210  	p.mu.Lock()
   211  	active := p.active
   212  	p.mu.Unlock()
   213  	return active
   214  }
   215  
   216  // IdleCount returns the number of idle connections in the pool.
   217  func (p *Pool) IdleCount() int {
   218  	p.mu.Lock()
   219  	idle := p.idle.count
   220  	p.mu.Unlock()
   221  	return idle
   222  }
   223  
   224  // Close releases the resources used by the pool.
   225  func (p *Pool) Close() error {
   226  	p.mu.Lock()
   227  	if p.closed {
   228  		p.mu.Unlock()
   229  		return nil
   230  	}
   231  	p.closed = true
   232  	p.active -= p.idle.count
   233  	pc := p.idle.front
   234  	p.idle.count = 0
   235  	p.idle.front, p.idle.back = nil, nil
   236  	if p.ch != nil {
   237  		close(p.ch)
   238  	}
   239  	p.mu.Unlock()
   240  	for ; pc != nil; pc = pc.next {
   241  		pc.c.Close()
   242  	}
   243  	return nil
   244  }
   245  
   246  func (p *Pool) lazyInit() {
   247  	// Fast path.
   248  	if atomic.LoadUint32(&p.chInitialized) == 1 {
   249  		return
   250  	}
   251  	// Slow path.
   252  	p.mu.Lock()
   253  	if p.chInitialized == 0 {
   254  		p.ch = make(chan struct{}, p.MaxActive)
   255  		if p.closed {
   256  			close(p.ch)
   257  		} else {
   258  			for i := 0; i < p.MaxActive; i++ {
   259  				p.ch <- struct{}{}
   260  			}
   261  		}
   262  		atomic.StoreUint32(&p.chInitialized, 1)
   263  	}
   264  	p.mu.Unlock()
   265  }
   266  
   267  // get prunes stale connections and returns a connection from the idle list or
   268  // creates a new connection.
   269  func (p *Pool) get(ctx interface {
   270  	Done() <-chan struct{}
   271  	Err() error
   272  }) (*poolConn, error) {
   273  
   274  	// Handle limit for p.Wait == true.
   275  	if p.Wait && p.MaxActive > 0 {
   276  		p.lazyInit()
   277  		if ctx == nil {
   278  			<-p.ch
   279  		} else {
   280  			select {
   281  			case <-p.ch:
   282  			case <-ctx.Done():
   283  				return nil, ctx.Err()
   284  			}
   285  		}
   286  	}
   287  
   288  	p.mu.Lock()
   289  
   290  	// Prune stale connections at the back of the idle list.
   291  	if p.IdleTimeout > 0 {
   292  		n := p.idle.count
   293  		for i := 0; i < n && p.idle.back != nil && p.idle.back.t.Add(p.IdleTimeout).Before(nowFunc()); i++ {
   294  			pc := p.idle.back
   295  			p.idle.popBack()
   296  			p.mu.Unlock()
   297  			pc.c.Close()
   298  			p.mu.Lock()
   299  			p.active--
   300  		}
   301  	}
   302  
   303  	// Get idle connection from the front of idle list.
   304  	for p.idle.front != nil {
   305  		pc := p.idle.front
   306  		p.idle.popFront()
   307  		p.mu.Unlock()
   308  		if (p.TestOnBorrow == nil || p.TestOnBorrow(pc.c, pc.t) == nil) &&
   309  			(p.MaxConnLifetime == 0 || nowFunc().Sub(pc.created) < p.MaxConnLifetime) {
   310  			return pc, nil
   311  		}
   312  		pc.c.Close()
   313  		p.mu.Lock()
   314  		p.active--
   315  	}
   316  
   317  	// Check for pool closed before dialing a new connection.
   318  	if p.closed {
   319  		p.mu.Unlock()
   320  		return nil, errors.New("redigo: get on closed pool")
   321  	}
   322  
   323  	// Handle limit for p.Wait == false.
   324  	if !p.Wait && p.MaxActive > 0 && p.active >= p.MaxActive {
   325  		p.mu.Unlock()
   326  		return nil, ErrPoolExhausted
   327  	}
   328  
   329  	p.active++
   330  	p.mu.Unlock()
   331  	c, err := p.Dial()
   332  	if err != nil {
   333  		c = nil
   334  		p.mu.Lock()
   335  		p.active--
   336  		if p.ch != nil && !p.closed {
   337  			p.ch <- struct{}{}
   338  		}
   339  		p.mu.Unlock()
   340  	}
   341  	return &poolConn{c: c, created: nowFunc()}, err
   342  }
   343  
   344  func (p *Pool) put(pc *poolConn, forceClose bool) error {
   345  	p.mu.Lock()
   346  	if !p.closed && !forceClose {
   347  		pc.t = nowFunc()
   348  		p.idle.pushFront(pc)
   349  		if p.idle.count > p.MaxIdle {
   350  			pc = p.idle.back
   351  			p.idle.popBack()
   352  		} else {
   353  			pc = nil
   354  		}
   355  	}
   356  
   357  	if pc != nil {
   358  		p.mu.Unlock()
   359  		pc.c.Close()
   360  		p.mu.Lock()
   361  		p.active--
   362  	}
   363  
   364  	if p.ch != nil && !p.closed {
   365  		p.ch <- struct{}{}
   366  	}
   367  	p.mu.Unlock()
   368  	return nil
   369  }
   370  
// activeConn is the connection wrapper handed out by Pool.Get. It forwards
// calls to the pooled connection and tracks connection state so that Close
// can clean up before the connection is returned to the pool.
type activeConn struct {
	p     *Pool     // pool the connection is returned to on Close
	pc    *poolConn // underlying pooled connection; nil after Close
	state int       // bit set of internal.*State flags updated by Do, DoWithTimeout and Send
}
   376  
   377  var (
   378  	sentinel     []byte
   379  	sentinelOnce sync.Once
   380  )
   381  
   382  func initSentinel() {
   383  	p := make([]byte, 64)
   384  	if _, err := rand.Read(p); err == nil {
   385  		sentinel = p
   386  	} else {
   387  		h := sha1.New()
   388  		io.WriteString(h, "Oops, rand failed. Use time instead.")
   389  		io.WriteString(h, strconv.FormatInt(time.Now().UnixNano(), 10))
   390  		sentinel = h.Sum(nil)
   391  	}
   392  }
   393  
   394  func (ac *activeConn) Close() error {
   395  	pc := ac.pc
   396  	if pc == nil {
   397  		return nil
   398  	}
   399  	ac.pc = nil
   400  
   401  	if ac.state&internal.MultiState != 0 {
   402  		pc.c.Send("DISCARD")
   403  		ac.state &^= (internal.MultiState | internal.WatchState)
   404  	} else if ac.state&internal.WatchState != 0 {
   405  		pc.c.Send("UNWATCH")
   406  		ac.state &^= internal.WatchState
   407  	}
   408  	if ac.state&internal.SubscribeState != 0 {
   409  		pc.c.Send("UNSUBSCRIBE")
   410  		pc.c.Send("PUNSUBSCRIBE")
   411  		// To detect the end of the message stream, ask the server to echo
   412  		// a sentinel value and read until we see that value.
   413  		sentinelOnce.Do(initSentinel)
   414  		pc.c.Send("ECHO", sentinel)
   415  		pc.c.Flush()
   416  		for {
   417  			p, err := pc.c.Receive()
   418  			if err != nil {
   419  				break
   420  			}
   421  			if p, ok := p.([]byte); ok && bytes.Equal(p, sentinel) {
   422  				ac.state &^= internal.SubscribeState
   423  				break
   424  			}
   425  		}
   426  	}
   427  	pc.c.Do("")
   428  	ac.p.put(pc, ac.state != 0 || pc.c.Err() != nil)
   429  	return nil
   430  }
   431  
   432  func (ac *activeConn) Err() error {
   433  	pc := ac.pc
   434  	if pc == nil {
   435  		return errConnClosed
   436  	}
   437  	return pc.c.Err()
   438  }
   439  
   440  func (ac *activeConn) Do(commandName string, args ...interface{}) (reply interface{}, err error) {
   441  	pc := ac.pc
   442  	if pc == nil {
   443  		return nil, errConnClosed
   444  	}
   445  	ci := internal.LookupCommandInfo(commandName)
   446  	ac.state = (ac.state | ci.Set) &^ ci.Clear
   447  	return pc.c.Do(commandName, args...)
   448  }
   449  
   450  func (ac *activeConn) DoWithTimeout(timeout time.Duration, commandName string, args ...interface{}) (reply interface{}, err error) {
   451  	pc := ac.pc
   452  	if pc == nil {
   453  		return nil, errConnClosed
   454  	}
   455  	cwt, ok := pc.c.(ConnWithTimeout)
   456  	if !ok {
   457  		return nil, errTimeoutNotSupported
   458  	}
   459  	ci := internal.LookupCommandInfo(commandName)
   460  	ac.state = (ac.state | ci.Set) &^ ci.Clear
   461  	return cwt.DoWithTimeout(timeout, commandName, args...)
   462  }
   463  
   464  func (ac *activeConn) Send(commandName string, args ...interface{}) error {
   465  	pc := ac.pc
   466  	if pc == nil {
   467  		return errConnClosed
   468  	}
   469  	ci := internal.LookupCommandInfo(commandName)
   470  	ac.state = (ac.state | ci.Set) &^ ci.Clear
   471  	return pc.c.Send(commandName, args...)
   472  }
   473  
   474  func (ac *activeConn) Flush() error {
   475  	pc := ac.pc
   476  	if pc == nil {
   477  		return errConnClosed
   478  	}
   479  	return pc.c.Flush()
   480  }
   481  
   482  func (ac *activeConn) Receive() (reply interface{}, err error) {
   483  	pc := ac.pc
   484  	if pc == nil {
   485  		return nil, errConnClosed
   486  	}
   487  	return pc.c.Receive()
   488  }
   489  
   490  func (ac *activeConn) ReceiveWithTimeout(timeout time.Duration) (reply interface{}, err error) {
   491  	pc := ac.pc
   492  	if pc == nil {
   493  		return nil, errConnClosed
   494  	}
   495  	cwt, ok := pc.c.(ConnWithTimeout)
   496  	if !ok {
   497  		return nil, errTimeoutNotSupported
   498  	}
   499  	return cwt.ReceiveWithTimeout(timeout)
   500  }
   501  
   502  type errorConn struct{ err error }
   503  
   504  func (ec errorConn) Do(string, ...interface{}) (interface{}, error) { return nil, ec.err }
   505  func (ec errorConn) DoWithTimeout(time.Duration, string, ...interface{}) (interface{}, error) {
   506  	return nil, ec.err
   507  }
   508  func (ec errorConn) Send(string, ...interface{}) error                     { return ec.err }
   509  func (ec errorConn) Err() error                                            { return ec.err }
   510  func (ec errorConn) Close() error                                          { return nil }
   511  func (ec errorConn) Flush() error                                          { return ec.err }
   512  func (ec errorConn) Receive() (interface{}, error)                         { return nil, ec.err }
   513  func (ec errorConn) ReceiveWithTimeout(time.Duration) (interface{}, error) { return nil, ec.err }
   514  
   515  type idleList struct {
   516  	count       int
   517  	front, back *poolConn
   518  }
   519  
   520  type poolConn struct {
   521  	c          Conn
   522  	t          time.Time
   523  	created    time.Time
   524  	next, prev *poolConn
   525  }
   526  
   527  func (l *idleList) pushFront(pc *poolConn) {
   528  	pc.next = l.front
   529  	pc.prev = nil
   530  	if l.count == 0 {
   531  		l.back = pc
   532  	} else {
   533  		l.front.prev = pc
   534  	}
   535  	l.front = pc
   536  	l.count++
   537  	return
   538  }
   539  
   540  func (l *idleList) popFront() {
   541  	pc := l.front
   542  	l.count--
   543  	if l.count == 0 {
   544  		l.front, l.back = nil, nil
   545  	} else {
   546  		pc.next.prev = nil
   547  		l.front = pc.next
   548  	}
   549  	pc.next, pc.prev = nil, nil
   550  }
   551  
   552  func (l *idleList) popBack() {
   553  	pc := l.back
   554  	l.count--
   555  	if l.count == 0 {
   556  		l.front, l.back = nil, nil
   557  	} else {
   558  		pc.prev.next = nil
   559  		l.back = pc.prev
   560  	}
   561  	pc.next, pc.prev = nil, nil
   562  }
   563  

View as plain text