...

Source file src/github.com/go-redis/redis/internal/pool/conn.go

Documentation: github.com/go-redis/redis/internal/pool

     1  package pool
     2  
     3  import (
     4  	"net"
     5  	"sync/atomic"
     6  	"time"
     7  
     8  	"github.com/go-redis/redis/internal/proto"
     9  )
    10  
    11  var noDeadline = time.Time{}
    12  
    13  type Conn struct {
    14  	netConn net.Conn
    15  
    16  	rd       *proto.Reader
    17  	rdLocked bool
    18  	wr       *proto.Writer
    19  
    20  	Inited    bool
    21  	pooled    bool
    22  	createdAt time.Time
    23  	usedAt    atomic.Value
    24  }
    25  
    26  func NewConn(netConn net.Conn) *Conn {
    27  	cn := &Conn{
    28  		netConn:   netConn,
    29  		createdAt: time.Now(),
    30  	}
    31  	cn.rd = proto.NewReader(netConn)
    32  	cn.wr = proto.NewWriter(netConn)
    33  	cn.SetUsedAt(time.Now())
    34  	return cn
    35  }
    36  
    37  func (cn *Conn) UsedAt() time.Time {
    38  	return cn.usedAt.Load().(time.Time)
    39  }
    40  
    41  func (cn *Conn) SetUsedAt(tm time.Time) {
    42  	cn.usedAt.Store(tm)
    43  }
    44  
    45  func (cn *Conn) SetNetConn(netConn net.Conn) {
    46  	cn.netConn = netConn
    47  	cn.rd.Reset(netConn)
    48  	cn.wr.Reset(netConn)
    49  }
    50  
    51  func (cn *Conn) setReadTimeout(timeout time.Duration) error {
    52  	now := time.Now()
    53  	cn.SetUsedAt(now)
    54  	if timeout > 0 {
    55  		return cn.netConn.SetReadDeadline(now.Add(timeout))
    56  	}
    57  	return cn.netConn.SetReadDeadline(noDeadline)
    58  }
    59  
    60  func (cn *Conn) setWriteTimeout(timeout time.Duration) error {
    61  	now := time.Now()
    62  	cn.SetUsedAt(now)
    63  	if timeout > 0 {
    64  		return cn.netConn.SetWriteDeadline(now.Add(timeout))
    65  	}
    66  	return cn.netConn.SetWriteDeadline(noDeadline)
    67  }
    68  
    69  func (cn *Conn) Write(b []byte) (int, error) {
    70  	return cn.netConn.Write(b)
    71  }
    72  
    73  func (cn *Conn) RemoteAddr() net.Addr {
    74  	return cn.netConn.RemoteAddr()
    75  }
    76  
    77  func (cn *Conn) WithReader(timeout time.Duration, fn func(rd *proto.Reader) error) error {
    78  	_ = cn.setReadTimeout(timeout)
    79  	return fn(cn.rd)
    80  }
    81  
    82  func (cn *Conn) WithWriter(timeout time.Duration, fn func(wr *proto.Writer) error) error {
    83  	_ = cn.setWriteTimeout(timeout)
    84  
    85  	firstErr := fn(cn.wr)
    86  	err := cn.wr.Flush()
    87  	if err != nil && firstErr == nil {
    88  		firstErr = err
    89  	}
    90  	return firstErr
    91  }
    92  
    93  func (cn *Conn) Close() error {
    94  	return cn.netConn.Close()
    95  }
    96  

View as plain text