...

Source file src/github.com/go-redis/redis/internal/pool/pool.go

Documentation: github.com/go-redis/redis/internal/pool

     1  package pool
     2  
     3  import (
     4  	"errors"
     5  	"net"
     6  	"sync"
     7  	"sync/atomic"
     8  	"time"
     9  
    10  	"github.com/go-redis/redis/internal"
    11  )
    12  
    13  var ErrClosed = errors.New("redis: client is closed")
    14  var ErrPoolTimeout = errors.New("redis: connection pool timeout")
    15  
    16  var timers = sync.Pool{
    17  	New: func() interface{} {
    18  		t := time.NewTimer(time.Hour)
    19  		t.Stop()
    20  		return t
    21  	},
    22  }
    23  
    24  // Stats contains pool state information and accumulated stats.
    25  type Stats struct {
    26  	Hits     uint32 // number of times free connection was found in the pool
    27  	Misses   uint32 // number of times free connection was NOT found in the pool
    28  	Timeouts uint32 // number of times a wait timeout occurred
    29  
    30  	TotalConns uint32 // number of total connections in the pool
    31  	IdleConns  uint32 // number of idle connections in the pool
    32  	StaleConns uint32 // number of stale connections removed from the pool
    33  }
    34  
    35  type Pooler interface {
    36  	NewConn() (*Conn, error)
    37  	CloseConn(*Conn) error
    38  
    39  	Get() (*Conn, error)
    40  	Put(*Conn)
    41  	Remove(*Conn, error)
    42  
    43  	Len() int
    44  	IdleLen() int
    45  	Stats() *Stats
    46  
    47  	Close() error
    48  }
    49  
    50  type Options struct {
    51  	Dialer  func() (net.Conn, error)
    52  	OnClose func(*Conn) error
    53  
    54  	PoolSize           int
    55  	MinIdleConns       int
    56  	MaxConnAge         time.Duration
    57  	PoolTimeout        time.Duration
    58  	IdleTimeout        time.Duration
    59  	IdleCheckFrequency time.Duration
    60  }
    61  
    62  type ConnPool struct {
    63  	opt *Options
    64  
    65  	dialErrorsNum uint32 // atomic
    66  
    67  	lastDialErrorMu sync.RWMutex
    68  	lastDialError   error
    69  
    70  	queue chan struct{}
    71  
    72  	connsMu      sync.Mutex
    73  	conns        []*Conn
    74  	idleConns    []*Conn
    75  	poolSize     int
    76  	idleConnsLen int
    77  
    78  	stats Stats
    79  
    80  	_closed uint32 // atomic
    81  }
    82  
    83  var _ Pooler = (*ConnPool)(nil)
    84  
    85  func NewConnPool(opt *Options) *ConnPool {
    86  	p := &ConnPool{
    87  		opt: opt,
    88  
    89  		queue:     make(chan struct{}, opt.PoolSize),
    90  		conns:     make([]*Conn, 0, opt.PoolSize),
    91  		idleConns: make([]*Conn, 0, opt.PoolSize),
    92  	}
    93  
    94  	for i := 0; i < opt.MinIdleConns; i++ {
    95  		p.checkMinIdleConns()
    96  	}
    97  
    98  	if opt.IdleTimeout > 0 && opt.IdleCheckFrequency > 0 {
    99  		go p.reaper(opt.IdleCheckFrequency)
   100  	}
   101  
   102  	return p
   103  }
   104  
   105  func (p *ConnPool) checkMinIdleConns() {
   106  	if p.opt.MinIdleConns == 0 {
   107  		return
   108  	}
   109  	if p.poolSize < p.opt.PoolSize && p.idleConnsLen < p.opt.MinIdleConns {
   110  		p.poolSize++
   111  		p.idleConnsLen++
   112  		go p.addIdleConn()
   113  	}
   114  }
   115  
   116  func (p *ConnPool) addIdleConn() {
   117  	cn, err := p.newConn(true)
   118  	if err != nil {
   119  		return
   120  	}
   121  
   122  	p.connsMu.Lock()
   123  	p.conns = append(p.conns, cn)
   124  	p.idleConns = append(p.idleConns, cn)
   125  	p.connsMu.Unlock()
   126  }
   127  
   128  func (p *ConnPool) NewConn() (*Conn, error) {
   129  	return p._NewConn(false)
   130  }
   131  
   132  func (p *ConnPool) _NewConn(pooled bool) (*Conn, error) {
   133  	cn, err := p.newConn(pooled)
   134  	if err != nil {
   135  		return nil, err
   136  	}
   137  
   138  	p.connsMu.Lock()
   139  	p.conns = append(p.conns, cn)
   140  	if pooled {
   141  		if p.poolSize < p.opt.PoolSize {
   142  			p.poolSize++
   143  		} else {
   144  			cn.pooled = false
   145  		}
   146  	}
   147  	p.connsMu.Unlock()
   148  	return cn, nil
   149  }
   150  
   151  func (p *ConnPool) newConn(pooled bool) (*Conn, error) {
   152  	if p.closed() {
   153  		return nil, ErrClosed
   154  	}
   155  
   156  	if atomic.LoadUint32(&p.dialErrorsNum) >= uint32(p.opt.PoolSize) {
   157  		return nil, p.getLastDialError()
   158  	}
   159  
   160  	netConn, err := p.opt.Dialer()
   161  	if err != nil {
   162  		p.setLastDialError(err)
   163  		if atomic.AddUint32(&p.dialErrorsNum, 1) == uint32(p.opt.PoolSize) {
   164  			go p.tryDial()
   165  		}
   166  		return nil, err
   167  	}
   168  
   169  	cn := NewConn(netConn)
   170  	cn.pooled = pooled
   171  	return cn, nil
   172  }
   173  
   174  func (p *ConnPool) tryDial() {
   175  	for {
   176  		if p.closed() {
   177  			return
   178  		}
   179  
   180  		conn, err := p.opt.Dialer()
   181  		if err != nil {
   182  			p.setLastDialError(err)
   183  			time.Sleep(time.Second)
   184  			continue
   185  		}
   186  
   187  		atomic.StoreUint32(&p.dialErrorsNum, 0)
   188  		_ = conn.Close()
   189  		return
   190  	}
   191  }
   192  
   193  func (p *ConnPool) setLastDialError(err error) {
   194  	p.lastDialErrorMu.Lock()
   195  	p.lastDialError = err
   196  	p.lastDialErrorMu.Unlock()
   197  }
   198  
   199  func (p *ConnPool) getLastDialError() error {
   200  	p.lastDialErrorMu.RLock()
   201  	err := p.lastDialError
   202  	p.lastDialErrorMu.RUnlock()
   203  	return err
   204  }
   205  
   206  // Get returns existed connection from the pool or creates a new one.
   207  func (p *ConnPool) Get() (*Conn, error) {
   208  	if p.closed() {
   209  		return nil, ErrClosed
   210  	}
   211  
   212  	err := p.waitTurn()
   213  	if err != nil {
   214  		return nil, err
   215  	}
   216  
   217  	for {
   218  		p.connsMu.Lock()
   219  		cn := p.popIdle()
   220  		p.connsMu.Unlock()
   221  
   222  		if cn == nil {
   223  			break
   224  		}
   225  
   226  		if p.isStaleConn(cn) {
   227  			_ = p.CloseConn(cn)
   228  			continue
   229  		}
   230  
   231  		atomic.AddUint32(&p.stats.Hits, 1)
   232  		return cn, nil
   233  	}
   234  
   235  	atomic.AddUint32(&p.stats.Misses, 1)
   236  
   237  	newcn, err := p._NewConn(true)
   238  	if err != nil {
   239  		p.freeTurn()
   240  		return nil, err
   241  	}
   242  
   243  	return newcn, nil
   244  }
   245  
   246  func (p *ConnPool) getTurn() {
   247  	p.queue <- struct{}{}
   248  }
   249  
   250  func (p *ConnPool) waitTurn() error {
   251  	select {
   252  	case p.queue <- struct{}{}:
   253  		return nil
   254  	default:
   255  		timer := timers.Get().(*time.Timer)
   256  		timer.Reset(p.opt.PoolTimeout)
   257  
   258  		select {
   259  		case p.queue <- struct{}{}:
   260  			if !timer.Stop() {
   261  				<-timer.C
   262  			}
   263  			timers.Put(timer)
   264  			return nil
   265  		case <-timer.C:
   266  			timers.Put(timer)
   267  			atomic.AddUint32(&p.stats.Timeouts, 1)
   268  			return ErrPoolTimeout
   269  		}
   270  	}
   271  }
   272  
   273  func (p *ConnPool) freeTurn() {
   274  	<-p.queue
   275  }
   276  
   277  func (p *ConnPool) popIdle() *Conn {
   278  	if len(p.idleConns) == 0 {
   279  		return nil
   280  	}
   281  
   282  	idx := len(p.idleConns) - 1
   283  	cn := p.idleConns[idx]
   284  	p.idleConns = p.idleConns[:idx]
   285  	p.idleConnsLen--
   286  	p.checkMinIdleConns()
   287  	return cn
   288  }
   289  
   290  func (p *ConnPool) Put(cn *Conn) {
   291  	if !cn.pooled {
   292  		p.Remove(cn, nil)
   293  		return
   294  	}
   295  
   296  	p.connsMu.Lock()
   297  	p.idleConns = append(p.idleConns, cn)
   298  	p.idleConnsLen++
   299  	p.connsMu.Unlock()
   300  	p.freeTurn()
   301  }
   302  
   303  func (p *ConnPool) Remove(cn *Conn, reason error) {
   304  	p.removeConn(cn)
   305  	p.freeTurn()
   306  	_ = p.closeConn(cn)
   307  }
   308  
   309  func (p *ConnPool) CloseConn(cn *Conn) error {
   310  	p.removeConn(cn)
   311  	return p.closeConn(cn)
   312  }
   313  
   314  func (p *ConnPool) removeConn(cn *Conn) {
   315  	p.connsMu.Lock()
   316  	for i, c := range p.conns {
   317  		if c == cn {
   318  			p.conns = append(p.conns[:i], p.conns[i+1:]...)
   319  			if cn.pooled {
   320  				p.poolSize--
   321  				p.checkMinIdleConns()
   322  			}
   323  			break
   324  		}
   325  	}
   326  	p.connsMu.Unlock()
   327  }
   328  
   329  func (p *ConnPool) closeConn(cn *Conn) error {
   330  	if p.opt.OnClose != nil {
   331  		_ = p.opt.OnClose(cn)
   332  	}
   333  	return cn.Close()
   334  }
   335  
   336  // Len returns total number of connections.
   337  func (p *ConnPool) Len() int {
   338  	p.connsMu.Lock()
   339  	n := len(p.conns)
   340  	p.connsMu.Unlock()
   341  	return n
   342  }
   343  
   344  // IdleLen returns number of idle connections.
   345  func (p *ConnPool) IdleLen() int {
   346  	p.connsMu.Lock()
   347  	n := p.idleConnsLen
   348  	p.connsMu.Unlock()
   349  	return n
   350  }
   351  
   352  func (p *ConnPool) Stats() *Stats {
   353  	idleLen := p.IdleLen()
   354  	return &Stats{
   355  		Hits:     atomic.LoadUint32(&p.stats.Hits),
   356  		Misses:   atomic.LoadUint32(&p.stats.Misses),
   357  		Timeouts: atomic.LoadUint32(&p.stats.Timeouts),
   358  
   359  		TotalConns: uint32(p.Len()),
   360  		IdleConns:  uint32(idleLen),
   361  		StaleConns: atomic.LoadUint32(&p.stats.StaleConns),
   362  	}
   363  }
   364  
   365  func (p *ConnPool) closed() bool {
   366  	return atomic.LoadUint32(&p._closed) == 1
   367  }
   368  
   369  func (p *ConnPool) Filter(fn func(*Conn) bool) error {
   370  	var firstErr error
   371  	p.connsMu.Lock()
   372  	for _, cn := range p.conns {
   373  		if fn(cn) {
   374  			if err := p.closeConn(cn); err != nil && firstErr == nil {
   375  				firstErr = err
   376  			}
   377  		}
   378  	}
   379  	p.connsMu.Unlock()
   380  	return firstErr
   381  }
   382  
   383  func (p *ConnPool) Close() error {
   384  	if !atomic.CompareAndSwapUint32(&p._closed, 0, 1) {
   385  		return ErrClosed
   386  	}
   387  
   388  	var firstErr error
   389  	p.connsMu.Lock()
   390  	for _, cn := range p.conns {
   391  		if err := p.closeConn(cn); err != nil && firstErr == nil {
   392  			firstErr = err
   393  		}
   394  	}
   395  	p.conns = nil
   396  	p.poolSize = 0
   397  	p.idleConns = nil
   398  	p.idleConnsLen = 0
   399  	p.connsMu.Unlock()
   400  
   401  	return firstErr
   402  }
   403  
   404  func (p *ConnPool) reapStaleConn() *Conn {
   405  	if len(p.idleConns) == 0 {
   406  		return nil
   407  	}
   408  
   409  	cn := p.idleConns[0]
   410  	if !p.isStaleConn(cn) {
   411  		return nil
   412  	}
   413  
   414  	p.idleConns = append(p.idleConns[:0], p.idleConns[1:]...)
   415  	p.idleConnsLen--
   416  
   417  	return cn
   418  }
   419  
   420  func (p *ConnPool) ReapStaleConns() (int, error) {
   421  	var n int
   422  	for {
   423  		p.getTurn()
   424  
   425  		p.connsMu.Lock()
   426  		cn := p.reapStaleConn()
   427  		p.connsMu.Unlock()
   428  
   429  		if cn != nil {
   430  			p.removeConn(cn)
   431  		}
   432  
   433  		p.freeTurn()
   434  
   435  		if cn != nil {
   436  			p.closeConn(cn)
   437  			n++
   438  		} else {
   439  			break
   440  		}
   441  	}
   442  	return n, nil
   443  }
   444  
   445  func (p *ConnPool) reaper(frequency time.Duration) {
   446  	ticker := time.NewTicker(frequency)
   447  	defer ticker.Stop()
   448  
   449  	for range ticker.C {
   450  		if p.closed() {
   451  			break
   452  		}
   453  		n, err := p.ReapStaleConns()
   454  		if err != nil {
   455  			internal.Logf("ReapStaleConns failed: %s", err)
   456  			continue
   457  		}
   458  		atomic.AddUint32(&p.stats.StaleConns, uint32(n))
   459  	}
   460  }
   461  
   462  func (p *ConnPool) isStaleConn(cn *Conn) bool {
   463  	if p.opt.IdleTimeout == 0 && p.opt.MaxConnAge == 0 {
   464  		return false
   465  	}
   466  
   467  	now := time.Now()
   468  	if p.opt.IdleTimeout > 0 && now.Sub(cn.UsedAt()) >= p.opt.IdleTimeout {
   469  		return true
   470  	}
   471  	if p.opt.MaxConnAge > 0 && now.Sub(cn.createdAt) >= p.opt.MaxConnAge {
   472  		return true
   473  	}
   474  
   475  	return false
   476  }
   477  

View as plain text