1 package pool_test
2
3 import (
4 "sync"
5 "testing"
6 "time"
7
8 "github.com/go-redis/redis/internal/pool"
9
10 . "github.com/onsi/ginkgo"
11 . "github.com/onsi/gomega"
12 )
13
14 var _ = Describe("ConnPool", func() {
15 var connPool *pool.ConnPool
16
17 BeforeEach(func() {
18 connPool = pool.NewConnPool(&pool.Options{
19 Dialer: dummyDialer,
20 PoolSize: 10,
21 PoolTimeout: time.Hour,
22 IdleTimeout: time.Millisecond,
23 IdleCheckFrequency: time.Millisecond,
24 })
25 })
26
27 AfterEach(func() {
28 connPool.Close()
29 })
30
31 It("should unblock client when conn is removed", func() {
32
33 cn, err := connPool.Get()
34 Expect(err).NotTo(HaveOccurred())
35
36
37 var cns []*pool.Conn
38 for i := 0; i < 9; i++ {
39 cn, err := connPool.Get()
40 Expect(err).NotTo(HaveOccurred())
41 cns = append(cns, cn)
42 }
43
44 started := make(chan bool, 1)
45 done := make(chan bool, 1)
46 go func() {
47 defer GinkgoRecover()
48
49 started <- true
50 _, err := connPool.Get()
51 Expect(err).NotTo(HaveOccurred())
52 done <- true
53
54 connPool.Put(cn)
55 }()
56 <-started
57
58
59 select {
60 case <-done:
61 Fail("Get is not blocked")
62 case <-time.After(time.Millisecond):
63
64 }
65
66 connPool.Remove(cn, nil)
67
68
69 select {
70 case <-done:
71
72 case <-time.After(time.Second):
73 Fail("Get is not unblocked")
74 }
75
76 for _, cn := range cns {
77 connPool.Put(cn)
78 }
79 })
80 })
81
82 var _ = Describe("MinIdleConns", func() {
83 const poolSize = 100
84 var minIdleConns int
85 var connPool *pool.ConnPool
86
87 newConnPool := func() *pool.ConnPool {
88 connPool := pool.NewConnPool(&pool.Options{
89 Dialer: dummyDialer,
90 PoolSize: poolSize,
91 MinIdleConns: minIdleConns,
92 PoolTimeout: 100 * time.Millisecond,
93 IdleTimeout: -1,
94 IdleCheckFrequency: -1,
95 })
96 Eventually(func() int {
97 return connPool.Len()
98 }).Should(Equal(minIdleConns))
99 return connPool
100 }
101
102 assert := func() {
103 It("has idle connections when created", func() {
104 Expect(connPool.Len()).To(Equal(minIdleConns))
105 Expect(connPool.IdleLen()).To(Equal(minIdleConns))
106 })
107
108 Context("after Get", func() {
109 var cn *pool.Conn
110
111 BeforeEach(func() {
112 var err error
113 cn, err = connPool.Get()
114 Expect(err).NotTo(HaveOccurred())
115
116 Eventually(func() int {
117 return connPool.Len()
118 }).Should(Equal(minIdleConns + 1))
119 })
120
121 It("has idle connections", func() {
122 Expect(connPool.Len()).To(Equal(minIdleConns + 1))
123 Expect(connPool.IdleLen()).To(Equal(minIdleConns))
124 })
125
126 Context("after Remove", func() {
127 BeforeEach(func() {
128 connPool.Remove(cn, nil)
129 })
130
131 It("has idle connections", func() {
132 Expect(connPool.Len()).To(Equal(minIdleConns))
133 Expect(connPool.IdleLen()).To(Equal(minIdleConns))
134 })
135 })
136 })
137
138 Describe("Get does not exceed pool size", func() {
139 var mu sync.RWMutex
140 var cns []*pool.Conn
141
142 BeforeEach(func() {
143 cns = make([]*pool.Conn, 0)
144
145 perform(poolSize, func(_ int) {
146 defer GinkgoRecover()
147
148 cn, err := connPool.Get()
149 Expect(err).NotTo(HaveOccurred())
150 mu.Lock()
151 cns = append(cns, cn)
152 mu.Unlock()
153 })
154
155 Eventually(func() int {
156 return connPool.Len()
157 }).Should(BeNumerically(">=", poolSize))
158 })
159
160 It("Get is blocked", func() {
161 done := make(chan struct{})
162 go func() {
163 connPool.Get()
164 close(done)
165 }()
166
167 select {
168 case <-done:
169 Fail("Get is not blocked")
170 case <-time.After(time.Millisecond):
171
172 }
173
174 select {
175 case <-done:
176
177 case <-time.After(time.Second):
178 Fail("Get is not unblocked")
179 }
180 })
181
182 Context("after Put", func() {
183 BeforeEach(func() {
184 perform(len(cns), func(i int) {
185 mu.RLock()
186 connPool.Put(cns[i])
187 mu.RUnlock()
188 })
189
190 Eventually(func() int {
191 return connPool.Len()
192 }).Should(Equal(poolSize))
193 })
194
195 It("pool.Len is back to normal", func() {
196 Expect(connPool.Len()).To(Equal(poolSize))
197 Expect(connPool.IdleLen()).To(Equal(poolSize))
198 })
199 })
200
201 Context("after Remove", func() {
202 BeforeEach(func() {
203 perform(len(cns), func(i int) {
204 mu.RLock()
205 connPool.Remove(cns[i], nil)
206 mu.RUnlock()
207 })
208
209 Eventually(func() int {
210 return connPool.Len()
211 }).Should(Equal(minIdleConns))
212 })
213
214 It("has idle connections", func() {
215 Expect(connPool.Len()).To(Equal(minIdleConns))
216 Expect(connPool.IdleLen()).To(Equal(minIdleConns))
217 })
218 })
219 })
220 }
221
222 Context("minIdleConns = 1", func() {
223 BeforeEach(func() {
224 minIdleConns = 1
225 connPool = newConnPool()
226 })
227
228 AfterEach(func() {
229 connPool.Close()
230 })
231
232 assert()
233 })
234
235 Context("minIdleConns = 32", func() {
236 BeforeEach(func() {
237 minIdleConns = 32
238 connPool = newConnPool()
239 })
240
241 AfterEach(func() {
242 connPool.Close()
243 })
244
245 assert()
246 })
247 })
248
249 var _ = Describe("conns reaper", func() {
250 const idleTimeout = time.Minute
251 const maxAge = time.Hour
252
253 var connPool *pool.ConnPool
254 var conns, staleConns, closedConns []*pool.Conn
255
256 assert := func(typ string) {
257 BeforeEach(func() {
258 closedConns = nil
259 connPool = pool.NewConnPool(&pool.Options{
260 Dialer: dummyDialer,
261 PoolSize: 10,
262 IdleTimeout: idleTimeout,
263 MaxConnAge: maxAge,
264 PoolTimeout: time.Second,
265 IdleCheckFrequency: time.Hour,
266 OnClose: func(cn *pool.Conn) error {
267 closedConns = append(closedConns, cn)
268 return nil
269 },
270 })
271
272 conns = nil
273
274
275 staleConns = nil
276 for i := 0; i < 3; i++ {
277 cn, err := connPool.Get()
278 Expect(err).NotTo(HaveOccurred())
279 switch typ {
280 case "idle":
281 cn.SetUsedAt(time.Now().Add(-2 * idleTimeout))
282 case "aged":
283 cn.SetCreatedAt(time.Now().Add(-2 * maxAge))
284 }
285 conns = append(conns, cn)
286 staleConns = append(staleConns, cn)
287 }
288
289
290 for i := 0; i < 3; i++ {
291 cn, err := connPool.Get()
292 Expect(err).NotTo(HaveOccurred())
293 conns = append(conns, cn)
294 }
295
296 for _, cn := range conns {
297 connPool.Put(cn)
298 }
299
300 Expect(connPool.Len()).To(Equal(6))
301 Expect(connPool.IdleLen()).To(Equal(6))
302
303 n, err := connPool.ReapStaleConns()
304 Expect(err).NotTo(HaveOccurred())
305 Expect(n).To(Equal(3))
306 })
307
308 AfterEach(func() {
309 _ = connPool.Close()
310 Expect(connPool.Len()).To(Equal(0))
311 Expect(connPool.IdleLen()).To(Equal(0))
312 Expect(len(closedConns)).To(Equal(len(conns)))
313 Expect(closedConns).To(ConsistOf(conns))
314 })
315
316 It("reaps stale connections", func() {
317 Expect(connPool.Len()).To(Equal(3))
318 Expect(connPool.IdleLen()).To(Equal(3))
319 })
320
321 It("does not reap fresh connections", func() {
322 n, err := connPool.ReapStaleConns()
323 Expect(err).NotTo(HaveOccurred())
324 Expect(n).To(Equal(0))
325 })
326
327 It("stale connections are closed", func() {
328 Expect(len(closedConns)).To(Equal(len(staleConns)))
329 Expect(closedConns).To(ConsistOf(staleConns))
330 })
331
332 It("pool is functional", func() {
333 for j := 0; j < 3; j++ {
334 var freeCns []*pool.Conn
335 for i := 0; i < 3; i++ {
336 cn, err := connPool.Get()
337 Expect(err).NotTo(HaveOccurred())
338 Expect(cn).NotTo(BeNil())
339 freeCns = append(freeCns, cn)
340 }
341
342 Expect(connPool.Len()).To(Equal(3))
343 Expect(connPool.IdleLen()).To(Equal(0))
344
345 cn, err := connPool.Get()
346 Expect(err).NotTo(HaveOccurred())
347 Expect(cn).NotTo(BeNil())
348 conns = append(conns, cn)
349
350 Expect(connPool.Len()).To(Equal(4))
351 Expect(connPool.IdleLen()).To(Equal(0))
352
353 connPool.Remove(cn, nil)
354
355 Expect(connPool.Len()).To(Equal(3))
356 Expect(connPool.IdleLen()).To(Equal(0))
357
358 for _, cn := range freeCns {
359 connPool.Put(cn)
360 }
361
362 Expect(connPool.Len()).To(Equal(3))
363 Expect(connPool.IdleLen()).To(Equal(3))
364 }
365 })
366 }
367
368 assert("idle")
369 assert("aged")
370 })
371
372 var _ = Describe("race", func() {
373 var connPool *pool.ConnPool
374 var C, N int
375
376 BeforeEach(func() {
377 C, N = 10, 1000
378 if testing.Short() {
379 C = 4
380 N = 100
381 }
382 })
383
384 AfterEach(func() {
385 connPool.Close()
386 })
387
388 It("does not happen on Get, Put, and Remove", func() {
389 connPool = pool.NewConnPool(&pool.Options{
390 Dialer: dummyDialer,
391 PoolSize: 10,
392 PoolTimeout: time.Minute,
393 IdleTimeout: time.Millisecond,
394 IdleCheckFrequency: time.Millisecond,
395 })
396
397 perform(C, func(id int) {
398 for i := 0; i < N; i++ {
399 cn, err := connPool.Get()
400 Expect(err).NotTo(HaveOccurred())
401 if err == nil {
402 connPool.Put(cn)
403 }
404 }
405 }, func(id int) {
406 for i := 0; i < N; i++ {
407 cn, err := connPool.Get()
408 Expect(err).NotTo(HaveOccurred())
409 if err == nil {
410 connPool.Remove(cn, nil)
411 }
412 }
413 })
414 })
415 })
416
View as plain text