...

Source file src/github.com/go-redis/redis/sentinel.go

Documentation: github.com/go-redis/redis

     1  package redis
     2  
     3  import (
     4  	"crypto/tls"
     5  	"errors"
     6  	"net"
     7  	"strings"
     8  	"sync"
     9  	"time"
    10  
    11  	"github.com/go-redis/redis/internal"
    12  	"github.com/go-redis/redis/internal/pool"
    13  )
    14  
    15  //------------------------------------------------------------------------------
    16  
    17  // FailoverOptions are used to configure a failover client and should
    18  // be passed to NewFailoverClient.
    19  type FailoverOptions struct {
    20  	// The master name.
    21  	MasterName string
    22  	// A seed list of host:port addresses of sentinel nodes.
    23  	SentinelAddrs []string
    24  
    25  	// Following options are copied from Options struct.
    26  
    27  	OnConnect func(*Conn) error
    28  
    29  	Password string
    30  	DB       int
    31  
    32  	MaxRetries      int
    33  	MinRetryBackoff time.Duration
    34  	MaxRetryBackoff time.Duration
    35  
    36  	DialTimeout  time.Duration
    37  	ReadTimeout  time.Duration
    38  	WriteTimeout time.Duration
    39  
    40  	PoolSize           int
    41  	MinIdleConns       int
    42  	MaxConnAge         time.Duration
    43  	PoolTimeout        time.Duration
    44  	IdleTimeout        time.Duration
    45  	IdleCheckFrequency time.Duration
    46  
    47  	TLSConfig *tls.Config
    48  }
    49  
    50  func (opt *FailoverOptions) options() *Options {
    51  	return &Options{
    52  		Addr: "FailoverClient",
    53  
    54  		OnConnect: opt.OnConnect,
    55  
    56  		DB:       opt.DB,
    57  		Password: opt.Password,
    58  
    59  		MaxRetries: opt.MaxRetries,
    60  
    61  		DialTimeout:  opt.DialTimeout,
    62  		ReadTimeout:  opt.ReadTimeout,
    63  		WriteTimeout: opt.WriteTimeout,
    64  
    65  		PoolSize:           opt.PoolSize,
    66  		PoolTimeout:        opt.PoolTimeout,
    67  		IdleTimeout:        opt.IdleTimeout,
    68  		IdleCheckFrequency: opt.IdleCheckFrequency,
    69  
    70  		TLSConfig: opt.TLSConfig,
    71  	}
    72  }
    73  
    74  // NewFailoverClient returns a Redis client that uses Redis Sentinel
    75  // for automatic failover. It's safe for concurrent use by multiple
    76  // goroutines.
    77  func NewFailoverClient(failoverOpt *FailoverOptions) *Client {
    78  	opt := failoverOpt.options()
    79  	opt.init()
    80  
    81  	failover := &sentinelFailover{
    82  		masterName:    failoverOpt.MasterName,
    83  		sentinelAddrs: failoverOpt.SentinelAddrs,
    84  
    85  		opt: opt,
    86  	}
    87  
    88  	c := Client{
    89  		baseClient: baseClient{
    90  			opt:      opt,
    91  			connPool: failover.Pool(),
    92  
    93  			onClose: failover.Close,
    94  		},
    95  	}
    96  	c.baseClient.init()
    97  	c.cmdable.setProcessor(c.Process)
    98  
    99  	return &c
   100  }
   101  
   102  //------------------------------------------------------------------------------
   103  
   104  type SentinelClient struct {
   105  	baseClient
   106  }
   107  
   108  func NewSentinelClient(opt *Options) *SentinelClient {
   109  	opt.init()
   110  	c := &SentinelClient{
   111  		baseClient: baseClient{
   112  			opt:      opt,
   113  			connPool: newConnPool(opt),
   114  		},
   115  	}
   116  	c.baseClient.init()
   117  	return c
   118  }
   119  
   120  func (c *SentinelClient) pubSub() *PubSub {
   121  	pubsub := &PubSub{
   122  		opt: c.opt,
   123  
   124  		newConn: func(channels []string) (*pool.Conn, error) {
   125  			return c.newConn()
   126  		},
   127  		closeConn: c.connPool.CloseConn,
   128  	}
   129  	pubsub.init()
   130  	return pubsub
   131  }
   132  
   133  // Subscribe subscribes the client to the specified channels.
   134  // Channels can be omitted to create empty subscription.
   135  func (c *SentinelClient) Subscribe(channels ...string) *PubSub {
   136  	pubsub := c.pubSub()
   137  	if len(channels) > 0 {
   138  		_ = pubsub.Subscribe(channels...)
   139  	}
   140  	return pubsub
   141  }
   142  
   143  // PSubscribe subscribes the client to the given patterns.
   144  // Patterns can be omitted to create empty subscription.
   145  func (c *SentinelClient) PSubscribe(channels ...string) *PubSub {
   146  	pubsub := c.pubSub()
   147  	if len(channels) > 0 {
   148  		_ = pubsub.PSubscribe(channels...)
   149  	}
   150  	return pubsub
   151  }
   152  
   153  func (c *SentinelClient) GetMasterAddrByName(name string) *StringSliceCmd {
   154  	cmd := NewStringSliceCmd("sentinel", "get-master-addr-by-name", name)
   155  	c.Process(cmd)
   156  	return cmd
   157  }
   158  
   159  func (c *SentinelClient) Sentinels(name string) *SliceCmd {
   160  	cmd := NewSliceCmd("sentinel", "sentinels", name)
   161  	c.Process(cmd)
   162  	return cmd
   163  }
   164  
   165  // Failover forces a failover as if the master was not reachable, and without
   166  // asking for agreement to other Sentinels.
   167  func (c *SentinelClient) Failover(name string) *StatusCmd {
   168  	cmd := NewStatusCmd("sentinel", "failover", name)
   169  	c.Process(cmd)
   170  	return cmd
   171  }
   172  
   173  // Reset resets all the masters with matching name. The pattern argument is a
   174  // glob-style pattern. The reset process clears any previous state in a master
   175  // (including a failover in progress), and removes every slave and sentinel
   176  // already discovered and associated with the master.
   177  func (c *SentinelClient) Reset(pattern string) *IntCmd {
   178  	cmd := NewIntCmd("sentinel", "reset", pattern)
   179  	c.Process(cmd)
   180  	return cmd
   181  }
   182  
   183  // FlushConfig forces Sentinel to rewrite its configuration on disk, including
   184  // the current Sentinel state.
   185  func (c *SentinelClient) FlushConfig() *StatusCmd {
   186  	cmd := NewStatusCmd("sentinel", "flushconfig")
   187  	c.Process(cmd)
   188  	return cmd
   189  }
   190  
   191  // Master shows the state and info of the specified master.
   192  func (c *SentinelClient) Master(name string) *StringStringMapCmd {
   193  	cmd := NewStringStringMapCmd("sentinel", "master", name)
   194  	c.Process(cmd)
   195  	return cmd
   196  }
   197  
   198  type sentinelFailover struct {
   199  	sentinelAddrs []string
   200  
   201  	opt *Options
   202  
   203  	pool     *pool.ConnPool
   204  	poolOnce sync.Once
   205  
   206  	mu          sync.RWMutex
   207  	masterName  string
   208  	_masterAddr string
   209  	sentinel    *SentinelClient
   210  	pubsub      *PubSub
   211  }
   212  
   213  func (c *sentinelFailover) Close() error {
   214  	c.mu.Lock()
   215  	defer c.mu.Unlock()
   216  	if c.sentinel != nil {
   217  		return c.closeSentinel()
   218  	}
   219  	return nil
   220  }
   221  
   222  func (c *sentinelFailover) Pool() *pool.ConnPool {
   223  	c.poolOnce.Do(func() {
   224  		c.opt.Dialer = c.dial
   225  		c.pool = newConnPool(c.opt)
   226  	})
   227  	return c.pool
   228  }
   229  
   230  func (c *sentinelFailover) dial() (net.Conn, error) {
   231  	addr, err := c.MasterAddr()
   232  	if err != nil {
   233  		return nil, err
   234  	}
   235  	return net.DialTimeout("tcp", addr, c.opt.DialTimeout)
   236  }
   237  
   238  func (c *sentinelFailover) MasterAddr() (string, error) {
   239  	addr, err := c.masterAddr()
   240  	if err != nil {
   241  		return "", err
   242  	}
   243  	c.switchMaster(addr)
   244  	return addr, nil
   245  }
   246  
   247  func (c *sentinelFailover) masterAddr() (string, error) {
   248  	c.mu.RLock()
   249  	addr := c.getMasterAddr()
   250  	c.mu.RUnlock()
   251  	if addr != "" {
   252  		return addr, nil
   253  	}
   254  
   255  	c.mu.Lock()
   256  	defer c.mu.Unlock()
   257  
   258  	addr = c.getMasterAddr()
   259  	if addr != "" {
   260  		return addr, nil
   261  	}
   262  
   263  	if c.sentinel != nil {
   264  		c.closeSentinel()
   265  	}
   266  
   267  	for i, sentinelAddr := range c.sentinelAddrs {
   268  		sentinel := NewSentinelClient(&Options{
   269  			Addr: sentinelAddr,
   270  
   271  			MaxRetries: c.opt.MaxRetries,
   272  
   273  			DialTimeout:  c.opt.DialTimeout,
   274  			ReadTimeout:  c.opt.ReadTimeout,
   275  			WriteTimeout: c.opt.WriteTimeout,
   276  
   277  			PoolSize:           c.opt.PoolSize,
   278  			PoolTimeout:        c.opt.PoolTimeout,
   279  			IdleTimeout:        c.opt.IdleTimeout,
   280  			IdleCheckFrequency: c.opt.IdleCheckFrequency,
   281  
   282  			TLSConfig: c.opt.TLSConfig,
   283  		})
   284  
   285  		masterAddr, err := sentinel.GetMasterAddrByName(c.masterName).Result()
   286  		if err != nil {
   287  			internal.Logf("sentinel: GetMasterAddrByName master=%q failed: %s",
   288  				c.masterName, err)
   289  			_ = sentinel.Close()
   290  			continue
   291  		}
   292  
   293  		// Push working sentinel to the top.
   294  		c.sentinelAddrs[0], c.sentinelAddrs[i] = c.sentinelAddrs[i], c.sentinelAddrs[0]
   295  		c.setSentinel(sentinel)
   296  
   297  		addr := net.JoinHostPort(masterAddr[0], masterAddr[1])
   298  		return addr, nil
   299  	}
   300  
   301  	return "", errors.New("redis: all sentinels are unreachable")
   302  }
   303  
   304  func (c *sentinelFailover) getMasterAddr() string {
   305  	sentinel := c.sentinel
   306  
   307  	if sentinel == nil {
   308  		return ""
   309  	}
   310  
   311  	addr, err := sentinel.GetMasterAddrByName(c.masterName).Result()
   312  	if err != nil {
   313  		internal.Logf("sentinel: GetMasterAddrByName name=%q failed: %s",
   314  			c.masterName, err)
   315  		return ""
   316  	}
   317  
   318  	return net.JoinHostPort(addr[0], addr[1])
   319  }
   320  
   321  func (c *sentinelFailover) switchMaster(addr string) {
   322  	c.mu.RLock()
   323  	masterAddr := c._masterAddr
   324  	c.mu.RUnlock()
   325  	if masterAddr == addr {
   326  		return
   327  	}
   328  
   329  	c.mu.Lock()
   330  	defer c.mu.Unlock()
   331  
   332  	internal.Logf("sentinel: new master=%q addr=%q",
   333  		c.masterName, addr)
   334  	_ = c.Pool().Filter(func(cn *pool.Conn) bool {
   335  		return cn.RemoteAddr().String() != addr
   336  	})
   337  	c._masterAddr = addr
   338  }
   339  
   340  func (c *sentinelFailover) setSentinel(sentinel *SentinelClient) {
   341  	c.discoverSentinels(sentinel)
   342  	c.sentinel = sentinel
   343  
   344  	c.pubsub = sentinel.Subscribe("+switch-master")
   345  	go c.listen(c.pubsub)
   346  }
   347  
   348  func (c *sentinelFailover) closeSentinel() error {
   349  	var firstErr error
   350  
   351  	err := c.pubsub.Close()
   352  	if err != nil && firstErr == err {
   353  		firstErr = err
   354  	}
   355  	c.pubsub = nil
   356  
   357  	err = c.sentinel.Close()
   358  	if err != nil && firstErr == err {
   359  		firstErr = err
   360  	}
   361  	c.sentinel = nil
   362  
   363  	return firstErr
   364  }
   365  
   366  func (c *sentinelFailover) discoverSentinels(sentinel *SentinelClient) {
   367  	sentinels, err := sentinel.Sentinels(c.masterName).Result()
   368  	if err != nil {
   369  		internal.Logf("sentinel: Sentinels master=%q failed: %s", c.masterName, err)
   370  		return
   371  	}
   372  	for _, sentinel := range sentinels {
   373  		vals := sentinel.([]interface{})
   374  		for i := 0; i < len(vals); i += 2 {
   375  			key := vals[i].(string)
   376  			if key == "name" {
   377  				sentinelAddr := vals[i+1].(string)
   378  				if !contains(c.sentinelAddrs, sentinelAddr) {
   379  					internal.Logf("sentinel: discovered new sentinel=%q for master=%q",
   380  						sentinelAddr, c.masterName)
   381  					c.sentinelAddrs = append(c.sentinelAddrs, sentinelAddr)
   382  				}
   383  			}
   384  		}
   385  	}
   386  }
   387  
   388  func (c *sentinelFailover) listen(pubsub *PubSub) {
   389  	ch := pubsub.Channel()
   390  	for {
   391  		msg, ok := <-ch
   392  		if !ok {
   393  			break
   394  		}
   395  
   396  		if msg.Channel == "+switch-master" {
   397  			parts := strings.Split(msg.Payload, " ")
   398  			if parts[0] != c.masterName {
   399  				internal.Logf("sentinel: ignore addr for master=%q", parts[0])
   400  				continue
   401  			}
   402  			addr := net.JoinHostPort(parts[3], parts[4])
   403  			c.switchMaster(addr)
   404  		}
   405  	}
   406  }
   407  
   408  func contains(slice []string, str string) bool {
   409  	for _, s := range slice {
   410  		if s == str {
   411  			return true
   412  		}
   413  	}
   414  	return false
   415  }
   416  

View as plain text