1 package redis_test
2
3 import (
4 "io"
5 "net"
6 "sync"
7 "time"
8
9 "github.com/go-redis/redis"
10
11 . "github.com/onsi/ginkgo"
12 . "github.com/onsi/gomega"
13 )
14
15 var _ = Describe("PubSub", func() {
16 var client *redis.Client
17
18 BeforeEach(func() {
19 opt := redisOptions()
20 opt.MinIdleConns = 0
21 opt.MaxConnAge = 0
22 client = redis.NewClient(opt)
23 Expect(client.FlushDB().Err()).NotTo(HaveOccurred())
24 })
25
26 AfterEach(func() {
27 Expect(client.Close()).NotTo(HaveOccurred())
28 })
29
30 It("implements Stringer", func() {
31 pubsub := client.PSubscribe("mychannel*")
32 defer pubsub.Close()
33
34 Expect(pubsub.String()).To(Equal("PubSub(mychannel*)"))
35 })
36
37 It("should support pattern matching", func() {
38 pubsub := client.PSubscribe("mychannel*")
39 defer pubsub.Close()
40
41 {
42 msgi, err := pubsub.ReceiveTimeout(time.Second)
43 Expect(err).NotTo(HaveOccurred())
44 subscr := msgi.(*redis.Subscription)
45 Expect(subscr.Kind).To(Equal("psubscribe"))
46 Expect(subscr.Channel).To(Equal("mychannel*"))
47 Expect(subscr.Count).To(Equal(1))
48 }
49
50 {
51 msgi, err := pubsub.ReceiveTimeout(time.Second)
52 Expect(err.(net.Error).Timeout()).To(Equal(true))
53 Expect(msgi).To(BeNil())
54 }
55
56 n, err := client.Publish("mychannel1", "hello").Result()
57 Expect(err).NotTo(HaveOccurred())
58 Expect(n).To(Equal(int64(1)))
59
60 Expect(pubsub.PUnsubscribe("mychannel*")).NotTo(HaveOccurred())
61
62 {
63 msgi, err := pubsub.ReceiveTimeout(time.Second)
64 Expect(err).NotTo(HaveOccurred())
65 subscr := msgi.(*redis.Message)
66 Expect(subscr.Channel).To(Equal("mychannel1"))
67 Expect(subscr.Pattern).To(Equal("mychannel*"))
68 Expect(subscr.Payload).To(Equal("hello"))
69 }
70
71 {
72 msgi, err := pubsub.ReceiveTimeout(time.Second)
73 Expect(err).NotTo(HaveOccurred())
74 subscr := msgi.(*redis.Subscription)
75 Expect(subscr.Kind).To(Equal("punsubscribe"))
76 Expect(subscr.Channel).To(Equal("mychannel*"))
77 Expect(subscr.Count).To(Equal(0))
78 }
79
80 stats := client.PoolStats()
81 Expect(stats.Misses).To(Equal(uint32(2)))
82 })
83
84 It("should pub/sub channels", func() {
85 channels, err := client.PubSubChannels("mychannel*").Result()
86 Expect(err).NotTo(HaveOccurred())
87 Expect(channels).To(BeEmpty())
88
89 pubsub := client.Subscribe("mychannel", "mychannel2")
90 defer pubsub.Close()
91
92 channels, err = client.PubSubChannels("mychannel*").Result()
93 Expect(err).NotTo(HaveOccurred())
94 Expect(channels).To(ConsistOf([]string{"mychannel", "mychannel2"}))
95
96 channels, err = client.PubSubChannels("").Result()
97 Expect(err).NotTo(HaveOccurred())
98 Expect(channels).To(BeEmpty())
99
100 channels, err = client.PubSubChannels("*").Result()
101 Expect(err).NotTo(HaveOccurred())
102 Expect(len(channels)).To(BeNumerically(">=", 2))
103 })
104
105 It("should return the numbers of subscribers", func() {
106 pubsub := client.Subscribe("mychannel", "mychannel2")
107 defer pubsub.Close()
108
109 channels, err := client.PubSubNumSub("mychannel", "mychannel2", "mychannel3").Result()
110 Expect(err).NotTo(HaveOccurred())
111 Expect(channels).To(Equal(map[string]int64{
112 "mychannel": 1,
113 "mychannel2": 1,
114 "mychannel3": 0,
115 }))
116 })
117
118 It("should return the numbers of subscribers by pattern", func() {
119 num, err := client.PubSubNumPat().Result()
120 Expect(err).NotTo(HaveOccurred())
121 Expect(num).To(Equal(int64(0)))
122
123 pubsub := client.PSubscribe("*")
124 defer pubsub.Close()
125
126 num, err = client.PubSubNumPat().Result()
127 Expect(err).NotTo(HaveOccurred())
128 Expect(num).To(Equal(int64(1)))
129 })
130
131 It("should pub/sub", func() {
132 pubsub := client.Subscribe("mychannel", "mychannel2")
133 defer pubsub.Close()
134
135 {
136 msgi, err := pubsub.ReceiveTimeout(time.Second)
137 Expect(err).NotTo(HaveOccurred())
138 subscr := msgi.(*redis.Subscription)
139 Expect(subscr.Kind).To(Equal("subscribe"))
140 Expect(subscr.Channel).To(Equal("mychannel"))
141 Expect(subscr.Count).To(Equal(1))
142 }
143
144 {
145 msgi, err := pubsub.ReceiveTimeout(time.Second)
146 Expect(err).NotTo(HaveOccurred())
147 subscr := msgi.(*redis.Subscription)
148 Expect(subscr.Kind).To(Equal("subscribe"))
149 Expect(subscr.Channel).To(Equal("mychannel2"))
150 Expect(subscr.Count).To(Equal(2))
151 }
152
153 {
154 msgi, err := pubsub.ReceiveTimeout(time.Second)
155 Expect(err.(net.Error).Timeout()).To(Equal(true))
156 Expect(msgi).NotTo(HaveOccurred())
157 }
158
159 n, err := client.Publish("mychannel", "hello").Result()
160 Expect(err).NotTo(HaveOccurred())
161 Expect(n).To(Equal(int64(1)))
162
163 n, err = client.Publish("mychannel2", "hello2").Result()
164 Expect(err).NotTo(HaveOccurred())
165 Expect(n).To(Equal(int64(1)))
166
167 Expect(pubsub.Unsubscribe("mychannel", "mychannel2")).NotTo(HaveOccurred())
168
169 {
170 msgi, err := pubsub.ReceiveTimeout(time.Second)
171 Expect(err).NotTo(HaveOccurred())
172 msg := msgi.(*redis.Message)
173 Expect(msg.Channel).To(Equal("mychannel"))
174 Expect(msg.Payload).To(Equal("hello"))
175 }
176
177 {
178 msgi, err := pubsub.ReceiveTimeout(time.Second)
179 Expect(err).NotTo(HaveOccurred())
180 msg := msgi.(*redis.Message)
181 Expect(msg.Channel).To(Equal("mychannel2"))
182 Expect(msg.Payload).To(Equal("hello2"))
183 }
184
185 {
186 msgi, err := pubsub.ReceiveTimeout(time.Second)
187 Expect(err).NotTo(HaveOccurred())
188 subscr := msgi.(*redis.Subscription)
189 Expect(subscr.Kind).To(Equal("unsubscribe"))
190 Expect(subscr.Channel).To(Equal("mychannel"))
191 Expect(subscr.Count).To(Equal(1))
192 }
193
194 {
195 msgi, err := pubsub.ReceiveTimeout(time.Second)
196 Expect(err).NotTo(HaveOccurred())
197 subscr := msgi.(*redis.Subscription)
198 Expect(subscr.Kind).To(Equal("unsubscribe"))
199 Expect(subscr.Channel).To(Equal("mychannel2"))
200 Expect(subscr.Count).To(Equal(0))
201 }
202
203 stats := client.PoolStats()
204 Expect(stats.Misses).To(Equal(uint32(2)))
205 })
206
207 It("should ping/pong", func() {
208 pubsub := client.Subscribe("mychannel")
209 defer pubsub.Close()
210
211 _, err := pubsub.ReceiveTimeout(time.Second)
212 Expect(err).NotTo(HaveOccurred())
213
214 err = pubsub.Ping("")
215 Expect(err).NotTo(HaveOccurred())
216
217 msgi, err := pubsub.ReceiveTimeout(time.Second)
218 Expect(err).NotTo(HaveOccurred())
219 pong := msgi.(*redis.Pong)
220 Expect(pong.Payload).To(Equal(""))
221 })
222
223 It("should ping/pong with payload", func() {
224 pubsub := client.Subscribe("mychannel")
225 defer pubsub.Close()
226
227 _, err := pubsub.ReceiveTimeout(time.Second)
228 Expect(err).NotTo(HaveOccurred())
229
230 err = pubsub.Ping("hello")
231 Expect(err).NotTo(HaveOccurred())
232
233 msgi, err := pubsub.ReceiveTimeout(time.Second)
234 Expect(err).NotTo(HaveOccurred())
235 pong := msgi.(*redis.Pong)
236 Expect(pong.Payload).To(Equal("hello"))
237 })
238
239 It("should multi-ReceiveMessage", func() {
240 pubsub := client.Subscribe("mychannel")
241 defer pubsub.Close()
242
243 subscr, err := pubsub.ReceiveTimeout(time.Second)
244 Expect(err).NotTo(HaveOccurred())
245 Expect(subscr).To(Equal(&redis.Subscription{
246 Kind: "subscribe",
247 Channel: "mychannel",
248 Count: 1,
249 }))
250
251 err = client.Publish("mychannel", "hello").Err()
252 Expect(err).NotTo(HaveOccurred())
253
254 err = client.Publish("mychannel", "world").Err()
255 Expect(err).NotTo(HaveOccurred())
256
257 msg, err := pubsub.ReceiveMessage()
258 Expect(err).NotTo(HaveOccurred())
259 Expect(msg.Channel).To(Equal("mychannel"))
260 Expect(msg.Payload).To(Equal("hello"))
261
262 msg, err = pubsub.ReceiveMessage()
263 Expect(err).NotTo(HaveOccurred())
264 Expect(msg.Channel).To(Equal("mychannel"))
265 Expect(msg.Payload).To(Equal("world"))
266 })
267
268 It("returns an error when subscribe fails", func() {
269 pubsub := client.Subscribe()
270 defer pubsub.Close()
271
272 pubsub.SetNetConn(&badConn{
273 readErr: io.EOF,
274 writeErr: io.EOF,
275 })
276
277 err := pubsub.Subscribe("mychannel")
278 Expect(err).To(MatchError("EOF"))
279
280 err = pubsub.Subscribe("mychannel")
281 Expect(err).NotTo(HaveOccurred())
282 })
283
284 expectReceiveMessageOnError := func(pubsub *redis.PubSub) {
285 pubsub.SetNetConn(&badConn{
286 readErr: io.EOF,
287 writeErr: io.EOF,
288 })
289
290 step := make(chan struct{}, 3)
291
292 go func() {
293 defer GinkgoRecover()
294
295 Eventually(step).Should(Receive())
296 err := client.Publish("mychannel", "hello").Err()
297 Expect(err).NotTo(HaveOccurred())
298 step <- struct{}{}
299 }()
300
301 _, err := pubsub.ReceiveMessage()
302 Expect(err).To(Equal(io.EOF))
303 step <- struct{}{}
304
305 msg, err := pubsub.ReceiveMessage()
306 Expect(err).NotTo(HaveOccurred())
307 Expect(msg.Channel).To(Equal("mychannel"))
308 Expect(msg.Payload).To(Equal("hello"))
309
310 Eventually(step).Should(Receive())
311 }
312
313 It("Subscribe should reconnect on ReceiveMessage error", func() {
314 pubsub := client.Subscribe("mychannel")
315 defer pubsub.Close()
316
317 subscr, err := pubsub.ReceiveTimeout(time.Second)
318 Expect(err).NotTo(HaveOccurred())
319 Expect(subscr).To(Equal(&redis.Subscription{
320 Kind: "subscribe",
321 Channel: "mychannel",
322 Count: 1,
323 }))
324
325 expectReceiveMessageOnError(pubsub)
326 })
327
328 It("PSubscribe should reconnect on ReceiveMessage error", func() {
329 pubsub := client.PSubscribe("mychannel")
330 defer pubsub.Close()
331
332 subscr, err := pubsub.ReceiveTimeout(time.Second)
333 Expect(err).NotTo(HaveOccurred())
334 Expect(subscr).To(Equal(&redis.Subscription{
335 Kind: "psubscribe",
336 Channel: "mychannel",
337 Count: 1,
338 }))
339
340 expectReceiveMessageOnError(pubsub)
341 })
342
343 It("should return on Close", func() {
344 pubsub := client.Subscribe("mychannel")
345 defer pubsub.Close()
346
347 var wg sync.WaitGroup
348 wg.Add(1)
349 go func() {
350 defer GinkgoRecover()
351
352 wg.Done()
353 defer wg.Done()
354
355 _, err := pubsub.ReceiveMessage()
356 Expect(err).To(HaveOccurred())
357 Expect(err.Error()).To(SatisfyAny(
358 Equal("redis: client is closed"),
359 ContainSubstring("use of closed network connection"),
360 ))
361 }()
362
363 wg.Wait()
364 wg.Add(1)
365
366 Expect(pubsub.Close()).NotTo(HaveOccurred())
367
368 wg.Wait()
369 })
370
371 It("should ReceiveMessage without a subscription", func() {
372 timeout := 100 * time.Millisecond
373
374 pubsub := client.Subscribe()
375 defer pubsub.Close()
376
377 var wg sync.WaitGroup
378 wg.Add(1)
379 go func() {
380 defer GinkgoRecover()
381 defer wg.Done()
382
383 time.Sleep(timeout)
384
385 err := pubsub.Subscribe("mychannel")
386 Expect(err).NotTo(HaveOccurred())
387
388 time.Sleep(timeout)
389
390 err = client.Publish("mychannel", "hello").Err()
391 Expect(err).NotTo(HaveOccurred())
392 }()
393
394 msg, err := pubsub.ReceiveMessage()
395 Expect(err).NotTo(HaveOccurred())
396 Expect(msg.Channel).To(Equal("mychannel"))
397 Expect(msg.Payload).To(Equal("hello"))
398
399 wg.Wait()
400 })
401
402 It("handles big message payload", func() {
403 pubsub := client.Subscribe("mychannel")
404 defer pubsub.Close()
405
406 ch := pubsub.Channel()
407
408 bigVal := bigVal()
409 err := client.Publish("mychannel", bigVal).Err()
410 Expect(err).NotTo(HaveOccurred())
411
412 var msg *redis.Message
413 Eventually(ch).Should(Receive(&msg))
414 Expect(msg.Channel).To(Equal("mychannel"))
415 Expect(msg.Payload).To(Equal(string(bigVal)))
416 })
417
418 It("supports concurrent Ping and Receive", func() {
419 const N = 100
420
421 pubsub := client.Subscribe("mychannel")
422 defer pubsub.Close()
423
424 done := make(chan struct{})
425 go func() {
426 defer GinkgoRecover()
427
428 for i := 0; i < N; i++ {
429 _, err := pubsub.ReceiveTimeout(5 * time.Second)
430 Expect(err).NotTo(HaveOccurred())
431 }
432 close(done)
433 }()
434
435 for i := 0; i < N; i++ {
436 err := pubsub.Ping()
437 Expect(err).NotTo(HaveOccurred())
438 }
439
440 select {
441 case <-done:
442 case <-time.After(30 * time.Second):
443 Fail("timeout")
444 }
445 })
446 })
447
View as plain text