1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package redis
16
17 import (
18 "bytes"
19 "crypto/rand"
20 "crypto/sha1"
21 "errors"
22 "io"
23 "strconv"
24 "sync"
25 "sync/atomic"
26 "time"
27
28 "github.com/gomodule/redigo/internal"
29 )
30
31 var (
32 _ ConnWithTimeout = (*activeConn)(nil)
33 _ ConnWithTimeout = (*errorConn)(nil)
34 )
35
36 var nowFunc = time.Now
37
38
39
40
41 var ErrPoolExhausted = errors.New("redigo: connection pool exhausted")
42
43 var (
44 errPoolClosed = errors.New("redigo: connection pool closed")
45 errConnClosed = errors.New("redigo: connection closed")
46 )
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122 type Pool struct {
123
124
125
126
127
128 Dial func() (Conn, error)
129
130
131
132
133
134
135 TestOnBorrow func(c Conn, t time.Time) error
136
137
138 MaxIdle int
139
140
141
142 MaxActive int
143
144
145
146
147 IdleTimeout time.Duration
148
149
150
151 Wait bool
152
153
154
155 MaxConnLifetime time.Duration
156
157 chInitialized uint32
158
159 mu sync.Mutex
160 closed bool
161 active int
162 ch chan struct{}
163 idle idleList
164 }
165
166
167
168
169 func NewPool(newFn func() (Conn, error), maxIdle int) *Pool {
170 return &Pool{Dial: newFn, MaxIdle: maxIdle}
171 }
172
173
174
175
176
177
178 func (p *Pool) Get() Conn {
179 pc, err := p.get(nil)
180 if err != nil {
181 return errorConn{err}
182 }
183 return &activeConn{p: p, pc: pc}
184 }
185
186
187 type PoolStats struct {
188
189
190 ActiveCount int
191
192 IdleCount int
193 }
194
195
196 func (p *Pool) Stats() PoolStats {
197 p.mu.Lock()
198 stats := PoolStats{
199 ActiveCount: p.active,
200 IdleCount: p.idle.count,
201 }
202 p.mu.Unlock()
203
204 return stats
205 }
206
207
208
209 func (p *Pool) ActiveCount() int {
210 p.mu.Lock()
211 active := p.active
212 p.mu.Unlock()
213 return active
214 }
215
216
217 func (p *Pool) IdleCount() int {
218 p.mu.Lock()
219 idle := p.idle.count
220 p.mu.Unlock()
221 return idle
222 }
223
224
225 func (p *Pool) Close() error {
226 p.mu.Lock()
227 if p.closed {
228 p.mu.Unlock()
229 return nil
230 }
231 p.closed = true
232 p.active -= p.idle.count
233 pc := p.idle.front
234 p.idle.count = 0
235 p.idle.front, p.idle.back = nil, nil
236 if p.ch != nil {
237 close(p.ch)
238 }
239 p.mu.Unlock()
240 for ; pc != nil; pc = pc.next {
241 pc.c.Close()
242 }
243 return nil
244 }
245
246 func (p *Pool) lazyInit() {
247
248 if atomic.LoadUint32(&p.chInitialized) == 1 {
249 return
250 }
251
252 p.mu.Lock()
253 if p.chInitialized == 0 {
254 p.ch = make(chan struct{}, p.MaxActive)
255 if p.closed {
256 close(p.ch)
257 } else {
258 for i := 0; i < p.MaxActive; i++ {
259 p.ch <- struct{}{}
260 }
261 }
262 atomic.StoreUint32(&p.chInitialized, 1)
263 }
264 p.mu.Unlock()
265 }
266
267
268
269 func (p *Pool) get(ctx interface {
270 Done() <-chan struct{}
271 Err() error
272 }) (*poolConn, error) {
273
274
275 if p.Wait && p.MaxActive > 0 {
276 p.lazyInit()
277 if ctx == nil {
278 <-p.ch
279 } else {
280 select {
281 case <-p.ch:
282 case <-ctx.Done():
283 return nil, ctx.Err()
284 }
285 }
286 }
287
288 p.mu.Lock()
289
290
291 if p.IdleTimeout > 0 {
292 n := p.idle.count
293 for i := 0; i < n && p.idle.back != nil && p.idle.back.t.Add(p.IdleTimeout).Before(nowFunc()); i++ {
294 pc := p.idle.back
295 p.idle.popBack()
296 p.mu.Unlock()
297 pc.c.Close()
298 p.mu.Lock()
299 p.active--
300 }
301 }
302
303
304 for p.idle.front != nil {
305 pc := p.idle.front
306 p.idle.popFront()
307 p.mu.Unlock()
308 if (p.TestOnBorrow == nil || p.TestOnBorrow(pc.c, pc.t) == nil) &&
309 (p.MaxConnLifetime == 0 || nowFunc().Sub(pc.created) < p.MaxConnLifetime) {
310 return pc, nil
311 }
312 pc.c.Close()
313 p.mu.Lock()
314 p.active--
315 }
316
317
318 if p.closed {
319 p.mu.Unlock()
320 return nil, errors.New("redigo: get on closed pool")
321 }
322
323
324 if !p.Wait && p.MaxActive > 0 && p.active >= p.MaxActive {
325 p.mu.Unlock()
326 return nil, ErrPoolExhausted
327 }
328
329 p.active++
330 p.mu.Unlock()
331 c, err := p.Dial()
332 if err != nil {
333 c = nil
334 p.mu.Lock()
335 p.active--
336 if p.ch != nil && !p.closed {
337 p.ch <- struct{}{}
338 }
339 p.mu.Unlock()
340 }
341 return &poolConn{c: c, created: nowFunc()}, err
342 }
343
344 func (p *Pool) put(pc *poolConn, forceClose bool) error {
345 p.mu.Lock()
346 if !p.closed && !forceClose {
347 pc.t = nowFunc()
348 p.idle.pushFront(pc)
349 if p.idle.count > p.MaxIdle {
350 pc = p.idle.back
351 p.idle.popBack()
352 } else {
353 pc = nil
354 }
355 }
356
357 if pc != nil {
358 p.mu.Unlock()
359 pc.c.Close()
360 p.mu.Lock()
361 p.active--
362 }
363
364 if p.ch != nil && !p.closed {
365 p.ch <- struct{}{}
366 }
367 p.mu.Unlock()
368 return nil
369 }
370
371 type activeConn struct {
372 p *Pool
373 pc *poolConn
374 state int
375 }
376
377 var (
378 sentinel []byte
379 sentinelOnce sync.Once
380 )
381
382 func initSentinel() {
383 p := make([]byte, 64)
384 if _, err := rand.Read(p); err == nil {
385 sentinel = p
386 } else {
387 h := sha1.New()
388 io.WriteString(h, "Oops, rand failed. Use time instead.")
389 io.WriteString(h, strconv.FormatInt(time.Now().UnixNano(), 10))
390 sentinel = h.Sum(nil)
391 }
392 }
393
394 func (ac *activeConn) Close() error {
395 pc := ac.pc
396 if pc == nil {
397 return nil
398 }
399 ac.pc = nil
400
401 if ac.state&internal.MultiState != 0 {
402 pc.c.Send("DISCARD")
403 ac.state &^= (internal.MultiState | internal.WatchState)
404 } else if ac.state&internal.WatchState != 0 {
405 pc.c.Send("UNWATCH")
406 ac.state &^= internal.WatchState
407 }
408 if ac.state&internal.SubscribeState != 0 {
409 pc.c.Send("UNSUBSCRIBE")
410 pc.c.Send("PUNSUBSCRIBE")
411
412
413 sentinelOnce.Do(initSentinel)
414 pc.c.Send("ECHO", sentinel)
415 pc.c.Flush()
416 for {
417 p, err := pc.c.Receive()
418 if err != nil {
419 break
420 }
421 if p, ok := p.([]byte); ok && bytes.Equal(p, sentinel) {
422 ac.state &^= internal.SubscribeState
423 break
424 }
425 }
426 }
427 pc.c.Do("")
428 ac.p.put(pc, ac.state != 0 || pc.c.Err() != nil)
429 return nil
430 }
431
432 func (ac *activeConn) Err() error {
433 pc := ac.pc
434 if pc == nil {
435 return errConnClosed
436 }
437 return pc.c.Err()
438 }
439
440 func (ac *activeConn) Do(commandName string, args ...interface{}) (reply interface{}, err error) {
441 pc := ac.pc
442 if pc == nil {
443 return nil, errConnClosed
444 }
445 ci := internal.LookupCommandInfo(commandName)
446 ac.state = (ac.state | ci.Set) &^ ci.Clear
447 return pc.c.Do(commandName, args...)
448 }
449
450 func (ac *activeConn) DoWithTimeout(timeout time.Duration, commandName string, args ...interface{}) (reply interface{}, err error) {
451 pc := ac.pc
452 if pc == nil {
453 return nil, errConnClosed
454 }
455 cwt, ok := pc.c.(ConnWithTimeout)
456 if !ok {
457 return nil, errTimeoutNotSupported
458 }
459 ci := internal.LookupCommandInfo(commandName)
460 ac.state = (ac.state | ci.Set) &^ ci.Clear
461 return cwt.DoWithTimeout(timeout, commandName, args...)
462 }
463
464 func (ac *activeConn) Send(commandName string, args ...interface{}) error {
465 pc := ac.pc
466 if pc == nil {
467 return errConnClosed
468 }
469 ci := internal.LookupCommandInfo(commandName)
470 ac.state = (ac.state | ci.Set) &^ ci.Clear
471 return pc.c.Send(commandName, args...)
472 }
473
474 func (ac *activeConn) Flush() error {
475 pc := ac.pc
476 if pc == nil {
477 return errConnClosed
478 }
479 return pc.c.Flush()
480 }
481
482 func (ac *activeConn) Receive() (reply interface{}, err error) {
483 pc := ac.pc
484 if pc == nil {
485 return nil, errConnClosed
486 }
487 return pc.c.Receive()
488 }
489
490 func (ac *activeConn) ReceiveWithTimeout(timeout time.Duration) (reply interface{}, err error) {
491 pc := ac.pc
492 if pc == nil {
493 return nil, errConnClosed
494 }
495 cwt, ok := pc.c.(ConnWithTimeout)
496 if !ok {
497 return nil, errTimeoutNotSupported
498 }
499 return cwt.ReceiveWithTimeout(timeout)
500 }
501
502 type errorConn struct{ err error }
503
504 func (ec errorConn) Do(string, ...interface{}) (interface{}, error) { return nil, ec.err }
505 func (ec errorConn) DoWithTimeout(time.Duration, string, ...interface{}) (interface{}, error) {
506 return nil, ec.err
507 }
508 func (ec errorConn) Send(string, ...interface{}) error { return ec.err }
509 func (ec errorConn) Err() error { return ec.err }
510 func (ec errorConn) Close() error { return nil }
511 func (ec errorConn) Flush() error { return ec.err }
512 func (ec errorConn) Receive() (interface{}, error) { return nil, ec.err }
513 func (ec errorConn) ReceiveWithTimeout(time.Duration) (interface{}, error) { return nil, ec.err }
514
515 type idleList struct {
516 count int
517 front, back *poolConn
518 }
519
520 type poolConn struct {
521 c Conn
522 t time.Time
523 created time.Time
524 next, prev *poolConn
525 }
526
527 func (l *idleList) pushFront(pc *poolConn) {
528 pc.next = l.front
529 pc.prev = nil
530 if l.count == 0 {
531 l.back = pc
532 } else {
533 l.front.prev = pc
534 }
535 l.front = pc
536 l.count++
537 return
538 }
539
540 func (l *idleList) popFront() {
541 pc := l.front
542 l.count--
543 if l.count == 0 {
544 l.front, l.back = nil, nil
545 } else {
546 pc.next.prev = nil
547 l.front = pc.next
548 }
549 pc.next, pc.prev = nil, nil
550 }
551
552 func (l *idleList) popBack() {
553 pc := l.back
554 l.count--
555 if l.count == 0 {
556 l.front, l.back = nil, nil
557 } else {
558 pc.prev.next = nil
559 l.back = pc.prev
560 }
561 pc.next, pc.prev = nil, nil
562 }
563
View as plain text