1 package puddle_test
2
3 import (
4 "context"
5 "errors"
6 "fmt"
7 "log"
8 "math/rand"
9 "net"
10 "os"
11 "runtime"
12 "strconv"
13 "sync"
14 "testing"
15 "time"
16
17 "github.com/jackc/puddle/v2"
18 "github.com/stretchr/testify/assert"
19 "github.com/stretchr/testify/require"
20 "golang.org/x/sync/semaphore"
21 )
22
23 type Counter struct {
24 mutex sync.Mutex
25 n int
26 }
27
28
29 func (c *Counter) Next() int {
30 c.mutex.Lock()
31 defer c.mutex.Unlock()
32
33 c.n += 1
34 return c.n
35 }
36
37
38 func (c *Counter) Value() int {
39 c.mutex.Lock()
40 defer c.mutex.Unlock()
41
42 return c.n
43 }
44
45 func createConstructor() (puddle.Constructor[int], *Counter) {
46 var c Counter
47 f := func(ctx context.Context) (int, error) {
48 return c.Next(), nil
49 }
50 return f, &c
51 }
52
53 func stubDestructor(int) {}
54
55 func TestNewPoolRequiresMaxSizeGreaterThan0(t *testing.T) {
56 constructor, _ := createConstructor()
57 pool, err := puddle.NewPool(&puddle.Config[int]{Constructor: constructor, Destructor: stubDestructor, MaxSize: -1})
58 assert.Nil(t, pool)
59 assert.Error(t, err)
60
61 pool, err = puddle.NewPool(&puddle.Config[int]{Constructor: constructor, Destructor: stubDestructor, MaxSize: 0})
62 assert.Nil(t, pool)
63 assert.Error(t, err)
64 }
65
66 func TestPoolAcquireCreatesResourceWhenNoneIdle(t *testing.T) {
67 constructor, _ := createConstructor()
68 pool, err := puddle.NewPool(&puddle.Config[int]{Constructor: constructor, Destructor: stubDestructor, MaxSize: 10})
69 require.NoError(t, err)
70 defer pool.Close()
71
72 res, err := pool.Acquire(context.Background())
73 require.NoError(t, err)
74 assert.Equal(t, 1, res.Value())
75 assert.WithinDuration(t, time.Now(), res.CreationTime(), time.Second)
76 res.Release()
77 }
78
79 func TestPoolAcquireCallsConstructorWithAcquireContextValuesButNotDeadline(t *testing.T) {
80 constructor := func(ctx context.Context) (int, error) {
81 if ctx.Value("test") != "from Acquire" {
82 return 0, errors.New("did not get value from Acquire")
83 }
84 if _, ok := ctx.Deadline(); ok {
85 return 0, errors.New("should not have gotten deadline from Acquire")
86 }
87
88 return 1, nil
89 }
90 pool, err := puddle.NewPool(&puddle.Config[int]{Constructor: constructor, Destructor: stubDestructor, MaxSize: 10})
91 require.NoError(t, err)
92 defer pool.Close()
93
94 ctx := context.WithValue(context.Background(), "test", "from Acquire")
95 ctx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
96 defer cancel()
97 res, err := pool.Acquire(ctx)
98 require.NoError(t, err)
99 assert.Equal(t, 1, res.Value())
100 assert.WithinDuration(t, time.Now(), res.CreationTime(), time.Second)
101 res.Release()
102 }
103
104 func TestPoolAcquireCalledConstructorIsNotCanceledByAcquireCancellation(t *testing.T) {
105 constructor := func(ctx context.Context) (int, error) {
106 time.Sleep(100 * time.Millisecond)
107 return 1, nil
108 }
109 pool, err := puddle.NewPool(&puddle.Config[int]{Constructor: constructor, Destructor: stubDestructor, MaxSize: 10})
110 require.NoError(t, err)
111 defer pool.Close()
112
113 ctx, cancel := context.WithTimeout(context.Background(), 25*time.Millisecond)
114 defer cancel()
115 res, err := pool.Acquire(ctx)
116 assert.Nil(t, res)
117 assert.Equal(t, context.DeadlineExceeded, err)
118
119 time.Sleep(200 * time.Millisecond)
120
121 assert.EqualValues(t, 1, pool.Stat().TotalResources())
122 assert.EqualValues(t, 1, pool.Stat().CanceledAcquireCount())
123 }
124
125 func TestPoolAcquireDoesNotCreatesResourceWhenItWouldExceedMaxSize(t *testing.T) {
126 constructor, createCounter := createConstructor()
127 pool, err := puddle.NewPool(&puddle.Config[int]{Constructor: constructor, Destructor: stubDestructor, MaxSize: 1})
128 require.NoError(t, err)
129
130 wg := &sync.WaitGroup{}
131
132 for i := 0; i < 100; i++ {
133 wg.Add(1)
134 go func() {
135 for j := 0; j < 100; j++ {
136 res, err := pool.Acquire(context.Background())
137 assert.NoError(t, err)
138 assert.Equal(t, 1, res.Value())
139 res.Release()
140 }
141 wg.Done()
142 }()
143 }
144
145 wg.Wait()
146
147 assert.EqualValues(t, 1, createCounter.Value())
148 assert.EqualValues(t, 1, pool.Stat().TotalResources())
149 }
150
151 func TestPoolAcquireWithCancellableContext(t *testing.T) {
152 constructor, createCounter := createConstructor()
153 pool, err := puddle.NewPool(&puddle.Config[int]{Constructor: constructor, Destructor: stubDestructor, MaxSize: 1})
154 require.NoError(t, err)
155
156 wg := &sync.WaitGroup{}
157
158 for i := 0; i < 100; i++ {
159 wg.Add(1)
160 go func() {
161 for j := 0; j < 100; j++ {
162 ctx, cancel := context.WithCancel(context.Background())
163 res, err := pool.Acquire(ctx)
164 assert.NoError(t, err)
165 assert.Equal(t, 1, res.Value())
166 res.Release()
167 cancel()
168 }
169 wg.Done()
170 }()
171 }
172
173 wg.Wait()
174
175 assert.EqualValues(t, 1, createCounter.Value())
176 assert.EqualValues(t, 1, pool.Stat().TotalResources())
177 }
178
179 func TestPoolAcquireReturnsErrorFromFailedResourceCreate(t *testing.T) {
180 errCreateFailed := errors.New("create failed")
181 constructor := func(ctx context.Context) (int, error) {
182 return 0, errCreateFailed
183 }
184 pool, err := puddle.NewPool(&puddle.Config[int]{Constructor: constructor, Destructor: stubDestructor, MaxSize: 10})
185 require.NoError(t, err)
186
187 res, err := pool.Acquire(context.Background())
188 assert.Equal(t, errCreateFailed, err)
189 assert.Nil(t, res)
190 }
191
192 func TestPoolAcquireCreatesResourceRespectingContext(t *testing.T) {
193 var cancel func()
194 constructor := func(ctx context.Context) (int, error) {
195 cancel()
196
197 time.Sleep(10 * time.Millisecond)
198 return 1, nil
199 }
200 pool, err := puddle.NewPool(&puddle.Config[int]{Constructor: constructor, Destructor: stubDestructor, MaxSize: 1})
201 require.NoError(t, err)
202 defer pool.Close()
203
204 var ctx context.Context
205 ctx, cancel = context.WithCancel(context.Background())
206 defer cancel()
207 _, err = pool.Acquire(ctx)
208 assert.ErrorIs(t, err, context.Canceled)
209
210
211
212 time.Sleep(100 * time.Millisecond)
213
214 stat := pool.Stat()
215 assert.EqualValues(t, 1, stat.IdleResources())
216 assert.EqualValues(t, 1, stat.TotalResources())
217 }
218
219 func TestPoolAcquireReusesResources(t *testing.T) {
220 constructor, createCounter := createConstructor()
221 pool, err := puddle.NewPool(&puddle.Config[int]{Constructor: constructor, Destructor: stubDestructor, MaxSize: 10})
222 require.NoError(t, err)
223
224 res, err := pool.Acquire(context.Background())
225 require.NoError(t, err)
226 assert.Equal(t, 1, res.Value())
227
228 res.Release()
229
230 res, err = pool.Acquire(context.Background())
231 require.NoError(t, err)
232 assert.Equal(t, 1, res.Value())
233
234 res.Release()
235
236 assert.Equal(t, 1, createCounter.Value())
237 }
238
239 func TestPoolTryAcquire(t *testing.T) {
240 constructor, createCounter := createConstructor()
241 pool, err := puddle.NewPool(&puddle.Config[int]{Constructor: constructor, Destructor: stubDestructor, MaxSize: 1})
242 require.NoError(t, err)
243
244
245 res, err := pool.TryAcquire(context.Background())
246 require.EqualError(t, err, puddle.ErrNotAvailable.Error())
247 assert.Nil(t, res)
248
249
250 time.Sleep(100 * time.Millisecond)
251
252 res, err = pool.TryAcquire(context.Background())
253 require.NoError(t, err)
254 assert.Equal(t, 1, res.Value())
255 defer res.Release()
256
257 res, err = pool.TryAcquire(context.Background())
258 require.EqualError(t, err, puddle.ErrNotAvailable.Error())
259 assert.Nil(t, res)
260
261 assert.Equal(t, 1, createCounter.Value())
262 }
263
264 func TestPoolTryAcquireReturnsErrorWhenPoolIsClosed(t *testing.T) {
265 constructor, _ := createConstructor()
266 pool, err := puddle.NewPool(&puddle.Config[int]{Constructor: constructor, Destructor: stubDestructor, MaxSize: 10})
267 require.NoError(t, err)
268 pool.Close()
269
270 res, err := pool.TryAcquire(context.Background())
271 assert.Equal(t, puddle.ErrClosedPool, err)
272 assert.Nil(t, res)
273 }
274
275 func TestPoolTryAcquireWithFailedResourceCreate(t *testing.T) {
276 errCreateFailed := errors.New("create failed")
277 constructor := func(ctx context.Context) (int, error) {
278 return 0, errCreateFailed
279 }
280 pool, err := puddle.NewPool(&puddle.Config[int]{Constructor: constructor, Destructor: stubDestructor, MaxSize: 10})
281 require.NoError(t, err)
282
283 res, err := pool.TryAcquire(context.Background())
284 require.EqualError(t, err, puddle.ErrNotAvailable.Error())
285 assert.Nil(t, res)
286 }
287
288 func TestPoolAcquireNilContextDoesNotLeavePoolLocked(t *testing.T) {
289 constructor, createCounter := createConstructor()
290 pool, err := puddle.NewPool(&puddle.Config[int]{Constructor: constructor, Destructor: stubDestructor, MaxSize: 10})
291 require.NoError(t, err)
292
293 assert.Panics(t, func() { pool.Acquire(nil) })
294
295 res, err := pool.Acquire(context.Background())
296 require.NoError(t, err)
297 assert.Equal(t, 1, res.Value())
298 res.Release()
299
300 assert.Equal(t, 1, createCounter.Value())
301 }
302
303 func TestPoolAcquireContextAlreadyCanceled(t *testing.T) {
304 constructor := func(ctx context.Context) (int, error) {
305 panic("should never be called")
306 }
307 pool, err := puddle.NewPool(&puddle.Config[int]{Constructor: constructor, Destructor: stubDestructor, MaxSize: 10})
308 require.NoError(t, err)
309
310 ctx, cancel := context.WithCancel(context.Background())
311 cancel()
312 res, err := pool.Acquire(ctx)
313 assert.Equal(t, context.Canceled, err)
314 assert.Nil(t, res)
315 }
316
317 func TestPoolAcquireContextCanceledDuringCreate(t *testing.T) {
318 ctx, cancel := context.WithCancel(context.Background())
319 time.AfterFunc(100*time.Millisecond, cancel)
320 timeoutChan := time.After(1 * time.Second)
321
322 var constructorCalls Counter
323 constructor := func(ctx context.Context) (int, error) {
324 select {
325 case <-ctx.Done():
326 return 0, ctx.Err()
327 case <-timeoutChan:
328 }
329 return constructorCalls.Next(), nil
330 }
331 pool, err := puddle.NewPool(&puddle.Config[int]{Constructor: constructor, Destructor: stubDestructor, MaxSize: 10})
332 require.NoError(t, err)
333
334 res, err := pool.Acquire(ctx)
335 assert.Equal(t, context.Canceled, err)
336 assert.Nil(t, res)
337 }
338
339 func TestPoolAcquireAllIdle(t *testing.T) {
340 constructor, _ := createConstructor()
341 pool, err := puddle.NewPool(&puddle.Config[int]{Constructor: constructor, Destructor: stubDestructor, MaxSize: 10})
342 require.NoError(t, err)
343 defer pool.Close()
344
345 resources := make([]*puddle.Resource[int], 4)
346
347 resources[0], err = pool.Acquire(context.Background())
348 require.NoError(t, err)
349 resources[1], err = pool.Acquire(context.Background())
350 require.NoError(t, err)
351 resources[2], err = pool.Acquire(context.Background())
352 require.NoError(t, err)
353 resources[3], err = pool.Acquire(context.Background())
354 require.NoError(t, err)
355
356 assert.Len(t, pool.AcquireAllIdle(), 0)
357
358 resources[0].Release()
359 resources[3].Release()
360
361 assert.ElementsMatch(t, []*puddle.Resource[int]{resources[0], resources[3]}, pool.AcquireAllIdle())
362
363 resources[0].Release()
364 resources[3].Release()
365 resources[1].Release()
366 resources[2].Release()
367
368 assert.ElementsMatch(t, resources, pool.AcquireAllIdle())
369
370 resources[0].Release()
371 resources[1].Release()
372 resources[2].Release()
373 resources[3].Release()
374 }
375
376 func TestPoolAcquireAllIdleWhenClosedIsNil(t *testing.T) {
377 constructor, _ := createConstructor()
378 pool, err := puddle.NewPool(&puddle.Config[int]{Constructor: constructor, Destructor: stubDestructor, MaxSize: 10})
379 require.NoError(t, err)
380 pool.Close()
381 assert.Nil(t, pool.AcquireAllIdle())
382 }
383
384 func TestPoolCreateResource(t *testing.T) {
385 constructor, counter := createConstructor()
386 pool, err := puddle.NewPool(&puddle.Config[int]{Constructor: constructor, Destructor: stubDestructor, MaxSize: 10})
387 require.NoError(t, err)
388 defer pool.Close()
389
390 err = pool.CreateResource(context.Background())
391 require.NoError(t, err)
392
393 stats := pool.Stat()
394 assert.EqualValues(t, 1, stats.IdleResources())
395
396 res, err := pool.Acquire(context.Background())
397 require.NoError(t, err)
398 assert.Equal(t, counter.Value(), res.Value())
399 assert.True(t, res.LastUsedNanotime() > 0, "should set LastUsedNanotime so that idle calculations can still work")
400 assert.Equal(t, 1, res.Value())
401 assert.WithinDuration(t, time.Now(), res.CreationTime(), time.Second)
402 res.Release()
403
404 assert.EqualValues(t, 0, pool.Stat().EmptyAcquireCount(), "should have been a warm resource")
405 }
406
407 func TestPoolCreateResourceReturnsErrorFromFailedResourceCreate(t *testing.T) {
408 errCreateFailed := errors.New("create failed")
409 constructor := func(ctx context.Context) (int, error) {
410 return 0, errCreateFailed
411 }
412 pool, err := puddle.NewPool(&puddle.Config[int]{Constructor: constructor, Destructor: stubDestructor, MaxSize: 10})
413 require.NoError(t, err)
414
415 err = pool.CreateResource(context.Background())
416 assert.Equal(t, errCreateFailed, err)
417 }
418
419 func TestPoolCreateResourceReturnsErrorWhenAlreadyClosed(t *testing.T) {
420 constructor, _ := createConstructor()
421 pool, err := puddle.NewPool(&puddle.Config[int]{Constructor: constructor, Destructor: stubDestructor, MaxSize: 10})
422 require.NoError(t, err)
423 pool.Close()
424 err = pool.CreateResource(context.Background())
425 assert.Equal(t, puddle.ErrClosedPool, err)
426 }
427
428 func TestPoolCreateResourceReturnsErrorWhenClosedWhileCreatingResource(t *testing.T) {
429
430
431 constructor := func(ctx context.Context) (int, error) {
432 time.Sleep(500 * time.Millisecond)
433 return 123, nil
434 }
435 pool, err := puddle.NewPool(&puddle.Config[int]{Constructor: constructor, Destructor: stubDestructor, MaxSize: 10})
436 require.NoError(t, err)
437
438 acquireErrChan := make(chan error)
439 go func() {
440 err := pool.CreateResource(context.Background())
441 acquireErrChan <- err
442 }()
443
444 time.Sleep(250 * time.Millisecond)
445 pool.Close()
446
447 err = <-acquireErrChan
448 assert.Equal(t, puddle.ErrClosedPool, err)
449 }
450
451 func TestPoolCreateResourceReturnsErrorWhenPoolFull(t *testing.T) {
452 constructor, _ := createConstructor()
453 pool, err := puddle.NewPool(&puddle.Config[int]{Constructor: constructor, Destructor: stubDestructor, MaxSize: 2})
454 require.NoError(t, err)
455 defer pool.Close()
456
457 err = pool.CreateResource(context.Background())
458 require.NoError(t, err)
459
460 stats := pool.Stat()
461 assert.EqualValues(t, 1, stats.IdleResources())
462
463 err = pool.CreateResource(context.Background())
464 require.NoError(t, err)
465
466 stats = pool.Stat()
467 assert.EqualValues(t, 2, stats.IdleResources())
468
469 err = pool.CreateResource(context.Background())
470 require.Error(t, err)
471 }
472
473 func TestPoolCloseClosesAllIdleResources(t *testing.T) {
474 constructor, _ := createConstructor()
475
476 var destructorCalls Counter
477 destructor := func(int) {
478 destructorCalls.Next()
479 }
480
481 p, err := puddle.NewPool(&puddle.Config[int]{Constructor: constructor, Destructor: destructor, MaxSize: 10})
482 require.NoError(t, err)
483
484 resources := make([]*puddle.Resource[int], 4)
485 for i := range resources {
486 var err error
487 resources[i], err = p.Acquire(context.Background())
488 require.Nil(t, err)
489 }
490
491 for _, res := range resources {
492 res.Release()
493 }
494
495 p.Close()
496
497 assert.Equal(t, len(resources), destructorCalls.Value())
498 }
499
500 func TestPoolCloseBlocksUntilAllResourcesReleasedAndClosed(t *testing.T) {
501 constructor, _ := createConstructor()
502 var destructorCalls Counter
503 destructor := func(int) {
504 destructorCalls.Next()
505 }
506
507 p, err := puddle.NewPool(&puddle.Config[int]{Constructor: constructor, Destructor: destructor, MaxSize: 10})
508 require.NoError(t, err)
509
510 resources := make([]*puddle.Resource[int], 4)
511 for i := range resources {
512 var err error
513 resources[i], err = p.Acquire(context.Background())
514 require.Nil(t, err)
515 }
516
517 for _, res := range resources {
518 go func(res *puddle.Resource[int]) {
519 time.Sleep(100 * time.Millisecond)
520 res.Release()
521 }(res)
522 }
523
524 p.Close()
525 assert.Equal(t, len(resources), destructorCalls.Value())
526 }
527
528 func TestPoolCloseIsSafeToCallMultipleTimes(t *testing.T) {
529 constructor, _ := createConstructor()
530
531 p, err := puddle.NewPool(&puddle.Config[int]{Constructor: constructor, Destructor: stubDestructor, MaxSize: 10})
532 require.NoError(t, err)
533
534 p.Close()
535 p.Close()
536 }
537
538 func TestPoolResetDestroysAllIdleResources(t *testing.T) {
539 constructor, _ := createConstructor()
540
541 var destructorCalls Counter
542 destructor := func(int) {
543 destructorCalls.Next()
544 }
545
546 p, err := puddle.NewPool(&puddle.Config[int]{Constructor: constructor, Destructor: destructor, MaxSize: 10})
547 require.NoError(t, err)
548
549 resources := make([]*puddle.Resource[int], 4)
550 for i := range resources {
551 var err error
552 resources[i], err = p.Acquire(context.Background())
553 require.Nil(t, err)
554 }
555
556 for _, res := range resources {
557 res.Release()
558 }
559
560 require.EqualValues(t, 4, p.Stat().TotalResources())
561 p.Reset()
562 require.EqualValues(t, 0, p.Stat().TotalResources())
563
564
565 for i := 0; i < 100; i++ {
566 if destructorCalls.Value() == len(resources) {
567 break
568 }
569 time.Sleep(100 * time.Millisecond)
570 }
571 require.Equal(t, len(resources), destructorCalls.Value())
572
573 p.Close()
574 }
575
576 func TestPoolResetDestroysCheckedOutResourcesOnReturn(t *testing.T) {
577 constructor, _ := createConstructor()
578
579 var destructorCalls Counter
580 destructor := func(int) {
581 destructorCalls.Next()
582 }
583
584 p, err := puddle.NewPool(&puddle.Config[int]{Constructor: constructor, Destructor: destructor, MaxSize: 10})
585 require.NoError(t, err)
586
587 resources := make([]*puddle.Resource[int], 4)
588 for i := range resources {
589 var err error
590 resources[i], err = p.Acquire(context.Background())
591 require.Nil(t, err)
592 }
593
594 require.EqualValues(t, 4, p.Stat().TotalResources())
595 p.Reset()
596 require.EqualValues(t, 4, p.Stat().TotalResources())
597
598 for _, res := range resources {
599 res.Release()
600 }
601
602 require.EqualValues(t, 0, p.Stat().TotalResources())
603
604
605 for i := 0; i < 100; i++ {
606 if destructorCalls.Value() == len(resources) {
607 break
608 }
609 time.Sleep(100 * time.Millisecond)
610 }
611 require.Equal(t, len(resources), destructorCalls.Value())
612
613 p.Close()
614 }
615
616 func TestPoolStatResources(t *testing.T) {
617 startWaitChan := make(chan struct{})
618 waitingChan := make(chan struct{})
619 endWaitChan := make(chan struct{})
620
621 var constructorCalls Counter
622 constructor := func(ctx context.Context) (int, error) {
623 select {
624 case <-startWaitChan:
625 close(waitingChan)
626 <-endWaitChan
627 default:
628 }
629
630 return constructorCalls.Next(), nil
631 }
632 pool, err := puddle.NewPool(&puddle.Config[int]{Constructor: constructor, Destructor: stubDestructor, MaxSize: 10})
633 require.NoError(t, err)
634 defer pool.Close()
635
636 resAcquired, err := pool.Acquire(context.Background())
637 require.Nil(t, err)
638
639 close(startWaitChan)
640 go func() {
641 res, err := pool.Acquire(context.Background())
642 require.Nil(t, err)
643 res.Release()
644 }()
645 <-waitingChan
646 stat := pool.Stat()
647
648 assert.EqualValues(t, 2, stat.TotalResources())
649 assert.EqualValues(t, 1, stat.ConstructingResources())
650 assert.EqualValues(t, 1, stat.AcquiredResources())
651 assert.EqualValues(t, 0, stat.IdleResources())
652 assert.EqualValues(t, 10, stat.MaxResources())
653
654 resAcquired.Release()
655
656 stat = pool.Stat()
657 assert.EqualValues(t, 2, stat.TotalResources())
658 assert.EqualValues(t, 1, stat.ConstructingResources())
659 assert.EqualValues(t, 0, stat.AcquiredResources())
660 assert.EqualValues(t, 1, stat.IdleResources())
661 assert.EqualValues(t, 10, stat.MaxResources())
662
663 close(endWaitChan)
664 }
665
666 func TestPoolStatSuccessfulAcquireCounters(t *testing.T) {
667 constructor, _ := createConstructor()
668 sleepConstructor := func(ctx context.Context) (int, error) {
669
670 time.Sleep(time.Nanosecond)
671 return constructor(ctx)
672 }
673 pool, err := puddle.NewPool(&puddle.Config[int]{Constructor: sleepConstructor, Destructor: stubDestructor, MaxSize: 1})
674 require.NoError(t, err)
675 defer pool.Close()
676
677 res, err := pool.Acquire(context.Background())
678 require.NoError(t, err)
679 res.Release()
680
681 stat := pool.Stat()
682 assert.Equal(t, int64(1), stat.AcquireCount())
683 assert.Equal(t, int64(1), stat.EmptyAcquireCount())
684 assert.True(t, stat.AcquireDuration() > 0, "expected stat.AcquireDuration() > 0 but %v", stat.AcquireDuration())
685 lastAcquireDuration := stat.AcquireDuration()
686
687 res, err = pool.Acquire(context.Background())
688 require.NoError(t, err)
689 res.Release()
690
691 stat = pool.Stat()
692 assert.Equal(t, int64(2), stat.AcquireCount())
693 assert.Equal(t, int64(1), stat.EmptyAcquireCount())
694 assert.True(t, stat.AcquireDuration() > lastAcquireDuration)
695 lastAcquireDuration = stat.AcquireDuration()
696
697 wg := &sync.WaitGroup{}
698 for i := 0; i < 2; i++ {
699 wg.Add(1)
700 go func() {
701 res, err = pool.Acquire(context.Background())
702 require.NoError(t, err)
703 time.Sleep(50 * time.Millisecond)
704 res.Release()
705 wg.Done()
706 }()
707 }
708
709 wg.Wait()
710
711 stat = pool.Stat()
712 assert.Equal(t, int64(4), stat.AcquireCount())
713 assert.Equal(t, int64(2), stat.EmptyAcquireCount())
714 assert.True(t, stat.AcquireDuration() > lastAcquireDuration)
715 lastAcquireDuration = stat.AcquireDuration()
716 }
717
718 func TestPoolStatCanceledAcquireBeforeStart(t *testing.T) {
719 constructor, _ := createConstructor()
720 pool, err := puddle.NewPool(&puddle.Config[int]{Constructor: constructor, Destructor: stubDestructor, MaxSize: 1})
721 require.NoError(t, err)
722 defer pool.Close()
723
724 ctx, cancel := context.WithCancel(context.Background())
725 cancel()
726 _, err = pool.Acquire(ctx)
727 require.Equal(t, context.Canceled, err)
728
729 stat := pool.Stat()
730 assert.Equal(t, int64(0), stat.AcquireCount())
731 assert.Equal(t, int64(1), stat.CanceledAcquireCount())
732 }
733
734 func TestPoolStatCanceledAcquireDuringCreate(t *testing.T) {
735 constructor := func(ctx context.Context) (int, error) {
736 <-ctx.Done()
737 return 0, ctx.Err()
738 }
739
740 pool, err := puddle.NewPool(&puddle.Config[int]{Constructor: constructor, Destructor: stubDestructor, MaxSize: 1})
741 require.NoError(t, err)
742 defer pool.Close()
743
744 ctx, cancel := context.WithCancel(context.Background())
745 time.AfterFunc(50*time.Millisecond, cancel)
746 _, err = pool.Acquire(ctx)
747 require.Equal(t, context.Canceled, err)
748
749
750 time.Sleep(10 * time.Millisecond)
751
752 stat := pool.Stat()
753 assert.Equal(t, int64(0), stat.AcquireCount())
754 assert.Equal(t, int64(1), stat.CanceledAcquireCount())
755 }
756
757 func TestPoolStatCanceledAcquireDuringWait(t *testing.T) {
758 constructor, _ := createConstructor()
759 pool, err := puddle.NewPool(&puddle.Config[int]{Constructor: constructor, Destructor: stubDestructor, MaxSize: 1})
760 require.NoError(t, err)
761 defer pool.Close()
762
763 res, err := pool.Acquire(context.Background())
764 require.Nil(t, err)
765
766 ctx, cancel := context.WithCancel(context.Background())
767 time.AfterFunc(50*time.Millisecond, cancel)
768 _, err = pool.Acquire(ctx)
769 require.Equal(t, context.Canceled, err)
770
771 res.Release()
772
773 stat := pool.Stat()
774 assert.Equal(t, int64(1), stat.AcquireCount())
775 assert.Equal(t, int64(1), stat.CanceledAcquireCount())
776 }
777
778 func TestResourceHijackRemovesResourceFromPoolButDoesNotDestroy(t *testing.T) {
779 constructor, _ := createConstructor()
780 var destructorCalls Counter
781 destructor := func(int) {
782 destructorCalls.Next()
783 }
784
785 pool, err := puddle.NewPool(&puddle.Config[int]{Constructor: constructor, Destructor: destructor, MaxSize: 10})
786 require.NoError(t, err)
787
788 res, err := pool.Acquire(context.Background())
789 require.NoError(t, err)
790 assert.Equal(t, 1, res.Value())
791
792 res.Hijack()
793
794 assert.EqualValues(t, 0, pool.Stat().TotalResources())
795 assert.EqualValues(t, 0, destructorCalls.Value())
796
797
798 res.Value()
799 res.CreationTime()
800 res.IdleDuration()
801 }
802
803 func TestResourceDestroyRemovesResourceFromPool(t *testing.T) {
804 constructor, _ := createConstructor()
805 pool, err := puddle.NewPool(&puddle.Config[int]{Constructor: constructor, Destructor: stubDestructor, MaxSize: 10})
806 require.NoError(t, err)
807
808 res, err := pool.Acquire(context.Background())
809 require.NoError(t, err)
810 assert.Equal(t, 1, res.Value())
811
812 assert.EqualValues(t, 1, pool.Stat().TotalResources())
813 res.Destroy()
814 for i := 0; i < 1000; i++ {
815 if pool.Stat().TotalResources() == 0 {
816 break
817 }
818 time.Sleep(time.Millisecond)
819 }
820
821 assert.EqualValues(t, 0, pool.Stat().TotalResources())
822 }
823
824 func TestResourceLastUsageTimeTracking(t *testing.T) {
825 constructor, _ := createConstructor()
826 pool, err := puddle.NewPool(&puddle.Config[int]{Constructor: constructor, Destructor: stubDestructor, MaxSize: 1})
827 require.NoError(t, err)
828
829 res, err := pool.Acquire(context.Background())
830 require.NoError(t, err)
831 t1 := res.LastUsedNanotime()
832 res.Release()
833
834
835 res, err = pool.Acquire(context.Background())
836 require.NoError(t, err)
837 t2 := res.LastUsedNanotime()
838 d2 := res.IdleDuration()
839 assert.True(t, t2 > t1)
840 res.ReleaseUnused()
841
842
843 res, err = pool.Acquire(context.Background())
844 require.NoError(t, err)
845 t3 := res.LastUsedNanotime()
846 d3 := res.IdleDuration()
847 assert.EqualValues(t, t2, t3)
848 assert.True(t, d3 > d2)
849 res.Release()
850
851
852 res, err = pool.Acquire(context.Background())
853 require.NoError(t, err)
854 t4 := res.LastUsedNanotime()
855 assert.True(t, t4 > t3)
856 res.Release()
857 }
858
859 func TestResourcePanicsOnUsageWhenNotAcquired(t *testing.T) {
860 constructor, _ := createConstructor()
861 pool, err := puddle.NewPool(&puddle.Config[int]{Constructor: constructor, Destructor: stubDestructor, MaxSize: 10})
862 require.NoError(t, err)
863
864 res, err := pool.Acquire(context.Background())
865 require.NoError(t, err)
866 res.Release()
867
868 assert.PanicsWithValue(t, "tried to release resource that is not acquired", res.Release)
869 assert.PanicsWithValue(t, "tried to release resource that is not acquired", res.ReleaseUnused)
870 assert.PanicsWithValue(t, "tried to destroy resource that is not acquired", res.Destroy)
871 assert.PanicsWithValue(t, "tried to hijack resource that is not acquired", res.Hijack)
872 assert.PanicsWithValue(t, "tried to access resource that is not acquired or hijacked", func() { res.Value() })
873 assert.PanicsWithValue(t, "tried to access resource that is not acquired or hijacked", func() { res.CreationTime() })
874 assert.PanicsWithValue(t, "tried to access resource that is not acquired or hijacked", func() { res.LastUsedNanotime() })
875 assert.PanicsWithValue(t, "tried to access resource that is not acquired or hijacked", func() { res.IdleDuration() })
876 }
877
878 func TestPoolAcquireReturnsErrorWhenPoolIsClosed(t *testing.T) {
879 constructor, _ := createConstructor()
880 pool, err := puddle.NewPool(&puddle.Config[int]{Constructor: constructor, Destructor: stubDestructor, MaxSize: 10})
881 require.NoError(t, err)
882 pool.Close()
883
884 res, err := pool.Acquire(context.Background())
885 assert.Equal(t, puddle.ErrClosedPool, err)
886 assert.Nil(t, res)
887 }
888
889 func TestSignalIsSentWhenResourceFailedToCreate(t *testing.T) {
890 var c Counter
891 constructor := func(context.Context) (a any, err error) {
892 if c.Next() == 2 {
893 return nil, errors.New("outage")
894 }
895 return 1, nil
896 }
897 destructor := func(value any) {}
898
899 pool, err := puddle.NewPool(&puddle.Config[any]{Constructor: constructor, Destructor: destructor, MaxSize: 10})
900 require.NoError(t, err)
901
902 res1, err := pool.Acquire(context.Background())
903 require.NoError(t, err)
904
905 var wg sync.WaitGroup
906 for i := 0; i < 2; i++ {
907 wg.Add(1)
908 go func(name string) {
909 defer wg.Done()
910 _, _ = pool.Acquire(context.Background())
911 }(strconv.Itoa(i))
912 }
913
914
915 time.Sleep(500 * time.Millisecond)
916 res1.Destroy()
917 wg.Wait()
918 }
919
920 func stressTestDur(t testing.TB) time.Duration {
921 s := os.Getenv("STRESS_TEST_DURATION")
922 if s == "" {
923 s = "1s"
924 }
925
926 dur, err := time.ParseDuration(s)
927 require.Nil(t, err)
928 return dur
929 }
930
931 func TestStress(t *testing.T) {
932 constructor, _ := createConstructor()
933 var destructorCalls Counter
934 destructor := func(int) {
935 destructorCalls.Next()
936 }
937
938 poolSize := runtime.NumCPU()
939 if poolSize < 4 {
940 poolSize = 4
941 }
942
943 pool, err := puddle.NewPool(&puddle.Config[int]{Constructor: constructor, Destructor: destructor, MaxSize: int32(poolSize)})
944 require.NoError(t, err)
945
946 finishChan := make(chan struct{})
947 wg := &sync.WaitGroup{}
948
949 releaseOrDestroyOrHijack := func(res *puddle.Resource[int]) {
950 n := rand.Intn(100)
951 if n < 5 {
952 res.Hijack()
953 destructor(res.Value())
954 } else if n < 10 {
955 res.Destroy()
956 } else {
957 res.Release()
958 }
959 }
960
961 actions := []func(){
962
963 func() {
964 res, err := pool.Acquire(context.Background())
965 if err != nil {
966 if err != puddle.ErrClosedPool {
967 assert.Failf(t, "stress acquire", "pool.Acquire returned unexpected err: %v", err)
968 }
969 return
970 }
971
972 time.Sleep(time.Duration(rand.Int63n(100)) * time.Millisecond)
973 releaseOrDestroyOrHijack(res)
974 },
975
976 func() {
977 ctx, cancel := context.WithTimeout(context.Background(), time.Duration(rand.Int63n(2000))*time.Nanosecond)
978 defer cancel()
979 res, err := pool.Acquire(ctx)
980 if err != nil {
981 if err != puddle.ErrClosedPool && err != context.Canceled && err != context.DeadlineExceeded {
982 assert.Failf(t, "stress acquire possibly canceled by context", "pool.Acquire returned unexpected err: %v", err)
983 }
984 return
985 }
986
987 time.Sleep(time.Duration(rand.Int63n(2000)) * time.Nanosecond)
988 releaseOrDestroyOrHijack(res)
989 },
990
991 func() {
992 res, err := pool.TryAcquire(context.Background())
993 if err != nil {
994 if err != puddle.ErrClosedPool && err != puddle.ErrNotAvailable {
995 assert.Failf(t, "stress TryAcquire", "pool.TryAcquire returned unexpected err: %v", err)
996 }
997 return
998 }
999
1000 time.Sleep(time.Duration(rand.Int63n(100)) * time.Millisecond)
1001 releaseOrDestroyOrHijack(res)
1002 },
1003
1004 func() {
1005 resources := pool.AcquireAllIdle()
1006 for _, res := range resources {
1007 res.Release()
1008 }
1009 },
1010
1011 func() {
1012 stat := pool.Stat()
1013 assert.NotNil(t, stat)
1014 },
1015
1016 func() {
1017 err := pool.CreateResource(context.Background())
1018 if err != nil && !errors.Is(err, puddle.ErrClosedPool) && !errors.Is(err, puddle.ErrNotAvailable) {
1019 t.Error(err)
1020 }
1021 },
1022 }
1023
1024 workerCount := int(poolSize) * 2
1025
1026 for i := 0; i < workerCount; i++ {
1027 wg.Add(1)
1028 go func() {
1029 defer wg.Done()
1030 for {
1031 select {
1032 case <-finishChan:
1033 return
1034 default:
1035 }
1036
1037 actions[rand.Intn(len(actions))]()
1038 }
1039 }()
1040 }
1041
1042 time.AfterFunc(stressTestDur(t), func() { close(finishChan) })
1043 wg.Wait()
1044 pool.Close()
1045 }
1046
1047 func TestStress_AcquireAllIdle_TryAcquire(t *testing.T) {
1048 r := require.New(t)
1049
1050 pool := testPool[int32](t)
1051
1052 var wg sync.WaitGroup
1053 done := make(chan struct{})
1054
1055 wg.Add(1)
1056 go func() {
1057 defer wg.Done()
1058
1059 for {
1060 select {
1061 case <-done:
1062 return
1063 default:
1064 }
1065
1066 idleRes := pool.AcquireAllIdle()
1067 r.Less(len(idleRes), 2)
1068 for _, res := range idleRes {
1069 res.Release()
1070 }
1071 }
1072 }()
1073
1074 wg.Add(1)
1075 go func() {
1076 defer wg.Done()
1077
1078 for {
1079 select {
1080 case <-done:
1081 return
1082 default:
1083 }
1084
1085 res, err := pool.TryAcquire(context.Background())
1086 if err != nil {
1087 r.Equal(puddle.ErrNotAvailable, err)
1088 } else {
1089 r.NotNil(res)
1090 res.Release()
1091 }
1092 }
1093 }()
1094
1095 time.AfterFunc(stressTestDur(t), func() { close(done) })
1096 wg.Wait()
1097 }
1098
1099 func TestStress_AcquireAllIdle_Acquire(t *testing.T) {
1100 r := require.New(t)
1101
1102 pool := testPool[int32](t)
1103
1104 var wg sync.WaitGroup
1105 done := make(chan struct{})
1106
1107 wg.Add(1)
1108 go func() {
1109 defer wg.Done()
1110
1111 for {
1112 select {
1113 case <-done:
1114 return
1115 default:
1116 }
1117
1118 idleRes := pool.AcquireAllIdle()
1119 r.Less(len(idleRes), 2)
1120 for _, res := range idleRes {
1121 r.NotNil(res)
1122 res.Release()
1123 }
1124 }
1125 }()
1126
1127 wg.Add(1)
1128 go func() {
1129 defer wg.Done()
1130
1131 for {
1132 select {
1133 case <-done:
1134 return
1135 default:
1136 }
1137
1138 res, err := pool.Acquire(context.Background())
1139 if err != nil {
1140 r.Equal(puddle.ErrNotAvailable, err)
1141 } else {
1142 r.NotNil(res)
1143 res.Release()
1144 }
1145 }
1146 }()
1147
1148 time.AfterFunc(stressTestDur(t), func() { close(done) })
1149 wg.Wait()
1150 }
1151
1152 func startAcceptOnceDummyServer(laddr string) {
1153 ln, err := net.Listen("tcp", laddr)
1154 if err != nil {
1155 log.Fatalln("Listen:", err)
1156 }
1157
1158
1159 go func() {
1160 conn, err := ln.Accept()
1161 if err != nil {
1162 log.Fatalln("Accept:", err)
1163 }
1164
1165 for {
1166 buf := make([]byte, 1)
1167 _, err := conn.Read(buf)
1168 if err != nil {
1169 return
1170 }
1171 }
1172 }()
1173
1174 }
1175
1176 func ExamplePool() {
1177
1178 laddr := "127.0.0.1:8080"
1179 startAcceptOnceDummyServer(laddr)
1180
1181
1182 constructor := func(context.Context) (any, error) {
1183 return net.Dial("tcp", laddr)
1184 }
1185 destructor := func(value any) {
1186 value.(net.Conn).Close()
1187 }
1188 maxPoolSize := int32(10)
1189
1190 pool, err := puddle.NewPool(&puddle.Config[any]{Constructor: constructor, Destructor: destructor, MaxSize: int32(maxPoolSize)})
1191 if err != nil {
1192 log.Fatalln("NewPool", err)
1193 }
1194
1195
1196 for i := 0; i < 10; i++ {
1197
1198 res, err := pool.Acquire(context.Background())
1199 if err != nil {
1200 log.Fatalln("Acquire", err)
1201 }
1202
1203
1204 _, err = res.Value().(net.Conn).Write([]byte{1})
1205 if err != nil {
1206 log.Fatalln("Write", err)
1207 }
1208
1209
1210 res.Release()
1211 }
1212
1213 stats := pool.Stat()
1214 pool.Close()
1215
1216 fmt.Println("Connections:", stats.TotalResources())
1217 fmt.Println("Acquires:", stats.AcquireCount())
1218
1219
1220
1221 }
1222
1223 func BenchmarkPoolAcquireAndRelease(b *testing.B) {
1224 benchmarks := []struct {
1225 poolSize int32
1226 clientCount int
1227 cancellable bool
1228 }{
1229 {8, 1, false},
1230 {8, 2, false},
1231 {8, 8, false},
1232 {8, 32, false},
1233 {8, 128, false},
1234 {8, 512, false},
1235 {8, 2048, false},
1236 {8, 8192, false},
1237
1238 {64, 2, false},
1239 {64, 8, false},
1240 {64, 32, false},
1241 {64, 128, false},
1242 {64, 512, false},
1243 {64, 2048, false},
1244 {64, 8192, false},
1245
1246 {512, 2, false},
1247 {512, 8, false},
1248 {512, 32, false},
1249 {512, 128, false},
1250 {512, 512, false},
1251 {512, 2048, false},
1252 {512, 8192, false},
1253
1254 {8, 2, true},
1255 {8, 8, true},
1256 {8, 32, true},
1257 {8, 128, true},
1258 {8, 512, true},
1259 {8, 2048, true},
1260 {8, 8192, true},
1261
1262 {64, 2, true},
1263 {64, 8, true},
1264 {64, 32, true},
1265 {64, 128, true},
1266 {64, 512, true},
1267 {64, 2048, true},
1268 {64, 8192, true},
1269
1270 {512, 2, true},
1271 {512, 8, true},
1272 {512, 32, true},
1273 {512, 128, true},
1274 {512, 512, true},
1275 {512, 2048, true},
1276 {512, 8192, true},
1277 }
1278
1279 for _, bm := range benchmarks {
1280 name := fmt.Sprintf("PoolSize=%d/ClientCount=%d/Cancellable=%v", bm.poolSize, bm.clientCount, bm.cancellable)
1281
1282 b.Run(name, func(b *testing.B) {
1283 ctx := context.Background()
1284 cancel := func() {}
1285 if bm.cancellable {
1286 ctx, cancel = context.WithCancel(ctx)
1287 }
1288
1289 wg := &sync.WaitGroup{}
1290
1291 constructor, _ := createConstructor()
1292 pool, err := puddle.NewPool(&puddle.Config[int]{Constructor: constructor, Destructor: stubDestructor, MaxSize: bm.poolSize})
1293 if err != nil {
1294 b.Fatal(err)
1295 }
1296
1297 for i := 0; i < bm.clientCount; i++ {
1298 wg.Add(1)
1299 go func() {
1300 defer wg.Done()
1301
1302 for j := 0; j < b.N; j++ {
1303 res, err := pool.Acquire(ctx)
1304 if err != nil {
1305 b.Fatal(err)
1306 }
1307 res.Release()
1308 }
1309 }()
1310 }
1311
1312 wg.Wait()
1313 cancel()
1314 })
1315 }
1316 }
1317
1318 func TestAcquireAllSem(t *testing.T) {
1319 r := require.New(t)
1320
1321 sem := semaphore.NewWeighted(5)
1322 r.Equal(4, puddle.AcquireSemAll(sem, 4))
1323 sem.Release(4)
1324
1325 r.Equal(5, puddle.AcquireSemAll(sem, 5))
1326 sem.Release(5)
1327
1328 r.Equal(5, puddle.AcquireSemAll(sem, 6))
1329 sem.Release(5)
1330 }
1331
1332 func testPool[T any](t testing.TB) *puddle.Pool[T] {
1333 cfg := puddle.Config[T]{
1334 MaxSize: 1,
1335 Constructor: func(ctx context.Context) (T, error) {
1336 var zero T
1337 return zero, nil
1338 },
1339 Destructor: func(T) {},
1340 }
1341
1342 pool, err := puddle.NewPool(&cfg)
1343 require.NoError(t, err)
1344 t.Cleanup(pool.Close)
1345
1346 return pool
1347 }
1348
1349 func releaser[T any](t testing.TB) chan<- *puddle.Resource[T] {
1350 startChan := make(chan struct{})
1351 workChan := make(chan *puddle.Resource[T], 1)
1352
1353 go func() {
1354 close(startChan)
1355
1356 for r := range workChan {
1357 r.Release()
1358 }
1359 }()
1360 t.Cleanup(func() { close(workChan) })
1361
1362
1363 <-startChan
1364 return workChan
1365 }
1366
1367 func TestReleaseAfterAcquire(t *testing.T) {
1368 const cnt = 100000
1369
1370 r := require.New(t)
1371 ctx := context.Background()
1372 pool := testPool[int32](t)
1373 releaseChan := releaser[int32](t)
1374
1375 res, err := pool.Acquire(ctx)
1376 r.NoError(err)
1377
1378
1379 defer func() { res.Release() }()
1380
1381 for i := 0; i < cnt; i++ {
1382 releaseChan <- res
1383 res, err = pool.Acquire(ctx)
1384 r.NoError(err)
1385 }
1386 }
1387
1388 func BenchmarkAcquire_ReleaseAfterAcquire(b *testing.B) {
1389 r := require.New(b)
1390 ctx := context.Background()
1391 pool := testPool[int32](b)
1392 releaseChan := releaser[int32](b)
1393
1394 res, err := pool.Acquire(ctx)
1395 r.NoError(err)
1396
1397
1398 defer func() { res.Release() }()
1399
1400 b.ResetTimer()
1401 for i := 0; i < b.N; i++ {
1402 releaseChan <- res
1403 res, err = pool.Acquire(ctx)
1404 r.NoError(err)
1405 }
1406 }
1407
1408 func withCPULoad() {
1409
1410 numGoroutines := runtime.NumCPU() * 2
1411
1412 var wg sync.WaitGroup
1413 for i := 0; i < numGoroutines; i++ {
1414 wg.Add(1)
1415 go func() {
1416 wg.Done()
1417
1418
1419 for j := 0; true; j++ {
1420 }
1421 }()
1422 }
1423
1424 wg.Wait()
1425 }
1426
1427 func BenchmarkAcquire_ReleaseAfterAcquireWithCPULoad(b *testing.B) {
1428 r := require.New(b)
1429 ctx := context.Background()
1430 pool := testPool[int32](b)
1431 releaseChan := releaser[int32](b)
1432
1433 withCPULoad()
1434
1435 res, err := pool.Acquire(ctx)
1436 r.NoError(err)
1437
1438
1439 defer func() { res.Release() }()
1440
1441 b.ResetTimer()
1442 for i := 0; i < b.N; i++ {
1443 releaseChan <- res
1444 res, err = pool.Acquire(ctx)
1445 r.NoError(err)
1446 }
1447 }
1448
1449 func BenchmarkAcquire_MultipleCancelled(b *testing.B) {
1450 const cancelCnt = 64
1451
1452 r := require.New(b)
1453 ctx := context.Background()
1454 pool := testPool[int32](b)
1455 releaseChan := releaser[int32](b)
1456
1457 cancelCtx, cancel := context.WithCancel(ctx)
1458 cancel()
1459
1460 res, err := pool.Acquire(ctx)
1461 r.NoError(err)
1462
1463
1464 defer func() { res.Release() }()
1465
1466 b.ResetTimer()
1467 for i := 0; i < b.N; i++ {
1468 for j := 0; j < cancelCnt; j++ {
1469 _, err = pool.AcquireRaw(cancelCtx)
1470 r.Equal(context.Canceled, err)
1471 }
1472
1473 releaseChan <- res
1474 res, err = pool.Acquire(ctx)
1475 r.NoError(err)
1476 }
1477 }
1478
1479 func BenchmarkAcquire_MultipleCancelledWithCPULoad(b *testing.B) {
1480 const cancelCnt = 3
1481
1482 r := require.New(b)
1483 ctx := context.Background()
1484 pool := testPool[int32](b)
1485 releaseChan := releaser[int32](b)
1486
1487 cancelCtx, cancel := context.WithCancel(ctx)
1488 cancel()
1489
1490 withCPULoad()
1491
1492 res, err := pool.Acquire(ctx)
1493 r.NoError(err)
1494
1495
1496 defer func() { res.Release() }()
1497
1498 b.ResetTimer()
1499 for i := 0; i < b.N; i++ {
1500 for j := 0; j < cancelCnt; j++ {
1501 _, err = pool.AcquireRaw(cancelCtx)
1502 r.Equal(context.Canceled, err)
1503 }
1504
1505 releaseChan <- res
1506 res, err = pool.Acquire(ctx)
1507 r.NoError(err)
1508 }
1509 }
1510
View as plain text