...

Source file src/github.com/go-redis/redis/ring_test.go

Documentation: github.com/go-redis/redis

     1  package redis_test
     2  
     3  import (
     4  	"context"
     5  	"crypto/rand"
     6  	"fmt"
     7  	"net"
     8  	"strconv"
     9  	"sync"
    10  	"time"
    11  
    12  	"github.com/go-redis/redis"
    13  
    14  	. "github.com/onsi/ginkgo"
    15  	. "github.com/onsi/gomega"
    16  )
    17  
    18  var _ = Describe("Redis Ring", func() {
    19  	const heartbeat = 100 * time.Millisecond
    20  
    21  	var ring *redis.Ring
    22  
    23  	setRingKeys := func() {
    24  		for i := 0; i < 100; i++ {
    25  			err := ring.Set(fmt.Sprintf("key%d", i), "value", 0).Err()
    26  			Expect(err).NotTo(HaveOccurred())
    27  		}
    28  	}
    29  
    30  	BeforeEach(func() {
    31  		opt := redisRingOptions()
    32  		opt.HeartbeatFrequency = heartbeat
    33  		ring = redis.NewRing(opt)
    34  
    35  		err := ring.ForEachShard(func(cl *redis.Client) error {
    36  			return cl.FlushDB().Err()
    37  		})
    38  		Expect(err).NotTo(HaveOccurred())
    39  	})
    40  
    41  	AfterEach(func() {
    42  		Expect(ring.Close()).NotTo(HaveOccurred())
    43  	})
    44  
    45  	It("distributes keys", func() {
    46  		setRingKeys()
    47  
    48  		// Both shards should have some keys now.
    49  		Expect(ringShard1.Info("keyspace").Val()).To(ContainSubstring("keys=57"))
    50  		Expect(ringShard2.Info("keyspace").Val()).To(ContainSubstring("keys=43"))
    51  	})
    52  
    53  	It("distributes keys when using EVAL", func() {
    54  		script := redis.NewScript(`
    55  			local r = redis.call('SET', KEYS[1], ARGV[1])
    56  			return r
    57  		`)
    58  
    59  		var key string
    60  		for i := 0; i < 100; i++ {
    61  			key = fmt.Sprintf("key%d", i)
    62  			err := script.Run(ring, []string{key}, "value").Err()
    63  			Expect(err).NotTo(HaveOccurred())
    64  		}
    65  
    66  		Expect(ringShard1.Info("keyspace").Val()).To(ContainSubstring("keys=57"))
    67  		Expect(ringShard2.Info("keyspace").Val()).To(ContainSubstring("keys=43"))
    68  	})
    69  
    70  	It("uses single shard when one of the shards is down", func() {
    71  		// Stop ringShard2.
    72  		Expect(ringShard2.Close()).NotTo(HaveOccurred())
    73  
    74  		Eventually(func() int {
    75  			return ring.Len()
    76  		}, "30s").Should(Equal(1))
    77  
    78  		setRingKeys()
    79  
    80  		// RingShard1 should have all keys.
    81  		Expect(ringShard1.Info("keyspace").Val()).To(ContainSubstring("keys=100"))
    82  
    83  		// Start ringShard2.
    84  		var err error
    85  		ringShard2, err = startRedis(ringShard2Port)
    86  		Expect(err).NotTo(HaveOccurred())
    87  
    88  		Eventually(func() int {
    89  			return ring.Len()
    90  		}, "30s").Should(Equal(2))
    91  
    92  		setRingKeys()
    93  
    94  		// RingShard2 should have its keys.
    95  		Expect(ringShard2.Info("keyspace").Val()).To(ContainSubstring("keys=43"))
    96  	})
    97  
    98  	It("supports hash tags", func() {
    99  		for i := 0; i < 100; i++ {
   100  			err := ring.Set(fmt.Sprintf("key%d{tag}", i), "value", 0).Err()
   101  			Expect(err).NotTo(HaveOccurred())
   102  		}
   103  
   104  		Expect(ringShard1.Info("keyspace").Val()).ToNot(ContainSubstring("keys="))
   105  		Expect(ringShard2.Info("keyspace").Val()).To(ContainSubstring("keys=100"))
   106  	})
   107  
   108  	It("propagates process for WithContext", func() {
   109  		var fromWrap []string
   110  		wrapper := func(oldProcess func(cmd redis.Cmder) error) func(cmd redis.Cmder) error {
   111  			return func(cmd redis.Cmder) error {
   112  				fromWrap = append(fromWrap, cmd.Name())
   113  
   114  				return oldProcess(cmd)
   115  			}
   116  		}
   117  
   118  		ctx := context.Background()
   119  		ring = ring.WithContext(ctx)
   120  		ring.WrapProcess(wrapper)
   121  
   122  		ring.Ping()
   123  		Expect(fromWrap).To(Equal([]string{"ping"}))
   124  
   125  		ring.Ping()
   126  		Expect(fromWrap).To(Equal([]string{"ping", "ping"}))
   127  	})
   128  
   129  	Describe("pipeline", func() {
   130  		It("distributes keys", func() {
   131  			pipe := ring.Pipeline()
   132  			for i := 0; i < 100; i++ {
   133  				err := pipe.Set(fmt.Sprintf("key%d", i), "value", 0).Err()
   134  				Expect(err).NotTo(HaveOccurred())
   135  			}
   136  			cmds, err := pipe.Exec()
   137  			Expect(err).NotTo(HaveOccurred())
   138  			Expect(cmds).To(HaveLen(100))
   139  			Expect(pipe.Close()).NotTo(HaveOccurred())
   140  
   141  			for _, cmd := range cmds {
   142  				Expect(cmd.Err()).NotTo(HaveOccurred())
   143  				Expect(cmd.(*redis.StatusCmd).Val()).To(Equal("OK"))
   144  			}
   145  
   146  			// Both shards should have some keys now.
   147  			Expect(ringShard1.Info().Val()).To(ContainSubstring("keys=57"))
   148  			Expect(ringShard2.Info().Val()).To(ContainSubstring("keys=43"))
   149  		})
   150  
   151  		It("is consistent with ring", func() {
   152  			var keys []string
   153  			for i := 0; i < 100; i++ {
   154  				key := make([]byte, 64)
   155  				_, err := rand.Read(key)
   156  				Expect(err).NotTo(HaveOccurred())
   157  				keys = append(keys, string(key))
   158  			}
   159  
   160  			_, err := ring.Pipelined(func(pipe redis.Pipeliner) error {
   161  				for _, key := range keys {
   162  					pipe.Set(key, "value", 0).Err()
   163  				}
   164  				return nil
   165  			})
   166  			Expect(err).NotTo(HaveOccurred())
   167  
   168  			for _, key := range keys {
   169  				val, err := ring.Get(key).Result()
   170  				Expect(err).NotTo(HaveOccurred())
   171  				Expect(val).To(Equal("value"))
   172  			}
   173  		})
   174  
   175  		It("supports hash tags", func() {
   176  			_, err := ring.Pipelined(func(pipe redis.Pipeliner) error {
   177  				for i := 0; i < 100; i++ {
   178  					pipe.Set(fmt.Sprintf("key%d{tag}", i), "value", 0).Err()
   179  				}
   180  				return nil
   181  			})
   182  			Expect(err).NotTo(HaveOccurred())
   183  
   184  			Expect(ringShard1.Info().Val()).ToNot(ContainSubstring("keys="))
   185  			Expect(ringShard2.Info().Val()).To(ContainSubstring("keys=100"))
   186  		})
   187  	})
   188  })
   189  
   190  var _ = Describe("empty Redis Ring", func() {
   191  	var ring *redis.Ring
   192  
   193  	BeforeEach(func() {
   194  		ring = redis.NewRing(&redis.RingOptions{})
   195  	})
   196  
   197  	AfterEach(func() {
   198  		Expect(ring.Close()).NotTo(HaveOccurred())
   199  	})
   200  
   201  	It("returns an error", func() {
   202  		err := ring.Ping().Err()
   203  		Expect(err).To(MatchError("redis: all ring shards are down"))
   204  	})
   205  
   206  	It("pipeline returns an error", func() {
   207  		_, err := ring.Pipelined(func(pipe redis.Pipeliner) error {
   208  			pipe.Ping()
   209  			return nil
   210  		})
   211  		Expect(err).To(MatchError("redis: all ring shards are down"))
   212  	})
   213  })
   214  
   215  var _ = Describe("Ring watch", func() {
   216  	const heartbeat = 100 * time.Millisecond
   217  
   218  	var ring *redis.Ring
   219  
   220  	BeforeEach(func() {
   221  		opt := redisRingOptions()
   222  		opt.HeartbeatFrequency = heartbeat
   223  		ring = redis.NewRing(opt)
   224  
   225  		err := ring.ForEachShard(func(cl *redis.Client) error {
   226  			return cl.FlushDB().Err()
   227  		})
   228  		Expect(err).NotTo(HaveOccurred())
   229  	})
   230  
   231  	AfterEach(func() {
   232  		Expect(ring.Close()).NotTo(HaveOccurred())
   233  	})
   234  
   235  	It("should Watch", func() {
   236  		var incr func(string) error
   237  
   238  		// Transactionally increments key using GET and SET commands.
   239  		incr = func(key string) error {
   240  			err := ring.Watch(func(tx *redis.Tx) error {
   241  				n, err := tx.Get(key).Int64()
   242  				if err != nil && err != redis.Nil {
   243  					return err
   244  				}
   245  
   246  				_, err = tx.Pipelined(func(pipe redis.Pipeliner) error {
   247  					pipe.Set(key, strconv.FormatInt(n+1, 10), 0)
   248  					return nil
   249  				})
   250  				return err
   251  			}, key)
   252  			if err == redis.TxFailedErr {
   253  				return incr(key)
   254  			}
   255  			return err
   256  		}
   257  
   258  		var wg sync.WaitGroup
   259  		for i := 0; i < 100; i++ {
   260  			wg.Add(1)
   261  			go func() {
   262  				defer GinkgoRecover()
   263  				defer wg.Done()
   264  
   265  				err := incr("key")
   266  				Expect(err).NotTo(HaveOccurred())
   267  			}()
   268  		}
   269  		wg.Wait()
   270  
   271  		n, err := ring.Get("key").Int64()
   272  		Expect(err).NotTo(HaveOccurred())
   273  		Expect(n).To(Equal(int64(100)))
   274  	})
   275  
   276  	It("should discard", func() {
   277  		err := ring.Watch(func(tx *redis.Tx) error {
   278  			cmds, err := tx.Pipelined(func(pipe redis.Pipeliner) error {
   279  				pipe.Set("key1", "hello1", 0)
   280  				pipe.Discard()
   281  				pipe.Set("key2", "hello2", 0)
   282  				return nil
   283  			})
   284  			Expect(err).NotTo(HaveOccurred())
   285  			Expect(cmds).To(HaveLen(1))
   286  			return err
   287  		}, "key1", "key2")
   288  		Expect(err).NotTo(HaveOccurred())
   289  
   290  		get := ring.Get("key1")
   291  		Expect(get.Err()).To(Equal(redis.Nil))
   292  		Expect(get.Val()).To(Equal(""))
   293  
   294  		get = ring.Get("key2")
   295  		Expect(get.Err()).NotTo(HaveOccurred())
   296  		Expect(get.Val()).To(Equal("hello2"))
   297  	})
   298  
   299  	It("returns no error when there are no commands", func() {
   300  		err := ring.Watch(func(tx *redis.Tx) error {
   301  			_, err := tx.Pipelined(func(redis.Pipeliner) error { return nil })
   302  			return err
   303  		}, "key")
   304  		Expect(err).NotTo(HaveOccurred())
   305  
   306  		v, err := ring.Ping().Result()
   307  		Expect(err).NotTo(HaveOccurred())
   308  		Expect(v).To(Equal("PONG"))
   309  	})
   310  
   311  	It("should exec bulks", func() {
   312  		const N = 20000
   313  
   314  		err := ring.Watch(func(tx *redis.Tx) error {
   315  			cmds, err := tx.Pipelined(func(pipe redis.Pipeliner) error {
   316  				for i := 0; i < N; i++ {
   317  					pipe.Incr("key")
   318  				}
   319  				return nil
   320  			})
   321  			Expect(err).NotTo(HaveOccurred())
   322  			Expect(len(cmds)).To(Equal(N))
   323  			for _, cmd := range cmds {
   324  				Expect(cmd.Err()).NotTo(HaveOccurred())
   325  			}
   326  			return err
   327  		}, "key")
   328  		Expect(err).NotTo(HaveOccurred())
   329  
   330  		num, err := ring.Get("key").Int64()
   331  		Expect(err).NotTo(HaveOccurred())
   332  		Expect(num).To(Equal(int64(N)))
   333  	})
   334  
   335  	It("should Watch/Unwatch", func() {
   336  		var C, N int
   337  
   338  		err := ring.Set("key", "0", 0).Err()
   339  		Expect(err).NotTo(HaveOccurred())
   340  
   341  		perform(C, func(id int) {
   342  			for i := 0; i < N; i++ {
   343  				err := ring.Watch(func(tx *redis.Tx) error {
   344  					val, err := tx.Get("key").Result()
   345  					Expect(err).NotTo(HaveOccurred())
   346  					Expect(val).NotTo(Equal(redis.Nil))
   347  
   348  					num, err := strconv.ParseInt(val, 10, 64)
   349  					Expect(err).NotTo(HaveOccurred())
   350  
   351  					cmds, err := tx.Pipelined(func(pipe redis.Pipeliner) error {
   352  						pipe.Set("key", strconv.FormatInt(num+1, 10), 0)
   353  						return nil
   354  					})
   355  					Expect(cmds).To(HaveLen(1))
   356  					return err
   357  				}, "key")
   358  				if err == redis.TxFailedErr {
   359  					i--
   360  					continue
   361  				}
   362  				Expect(err).NotTo(HaveOccurred())
   363  			}
   364  		})
   365  
   366  		val, err := ring.Get("key").Int64()
   367  		Expect(err).NotTo(HaveOccurred())
   368  		Expect(val).To(Equal(int64(C * N)))
   369  	})
   370  
   371  	It("should close Tx without closing the client", func() {
   372  		err := ring.Watch(func(tx *redis.Tx) error {
   373  			_, err := tx.Pipelined(func(pipe redis.Pipeliner) error {
   374  				pipe.Ping()
   375  				return nil
   376  			})
   377  			return err
   378  		}, "key")
   379  		Expect(err).NotTo(HaveOccurred())
   380  
   381  		Expect(ring.Ping().Err()).NotTo(HaveOccurred())
   382  	})
   383  
   384  	It("respects max size on multi", func() {
   385  		perform(1000, func(id int) {
   386  			var ping *redis.StatusCmd
   387  
   388  			err := ring.Watch(func(tx *redis.Tx) error {
   389  				cmds, err := tx.Pipelined(func(pipe redis.Pipeliner) error {
   390  					ping = pipe.Ping()
   391  					return nil
   392  				})
   393  				Expect(err).NotTo(HaveOccurred())
   394  				Expect(cmds).To(HaveLen(1))
   395  				return err
   396  			}, "key")
   397  			Expect(err).NotTo(HaveOccurred())
   398  
   399  			Expect(ping.Err()).NotTo(HaveOccurred())
   400  			Expect(ping.Val()).To(Equal("PONG"))
   401  		})
   402  
   403  		ring.ForEachShard(func(cl *redis.Client) error {
   404  			pool := cl.Pool()
   405  			Expect(pool.Len()).To(BeNumerically("<=", 10))
   406  			Expect(pool.IdleLen()).To(BeNumerically("<=", 10))
   407  			Expect(pool.Len()).To(Equal(pool.IdleLen()))
   408  
   409  			return nil
   410  		})
   411  	})
   412  })
   413  
   414  var _ = Describe("Ring Tx timeout", func() {
   415  	const heartbeat = 100 * time.Millisecond
   416  
   417  	var ring *redis.Ring
   418  
   419  	AfterEach(func() {
   420  		_ = ring.Close()
   421  	})
   422  
   423  	testTimeout := func() {
   424  		It("Tx timeouts", func() {
   425  			err := ring.Watch(func(tx *redis.Tx) error {
   426  				return tx.Ping().Err()
   427  			}, "foo")
   428  			Expect(err).To(HaveOccurred())
   429  			Expect(err.(net.Error).Timeout()).To(BeTrue())
   430  		})
   431  
   432  		It("Tx Pipeline timeouts", func() {
   433  			err := ring.Watch(func(tx *redis.Tx) error {
   434  				_, err := tx.Pipelined(func(pipe redis.Pipeliner) error {
   435  					pipe.Ping()
   436  					return nil
   437  				})
   438  				return err
   439  			}, "foo")
   440  			Expect(err).To(HaveOccurred())
   441  			Expect(err.(net.Error).Timeout()).To(BeTrue())
   442  		})
   443  	}
   444  
   445  	const pause = 5 * time.Second
   446  
   447  	Context("read/write timeout", func() {
   448  		BeforeEach(func() {
   449  			opt := redisRingOptions()
   450  			opt.ReadTimeout = 250 * time.Millisecond
   451  			opt.WriteTimeout = 250 * time.Millisecond
   452  			opt.HeartbeatFrequency = heartbeat
   453  			ring = redis.NewRing(opt)
   454  
   455  			err := ring.ForEachShard(func(client *redis.Client) error {
   456  				return client.ClientPause(pause).Err()
   457  			})
   458  			Expect(err).NotTo(HaveOccurred())
   459  		})
   460  
   461  		AfterEach(func() {
   462  			_ = ring.ForEachShard(func(client *redis.Client) error {
   463  				defer GinkgoRecover()
   464  				Eventually(func() error {
   465  					return client.Ping().Err()
   466  				}, 2*pause).ShouldNot(HaveOccurred())
   467  				return nil
   468  			})
   469  		})
   470  
   471  		testTimeout()
   472  	})
   473  })
   474  

View as plain text