1 package puddle
2
3 import (
4 "context"
5 "errors"
6 "sync"
7 "sync/atomic"
8 "time"
9
10 "github.com/jackc/puddle/v2/internal/genstack"
11 "golang.org/x/sync/semaphore"
12 )
13
// Lifecycle states stored in Resource.status.
const (
	// resourceStatusConstructing marks a resource whose constructor has not
	// finished yet.
	resourceStatusConstructing = iota
	// resourceStatusIdle marks a resource sitting in the pool, ready to be
	// acquired.
	resourceStatusIdle
	// resourceStatusAcquired marks a resource currently checked out.
	resourceStatusAcquired
	// resourceStatusHijacked marks a resource that was taken out of the
	// pool's management.
	resourceStatusHijacked
)
20
21
22
// Sentinel errors returned by Pool operations.
var (
	// ErrClosedPool is returned when an operation is attempted on a pool
	// that has already been closed.
	ErrClosedPool = errors.New("closed pool")

	// ErrNotAvailable is returned by the non-blocking operations (TryAcquire,
	// CreateResource) when no resource or capacity is available right now.
	ErrNotAvailable = errors.New("resource not available")
)
28
29
type (
	// Constructor is the function the pool calls to create a new resource
	// value. It receives a context and returns the value or an error.
	Constructor[T any] func(ctx context.Context) (res T, err error)

	// Destructor is the function the pool calls to dispose of a resource
	// value.
	Destructor[T any] func(res T)
)
34
35
// Resource is the handle through which a caller uses one pooled value of
// type T.
type Resource[T any] struct {
	// value is the pooled value produced by the pool's constructor.
	value T
	// pool is the Pool that created this resource; release, destroy and
	// hijack are delegated to it.
	pool *Pool[T]
	// creationTime is the wall-clock time recorded when the resource entry
	// was created.
	creationTime time.Time
	// lastUsedNano is the nanotime() reading recorded at creation and
	// updated when the resource is released back to the pool.
	lastUsedNano int64
	// poolResetCount is the pool's resetCount at creation time; a mismatch
	// on release means the pool was Reset in the meantime.
	poolResetCount int
	// status holds one of the resourceStatus* constants.
	status byte
}
44
45
46 func (res *Resource[T]) Value() T {
47 if !(res.status == resourceStatusAcquired || res.status == resourceStatusHijacked) {
48 panic("tried to access resource that is not acquired or hijacked")
49 }
50 return res.value
51 }
52
53
54 func (res *Resource[T]) Release() {
55 if res.status != resourceStatusAcquired {
56 panic("tried to release resource that is not acquired")
57 }
58 res.pool.releaseAcquiredResource(res, nanotime())
59 }
60
61
62
63 func (res *Resource[T]) ReleaseUnused() {
64 if res.status != resourceStatusAcquired {
65 panic("tried to release resource that is not acquired")
66 }
67 res.pool.releaseAcquiredResource(res, res.lastUsedNano)
68 }
69
70
71
72 func (res *Resource[T]) Destroy() {
73 if res.status != resourceStatusAcquired {
74 panic("tried to destroy resource that is not acquired")
75 }
76 go res.pool.destroyAcquiredResource(res)
77 }
78
79
80
81 func (res *Resource[T]) Hijack() {
82 if res.status != resourceStatusAcquired {
83 panic("tried to hijack resource that is not acquired")
84 }
85 res.pool.hijackAcquiredResource(res)
86 }
87
88
89 func (res *Resource[T]) CreationTime() time.Time {
90 if !(res.status == resourceStatusAcquired || res.status == resourceStatusHijacked) {
91 panic("tried to access resource that is not acquired or hijacked")
92 }
93 return res.creationTime
94 }
95
96
97
98
99 func (res *Resource[T]) LastUsedNanotime() int64 {
100 if !(res.status == resourceStatusAcquired || res.status == resourceStatusHijacked) {
101 panic("tried to access resource that is not acquired or hijacked")
102 }
103
104 return res.lastUsedNano
105 }
106
107
108
109 func (res *Resource[T]) IdleDuration() time.Duration {
110 if !(res.status == resourceStatusAcquired || res.status == resourceStatusHijacked) {
111 panic("tried to access resource that is not acquired or hijacked")
112 }
113
114 return time.Duration(nanotime() - res.lastUsedNano)
115 }
116
117
// Pool is a concurrency-safe pool of resources of type T with a fixed
// maximum size.
type Pool[T any] struct {
	// mux is the pool's internal lock. The methods in this file hold it
	// while touching allResources, idleResources, closed, resetCount and the
	// acquire statistics. The constructor and destructor are invoked
	// without it held.
	mux sync.Mutex

	// acquireSem holds one permit per pool slot (it is created with weight
	// MaxSize). A permit is taken before mux is locked on the acquire paths
	// and is released while mux is held. AcquireAllIdle is the exception: it
	// takes permits via TryAcquire while already holding mux.
	acquireSem *semaphore.Weighted
	// destructWG counts resources that still have to be destroyed or
	// hijacked; Close waits on it.
	destructWG sync.WaitGroup

	// allResources lists every resource the pool currently tracks,
	// regardless of status.
	allResources resList[T]
	// idleResources is the stack of resources that are ready to be acquired.
	idleResources *genstack.GenStack[*Resource[T]]

	constructor Constructor[T]
	destructor Destructor[T]
	// maxSize is the maximum number of resources the pool may track.
	maxSize int32

	// Acquire statistics reported through Stat.
	acquireCount int64
	acquireDuration time.Duration
	emptyAcquireCount int64
	// canceledAcquireCount is atomic because it is updated on paths that do
	// not hold mux.
	canceledAcquireCount atomic.Int64

	// resetCount is incremented by Reset; resources created under an older
	// count are destroyed instead of being returned to the idle stack.
	resetCount int

	// baseAcquireCtx is canceled by Close (via cancelBaseAcquireCtx); it is
	// combined with the caller's context for constructors started by
	// Acquire.
	baseAcquireCtx context.Context
	cancelBaseAcquireCtx context.CancelFunc
	// closed is set once Close has been called.
	closed bool
}
150
// Config holds the settings NewPool uses to build a Pool.
type Config[T any] struct {
	// Constructor creates new resource values.
	Constructor Constructor[T]
	// Destructor disposes of resource values.
	Destructor Destructor[T]
	// MaxSize is the maximum number of resources; NewPool rejects values
	// below 1.
	MaxSize int32
}
156
157
158 func NewPool[T any](config *Config[T]) (*Pool[T], error) {
159 if config.MaxSize < 1 {
160 return nil, errors.New("MaxSize must be >= 1")
161 }
162
163 baseAcquireCtx, cancelBaseAcquireCtx := context.WithCancel(context.Background())
164
165 return &Pool[T]{
166 acquireSem: semaphore.NewWeighted(int64(config.MaxSize)),
167 idleResources: genstack.NewGenStack[*Resource[T]](),
168 maxSize: config.MaxSize,
169 constructor: config.Constructor,
170 destructor: config.Destructor,
171 baseAcquireCtx: baseAcquireCtx,
172 cancelBaseAcquireCtx: cancelBaseAcquireCtx,
173 }, nil
174 }
175
176
177
178 func (p *Pool[T]) Close() {
179 defer p.destructWG.Wait()
180
181 p.mux.Lock()
182 defer p.mux.Unlock()
183
184 if p.closed {
185 return
186 }
187 p.closed = true
188 p.cancelBaseAcquireCtx()
189
190 for res, ok := p.idleResources.Pop(); ok; res, ok = p.idleResources.Pop() {
191 p.allResources.remove(res)
192 go p.destructResourceValue(res.value)
193 }
194 }
195
196
// Stat is a snapshot of pool statistics.
type Stat struct {
	constructingResources int32
	acquiredResources     int32
	idleResources         int32
	maxResources          int32
	acquireCount          int64
	acquireDuration       time.Duration
	emptyAcquireCount     int64
	canceledAcquireCount  int64
}

