...

Source file src/github.com/go-redis/redis/internal/pool/pool_test.go

Documentation: github.com/go-redis/redis/internal/pool

     1  package pool_test
     2  
     3  import (
     4  	"sync"
     5  	"testing"
     6  	"time"
     7  
     8  	"github.com/go-redis/redis/internal/pool"
     9  
    10  	. "github.com/onsi/ginkgo"
    11  	. "github.com/onsi/gomega"
    12  )
    13  
    14  var _ = Describe("ConnPool", func() {
    15  	var connPool *pool.ConnPool
    16  
    17  	BeforeEach(func() {
    18  		connPool = pool.NewConnPool(&pool.Options{
    19  			Dialer:             dummyDialer,
    20  			PoolSize:           10,
    21  			PoolTimeout:        time.Hour,
    22  			IdleTimeout:        time.Millisecond,
    23  			IdleCheckFrequency: time.Millisecond,
    24  		})
    25  	})
    26  
    27  	AfterEach(func() {
    28  		connPool.Close()
    29  	})
    30  
    31  	It("should unblock client when conn is removed", func() {
    32  		// Reserve one connection.
    33  		cn, err := connPool.Get()
    34  		Expect(err).NotTo(HaveOccurred())
    35  
    36  		// Reserve all other connections.
    37  		var cns []*pool.Conn
    38  		for i := 0; i < 9; i++ {
    39  			cn, err := connPool.Get()
    40  			Expect(err).NotTo(HaveOccurred())
    41  			cns = append(cns, cn)
    42  		}
    43  
    44  		started := make(chan bool, 1)
    45  		done := make(chan bool, 1)
    46  		go func() {
    47  			defer GinkgoRecover()
    48  
    49  			started <- true
    50  			_, err := connPool.Get()
    51  			Expect(err).NotTo(HaveOccurred())
    52  			done <- true
    53  
    54  			connPool.Put(cn)
    55  		}()
    56  		<-started
    57  
    58  		// Check that Get is blocked.
    59  		select {
    60  		case <-done:
    61  			Fail("Get is not blocked")
    62  		case <-time.After(time.Millisecond):
    63  			// ok
    64  		}
    65  
    66  		connPool.Remove(cn, nil)
    67  
    68  		// Check that Get is unblocked.
    69  		select {
    70  		case <-done:
    71  			// ok
    72  		case <-time.After(time.Second):
    73  			Fail("Get is not unblocked")
    74  		}
    75  
    76  		for _, cn := range cns {
    77  			connPool.Put(cn)
    78  		}
    79  	})
    80  })
    81  
    82  var _ = Describe("MinIdleConns", func() {
    83  	const poolSize = 100
    84  	var minIdleConns int
    85  	var connPool *pool.ConnPool
    86  
    87  	newConnPool := func() *pool.ConnPool {
    88  		connPool := pool.NewConnPool(&pool.Options{
    89  			Dialer:             dummyDialer,
    90  			PoolSize:           poolSize,
    91  			MinIdleConns:       minIdleConns,
    92  			PoolTimeout:        100 * time.Millisecond,
    93  			IdleTimeout:        -1,
    94  			IdleCheckFrequency: -1,
    95  		})
    96  		Eventually(func() int {
    97  			return connPool.Len()
    98  		}).Should(Equal(minIdleConns))
    99  		return connPool
   100  	}
   101  
   102  	assert := func() {
   103  		It("has idle connections when created", func() {
   104  			Expect(connPool.Len()).To(Equal(minIdleConns))
   105  			Expect(connPool.IdleLen()).To(Equal(minIdleConns))
   106  		})
   107  
   108  		Context("after Get", func() {
   109  			var cn *pool.Conn
   110  
   111  			BeforeEach(func() {
   112  				var err error
   113  				cn, err = connPool.Get()
   114  				Expect(err).NotTo(HaveOccurred())
   115  
   116  				Eventually(func() int {
   117  					return connPool.Len()
   118  				}).Should(Equal(minIdleConns + 1))
   119  			})
   120  
   121  			It("has idle connections", func() {
   122  				Expect(connPool.Len()).To(Equal(minIdleConns + 1))
   123  				Expect(connPool.IdleLen()).To(Equal(minIdleConns))
   124  			})
   125  
   126  			Context("after Remove", func() {
   127  				BeforeEach(func() {
   128  					connPool.Remove(cn, nil)
   129  				})
   130  
   131  				It("has idle connections", func() {
   132  					Expect(connPool.Len()).To(Equal(minIdleConns))
   133  					Expect(connPool.IdleLen()).To(Equal(minIdleConns))
   134  				})
   135  			})
   136  		})
   137  
   138  		Describe("Get does not exceed pool size", func() {
   139  			var mu sync.RWMutex
   140  			var cns []*pool.Conn
   141  
   142  			BeforeEach(func() {
   143  				cns = make([]*pool.Conn, 0)
   144  
   145  				perform(poolSize, func(_ int) {
   146  					defer GinkgoRecover()
   147  
   148  					cn, err := connPool.Get()
   149  					Expect(err).NotTo(HaveOccurred())
   150  					mu.Lock()
   151  					cns = append(cns, cn)
   152  					mu.Unlock()
   153  				})
   154  
   155  				Eventually(func() int {
   156  					return connPool.Len()
   157  				}).Should(BeNumerically(">=", poolSize))
   158  			})
   159  
   160  			It("Get is blocked", func() {
   161  				done := make(chan struct{})
   162  				go func() {
   163  					connPool.Get()
   164  					close(done)
   165  				}()
   166  
   167  				select {
   168  				case <-done:
   169  					Fail("Get is not blocked")
   170  				case <-time.After(time.Millisecond):
   171  					// ok
   172  				}
   173  
   174  				select {
   175  				case <-done:
   176  					// ok
   177  				case <-time.After(time.Second):
   178  					Fail("Get is not unblocked")
   179  				}
   180  			})
   181  
   182  			Context("after Put", func() {
   183  				BeforeEach(func() {
   184  					perform(len(cns), func(i int) {
   185  						mu.RLock()
   186  						connPool.Put(cns[i])
   187  						mu.RUnlock()
   188  					})
   189  
   190  					Eventually(func() int {
   191  						return connPool.Len()
   192  					}).Should(Equal(poolSize))
   193  				})
   194  
   195  				It("pool.Len is back to normal", func() {
   196  					Expect(connPool.Len()).To(Equal(poolSize))
   197  					Expect(connPool.IdleLen()).To(Equal(poolSize))
   198  				})
   199  			})
   200  
   201  			Context("after Remove", func() {
   202  				BeforeEach(func() {
   203  					perform(len(cns), func(i int) {
   204  						mu.RLock()
   205  						connPool.Remove(cns[i], nil)
   206  						mu.RUnlock()
   207  					})
   208  
   209  					Eventually(func() int {
   210  						return connPool.Len()
   211  					}).Should(Equal(minIdleConns))
   212  				})
   213  
   214  				It("has idle connections", func() {
   215  					Expect(connPool.Len()).To(Equal(minIdleConns))
   216  					Expect(connPool.IdleLen()).To(Equal(minIdleConns))
   217  				})
   218  			})
   219  		})
   220  	}
   221  
   222  	Context("minIdleConns = 1", func() {
   223  		BeforeEach(func() {
   224  			minIdleConns = 1
   225  			connPool = newConnPool()
   226  		})
   227  
   228  		AfterEach(func() {
   229  			connPool.Close()
   230  		})
   231  
   232  		assert()
   233  	})
   234  
   235  	Context("minIdleConns = 32", func() {
   236  		BeforeEach(func() {
   237  			minIdleConns = 32
   238  			connPool = newConnPool()
   239  		})
   240  
   241  		AfterEach(func() {
   242  			connPool.Close()
   243  		})
   244  
   245  		assert()
   246  	})
   247  })
   248  
   249  var _ = Describe("conns reaper", func() {
   250  	const idleTimeout = time.Minute
   251  	const maxAge = time.Hour
   252  
   253  	var connPool *pool.ConnPool
   254  	var conns, staleConns, closedConns []*pool.Conn
   255  
   256  	assert := func(typ string) {
   257  		BeforeEach(func() {
   258  			closedConns = nil
   259  			connPool = pool.NewConnPool(&pool.Options{
   260  				Dialer:             dummyDialer,
   261  				PoolSize:           10,
   262  				IdleTimeout:        idleTimeout,
   263  				MaxConnAge:         maxAge,
   264  				PoolTimeout:        time.Second,
   265  				IdleCheckFrequency: time.Hour,
   266  				OnClose: func(cn *pool.Conn) error {
   267  					closedConns = append(closedConns, cn)
   268  					return nil
   269  				},
   270  			})
   271  
   272  			conns = nil
   273  
   274  			// add stale connections
   275  			staleConns = nil
   276  			for i := 0; i < 3; i++ {
   277  				cn, err := connPool.Get()
   278  				Expect(err).NotTo(HaveOccurred())
   279  				switch typ {
   280  				case "idle":
   281  					cn.SetUsedAt(time.Now().Add(-2 * idleTimeout))
   282  				case "aged":
   283  					cn.SetCreatedAt(time.Now().Add(-2 * maxAge))
   284  				}
   285  				conns = append(conns, cn)
   286  				staleConns = append(staleConns, cn)
   287  			}
   288  
   289  			// add fresh connections
   290  			for i := 0; i < 3; i++ {
   291  				cn, err := connPool.Get()
   292  				Expect(err).NotTo(HaveOccurred())
   293  				conns = append(conns, cn)
   294  			}
   295  
   296  			for _, cn := range conns {
   297  				connPool.Put(cn)
   298  			}
   299  
   300  			Expect(connPool.Len()).To(Equal(6))
   301  			Expect(connPool.IdleLen()).To(Equal(6))
   302  
   303  			n, err := connPool.ReapStaleConns()
   304  			Expect(err).NotTo(HaveOccurred())
   305  			Expect(n).To(Equal(3))
   306  		})
   307  
   308  		AfterEach(func() {
   309  			_ = connPool.Close()
   310  			Expect(connPool.Len()).To(Equal(0))
   311  			Expect(connPool.IdleLen()).To(Equal(0))
   312  			Expect(len(closedConns)).To(Equal(len(conns)))
   313  			Expect(closedConns).To(ConsistOf(conns))
   314  		})
   315  
   316  		It("reaps stale connections", func() {
   317  			Expect(connPool.Len()).To(Equal(3))
   318  			Expect(connPool.IdleLen()).To(Equal(3))
   319  		})
   320  
   321  		It("does not reap fresh connections", func() {
   322  			n, err := connPool.ReapStaleConns()
   323  			Expect(err).NotTo(HaveOccurred())
   324  			Expect(n).To(Equal(0))
   325  		})
   326  
   327  		It("stale connections are closed", func() {
   328  			Expect(len(closedConns)).To(Equal(len(staleConns)))
   329  			Expect(closedConns).To(ConsistOf(staleConns))
   330  		})
   331  
   332  		It("pool is functional", func() {
   333  			for j := 0; j < 3; j++ {
   334  				var freeCns []*pool.Conn
   335  				for i := 0; i < 3; i++ {
   336  					cn, err := connPool.Get()
   337  					Expect(err).NotTo(HaveOccurred())
   338  					Expect(cn).NotTo(BeNil())
   339  					freeCns = append(freeCns, cn)
   340  				}
   341  
   342  				Expect(connPool.Len()).To(Equal(3))
   343  				Expect(connPool.IdleLen()).To(Equal(0))
   344  
   345  				cn, err := connPool.Get()
   346  				Expect(err).NotTo(HaveOccurred())
   347  				Expect(cn).NotTo(BeNil())
   348  				conns = append(conns, cn)
   349  
   350  				Expect(connPool.Len()).To(Equal(4))
   351  				Expect(connPool.IdleLen()).To(Equal(0))
   352  
   353  				connPool.Remove(cn, nil)
   354  
   355  				Expect(connPool.Len()).To(Equal(3))
   356  				Expect(connPool.IdleLen()).To(Equal(0))
   357  
   358  				for _, cn := range freeCns {
   359  					connPool.Put(cn)
   360  				}
   361  
   362  				Expect(connPool.Len()).To(Equal(3))
   363  				Expect(connPool.IdleLen()).To(Equal(3))
   364  			}
   365  		})
   366  	}
   367  
   368  	assert("idle")
   369  	assert("aged")
   370  })
   371  
   372  var _ = Describe("race", func() {
   373  	var connPool *pool.ConnPool
   374  	var C, N int
   375  
   376  	BeforeEach(func() {
   377  		C, N = 10, 1000
   378  		if testing.Short() {
   379  			C = 4
   380  			N = 100
   381  		}
   382  	})
   383  
   384  	AfterEach(func() {
   385  		connPool.Close()
   386  	})
   387  
   388  	It("does not happen on Get, Put, and Remove", func() {
   389  		connPool = pool.NewConnPool(&pool.Options{
   390  			Dialer:             dummyDialer,
   391  			PoolSize:           10,
   392  			PoolTimeout:        time.Minute,
   393  			IdleTimeout:        time.Millisecond,
   394  			IdleCheckFrequency: time.Millisecond,
   395  		})
   396  
   397  		perform(C, func(id int) {
   398  			for i := 0; i < N; i++ {
   399  				cn, err := connPool.Get()
   400  				Expect(err).NotTo(HaveOccurred())
   401  				if err == nil {
   402  					connPool.Put(cn)
   403  				}
   404  			}
   405  		}, func(id int) {
   406  			for i := 0; i < N; i++ {
   407  				cn, err := connPool.Get()
   408  				Expect(err).NotTo(HaveOccurred())
   409  				if err == nil {
   410  					connPool.Remove(cn, nil)
   411  				}
   412  			}
   413  		})
   414  	})
   415  })
   416  

View as plain text