...

Source file src/github.com/go-redis/redis/pubsub_test.go

Documentation: github.com/go-redis/redis

     1  package redis_test
     2  
     3  import (
     4  	"io"
     5  	"net"
     6  	"sync"
     7  	"time"
     8  
     9  	"github.com/go-redis/redis"
    10  
    11  	. "github.com/onsi/ginkgo"
    12  	. "github.com/onsi/gomega"
    13  )
    14  
    15  var _ = Describe("PubSub", func() {
    16  	var client *redis.Client
    17  
    18  	BeforeEach(func() {
    19  		opt := redisOptions()
    20  		opt.MinIdleConns = 0
    21  		opt.MaxConnAge = 0
    22  		client = redis.NewClient(opt)
    23  		Expect(client.FlushDB().Err()).NotTo(HaveOccurred())
    24  	})
    25  
    26  	AfterEach(func() {
    27  		Expect(client.Close()).NotTo(HaveOccurred())
    28  	})
    29  
    30  	It("implements Stringer", func() {
    31  		pubsub := client.PSubscribe("mychannel*")
    32  		defer pubsub.Close()
    33  
    34  		Expect(pubsub.String()).To(Equal("PubSub(mychannel*)"))
    35  	})
    36  
    37  	It("should support pattern matching", func() {
    38  		pubsub := client.PSubscribe("mychannel*")
    39  		defer pubsub.Close()
    40  
    41  		{
    42  			msgi, err := pubsub.ReceiveTimeout(time.Second)
    43  			Expect(err).NotTo(HaveOccurred())
    44  			subscr := msgi.(*redis.Subscription)
    45  			Expect(subscr.Kind).To(Equal("psubscribe"))
    46  			Expect(subscr.Channel).To(Equal("mychannel*"))
    47  			Expect(subscr.Count).To(Equal(1))
    48  		}
    49  
    50  		{
    51  			msgi, err := pubsub.ReceiveTimeout(time.Second)
    52  			Expect(err.(net.Error).Timeout()).To(Equal(true))
    53  			Expect(msgi).To(BeNil())
    54  		}
    55  
    56  		n, err := client.Publish("mychannel1", "hello").Result()
    57  		Expect(err).NotTo(HaveOccurred())
    58  		Expect(n).To(Equal(int64(1)))
    59  
    60  		Expect(pubsub.PUnsubscribe("mychannel*")).NotTo(HaveOccurred())
    61  
    62  		{
    63  			msgi, err := pubsub.ReceiveTimeout(time.Second)
    64  			Expect(err).NotTo(HaveOccurred())
    65  			subscr := msgi.(*redis.Message)
    66  			Expect(subscr.Channel).To(Equal("mychannel1"))
    67  			Expect(subscr.Pattern).To(Equal("mychannel*"))
    68  			Expect(subscr.Payload).To(Equal("hello"))
    69  		}
    70  
    71  		{
    72  			msgi, err := pubsub.ReceiveTimeout(time.Second)
    73  			Expect(err).NotTo(HaveOccurred())
    74  			subscr := msgi.(*redis.Subscription)
    75  			Expect(subscr.Kind).To(Equal("punsubscribe"))
    76  			Expect(subscr.Channel).To(Equal("mychannel*"))
    77  			Expect(subscr.Count).To(Equal(0))
    78  		}
    79  
    80  		stats := client.PoolStats()
    81  		Expect(stats.Misses).To(Equal(uint32(2)))
    82  	})
    83  
    84  	It("should pub/sub channels", func() {
    85  		channels, err := client.PubSubChannels("mychannel*").Result()
    86  		Expect(err).NotTo(HaveOccurred())
    87  		Expect(channels).To(BeEmpty())
    88  
    89  		pubsub := client.Subscribe("mychannel", "mychannel2")
    90  		defer pubsub.Close()
    91  
    92  		channels, err = client.PubSubChannels("mychannel*").Result()
    93  		Expect(err).NotTo(HaveOccurred())
    94  		Expect(channels).To(ConsistOf([]string{"mychannel", "mychannel2"}))
    95  
    96  		channels, err = client.PubSubChannels("").Result()
    97  		Expect(err).NotTo(HaveOccurred())
    98  		Expect(channels).To(BeEmpty())
    99  
   100  		channels, err = client.PubSubChannels("*").Result()
   101  		Expect(err).NotTo(HaveOccurred())
   102  		Expect(len(channels)).To(BeNumerically(">=", 2))
   103  	})
   104  
   105  	It("should return the numbers of subscribers", func() {
   106  		pubsub := client.Subscribe("mychannel", "mychannel2")
   107  		defer pubsub.Close()
   108  
   109  		channels, err := client.PubSubNumSub("mychannel", "mychannel2", "mychannel3").Result()
   110  		Expect(err).NotTo(HaveOccurred())
   111  		Expect(channels).To(Equal(map[string]int64{
   112  			"mychannel":  1,
   113  			"mychannel2": 1,
   114  			"mychannel3": 0,
   115  		}))
   116  	})
   117  
   118  	It("should return the numbers of subscribers by pattern", func() {
   119  		num, err := client.PubSubNumPat().Result()
   120  		Expect(err).NotTo(HaveOccurred())
   121  		Expect(num).To(Equal(int64(0)))
   122  
   123  		pubsub := client.PSubscribe("*")
   124  		defer pubsub.Close()
   125  
   126  		num, err = client.PubSubNumPat().Result()
   127  		Expect(err).NotTo(HaveOccurred())
   128  		Expect(num).To(Equal(int64(1)))
   129  	})
   130  
   131  	It("should pub/sub", func() {
   132  		pubsub := client.Subscribe("mychannel", "mychannel2")
   133  		defer pubsub.Close()
   134  
   135  		{
   136  			msgi, err := pubsub.ReceiveTimeout(time.Second)
   137  			Expect(err).NotTo(HaveOccurred())
   138  			subscr := msgi.(*redis.Subscription)
   139  			Expect(subscr.Kind).To(Equal("subscribe"))
   140  			Expect(subscr.Channel).To(Equal("mychannel"))
   141  			Expect(subscr.Count).To(Equal(1))
   142  		}
   143  
   144  		{
   145  			msgi, err := pubsub.ReceiveTimeout(time.Second)
   146  			Expect(err).NotTo(HaveOccurred())
   147  			subscr := msgi.(*redis.Subscription)
   148  			Expect(subscr.Kind).To(Equal("subscribe"))
   149  			Expect(subscr.Channel).To(Equal("mychannel2"))
   150  			Expect(subscr.Count).To(Equal(2))
   151  		}
   152  
   153  		{
   154  			msgi, err := pubsub.ReceiveTimeout(time.Second)
   155  			Expect(err.(net.Error).Timeout()).To(Equal(true))
   156  			Expect(msgi).NotTo(HaveOccurred())
   157  		}
   158  
   159  		n, err := client.Publish("mychannel", "hello").Result()
   160  		Expect(err).NotTo(HaveOccurred())
   161  		Expect(n).To(Equal(int64(1)))
   162  
   163  		n, err = client.Publish("mychannel2", "hello2").Result()
   164  		Expect(err).NotTo(HaveOccurred())
   165  		Expect(n).To(Equal(int64(1)))
   166  
   167  		Expect(pubsub.Unsubscribe("mychannel", "mychannel2")).NotTo(HaveOccurred())
   168  
   169  		{
   170  			msgi, err := pubsub.ReceiveTimeout(time.Second)
   171  			Expect(err).NotTo(HaveOccurred())
   172  			msg := msgi.(*redis.Message)
   173  			Expect(msg.Channel).To(Equal("mychannel"))
   174  			Expect(msg.Payload).To(Equal("hello"))
   175  		}
   176  
   177  		{
   178  			msgi, err := pubsub.ReceiveTimeout(time.Second)
   179  			Expect(err).NotTo(HaveOccurred())
   180  			msg := msgi.(*redis.Message)
   181  			Expect(msg.Channel).To(Equal("mychannel2"))
   182  			Expect(msg.Payload).To(Equal("hello2"))
   183  		}
   184  
   185  		{
   186  			msgi, err := pubsub.ReceiveTimeout(time.Second)
   187  			Expect(err).NotTo(HaveOccurred())
   188  			subscr := msgi.(*redis.Subscription)
   189  			Expect(subscr.Kind).To(Equal("unsubscribe"))
   190  			Expect(subscr.Channel).To(Equal("mychannel"))
   191  			Expect(subscr.Count).To(Equal(1))
   192  		}
   193  
   194  		{
   195  			msgi, err := pubsub.ReceiveTimeout(time.Second)
   196  			Expect(err).NotTo(HaveOccurred())
   197  			subscr := msgi.(*redis.Subscription)
   198  			Expect(subscr.Kind).To(Equal("unsubscribe"))
   199  			Expect(subscr.Channel).To(Equal("mychannel2"))
   200  			Expect(subscr.Count).To(Equal(0))
   201  		}
   202  
   203  		stats := client.PoolStats()
   204  		Expect(stats.Misses).To(Equal(uint32(2)))
   205  	})
   206  
   207  	It("should ping/pong", func() {
   208  		pubsub := client.Subscribe("mychannel")
   209  		defer pubsub.Close()
   210  
   211  		_, err := pubsub.ReceiveTimeout(time.Second)
   212  		Expect(err).NotTo(HaveOccurred())
   213  
   214  		err = pubsub.Ping("")
   215  		Expect(err).NotTo(HaveOccurred())
   216  
   217  		msgi, err := pubsub.ReceiveTimeout(time.Second)
   218  		Expect(err).NotTo(HaveOccurred())
   219  		pong := msgi.(*redis.Pong)
   220  		Expect(pong.Payload).To(Equal(""))
   221  	})
   222  
   223  	It("should ping/pong with payload", func() {
   224  		pubsub := client.Subscribe("mychannel")
   225  		defer pubsub.Close()
   226  
   227  		_, err := pubsub.ReceiveTimeout(time.Second)
   228  		Expect(err).NotTo(HaveOccurred())
   229  
   230  		err = pubsub.Ping("hello")
   231  		Expect(err).NotTo(HaveOccurred())
   232  
   233  		msgi, err := pubsub.ReceiveTimeout(time.Second)
   234  		Expect(err).NotTo(HaveOccurred())
   235  		pong := msgi.(*redis.Pong)
   236  		Expect(pong.Payload).To(Equal("hello"))
   237  	})
   238  
   239  	It("should multi-ReceiveMessage", func() {
   240  		pubsub := client.Subscribe("mychannel")
   241  		defer pubsub.Close()
   242  
   243  		subscr, err := pubsub.ReceiveTimeout(time.Second)
   244  		Expect(err).NotTo(HaveOccurred())
   245  		Expect(subscr).To(Equal(&redis.Subscription{
   246  			Kind:    "subscribe",
   247  			Channel: "mychannel",
   248  			Count:   1,
   249  		}))
   250  
   251  		err = client.Publish("mychannel", "hello").Err()
   252  		Expect(err).NotTo(HaveOccurred())
   253  
   254  		err = client.Publish("mychannel", "world").Err()
   255  		Expect(err).NotTo(HaveOccurred())
   256  
   257  		msg, err := pubsub.ReceiveMessage()
   258  		Expect(err).NotTo(HaveOccurred())
   259  		Expect(msg.Channel).To(Equal("mychannel"))
   260  		Expect(msg.Payload).To(Equal("hello"))
   261  
   262  		msg, err = pubsub.ReceiveMessage()
   263  		Expect(err).NotTo(HaveOccurred())
   264  		Expect(msg.Channel).To(Equal("mychannel"))
   265  		Expect(msg.Payload).To(Equal("world"))
   266  	})
   267  
   268  	It("returns an error when subscribe fails", func() {
   269  		pubsub := client.Subscribe()
   270  		defer pubsub.Close()
   271  
   272  		pubsub.SetNetConn(&badConn{
   273  			readErr:  io.EOF,
   274  			writeErr: io.EOF,
   275  		})
   276  
   277  		err := pubsub.Subscribe("mychannel")
   278  		Expect(err).To(MatchError("EOF"))
   279  
   280  		err = pubsub.Subscribe("mychannel")
   281  		Expect(err).NotTo(HaveOccurred())
   282  	})
   283  
   284  	expectReceiveMessageOnError := func(pubsub *redis.PubSub) {
   285  		pubsub.SetNetConn(&badConn{
   286  			readErr:  io.EOF,
   287  			writeErr: io.EOF,
   288  		})
   289  
   290  		step := make(chan struct{}, 3)
   291  
   292  		go func() {
   293  			defer GinkgoRecover()
   294  
   295  			Eventually(step).Should(Receive())
   296  			err := client.Publish("mychannel", "hello").Err()
   297  			Expect(err).NotTo(HaveOccurred())
   298  			step <- struct{}{}
   299  		}()
   300  
   301  		_, err := pubsub.ReceiveMessage()
   302  		Expect(err).To(Equal(io.EOF))
   303  		step <- struct{}{}
   304  
   305  		msg, err := pubsub.ReceiveMessage()
   306  		Expect(err).NotTo(HaveOccurred())
   307  		Expect(msg.Channel).To(Equal("mychannel"))
   308  		Expect(msg.Payload).To(Equal("hello"))
   309  
   310  		Eventually(step).Should(Receive())
   311  	}
   312  
   313  	It("Subscribe should reconnect on ReceiveMessage error", func() {
   314  		pubsub := client.Subscribe("mychannel")
   315  		defer pubsub.Close()
   316  
   317  		subscr, err := pubsub.ReceiveTimeout(time.Second)
   318  		Expect(err).NotTo(HaveOccurred())
   319  		Expect(subscr).To(Equal(&redis.Subscription{
   320  			Kind:    "subscribe",
   321  			Channel: "mychannel",
   322  			Count:   1,
   323  		}))
   324  
   325  		expectReceiveMessageOnError(pubsub)
   326  	})
   327  
   328  	It("PSubscribe should reconnect on ReceiveMessage error", func() {
   329  		pubsub := client.PSubscribe("mychannel")
   330  		defer pubsub.Close()
   331  
   332  		subscr, err := pubsub.ReceiveTimeout(time.Second)
   333  		Expect(err).NotTo(HaveOccurred())
   334  		Expect(subscr).To(Equal(&redis.Subscription{
   335  			Kind:    "psubscribe",
   336  			Channel: "mychannel",
   337  			Count:   1,
   338  		}))
   339  
   340  		expectReceiveMessageOnError(pubsub)
   341  	})
   342  
   343  	It("should return on Close", func() {
   344  		pubsub := client.Subscribe("mychannel")
   345  		defer pubsub.Close()
   346  
   347  		var wg sync.WaitGroup
   348  		wg.Add(1)
   349  		go func() {
   350  			defer GinkgoRecover()
   351  
   352  			wg.Done()
   353  			defer wg.Done()
   354  
   355  			_, err := pubsub.ReceiveMessage()
   356  			Expect(err).To(HaveOccurred())
   357  			Expect(err.Error()).To(SatisfyAny(
   358  				Equal("redis: client is closed"),
   359  				ContainSubstring("use of closed network connection"),
   360  			))
   361  		}()
   362  
   363  		wg.Wait()
   364  		wg.Add(1)
   365  
   366  		Expect(pubsub.Close()).NotTo(HaveOccurred())
   367  
   368  		wg.Wait()
   369  	})
   370  
   371  	It("should ReceiveMessage without a subscription", func() {
   372  		timeout := 100 * time.Millisecond
   373  
   374  		pubsub := client.Subscribe()
   375  		defer pubsub.Close()
   376  
   377  		var wg sync.WaitGroup
   378  		wg.Add(1)
   379  		go func() {
   380  			defer GinkgoRecover()
   381  			defer wg.Done()
   382  
   383  			time.Sleep(timeout)
   384  
   385  			err := pubsub.Subscribe("mychannel")
   386  			Expect(err).NotTo(HaveOccurred())
   387  
   388  			time.Sleep(timeout)
   389  
   390  			err = client.Publish("mychannel", "hello").Err()
   391  			Expect(err).NotTo(HaveOccurred())
   392  		}()
   393  
   394  		msg, err := pubsub.ReceiveMessage()
   395  		Expect(err).NotTo(HaveOccurred())
   396  		Expect(msg.Channel).To(Equal("mychannel"))
   397  		Expect(msg.Payload).To(Equal("hello"))
   398  
   399  		wg.Wait()
   400  	})
   401  
   402  	It("handles big message payload", func() {
   403  		pubsub := client.Subscribe("mychannel")
   404  		defer pubsub.Close()
   405  
   406  		ch := pubsub.Channel()
   407  
   408  		bigVal := bigVal()
   409  		err := client.Publish("mychannel", bigVal).Err()
   410  		Expect(err).NotTo(HaveOccurred())
   411  
   412  		var msg *redis.Message
   413  		Eventually(ch).Should(Receive(&msg))
   414  		Expect(msg.Channel).To(Equal("mychannel"))
   415  		Expect(msg.Payload).To(Equal(string(bigVal)))
   416  	})
   417  
   418  	It("supports concurrent Ping and Receive", func() {
   419  		const N = 100
   420  
   421  		pubsub := client.Subscribe("mychannel")
   422  		defer pubsub.Close()
   423  
   424  		done := make(chan struct{})
   425  		go func() {
   426  			defer GinkgoRecover()
   427  
   428  			for i := 0; i < N; i++ {
   429  				_, err := pubsub.ReceiveTimeout(5 * time.Second)
   430  				Expect(err).NotTo(HaveOccurred())
   431  			}
   432  			close(done)
   433  		}()
   434  
   435  		for i := 0; i < N; i++ {
   436  			err := pubsub.Ping()
   437  			Expect(err).NotTo(HaveOccurred())
   438  		}
   439  
   440  		select {
   441  		case <-done:
   442  		case <-time.After(30 * time.Second):
   443  			Fail("timeout")
   444  		}
   445  	})
   446  })
   447  

View as plain text