1 package redis
2
3 import (
4 "context"
5 "fmt"
6 "log"
7 "os"
8 "time"
9
10 "github.com/go-redis/redis/internal"
11 "github.com/go-redis/redis/internal/pool"
12 "github.com/go-redis/redis/internal/proto"
13 )
14
15
16 const Nil = proto.Nil
17
18 func init() {
19 SetLogger(log.New(os.Stderr, "redis: ", log.LstdFlags|log.Lshortfile))
20 }
21
22 func SetLogger(logger *log.Logger) {
23 internal.Logger = logger
24 }
25
26 type baseClient struct {
27 opt *Options
28 connPool pool.Pooler
29 limiter Limiter
30
31 process func(Cmder) error
32 processPipeline func([]Cmder) error
33 processTxPipeline func([]Cmder) error
34
35 onClose func() error
36 }
37
38 func (c *baseClient) init() {
39 c.process = c.defaultProcess
40 c.processPipeline = c.defaultProcessPipeline
41 c.processTxPipeline = c.defaultProcessTxPipeline
42 }
43
44 func (c *baseClient) String() string {
45 return fmt.Sprintf("Redis<%s db:%d>", c.getAddr(), c.opt.DB)
46 }
47
48 func (c *baseClient) newConn() (*pool.Conn, error) {
49 cn, err := c.connPool.NewConn()
50 if err != nil {
51 return nil, err
52 }
53
54 err = c.initConn(cn)
55 if err != nil {
56 _ = c.connPool.CloseConn(cn)
57 return nil, err
58 }
59
60 return cn, nil
61 }
62
63 func (c *baseClient) getConn() (*pool.Conn, error) {
64 if c.limiter != nil {
65 err := c.limiter.Allow()
66 if err != nil {
67 return nil, err
68 }
69 }
70
71 cn, err := c._getConn()
72 if err != nil {
73 if c.limiter != nil {
74 c.limiter.ReportResult(err)
75 }
76 return nil, err
77 }
78 return cn, nil
79 }
80
81 func (c *baseClient) _getConn() (*pool.Conn, error) {
82 cn, err := c.connPool.Get()
83 if err != nil {
84 return nil, err
85 }
86
87 err = c.initConn(cn)
88 if err != nil {
89 c.connPool.Remove(cn, err)
90 if err := internal.Unwrap(err); err != nil {
91 return nil, err
92 }
93 return nil, err
94 }
95
96 return cn, nil
97 }
98
99 func (c *baseClient) releaseConn(cn *pool.Conn, err error) {
100 if c.limiter != nil {
101 c.limiter.ReportResult(err)
102 }
103
104 if internal.IsBadConn(err, false) {
105 c.connPool.Remove(cn, err)
106 } else {
107 c.connPool.Put(cn)
108 }
109 }
110
111 func (c *baseClient) releaseConnStrict(cn *pool.Conn, err error) {
112 if c.limiter != nil {
113 c.limiter.ReportResult(err)
114 }
115
116 if err == nil || internal.IsRedisError(err) {
117 c.connPool.Put(cn)
118 } else {
119 c.connPool.Remove(cn, err)
120 }
121 }
122
123 func (c *baseClient) initConn(cn *pool.Conn) error {
124 if cn.Inited {
125 return nil
126 }
127 cn.Inited = true
128
129 if c.opt.Password == "" &&
130 c.opt.DB == 0 &&
131 !c.opt.readOnly &&
132 c.opt.OnConnect == nil {
133 return nil
134 }
135
136 conn := newConn(c.opt, cn)
137 _, err := conn.Pipelined(func(pipe Pipeliner) error {
138 if c.opt.Password != "" {
139 pipe.Auth(c.opt.Password)
140 }
141
142 if c.opt.DB > 0 {
143 pipe.Select(c.opt.DB)
144 }
145
146 if c.opt.readOnly {
147 pipe.ReadOnly()
148 }
149
150 return nil
151 })
152 if err != nil {
153 return err
154 }
155
156 if c.opt.OnConnect != nil {
157 return c.opt.OnConnect(conn)
158 }
159 return nil
160 }
161
162
163 func (c *baseClient) Do(args ...interface{}) *Cmd {
164 cmd := NewCmd(args...)
165 _ = c.Process(cmd)
166 return cmd
167 }
168
169
170 func (c *baseClient) WrapProcess(
171 fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error,
172 ) {
173 c.process = fn(c.process)
174 }
175
176 func (c *baseClient) Process(cmd Cmder) error {
177 return c.process(cmd)
178 }
179
180 func (c *baseClient) defaultProcess(cmd Cmder) error {
181 for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ {
182 if attempt > 0 {
183 time.Sleep(c.retryBackoff(attempt))
184 }
185
186 cn, err := c.getConn()
187 if err != nil {
188 cmd.setErr(err)
189 if internal.IsRetryableError(err, true) {
190 continue
191 }
192 return err
193 }
194
195 err = cn.WithWriter(c.opt.WriteTimeout, func(wr *proto.Writer) error {
196 return writeCmd(wr, cmd)
197 })
198 if err != nil {
199 c.releaseConn(cn, err)
200 cmd.setErr(err)
201 if internal.IsRetryableError(err, true) {
202 continue
203 }
204 return err
205 }
206
207 err = cn.WithReader(c.cmdTimeout(cmd), cmd.readReply)
208 c.releaseConn(cn, err)
209 if err != nil && internal.IsRetryableError(err, cmd.readTimeout() == nil) {
210 continue
211 }
212
213 return err
214 }
215
216 return cmd.Err()
217 }
218
219 func (c *baseClient) retryBackoff(attempt int) time.Duration {
220 return internal.RetryBackoff(attempt, c.opt.MinRetryBackoff, c.opt.MaxRetryBackoff)
221 }
222
223 func (c *baseClient) cmdTimeout(cmd Cmder) time.Duration {
224 if timeout := cmd.readTimeout(); timeout != nil {
225 t := *timeout
226 if t == 0 {
227 return 0
228 }
229 return t + 10*time.Second
230 }
231 return c.opt.ReadTimeout
232 }
233
234
235
236
237
238 func (c *baseClient) Close() error {
239 var firstErr error
240 if c.onClose != nil {
241 if err := c.onClose(); err != nil {
242 firstErr = err
243 }
244 }
245 if err := c.connPool.Close(); err != nil && firstErr == nil {
246 firstErr = err
247 }
248 return firstErr
249 }
250
251 func (c *baseClient) getAddr() string {
252 return c.opt.Addr
253 }
254
255 func (c *baseClient) WrapProcessPipeline(
256 fn func(oldProcess func([]Cmder) error) func([]Cmder) error,
257 ) {
258 c.processPipeline = fn(c.processPipeline)
259 c.processTxPipeline = fn(c.processTxPipeline)
260 }
261
262 func (c *baseClient) defaultProcessPipeline(cmds []Cmder) error {
263 return c.generalProcessPipeline(cmds, c.pipelineProcessCmds)
264 }
265
266 func (c *baseClient) defaultProcessTxPipeline(cmds []Cmder) error {
267 return c.generalProcessPipeline(cmds, c.txPipelineProcessCmds)
268 }
269
270 type pipelineProcessor func(*pool.Conn, []Cmder) (bool, error)
271
272 func (c *baseClient) generalProcessPipeline(cmds []Cmder, p pipelineProcessor) error {
273 for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ {
274 if attempt > 0 {
275 time.Sleep(c.retryBackoff(attempt))
276 }
277
278 cn, err := c.getConn()
279 if err != nil {
280 setCmdsErr(cmds, err)
281 return err
282 }
283
284 canRetry, err := p(cn, cmds)
285 c.releaseConnStrict(cn, err)
286
287 if !canRetry || !internal.IsRetryableError(err, true) {
288 break
289 }
290 }
291 return cmdsFirstErr(cmds)
292 }
293
294 func (c *baseClient) pipelineProcessCmds(cn *pool.Conn, cmds []Cmder) (bool, error) {
295 err := cn.WithWriter(c.opt.WriteTimeout, func(wr *proto.Writer) error {
296 return writeCmd(wr, cmds...)
297 })
298 if err != nil {
299 setCmdsErr(cmds, err)
300 return true, err
301 }
302
303 err = cn.WithReader(c.opt.ReadTimeout, func(rd *proto.Reader) error {
304 return pipelineReadCmds(rd, cmds)
305 })
306 return true, err
307 }
308
309 func pipelineReadCmds(rd *proto.Reader, cmds []Cmder) error {
310 for _, cmd := range cmds {
311 err := cmd.readReply(rd)
312 if err != nil && !internal.IsRedisError(err) {
313 return err
314 }
315 }
316 return nil
317 }
318
319 func (c *baseClient) txPipelineProcessCmds(cn *pool.Conn, cmds []Cmder) (bool, error) {
320 err := cn.WithWriter(c.opt.WriteTimeout, func(wr *proto.Writer) error {
321 return txPipelineWriteMulti(wr, cmds)
322 })
323 if err != nil {
324 setCmdsErr(cmds, err)
325 return true, err
326 }
327
328 err = cn.WithReader(c.opt.ReadTimeout, func(rd *proto.Reader) error {
329 err := txPipelineReadQueued(rd, cmds)
330 if err != nil {
331 setCmdsErr(cmds, err)
332 return err
333 }
334 return pipelineReadCmds(rd, cmds)
335 })
336 return false, err
337 }
338
339 func txPipelineWriteMulti(wr *proto.Writer, cmds []Cmder) error {
340 multiExec := make([]Cmder, 0, len(cmds)+2)
341 multiExec = append(multiExec, NewStatusCmd("MULTI"))
342 multiExec = append(multiExec, cmds...)
343 multiExec = append(multiExec, NewSliceCmd("EXEC"))
344 return writeCmd(wr, multiExec...)
345 }
346
347 func txPipelineReadQueued(rd *proto.Reader, cmds []Cmder) error {
348
349 var statusCmd StatusCmd
350 err := statusCmd.readReply(rd)
351 if err != nil {
352 return err
353 }
354
355 for range cmds {
356 err = statusCmd.readReply(rd)
357 if err != nil && !internal.IsRedisError(err) {
358 return err
359 }
360 }
361
362
363 line, err := rd.ReadLine()
364 if err != nil {
365 if err == Nil {
366 err = TxFailedErr
367 }
368 return err
369 }
370
371 switch line[0] {
372 case proto.ErrorReply:
373 return proto.ParseErrorReply(line)
374 case proto.ArrayReply:
375
376 default:
377 err := fmt.Errorf("redis: expected '*', but got line %q", line)
378 return err
379 }
380
381 return nil
382 }
383
384
385
386
387
388
389 type Client struct {
390 baseClient
391 cmdable
392
393 ctx context.Context
394 }
395
396
397 func NewClient(opt *Options) *Client {
398 opt.init()
399
400 c := Client{
401 baseClient: baseClient{
402 opt: opt,
403 connPool: newConnPool(opt),
404 },
405 }
406 c.baseClient.init()
407 c.init()
408
409 return &c
410 }
411
412 func (c *Client) init() {
413 c.cmdable.setProcessor(c.Process)
414 }
415
416 func (c *Client) Context() context.Context {
417 if c.ctx != nil {
418 return c.ctx
419 }
420 return context.Background()
421 }
422
423 func (c *Client) WithContext(ctx context.Context) *Client {
424 if ctx == nil {
425 panic("nil context")
426 }
427 c2 := c.clone()
428 c2.ctx = ctx
429 return c2
430 }
431
432 func (c *Client) clone() *Client {
433 cp := *c
434 cp.init()
435 return &cp
436 }
437
438
439 func (c *Client) Options() *Options {
440 return c.opt
441 }
442
443 func (c *Client) SetLimiter(l Limiter) *Client {
444 c.limiter = l
445 return c
446 }
447
448 type PoolStats pool.Stats
449
450
451 func (c *Client) PoolStats() *PoolStats {
452 stats := c.connPool.Stats()
453 return (*PoolStats)(stats)
454 }
455
456 func (c *Client) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) {
457 return c.Pipeline().Pipelined(fn)
458 }
459
460 func (c *Client) Pipeline() Pipeliner {
461 pipe := Pipeline{
462 exec: c.processPipeline,
463 }
464 pipe.statefulCmdable.setProcessor(pipe.Process)
465 return &pipe
466 }
467
468 func (c *Client) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) {
469 return c.TxPipeline().Pipelined(fn)
470 }
471
472
473 func (c *Client) TxPipeline() Pipeliner {
474 pipe := Pipeline{
475 exec: c.processTxPipeline,
476 }
477 pipe.statefulCmdable.setProcessor(pipe.Process)
478 return &pipe
479 }
480
481 func (c *Client) pubSub() *PubSub {
482 pubsub := &PubSub{
483 opt: c.opt,
484
485 newConn: func(channels []string) (*pool.Conn, error) {
486 return c.newConn()
487 },
488 closeConn: c.connPool.CloseConn,
489 }
490 pubsub.init()
491 return pubsub
492 }
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520 func (c *Client) Subscribe(channels ...string) *PubSub {
521 pubsub := c.pubSub()
522 if len(channels) > 0 {
523 _ = pubsub.Subscribe(channels...)
524 }
525 return pubsub
526 }
527
528
529
530 func (c *Client) PSubscribe(channels ...string) *PubSub {
531 pubsub := c.pubSub()
532 if len(channels) > 0 {
533 _ = pubsub.PSubscribe(channels...)
534 }
535 return pubsub
536 }
537
538
539
540
541 type Conn struct {
542 baseClient
543 statefulCmdable
544 }
545
546 func newConn(opt *Options, cn *pool.Conn) *Conn {
547 connPool := pool.NewSingleConnPool(nil)
548 connPool.SetConn(cn)
549 c := Conn{
550 baseClient: baseClient{
551 opt: opt,
552 connPool: connPool,
553 },
554 }
555 c.baseClient.init()
556 c.statefulCmdable.setProcessor(c.Process)
557 return &c
558 }
559
560 func (c *Conn) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) {
561 return c.Pipeline().Pipelined(fn)
562 }
563
564 func (c *Conn) Pipeline() Pipeliner {
565 pipe := Pipeline{
566 exec: c.processPipeline,
567 }
568 pipe.statefulCmdable.setProcessor(pipe.Process)
569 return &pipe
570 }
571
572 func (c *Conn) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) {
573 return c.TxPipeline().Pipelined(fn)
574 }
575
576
577 func (c *Conn) TxPipeline() Pipeliner {
578 pipe := Pipeline{
579 exec: c.processTxPipeline,
580 }
581 pipe.statefulCmdable.setProcessor(pipe.Process)
582 return &pipe
583 }
584
View as plain text