...
1 package redis
2
3 import (
4 "sync"
5
6 "github.com/go-redis/redis/internal/pool"
7 )
8
9 type pipelineExecer func([]Cmder) error
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24 type Pipeliner interface {
25 StatefulCmdable
26 Do(args ...interface{}) *Cmd
27 Process(cmd Cmder) error
28 Close() error
29 Discard() error
30 Exec() ([]Cmder, error)
31 }
32
33 var _ Pipeliner = (*Pipeline)(nil)
34
35
36
37
38 type Pipeline struct {
39 statefulCmdable
40
41 exec pipelineExecer
42
43 mu sync.Mutex
44 cmds []Cmder
45 closed bool
46 }
47
48 func (c *Pipeline) Do(args ...interface{}) *Cmd {
49 cmd := NewCmd(args...)
50 _ = c.Process(cmd)
51 return cmd
52 }
53
54
55 func (c *Pipeline) Process(cmd Cmder) error {
56 c.mu.Lock()
57 c.cmds = append(c.cmds, cmd)
58 c.mu.Unlock()
59 return nil
60 }
61
62
63 func (c *Pipeline) Close() error {
64 c.mu.Lock()
65 c.discard()
66 c.closed = true
67 c.mu.Unlock()
68 return nil
69 }
70
71
72 func (c *Pipeline) Discard() error {
73 c.mu.Lock()
74 err := c.discard()
75 c.mu.Unlock()
76 return err
77 }
78
79 func (c *Pipeline) discard() error {
80 if c.closed {
81 return pool.ErrClosed
82 }
83 c.cmds = c.cmds[:0]
84 return nil
85 }
86
87
88
89
90
91
92 func (c *Pipeline) Exec() ([]Cmder, error) {
93 c.mu.Lock()
94 defer c.mu.Unlock()
95
96 if c.closed {
97 return nil, pool.ErrClosed
98 }
99
100 if len(c.cmds) == 0 {
101 return nil, nil
102 }
103
104 cmds := c.cmds
105 c.cmds = nil
106
107 return cmds, c.exec(cmds)
108 }
109
110 func (c *Pipeline) pipelined(fn func(Pipeliner) error) ([]Cmder, error) {
111 if err := fn(c); err != nil {
112 return nil, err
113 }
114 cmds, err := c.Exec()
115 _ = c.Close()
116 return cmds, err
117 }
118
119 func (c *Pipeline) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) {
120 return c.pipelined(fn)
121 }
122
123 func (c *Pipeline) Pipeline() Pipeliner {
124 return c
125 }
126
127 func (c *Pipeline) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) {
128 return c.pipelined(fn)
129 }
130
131 func (c *Pipeline) TxPipeline() Pipeliner {
132 return c
133 }
134
View as plain text