1 package pool
2
3 import (
4 "errors"
5 "net"
6 "sync"
7 "sync/atomic"
8 "time"
9
10 "github.com/go-redis/redis/internal"
11 )
12
13 var ErrClosed = errors.New("redis: client is closed")
14 var ErrPoolTimeout = errors.New("redis: connection pool timeout")
15
16 var timers = sync.Pool{
17 New: func() interface{} {
18 t := time.NewTimer(time.Hour)
19 t.Stop()
20 return t
21 },
22 }
23
24
25 type Stats struct {
26 Hits uint32
27 Misses uint32
28 Timeouts uint32
29
30 TotalConns uint32
31 IdleConns uint32
32 StaleConns uint32
33 }
34
35 type Pooler interface {
36 NewConn() (*Conn, error)
37 CloseConn(*Conn) error
38
39 Get() (*Conn, error)
40 Put(*Conn)
41 Remove(*Conn, error)
42
43 Len() int
44 IdleLen() int
45 Stats() *Stats
46
47 Close() error
48 }
49
50 type Options struct {
51 Dialer func() (net.Conn, error)
52 OnClose func(*Conn) error
53
54 PoolSize int
55 MinIdleConns int
56 MaxConnAge time.Duration
57 PoolTimeout time.Duration
58 IdleTimeout time.Duration
59 IdleCheckFrequency time.Duration
60 }
61
62 type ConnPool struct {
63 opt *Options
64
65 dialErrorsNum uint32
66
67 lastDialErrorMu sync.RWMutex
68 lastDialError error
69
70 queue chan struct{}
71
72 connsMu sync.Mutex
73 conns []*Conn
74 idleConns []*Conn
75 poolSize int
76 idleConnsLen int
77
78 stats Stats
79
80 _closed uint32
81 }
82
83 var _ Pooler = (*ConnPool)(nil)
84
85 func NewConnPool(opt *Options) *ConnPool {
86 p := &ConnPool{
87 opt: opt,
88
89 queue: make(chan struct{}, opt.PoolSize),
90 conns: make([]*Conn, 0, opt.PoolSize),
91 idleConns: make([]*Conn, 0, opt.PoolSize),
92 }
93
94 for i := 0; i < opt.MinIdleConns; i++ {
95 p.checkMinIdleConns()
96 }
97
98 if opt.IdleTimeout > 0 && opt.IdleCheckFrequency > 0 {
99 go p.reaper(opt.IdleCheckFrequency)
100 }
101
102 return p
103 }
104
105 func (p *ConnPool) checkMinIdleConns() {
106 if p.opt.MinIdleConns == 0 {
107 return
108 }
109 if p.poolSize < p.opt.PoolSize && p.idleConnsLen < p.opt.MinIdleConns {
110 p.poolSize++
111 p.idleConnsLen++
112 go p.addIdleConn()
113 }
114 }
115
116 func (p *ConnPool) addIdleConn() {
117 cn, err := p.newConn(true)
118 if err != nil {
119 return
120 }
121
122 p.connsMu.Lock()
123 p.conns = append(p.conns, cn)
124 p.idleConns = append(p.idleConns, cn)
125 p.connsMu.Unlock()
126 }
127
128 func (p *ConnPool) NewConn() (*Conn, error) {
129 return p._NewConn(false)
130 }
131
132 func (p *ConnPool) _NewConn(pooled bool) (*Conn, error) {
133 cn, err := p.newConn(pooled)
134 if err != nil {
135 return nil, err
136 }
137
138 p.connsMu.Lock()
139 p.conns = append(p.conns, cn)
140 if pooled {
141 if p.poolSize < p.opt.PoolSize {
142 p.poolSize++
143 } else {
144 cn.pooled = false
145 }
146 }
147 p.connsMu.Unlock()
148 return cn, nil
149 }
150
151 func (p *ConnPool) newConn(pooled bool) (*Conn, error) {
152 if p.closed() {
153 return nil, ErrClosed
154 }
155
156 if atomic.LoadUint32(&p.dialErrorsNum) >= uint32(p.opt.PoolSize) {
157 return nil, p.getLastDialError()
158 }
159
160 netConn, err := p.opt.Dialer()
161 if err != nil {
162 p.setLastDialError(err)
163 if atomic.AddUint32(&p.dialErrorsNum, 1) == uint32(p.opt.PoolSize) {
164 go p.tryDial()
165 }
166 return nil, err
167 }
168
169 cn := NewConn(netConn)
170 cn.pooled = pooled
171 return cn, nil
172 }
173
174 func (p *ConnPool) tryDial() {
175 for {
176 if p.closed() {
177 return
178 }
179
180 conn, err := p.opt.Dialer()
181 if err != nil {
182 p.setLastDialError(err)
183 time.Sleep(time.Second)
184 continue
185 }
186
187 atomic.StoreUint32(&p.dialErrorsNum, 0)
188 _ = conn.Close()
189 return
190 }
191 }
192
193 func (p *ConnPool) setLastDialError(err error) {
194 p.lastDialErrorMu.Lock()
195 p.lastDialError = err
196 p.lastDialErrorMu.Unlock()
197 }
198
199 func (p *ConnPool) getLastDialError() error {
200 p.lastDialErrorMu.RLock()
201 err := p.lastDialError
202 p.lastDialErrorMu.RUnlock()
203 return err
204 }
205
206
207 func (p *ConnPool) Get() (*Conn, error) {
208 if p.closed() {
209 return nil, ErrClosed
210 }
211
212 err := p.waitTurn()
213 if err != nil {
214 return nil, err
215 }
216
217 for {
218 p.connsMu.Lock()
219 cn := p.popIdle()
220 p.connsMu.Unlock()
221
222 if cn == nil {
223 break
224 }
225
226 if p.isStaleConn(cn) {
227 _ = p.CloseConn(cn)
228 continue
229 }
230
231 atomic.AddUint32(&p.stats.Hits, 1)
232 return cn, nil
233 }
234
235 atomic.AddUint32(&p.stats.Misses, 1)
236
237 newcn, err := p._NewConn(true)
238 if err != nil {
239 p.freeTurn()
240 return nil, err
241 }
242
243 return newcn, nil
244 }
245
246 func (p *ConnPool) getTurn() {
247 p.queue <- struct{}{}
248 }
249
250 func (p *ConnPool) waitTurn() error {
251 select {
252 case p.queue <- struct{}{}:
253 return nil
254 default:
255 timer := timers.Get().(*time.Timer)
256 timer.Reset(p.opt.PoolTimeout)
257
258 select {
259 case p.queue <- struct{}{}:
260 if !timer.Stop() {
261 <-timer.C
262 }
263 timers.Put(timer)
264 return nil
265 case <-timer.C:
266 timers.Put(timer)
267 atomic.AddUint32(&p.stats.Timeouts, 1)
268 return ErrPoolTimeout
269 }
270 }
271 }
272
273 func (p *ConnPool) freeTurn() {
274 <-p.queue
275 }
276
277 func (p *ConnPool) popIdle() *Conn {
278 if len(p.idleConns) == 0 {
279 return nil
280 }
281
282 idx := len(p.idleConns) - 1
283 cn := p.idleConns[idx]
284 p.idleConns = p.idleConns[:idx]
285 p.idleConnsLen--
286 p.checkMinIdleConns()
287 return cn
288 }
289
290 func (p *ConnPool) Put(cn *Conn) {
291 if !cn.pooled {
292 p.Remove(cn, nil)
293 return
294 }
295
296 p.connsMu.Lock()
297 p.idleConns = append(p.idleConns, cn)
298 p.idleConnsLen++
299 p.connsMu.Unlock()
300 p.freeTurn()
301 }
302
303 func (p *ConnPool) Remove(cn *Conn, reason error) {
304 p.removeConn(cn)
305 p.freeTurn()
306 _ = p.closeConn(cn)
307 }
308
309 func (p *ConnPool) CloseConn(cn *Conn) error {
310 p.removeConn(cn)
311 return p.closeConn(cn)
312 }
313
314 func (p *ConnPool) removeConn(cn *Conn) {
315 p.connsMu.Lock()
316 for i, c := range p.conns {
317 if c == cn {
318 p.conns = append(p.conns[:i], p.conns[i+1:]...)
319 if cn.pooled {
320 p.poolSize--
321 p.checkMinIdleConns()
322 }
323 break
324 }
325 }
326 p.connsMu.Unlock()
327 }
328
329 func (p *ConnPool) closeConn(cn *Conn) error {
330 if p.opt.OnClose != nil {
331 _ = p.opt.OnClose(cn)
332 }
333 return cn.Close()
334 }
335
336
337 func (p *ConnPool) Len() int {
338 p.connsMu.Lock()
339 n := len(p.conns)
340 p.connsMu.Unlock()
341 return n
342 }
343
344
345 func (p *ConnPool) IdleLen() int {
346 p.connsMu.Lock()
347 n := p.idleConnsLen
348 p.connsMu.Unlock()
349 return n
350 }
351
352 func (p *ConnPool) Stats() *Stats {
353 idleLen := p.IdleLen()
354 return &Stats{
355 Hits: atomic.LoadUint32(&p.stats.Hits),
356 Misses: atomic.LoadUint32(&p.stats.Misses),
357 Timeouts: atomic.LoadUint32(&p.stats.Timeouts),
358
359 TotalConns: uint32(p.Len()),
360 IdleConns: uint32(idleLen),
361 StaleConns: atomic.LoadUint32(&p.stats.StaleConns),
362 }
363 }
364
365 func (p *ConnPool) closed() bool {
366 return atomic.LoadUint32(&p._closed) == 1
367 }
368
369 func (p *ConnPool) Filter(fn func(*Conn) bool) error {
370 var firstErr error
371 p.connsMu.Lock()
372 for _, cn := range p.conns {
373 if fn(cn) {
374 if err := p.closeConn(cn); err != nil && firstErr == nil {
375 firstErr = err
376 }
377 }
378 }
379 p.connsMu.Unlock()
380 return firstErr
381 }
382
383 func (p *ConnPool) Close() error {
384 if !atomic.CompareAndSwapUint32(&p._closed, 0, 1) {
385 return ErrClosed
386 }
387
388 var firstErr error
389 p.connsMu.Lock()
390 for _, cn := range p.conns {
391 if err := p.closeConn(cn); err != nil && firstErr == nil {
392 firstErr = err
393 }
394 }
395 p.conns = nil
396 p.poolSize = 0
397 p.idleConns = nil
398 p.idleConnsLen = 0
399 p.connsMu.Unlock()
400
401 return firstErr
402 }
403
404 func (p *ConnPool) reapStaleConn() *Conn {
405 if len(p.idleConns) == 0 {
406 return nil
407 }
408
409 cn := p.idleConns[0]
410 if !p.isStaleConn(cn) {
411 return nil
412 }
413
414 p.idleConns = append(p.idleConns[:0], p.idleConns[1:]...)
415 p.idleConnsLen--
416
417 return cn
418 }
419
420 func (p *ConnPool) ReapStaleConns() (int, error) {
421 var n int
422 for {
423 p.getTurn()
424
425 p.connsMu.Lock()
426 cn := p.reapStaleConn()
427 p.connsMu.Unlock()
428
429 if cn != nil {
430 p.removeConn(cn)
431 }
432
433 p.freeTurn()
434
435 if cn != nil {
436 p.closeConn(cn)
437 n++
438 } else {
439 break
440 }
441 }
442 return n, nil
443 }
444
445 func (p *ConnPool) reaper(frequency time.Duration) {
446 ticker := time.NewTicker(frequency)
447 defer ticker.Stop()
448
449 for range ticker.C {
450 if p.closed() {
451 break
452 }
453 n, err := p.ReapStaleConns()
454 if err != nil {
455 internal.Logf("ReapStaleConns failed: %s", err)
456 continue
457 }
458 atomic.AddUint32(&p.stats.StaleConns, uint32(n))
459 }
460 }
461
462 func (p *ConnPool) isStaleConn(cn *Conn) bool {
463 if p.opt.IdleTimeout == 0 && p.opt.MaxConnAge == 0 {
464 return false
465 }
466
467 now := time.Now()
468 if p.opt.IdleTimeout > 0 && now.Sub(cn.UsedAt()) >= p.opt.IdleTimeout {
469 return true
470 }
471 if p.opt.MaxConnAge > 0 && now.Sub(cn.createdAt) >= p.opt.MaxConnAge {
472 return true
473 }
474
475 return false
476 }
477
View as plain text