package redis_test

import (
	"bytes"
	"fmt"
	"net"
	"strconv"
	"strings"
	"sync"
	"testing"
	"time"

	"github.com/go-redis/redis"
	"github.com/go-redis/redis/internal/hashtag"

	. "github.com/onsi/ginkgo"
	. "github.com/onsi/gomega"
)

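// clusterScenario describes the six node test cluster: the first three
// ports are masters, the last three are their slaves.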
type clusterScenario struct {
	ports     []string
	nodeIds   []string
	processes map[string]*redisProcess
	clients   map[string]*redis.Client
}

func (s *clusterScenario) masters() []*redis.Client {
	result := make([]*redis.Client, 3)
	for pos, port := range s.ports[:3] {
		result[pos] = s.clients[port]
	}
	return result
}

func (s *clusterScenario) slaves() []*redis.Client {
	result := make([]*redis.Client, 3)
	for pos, port := range s.ports[3:] {
		result[pos] = s.clients[port]
	}
	return result
}

func (s *clusterScenario) addrs() []string {
	addrs := make([]string, len(s.ports))
	for i, port := range s.ports {
		addrs[i] = net.JoinHostPort("127.0.0.1", port)
	}
	return addrs
}

func (s *clusterScenario) clusterClientUnsafe(opt *redis.ClusterOptions) *redis.ClusterClient {
	opt.Addrs = s.addrs()
	return redis.NewClusterClient(opt)
}

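// clusterClient builds a ClusterClient for the scenario and waits up to 30
// seconds for the cluster state to become consistent, unless custom
// ClusterSlots are used.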
func (s *clusterScenario) clusterClient(opt *redis.ClusterOptions) *redis.ClusterClient {
	client := s.clusterClientUnsafe(opt)

	err := eventually(func() error {
		if opt.ClusterSlots != nil {
			return nil
		}

		state, err := client.LoadState()
		if err != nil {
			return err
		}

		if !state.IsConsistent() {
			return fmt.Errorf("cluster state is not consistent")
		}

		return nil
	}, 30*time.Second)
	if err != nil {
		panic(err)
	}

	return client
}

func startCluster(scenario *clusterScenario) error {
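	// Start processes and collect node ids.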
	for pos, port := range scenario.ports {
		process, err := startRedis(port, "--cluster-enabled", "yes")
		if err != nil {
			return err
		}

		client := redis.NewClient(&redis.Options{
			Addr: ":" + port,
		})

		info, err := client.ClusterNodes().Result()
		if err != nil {
			return err
		}

		scenario.processes[port] = process
		scenario.clients[port] = client
		scenario.nodeIds[pos] = info[:40]
	}

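	// Meet cluster nodes.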
	for _, client := range scenario.clients {
		err := client.ClusterMeet("127.0.0.1", scenario.ports[0]).Err()
		if err != nil {
			return err
		}
	}

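	// Bootstrap masters.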
	slots := []int{0, 5000, 10000, 16384}
	for pos, master := range scenario.masters() {
		err := master.ClusterAddSlotsRange(slots[pos], slots[pos+1]-1).Err()
		if err != nil {
			return err
		}
	}

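	// Bootstrap slaves.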
	for idx, slave := range scenario.slaves() {
		masterId := scenario.nodeIds[idx]

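		// Wait until the master is up before replicating it.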
		err := eventually(func() error {
			s := slave.ClusterNodes().Val()
			wanted := masterId
			if !strings.Contains(s, wanted) {
				return fmt.Errorf("%q does not contain %q", s, wanted)
			}
			return nil
		}, 10*time.Second)
		if err != nil {
			return err
		}

		err = slave.ClusterReplicate(masterId).Err()
		if err != nil {
			return err
		}
	}

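	// Wait until every node reports the expected slot layout.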
	wanted := []redis.ClusterSlot{{
		Start: 0,
		End:   4999,
		Nodes: []redis.ClusterNode{{
			Id:   "",
			Addr: "127.0.0.1:8220",
		}, {
			Id:   "",
			Addr: "127.0.0.1:8223",
		}},
	}, {
		Start: 5000,
		End:   9999,
		Nodes: []redis.ClusterNode{{
			Id:   "",
			Addr: "127.0.0.1:8221",
		}, {
			Id:   "",
			Addr: "127.0.0.1:8224",
		}},
	}, {
		Start: 10000,
		End:   16383,
		Nodes: []redis.ClusterNode{{
			Id:   "",
			Addr: "127.0.0.1:8222",
		}, {
			Id:   "",
			Addr: "127.0.0.1:8225",
		}},
	}}
	for _, client := range scenario.clients {
		err := eventually(func() error {
			res, err := client.ClusterSlots().Result()
			if err != nil {
				return err
			}
			return assertSlotsEqual(res, wanted)
		}, 30*time.Second)
		if err != nil {
			return err
		}
	}

	return nil
}

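// assertSlotsEqual reports an error unless each wanted slot range has a
// match in slots; ranges may appear in any order and node ids are ignored.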
func assertSlotsEqual(slots, wanted []redis.ClusterSlot) error {
outerLoop:
	for _, s2 := range wanted {
		for _, s1 := range slots {
			if slotEqual(s1, s2) {
				continue outerLoop
			}
		}
		return fmt.Errorf("%v not found in %v", s2, slots)
	}
	return nil
}

func slotEqual(s1, s2 redis.ClusterSlot) bool {
	if s1.Start != s2.Start {
		return false
	}
	if s1.End != s2.End {
		return false
	}
	if len(s1.Nodes) != len(s2.Nodes) {
		return false
	}
	for i, n1 := range s1.Nodes {
		if n1.Addr != s2.Nodes[i].Addr {
			return false
		}
	}
	return true
}

func stopCluster(scenario *clusterScenario) error {
	for _, client := range scenario.clients {
		if err := client.Close(); err != nil {
			return err
		}
	}
	for _, process := range scenario.processes {
		if err := process.Close(); err != nil {
			return err
		}
	}
	return nil
}

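//------------------------------------------------------------------------------
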
var _ = Describe("ClusterClient", func() {
	var failover bool
	var opt *redis.ClusterOptions
	var client *redis.ClusterClient

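	// assertClusterClient registers the specs that every cluster
	// configuration below must pass.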
	assertClusterClient := func() {
		It("should GET/SET/DEL", func() {
			err := client.Get("A").Err()
			Expect(err).To(Equal(redis.Nil))

			err = client.Set("A", "VALUE", 0).Err()
			Expect(err).NotTo(HaveOccurred())

			Eventually(func() string {
				return client.Get("A").Val()
			}, 30*time.Second).Should(Equal("VALUE"))

			cnt, err := client.Del("A").Result()
			Expect(err).NotTo(HaveOccurred())
			Expect(cnt).To(Equal(int64(1)))
		})

		It("GET follows redirects", func() {
			err := client.Set("A", "VALUE", 0).Err()
			Expect(err).NotTo(HaveOccurred())

			if !failover {
				Eventually(func() int64 {
					nodes, err := client.Nodes("A")
					if err != nil {
						return 0
					}
					return nodes[1].Client.DBSize().Val()
				}, 30*time.Second).Should(Equal(int64(1)))

				Eventually(func() error {
					return client.SwapNodes("A")
				}, 30*time.Second).ShouldNot(HaveOccurred())
			}

			v, err := client.Get("A").Result()
			Expect(err).NotTo(HaveOccurred())
			Expect(v).To(Equal("VALUE"))
		})

		It("SET follows redirects", func() {
			if !failover {
				Eventually(func() error {
					return client.SwapNodes("A")
				}, 30*time.Second).ShouldNot(HaveOccurred())
			}

			err := client.Set("A", "VALUE", 0).Err()
			Expect(err).NotTo(HaveOccurred())

			v, err := client.Get("A").Result()
			Expect(err).NotTo(HaveOccurred())
			Expect(v).To(Equal("VALUE"))
		})

		It("distributes keys", func() {
			for i := 0; i < 100; i++ {
				err := client.Set(fmt.Sprintf("key%d", i), "value", 0).Err()
				Expect(err).NotTo(HaveOccurred())
			}

			client.ForEachMaster(func(master *redis.Client) error {
				defer GinkgoRecover()
				Eventually(func() string {
					return master.Info("keyspace").Val()
				}, 30*time.Second).Should(Or(
					ContainSubstring("keys=31"),
					ContainSubstring("keys=29"),
					ContainSubstring("keys=40"),
				))
				return nil
			})
		})

		It("distributes keys when using EVAL", func() {
			script := redis.NewScript(`
				local r = redis.call('SET', KEYS[1], ARGV[1])
				return r
			`)

			var key string
			for i := 0; i < 100; i++ {
				key = fmt.Sprintf("key%d", i)
				err := script.Run(client, []string{key}, "value").Err()
				Expect(err).NotTo(HaveOccurred())
			}

			client.ForEachMaster(func(master *redis.Client) error {
				defer GinkgoRecover()
				Eventually(func() string {
					return master.Info("keyspace").Val()
				}, 30*time.Second).Should(Or(
					ContainSubstring("keys=31"),
					ContainSubstring("keys=29"),
					ContainSubstring("keys=40"),
				))
				return nil
			})
		})

		It("supports Watch", func() {
			var incr func(string) error

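			// Transactionally increments key using GET and SET commands,
			// retrying on optimistic-lock failures.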
			incr = func(key string) error {
				err := client.Watch(func(tx *redis.Tx) error {
					n, err := tx.Get(key).Int64()
					if err != nil && err != redis.Nil {
						return err
					}

					_, err = tx.Pipelined(func(pipe redis.Pipeliner) error {
						pipe.Set(key, strconv.FormatInt(n+1, 10), 0)
						return nil
					})
					return err
				}, key)
				if err == redis.TxFailedErr {
					return incr(key)
				}
				return err
			}

			var wg sync.WaitGroup
			for i := 0; i < 100; i++ {
				wg.Add(1)
				go func() {
					defer GinkgoRecover()
					defer wg.Done()

					err := incr("key")
					Expect(err).NotTo(HaveOccurred())
				}()
			}
			wg.Wait()

			Eventually(func() string {
				return client.Get("key").Val()
			}, 30*time.Second).Should(Equal("100"))
		})

		Describe("pipelining", func() {
			var pipe *redis.Pipeline

			assertPipeline := func() {
				keys := []string{"A", "B", "C", "D", "E", "F", "G"}

				It("follows redirects", func() {
					if !failover {
						for _, key := range keys {
							Eventually(func() error {
								return client.SwapNodes(key)
							}, 30*time.Second).ShouldNot(HaveOccurred())
						}
					}

					for i, key := range keys {
						pipe.Set(key, key+"_value", 0)
						pipe.Expire(key, time.Duration(i+1)*time.Hour)
					}
					cmds, err := pipe.Exec()
					Expect(err).NotTo(HaveOccurred())
					Expect(cmds).To(HaveLen(14))

					_ = client.ForEachNode(func(node *redis.Client) error {
						defer GinkgoRecover()
						Eventually(func() int64 {
							return node.DBSize().Val()
						}, 30*time.Second).ShouldNot(BeZero())
						return nil
					})

					if !failover {
						for _, key := range keys {
							Eventually(func() error {
								return client.SwapNodes(key)
							}, 30*time.Second).ShouldNot(HaveOccurred())
						}
					}

					for _, key := range keys {
						pipe.Get(key)
						pipe.TTL(key)
					}
					cmds, err = pipe.Exec()
					Expect(err).NotTo(HaveOccurred())
					Expect(cmds).To(HaveLen(14))

					for i, key := range keys {
						get := cmds[i*2].(*redis.StringCmd)
						Expect(get.Val()).To(Equal(key + "_value"))

						ttl := cmds[(i*2)+1].(*redis.DurationCmd)
						dur := time.Duration(i+1) * time.Hour
						Expect(ttl.Val()).To(BeNumerically("~", dur, 30*time.Second))
					}
				})

				It("works with missing keys", func() {
					pipe.Set("A", "A_value", 0)
					pipe.Set("C", "C_value", 0)
					_, err := pipe.Exec()
					Expect(err).NotTo(HaveOccurred())

					a := pipe.Get("A")
					b := pipe.Get("B")
					c := pipe.Get("C")
					cmds, err := pipe.Exec()
					Expect(err).To(Equal(redis.Nil))
					Expect(cmds).To(HaveLen(3))

					Expect(a.Err()).NotTo(HaveOccurred())
					Expect(a.Val()).To(Equal("A_value"))

					Expect(b.Err()).To(Equal(redis.Nil))
					Expect(b.Val()).To(Equal(""))

					Expect(c.Err()).NotTo(HaveOccurred())
					Expect(c.Val()).To(Equal("C_value"))
				})
			}

			Describe("with Pipeline", func() {
				BeforeEach(func() {
					pipe = client.Pipeline().(*redis.Pipeline)
				})

				AfterEach(func() {
					Expect(pipe.Close()).NotTo(HaveOccurred())
				})

				assertPipeline()
			})

			Describe("with TxPipeline", func() {
				BeforeEach(func() {
					pipe = client.TxPipeline().(*redis.Pipeline)
				})

				AfterEach(func() {
					Expect(pipe.Close()).NotTo(HaveOccurred())
				})

				assertPipeline()
			})
		})

		It("supports PubSub", func() {
			pubsub := client.Subscribe("mychannel")
			defer pubsub.Close()

			Eventually(func() error {
				_, err := client.Publish("mychannel", "hello").Result()
				if err != nil {
					return err
				}

				msg, err := pubsub.ReceiveTimeout(time.Second)
				if err != nil {
					return err
				}

				_, ok := msg.(*redis.Message)
				if !ok {
					return fmt.Errorf("got %T, wanted *redis.Message", msg)
				}

				return nil
			}, 30*time.Second).ShouldNot(HaveOccurred())
		})

		It("supports PubSub.Ping without channels", func() {
			pubsub := client.Subscribe()
			defer pubsub.Close()

			err := pubsub.Ping()
			Expect(err).NotTo(HaveOccurred())
		})
	}

	Describe("ClusterClient", func() {
		BeforeEach(func() {
			opt = redisClusterOptions()
			client = cluster.clusterClient(opt)

			err := client.ForEachMaster(func(master *redis.Client) error {
				return master.FlushDB().Err()
			})
			Expect(err).NotTo(HaveOccurred())
		})

		AfterEach(func() {
			_ = client.ForEachMaster(func(master *redis.Client) error {
				return master.FlushDB().Err()
			})
			Expect(client.Close()).NotTo(HaveOccurred())
		})

		It("returns pool stats", func() {
			stats := client.PoolStats()
			Expect(stats).To(BeAssignableToTypeOf(&redis.PoolStats{}))
		})

		It("removes idle connections", func() {
			stats := client.PoolStats()
			Expect(stats.TotalConns).NotTo(BeZero())
			Expect(stats.IdleConns).NotTo(BeZero())

			time.Sleep(2 * time.Second)

			stats = client.PoolStats()
			Expect(stats.TotalConns).To(BeZero())
			Expect(stats.IdleConns).To(BeZero())
		})

		It("returns an error when there are no attempts left", func() {
			opt := redisClusterOptions()
			opt.MaxRedirects = -1
			client := cluster.clusterClient(opt)

			Eventually(func() error {
				return client.SwapNodes("A")
			}, 30*time.Second).ShouldNot(HaveOccurred())

			err := client.Get("A").Err()
			Expect(err).To(HaveOccurred())
			Expect(err.Error()).To(ContainSubstring("MOVED"))

			Expect(client.Close()).NotTo(HaveOccurred())
		})

		It("calls fn for every master node", func() {
			for i := 0; i < 10; i++ {
				Expect(client.Set(strconv.Itoa(i), "", 0).Err()).NotTo(HaveOccurred())
			}

			err := client.ForEachMaster(func(master *redis.Client) error {
				return master.FlushDB().Err()
			})
			Expect(err).NotTo(HaveOccurred())

			size, err := client.DBSize().Result()
			Expect(err).NotTo(HaveOccurred())
			Expect(size).To(Equal(int64(0)))
		})

		It("should CLUSTER SLOTS", func() {
			res, err := client.ClusterSlots().Result()
			Expect(err).NotTo(HaveOccurred())
			Expect(res).To(HaveLen(3))

			wanted := []redis.ClusterSlot{{
				Start: 0,
				End:   4999,
				Nodes: []redis.ClusterNode{{
					Id:   "",
					Addr: "127.0.0.1:8220",
				}, {
					Id:   "",
					Addr: "127.0.0.1:8223",
				}},
			}, {
				Start: 5000,
				End:   9999,
				Nodes: []redis.ClusterNode{{
					Id:   "",
					Addr: "127.0.0.1:8221",
				}, {
					Id:   "",
					Addr: "127.0.0.1:8224",
				}},
			}, {
				Start: 10000,
				End:   16383,
				Nodes: []redis.ClusterNode{{
					Id:   "",
					Addr: "127.0.0.1:8222",
				}, {
					Id:   "",
					Addr: "127.0.0.1:8225",
				}},
			}}
			Expect(assertSlotsEqual(res, wanted)).NotTo(HaveOccurred())
		})

		It("should CLUSTER NODES", func() {
			res, err := client.ClusterNodes().Result()
			Expect(err).NotTo(HaveOccurred())
			Expect(len(res)).To(BeNumerically(">", 400))
		})

		It("should CLUSTER INFO", func() {
			res, err := client.ClusterInfo().Result()
			Expect(err).NotTo(HaveOccurred())
			Expect(res).To(ContainSubstring("cluster_known_nodes:6"))
		})

		It("should CLUSTER KEYSLOT", func() {
			hashSlot, err := client.ClusterKeySlot("somekey").Result()
			Expect(err).NotTo(HaveOccurred())
			Expect(hashSlot).To(Equal(int64(hashtag.Slot("somekey"))))
		})

		It("should CLUSTER GETKEYSINSLOT", func() {
			keys, err := client.ClusterGetKeysInSlot(hashtag.Slot("somekey"), 1).Result()
			Expect(err).NotTo(HaveOccurred())
			Expect(len(keys)).To(Equal(0))
		})

		It("should CLUSTER COUNT-FAILURE-REPORTS", func() {
			n, err := client.ClusterCountFailureReports(cluster.nodeIds[0]).Result()
			Expect(err).NotTo(HaveOccurred())
			Expect(n).To(Equal(int64(0)))
		})

		It("should CLUSTER COUNTKEYSINSLOT", func() {
			n, err := client.ClusterCountKeysInSlot(10).Result()
			Expect(err).NotTo(HaveOccurred())
			Expect(n).To(Equal(int64(0)))
		})

		It("should CLUSTER SAVECONFIG", func() {
			res, err := client.ClusterSaveConfig().Result()
			Expect(err).NotTo(HaveOccurred())
			Expect(res).To(Equal("OK"))
		})

		It("should CLUSTER SLAVES", func() {
			nodesList, err := client.ClusterSlaves(cluster.nodeIds[0]).Result()
			Expect(err).NotTo(HaveOccurred())
			Expect(nodesList).Should(ContainElement(ContainSubstring("slave")))
			Expect(nodesList).Should(HaveLen(1))
		})

		It("should RANDOMKEY", func() {
			const nkeys = 100

			for i := 0; i < nkeys; i++ {
				err := client.Set(fmt.Sprintf("key%d", i), "value", 0).Err()
				Expect(err).NotTo(HaveOccurred())
			}

			var keys []string
			addKey := func(key string) {
				for _, k := range keys {
					if k == key {
						return
					}
				}
				keys = append(keys, key)
			}

			for i := 0; i < nkeys*10; i++ {
				key := client.RandomKey().Val()
				addKey(key)
			}

			Expect(len(keys)).To(BeNumerically("~", nkeys, nkeys/10))
		})

		assertClusterClient()
	})

	Describe("ClusterClient failover", func() {
		BeforeEach(func() {
			failover = true

			opt = redisClusterOptions()
			opt.MinRetryBackoff = 250 * time.Millisecond
			opt.MaxRetryBackoff = time.Second
			client = cluster.clusterClient(opt)

			err := client.ForEachMaster(func(master *redis.Client) error {
				return master.FlushDB().Err()
			})
			Expect(err).NotTo(HaveOccurred())

			err = client.ForEachSlave(func(slave *redis.Client) error {
				defer GinkgoRecover()

				Eventually(func() int64 {
					return slave.DBSize().Val()
				}, "30s").Should(Equal(int64(0)))

				return nil
			})
			Expect(err).NotTo(HaveOccurred())

			state, err := client.LoadState()
			Eventually(func() bool {
				state, err = client.LoadState()
				if err != nil {
					return false
				}
				return state.IsConsistent()
			}, "30s").Should(BeTrue())

			for _, slave := range state.Slaves {
				err = slave.Client.ClusterFailover().Err()
				Expect(err).NotTo(HaveOccurred())

				Eventually(func() bool {
					state, _ := client.LoadState()
					return state.IsConsistent()
				}, "30s").Should(BeTrue())
			}
		})

		AfterEach(func() {
			failover = false
			Expect(client.Close()).NotTo(HaveOccurred())
		})

		assertClusterClient()
	})

	Describe("ClusterClient with RouteByLatency", func() {
		BeforeEach(func() {
			opt = redisClusterOptions()
			opt.RouteByLatency = true
			client = cluster.clusterClient(opt)

			err := client.ForEachMaster(func(master *redis.Client) error {
				return master.FlushDB().Err()
			})
			Expect(err).NotTo(HaveOccurred())

			err = client.ForEachSlave(func(slave *redis.Client) error {
				Eventually(func() int64 {
					return client.DBSize().Val()
				}, 30*time.Second).Should(Equal(int64(0)))
				return nil
			})
			Expect(err).NotTo(HaveOccurred())
		})

		AfterEach(func() {
			err := client.ForEachSlave(func(slave *redis.Client) error {
				return slave.ReadWrite().Err()
			})
			Expect(err).NotTo(HaveOccurred())

			err = client.Close()
			Expect(err).NotTo(HaveOccurred())
		})

		assertClusterClient()
	})

	Describe("ClusterClient with ClusterSlots", func() {
		BeforeEach(func() {
			failover = true

			opt = redisClusterOptions()
			opt.ClusterSlots = func() ([]redis.ClusterSlot, error) {
				slots := []redis.ClusterSlot{{
					Start: 0,
					End:   4999,
					Nodes: []redis.ClusterNode{{
						Addr: ":" + ringShard1Port,
					}},
				}, {
					Start: 5000,
					End:   9999,
					Nodes: []redis.ClusterNode{{
						Addr: ":" + ringShard2Port,
					}},
				}, {
					Start: 10000,
					End:   16383,
					Nodes: []redis.ClusterNode{{
						Addr: ":" + ringShard3Port,
					}},
				}}
				return slots, nil
			}
			client = cluster.clusterClient(opt)

			err := client.ForEachMaster(func(master *redis.Client) error {
				return master.FlushDB().Err()
			})
			Expect(err).NotTo(HaveOccurred())

			err = client.ForEachSlave(func(slave *redis.Client) error {
				Eventually(func() int64 {
					return client.DBSize().Val()
				}, 30*time.Second).Should(Equal(int64(0)))
				return nil
			})
			Expect(err).NotTo(HaveOccurred())
		})

		AfterEach(func() {
			failover = false

			err := client.Close()
			Expect(err).NotTo(HaveOccurred())
		})

		assertClusterClient()
	})

	Describe("ClusterClient with RouteRandomly and ClusterSlots", func() {
		BeforeEach(func() {
			failover = true

			opt = redisClusterOptions()
			opt.RouteRandomly = true
			opt.ClusterSlots = func() ([]redis.ClusterSlot, error) {
				slots := []redis.ClusterSlot{{
					Start: 0,
					End:   4999,
					Nodes: []redis.ClusterNode{{
						Addr: ":" + ringShard1Port,
					}},
				}, {
					Start: 5000,
					End:   9999,
					Nodes: []redis.ClusterNode{{
						Addr: ":" + ringShard2Port,
					}},
				}, {
					Start: 10000,
					End:   16383,
					Nodes: []redis.ClusterNode{{
						Addr: ":" + ringShard3Port,
					}},
				}}
				return slots, nil
			}
			client = cluster.clusterClient(opt)

			err := client.ForEachMaster(func(master *redis.Client) error {
				return master.FlushDB().Err()
			})
			Expect(err).NotTo(HaveOccurred())

			err = client.ForEachSlave(func(slave *redis.Client) error {
				Eventually(func() int64 {
					return client.DBSize().Val()
				}, 30*time.Second).Should(Equal(int64(0)))
				return nil
			})
			Expect(err).NotTo(HaveOccurred())
		})

		AfterEach(func() {
			failover = false

			err := client.Close()
			Expect(err).NotTo(HaveOccurred())
		})

		assertClusterClient()
	})
})

var _ = Describe("ClusterClient without nodes", func() {
	var client *redis.ClusterClient

	BeforeEach(func() {
		client = redis.NewClusterClient(&redis.ClusterOptions{})
	})

	AfterEach(func() {
		Expect(client.Close()).NotTo(HaveOccurred())
	})

	It("Ping returns an error", func() {
		err := client.Ping().Err()
		Expect(err).To(MatchError("redis: cluster has no nodes"))
	})

	It("pipeline returns an error", func() {
		_, err := client.Pipelined(func(pipe redis.Pipeliner) error {
			pipe.Ping()
			return nil
		})
		Expect(err).To(MatchError("redis: cluster has no nodes"))
	})
})

var _ = Describe("ClusterClient without valid nodes", func() {
	var client *redis.ClusterClient

	BeforeEach(func() {
		client = redis.NewClusterClient(&redis.ClusterOptions{
			Addrs: []string{redisAddr},
		})
	})

	AfterEach(func() {
		Expect(client.Close()).NotTo(HaveOccurred())
	})

	It("returns an error", func() {
		err := client.Ping().Err()
		Expect(err).To(MatchError("ERR This instance has cluster support disabled"))
	})

	It("pipeline returns an error", func() {
		_, err := client.Pipelined(func(pipe redis.Pipeliner) error {
			pipe.Ping()
			return nil
		})
		Expect(err).To(MatchError("ERR This instance has cluster support disabled"))
	})
})

var _ = Describe("ClusterClient with unavailable Cluster", func() {
	var client *redis.ClusterClient

	BeforeEach(func() {
		for _, node := range cluster.clients {
			err := node.ClientPause(5 * time.Second).Err()
			Expect(err).NotTo(HaveOccurred())
		}

		opt := redisClusterOptions()
		opt.ReadTimeout = 250 * time.Millisecond
		opt.WriteTimeout = 250 * time.Millisecond
		opt.MaxRedirects = 1
		client = cluster.clusterClientUnsafe(opt)
	})

	AfterEach(func() {
		Expect(client.Close()).NotTo(HaveOccurred())
	})

	It("recovers when Cluster recovers", func() {
		err := client.Ping().Err()
		Expect(err).To(HaveOccurred())

		Eventually(func() error {
			return client.Ping().Err()
		}, "30s").ShouldNot(HaveOccurred())
	})
})

var _ = Describe("ClusterClient timeout", func() {
	var client *redis.ClusterClient

	AfterEach(func() {
		_ = client.Close()
	})

	testTimeout := func() {
		It("Ping timeouts", func() {
			err := client.Ping().Err()
			Expect(err).To(HaveOccurred())
			Expect(err.(net.Error).Timeout()).To(BeTrue())
		})

		It("Pipeline timeouts", func() {
			_, err := client.Pipelined(func(pipe redis.Pipeliner) error {
				pipe.Ping()
				return nil
			})
			Expect(err).To(HaveOccurred())
			Expect(err.(net.Error).Timeout()).To(BeTrue())
		})

		It("Tx timeouts", func() {
			err := client.Watch(func(tx *redis.Tx) error {
				return tx.Ping().Err()
			}, "foo")
			Expect(err).To(HaveOccurred())
			Expect(err.(net.Error).Timeout()).To(BeTrue())
		})

		It("Tx Pipeline timeouts", func() {
			err := client.Watch(func(tx *redis.Tx) error {
				_, err := tx.Pipelined(func(pipe redis.Pipeliner) error {
					pipe.Ping()
					return nil
				})
				return err
			}, "foo")
			Expect(err).To(HaveOccurred())
			Expect(err.(net.Error).Timeout()).To(BeTrue())
		})
	}

	const pause = 5 * time.Second

	Context("read/write timeout", func() {
		BeforeEach(func() {
			opt := redisClusterOptions()
			opt.ReadTimeout = 250 * time.Millisecond
			opt.WriteTimeout = 250 * time.Millisecond
			opt.MaxRedirects = 1
			client = cluster.clusterClient(opt)

			err := client.ForEachNode(func(client *redis.Client) error {
				return client.ClientPause(pause).Err()
			})
			Expect(err).NotTo(HaveOccurred())
		})

		AfterEach(func() {
			_ = client.ForEachNode(func(client *redis.Client) error {
				defer GinkgoRecover()
				Eventually(func() error {
					return client.Ping().Err()
				}, 2*pause).ShouldNot(HaveOccurred())
				return nil
			})
		})

		testTimeout()
	})
})

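//------------------------------------------------------------------------------
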
func newClusterScenario() *clusterScenario {
	return &clusterScenario{
		ports:     []string{"8220", "8221", "8222", "8223", "8224", "8225"},
		nodeIds:   make([]string, 6),
		processes: make(map[string]*redisProcess, 6),
		clients:   make(map[string]*redis.Client, 6),
	}
}

func BenchmarkClusterPing(b *testing.B) {
	if testing.Short() {
		b.Skip("skipping in short mode")
	}

	cluster := newClusterScenario()
	if err := startCluster(cluster); err != nil {
		b.Fatal(err)
	}
	defer stopCluster(cluster)

	client := cluster.clusterClient(redisClusterOptions())
	defer client.Close()

	b.ResetTimer()

	b.RunParallel(func(pb *testing.PB) {
		for pb.Next() {
			err := client.Ping().Err()
			if err != nil {
				b.Fatal(err)
			}
		}
	})
}

func BenchmarkClusterSetString(b *testing.B) {
	if testing.Short() {
		b.Skip("skipping in short mode")
	}

	cluster := newClusterScenario()
	if err := startCluster(cluster); err != nil {
		b.Fatal(err)
	}
	defer stopCluster(cluster)

	client := cluster.clusterClient(redisClusterOptions())
	defer client.Close()

	value := string(bytes.Repeat([]byte{'1'}, 10000))

	b.ResetTimer()

	b.RunParallel(func(pb *testing.PB) {
		for pb.Next() {
			err := client.Set("key", value, 0).Err()
			if err != nil {
				b.Fatal(err)
			}
		}
	})
}

func BenchmarkClusterReloadState(b *testing.B) {
	if testing.Short() {
		b.Skip("skipping in short mode")
	}

	cluster := newClusterScenario()
	if err := startCluster(cluster); err != nil {
		b.Fatal(err)
	}
	defer stopCluster(cluster)

	client := cluster.clusterClient(redisClusterOptions())
	defer client.Close()

	b.ResetTimer()

	for i := 0; i < b.N; i++ {
		err := client.ReloadState()
		if err != nil {
			b.Fatal(err)
		}
	}
}