// TotalResources returns the number of resources in the snapshot: those
// being constructed, those acquired and those idle.
func (s *Stat) TotalResources() int32 {
	total := s.constructingResources
	total += s.acquiredResources
	total += s.idleResources
	return total
}

// ConstructingResources returns the number of resources whose construction
// was in progress when the snapshot was taken.
func (s *Stat) ConstructingResources() int32 {
	return s.constructingResources
}

// AcquiredResources returns the number of acquired resources in the
// snapshot.
func (s *Stat) AcquiredResources() int32 {
	return s.acquiredResources
}

// IdleResources returns the number of idle resources in the snapshot.
func (s *Stat) IdleResources() int32 {
	return s.idleResources
}

// MaxResources returns the maximum size of the pool.
func (s *Stat) MaxResources() int32 {
	return s.maxResources
}

// AcquireCount returns the cumulative count of successful acquires.
func (s *Stat) AcquireCount() int64 {
	return s.acquireCount
}

// AcquireDuration returns the cumulative time spent in successful acquires.
func (s *Stat) AcquireDuration() time.Duration {
	return s.acquireDuration
}

// EmptyAcquireCount returns the cumulative count of successful acquires that
// had to wait for a permit or construct a new resource.
func (s *Stat) EmptyAcquireCount() int64 {
	return s.emptyAcquireCount
}

