...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package redisx
16
17 import (
18 "errors"
19 "sync"
20
21 "github.com/gomodule/redigo/internal"
22 "github.com/gomodule/redigo/redis"
23 )
24
25
26
27
28
29 type ConnMux struct {
30 c redis.Conn
31
32 sendMu sync.Mutex
33 sendID uint
34
35 recvMu sync.Mutex
36 recvID uint
37 recvWait map[uint]chan struct{}
38 }
39
40 func NewConnMux(c redis.Conn) *ConnMux {
41 return &ConnMux{c: c, recvWait: make(map[uint]chan struct{})}
42 }
43
44
45 func (p *ConnMux) Get() redis.Conn {
46 c := &muxConn{p: p}
47 c.ids = c.buf[:0]
48 return c
49 }
50
51
52 func (p *ConnMux) Close() error {
53 return p.c.Close()
54 }
55
56 type muxConn struct {
57 p *ConnMux
58 ids []uint
59 buf [8]uint
60 }
61
62 func (c *muxConn) send(flush bool, cmd string, args ...interface{}) error {
63 if internal.LookupCommandInfo(cmd).Set != 0 {
64 return errors.New("command not supported by mux pool")
65 }
66 p := c.p
67 p.sendMu.Lock()
68 id := p.sendID
69 c.ids = append(c.ids, id)
70 p.sendID++
71 err := p.c.Send(cmd, args...)
72 if flush {
73 err = p.c.Flush()
74 }
75 p.sendMu.Unlock()
76 return err
77 }
78
79 func (c *muxConn) Send(cmd string, args ...interface{}) error {
80 return c.send(false, cmd, args...)
81 }
82
83 func (c *muxConn) Flush() error {
84 p := c.p
85 p.sendMu.Lock()
86 err := p.c.Flush()
87 p.sendMu.Unlock()
88 return err
89 }
90
91 func (c *muxConn) Receive() (interface{}, error) {
92 if len(c.ids) == 0 {
93 return nil, errors.New("mux pool underflow")
94 }
95
96 id := c.ids[0]
97 c.ids = c.ids[1:]
98 if len(c.ids) == 0 {
99 c.ids = c.buf[:0]
100 }
101
102 p := c.p
103 p.recvMu.Lock()
104 if p.recvID != id {
105 ch := make(chan struct{})
106 p.recvWait[id] = ch
107 p.recvMu.Unlock()
108 <-ch
109 p.recvMu.Lock()
110 if p.recvID != id {
111 panic("out of sync")
112 }
113 }
114
115 v, err := p.c.Receive()
116
117 id++
118 p.recvID = id
119 ch, ok := p.recvWait[id]
120 if ok {
121 delete(p.recvWait, id)
122 }
123 p.recvMu.Unlock()
124 if ok {
125 ch <- struct{}{}
126 }
127
128 return v, err
129 }
130
131 func (c *muxConn) Close() error {
132 var err error
133 if len(c.ids) == 0 {
134 return nil
135 }
136 c.Flush()
137 for _ = range c.ids {
138 _, err = c.Receive()
139 }
140 return err
141 }
142
143 func (c *muxConn) Do(cmd string, args ...interface{}) (interface{}, error) {
144 if err := c.send(true, cmd, args...); err != nil {
145 return nil, err
146 }
147 return c.Receive()
148 }
149
150 func (c *muxConn) Err() error {
151 return c.p.c.Err()
152 }
153
View as plain text