1 package puddle_test
2
3 import (
4 "context"
5 "errors"
6 "fmt"
7 "log"
8 "math/rand"
9 "net"
10 "os"
11 "runtime"
12 "strconv"
13 "sync"
14 "testing"
15 "time"
16
17 "github.com/jackc/puddle"
18 "github.com/stretchr/testify/assert"
19 "github.com/stretchr/testify/require"
20 )
21
// Counter is an integer counter guarded by a mutex so that it can be shared
// between goroutines.
type Counter struct {
	mutex sync.Mutex
	n     int
}

// Next increments the counter and returns the value after the increment.
func (c *Counter) Next() int {
	c.mutex.Lock()
	defer c.mutex.Unlock()

	c.n++
	return c.n
}

// Value returns the current count without changing it.
func (c *Counter) Value() int {
	c.mutex.Lock()
	defer c.mutex.Unlock()

	return c.n
}
43
44 func createConstructor() (puddle.Constructor, *Counter) {
45 var c Counter
46 f := func(ctx context.Context) (interface{}, error) {
47 return c.Next(), nil
48 }
49 return f, &c
50 }
51
52 func createConstructorWithNotifierChan() (puddle.Constructor, *Counter, chan int) {
53 ch := make(chan int)
54 var c Counter
55 f := func(ctx context.Context) (interface{}, error) {
56 n := c.Next()
57
58
59 go func() { ch <- n }()
60
61 return n, nil
62 }
63 return f, &c, ch
64 }
65
// stubDestructor is a destructor that ignores its argument and does nothing.
func stubDestructor(_ interface{}) {}
67
// waitForRead reports whether a receive from ch completes within one second.
// It returns true as soon as a value (or the close of ch) is received and
// false if the second elapses first.
func waitForRead(ch chan int) bool {
	timeout := time.NewTimer(time.Second)
	// Stop the timer on the early-return path so it does not linger until it
	// fires on its own.
	defer timeout.Stop()

	select {
	case <-ch:
		return true
	case <-timeout.C:
		return false
	}
}
76
77 func TestNewPoolRequiresMaxSizeGreaterThan0(t *testing.T) {
78 constructor, _ := createConstructor()
79 assert.Panics(t, func() { puddle.NewPool(constructor, stubDestructor, -1) })
80 assert.Panics(t, func() { puddle.NewPool(constructor, stubDestructor, 0) })
81 }
82
83 func TestPoolAcquireCreatesResourceWhenNoneIdle(t *testing.T) {
84 constructor, _ := createConstructor()
85 pool := puddle.NewPool(constructor, stubDestructor, 10)
86 defer pool.Close()
87
88 res, err := pool.Acquire(context.Background())
89 require.NoError(t, err)
90 assert.Equal(t, 1, res.Value())
91 assert.WithinDuration(t, time.Now(), res.CreationTime(), time.Second)
92 res.Release()
93 }
94
95 func TestPoolAcquireDoesNotCreatesResourceWhenItWouldExceedMaxSize(t *testing.T) {
96 constructor, createCounter := createConstructor()
97 pool := puddle.NewPool(constructor, stubDestructor, 1)
98
99 wg := &sync.WaitGroup{}
100
101 for i := 0; i < 100; i++ {
102 wg.Add(1)
103 go func() {
104 for j := 0; j < 100; j++ {
105 res, err := pool.Acquire(context.Background())
106 assert.NoError(t, err)
107 assert.Equal(t, 1, res.Value())
108 res.Release()
109 }
110 wg.Done()
111 }()
112 }
113
114 wg.Wait()
115
116 assert.EqualValues(t, 1, createCounter.Value())
117 assert.EqualValues(t, 1, pool.Stat().TotalResources())
118 }
119
120 func TestPoolAcquireWithCancellableContext(t *testing.T) {
121 constructor, createCounter := createConstructor()
122 pool := puddle.NewPool(constructor, stubDestructor, 1)
123
124 wg := &sync.WaitGroup{}
125
126 for i := 0; i < 100; i++ {
127 wg.Add(1)
128 go func() {
129 for j := 0; j < 100; j++ {
130 ctx, cancel := context.WithCancel(context.Background())
131 res, err := pool.Acquire(ctx)
132 assert.NoError(t, err)
133 assert.Equal(t, 1, res.Value())
134 res.Release()
135 cancel()
136 }
137 wg.Done()
138 }()
139 }
140
141 wg.Wait()
142
143 assert.EqualValues(t, 1, createCounter.Value())
144 assert.EqualValues(t, 1, pool.Stat().TotalResources())
145 }
146
147 func TestPoolAcquireReturnsErrorFromFailedResourceCreate(t *testing.T) {
148 errCreateFailed := errors.New("create failed")
149 constructor := func(ctx context.Context) (interface{}, error) {
150 return nil, errCreateFailed
151 }
152 pool := puddle.NewPool(constructor, stubDestructor, 10)
153
154 res, err := pool.Acquire(context.Background())
155 assert.Equal(t, errCreateFailed, err)
156 assert.Nil(t, res)
157 }
158
// TestPoolAcquireCreatesResourceRespectingContext cancels the Acquire context
// from inside the constructor and checks that Acquire reports
// context.Canceled while the resource the constructor eventually returns is
// still counted by the pool as an idle resource.
func TestPoolAcquireCreatesResourceRespectingContext(t *testing.T) {
	// cancel is assigned below, before Acquire triggers the constructor.
	var cancel func()
	constructor := func(ctx context.Context) (interface{}, error) {
		// Cancel the acquiring context as soon as construction begins.
		cancel()

		// Keep "constructing" for a moment after the cancellation before
		// handing back a value.
		time.Sleep(10 * time.Millisecond)
		return 1, nil
	}
	pool := puddle.NewPool(constructor, stubDestructor, 1)
	defer pool.Close()

	var ctx context.Context
	ctx, cancel = context.WithCancel(context.Background())
	defer cancel()
	_, err := pool.Acquire(ctx)
	assert.Equal(t, context.Canceled, err)

	// Leave time for the constructor to return; the stats below expect its
	// resource to show up as idle.
	time.Sleep(100 * time.Millisecond)

	stat := pool.Stat()
	assert.EqualValues(t, 1, stat.IdleResources())
	assert.EqualValues(t, 1, stat.TotalResources())
}
184
185 func TestPoolAcquireReusesResources(t *testing.T) {
186 constructor, createCounter := createConstructor()
187 pool := puddle.NewPool(constructor, stubDestructor, 10)
188
189 res, err := pool.Acquire(context.Background())
190 require.NoError(t, err)
191 assert.Equal(t, 1, res.Value())
192
193 res.Release()
194
195 res, err = pool.Acquire(context.Background())
196 require.NoError(t, err)
197 assert.Equal(t, 1, res.Value())
198
199 res.Release()
200
201 assert.Equal(t, 1, createCounter.Value())
202 }
203
204 func TestPoolTryAcquire(t *testing.T) {
205 constructor, createCounter := createConstructor()
206 pool := puddle.NewPool(constructor, stubDestructor, 1)
207
208
209 res, err := pool.TryAcquire(context.Background())
210 require.EqualError(t, err, puddle.ErrNotAvailable.Error())
211 assert.Nil(t, res)
212
213
214 time.Sleep(100 * time.Millisecond)
215
216 res, err = pool.TryAcquire(context.Background())
217 require.NoError(t, err)
218 assert.Equal(t, 1, res.Value())
219 defer res.Release()
220
221 res, err = pool.TryAcquire(context.Background())
222 require.EqualError(t, err, puddle.ErrNotAvailable.Error())
223 assert.Nil(t, res)
224
225 assert.Equal(t, 1, createCounter.Value())
226 }
227
228 func TestPoolTryAcquireReturnsErrorWhenPoolIsClosed(t *testing.T) {
229 constructor, _ := createConstructor()
230 pool := puddle.NewPool(constructor, stubDestructor, 10)
231 pool.Close()
232
233 res, err := pool.TryAcquire(context.Background())
234 assert.Equal(t, puddle.ErrClosedPool, err)
235 assert.Nil(t, res)
236 }
237
238 func TestPoolTryAcquireWithFailedResourceCreate(t *testing.T) {
239 errCreateFailed := errors.New("create failed")
240 constructor := func(ctx context.Context) (interface{}, error) {
241 return nil, errCreateFailed
242 }
243 pool := puddle.NewPool(constructor, stubDestructor, 10)
244
245 res, err := pool.TryAcquire(context.Background())
246 require.EqualError(t, err, puddle.ErrNotAvailable.Error())
247 assert.Nil(t, res)
248 }
249
250 func TestPoolAcquireNilContextDoesNotLeavePoolLocked(t *testing.T) {
251 constructor, createCounter := createConstructor()
252 pool := puddle.NewPool(constructor, stubDestructor, 10)
253
254 assert.Panics(t, func() { pool.Acquire(nil) })
255
256 res, err := pool.Acquire(context.Background())
257 require.NoError(t, err)
258 assert.Equal(t, 1, res.Value())
259 res.Release()
260
261 assert.Equal(t, 1, createCounter.Value())
262 }
263
264 func TestPoolAcquireContextAlreadyCanceled(t *testing.T) {
265 constructor := func(ctx context.Context) (interface{}, error) {
266 panic("should never be called")
267 }
268 pool := puddle.NewPool(constructor, stubDestructor, 10)
269
270 ctx, cancel := context.WithCancel(context.Background())
271 cancel()
272 res, err := pool.Acquire(ctx)
273 assert.Equal(t, context.Canceled, err)
274 assert.Nil(t, res)
275 }
276
277 func TestPoolAcquireContextCanceledDuringCreate(t *testing.T) {
278 ctx, cancel := context.WithCancel(context.Background())
279 time.AfterFunc(100*time.Millisecond, cancel)
280 timeoutChan := time.After(1 * time.Second)
281
282 var constructorCalls Counter
283 constructor := func(ctx context.Context) (interface{}, error) {
284 select {
285 case <-ctx.Done():
286 return nil, ctx.Err()
287 case <-timeoutChan:
288 }
289 return constructorCalls.Next(), nil
290 }
291 pool := puddle.NewPool(constructor, stubDestructor, 10)
292
293 res, err := pool.Acquire(ctx)
294 assert.Equal(t, context.Canceled, err)
295 assert.Nil(t, res)
296 }
297
298 func TestPoolAcquireAllIdle(t *testing.T) {
299 constructor, _ := createConstructor()
300 pool := puddle.NewPool(constructor, stubDestructor, 10)
301 defer pool.Close()
302
303 resources := make([]*puddle.Resource, 4)
304 var err error
305
306 resources[0], err = pool.Acquire(context.Background())
307 require.NoError(t, err)
308 resources[1], err = pool.Acquire(context.Background())
309 require.NoError(t, err)
310 resources[2], err = pool.Acquire(context.Background())
311 require.NoError(t, err)
312 resources[3], err = pool.Acquire(context.Background())
313 require.NoError(t, err)
314
315 assert.Len(t, pool.AcquireAllIdle(), 0)
316
317 resources[0].Release()
318 resources[3].Release()
319
320 assert.ElementsMatch(t, []*puddle.Resource{resources[0], resources[3]}, pool.AcquireAllIdle())
321
322 resources[0].Release()
323 resources[3].Release()
324 resources[1].Release()
325 resources[2].Release()
326
327 assert.ElementsMatch(t, resources, pool.AcquireAllIdle())
328
329 resources[0].Release()
330 resources[1].Release()
331 resources[2].Release()
332 resources[3].Release()
333 }
334
335 func TestPoolAcquireAllIdleWhenClosedIsNil(t *testing.T) {
336 constructor, _ := createConstructor()
337 pool := puddle.NewPool(constructor, stubDestructor, 10)
338 pool.Close()
339 assert.Nil(t, pool.AcquireAllIdle())
340 }
341
342 func TestPoolCreateResource(t *testing.T) {
343 constructor, counter := createConstructor()
344 pool := puddle.NewPool(constructor, stubDestructor, 10)
345 defer pool.Close()
346
347 var err error
348
349 err = pool.CreateResource(context.Background())
350 require.NoError(t, err)
351
352 stats := pool.Stat()
353 assert.EqualValues(t, 1, stats.IdleResources())
354
355 res, err := pool.Acquire(context.Background())
356 require.NoError(t, err)
357 assert.Equal(t, counter.Value(), res.Value())
358 assert.True(t, res.LastUsedNanotime() > 0, "should set LastUsedNanotime so that idle calculations can still work")
359 assert.Equal(t, 1, res.Value())
360 assert.WithinDuration(t, time.Now(), res.CreationTime(), time.Second)
361 res.Release()
362
363 assert.EqualValues(t, 0, pool.Stat().EmptyAcquireCount(), "should have been a warm resource")
364 }
365
366 func TestPoolCreateResourceReturnsErrorFromFailedResourceCreate(t *testing.T) {
367 errCreateFailed := errors.New("create failed")
368 constructor := func(ctx context.Context) (interface{}, error) {
369 return nil, errCreateFailed
370 }
371 pool := puddle.NewPool(constructor, stubDestructor, 10)
372
373 err := pool.CreateResource(context.Background())
374 assert.Equal(t, errCreateFailed, err)
375 }
376
377 func TestPoolCreateResourceReturnsErrorWhenAlreadyClosed(t *testing.T) {
378 constructor, _ := createConstructor()
379 pool := puddle.NewPool(constructor, stubDestructor, 10)
380 pool.Close()
381 err := pool.CreateResource(context.Background())
382 assert.Equal(t, puddle.ErrClosedPool, err)
383 }
384
385 func TestPoolCreateResourceReturnsErrorWhenClosedWhileCreatingResource(t *testing.T) {
386
387
388 constructor := func(ctx context.Context) (interface{}, error) {
389 time.Sleep(500 * time.Millisecond)
390 return "abc", nil
391 }
392 pool := puddle.NewPool(constructor, stubDestructor, 10)
393
394 acquireErrChan := make(chan error)
395 go func() {
396 err := pool.CreateResource(context.Background())
397 acquireErrChan <- err
398 }()
399
400 time.Sleep(250 * time.Millisecond)
401 pool.Close()
402
403 err := <-acquireErrChan
404 assert.Equal(t, puddle.ErrClosedPool, err)
405 }
406
407 func TestPoolCloseClosesAllIdleResources(t *testing.T) {
408 constructor, _ := createConstructor()
409
410 var destructorCalls Counter
411 destructor := func(interface{}) {
412 destructorCalls.Next()
413 }
414
415 p := puddle.NewPool(constructor, destructor, 10)
416
417 resources := make([]*puddle.Resource, 4)
418 for i := range resources {
419 var err error
420 resources[i], err = p.Acquire(context.Background())
421 require.Nil(t, err)
422 }
423
424 for _, res := range resources {
425 res.Release()
426 }
427
428 p.Close()
429
430 assert.Equal(t, len(resources), destructorCalls.Value())
431 }
432
433 func TestPoolCloseBlocksUntilAllResourcesReleasedAndClosed(t *testing.T) {
434 constructor, _ := createConstructor()
435 var destructorCalls Counter
436 destructor := func(interface{}) {
437 destructorCalls.Next()
438 }
439
440 p := puddle.NewPool(constructor, destructor, 10)
441
442 resources := make([]*puddle.Resource, 4)
443 for i := range resources {
444 var err error
445 resources[i], err = p.Acquire(context.Background())
446 require.Nil(t, err)
447 }
448
449 for _, res := range resources {
450 go func(res *puddle.Resource) {
451 time.Sleep(100 * time.Millisecond)
452 res.Release()
453 }(res)
454 }
455
456 p.Close()
457 assert.Equal(t, len(resources), destructorCalls.Value())
458 }
459
460 func TestPoolCloseIsSafeToCallMultipleTimes(t *testing.T) {
461 constructor, _ := createConstructor()
462
463 p := puddle.NewPool(constructor, stubDestructor, 10)
464
465 p.Close()
466 p.Close()
467 }
468
469 func TestPoolStatResources(t *testing.T) {
470 startWaitChan := make(chan struct{})
471 waitingChan := make(chan struct{})
472 endWaitChan := make(chan struct{})
473
474 var constructorCalls Counter
475 constructor := func(ctx context.Context) (interface{}, error) {
476 select {
477 case <-startWaitChan:
478 close(waitingChan)
479 <-endWaitChan
480 default:
481 }
482
483 return constructorCalls.Next(), nil
484 }
485 pool := puddle.NewPool(constructor, stubDestructor, 10)
486 defer pool.Close()
487
488 resAcquired, err := pool.Acquire(context.Background())
489 require.Nil(t, err)
490
491 close(startWaitChan)
492 go func() {
493 res, err := pool.Acquire(context.Background())
494 require.Nil(t, err)
495 res.Release()
496 }()
497 <-waitingChan
498 stat := pool.Stat()
499
500 assert.EqualValues(t, 2, stat.TotalResources())
501 assert.EqualValues(t, 1, stat.ConstructingResources())
502 assert.EqualValues(t, 1, stat.AcquiredResources())
503 assert.EqualValues(t, 0, stat.IdleResources())
504 assert.EqualValues(t, 10, stat.MaxResources())
505
506 resAcquired.Release()
507
508 stat = pool.Stat()
509 assert.EqualValues(t, 2, stat.TotalResources())
510 assert.EqualValues(t, 1, stat.ConstructingResources())
511 assert.EqualValues(t, 0, stat.AcquiredResources())
512 assert.EqualValues(t, 1, stat.IdleResources())
513 assert.EqualValues(t, 10, stat.MaxResources())
514
515 close(endWaitChan)
516 }
517
518 func TestPoolStatSuccessfulAcquireCounters(t *testing.T) {
519 constructor, _ := createConstructor()
520 sleepConstructor := func(ctx context.Context) (interface{}, error) {
521
522 time.Sleep(time.Nanosecond)
523 return constructor(ctx)
524 }
525 pool := puddle.NewPool(sleepConstructor, stubDestructor, 1)
526 defer pool.Close()
527
528 res, err := pool.Acquire(context.Background())
529 require.NoError(t, err)
530 res.Release()
531
532 stat := pool.Stat()
533 assert.Equal(t, int64(1), stat.AcquireCount())
534 assert.Equal(t, int64(1), stat.EmptyAcquireCount())
535 assert.True(t, stat.AcquireDuration() > 0, "expected stat.AcquireDuration() > 0 but %v", stat.AcquireDuration())
536 lastAcquireDuration := stat.AcquireDuration()
537
538 res, err = pool.Acquire(context.Background())
539 require.NoError(t, err)
540 res.Release()
541
542 stat = pool.Stat()
543 assert.Equal(t, int64(2), stat.AcquireCount())
544 assert.Equal(t, int64(1), stat.EmptyAcquireCount())
545 assert.True(t, stat.AcquireDuration() > lastAcquireDuration)
546 lastAcquireDuration = stat.AcquireDuration()
547
548 wg := &sync.WaitGroup{}
549 for i := 0; i < 2; i++ {
550 wg.Add(1)
551 go func() {
552 res, err = pool.Acquire(context.Background())
553 require.NoError(t, err)
554 time.Sleep(50 * time.Millisecond)
555 res.Release()
556 wg.Done()
557 }()
558 }
559
560 wg.Wait()
561
562 stat = pool.Stat()
563 assert.Equal(t, int64(4), stat.AcquireCount())
564 assert.Equal(t, int64(2), stat.EmptyAcquireCount())
565 assert.True(t, stat.AcquireDuration() > lastAcquireDuration)
566 lastAcquireDuration = stat.AcquireDuration()
567 }
568
569 func TestPoolStatCanceledAcquireBeforeStart(t *testing.T) {
570 constructor, _ := createConstructor()
571 pool := puddle.NewPool(constructor, stubDestructor, 1)
572 defer pool.Close()
573
574 ctx, cancel := context.WithCancel(context.Background())
575 cancel()
576 _, err := pool.Acquire(ctx)
577 require.Equal(t, context.Canceled, err)
578
579 stat := pool.Stat()
580 assert.Equal(t, int64(0), stat.AcquireCount())
581 assert.Equal(t, int64(1), stat.CanceledAcquireCount())
582 }
583
584 func TestPoolStatCanceledAcquireDuringCreate(t *testing.T) {
585 constructor := func(ctx context.Context) (interface{}, error) {
586 <-ctx.Done()
587 return nil, ctx.Err()
588 }
589
590 pool := puddle.NewPool(constructor, stubDestructor, 1)
591 defer pool.Close()
592
593 ctx, cancel := context.WithCancel(context.Background())
594 time.AfterFunc(50*time.Millisecond, cancel)
595 _, err := pool.Acquire(ctx)
596 require.Equal(t, context.Canceled, err)
597
598
599 time.Sleep(10 * time.Millisecond)
600
601 stat := pool.Stat()
602 assert.Equal(t, int64(0), stat.AcquireCount())
603 assert.Equal(t, int64(1), stat.CanceledAcquireCount())
604 }
605
606 func TestPoolStatCanceledAcquireDuringWait(t *testing.T) {
607 constructor, _ := createConstructor()
608 pool := puddle.NewPool(constructor, stubDestructor, 1)
609 defer pool.Close()
610
611 res, err := pool.Acquire(context.Background())
612 require.Nil(t, err)
613
614 ctx, cancel := context.WithCancel(context.Background())
615 time.AfterFunc(50*time.Millisecond, cancel)
616 _, err = pool.Acquire(ctx)
617 require.Equal(t, context.Canceled, err)
618
619 res.Release()
620
621 stat := pool.Stat()
622 assert.Equal(t, int64(1), stat.AcquireCount())
623 assert.Equal(t, int64(1), stat.CanceledAcquireCount())
624 }
625
626 func TestResourceHijackRemovesResourceFromPoolButDoesNotDestroy(t *testing.T) {
627 constructor, _ := createConstructor()
628 var destructorCalls Counter
629 destructor := func(interface{}) {
630 destructorCalls.Next()
631 }
632
633 pool := puddle.NewPool(constructor, destructor, 10)
634
635 res, err := pool.Acquire(context.Background())
636 require.NoError(t, err)
637 assert.Equal(t, 1, res.Value())
638
639 res.Hijack()
640
641 assert.EqualValues(t, 0, pool.Stat().TotalResources())
642 assert.EqualValues(t, 0, destructorCalls.Value())
643
644
645 res.Value()
646 res.CreationTime()
647 res.IdleDuration()
648 }
649
650 func TestResourceDestroyRemovesResourceFromPool(t *testing.T) {
651 constructor, _ := createConstructor()
652 pool := puddle.NewPool(constructor, stubDestructor, 10)
653
654 res, err := pool.Acquire(context.Background())
655 require.NoError(t, err)
656 assert.Equal(t, 1, res.Value())
657
658 assert.EqualValues(t, 1, pool.Stat().TotalResources())
659 res.Destroy()
660 for i := 0; i < 1000; i++ {
661 if pool.Stat().TotalResources() == 0 {
662 break
663 }
664 time.Sleep(time.Millisecond)
665 }
666
667 assert.EqualValues(t, 0, pool.Stat().TotalResources())
668 }
669
// TestResourceLastUsageTimeTracking checks how LastUsedNanotime and
// IdleDuration change across Release and ReleaseUnused on the single resource
// of a size-1 pool.
func TestResourceLastUsageTimeTracking(t *testing.T) {
	constructor, _ := createConstructor()
	pool := puddle.NewPool(constructor, stubDestructor, 1)

	// First use: record the initial last-used time.
	res, err := pool.Acquire(context.Background())
	require.NoError(t, err)
	t1 := res.LastUsedNanotime()
	res.Release()

	// After a Release the last-used time is expected to have advanced.
	res, err = pool.Acquire(context.Background())
	require.NoError(t, err)
	t2 := res.LastUsedNanotime()
	d2 := res.IdleDuration()
	assert.True(t, t2 > t1)
	res.ReleaseUnused()

	// After ReleaseUnused the last-used time is expected to be unchanged,
	// while the idle duration keeps growing.
	res, err = pool.Acquire(context.Background())
	require.NoError(t, err)
	t3 := res.LastUsedNanotime()
	d3 := res.IdleDuration()
	assert.EqualValues(t, t2, t3)
	assert.True(t, d3 > d2)
	res.Release()

	// A regular Release advances the last-used time again.
	res, err = pool.Acquire(context.Background())
	require.NoError(t, err)
	t4 := res.LastUsedNanotime()
	assert.True(t, t4 > t3)
	res.Release()
}
703
704 func TestResourcePanicsOnUsageWhenNotAcquired(t *testing.T) {
705 constructor, _ := createConstructor()
706 pool := puddle.NewPool(constructor, stubDestructor, 10)
707
708 res, err := pool.Acquire(context.Background())
709 require.NoError(t, err)
710 res.Release()
711
712 assert.PanicsWithValue(t, "tried to release resource that is not acquired", res.Release)
713 assert.PanicsWithValue(t, "tried to release resource that is not acquired", res.ReleaseUnused)
714 assert.PanicsWithValue(t, "tried to destroy resource that is not acquired", res.Destroy)
715 assert.PanicsWithValue(t, "tried to hijack resource that is not acquired", res.Hijack)
716 assert.PanicsWithValue(t, "tried to access resource that is not acquired or hijacked", func() { res.Value() })
717 assert.PanicsWithValue(t, "tried to access resource that is not acquired or hijacked", func() { res.CreationTime() })
718 assert.PanicsWithValue(t, "tried to access resource that is not acquired or hijacked", func() { res.LastUsedNanotime() })
719 assert.PanicsWithValue(t, "tried to access resource that is not acquired or hijacked", func() { res.IdleDuration() })
720 }
721
722 func TestPoolAcquireReturnsErrorWhenPoolIsClosed(t *testing.T) {
723 constructor, _ := createConstructor()
724 pool := puddle.NewPool(constructor, stubDestructor, 10)
725 pool.Close()
726
727 res, err := pool.Acquire(context.Background())
728 assert.Equal(t, puddle.ErrClosedPool, err)
729 assert.Nil(t, res)
730 }
731
// TestSignalIsSentWhenResourceFailedToCreate starts two Acquire calls against
// a size-1 pool whose only resource is held, destroys that resource, and
// makes the next construction fail. The test passes if both Acquire calls
// return (wg.Wait does not hang); their results are deliberately ignored.
func TestSignalIsSentWhenResourceFailedToCreate(t *testing.T) {
	var c Counter
	constructor := func(context.Context) (a interface{}, err error) {
		// Only the second construction attempt fails.
		if c.Next() == 2 {
			return nil, errors.New("outage")
		}
		return 1, nil
	}
	destructor := func(value interface{}) {}

	pool := puddle.NewPool(constructor, destructor, 1)

	res1, err := pool.Acquire(context.Background())
	require.NoError(t, err)

	var wg sync.WaitGroup
	for i := 0; i < 2; i++ {
		wg.Add(1)
		// name is not used inside the goroutine.
		go func(name string) {
			defer wg.Done()
			_, _ = pool.Acquire(context.Background())
		}(strconv.Itoa(i))
	}

	// Give both goroutines time to reach Acquire while res1 is still held,
	// then free the slot by destroying res1.
	time.Sleep(500 * time.Millisecond)
	res1.Destroy()
	wg.Wait()
}
761
762 func TestStress(t *testing.T) {
763 constructor, _ := createConstructor()
764 var destructorCalls Counter
765 destructor := func(interface{}) {
766 destructorCalls.Next()
767 }
768
769 poolSize := runtime.NumCPU()
770 if poolSize < 4 {
771 poolSize = 4
772 }
773
774 pool := puddle.NewPool(constructor, destructor, int32(poolSize))
775
776 finishChan := make(chan struct{})
777 wg := &sync.WaitGroup{}
778
779 releaseOrDestroyOrHijack := func(res *puddle.Resource) {
780 n := rand.Intn(100)
781 if n < 5 {
782 res.Hijack()
783 destructor(res)
784 } else if n < 10 {
785 res.Destroy()
786 } else {
787 res.Release()
788 }
789 }
790
791 actions := []func(){
792
793 func() {
794 res, err := pool.Acquire(context.Background())
795 if err != nil {
796 if err != puddle.ErrClosedPool {
797 assert.Failf(t, "stress acquire", "pool.Acquire returned unexpected err: %v", err)
798 }
799 return
800 }
801
802 time.Sleep(time.Duration(rand.Int63n(100)) * time.Millisecond)
803 releaseOrDestroyOrHijack(res)
804 },
805
806 func() {
807 ctx, cancel := context.WithTimeout(context.Background(), time.Duration(rand.Int63n(2000))*time.Nanosecond)
808 defer cancel()
809 res, err := pool.Acquire(ctx)
810 if err != nil {
811 if err != puddle.ErrClosedPool && err != context.Canceled && err != context.DeadlineExceeded {
812 assert.Failf(t, "stress acquire possibly canceled by context", "pool.Acquire returned unexpected err: %v", err)
813 }
814 return
815 }
816
817 time.Sleep(time.Duration(rand.Int63n(2000)) * time.Nanosecond)
818 releaseOrDestroyOrHijack(res)
819 },
820
821 func() {
822 res, err := pool.TryAcquire(context.Background())
823 if err != nil {
824 if err != puddle.ErrClosedPool && err != puddle.ErrNotAvailable {
825 assert.Failf(t, "stress TryAcquire", "pool.TryAcquire returned unexpected err: %v", err)
826 }
827 return
828 }
829
830 time.Sleep(time.Duration(rand.Int63n(100)) * time.Millisecond)
831 releaseOrDestroyOrHijack(res)
832 },
833
834 func() {
835 resources := pool.AcquireAllIdle()
836 for _, res := range resources {
837 res.Release()
838 }
839 },
840 }
841
842 workerCount := int(poolSize) * 2
843
844 for i := 0; i < workerCount; i++ {
845 wg.Add(1)
846 go func() {
847 for {
848 select {
849 case <-finishChan:
850 wg.Done()
851 return
852 default:
853 }
854
855 actions[rand.Intn(len(actions))]()
856 }
857 }()
858 }
859
860 s := os.Getenv("STRESS_TEST_DURATION")
861 if s == "" {
862 s = "1s"
863 }
864 testDuration, err := time.ParseDuration(s)
865 require.Nil(t, err)
866 time.AfterFunc(testDuration, func() { close(finishChan) })
867 wg.Wait()
868
869 pool.Close()
870 }
871
// startAcceptOnceDummyServer listens on the TCP address laddr and, on a
// background goroutine, accepts a single connection and discards everything
// read from it until a read fails. The listener and the connection are closed
// when that goroutine ends. Listen and Accept failures terminate the process
// via log.Fatalln.
func startAcceptOnceDummyServer(laddr string) {
	ln, err := net.Listen("tcp", laddr)
	if err != nil {
		log.Fatalln("Listen:", err)
	}

	// Listen synchronously, then accept on a separate goroutine so the caller
	// can dial as soon as this function returns.
	go func() {
		// Only one connection is ever served, so the listening socket is
		// released when this goroutine finishes.
		defer ln.Close()

		conn, err := ln.Accept()
		if err != nil {
			log.Fatalln("Accept:", err)
		}
		defer conn.Close()

		// One scratch byte is enough; the data itself is thrown away.
		buf := make([]byte, 1)
		for {
			if _, err := conn.Read(buf); err != nil {
				return
			}
		}
	}()
}
895
896 func ExamplePool() {
897
898 laddr := "127.0.0.1:8080"
899 startAcceptOnceDummyServer(laddr)
900
901
902 constructor := func(context.Context) (interface{}, error) {
903 return net.Dial("tcp", laddr)
904 }
905 destructor := func(value interface{}) {
906 value.(net.Conn).Close()
907 }
908 maxPoolSize := int32(10)
909
910 pool := puddle.NewPool(constructor, destructor, maxPoolSize)
911
912
913 for i := 0; i < 10; i++ {
914
915 res, err := pool.Acquire(context.Background())
916 if err != nil {
917 log.Fatalln("Acquire", err)
918 }
919
920
921 _, err = res.Value().(net.Conn).Write([]byte{1})
922 if err != nil {
923 log.Fatalln("Write", err)
924 }
925
926
927 res.Release()
928 }
929
930 stats := pool.Stat()
931 pool.Close()
932
933 fmt.Println("Connections:", stats.TotalResources())
934 fmt.Println("Acquires:", stats.AcquireCount())
935
936
937
938 }
939
940 func BenchmarkPoolAcquireAndRelease(b *testing.B) {
941 benchmarks := []struct {
942 poolSize int32
943 clientCount int
944 cancellable bool
945 }{
946 {8, 1, false},
947 {8, 2, false},
948 {8, 8, false},
949 {8, 32, false},
950 {8, 128, false},
951 {8, 512, false},
952 {8, 2048, false},
953 {8, 8192, false},
954
955 {64, 2, false},
956 {64, 8, false},
957 {64, 32, false},
958 {64, 128, false},
959 {64, 512, false},
960 {64, 2048, false},
961 {64, 8192, false},
962
963 {512, 2, false},
964 {512, 8, false},
965 {512, 32, false},
966 {512, 128, false},
967 {512, 512, false},
968 {512, 2048, false},
969 {512, 8192, false},
970
971 {8, 2, true},
972 {8, 8, true},
973 {8, 32, true},
974 {8, 128, true},
975 {8, 512, true},
976 {8, 2048, true},
977 {8, 8192, true},
978
979 {64, 2, true},
980 {64, 8, true},
981 {64, 32, true},
982 {64, 128, true},
983 {64, 512, true},
984 {64, 2048, true},
985 {64, 8192, true},
986
987 {512, 2, true},
988 {512, 8, true},
989 {512, 32, true},
990 {512, 128, true},
991 {512, 512, true},
992 {512, 2048, true},
993 {512, 8192, true},
994 }
995
996 for _, bm := range benchmarks {
997 name := fmt.Sprintf("PoolSize=%d/ClientCount=%d/Cancellable=%v", bm.poolSize, bm.clientCount, bm.cancellable)
998
999 b.Run(name, func(b *testing.B) {
1000 ctx := context.Background()
1001 cancel := func() {}
1002 if bm.cancellable {
1003 ctx, cancel = context.WithCancel(ctx)
1004 }
1005
1006 wg := &sync.WaitGroup{}
1007
1008 constructor, _ := createConstructor()
1009 pool := puddle.NewPool(constructor, stubDestructor, bm.poolSize)
1010
1011 for i := 0; i < bm.clientCount; i++ {
1012 wg.Add(1)
1013 go func() {
1014 defer wg.Done()
1015
1016 for j := 0; j < b.N; j++ {
1017 res, err := pool.Acquire(ctx)
1018 if err != nil {
1019 b.Fatal(err)
1020 }
1021 res.Release()
1022 }
1023 }()
1024 }
1025
1026 wg.Wait()
1027 cancel()
1028 })
1029 }
1030 }
1031
View as plain text