// CanceledAcquireCount returns the cumulative count of acquires that were
// abandoned because the caller's context was done.
func (s *Stat) CanceledAcquireCount() int64 {
	return s.canceledAcquireCount
}
259
260
261 func (p *Pool[T]) Stat() *Stat {
262 p.mux.Lock()
263 defer p.mux.Unlock()
264
265 s := &Stat{
266 maxResources: p.maxSize,
267 acquireCount: p.acquireCount,
268 emptyAcquireCount: p.emptyAcquireCount,
269 canceledAcquireCount: p.canceledAcquireCount.Load(),
270 acquireDuration: p.acquireDuration,
271 }
272
273 for _, res := range p.allResources {
274 switch res.status {
275 case resourceStatusConstructing:
276 s.constructingResources += 1
277 case resourceStatusIdle:
278 s.idleResources += 1
279 case resourceStatusAcquired:
280 s.acquiredResources += 1
281 }
282 }
283
284 return s
285 }
286
287
288
289
290
291
292 func (p *Pool[T]) tryAcquireIdleResource() *Resource[T] {
293 res, ok := p.idleResources.Pop()
294 if !ok {
295 return nil
296 }
297
298 res.status = resourceStatusAcquired
299 return res
300 }
301
302
303
304
305
306 func (p *Pool[T]) createNewResource() *Resource[T] {
307 res := &Resource[T]{
308 pool: p,
309 creationTime: time.Now(),
310 lastUsedNano: nanotime(),
311 poolResetCount: p.resetCount,
312 status: resourceStatusConstructing,
313 }
314
315 p.allResources.append(res)
316 p.destructWG.Add(1)
317
318 return res
319 }
320
321
322
323
324
325
326
327
328
329 func (p *Pool[T]) Acquire(ctx context.Context) (_ *Resource[T], err error) {
330 select {
331 case <-ctx.Done():
332 p.canceledAcquireCount.Add(1)
333 return nil, ctx.Err()
334 default:
335 }
336
337 return p.acquire(ctx)
338 }
339
340
341
342
343
344 func (p *Pool[T]) acquire(ctx context.Context) (*Resource[T], error) {
345 startNano := nanotime()
346
347 var waitedForLock bool
348 if !p.acquireSem.TryAcquire(1) {
349 waitedForLock = true
350 err := p.acquireSem.Acquire(ctx, 1)
351 if err != nil {
352 p.canceledAcquireCount.Add(1)
353 return nil, err
354 }
355 }
356
357 p.mux.Lock()
358 if p.closed {
359 p.acquireSem.Release(1)
360 p.mux.Unlock()
361 return nil, ErrClosedPool
362 }
363
364
365 if res := p.tryAcquireIdleResource(); res != nil {
366 if waitedForLock {
367 p.emptyAcquireCount += 1
368 }
369 p.acquireCount += 1
370 p.acquireDuration += time.Duration(nanotime() - startNano)
371 p.mux.Unlock()
372 return res, nil
373 }
374
375 if len(p.allResources) >= int(p.maxSize) {
376
377 panic("bug: semaphore allowed more acquires than pool allows")
378 }
379
380
381 res := p.createNewResource()
382 p.mux.Unlock()
383
384 res, err := p.initResourceValue(ctx, res)
385 if err != nil {
386 return nil, err
387 }
388
389 p.mux.Lock()
390 defer p.mux.Unlock()
391
392 p.emptyAcquireCount += 1
393 p.acquireCount += 1
394 p.acquireDuration += time.Duration(nanotime() - startNano)
395
396 return res, nil
397 }
398
// initResourceValue runs the pool's constructor for res, which is already
// registered in the constructing state. On success it returns res in the
// acquired state. It returns ctx.Err() if ctx is done first, or the
// constructor's error.
func (p *Pool[T]) initResourceValue(ctx context.Context, res *Resource[T]) (*Resource[T], error) {
	// The constructor runs in its own goroutine so that this function can
	// return as soon as ctx is done, while the constructor keeps running
	// to completion in the background.
	constructErrChan := make(chan error)
	go func() {
		// NOTE(review): newValueCancelCtx is defined elsewhere; presumably it
		// yields a context carrying ctx's values that is canceled by
		// p.baseAcquireCtx (i.e. on Close) rather than by ctx — confirm.
		constructorCtx := newValueCancelCtx(ctx, p.baseAcquireCtx)
		value, err := p.constructor(constructorCtx)
		if err != nil {
			p.mux.Lock()
			p.allResources.remove(res)
			p.destructWG.Done()

			// Construction failed, so this resource will never be
			// acquired; give the permit back so another caller can use
			// the slot.
			p.acquireSem.Release(1)
			p.mux.Unlock()

			select {
			case constructErrChan <- err:
			case <-ctx.Done():
				// The caller has already returned because ctx is done, so
				// nobody receives the error; this case lets the goroutine
				// exit instead of blocking on the send forever.
			}
			return
		}

		// res is already in p.allResources, where Stat reads its status
		// under the lock, so update it under the lock too.
		p.mux.Lock()
		res.value = value
		res.status = resourceStatusAcquired
		p.mux.Unlock()

		// The channel is unbuffered, so the send only succeeds if the
		// caller is still waiting; otherwise the finished resource is
		// released to the pool without updating its last-used time.
		select {
		case constructErrChan <- nil:
		case <-ctx.Done():
			p.releaseAcquiredResource(res, res.lastUsedNano)
		}
	}()

	select {
	case <-ctx.Done():
		p.canceledAcquireCount.Add(1)
		return nil, ctx.Err()
	case err := <-constructErrChan:
		if err != nil {
			return nil, err
		}
		return res, nil
	}
}
456
457
458
459
460 func (p *Pool[T]) TryAcquire(ctx context.Context) (*Resource[T], error) {
461 if !p.acquireSem.TryAcquire(1) {
462 return nil, ErrNotAvailable
463 }
464
465 p.mux.Lock()
466 defer p.mux.Unlock()
467
468 if p.closed {
469 p.acquireSem.Release(1)
470 return nil, ErrClosedPool
471 }
472
473
474 if res := p.tryAcquireIdleResource(); res != nil {
475 p.acquireCount += 1
476 return res, nil
477 }
478
479 if len(p.allResources) >= int(p.maxSize) {
480
481 panic("bug: semaphore allowed more acquires than pool allows")
482 }
483
484 res := p.createNewResource()
485 go func() {
486 value, err := p.constructor(ctx)
487
488 p.mux.Lock()
489 defer p.mux.Unlock()
490
491
492
493 defer p.acquireSem.Release(1)
494
495 if err != nil {
496 p.allResources.remove(res)
497 p.destructWG.Done()
498 return
499 }
500
501 res.value = value
502 res.status = resourceStatusIdle
503 p.idleResources.Push(res)
504 }()
505
506 return nil, ErrNotAvailable
507 }
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531 func acquireSemAll(sem *semaphore.Weighted, num int) int {
532 if sem.TryAcquire(int64(num)) {
533 return num
534 }
535
536 var acquired int
537 for i := int(log2Int(num)); i >= 0; i-- {
538 val := 1 << i
539 if sem.TryAcquire(int64(val)) {
540 acquired += val
541 }
542 }
543
544 return acquired
545 }
546
547
548
549
550 func (p *Pool[T]) AcquireAllIdle() []*Resource[T] {
551 p.mux.Lock()
552 defer p.mux.Unlock()
553
554 if p.closed {
555 return nil
556 }
557
558 numIdle := p.idleResources.Len()
559 if numIdle == 0 {
560 return nil
561 }
562
563
564
565
566
567
568
569
570
571
572
573 acquired := acquireSemAll(p.acquireSem, numIdle)
574
575 idle := make([]*Resource[T], acquired)
576 for i := range idle {
577 res, _ := p.idleResources.Pop()
578 res.status = resourceStatusAcquired
579 idle[i] = res
580 }
581
582
583
584
585
586 p.idleResources.NextGen()
587
588 return idle
589 }
590
591
592
593 func (p *Pool[T]) CreateResource(ctx context.Context) error {
594 if !p.acquireSem.TryAcquire(1) {
595 return ErrNotAvailable
596 }
597
598 p.mux.Lock()
599 if p.closed {
600 p.acquireSem.Release(1)
601 p.mux.Unlock()
602 return ErrClosedPool
603 }
604
605 if len(p.allResources) >= int(p.maxSize) {
606 p.acquireSem.Release(1)
607 p.mux.Unlock()
608 return ErrNotAvailable
609 }
610
611 res := p.createNewResource()
612 p.mux.Unlock()
613
614 value, err := p.constructor(ctx)
615 p.mux.Lock()
616 defer p.mux.Unlock()
617 defer p.acquireSem.Release(1)
618 if err != nil {
619 p.allResources.remove(res)
620 p.destructWG.Done()
621 return err
622 }
623
624 res.value = value
625 res.status = resourceStatusIdle
626
627
628 if p.closed {
629 go p.destructResourceValue(res.value)
630 return ErrClosedPool
631 }
632
633 p.idleResources.Push(res)
634
635 return nil
636 }
637
638
639
640
641
642
643 func (p *Pool[T]) Reset() {
644 p.mux.Lock()
645 defer p.mux.Unlock()
646
647 p.resetCount++
648
649 for res, ok := p.idleResources.Pop(); ok; res, ok = p.idleResources.Pop() {
650 p.allResources.remove(res)
651 go p.destructResourceValue(res.value)
652 }
653 }
654
655
656 func (p *Pool[T]) releaseAcquiredResource(res *Resource[T], lastUsedNano int64) {
657 p.mux.Lock()
658 defer p.mux.Unlock()
659 defer p.acquireSem.Release(1)
660
661 if p.closed || res.poolResetCount != p.resetCount {
662 p.allResources.remove(res)
663 go p.destructResourceValue(res.value)
664 } else {
665 res.lastUsedNano = lastUsedNano
666 res.status = resourceStatusIdle
667 p.idleResources.Push(res)
668 }
669 }
670
671
672
673 func (p *Pool[T]) destroyAcquiredResource(res *Resource[T]) {
674 p.destructResourceValue(res.value)
675
676 p.mux.Lock()
677 defer p.mux.Unlock()
678 defer p.acquireSem.Release(1)
679
680 p.allResources.remove(res)
681 }
682
683 func (p *Pool[T]) hijackAcquiredResource(res *Resource[T]) {
684 p.mux.Lock()
685 defer p.mux.Unlock()
686 defer p.acquireSem.Release(1)
687
688 p.allResources.remove(res)
689 res.status = resourceStatusHijacked
690 p.destructWG.Done()
691 }
692
693 func (p *Pool[T]) destructResourceValue(value T) {
694 p.destructor(value)
695 p.destructWG.Done()
696 }
697
View as plain text