...

Source file src/github.com/go-redis/redis/main_test.go

Documentation: github.com/go-redis/redis

     1  package redis_test
     2  
     3  import (
     4  	"errors"
     5  	"fmt"
     6  	"net"
     7  	"os"
     8  	"os/exec"
     9  	"path/filepath"
    10  	"sync"
    11  	"testing"
    12  	"time"
    13  
    14  	"github.com/go-redis/redis"
    15  
    16  	. "github.com/onsi/ginkgo"
    17  	. "github.com/onsi/gomega"
    18  )
    19  
    20  const (
    21  	redisPort          = "6380"
    22  	redisAddr          = ":" + redisPort
    23  	redisSecondaryPort = "6381"
    24  )
    25  
    26  const (
    27  	ringShard1Port = "6390"
    28  	ringShard2Port = "6391"
    29  	ringShard3Port = "6392"
    30  )
    31  
    32  const (
    33  	sentinelName       = "mymaster"
    34  	sentinelMasterPort = "8123"
    35  	sentinelSlave1Port = "8124"
    36  	sentinelSlave2Port = "8125"
    37  	sentinelPort       = "8126"
    38  )
    39  
    40  var (
    41  	redisMain                                                *redisProcess
    42  	ringShard1, ringShard2, ringShard3                       *redisProcess
    43  	sentinelMaster, sentinelSlave1, sentinelSlave2, sentinel *redisProcess
    44  )
    45  
    46  var cluster = &clusterScenario{
    47  	ports:     []string{"8220", "8221", "8222", "8223", "8224", "8225"},
    48  	nodeIds:   make([]string, 6),
    49  	processes: make(map[string]*redisProcess, 6),
    50  	clients:   make(map[string]*redis.Client, 6),
    51  }
    52  
    53  var _ = BeforeSuite(func() {
    54  	var err error
    55  
    56  	redisMain, err = startRedis(redisPort)
    57  	Expect(err).NotTo(HaveOccurred())
    58  
    59  	ringShard1, err = startRedis(ringShard1Port)
    60  	Expect(err).NotTo(HaveOccurred())
    61  
    62  	ringShard2, err = startRedis(ringShard2Port)
    63  	Expect(err).NotTo(HaveOccurred())
    64  
    65  	ringShard3, err = startRedis(ringShard3Port)
    66  	Expect(err).NotTo(HaveOccurred())
    67  
    68  	sentinelMaster, err = startRedis(sentinelMasterPort)
    69  	Expect(err).NotTo(HaveOccurred())
    70  
    71  	sentinel, err = startSentinel(sentinelPort, sentinelName, sentinelMasterPort)
    72  	Expect(err).NotTo(HaveOccurred())
    73  
    74  	sentinelSlave1, err = startRedis(
    75  		sentinelSlave1Port, "--slaveof", "127.0.0.1", sentinelMasterPort)
    76  	Expect(err).NotTo(HaveOccurred())
    77  
    78  	sentinelSlave2, err = startRedis(
    79  		sentinelSlave2Port, "--slaveof", "127.0.0.1", sentinelMasterPort)
    80  	Expect(err).NotTo(HaveOccurred())
    81  
    82  	Expect(startCluster(cluster)).NotTo(HaveOccurred())
    83  })
    84  
    85  var _ = AfterSuite(func() {
    86  	Expect(redisMain.Close()).NotTo(HaveOccurred())
    87  
    88  	Expect(ringShard1.Close()).NotTo(HaveOccurred())
    89  	Expect(ringShard2.Close()).NotTo(HaveOccurred())
    90  	Expect(ringShard3.Close()).NotTo(HaveOccurred())
    91  
    92  	Expect(sentinel.Close()).NotTo(HaveOccurred())
    93  	Expect(sentinelSlave1.Close()).NotTo(HaveOccurred())
    94  	Expect(sentinelSlave2.Close()).NotTo(HaveOccurred())
    95  	Expect(sentinelMaster.Close()).NotTo(HaveOccurred())
    96  
    97  	Expect(stopCluster(cluster)).NotTo(HaveOccurred())
    98  })
    99  
   100  func TestGinkgoSuite(t *testing.T) {
   101  	RegisterFailHandler(Fail)
   102  	RunSpecs(t, "go-redis")
   103  }
   104  
   105  //------------------------------------------------------------------------------
   106  
   107  func redisOptions() *redis.Options {
   108  	return &redis.Options{
   109  		Addr:               redisAddr,
   110  		DB:                 15,
   111  		DialTimeout:        10 * time.Second,
   112  		ReadTimeout:        30 * time.Second,
   113  		WriteTimeout:       30 * time.Second,
   114  		PoolSize:           10,
   115  		PoolTimeout:        30 * time.Second,
   116  		IdleTimeout:        500 * time.Millisecond,
   117  		IdleCheckFrequency: 500 * time.Millisecond,
   118  	}
   119  }
   120  
   121  func redisClusterOptions() *redis.ClusterOptions {
   122  	return &redis.ClusterOptions{
   123  		DialTimeout:        10 * time.Second,
   124  		ReadTimeout:        30 * time.Second,
   125  		WriteTimeout:       30 * time.Second,
   126  		PoolSize:           10,
   127  		PoolTimeout:        30 * time.Second,
   128  		IdleTimeout:        500 * time.Millisecond,
   129  		IdleCheckFrequency: 500 * time.Millisecond,
   130  	}
   131  }
   132  
   133  func redisRingOptions() *redis.RingOptions {
   134  	return &redis.RingOptions{
   135  		Addrs: map[string]string{
   136  			"ringShardOne": ":" + ringShard1Port,
   137  			"ringShardTwo": ":" + ringShard2Port,
   138  		},
   139  		DialTimeout:        10 * time.Second,
   140  		ReadTimeout:        30 * time.Second,
   141  		WriteTimeout:       30 * time.Second,
   142  		PoolSize:           10,
   143  		PoolTimeout:        30 * time.Second,
   144  		IdleTimeout:        500 * time.Millisecond,
   145  		IdleCheckFrequency: 500 * time.Millisecond,
   146  	}
   147  }
   148  
   149  func performAsync(n int, cbs ...func(int)) *sync.WaitGroup {
   150  	var wg sync.WaitGroup
   151  	for _, cb := range cbs {
   152  		for i := 0; i < n; i++ {
   153  			wg.Add(1)
   154  			go func(cb func(int), i int) {
   155  				defer GinkgoRecover()
   156  				defer wg.Done()
   157  
   158  				cb(i)
   159  			}(cb, i)
   160  		}
   161  	}
   162  	return &wg
   163  }
   164  
   165  func perform(n int, cbs ...func(int)) {
   166  	wg := performAsync(n, cbs...)
   167  	wg.Wait()
   168  }
   169  
   170  func eventually(fn func() error, timeout time.Duration) error {
   171  	errCh := make(chan error, 1)
   172  	done := make(chan struct{})
   173  	exit := make(chan struct{})
   174  
   175  	go func() {
   176  		for {
   177  			err := fn()
   178  			if err == nil {
   179  				close(done)
   180  				return
   181  			}
   182  
   183  			select {
   184  			case errCh <- err:
   185  			default:
   186  			}
   187  
   188  			select {
   189  			case <-exit:
   190  				return
   191  			case <-time.After(timeout / 100):
   192  			}
   193  		}
   194  	}()
   195  
   196  	select {
   197  	case <-done:
   198  		return nil
   199  	case <-time.After(timeout):
   200  		close(exit)
   201  		select {
   202  		case err := <-errCh:
   203  			return err
   204  		default:
   205  			return fmt.Errorf("timeout after %s without an error", timeout)
   206  		}
   207  	}
   208  }
   209  
   210  func execCmd(name string, args ...string) (*os.Process, error) {
   211  	cmd := exec.Command(name, args...)
   212  	if testing.Verbose() {
   213  		cmd.Stdout = os.Stdout
   214  		cmd.Stderr = os.Stderr
   215  	}
   216  	return cmd.Process, cmd.Start()
   217  }
   218  
   219  func connectTo(port string) (*redis.Client, error) {
   220  	client := redis.NewClient(&redis.Options{
   221  		Addr: ":" + port,
   222  	})
   223  
   224  	err := eventually(func() error {
   225  		return client.Ping().Err()
   226  	}, 30*time.Second)
   227  	if err != nil {
   228  		return nil, err
   229  	}
   230  
   231  	return client, nil
   232  }
   233  
   234  type redisProcess struct {
   235  	*os.Process
   236  	*redis.Client
   237  }
   238  
   239  func (p *redisProcess) Close() error {
   240  	if err := p.Kill(); err != nil {
   241  		return err
   242  	}
   243  
   244  	err := eventually(func() error {
   245  		if err := p.Client.Ping().Err(); err != nil {
   246  			return nil
   247  		}
   248  		return errors.New("client is not shutdown")
   249  	}, 10*time.Second)
   250  	if err != nil {
   251  		return err
   252  	}
   253  
   254  	p.Client.Close()
   255  	return nil
   256  }
   257  
   258  var (
   259  	redisServerBin, _  = filepath.Abs(filepath.Join("testdata", "redis", "src", "redis-server"))
   260  	redisServerConf, _ = filepath.Abs(filepath.Join("testdata", "redis.conf"))
   261  )
   262  
   263  func redisDir(port string) (string, error) {
   264  	dir, err := filepath.Abs(filepath.Join("testdata", "instances", port))
   265  	if err != nil {
   266  		return "", err
   267  	}
   268  	if err := os.RemoveAll(dir); err != nil {
   269  		return "", err
   270  	}
   271  	if err := os.MkdirAll(dir, 0775); err != nil {
   272  		return "", err
   273  	}
   274  	return dir, nil
   275  }
   276  
   277  func startRedis(port string, args ...string) (*redisProcess, error) {
   278  	dir, err := redisDir(port)
   279  	if err != nil {
   280  		return nil, err
   281  	}
   282  	if err = exec.Command("cp", "-f", redisServerConf, dir).Run(); err != nil {
   283  		return nil, err
   284  	}
   285  
   286  	baseArgs := []string{filepath.Join(dir, "redis.conf"), "--port", port, "--dir", dir}
   287  	process, err := execCmd(redisServerBin, append(baseArgs, args...)...)
   288  	if err != nil {
   289  		return nil, err
   290  	}
   291  
   292  	client, err := connectTo(port)
   293  	if err != nil {
   294  		process.Kill()
   295  		return nil, err
   296  	}
   297  	return &redisProcess{process, client}, err
   298  }
   299  
   300  func startSentinel(port, masterName, masterPort string) (*redisProcess, error) {
   301  	dir, err := redisDir(port)
   302  	if err != nil {
   303  		return nil, err
   304  	}
   305  	process, err := execCmd(redisServerBin, os.DevNull, "--sentinel", "--port", port, "--dir", dir)
   306  	if err != nil {
   307  		return nil, err
   308  	}
   309  	client, err := connectTo(port)
   310  	if err != nil {
   311  		process.Kill()
   312  		return nil, err
   313  	}
   314  	for _, cmd := range []*redis.StatusCmd{
   315  		redis.NewStatusCmd("SENTINEL", "MONITOR", masterName, "127.0.0.1", masterPort, "1"),
   316  		redis.NewStatusCmd("SENTINEL", "SET", masterName, "down-after-milliseconds", "500"),
   317  		redis.NewStatusCmd("SENTINEL", "SET", masterName, "failover-timeout", "1000"),
   318  		redis.NewStatusCmd("SENTINEL", "SET", masterName, "parallel-syncs", "1"),
   319  	} {
   320  		client.Process(cmd)
   321  		if err := cmd.Err(); err != nil {
   322  			process.Kill()
   323  			return nil, err
   324  		}
   325  	}
   326  	return &redisProcess{process, client}, nil
   327  }
   328  
   329  //------------------------------------------------------------------------------
   330  
   331  type badConnError string
   332  
   333  func (e badConnError) Error() string   { return string(e) }
   334  func (e badConnError) Timeout() bool   { return false }
   335  func (e badConnError) Temporary() bool { return false }
   336  
   337  type badConn struct {
   338  	net.TCPConn
   339  
   340  	readDelay, writeDelay time.Duration
   341  	readErr, writeErr     error
   342  }
   343  
   344  var _ net.Conn = &badConn{}
   345  
   346  func (cn *badConn) Read([]byte) (int, error) {
   347  	if cn.readDelay != 0 {
   348  		time.Sleep(cn.readDelay)
   349  	}
   350  	if cn.readErr != nil {
   351  		return 0, cn.readErr
   352  	}
   353  	return 0, badConnError("bad connection")
   354  }
   355  
   356  func (cn *badConn) Write([]byte) (int, error) {
   357  	if cn.writeDelay != 0 {
   358  		time.Sleep(cn.writeDelay)
   359  	}
   360  	if cn.writeErr != nil {
   361  		return 0, cn.writeErr
   362  	}
   363  	return 0, badConnError("bad connection")
   364  }
   365  

View as plain text