...

Source file src/github.com/go-redis/redis/internal/pool/pool_single.go

Documentation: github.com/go-redis/redis/internal/pool

     1  package pool
     2  
     3  import (
     4  	"fmt"
     5  	"sync/atomic"
     6  )
     7  
     8  const (
     9  	stateDefault = 0
    10  	stateInited  = 1
    11  	stateClosed  = 2
    12  )
    13  
    14  type BadConnError struct {
    15  	wrapped error
    16  }
    17  
    18  var _ error = (*BadConnError)(nil)
    19  
    20  func (e BadConnError) Error() string {
    21  	return "pg: Conn is in a bad state"
    22  }
    23  
    24  func (e BadConnError) Unwrap() error {
    25  	return e.wrapped
    26  }
    27  
    28  type SingleConnPool struct {
    29  	pool  Pooler
    30  	level int32 // atomic
    31  
    32  	state uint32 // atomic
    33  	ch    chan *Conn
    34  
    35  	_badConnError atomic.Value
    36  }
    37  
    38  var _ Pooler = (*SingleConnPool)(nil)
    39  
    40  func NewSingleConnPool(pool Pooler) *SingleConnPool {
    41  	p, ok := pool.(*SingleConnPool)
    42  	if !ok {
    43  		p = &SingleConnPool{
    44  			pool: pool,
    45  			ch:   make(chan *Conn, 1),
    46  		}
    47  	}
    48  	atomic.AddInt32(&p.level, 1)
    49  	return p
    50  }
    51  
    52  func (p *SingleConnPool) SetConn(cn *Conn) {
    53  	if atomic.CompareAndSwapUint32(&p.state, stateDefault, stateInited) {
    54  		p.ch <- cn
    55  	} else {
    56  		panic("not reached")
    57  	}
    58  }
    59  
    60  func (p *SingleConnPool) NewConn() (*Conn, error) {
    61  	return p.pool.NewConn()
    62  }
    63  
    64  func (p *SingleConnPool) CloseConn(cn *Conn) error {
    65  	return p.pool.CloseConn(cn)
    66  }
    67  
    68  func (p *SingleConnPool) Get() (*Conn, error) {
    69  	// In worst case this races with Close which is not a very common operation.
    70  	for i := 0; i < 1000; i++ {
    71  		switch atomic.LoadUint32(&p.state) {
    72  		case stateDefault:
    73  			cn, err := p.pool.Get()
    74  			if err != nil {
    75  				return nil, err
    76  			}
    77  			if atomic.CompareAndSwapUint32(&p.state, stateDefault, stateInited) {
    78  				return cn, nil
    79  			}
    80  			p.pool.Remove(cn, ErrClosed)
    81  		case stateInited:
    82  			if err := p.badConnError(); err != nil {
    83  				return nil, err
    84  			}
    85  			cn, ok := <-p.ch
    86  			if !ok {
    87  				return nil, ErrClosed
    88  			}
    89  			return cn, nil
    90  		case stateClosed:
    91  			return nil, ErrClosed
    92  		default:
    93  			panic("not reached")
    94  		}
    95  	}
    96  	return nil, fmt.Errorf("pg: SingleConnPool.Get: infinite loop")
    97  }
    98  
    99  func (p *SingleConnPool) Put(cn *Conn) {
   100  	defer func() {
   101  		if recover() != nil {
   102  			p.freeConn(cn)
   103  		}
   104  	}()
   105  	p.ch <- cn
   106  }
   107  
   108  func (p *SingleConnPool) freeConn(cn *Conn) {
   109  	if err := p.badConnError(); err != nil {
   110  		p.pool.Remove(cn, err)
   111  	} else {
   112  		p.pool.Put(cn)
   113  	}
   114  }
   115  
   116  func (p *SingleConnPool) Remove(cn *Conn, reason error) {
   117  	defer func() {
   118  		if recover() != nil {
   119  			p.pool.Remove(cn, ErrClosed)
   120  		}
   121  	}()
   122  	p._badConnError.Store(BadConnError{wrapped: reason})
   123  	p.ch <- cn
   124  }
   125  
   126  func (p *SingleConnPool) Len() int {
   127  	switch atomic.LoadUint32(&p.state) {
   128  	case stateDefault:
   129  		return 0
   130  	case stateInited:
   131  		return 1
   132  	case stateClosed:
   133  		return 0
   134  	default:
   135  		panic("not reached")
   136  	}
   137  }
   138  
   139  func (p *SingleConnPool) IdleLen() int {
   140  	return len(p.ch)
   141  }
   142  
   143  func (p *SingleConnPool) Stats() *Stats {
   144  	return &Stats{}
   145  }
   146  
   147  func (p *SingleConnPool) Close() error {
   148  	level := atomic.AddInt32(&p.level, -1)
   149  	if level > 0 {
   150  		return nil
   151  	}
   152  
   153  	for i := 0; i < 1000; i++ {
   154  		state := atomic.LoadUint32(&p.state)
   155  		if state == stateClosed {
   156  			return ErrClosed
   157  		}
   158  		if atomic.CompareAndSwapUint32(&p.state, state, stateClosed) {
   159  			close(p.ch)
   160  			cn, ok := <-p.ch
   161  			if ok {
   162  				p.freeConn(cn)
   163  			}
   164  			return nil
   165  		}
   166  	}
   167  
   168  	return fmt.Errorf("pg: SingleConnPool.Close: infinite loop")
   169  }
   170  
   171  func (p *SingleConnPool) Reset() error {
   172  	if p.badConnError() == nil {
   173  		return nil
   174  	}
   175  
   176  	select {
   177  	case cn, ok := <-p.ch:
   178  		if !ok {
   179  			return ErrClosed
   180  		}
   181  		p.pool.Remove(cn, ErrClosed)
   182  		p._badConnError.Store(BadConnError{wrapped: nil})
   183  	default:
   184  		return fmt.Errorf("pg: SingleConnPool does not have a Conn")
   185  	}
   186  
   187  	if !atomic.CompareAndSwapUint32(&p.state, stateInited, stateDefault) {
   188  		state := atomic.LoadUint32(&p.state)
   189  		return fmt.Errorf("pg: invalid SingleConnPool state: %d", state)
   190  	}
   191  
   192  	return nil
   193  }
   194  
   195  func (p *SingleConnPool) badConnError() error {
   196  	if v := p._badConnError.Load(); v != nil {
   197  		err := v.(BadConnError)
   198  		if err.wrapped != nil {
   199  			return err
   200  		}
   201  	}
   202  	return nil
   203  }
   204  

View as plain text