...

Source file src/github.com/go-redis/redis/cluster_test.go

Documentation: github.com/go-redis/redis

     1  package redis_test
     2  
     3  import (
     4  	"bytes"
     5  	"fmt"
     6  	"net"
     7  	"strconv"
     8  	"strings"
     9  	"sync"
    10  	"testing"
    11  	"time"
    12  
    13  	"github.com/go-redis/redis"
    14  	"github.com/go-redis/redis/internal/hashtag"
    15  
    16  	. "github.com/onsi/ginkgo"
    17  	. "github.com/onsi/gomega"
    18  )
    19  
// clusterScenario holds the state of the Redis Cluster that startCluster
// brings up for the specs and benchmarks in this file.
type clusterScenario struct {
	// ports lists the node ports; masters() uses the first three entries and
	// slaves() the entries from index 3 onwards.
	ports     []string
	// nodeIds is filled by startCluster with one node id per entry of ports.
	nodeIds   []string
	// processes maps a port to the redis process started on it.
	processes map[string]*redisProcess
	// clients maps a port to a client connected to the node on that port.
	clients   map[string]*redis.Client
}
    26  
    27  func (s *clusterScenario) masters() []*redis.Client {
    28  	result := make([]*redis.Client, 3)
    29  	for pos, port := range s.ports[:3] {
    30  		result[pos] = s.clients[port]
    31  	}
    32  	return result
    33  }
    34  
    35  func (s *clusterScenario) slaves() []*redis.Client {
    36  	result := make([]*redis.Client, 3)
    37  	for pos, port := range s.ports[3:] {
    38  		result[pos] = s.clients[port]
    39  	}
    40  	return result
    41  }
    42  
    43  func (s *clusterScenario) addrs() []string {
    44  	addrs := make([]string, len(s.ports))
    45  	for i, port := range s.ports {
    46  		addrs[i] = net.JoinHostPort("127.0.0.1", port)
    47  	}
    48  	return addrs
    49  }
    50  
    51  func (s *clusterScenario) clusterClientUnsafe(opt *redis.ClusterOptions) *redis.ClusterClient {
    52  	opt.Addrs = s.addrs()
    53  	return redis.NewClusterClient(opt)
    54  
    55  }
    56  
    57  func (s *clusterScenario) clusterClient(opt *redis.ClusterOptions) *redis.ClusterClient {
    58  	client := s.clusterClientUnsafe(opt)
    59  
    60  	err := eventually(func() error {
    61  		if opt.ClusterSlots != nil {
    62  			return nil
    63  		}
    64  
    65  		state, err := client.LoadState()
    66  		if err != nil {
    67  			return err
    68  		}
    69  
    70  		if !state.IsConsistent() {
    71  			return fmt.Errorf("cluster state is not consistent")
    72  		}
    73  
    74  		return nil
    75  	}, 30*time.Second)
    76  	if err != nil {
    77  		panic(err)
    78  	}
    79  
    80  	return client
    81  }
    82  
    83  func startCluster(scenario *clusterScenario) error {
    84  	// Start processes and collect node ids
    85  	for pos, port := range scenario.ports {
    86  		process, err := startRedis(port, "--cluster-enabled", "yes")
    87  		if err != nil {
    88  			return err
    89  		}
    90  
    91  		client := redis.NewClient(&redis.Options{
    92  			Addr: ":" + port,
    93  		})
    94  
    95  		info, err := client.ClusterNodes().Result()
    96  		if err != nil {
    97  			return err
    98  		}
    99  
   100  		scenario.processes[port] = process
   101  		scenario.clients[port] = client
   102  		scenario.nodeIds[pos] = info[:40]
   103  	}
   104  
   105  	// Meet cluster nodes.
   106  	for _, client := range scenario.clients {
   107  		err := client.ClusterMeet("127.0.0.1", scenario.ports[0]).Err()
   108  		if err != nil {
   109  			return err
   110  		}
   111  	}
   112  
   113  	// Bootstrap masters.
   114  	slots := []int{0, 5000, 10000, 16384}
   115  	for pos, master := range scenario.masters() {
   116  		err := master.ClusterAddSlotsRange(slots[pos], slots[pos+1]-1).Err()
   117  		if err != nil {
   118  			return err
   119  		}
   120  	}
   121  
   122  	// Bootstrap slaves.
   123  	for idx, slave := range scenario.slaves() {
   124  		masterId := scenario.nodeIds[idx]
   125  
   126  		// Wait until master is available
   127  		err := eventually(func() error {
   128  			s := slave.ClusterNodes().Val()
   129  			wanted := masterId
   130  			if !strings.Contains(s, wanted) {
   131  				return fmt.Errorf("%q does not contain %q", s, wanted)
   132  			}
   133  			return nil
   134  		}, 10*time.Second)
   135  		if err != nil {
   136  			return err
   137  		}
   138  
   139  		err = slave.ClusterReplicate(masterId).Err()
   140  		if err != nil {
   141  			return err
   142  		}
   143  	}
   144  
   145  	// Wait until all nodes have consistent info.
   146  	wanted := []redis.ClusterSlot{{
   147  		Start: 0,
   148  		End:   4999,
   149  		Nodes: []redis.ClusterNode{{
   150  			Id:   "",
   151  			Addr: "127.0.0.1:8220",
   152  		}, {
   153  			Id:   "",
   154  			Addr: "127.0.0.1:8223",
   155  		}},
   156  	}, {
   157  		Start: 5000,
   158  		End:   9999,
   159  		Nodes: []redis.ClusterNode{{
   160  			Id:   "",
   161  			Addr: "127.0.0.1:8221",
   162  		}, {
   163  			Id:   "",
   164  			Addr: "127.0.0.1:8224",
   165  		}},
   166  	}, {
   167  		Start: 10000,
   168  		End:   16383,
   169  		Nodes: []redis.ClusterNode{{
   170  			Id:   "",
   171  			Addr: "127.0.0.1:8222",
   172  		}, {
   173  			Id:   "",
   174  			Addr: "127.0.0.1:8225",
   175  		}},
   176  	}}
   177  	for _, client := range scenario.clients {
   178  		err := eventually(func() error {
   179  			res, err := client.ClusterSlots().Result()
   180  			if err != nil {
   181  				return err
   182  			}
   183  			return assertSlotsEqual(res, wanted)
   184  		}, 30*time.Second)
   185  		if err != nil {
   186  			return err
   187  		}
   188  	}
   189  
   190  	return nil
   191  }
   192  
   193  func assertSlotsEqual(slots, wanted []redis.ClusterSlot) error {
   194  outerLoop:
   195  	for _, s2 := range wanted {
   196  		for _, s1 := range slots {
   197  			if slotEqual(s1, s2) {
   198  				continue outerLoop
   199  			}
   200  		}
   201  		return fmt.Errorf("%v not found in %v", s2, slots)
   202  	}
   203  	return nil
   204  }
   205  
   206  func slotEqual(s1, s2 redis.ClusterSlot) bool {
   207  	if s1.Start != s2.Start {
   208  		return false
   209  	}
   210  	if s1.End != s2.End {
   211  		return false
   212  	}
   213  	if len(s1.Nodes) != len(s2.Nodes) {
   214  		return false
   215  	}
   216  	for i, n1 := range s1.Nodes {
   217  		if n1.Addr != s2.Nodes[i].Addr {
   218  			return false
   219  		}
   220  	}
   221  	return true
   222  }
   223  
   224  func stopCluster(scenario *clusterScenario) error {
   225  	for _, client := range scenario.clients {
   226  		if err := client.Close(); err != nil {
   227  			return err
   228  		}
   229  	}
   230  	for _, process := range scenario.processes {
   231  		if err := process.Close(); err != nil {
   232  			return err
   233  		}
   234  	}
   235  	return nil
   236  }
   237  
   238  //------------------------------------------------------------------------------
   239  
   240  var _ = Describe("ClusterClient", func() {
   241  	var failover bool
   242  	var opt *redis.ClusterOptions
   243  	var client *redis.ClusterClient
   244  
	// assertClusterClient registers the specs shared by every ClusterClient
	// configuration below. The specs read the enclosing client and failover
	// variables, which each Describe sets in its BeforeEach.
	assertClusterClient := func() {
		It("should GET/SET/DEL", func() {
			err := client.Get("A").Err()
			Expect(err).To(Equal(redis.Nil))

			err = client.Set("A", "VALUE", 0).Err()
			Expect(err).NotTo(HaveOccurred())

			// Poll rather than read once: the value may not be visible
			// immediately on the node that serves the GET.
			Eventually(func() string {
				return client.Get("A").Val()
			}, 30*time.Second).Should(Equal("VALUE"))

			cnt, err := client.Del("A").Result()
			Expect(err).NotTo(HaveOccurred())
			Expect(cnt).To(Equal(int64(1)))
		})

		It("GET follows redirects", func() {
			err := client.Set("A", "VALUE", 0).Err()
			Expect(err).NotTo(HaveOccurred())

			// The node swap is skipped for suites that set failover.
			if !failover {
				// Wait until the second node for "A" holds the key before
				// swapping the nodes.
				Eventually(func() int64 {
					nodes, err := client.Nodes("A")
					if err != nil {
						return 0
					}
					return nodes[1].Client.DBSize().Val()
				}, 30*time.Second).Should(Equal(int64(1)))

				Eventually(func() error {
					return client.SwapNodes("A")
				}, 30*time.Second).ShouldNot(HaveOccurred())
			}

			v, err := client.Get("A").Result()
			Expect(err).NotTo(HaveOccurred())
			Expect(v).To(Equal("VALUE"))
		})

		It("SET follows redirects", func() {
			if !failover {
				Eventually(func() error {
					return client.SwapNodes("A")
				}, 30*time.Second).ShouldNot(HaveOccurred())
			}

			err := client.Set("A", "VALUE", 0).Err()
			Expect(err).NotTo(HaveOccurred())

			v, err := client.Get("A").Result()
			Expect(err).NotTo(HaveOccurred())
			Expect(v).To(Equal("VALUE"))
		})

		It("distributes keys", func() {
			for i := 0; i < 100; i++ {
				err := client.Set(fmt.Sprintf("key%d", i), "value", 0).Err()
				Expect(err).NotTo(HaveOccurred())
			}

			// Each master must report one of the three expected key counts.
			client.ForEachMaster(func(master *redis.Client) error {
				defer GinkgoRecover()
				Eventually(func() string {
					return master.Info("keyspace").Val()
				}, 30*time.Second).Should(Or(
					ContainSubstring("keys=31"),
					ContainSubstring("keys=29"),
					ContainSubstring("keys=40"),
				))
				return nil
			})
		})

		It("distributes keys when using EVAL", func() {
			script := redis.NewScript(`
				local r = redis.call('SET', KEYS[1], ARGV[1])
				return r
			`)

			var key string
			for i := 0; i < 100; i++ {
				key = fmt.Sprintf("key%d", i)
				err := script.Run(client, []string{key}, "value").Err()
				Expect(err).NotTo(HaveOccurred())
			}

			// Same expected per-master key counts as the plain SET spec above.
			client.ForEachMaster(func(master *redis.Client) error {
				defer GinkgoRecover()
				Eventually(func() string {
					return master.Info("keyspace").Val()
				}, 30*time.Second).Should(Or(
					ContainSubstring("keys=31"),
					ContainSubstring("keys=29"),
					ContainSubstring("keys=40"),
				))
				return nil
			})
		})

		It("supports Watch", func() {
			var incr func(string) error

			// Transactionally increments key using GET and SET commands.
			incr = func(key string) error {
				err := client.Watch(func(tx *redis.Tx) error {
					n, err := tx.Get(key).Int64()
					if err != nil && err != redis.Nil {
						return err
					}

					_, err = tx.Pipelined(func(pipe redis.Pipeliner) error {
						pipe.Set(key, strconv.FormatInt(n+1, 10), 0)
						return nil
					})
					return err
				}, key)
				// Retry the whole increment when the transaction failed.
				if err == redis.TxFailedErr {
					return incr(key)
				}
				return err
			}

			// 100 concurrent increments must leave the counter at exactly 100.
			var wg sync.WaitGroup
			for i := 0; i < 100; i++ {
				wg.Add(1)
				go func() {
					defer GinkgoRecover()
					defer wg.Done()

					err := incr("key")
					Expect(err).NotTo(HaveOccurred())
				}()
			}
			wg.Wait()

			Eventually(func() string {
				return client.Get("key").Val()
			}, 30*time.Second).Should(Equal("100"))
		})

		Describe("pipelining", func() {
			var pipe *redis.Pipeline

			// assertPipeline registers the specs run against both Pipeline
			// and TxPipeline; pipe is assigned by the enclosing BeforeEach.
			assertPipeline := func() {
				keys := []string{"A", "B", "C", "D", "E", "F", "G"}

				It("follows redirects", func() {
					if !failover {
						for _, key := range keys {
							Eventually(func() error {
								return client.SwapNodes(key)
							}, 30*time.Second).ShouldNot(HaveOccurred())
						}
					}

					// Queue SET + EXPIRE per key: 7 keys give 14 commands.
					for i, key := range keys {
						pipe.Set(key, key+"_value", 0)
						pipe.Expire(key, time.Duration(i+1)*time.Hour)
					}
					cmds, err := pipe.Exec()
					Expect(err).NotTo(HaveOccurred())
					Expect(cmds).To(HaveLen(14))

					// Wait until every node reports a non-empty database.
					_ = client.ForEachNode(func(node *redis.Client) error {
						defer GinkgoRecover()
						Eventually(func() int64 {
							return node.DBSize().Val()
						}, 30*time.Second).ShouldNot(BeZero())
						return nil
					})

					if !failover {
						for _, key := range keys {
							Eventually(func() error {
								return client.SwapNodes(key)
							}, 30*time.Second).ShouldNot(HaveOccurred())
						}
					}

					for _, key := range keys {
						pipe.Get(key)
						pipe.TTL(key)
					}
					cmds, err = pipe.Exec()
					Expect(err).NotTo(HaveOccurred())
					Expect(cmds).To(HaveLen(14))

					// Commands come back in queue order: GET at even indexes,
					// TTL at odd indexes.
					for i, key := range keys {
						get := cmds[i*2].(*redis.StringCmd)
						Expect(get.Val()).To(Equal(key + "_value"))

						ttl := cmds[(i*2)+1].(*redis.DurationCmd)
						dur := time.Duration(i+1) * time.Hour
						Expect(ttl.Val()).To(BeNumerically("~", dur, 30*time.Second))
					}
				})

				It("works with missing keys", func() {
					pipe.Set("A", "A_value", 0)
					pipe.Set("C", "C_value", 0)
					_, err := pipe.Exec()
					Expect(err).NotTo(HaveOccurred())

					// "B" was never set, so Exec is expected to report
					// redis.Nil while the other two commands still succeed.
					a := pipe.Get("A")
					b := pipe.Get("B")
					c := pipe.Get("C")
					cmds, err := pipe.Exec()
					Expect(err).To(Equal(redis.Nil))
					Expect(cmds).To(HaveLen(3))

					Expect(a.Err()).NotTo(HaveOccurred())
					Expect(a.Val()).To(Equal("A_value"))

					Expect(b.Err()).To(Equal(redis.Nil))
					Expect(b.Val()).To(Equal(""))

					Expect(c.Err()).NotTo(HaveOccurred())
					Expect(c.Val()).To(Equal("C_value"))
				})
			}

			Describe("with Pipeline", func() {
				BeforeEach(func() {
					pipe = client.Pipeline().(*redis.Pipeline)
				})

				AfterEach(func() {
					Expect(pipe.Close()).NotTo(HaveOccurred())
				})

				assertPipeline()
			})

			Describe("with TxPipeline", func() {
				BeforeEach(func() {
					pipe = client.TxPipeline().(*redis.Pipeline)
				})

				AfterEach(func() {
					Expect(pipe.Close()).NotTo(HaveOccurred())
				})

				assertPipeline()
			})
		})

		It("supports PubSub", func() {
			pubsub := client.Subscribe("mychannel")
			defer pubsub.Close()

			// Publish and receive are retried together until a
			// *redis.Message is received.
			Eventually(func() error {
				_, err := client.Publish("mychannel", "hello").Result()
				if err != nil {
					return err
				}

				msg, err := pubsub.ReceiveTimeout(time.Second)
				if err != nil {
					return err
				}

				_, ok := msg.(*redis.Message)
				if !ok {
					return fmt.Errorf("got %T, wanted *redis.Message", msg)
				}

				return nil
			}, 30*time.Second).ShouldNot(HaveOccurred())
		})

		It("supports PubSub.Ping without channels", func() {
			pubsub := client.Subscribe()
			defer pubsub.Close()

			err := pubsub.Ping()
			Expect(err).NotTo(HaveOccurred())
		})
	}
   524  
	// Specs for a client built from the default redisClusterOptions: the
	// cluster-command specs below plus the shared assertClusterClient specs.
	Describe("ClusterClient", func() {
		BeforeEach(func() {
			opt = redisClusterOptions()
			client = cluster.clusterClient(opt)

			// Start every spec from empty masters.
			err := client.ForEachMaster(func(master *redis.Client) error {
				return master.FlushDB().Err()
			})
			Expect(err).NotTo(HaveOccurred())
		})

		AfterEach(func() {
			// Best-effort cleanup; the flush error is deliberately ignored.
			_ = client.ForEachMaster(func(master *redis.Client) error {
				return master.FlushDB().Err()
			})
			Expect(client.Close()).NotTo(HaveOccurred())
		})

		It("returns pool stats", func() {
			stats := client.PoolStats()
			Expect(stats).To(BeAssignableToTypeOf(&redis.PoolStats{}))
		})

		It("removes idle connections", func() {
			stats := client.PoolStats()
			Expect(stats.TotalConns).NotTo(BeZero())
			Expect(stats.IdleConns).NotTo(BeZero())

			// NOTE(review): presumably longer than the idle timeout set by
			// redisClusterOptions — confirm against that helper.
			time.Sleep(2 * time.Second)

			stats = client.PoolStats()
			Expect(stats.TotalConns).To(BeZero())
			Expect(stats.IdleConns).To(BeZero())
		})

		It("returns an error when there are no attempts left", func() {
			// opt and client shadow the suite-level variables; this spec
			// uses and closes its own client.
			opt := redisClusterOptions()
			opt.MaxRedirects = -1
			client := cluster.clusterClient(opt)

			Eventually(func() error {
				return client.SwapNodes("A")
			}, 30*time.Second).ShouldNot(HaveOccurred())

			err := client.Get("A").Err()
			Expect(err).To(HaveOccurred())
			Expect(err.Error()).To(ContainSubstring("MOVED"))

			Expect(client.Close()).NotTo(HaveOccurred())
		})

		It("calls fn for every master node", func() {
			for i := 0; i < 10; i++ {
				Expect(client.Set(strconv.Itoa(i), "", 0).Err()).NotTo(HaveOccurred())
			}

			err := client.ForEachMaster(func(master *redis.Client) error {
				return master.FlushDB().Err()
			})
			Expect(err).NotTo(HaveOccurred())

			// If fn ran on every master, no keys remain anywhere.
			size, err := client.DBSize().Result()
			Expect(err).NotTo(HaveOccurred())
			Expect(size).To(Equal(int64(0)))
		})

		It("should CLUSTER SLOTS", func() {
			res, err := client.ClusterSlots().Result()
			Expect(err).NotTo(HaveOccurred())
			Expect(res).To(HaveLen(3))

			// Same slot layout that startCluster waits for.
			wanted := []redis.ClusterSlot{{
				Start: 0,
				End:   4999,
				Nodes: []redis.ClusterNode{{
					Id:   "",
					Addr: "127.0.0.1:8220",
				}, {
					Id:   "",
					Addr: "127.0.0.1:8223",
				}},
			}, {
				Start: 5000,
				End:   9999,
				Nodes: []redis.ClusterNode{{
					Id:   "",
					Addr: "127.0.0.1:8221",
				}, {
					Id:   "",
					Addr: "127.0.0.1:8224",
				}},
			}, {
				Start: 10000,
				End:   16383,
				Nodes: []redis.ClusterNode{{
					Id:   "",
					Addr: "127.0.0.1:8222",
				}, {
					Id:   "",
					Addr: "127.0.0.1:8225",
				}},
			}}
			Expect(assertSlotsEqual(res, wanted)).NotTo(HaveOccurred())
		})

		It("should CLUSTER NODES", func() {
			res, err := client.ClusterNodes().Result()
			Expect(err).NotTo(HaveOccurred())
			Expect(len(res)).To(BeNumerically(">", 400))
		})

		It("should CLUSTER INFO", func() {
			res, err := client.ClusterInfo().Result()
			Expect(err).NotTo(HaveOccurred())
			Expect(res).To(ContainSubstring("cluster_known_nodes:6"))
		})

		It("should CLUSTER KEYSLOT", func() {
			hashSlot, err := client.ClusterKeySlot("somekey").Result()
			Expect(err).NotTo(HaveOccurred())
			Expect(hashSlot).To(Equal(int64(hashtag.Slot("somekey"))))
		})

		It("should CLUSTER GETKEYSINSLOT", func() {
			keys, err := client.ClusterGetKeysInSlot(hashtag.Slot("somekey"), 1).Result()
			Expect(err).NotTo(HaveOccurred())
			Expect(len(keys)).To(Equal(0))
		})

		It("should CLUSTER COUNT-FAILURE-REPORTS", func() {
			n, err := client.ClusterCountFailureReports(cluster.nodeIds[0]).Result()
			Expect(err).NotTo(HaveOccurred())
			Expect(n).To(Equal(int64(0)))
		})

		It("should CLUSTER COUNTKEYSINSLOT", func() {
			n, err := client.ClusterCountKeysInSlot(10).Result()
			Expect(err).NotTo(HaveOccurred())
			Expect(n).To(Equal(int64(0)))
		})

		It("should CLUSTER SAVECONFIG", func() {
			res, err := client.ClusterSaveConfig().Result()
			Expect(err).NotTo(HaveOccurred())
			Expect(res).To(Equal("OK"))
		})

		It("should CLUSTER SLAVES", func() {
			nodesList, err := client.ClusterSlaves(cluster.nodeIds[0]).Result()
			Expect(err).NotTo(HaveOccurred())
			Expect(nodesList).Should(ContainElement(ContainSubstring("slave")))
			Expect(nodesList).Should(HaveLen(1))
		})

		It("should RANDOMKEY", func() {
			const nkeys = 100

			for i := 0; i < nkeys; i++ {
				err := client.Set(fmt.Sprintf("key%d", i), "value", 0).Err()
				Expect(err).NotTo(HaveOccurred())
			}

			// keys collects the distinct keys returned by RANDOMKEY.
			var keys []string
			addKey := func(key string) {
				for _, k := range keys {
					if k == key {
						return
					}
				}
				keys = append(keys, key)
			}

			for i := 0; i < nkeys*10; i++ {
				key := client.RandomKey().Val()
				addKey(key)
			}

			// After 10x as many draws as keys, the distinct count must be
			// within 10% of nkeys.
			Expect(len(keys)).To(BeNumerically("~", nkeys, nkeys/10))
		})

		assertClusterClient()
	})
   707  
   708  	Describe("ClusterClient failover", func() {
   709  		BeforeEach(func() {
   710  			failover = true
   711  
   712  			opt = redisClusterOptions()
   713  			opt.MinRetryBackoff = 250 * time.Millisecond
   714  			opt.MaxRetryBackoff = time.Second
   715  			client = cluster.clusterClient(opt)
   716  
   717  			err := client.ForEachMaster(func(master *redis.Client) error {
   718  				return master.FlushDB().Err()
   719  			})
   720  			Expect(err).NotTo(HaveOccurred())
   721  
   722  			err = client.ForEachSlave(func(slave *redis.Client) error {
   723  				defer GinkgoRecover()
   724  
   725  				Eventually(func() int64 {
   726  					return slave.DBSize().Val()
   727  				}, "30s").Should(Equal(int64(0)))
   728  
   729  				return nil
   730  			})
   731  			Expect(err).NotTo(HaveOccurred())
   732  
   733  			state, err := client.LoadState()
   734  			Eventually(func() bool {
   735  				state, err = client.LoadState()
   736  				if err != nil {
   737  					return false
   738  				}
   739  				return state.IsConsistent()
   740  			}, "30s").Should(BeTrue())
   741  
   742  			for _, slave := range state.Slaves {
   743  				err = slave.Client.ClusterFailover().Err()
   744  				Expect(err).NotTo(HaveOccurred())
   745  
   746  				Eventually(func() bool {
   747  					state, _ := client.LoadState()
   748  					return state.IsConsistent()
   749  				}, "30s").Should(BeTrue())
   750  			}
   751  		})
   752  
   753  		AfterEach(func() {
   754  			failover = false
   755  			Expect(client.Close()).NotTo(HaveOccurred())
   756  		})
   757  
   758  		assertClusterClient()
   759  	})
   760  
   761  	Describe("ClusterClient with RouteByLatency", func() {
   762  		BeforeEach(func() {
   763  			opt = redisClusterOptions()
   764  			opt.RouteByLatency = true
   765  			client = cluster.clusterClient(opt)
   766  
   767  			err := client.ForEachMaster(func(master *redis.Client) error {
   768  				return master.FlushDB().Err()
   769  			})
   770  			Expect(err).NotTo(HaveOccurred())
   771  
   772  			err = client.ForEachSlave(func(slave *redis.Client) error {
   773  				Eventually(func() int64 {
   774  					return client.DBSize().Val()
   775  				}, 30*time.Second).Should(Equal(int64(0)))
   776  				return nil
   777  			})
   778  			Expect(err).NotTo(HaveOccurred())
   779  		})
   780  
   781  		AfterEach(func() {
   782  			err := client.ForEachSlave(func(slave *redis.Client) error {
   783  				return slave.ReadWrite().Err()
   784  			})
   785  			Expect(err).NotTo(HaveOccurred())
   786  
   787  			err = client.Close()
   788  			Expect(err).NotTo(HaveOccurred())
   789  		})
   790  
   791  		assertClusterClient()
   792  	})
   793  
   794  	Describe("ClusterClient with ClusterSlots", func() {
   795  		BeforeEach(func() {
   796  			failover = true
   797  
   798  			opt = redisClusterOptions()
   799  			opt.ClusterSlots = func() ([]redis.ClusterSlot, error) {
   800  				slots := []redis.ClusterSlot{{
   801  					Start: 0,
   802  					End:   4999,
   803  					Nodes: []redis.ClusterNode{{
   804  						Addr: ":" + ringShard1Port,
   805  					}},
   806  				}, {
   807  					Start: 5000,
   808  					End:   9999,
   809  					Nodes: []redis.ClusterNode{{
   810  						Addr: ":" + ringShard2Port,
   811  					}},
   812  				}, {
   813  					Start: 10000,
   814  					End:   16383,
   815  					Nodes: []redis.ClusterNode{{
   816  						Addr: ":" + ringShard3Port,
   817  					}},
   818  				}}
   819  				return slots, nil
   820  			}
   821  			client = cluster.clusterClient(opt)
   822  
   823  			err := client.ForEachMaster(func(master *redis.Client) error {
   824  				return master.FlushDB().Err()
   825  			})
   826  			Expect(err).NotTo(HaveOccurred())
   827  
   828  			err = client.ForEachSlave(func(slave *redis.Client) error {
   829  				Eventually(func() int64 {
   830  					return client.DBSize().Val()
   831  				}, 30*time.Second).Should(Equal(int64(0)))
   832  				return nil
   833  			})
   834  			Expect(err).NotTo(HaveOccurred())
   835  		})
   836  
   837  		AfterEach(func() {
   838  			failover = false
   839  
   840  			err := client.Close()
   841  			Expect(err).NotTo(HaveOccurred())
   842  		})
   843  
   844  		assertClusterClient()
   845  	})
   846  
   847  	Describe("ClusterClient with RouteRandomly and ClusterSlots", func() {
   848  		BeforeEach(func() {
   849  			failover = true
   850  
   851  			opt = redisClusterOptions()
   852  			opt.RouteRandomly = true
   853  			opt.ClusterSlots = func() ([]redis.ClusterSlot, error) {
   854  				slots := []redis.ClusterSlot{{
   855  					Start: 0,
   856  					End:   4999,
   857  					Nodes: []redis.ClusterNode{{
   858  						Addr: ":" + ringShard1Port,
   859  					}},
   860  				}, {
   861  					Start: 5000,
   862  					End:   9999,
   863  					Nodes: []redis.ClusterNode{{
   864  						Addr: ":" + ringShard2Port,
   865  					}},
   866  				}, {
   867  					Start: 10000,
   868  					End:   16383,
   869  					Nodes: []redis.ClusterNode{{
   870  						Addr: ":" + ringShard3Port,
   871  					}},
   872  				}}
   873  				return slots, nil
   874  			}
   875  			client = cluster.clusterClient(opt)
   876  
   877  			err := client.ForEachMaster(func(master *redis.Client) error {
   878  				return master.FlushDB().Err()
   879  			})
   880  			Expect(err).NotTo(HaveOccurred())
   881  
   882  			err = client.ForEachSlave(func(slave *redis.Client) error {
   883  				Eventually(func() int64 {
   884  					return client.DBSize().Val()
   885  				}, 30*time.Second).Should(Equal(int64(0)))
   886  				return nil
   887  			})
   888  			Expect(err).NotTo(HaveOccurred())
   889  		})
   890  
   891  		AfterEach(func() {
   892  			failover = false
   893  
   894  			err := client.Close()
   895  			Expect(err).NotTo(HaveOccurred())
   896  		})
   897  
   898  		assertClusterClient()
   899  	})
   900  })
   901  
   902  var _ = Describe("ClusterClient without nodes", func() {
   903  	var client *redis.ClusterClient
   904  
   905  	BeforeEach(func() {
   906  		client = redis.NewClusterClient(&redis.ClusterOptions{})
   907  	})
   908  
   909  	AfterEach(func() {
   910  		Expect(client.Close()).NotTo(HaveOccurred())
   911  	})
   912  
   913  	It("Ping returns an error", func() {
   914  		err := client.Ping().Err()
   915  		Expect(err).To(MatchError("redis: cluster has no nodes"))
   916  	})
   917  
   918  	It("pipeline returns an error", func() {
   919  		_, err := client.Pipelined(func(pipe redis.Pipeliner) error {
   920  			pipe.Ping()
   921  			return nil
   922  		})
   923  		Expect(err).To(MatchError("redis: cluster has no nodes"))
   924  	})
   925  })
   926  
   927  var _ = Describe("ClusterClient without valid nodes", func() {
   928  	var client *redis.ClusterClient
   929  
   930  	BeforeEach(func() {
   931  		client = redis.NewClusterClient(&redis.ClusterOptions{
   932  			Addrs: []string{redisAddr},
   933  		})
   934  	})
   935  
   936  	AfterEach(func() {
   937  		Expect(client.Close()).NotTo(HaveOccurred())
   938  	})
   939  
   940  	It("returns an error", func() {
   941  		err := client.Ping().Err()
   942  		Expect(err).To(MatchError("ERR This instance has cluster support disabled"))
   943  	})
   944  
   945  	It("pipeline returns an error", func() {
   946  		_, err := client.Pipelined(func(pipe redis.Pipeliner) error {
   947  			pipe.Ping()
   948  			return nil
   949  		})
   950  		Expect(err).To(MatchError("ERR This instance has cluster support disabled"))
   951  	})
   952  })
   953  
   954  var _ = Describe("ClusterClient with unavailable Cluster", func() {
   955  	var client *redis.ClusterClient
   956  
   957  	BeforeEach(func() {
   958  		for _, node := range cluster.clients {
   959  			err := node.ClientPause(5 * time.Second).Err()
   960  			Expect(err).NotTo(HaveOccurred())
   961  		}
   962  
   963  		opt := redisClusterOptions()
   964  		opt.ReadTimeout = 250 * time.Millisecond
   965  		opt.WriteTimeout = 250 * time.Millisecond
   966  		opt.MaxRedirects = 1
   967  		client = cluster.clusterClientUnsafe(opt)
   968  	})
   969  
   970  	AfterEach(func() {
   971  		Expect(client.Close()).NotTo(HaveOccurred())
   972  	})
   973  
   974  	It("recovers when Cluster recovers", func() {
   975  		err := client.Ping().Err()
   976  		Expect(err).To(HaveOccurred())
   977  
   978  		Eventually(func() error {
   979  			return client.Ping().Err()
   980  		}, "30s").ShouldNot(HaveOccurred())
   981  	})
   982  })
   983  
   984  var _ = Describe("ClusterClient timeout", func() {
   985  	var client *redis.ClusterClient
   986  
   987  	AfterEach(func() {
   988  		_ = client.Close()
   989  	})
   990  
   991  	testTimeout := func() {
   992  		It("Ping timeouts", func() {
   993  			err := client.Ping().Err()
   994  			Expect(err).To(HaveOccurred())
   995  			Expect(err.(net.Error).Timeout()).To(BeTrue())
   996  		})
   997  
   998  		It("Pipeline timeouts", func() {
   999  			_, err := client.Pipelined(func(pipe redis.Pipeliner) error {
  1000  				pipe.Ping()
  1001  				return nil
  1002  			})
  1003  			Expect(err).To(HaveOccurred())
  1004  			Expect(err.(net.Error).Timeout()).To(BeTrue())
  1005  		})
  1006  
  1007  		It("Tx timeouts", func() {
  1008  			err := client.Watch(func(tx *redis.Tx) error {
  1009  				return tx.Ping().Err()
  1010  			}, "foo")
  1011  			Expect(err).To(HaveOccurred())
  1012  			Expect(err.(net.Error).Timeout()).To(BeTrue())
  1013  		})
  1014  
  1015  		It("Tx Pipeline timeouts", func() {
  1016  			err := client.Watch(func(tx *redis.Tx) error {
  1017  				_, err := tx.Pipelined(func(pipe redis.Pipeliner) error {
  1018  					pipe.Ping()
  1019  					return nil
  1020  				})
  1021  				return err
  1022  			}, "foo")
  1023  			Expect(err).To(HaveOccurred())
  1024  			Expect(err.(net.Error).Timeout()).To(BeTrue())
  1025  		})
  1026  	}
  1027  
  1028  	const pause = 5 * time.Second
  1029  
  1030  	Context("read/write timeout", func() {
  1031  		BeforeEach(func() {
  1032  			opt := redisClusterOptions()
  1033  			opt.ReadTimeout = 250 * time.Millisecond
  1034  			opt.WriteTimeout = 250 * time.Millisecond
  1035  			opt.MaxRedirects = 1
  1036  			client = cluster.clusterClient(opt)
  1037  
  1038  			err := client.ForEachNode(func(client *redis.Client) error {
  1039  				return client.ClientPause(pause).Err()
  1040  			})
  1041  			Expect(err).NotTo(HaveOccurred())
  1042  		})
  1043  
  1044  		AfterEach(func() {
  1045  			_ = client.ForEachNode(func(client *redis.Client) error {
  1046  				defer GinkgoRecover()
  1047  				Eventually(func() error {
  1048  					return client.Ping().Err()
  1049  				}, 2*pause).ShouldNot(HaveOccurred())
  1050  				return nil
  1051  			})
  1052  		})
  1053  
  1054  		testTimeout()
  1055  	})
  1056  })
  1057  
  1058  //------------------------------------------------------------------------------
  1059  
  1060  func newClusterScenario() *clusterScenario {
  1061  	return &clusterScenario{
  1062  		ports:     []string{"8220", "8221", "8222", "8223", "8224", "8225"},
  1063  		nodeIds:   make([]string, 6),
  1064  		processes: make(map[string]*redisProcess, 6),
  1065  		clients:   make(map[string]*redis.Client, 6),
  1066  	}
  1067  }
  1068  
  1069  func BenchmarkClusterPing(b *testing.B) {
  1070  	if testing.Short() {
  1071  		b.Skip("skipping in short mode")
  1072  	}
  1073  
  1074  	cluster := newClusterScenario()
  1075  	if err := startCluster(cluster); err != nil {
  1076  		b.Fatal(err)
  1077  	}
  1078  	defer stopCluster(cluster)
  1079  
  1080  	client := cluster.clusterClient(redisClusterOptions())
  1081  	defer client.Close()
  1082  
  1083  	b.ResetTimer()
  1084  
  1085  	b.RunParallel(func(pb *testing.PB) {
  1086  		for pb.Next() {
  1087  			err := client.Ping().Err()
  1088  			if err != nil {
  1089  				b.Fatal(err)
  1090  			}
  1091  		}
  1092  	})
  1093  }
  1094  
  1095  func BenchmarkClusterSetString(b *testing.B) {
  1096  	if testing.Short() {
  1097  		b.Skip("skipping in short mode")
  1098  	}
  1099  
  1100  	cluster := newClusterScenario()
  1101  	if err := startCluster(cluster); err != nil {
  1102  		b.Fatal(err)
  1103  	}
  1104  	defer stopCluster(cluster)
  1105  
  1106  	client := cluster.clusterClient(redisClusterOptions())
  1107  	defer client.Close()
  1108  
  1109  	value := string(bytes.Repeat([]byte{'1'}, 10000))
  1110  
  1111  	b.ResetTimer()
  1112  
  1113  	b.RunParallel(func(pb *testing.PB) {
  1114  		for pb.Next() {
  1115  			err := client.Set("key", value, 0).Err()
  1116  			if err != nil {
  1117  				b.Fatal(err)
  1118  			}
  1119  		}
  1120  	})
  1121  }
  1122  
  1123  func BenchmarkClusterReloadState(b *testing.B) {
  1124  	if testing.Short() {
  1125  		b.Skip("skipping in short mode")
  1126  	}
  1127  
  1128  	cluster := newClusterScenario()
  1129  	if err := startCluster(cluster); err != nil {
  1130  		b.Fatal(err)
  1131  	}
  1132  	defer stopCluster(cluster)
  1133  
  1134  	client := cluster.clusterClient(redisClusterOptions())
  1135  	defer client.Close()
  1136  
  1137  	b.ResetTimer()
  1138  
  1139  	for i := 0; i < b.N; i++ {
  1140  		err := client.ReloadState()
  1141  		if err != nil {
  1142  			b.Fatal(err)
  1143  		}
  1144  	}
  1145  }
  1146  

View as plain text