1 package redis
2
3 import (
4 "crypto/tls"
5 "errors"
6 "net"
7 "strings"
8 "sync"
9 "time"
10
11 "github.com/go-redis/redis/internal"
12 "github.com/go-redis/redis/internal/pool"
13 )
14
15
16
17
18
19 type FailoverOptions struct {
20
21 MasterName string
22
23 SentinelAddrs []string
24
25
26
27 OnConnect func(*Conn) error
28
29 Password string
30 DB int
31
32 MaxRetries int
33 MinRetryBackoff time.Duration
34 MaxRetryBackoff time.Duration
35
36 DialTimeout time.Duration
37 ReadTimeout time.Duration
38 WriteTimeout time.Duration
39
40 PoolSize int
41 MinIdleConns int
42 MaxConnAge time.Duration
43 PoolTimeout time.Duration
44 IdleTimeout time.Duration
45 IdleCheckFrequency time.Duration
46
47 TLSConfig *tls.Config
48 }
49
50 func (opt *FailoverOptions) options() *Options {
51 return &Options{
52 Addr: "FailoverClient",
53
54 OnConnect: opt.OnConnect,
55
56 DB: opt.DB,
57 Password: opt.Password,
58
59 MaxRetries: opt.MaxRetries,
60
61 DialTimeout: opt.DialTimeout,
62 ReadTimeout: opt.ReadTimeout,
63 WriteTimeout: opt.WriteTimeout,
64
65 PoolSize: opt.PoolSize,
66 PoolTimeout: opt.PoolTimeout,
67 IdleTimeout: opt.IdleTimeout,
68 IdleCheckFrequency: opt.IdleCheckFrequency,
69
70 TLSConfig: opt.TLSConfig,
71 }
72 }
73
74
75
76
77 func NewFailoverClient(failoverOpt *FailoverOptions) *Client {
78 opt := failoverOpt.options()
79 opt.init()
80
81 failover := &sentinelFailover{
82 masterName: failoverOpt.MasterName,
83 sentinelAddrs: failoverOpt.SentinelAddrs,
84
85 opt: opt,
86 }
87
88 c := Client{
89 baseClient: baseClient{
90 opt: opt,
91 connPool: failover.Pool(),
92
93 onClose: failover.Close,
94 },
95 }
96 c.baseClient.init()
97 c.cmdable.setProcessor(c.Process)
98
99 return &c
100 }
101
102
103
104 type SentinelClient struct {
105 baseClient
106 }
107
108 func NewSentinelClient(opt *Options) *SentinelClient {
109 opt.init()
110 c := &SentinelClient{
111 baseClient: baseClient{
112 opt: opt,
113 connPool: newConnPool(opt),
114 },
115 }
116 c.baseClient.init()
117 return c
118 }
119
120 func (c *SentinelClient) pubSub() *PubSub {
121 pubsub := &PubSub{
122 opt: c.opt,
123
124 newConn: func(channels []string) (*pool.Conn, error) {
125 return c.newConn()
126 },
127 closeConn: c.connPool.CloseConn,
128 }
129 pubsub.init()
130 return pubsub
131 }
132
133
134
135 func (c *SentinelClient) Subscribe(channels ...string) *PubSub {
136 pubsub := c.pubSub()
137 if len(channels) > 0 {
138 _ = pubsub.Subscribe(channels...)
139 }
140 return pubsub
141 }
142
143
144
145 func (c *SentinelClient) PSubscribe(channels ...string) *PubSub {
146 pubsub := c.pubSub()
147 if len(channels) > 0 {
148 _ = pubsub.PSubscribe(channels...)
149 }
150 return pubsub
151 }
152
153 func (c *SentinelClient) GetMasterAddrByName(name string) *StringSliceCmd {
154 cmd := NewStringSliceCmd("sentinel", "get-master-addr-by-name", name)
155 c.Process(cmd)
156 return cmd
157 }
158
159 func (c *SentinelClient) Sentinels(name string) *SliceCmd {
160 cmd := NewSliceCmd("sentinel", "sentinels", name)
161 c.Process(cmd)
162 return cmd
163 }
164
165
166
167 func (c *SentinelClient) Failover(name string) *StatusCmd {
168 cmd := NewStatusCmd("sentinel", "failover", name)
169 c.Process(cmd)
170 return cmd
171 }
172
173
174
175
176
177 func (c *SentinelClient) Reset(pattern string) *IntCmd {
178 cmd := NewIntCmd("sentinel", "reset", pattern)
179 c.Process(cmd)
180 return cmd
181 }
182
183
184
185 func (c *SentinelClient) FlushConfig() *StatusCmd {
186 cmd := NewStatusCmd("sentinel", "flushconfig")
187 c.Process(cmd)
188 return cmd
189 }
190
191
192 func (c *SentinelClient) Master(name string) *StringStringMapCmd {
193 cmd := NewStringStringMapCmd("sentinel", "master", name)
194 c.Process(cmd)
195 return cmd
196 }
197
198 type sentinelFailover struct {
199 sentinelAddrs []string
200
201 opt *Options
202
203 pool *pool.ConnPool
204 poolOnce sync.Once
205
206 mu sync.RWMutex
207 masterName string
208 _masterAddr string
209 sentinel *SentinelClient
210 pubsub *PubSub
211 }
212
213 func (c *sentinelFailover) Close() error {
214 c.mu.Lock()
215 defer c.mu.Unlock()
216 if c.sentinel != nil {
217 return c.closeSentinel()
218 }
219 return nil
220 }
221
222 func (c *sentinelFailover) Pool() *pool.ConnPool {
223 c.poolOnce.Do(func() {
224 c.opt.Dialer = c.dial
225 c.pool = newConnPool(c.opt)
226 })
227 return c.pool
228 }
229
230 func (c *sentinelFailover) dial() (net.Conn, error) {
231 addr, err := c.MasterAddr()
232 if err != nil {
233 return nil, err
234 }
235 return net.DialTimeout("tcp", addr, c.opt.DialTimeout)
236 }
237
238 func (c *sentinelFailover) MasterAddr() (string, error) {
239 addr, err := c.masterAddr()
240 if err != nil {
241 return "", err
242 }
243 c.switchMaster(addr)
244 return addr, nil
245 }
246
247 func (c *sentinelFailover) masterAddr() (string, error) {
248 c.mu.RLock()
249 addr := c.getMasterAddr()
250 c.mu.RUnlock()
251 if addr != "" {
252 return addr, nil
253 }
254
255 c.mu.Lock()
256 defer c.mu.Unlock()
257
258 addr = c.getMasterAddr()
259 if addr != "" {
260 return addr, nil
261 }
262
263 if c.sentinel != nil {
264 c.closeSentinel()
265 }
266
267 for i, sentinelAddr := range c.sentinelAddrs {
268 sentinel := NewSentinelClient(&Options{
269 Addr: sentinelAddr,
270
271 MaxRetries: c.opt.MaxRetries,
272
273 DialTimeout: c.opt.DialTimeout,
274 ReadTimeout: c.opt.ReadTimeout,
275 WriteTimeout: c.opt.WriteTimeout,
276
277 PoolSize: c.opt.PoolSize,
278 PoolTimeout: c.opt.PoolTimeout,
279 IdleTimeout: c.opt.IdleTimeout,
280 IdleCheckFrequency: c.opt.IdleCheckFrequency,
281
282 TLSConfig: c.opt.TLSConfig,
283 })
284
285 masterAddr, err := sentinel.GetMasterAddrByName(c.masterName).Result()
286 if err != nil {
287 internal.Logf("sentinel: GetMasterAddrByName master=%q failed: %s",
288 c.masterName, err)
289 _ = sentinel.Close()
290 continue
291 }
292
293
294 c.sentinelAddrs[0], c.sentinelAddrs[i] = c.sentinelAddrs[i], c.sentinelAddrs[0]
295 c.setSentinel(sentinel)
296
297 addr := net.JoinHostPort(masterAddr[0], masterAddr[1])
298 return addr, nil
299 }
300
301 return "", errors.New("redis: all sentinels are unreachable")
302 }
303
304 func (c *sentinelFailover) getMasterAddr() string {
305 sentinel := c.sentinel
306
307 if sentinel == nil {
308 return ""
309 }
310
311 addr, err := sentinel.GetMasterAddrByName(c.masterName).Result()
312 if err != nil {
313 internal.Logf("sentinel: GetMasterAddrByName name=%q failed: %s",
314 c.masterName, err)
315 return ""
316 }
317
318 return net.JoinHostPort(addr[0], addr[1])
319 }
320
321 func (c *sentinelFailover) switchMaster(addr string) {
322 c.mu.RLock()
323 masterAddr := c._masterAddr
324 c.mu.RUnlock()
325 if masterAddr == addr {
326 return
327 }
328
329 c.mu.Lock()
330 defer c.mu.Unlock()
331
332 internal.Logf("sentinel: new master=%q addr=%q",
333 c.masterName, addr)
334 _ = c.Pool().Filter(func(cn *pool.Conn) bool {
335 return cn.RemoteAddr().String() != addr
336 })
337 c._masterAddr = addr
338 }
339
340 func (c *sentinelFailover) setSentinel(sentinel *SentinelClient) {
341 c.discoverSentinels(sentinel)
342 c.sentinel = sentinel
343
344 c.pubsub = sentinel.Subscribe("+switch-master")
345 go c.listen(c.pubsub)
346 }
347
348 func (c *sentinelFailover) closeSentinel() error {
349 var firstErr error
350
351 err := c.pubsub.Close()
352 if err != nil && firstErr == err {
353 firstErr = err
354 }
355 c.pubsub = nil
356
357 err = c.sentinel.Close()
358 if err != nil && firstErr == err {
359 firstErr = err
360 }
361 c.sentinel = nil
362
363 return firstErr
364 }
365
366 func (c *sentinelFailover) discoverSentinels(sentinel *SentinelClient) {
367 sentinels, err := sentinel.Sentinels(c.masterName).Result()
368 if err != nil {
369 internal.Logf("sentinel: Sentinels master=%q failed: %s", c.masterName, err)
370 return
371 }
372 for _, sentinel := range sentinels {
373 vals := sentinel.([]interface{})
374 for i := 0; i < len(vals); i += 2 {
375 key := vals[i].(string)
376 if key == "name" {
377 sentinelAddr := vals[i+1].(string)
378 if !contains(c.sentinelAddrs, sentinelAddr) {
379 internal.Logf("sentinel: discovered new sentinel=%q for master=%q",
380 sentinelAddr, c.masterName)
381 c.sentinelAddrs = append(c.sentinelAddrs, sentinelAddr)
382 }
383 }
384 }
385 }
386 }
387
388 func (c *sentinelFailover) listen(pubsub *PubSub) {
389 ch := pubsub.Channel()
390 for {
391 msg, ok := <-ch
392 if !ok {
393 break
394 }
395
396 if msg.Channel == "+switch-master" {
397 parts := strings.Split(msg.Payload, " ")
398 if parts[0] != c.masterName {
399 internal.Logf("sentinel: ignore addr for master=%q", parts[0])
400 continue
401 }
402 addr := net.JoinHostPort(parts[3], parts[4])
403 c.switchMaster(addr)
404 }
405 }
406 }
407
408 func contains(slice []string, str string) bool {
409 for _, s := range slice {
410 if s == str {
411 return true
412 }
413 }
414 return false
415 }
416
View as plain text