1 package redis_test
2
3 import (
4 "errors"
5 "fmt"
6 "net"
7 "os"
8 "os/exec"
9 "path/filepath"
10 "sync"
11 "testing"
12 "time"
13
14 "github.com/go-redis/redis"
15
16 . "github.com/onsi/ginkgo"
17 . "github.com/onsi/gomega"
18 )
19
20 const (
21 redisPort = "6380"
22 redisAddr = ":" + redisPort
23 redisSecondaryPort = "6381"
24 )
25
26 const (
27 ringShard1Port = "6390"
28 ringShard2Port = "6391"
29 ringShard3Port = "6392"
30 )
31
32 const (
33 sentinelName = "mymaster"
34 sentinelMasterPort = "8123"
35 sentinelSlave1Port = "8124"
36 sentinelSlave2Port = "8125"
37 sentinelPort = "8126"
38 )
39
40 var (
41 redisMain *redisProcess
42 ringShard1, ringShard2, ringShard3 *redisProcess
43 sentinelMaster, sentinelSlave1, sentinelSlave2, sentinel *redisProcess
44 )
45
46 var cluster = &clusterScenario{
47 ports: []string{"8220", "8221", "8222", "8223", "8224", "8225"},
48 nodeIds: make([]string, 6),
49 processes: make(map[string]*redisProcess, 6),
50 clients: make(map[string]*redis.Client, 6),
51 }
52
53 var _ = BeforeSuite(func() {
54 var err error
55
56 redisMain, err = startRedis(redisPort)
57 Expect(err).NotTo(HaveOccurred())
58
59 ringShard1, err = startRedis(ringShard1Port)
60 Expect(err).NotTo(HaveOccurred())
61
62 ringShard2, err = startRedis(ringShard2Port)
63 Expect(err).NotTo(HaveOccurred())
64
65 ringShard3, err = startRedis(ringShard3Port)
66 Expect(err).NotTo(HaveOccurred())
67
68 sentinelMaster, err = startRedis(sentinelMasterPort)
69 Expect(err).NotTo(HaveOccurred())
70
71 sentinel, err = startSentinel(sentinelPort, sentinelName, sentinelMasterPort)
72 Expect(err).NotTo(HaveOccurred())
73
74 sentinelSlave1, err = startRedis(
75 sentinelSlave1Port, "--slaveof", "127.0.0.1", sentinelMasterPort)
76 Expect(err).NotTo(HaveOccurred())
77
78 sentinelSlave2, err = startRedis(
79 sentinelSlave2Port, "--slaveof", "127.0.0.1", sentinelMasterPort)
80 Expect(err).NotTo(HaveOccurred())
81
82 Expect(startCluster(cluster)).NotTo(HaveOccurred())
83 })
84
85 var _ = AfterSuite(func() {
86 Expect(redisMain.Close()).NotTo(HaveOccurred())
87
88 Expect(ringShard1.Close()).NotTo(HaveOccurred())
89 Expect(ringShard2.Close()).NotTo(HaveOccurred())
90 Expect(ringShard3.Close()).NotTo(HaveOccurred())
91
92 Expect(sentinel.Close()).NotTo(HaveOccurred())
93 Expect(sentinelSlave1.Close()).NotTo(HaveOccurred())
94 Expect(sentinelSlave2.Close()).NotTo(HaveOccurred())
95 Expect(sentinelMaster.Close()).NotTo(HaveOccurred())
96
97 Expect(stopCluster(cluster)).NotTo(HaveOccurred())
98 })
99
100 func TestGinkgoSuite(t *testing.T) {
101 RegisterFailHandler(Fail)
102 RunSpecs(t, "go-redis")
103 }
104
105
106
107 func redisOptions() *redis.Options {
108 return &redis.Options{
109 Addr: redisAddr,
110 DB: 15,
111 DialTimeout: 10 * time.Second,
112 ReadTimeout: 30 * time.Second,
113 WriteTimeout: 30 * time.Second,
114 PoolSize: 10,
115 PoolTimeout: 30 * time.Second,
116 IdleTimeout: 500 * time.Millisecond,
117 IdleCheckFrequency: 500 * time.Millisecond,
118 }
119 }
120
121 func redisClusterOptions() *redis.ClusterOptions {
122 return &redis.ClusterOptions{
123 DialTimeout: 10 * time.Second,
124 ReadTimeout: 30 * time.Second,
125 WriteTimeout: 30 * time.Second,
126 PoolSize: 10,
127 PoolTimeout: 30 * time.Second,
128 IdleTimeout: 500 * time.Millisecond,
129 IdleCheckFrequency: 500 * time.Millisecond,
130 }
131 }
132
133 func redisRingOptions() *redis.RingOptions {
134 return &redis.RingOptions{
135 Addrs: map[string]string{
136 "ringShardOne": ":" + ringShard1Port,
137 "ringShardTwo": ":" + ringShard2Port,
138 },
139 DialTimeout: 10 * time.Second,
140 ReadTimeout: 30 * time.Second,
141 WriteTimeout: 30 * time.Second,
142 PoolSize: 10,
143 PoolTimeout: 30 * time.Second,
144 IdleTimeout: 500 * time.Millisecond,
145 IdleCheckFrequency: 500 * time.Millisecond,
146 }
147 }
148
149 func performAsync(n int, cbs ...func(int)) *sync.WaitGroup {
150 var wg sync.WaitGroup
151 for _, cb := range cbs {
152 for i := 0; i < n; i++ {
153 wg.Add(1)
154 go func(cb func(int), i int) {
155 defer GinkgoRecover()
156 defer wg.Done()
157
158 cb(i)
159 }(cb, i)
160 }
161 }
162 return &wg
163 }
164
165 func perform(n int, cbs ...func(int)) {
166 wg := performAsync(n, cbs...)
167 wg.Wait()
168 }
169
170 func eventually(fn func() error, timeout time.Duration) error {
171 errCh := make(chan error, 1)
172 done := make(chan struct{})
173 exit := make(chan struct{})
174
175 go func() {
176 for {
177 err := fn()
178 if err == nil {
179 close(done)
180 return
181 }
182
183 select {
184 case errCh <- err:
185 default:
186 }
187
188 select {
189 case <-exit:
190 return
191 case <-time.After(timeout / 100):
192 }
193 }
194 }()
195
196 select {
197 case <-done:
198 return nil
199 case <-time.After(timeout):
200 close(exit)
201 select {
202 case err := <-errCh:
203 return err
204 default:
205 return fmt.Errorf("timeout after %s without an error", timeout)
206 }
207 }
208 }
209
210 func execCmd(name string, args ...string) (*os.Process, error) {
211 cmd := exec.Command(name, args...)
212 if testing.Verbose() {
213 cmd.Stdout = os.Stdout
214 cmd.Stderr = os.Stderr
215 }
216 return cmd.Process, cmd.Start()
217 }
218
219 func connectTo(port string) (*redis.Client, error) {
220 client := redis.NewClient(&redis.Options{
221 Addr: ":" + port,
222 })
223
224 err := eventually(func() error {
225 return client.Ping().Err()
226 }, 30*time.Second)
227 if err != nil {
228 return nil, err
229 }
230
231 return client, nil
232 }
233
234 type redisProcess struct {
235 *os.Process
236 *redis.Client
237 }
238
239 func (p *redisProcess) Close() error {
240 if err := p.Kill(); err != nil {
241 return err
242 }
243
244 err := eventually(func() error {
245 if err := p.Client.Ping().Err(); err != nil {
246 return nil
247 }
248 return errors.New("client is not shutdown")
249 }, 10*time.Second)
250 if err != nil {
251 return err
252 }
253
254 p.Client.Close()
255 return nil
256 }
257
258 var (
259 redisServerBin, _ = filepath.Abs(filepath.Join("testdata", "redis", "src", "redis-server"))
260 redisServerConf, _ = filepath.Abs(filepath.Join("testdata", "redis.conf"))
261 )
262
263 func redisDir(port string) (string, error) {
264 dir, err := filepath.Abs(filepath.Join("testdata", "instances", port))
265 if err != nil {
266 return "", err
267 }
268 if err := os.RemoveAll(dir); err != nil {
269 return "", err
270 }
271 if err := os.MkdirAll(dir, 0775); err != nil {
272 return "", err
273 }
274 return dir, nil
275 }
276
277 func startRedis(port string, args ...string) (*redisProcess, error) {
278 dir, err := redisDir(port)
279 if err != nil {
280 return nil, err
281 }
282 if err = exec.Command("cp", "-f", redisServerConf, dir).Run(); err != nil {
283 return nil, err
284 }
285
286 baseArgs := []string{filepath.Join(dir, "redis.conf"), "--port", port, "--dir", dir}
287 process, err := execCmd(redisServerBin, append(baseArgs, args...)...)
288 if err != nil {
289 return nil, err
290 }
291
292 client, err := connectTo(port)
293 if err != nil {
294 process.Kill()
295 return nil, err
296 }
297 return &redisProcess{process, client}, err
298 }
299
300 func startSentinel(port, masterName, masterPort string) (*redisProcess, error) {
301 dir, err := redisDir(port)
302 if err != nil {
303 return nil, err
304 }
305 process, err := execCmd(redisServerBin, os.DevNull, "--sentinel", "--port", port, "--dir", dir)
306 if err != nil {
307 return nil, err
308 }
309 client, err := connectTo(port)
310 if err != nil {
311 process.Kill()
312 return nil, err
313 }
314 for _, cmd := range []*redis.StatusCmd{
315 redis.NewStatusCmd("SENTINEL", "MONITOR", masterName, "127.0.0.1", masterPort, "1"),
316 redis.NewStatusCmd("SENTINEL", "SET", masterName, "down-after-milliseconds", "500"),
317 redis.NewStatusCmd("SENTINEL", "SET", masterName, "failover-timeout", "1000"),
318 redis.NewStatusCmd("SENTINEL", "SET", masterName, "parallel-syncs", "1"),
319 } {
320 client.Process(cmd)
321 if err := cmd.Err(); err != nil {
322 process.Kill()
323 return nil, err
324 }
325 }
326 return &redisProcess{process, client}, nil
327 }
328
329
330
331 type badConnError string
332
333 func (e badConnError) Error() string { return string(e) }
334 func (e badConnError) Timeout() bool { return false }
335 func (e badConnError) Temporary() bool { return false }
336
337 type badConn struct {
338 net.TCPConn
339
340 readDelay, writeDelay time.Duration
341 readErr, writeErr error
342 }
343
344 var _ net.Conn = &badConn{}
345
346 func (cn *badConn) Read([]byte) (int, error) {
347 if cn.readDelay != 0 {
348 time.Sleep(cn.readDelay)
349 }
350 if cn.readErr != nil {
351 return 0, cn.readErr
352 }
353 return 0, badConnError("bad connection")
354 }
355
356 func (cn *badConn) Write([]byte) (int, error) {
357 if cn.writeDelay != 0 {
358 time.Sleep(cn.writeDelay)
359 }
360 if cn.writeErr != nil {
361 return 0, cn.writeErr
362 }
363 return 0, badConnError("bad connection")
364 }
365
View as plain text