...
1 package pool
2
3 import (
4 "net"
5 "sync/atomic"
6 "time"
7
8 "github.com/go-redis/redis/internal/proto"
9 )
10
// noDeadline is the zero time.Time. Passing it to a net.Conn deadline setter
// removes any deadline that was previously set.
var noDeadline time.Time
12
13 type Conn struct {
14 netConn net.Conn
15
16 rd *proto.Reader
17 rdLocked bool
18 wr *proto.Writer
19
20 Inited bool
21 pooled bool
22 createdAt time.Time
23 usedAt atomic.Value
24 }
25
26 func NewConn(netConn net.Conn) *Conn {
27 cn := &Conn{
28 netConn: netConn,
29 createdAt: time.Now(),
30 }
31 cn.rd = proto.NewReader(netConn)
32 cn.wr = proto.NewWriter(netConn)
33 cn.SetUsedAt(time.Now())
34 return cn
35 }
36
37 func (cn *Conn) UsedAt() time.Time {
38 return cn.usedAt.Load().(time.Time)
39 }
40
41 func (cn *Conn) SetUsedAt(tm time.Time) {
42 cn.usedAt.Store(tm)
43 }
44
45 func (cn *Conn) SetNetConn(netConn net.Conn) {
46 cn.netConn = netConn
47 cn.rd.Reset(netConn)
48 cn.wr.Reset(netConn)
49 }
50
51 func (cn *Conn) setReadTimeout(timeout time.Duration) error {
52 now := time.Now()
53 cn.SetUsedAt(now)
54 if timeout > 0 {
55 return cn.netConn.SetReadDeadline(now.Add(timeout))
56 }
57 return cn.netConn.SetReadDeadline(noDeadline)
58 }
59
60 func (cn *Conn) setWriteTimeout(timeout time.Duration) error {
61 now := time.Now()
62 cn.SetUsedAt(now)
63 if timeout > 0 {
64 return cn.netConn.SetWriteDeadline(now.Add(timeout))
65 }
66 return cn.netConn.SetWriteDeadline(noDeadline)
67 }
68
69 func (cn *Conn) Write(b []byte) (int, error) {
70 return cn.netConn.Write(b)
71 }
72
73 func (cn *Conn) RemoteAddr() net.Addr {
74 return cn.netConn.RemoteAddr()
75 }
76
77 func (cn *Conn) WithReader(timeout time.Duration, fn func(rd *proto.Reader) error) error {
78 _ = cn.setReadTimeout(timeout)
79 return fn(cn.rd)
80 }
81
82 func (cn *Conn) WithWriter(timeout time.Duration, fn func(wr *proto.Writer) error) error {
83 _ = cn.setWriteTimeout(timeout)
84
85 firstErr := fn(cn.wr)
86 err := cn.wr.Flush()
87 if err != nil && firstErr == nil {
88 firstErr = err
89 }
90 return firstErr
91 }
92
93 func (cn *Conn) Close() error {
94 return cn.netConn.Close()
95 }
96
View as plain text