1 package redis_test
2
3 import (
4 "context"
5 "crypto/rand"
6 "fmt"
7 "net"
8 "strconv"
9 "sync"
10 "time"
11
12 "github.com/go-redis/redis"
13
14 . "github.com/onsi/ginkgo"
15 . "github.com/onsi/gomega"
16 )
17
18 var _ = Describe("Redis Ring", func() {
19 const heartbeat = 100 * time.Millisecond
20
21 var ring *redis.Ring
22
23 setRingKeys := func() {
24 for i := 0; i < 100; i++ {
25 err := ring.Set(fmt.Sprintf("key%d", i), "value", 0).Err()
26 Expect(err).NotTo(HaveOccurred())
27 }
28 }
29
30 BeforeEach(func() {
31 opt := redisRingOptions()
32 opt.HeartbeatFrequency = heartbeat
33 ring = redis.NewRing(opt)
34
35 err := ring.ForEachShard(func(cl *redis.Client) error {
36 return cl.FlushDB().Err()
37 })
38 Expect(err).NotTo(HaveOccurred())
39 })
40
41 AfterEach(func() {
42 Expect(ring.Close()).NotTo(HaveOccurred())
43 })
44
45 It("distributes keys", func() {
46 setRingKeys()
47
48
49 Expect(ringShard1.Info("keyspace").Val()).To(ContainSubstring("keys=57"))
50 Expect(ringShard2.Info("keyspace").Val()).To(ContainSubstring("keys=43"))
51 })
52
53 It("distributes keys when using EVAL", func() {
54 script := redis.NewScript(`
55 local r = redis.call('SET', KEYS[1], ARGV[1])
56 return r
57 `)
58
59 var key string
60 for i := 0; i < 100; i++ {
61 key = fmt.Sprintf("key%d", i)
62 err := script.Run(ring, []string{key}, "value").Err()
63 Expect(err).NotTo(HaveOccurred())
64 }
65
66 Expect(ringShard1.Info("keyspace").Val()).To(ContainSubstring("keys=57"))
67 Expect(ringShard2.Info("keyspace").Val()).To(ContainSubstring("keys=43"))
68 })
69
70 It("uses single shard when one of the shards is down", func() {
71
72 Expect(ringShard2.Close()).NotTo(HaveOccurred())
73
74 Eventually(func() int {
75 return ring.Len()
76 }, "30s").Should(Equal(1))
77
78 setRingKeys()
79
80
81 Expect(ringShard1.Info("keyspace").Val()).To(ContainSubstring("keys=100"))
82
83
84 var err error
85 ringShard2, err = startRedis(ringShard2Port)
86 Expect(err).NotTo(HaveOccurred())
87
88 Eventually(func() int {
89 return ring.Len()
90 }, "30s").Should(Equal(2))
91
92 setRingKeys()
93
94
95 Expect(ringShard2.Info("keyspace").Val()).To(ContainSubstring("keys=43"))
96 })
97
98 It("supports hash tags", func() {
99 for i := 0; i < 100; i++ {
100 err := ring.Set(fmt.Sprintf("key%d{tag}", i), "value", 0).Err()
101 Expect(err).NotTo(HaveOccurred())
102 }
103
104 Expect(ringShard1.Info("keyspace").Val()).ToNot(ContainSubstring("keys="))
105 Expect(ringShard2.Info("keyspace").Val()).To(ContainSubstring("keys=100"))
106 })
107
108 It("propagates process for WithContext", func() {
109 var fromWrap []string
110 wrapper := func(oldProcess func(cmd redis.Cmder) error) func(cmd redis.Cmder) error {
111 return func(cmd redis.Cmder) error {
112 fromWrap = append(fromWrap, cmd.Name())
113
114 return oldProcess(cmd)
115 }
116 }
117
118 ctx := context.Background()
119 ring = ring.WithContext(ctx)
120 ring.WrapProcess(wrapper)
121
122 ring.Ping()
123 Expect(fromWrap).To(Equal([]string{"ping"}))
124
125 ring.Ping()
126 Expect(fromWrap).To(Equal([]string{"ping", "ping"}))
127 })
128
129 Describe("pipeline", func() {
130 It("distributes keys", func() {
131 pipe := ring.Pipeline()
132 for i := 0; i < 100; i++ {
133 err := pipe.Set(fmt.Sprintf("key%d", i), "value", 0).Err()
134 Expect(err).NotTo(HaveOccurred())
135 }
136 cmds, err := pipe.Exec()
137 Expect(err).NotTo(HaveOccurred())
138 Expect(cmds).To(HaveLen(100))
139 Expect(pipe.Close()).NotTo(HaveOccurred())
140
141 for _, cmd := range cmds {
142 Expect(cmd.Err()).NotTo(HaveOccurred())
143 Expect(cmd.(*redis.StatusCmd).Val()).To(Equal("OK"))
144 }
145
146
147 Expect(ringShard1.Info().Val()).To(ContainSubstring("keys=57"))
148 Expect(ringShard2.Info().Val()).To(ContainSubstring("keys=43"))
149 })
150
151 It("is consistent with ring", func() {
152 var keys []string
153 for i := 0; i < 100; i++ {
154 key := make([]byte, 64)
155 _, err := rand.Read(key)
156 Expect(err).NotTo(HaveOccurred())
157 keys = append(keys, string(key))
158 }
159
160 _, err := ring.Pipelined(func(pipe redis.Pipeliner) error {
161 for _, key := range keys {
162 pipe.Set(key, "value", 0).Err()
163 }
164 return nil
165 })
166 Expect(err).NotTo(HaveOccurred())
167
168 for _, key := range keys {
169 val, err := ring.Get(key).Result()
170 Expect(err).NotTo(HaveOccurred())
171 Expect(val).To(Equal("value"))
172 }
173 })
174
175 It("supports hash tags", func() {
176 _, err := ring.Pipelined(func(pipe redis.Pipeliner) error {
177 for i := 0; i < 100; i++ {
178 pipe.Set(fmt.Sprintf("key%d{tag}", i), "value", 0).Err()
179 }
180 return nil
181 })
182 Expect(err).NotTo(HaveOccurred())
183
184 Expect(ringShard1.Info().Val()).ToNot(ContainSubstring("keys="))
185 Expect(ringShard2.Info().Val()).To(ContainSubstring("keys=100"))
186 })
187 })
188 })
189
190 var _ = Describe("empty Redis Ring", func() {
191 var ring *redis.Ring
192
193 BeforeEach(func() {
194 ring = redis.NewRing(&redis.RingOptions{})
195 })
196
197 AfterEach(func() {
198 Expect(ring.Close()).NotTo(HaveOccurred())
199 })
200
201 It("returns an error", func() {
202 err := ring.Ping().Err()
203 Expect(err).To(MatchError("redis: all ring shards are down"))
204 })
205
206 It("pipeline returns an error", func() {
207 _, err := ring.Pipelined(func(pipe redis.Pipeliner) error {
208 pipe.Ping()
209 return nil
210 })
211 Expect(err).To(MatchError("redis: all ring shards are down"))
212 })
213 })
214
215 var _ = Describe("Ring watch", func() {
216 const heartbeat = 100 * time.Millisecond
217
218 var ring *redis.Ring
219
220 BeforeEach(func() {
221 opt := redisRingOptions()
222 opt.HeartbeatFrequency = heartbeat
223 ring = redis.NewRing(opt)
224
225 err := ring.ForEachShard(func(cl *redis.Client) error {
226 return cl.FlushDB().Err()
227 })
228 Expect(err).NotTo(HaveOccurred())
229 })
230
231 AfterEach(func() {
232 Expect(ring.Close()).NotTo(HaveOccurred())
233 })
234
235 It("should Watch", func() {
236 var incr func(string) error
237
238
239 incr = func(key string) error {
240 err := ring.Watch(func(tx *redis.Tx) error {
241 n, err := tx.Get(key).Int64()
242 if err != nil && err != redis.Nil {
243 return err
244 }
245
246 _, err = tx.Pipelined(func(pipe redis.Pipeliner) error {
247 pipe.Set(key, strconv.FormatInt(n+1, 10), 0)
248 return nil
249 })
250 return err
251 }, key)
252 if err == redis.TxFailedErr {
253 return incr(key)
254 }
255 return err
256 }
257
258 var wg sync.WaitGroup
259 for i := 0; i < 100; i++ {
260 wg.Add(1)
261 go func() {
262 defer GinkgoRecover()
263 defer wg.Done()
264
265 err := incr("key")
266 Expect(err).NotTo(HaveOccurred())
267 }()
268 }
269 wg.Wait()
270
271 n, err := ring.Get("key").Int64()
272 Expect(err).NotTo(HaveOccurred())
273 Expect(n).To(Equal(int64(100)))
274 })
275
276 It("should discard", func() {
277 err := ring.Watch(func(tx *redis.Tx) error {
278 cmds, err := tx.Pipelined(func(pipe redis.Pipeliner) error {
279 pipe.Set("key1", "hello1", 0)
280 pipe.Discard()
281 pipe.Set("key2", "hello2", 0)
282 return nil
283 })
284 Expect(err).NotTo(HaveOccurred())
285 Expect(cmds).To(HaveLen(1))
286 return err
287 }, "key1", "key2")
288 Expect(err).NotTo(HaveOccurred())
289
290 get := ring.Get("key1")
291 Expect(get.Err()).To(Equal(redis.Nil))
292 Expect(get.Val()).To(Equal(""))
293
294 get = ring.Get("key2")
295 Expect(get.Err()).NotTo(HaveOccurred())
296 Expect(get.Val()).To(Equal("hello2"))
297 })
298
299 It("returns no error when there are no commands", func() {
300 err := ring.Watch(func(tx *redis.Tx) error {
301 _, err := tx.Pipelined(func(redis.Pipeliner) error { return nil })
302 return err
303 }, "key")
304 Expect(err).NotTo(HaveOccurred())
305
306 v, err := ring.Ping().Result()
307 Expect(err).NotTo(HaveOccurred())
308 Expect(v).To(Equal("PONG"))
309 })
310
311 It("should exec bulks", func() {
312 const N = 20000
313
314 err := ring.Watch(func(tx *redis.Tx) error {
315 cmds, err := tx.Pipelined(func(pipe redis.Pipeliner) error {
316 for i := 0; i < N; i++ {
317 pipe.Incr("key")
318 }
319 return nil
320 })
321 Expect(err).NotTo(HaveOccurred())
322 Expect(len(cmds)).To(Equal(N))
323 for _, cmd := range cmds {
324 Expect(cmd.Err()).NotTo(HaveOccurred())
325 }
326 return err
327 }, "key")
328 Expect(err).NotTo(HaveOccurred())
329
330 num, err := ring.Get("key").Int64()
331 Expect(err).NotTo(HaveOccurred())
332 Expect(num).To(Equal(int64(N)))
333 })
334
335 It("should Watch/Unwatch", func() {
336 var C, N int
337
338 err := ring.Set("key", "0", 0).Err()
339 Expect(err).NotTo(HaveOccurred())
340
341 perform(C, func(id int) {
342 for i := 0; i < N; i++ {
343 err := ring.Watch(func(tx *redis.Tx) error {
344 val, err := tx.Get("key").Result()
345 Expect(err).NotTo(HaveOccurred())
346 Expect(val).NotTo(Equal(redis.Nil))
347
348 num, err := strconv.ParseInt(val, 10, 64)
349 Expect(err).NotTo(HaveOccurred())
350
351 cmds, err := tx.Pipelined(func(pipe redis.Pipeliner) error {
352 pipe.Set("key", strconv.FormatInt(num+1, 10), 0)
353 return nil
354 })
355 Expect(cmds).To(HaveLen(1))
356 return err
357 }, "key")
358 if err == redis.TxFailedErr {
359 i--
360 continue
361 }
362 Expect(err).NotTo(HaveOccurred())
363 }
364 })
365
366 val, err := ring.Get("key").Int64()
367 Expect(err).NotTo(HaveOccurred())
368 Expect(val).To(Equal(int64(C * N)))
369 })
370
371 It("should close Tx without closing the client", func() {
372 err := ring.Watch(func(tx *redis.Tx) error {
373 _, err := tx.Pipelined(func(pipe redis.Pipeliner) error {
374 pipe.Ping()
375 return nil
376 })
377 return err
378 }, "key")
379 Expect(err).NotTo(HaveOccurred())
380
381 Expect(ring.Ping().Err()).NotTo(HaveOccurred())
382 })
383
384 It("respects max size on multi", func() {
385 perform(1000, func(id int) {
386 var ping *redis.StatusCmd
387
388 err := ring.Watch(func(tx *redis.Tx) error {
389 cmds, err := tx.Pipelined(func(pipe redis.Pipeliner) error {
390 ping = pipe.Ping()
391 return nil
392 })
393 Expect(err).NotTo(HaveOccurred())
394 Expect(cmds).To(HaveLen(1))
395 return err
396 }, "key")
397 Expect(err).NotTo(HaveOccurred())
398
399 Expect(ping.Err()).NotTo(HaveOccurred())
400 Expect(ping.Val()).To(Equal("PONG"))
401 })
402
403 ring.ForEachShard(func(cl *redis.Client) error {
404 pool := cl.Pool()
405 Expect(pool.Len()).To(BeNumerically("<=", 10))
406 Expect(pool.IdleLen()).To(BeNumerically("<=", 10))
407 Expect(pool.Len()).To(Equal(pool.IdleLen()))
408
409 return nil
410 })
411 })
412 })
413
414 var _ = Describe("Ring Tx timeout", func() {
415 const heartbeat = 100 * time.Millisecond
416
417 var ring *redis.Ring
418
419 AfterEach(func() {
420 _ = ring.Close()
421 })
422
423 testTimeout := func() {
424 It("Tx timeouts", func() {
425 err := ring.Watch(func(tx *redis.Tx) error {
426 return tx.Ping().Err()
427 }, "foo")
428 Expect(err).To(HaveOccurred())
429 Expect(err.(net.Error).Timeout()).To(BeTrue())
430 })
431
432 It("Tx Pipeline timeouts", func() {
433 err := ring.Watch(func(tx *redis.Tx) error {
434 _, err := tx.Pipelined(func(pipe redis.Pipeliner) error {
435 pipe.Ping()
436 return nil
437 })
438 return err
439 }, "foo")
440 Expect(err).To(HaveOccurred())
441 Expect(err.(net.Error).Timeout()).To(BeTrue())
442 })
443 }
444
445 const pause = 5 * time.Second
446
447 Context("read/write timeout", func() {
448 BeforeEach(func() {
449 opt := redisRingOptions()
450 opt.ReadTimeout = 250 * time.Millisecond
451 opt.WriteTimeout = 250 * time.Millisecond
452 opt.HeartbeatFrequency = heartbeat
453 ring = redis.NewRing(opt)
454
455 err := ring.ForEachShard(func(client *redis.Client) error {
456 return client.ClientPause(pause).Err()
457 })
458 Expect(err).NotTo(HaveOccurred())
459 })
460
461 AfterEach(func() {
462 _ = ring.ForEachShard(func(client *redis.Client) error {
463 defer GinkgoRecover()
464 Eventually(func() error {
465 return client.Ping().Err()
466 }, 2*pause).ShouldNot(HaveOccurred())
467 return nil
468 })
469 })
470
471 testTimeout()
472 })
473 })
474
View as plain text