...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package redis
16
17 import (
18 "errors"
19 "time"
20 )
21
22
// Subscription is the value produced by Receive for a "subscribe",
// "psubscribe", "unsubscribe" or "punsubscribe" notification.
type Subscription struct {
	// Kind is the notification type exactly as sent by the server:
	// "subscribe", "psubscribe", "unsubscribe" or "punsubscribe".
	Kind string

	// Channel is the first element after the kind in the notification,
	// i.e. the channel name or pattern the notification refers to.
	Channel string

	// Count is the integer that follows the channel in the notification.
	// In the Redis pub/sub protocol this is the number of subscriptions the
	// connection currently holds.
	Count int
}
33
34
// Message is the value produced by Receive for a "message" or "pmessage"
// notification.
type Message struct {
	// Channel is the channel the payload was published on.
	Channel string

	// Pattern is the matched pattern. It is only filled in for "pmessage"
	// notifications and stays empty for plain "message" notifications.
	Pattern string

	// Data is the raw payload of the notification.
	Data []byte
}
45
46
// Pong is the value produced by Receive for a "pong" notification.
type Pong struct {
	// Data is the element that follows the "pong" kind in the reply.
	Data string
}
50
51
52 type PubSubConn struct {
53 Conn Conn
54 }
55
56
57 func (c PubSubConn) Close() error {
58 return c.Conn.Close()
59 }
60
61
62 func (c PubSubConn) Subscribe(channel ...interface{}) error {
63 c.Conn.Send("SUBSCRIBE", channel...)
64 return c.Conn.Flush()
65 }
66
67
68 func (c PubSubConn) PSubscribe(channel ...interface{}) error {
69 c.Conn.Send("PSUBSCRIBE", channel...)
70 return c.Conn.Flush()
71 }
72
73
74
75 func (c PubSubConn) Unsubscribe(channel ...interface{}) error {
76 c.Conn.Send("UNSUBSCRIBE", channel...)
77 return c.Conn.Flush()
78 }
79
80
81
82 func (c PubSubConn) PUnsubscribe(channel ...interface{}) error {
83 c.Conn.Send("PUNSUBSCRIBE", channel...)
84 return c.Conn.Flush()
85 }
86
87
88
89
90
91 func (c PubSubConn) Ping(data string) error {
92 c.Conn.Send("PING", data)
93 return c.Conn.Flush()
94 }
95
96
97
98
99 func (c PubSubConn) Receive() interface{} {
100 return c.receiveInternal(c.Conn.Receive())
101 }
102
103
104
105 func (c PubSubConn) ReceiveWithTimeout(timeout time.Duration) interface{} {
106 return c.receiveInternal(ReceiveWithTimeout(c.Conn, timeout))
107 }
108
109 func (c PubSubConn) receiveInternal(replyArg interface{}, errArg error) interface{} {
110 reply, err := Values(replyArg, errArg)
111 if err != nil {
112 return err
113 }
114
115 var kind string
116 reply, err = Scan(reply, &kind)
117 if err != nil {
118 return err
119 }
120
121 switch kind {
122 case "message":
123 var m Message
124 if _, err := Scan(reply, &m.Channel, &m.Data); err != nil {
125 return err
126 }
127 return m
128 case "pmessage":
129 var m Message
130 if _, err := Scan(reply, &m.Pattern, &m.Channel, &m.Data); err != nil {
131 return err
132 }
133 return m
134 case "subscribe", "psubscribe", "unsubscribe", "punsubscribe":
135 s := Subscription{Kind: kind}
136 if _, err := Scan(reply, &s.Channel, &s.Count); err != nil {
137 return err
138 }
139 return s
140 case "pong":
141 var p Pong
142 if _, err := Scan(reply, &p.Data); err != nil {
143 return err
144 }
145 return p
146 }
147 return errors.New("redigo: unknown pubsub notification")
148 }
149
View as plain text