
Source file src/github.com/jackc/puddle/v2/pool.go

Documentation: github.com/jackc/puddle/v2

     1  package puddle
     3  import (
     4  	"context"
     5  	"errors"
     6  	"sync"
     7  	"sync/atomic"
     8  	"time"
    10  	"github.com/jackc/puddle/v2/internal/genstack"
    11  	"golang.org/x/sync/semaphore"
    12  )
    14  const (
    15  	resourceStatusConstructing = 0
    16  	resourceStatusIdle         = iota
    17  	resourceStatusAcquired     = iota
    18  	resourceStatusHijacked     = iota
    19  )
    21  // ErrClosedPool occurs on an attempt to acquire a connection from a closed pool
    22  // or a pool that is closed while the acquire is waiting.
    23  var ErrClosedPool = errors.New("closed pool")
    25  // ErrNotAvailable occurs on an attempt to acquire a resource from a pool
    26  // that is at maximum capacity and has no available resources.
    27  var ErrNotAvailable = errors.New("resource not available")
// Constructor is a function called by the pool to construct a resource. It
// receives a context and returns either the new resource value or a non-nil
// error when the resource could not be created.
type Constructor[T any] func(ctx context.Context) (res T, err error)
// Destructor is a function called by the pool to destroy a resource value
// that the pool is discarding.
type Destructor[T any] func(res T)
// Resource is the resource handle returned by acquiring from the pool.
type Resource[T any] struct {
	// Fields, in order:
	//   - value: the value produced by the pool's constructor.
	//   - pool: the pool that created this resource.
	//   - creationTime: wall-clock time at which the pool created the resource.
	//   - lastUsedNano: nanotime() reading recorded at creation and updated
	//     when the resource is released with Release.
	//   - poolResetCount: the pool's resetCount when the resource was created;
	//     if it differs from the pool's current value on release, the resource
	//     is destroyed instead of being returned to the idle stack.
	//   - status: one of the resourceStatus* constants.
	value          T
	pool           *Pool[T]
	creationTime   time.Time
	lastUsedNano   int64
	poolResetCount int
	status         byte
}
    45  // Value returns the resource value.
    46  func (res *Resource[T]) Value() T {
    47  	if !(res.status == resourceStatusAcquired || res.status == resourceStatusHijacked) {
    48  		panic("tried to access resource that is not acquired or hijacked")
    49  	}
    50  	return res.value
    51  }
    53  // Release returns the resource to the pool. res must not be subsequently used.
    54  func (res *Resource[T]) Release() {
    55  	if res.status != resourceStatusAcquired {
    56  		panic("tried to release resource that is not acquired")
    57  	}
    58  	res.pool.releaseAcquiredResource(res, nanotime())
    59  }
    61  // ReleaseUnused returns the resource to the pool without updating when it was last used used. i.e. LastUsedNanotime
    62  // will not change. res must not be subsequently used.
    63  func (res *Resource[T]) ReleaseUnused() {
    64  	if res.status != resourceStatusAcquired {
    65  		panic("tried to release resource that is not acquired")
    66  	}
    67  	res.pool.releaseAcquiredResource(res, res.lastUsedNano)
    68  }
    70  // Destroy returns the resource to the pool for destruction. res must not be
    71  // subsequently used.
    72  func (res *Resource[T]) Destroy() {
    73  	if res.status != resourceStatusAcquired {
    74  		panic("tried to destroy resource that is not acquired")
    75  	}
    76  	go res.pool.destroyAcquiredResource(res)
    77  }
    79  // Hijack assumes ownership of the resource from the pool. Caller is responsible
    80  // for cleanup of resource value.
    81  func (res *Resource[T]) Hijack() {
    82  	if res.status != resourceStatusAcquired {
    83  		panic("tried to hijack resource that is not acquired")
    84  	}
    85  	res.pool.hijackAcquiredResource(res)
    86  }
    88  // CreationTime returns when the resource was created by the pool.
    89  func (res *Resource[T]) CreationTime() time.Time {
    90  	if !(res.status == resourceStatusAcquired || res.status == resourceStatusHijacked) {
    91  		panic("tried to access resource that is not acquired or hijacked")
    92  	}
    93  	return res.creationTime
    94  }
    96  // LastUsedNanotime returns when Release was last called on the resource measured in nanoseconds from an arbitrary time
    97  // (a monotonic time). Returns creation time if Release has never been called. This is only useful to compare with
    98  // other calls to LastUsedNanotime. In almost all cases, IdleDuration should be used instead.
    99  func (res *Resource[T]) LastUsedNanotime() int64 {
   100  	if !(res.status == resourceStatusAcquired || res.status == resourceStatusHijacked) {
   101  		panic("tried to access resource that is not acquired or hijacked")
   102  	}
   104  	return res.lastUsedNano
   105  }
   107  // IdleDuration returns the duration since Release was last called on the resource. This is equivalent to subtracting
   108  // LastUsedNanotime to the current nanotime.
   109  func (res *Resource[T]) IdleDuration() time.Duration {
   110  	if !(res.status == resourceStatusAcquired || res.status == resourceStatusHijacked) {
   111  		panic("tried to access resource that is not acquired or hijacked")
   112  	}
   114  	return time.Duration(nanotime() - res.lastUsedNano)
   115  }
// Pool is a concurrency-safe resource pool.
type Pool[T any] struct {
	// mux is the pool internal lock. Any modification of shared state of
	// the pool (except Acquires of acquireSem) must be performed only by
	// the holder of the lock. Long-running operations are not allowed while
	// mux is held.
	mux sync.Mutex
	// acquireSem provides an allowance to acquire a resource.
	//
	// Releases are allowed only when the caller holds mux. Acquires have to
	// happen before mux is locked (this doesn't apply to
	// semaphore.TryAcquire in AcquireAllIdle).
	acquireSem *semaphore.Weighted
	// destructWG is incremented for every resource the pool creates and
	// decremented when that resource is destroyed, hijacked, or dropped
	// after a failed construction. Close waits on it.
	destructWG sync.WaitGroup

	// allResources lists every resource currently tracked by the pool,
	// including those still being constructed. idleResources is the stack
	// of resources that are ready to be acquired.
	allResources  resList[T]
	idleResources *genstack.GenStack[*Resource[T]]

	// constructor, destructor and maxSize are copied from Config by NewPool.
	constructor Constructor[T]
	destructor  Destructor[T]
	maxSize     int32

	// Cumulative statistics reported by Stat. canceledAcquireCount is
	// atomic because it is updated without holding mux.
	acquireCount         int64
	acquireDuration      time.Duration
	emptyAcquireCount    int64
	canceledAcquireCount atomic.Int64

	// resetCount is incremented by Reset; resources remember the value that
	// was current when they were created.
	resetCount int

	// baseAcquireCtx is handed to resource construction started by Acquire
	// and is canceled (via cancelBaseAcquireCtx) when the pool is closed.
	// closed is set by Close.
	baseAcquireCtx       context.Context
	cancelBaseAcquireCtx context.CancelFunc
	closed               bool
}
// Config holds the settings NewPool uses to create a Pool: the Constructor
// and Destructor used to build and tear down resource values, and MaxSize,
// the maximum number of resources in the pool (NewPool rejects values less
// than 1).
type Config[T any] struct {
	Constructor Constructor[T]
	Destructor  Destructor[T]
	MaxSize     int32
}
   157  // NewPool creates a new pool. Panics if maxSize is less than 1.
   158  func NewPool[T any](config *Config[T]) (*Pool[T], error) {
   159  	if config.MaxSize < 1 {
   160  		return nil, errors.New("MaxSize must be >= 1")
   161  	}
   163  	baseAcquireCtx, cancelBaseAcquireCtx := context.WithCancel(context.Background())
   165  	return &Pool[T]{
   166  		acquireSem:           semaphore.NewWeighted(int64(config.MaxSize)),
   167  		idleResources:        genstack.NewGenStack[*Resource[T]](),
   168  		maxSize:              config.MaxSize,
   169  		constructor:          config.Constructor,
   170  		destructor:           config.Destructor,
   171  		baseAcquireCtx:       baseAcquireCtx,
   172  		cancelBaseAcquireCtx: cancelBaseAcquireCtx,
   173  	}, nil
   174  }
   176  // Close destroys all resources in the pool and rejects future Acquire calls.
   177  // Blocks until all resources are returned to pool and destroyed.
   178  func (p *Pool[T]) Close() {
   179  	defer p.destructWG.Wait()
   181  	p.mux.Lock()
   182  	defer p.mux.Unlock()
   184  	if p.closed {
   185  		return
   186  	}
   187  	p.closed = true
   188  	p.cancelBaseAcquireCtx()
   190  	for res, ok := p.idleResources.Pop(); ok; res, ok = p.idleResources.Pop() {
   191  		p.allResources.remove(res)
   192  		go p.destructResourceValue(res.value)
   193  	}
   194  }
   196  // Stat is a snapshot of Pool statistics.
   197  type Stat struct {
   198  	constructingResources int32
   199  	acquiredResources     int32
   200  	idleResources         int32
   201  	maxResources          int32
   202  	acquireCount          int64
   203  	acquireDuration       time.Duration
   204  	emptyAcquireCount     int64
   205  	canceledAcquireCount  int64
   206  }
   208  // TotalResources returns the total number of resources currently in the pool.
   209  // The value is the sum of ConstructingResources, AcquiredResources, and
   210  // IdleResources.
   211  func (s *Stat) TotalResources() int32 {
   212  	return s.constructingResources + s.acquiredResources + s.idleResources
   213  }
   215  // ConstructingResources returns the number of resources with construction in progress in
   216  // the pool.
   217  func (s *Stat) ConstructingResources() int32 {
   218  	return s.constructingResources
   219  }
   221  // AcquiredResources returns the number of currently acquired resources in the pool.
   222  func (s *Stat) AcquiredResources() int32 {
   223  	return s.acquiredResources
   224  }
   226  // IdleResources returns the number of currently idle resources in the pool.
   227  func (s *Stat) IdleResources() int32 {
   228  	return s.idleResources
   229  }
   231  // MaxResources returns the maximum size of the pool.
   232  func (s *Stat) MaxResources() int32 {
   233  	return s.maxResources
   234  }
   236  // AcquireCount returns the cumulative count of successful acquires from the pool.
   237  func (s *Stat) AcquireCount() int64 {
   238  	return s.acquireCount
   239  }
   241  // AcquireDuration returns the total duration of all successful acquires from
   242  // the pool.
   243  func (s *Stat) AcquireDuration() time.Duration {
   244  	return s.acquireDuration
   245  }
   247  // EmptyAcquireCount returns the cumulative count of successful acquires from the pool
   248  // that waited for a resource to be released or constructed because the pool was
   249  // empty.
   250  func (s *Stat) EmptyAcquireCount() int64 {
   251  	return s.emptyAcquireCount
   252  }
   254  // CanceledAcquireCount returns the cumulative count of acquires from the pool
   255  // that were canceled by a context.
   256  func (s *Stat) CanceledAcquireCount() int64 {
   257  	return s.canceledAcquireCount
   258  }
   260  // Stat returns the current pool statistics.
   261  func (p *Pool[T]) Stat() *Stat {
   262  	p.mux.Lock()
   263  	defer p.mux.Unlock()
   265  	s := &Stat{
   266  		maxResources:         p.maxSize,
   267  		acquireCount:         p.acquireCount,
   268  		emptyAcquireCount:    p.emptyAcquireCount,
   269  		canceledAcquireCount: p.canceledAcquireCount.Load(),
   270  		acquireDuration:      p.acquireDuration,
   271  	}
   273  	for _, res := range p.allResources {
   274  		switch res.status {
   275  		case resourceStatusConstructing:
   276  			s.constructingResources += 1
   277  		case resourceStatusIdle:
   278  			s.idleResources += 1
   279  		case resourceStatusAcquired:
   280  			s.acquiredResources += 1
   281  		}
   282  	}
   284  	return s
   285  }
   287  // tryAcquireIdleResource checks if there is any idle resource. If there is
   288  // some, this method removes it from idle list and returns it. If the idle pool
   289  // is empty, this method returns nil and doesn't modify the idleResources slice.
   290  //
   291  // WARNING: Caller of this method must hold the pool mutex!
   292  func (p *Pool[T]) tryAcquireIdleResource() *Resource[T] {
   293  	res, ok := p.idleResources.Pop()
   294  	if !ok {
   295  		return nil
   296  	}
   298  	res.status = resourceStatusAcquired
   299  	return res
   300  }
   302  // createNewResource creates a new resource and inserts it into list of pool
   303  // resources.
   304  //
   305  // WARNING: Caller of this method must hold the pool mutex!
   306  func (p *Pool[T]) createNewResource() *Resource[T] {
   307  	res := &Resource[T]{
   308  		pool:           p,
   309  		creationTime:   time.Now(),
   310  		lastUsedNano:   nanotime(),
   311  		poolResetCount: p.resetCount,
   312  		status:         resourceStatusConstructing,
   313  	}
   315  	p.allResources.append(res)
   316  	p.destructWG.Add(1)
   318  	return res
   319  }
   321  // Acquire gets a resource from the pool. If no resources are available and the pool is not at maximum capacity it will
   322  // create a new resource. If the pool is at maximum capacity it will block until a resource is available. ctx can be
   323  // used to cancel the Acquire.
   324  //
   325  // If Acquire creates a new resource the resource constructor function will receive a context that delegates Value() to
   326  // ctx. Canceling ctx will cause Acquire to return immediately but it will not cancel the resource creation. This avoids
   327  // the problem of it being impossible to create resources when the time to create a resource is greater than any one
   328  // caller of Acquire is willing to wait.
   329  func (p *Pool[T]) Acquire(ctx context.Context) (_ *Resource[T], err error) {
   330  	select {
   331  	case <-ctx.Done():
   332  		p.canceledAcquireCount.Add(1)
   333  		return nil, ctx.Err()
   334  	default:
   335  	}
   337  	return p.acquire(ctx)
   338  }
// acquire is the continuation of Acquire that doesn't check up front whether
// ctx is already done.
//
// This function exists as a separate step solely for benchmarking purposes.
func (p *Pool[T]) acquire(ctx context.Context) (*Resource[T], error) {
	startNano := nanotime()

	// Take a semaphore token: first without blocking, and if that fails by
	// waiting until a token is released or ctx is done.
	var waitedForLock bool
	if !p.acquireSem.TryAcquire(1) {
		waitedForLock = true
		err := p.acquireSem.Acquire(ctx, 1)
		if err != nil {
			p.canceledAcquireCount.Add(1)
			return nil, err
		}
	}

	p.mux.Lock()
	if p.closed {
		// Give the token back; this call will not take a resource.
		p.acquireSem.Release(1)
		p.mux.Unlock()
		return nil, ErrClosedPool
	}

	// If a resource is available in the pool.
	if res := p.tryAcquireIdleResource(); res != nil {
		if waitedForLock {
			p.emptyAcquireCount += 1
		}
		p.acquireCount += 1
		p.acquireDuration += time.Duration(nanotime() - startNano)
		p.mux.Unlock()
		return res, nil
	}

	if len(p.allResources) >= int(p.maxSize) {
		// Unreachable code.
		panic("bug: semaphore allowed more acquires than pool allows")
	}

	// The resource is not idle, but there is enough space to create one.
	res := p.createNewResource()
	// Unlock before constructing so the constructor does not run under mux.
	p.mux.Unlock()

	res, err := p.initResourceValue(ctx, res)
	if err != nil {
		// The semaphore token is not released here: the goroutine started
		// by initResourceValue releases it once construction finishes.
		return nil, err
	}

	p.mux.Lock()
	defer p.mux.Unlock()

	p.emptyAcquireCount += 1
	p.acquireCount += 1
	p.acquireDuration += time.Duration(nanotime() - startNano)

	return res, nil
}
// initResourceValue runs the pool constructor for res in a separate goroutine
// and waits for either the construction result or cancellation of ctx.
//
// On success it returns res with its value set and its status acquired. If
// the constructor fails, res is removed from the pool, its semaphore token is
// released and the constructor's error is returned. If ctx is done first, it
// returns ctx.Err() while construction continues in the background; a
// resource that is then constructed successfully is released to the pool.
func (p *Pool[T]) initResourceValue(ctx context.Context, res *Resource[T]) (*Resource[T], error) {
	// Create the resource in a goroutine to immediately return from Acquire
	// if ctx is canceled without also canceling the constructor.
	//
	// See:
	// - https://github.com/jackc/pgx/issues/1287
	// - https://github.com/jackc/pgx/issues/1259
	constructErrChan := make(chan error)
	go func() {
		constructorCtx := newValueCancelCtx(ctx, p.baseAcquireCtx)
		value, err := p.constructor(constructorCtx)
		if err != nil {
			p.mux.Lock()
			p.allResources.remove(res)
			p.destructWG.Done()

			// The resource won't be acquired because its
			// construction failed. We have to allow someone else to
			// take that resource.
			p.acquireSem.Release(1)
			p.mux.Unlock()

			select {
			case constructErrChan <- err:
			case <-ctx.Done():
				// The caller is canceled, so no one awaits the
				// error. This branch avoids a goroutine leak.
			}
			return
		}

		// The resource is already in p.allResources where it might be read. So we need to acquire the lock to update its
		// status.
		p.mux.Lock()
		res.value = value
		res.status = resourceStatusAcquired
		p.mux.Unlock()

		// This select works because the channel is unbuffered.
		select {
		case constructErrChan <- nil:
		case <-ctx.Done():
			// Nobody is waiting for the resource any more, so hand it
			// to the pool without changing its last-used time.
			p.releaseAcquiredResource(res, res.lastUsedNano)
		}
	}()

	select {
	case <-ctx.Done():
		p.canceledAcquireCount.Add(1)
		return nil, ctx.Err()
	case err := <-constructErrChan:
		if err != nil {
			return nil, err
		}
		return res, nil
	}
}
   457  // TryAcquire gets a resource from the pool if one is immediately available. If not, it returns ErrNotAvailable. If no
   458  // resources are available but the pool has room to grow, a resource will be created in the background. ctx is only
   459  // used to cancel the background creation.
   460  func (p *Pool[T]) TryAcquire(ctx context.Context) (*Resource[T], error) {
   461  	if !p.acquireSem.TryAcquire(1) {
   462  		return nil, ErrNotAvailable
   463  	}
   465  	p.mux.Lock()
   466  	defer p.mux.Unlock()
   468  	if p.closed {
   469  		p.acquireSem.Release(1)
   470  		return nil, ErrClosedPool
   471  	}
   473  	// If a resource is available now
   474  	if res := p.tryAcquireIdleResource(); res != nil {
   475  		p.acquireCount += 1
   476  		return res, nil
   477  	}
   479  	if len(p.allResources) >= int(p.maxSize) {
   480  		// Unreachable code.
   481  		panic("bug: semaphore allowed more acquires than pool allows")
   482  	}
   484  	res := p.createNewResource()
   485  	go func() {
   486  		value, err := p.constructor(ctx)
   488  		p.mux.Lock()
   489  		defer p.mux.Unlock()
   490  		// We have to create the resource and only then release the
   491  		// semaphore - For the time being there is no resource that
   492  		// someone could acquire.
   493  		defer p.acquireSem.Release(1)
   495  		if err != nil {
   496  			p.allResources.remove(res)
   497  			p.destructWG.Done()
   498  			return
   499  		}
   501  		res.value = value
   502  		res.status = resourceStatusIdle
   503  		p.idleResources.Push(res)
   504  	}()
   506  	return nil, ErrNotAvailable
   507  }
   509  // acquireSemAll tries to acquire num free tokens from sem. This function is
   510  // guaranteed to acquire at least the lowest number of tokens that has been
   511  // available in the semaphore during runtime of this function.
   512  //
   513  // For the time being, semaphore doesn't allow to acquire all tokens atomically
   514  // (see https://github.com/golang/sync/pull/19). We simulate this by trying all
   515  // powers of 2 that are less or equal to num.
   516  //
   517  // For example, let's immagine we have 19 free tokens in the semaphore which in
   518  // total has 24 tokens (i.e. the maxSize of the pool is 24 resources). Then if
   519  // num is 24, the log2Uint(24) is 4 and we try to acquire 16, 8, 4, 2 and 1
   520  // tokens. Out of those, the acquire of 16, 2 and 1 tokens will succeed.
   521  //
   522  // Naturally, Acquires and Releases of the semaphore might take place
   523  // concurrently. For this reason, it's not guaranteed that absolutely all free
   524  // tokens in the semaphore will be acquired. But it's guaranteed that at least
   525  // the minimal number of tokens that has been present over the whole process
   526  // will be acquired. This is sufficient for the use-case we have in this
   527  // package.
   528  //
   529  // TODO: Replace this with acquireSem.TryAcquireAll() if it gets to
   530  // upstream. https://github.com/golang/sync/pull/19
   531  func acquireSemAll(sem *semaphore.Weighted, num int) int {
   532  	if sem.TryAcquire(int64(num)) {
   533  		return num
   534  	}
   536  	var acquired int
   537  	for i := int(log2Int(num)); i >= 0; i-- {
   538  		val := 1 << i
   539  		if sem.TryAcquire(int64(val)) {
   540  			acquired += val
   541  		}
   542  	}
   544  	return acquired
   545  }
   547  // AcquireAllIdle acquires all currently idle resources. Its intended use is for
   548  // health check and keep-alive functionality. It does not update pool
   549  // statistics.
   550  func (p *Pool[T]) AcquireAllIdle() []*Resource[T] {
   551  	p.mux.Lock()
   552  	defer p.mux.Unlock()
   554  	if p.closed {
   555  		return nil
   556  	}
   558  	numIdle := p.idleResources.Len()
   559  	if numIdle == 0 {
   560  		return nil
   561  	}
   563  	// In acquireSemAll we use only TryAcquire and not Acquire. Because
   564  	// TryAcquire cannot block, the fact that we hold mutex locked and try
   565  	// to acquire semaphore cannot result in dead-lock.
   566  	//
   567  	// Because the mutex is locked, no parallel Release can run. This
   568  	// implies that the number of tokens can only decrease because some
   569  	// Acquire/TryAcquire call can consume the semaphore token. Consequently
   570  	// acquired is always less or equal to numIdle. Moreover if acquired <
   571  	// numIdle, then there are some parallel Acquire/TryAcquire calls that
   572  	// will take the remaining idle connections.
   573  	acquired := acquireSemAll(p.acquireSem, numIdle)
   575  	idle := make([]*Resource[T], acquired)
   576  	for i := range idle {
   577  		res, _ := p.idleResources.Pop()
   578  		res.status = resourceStatusAcquired
   579  		idle[i] = res
   580  	}
   582  	// We have to bump the generation to ensure that Acquire/TryAcquire
   583  	// calls running in parallel (those which caused acquired < numIdle)
   584  	// will consume old connections and not freshly released connections
   585  	// instead.
   586  	p.idleResources.NextGen()
   588  	return idle
   589  }
   591  // CreateResource constructs a new resource without acquiring it. It goes straight in the IdlePool. If the pool is full
   592  // it returns an error. It can be useful to maintain warm resources under little load.
   593  func (p *Pool[T]) CreateResource(ctx context.Context) error {
   594  	if !p.acquireSem.TryAcquire(1) {
   595  		return ErrNotAvailable
   596  	}
   598  	p.mux.Lock()
   599  	if p.closed {
   600  		p.acquireSem.Release(1)
   601  		p.mux.Unlock()
   602  		return ErrClosedPool
   603  	}
   605  	if len(p.allResources) >= int(p.maxSize) {
   606  		p.acquireSem.Release(1)
   607  		p.mux.Unlock()
   608  		return ErrNotAvailable
   609  	}
   611  	res := p.createNewResource()
   612  	p.mux.Unlock()
   614  	value, err := p.constructor(ctx)
   615  	p.mux.Lock()
   616  	defer p.mux.Unlock()
   617  	defer p.acquireSem.Release(1)
   618  	if err != nil {
   619  		p.allResources.remove(res)
   620  		p.destructWG.Done()
   621  		return err
   622  	}
   624  	res.value = value
   625  	res.status = resourceStatusIdle
   627  	// If closed while constructing resource then destroy it and return an error
   628  	if p.closed {
   629  		go p.destructResourceValue(res.value)
   630  		return ErrClosedPool
   631  	}
   633  	p.idleResources.Push(res)
   635  	return nil
   636  }
   638  // Reset destroys all resources, but leaves the pool open. It is intended for use when an error is detected that would
   639  // disrupt all resources (such as a network interruption or a server state change).
   640  //
   641  // It is safe to reset a pool while resources are checked out. Those resources will be destroyed when they are returned
   642  // to the pool.
   643  func (p *Pool[T]) Reset() {
   644  	p.mux.Lock()
   645  	defer p.mux.Unlock()
   647  	p.resetCount++
   649  	for res, ok := p.idleResources.Pop(); ok; res, ok = p.idleResources.Pop() {
   650  		p.allResources.remove(res)
   651  		go p.destructResourceValue(res.value)
   652  	}
   653  }
   655  // releaseAcquiredResource returns res to the the pool.
   656  func (p *Pool[T]) releaseAcquiredResource(res *Resource[T], lastUsedNano int64) {
   657  	p.mux.Lock()
   658  	defer p.mux.Unlock()
   659  	defer p.acquireSem.Release(1)
   661  	if p.closed || res.poolResetCount != p.resetCount {
   662  		p.allResources.remove(res)
   663  		go p.destructResourceValue(res.value)
   664  	} else {
   665  		res.lastUsedNano = lastUsedNano
   666  		res.status = resourceStatusIdle
   667  		p.idleResources.Push(res)
   668  	}
   669  }
// destroyAcquiredResource destroys the value of res, then removes res from
// the pool's resource list and releases its semaphore token. The destructor
// runs before mux is taken, so it does not execute under the pool lock.
func (p *Pool[T]) destroyAcquiredResource(res *Resource[T]) {
	p.destructResourceValue(res.value)

	p.mux.Lock()
	defer p.mux.Unlock()
	defer p.acquireSem.Release(1)

	p.allResources.remove(res)
}
// hijackAcquiredResource removes res from the pool's resource list, marks it
// as hijacked and releases its semaphore token. The value is not destroyed.
func (p *Pool[T]) hijackAcquiredResource(res *Resource[T]) {
	p.mux.Lock()
	defer p.mux.Unlock()
	defer p.acquireSem.Release(1)

	p.allResources.remove(res)
	res.status = resourceStatusHijacked
	p.destructWG.Done() // not responsible for destructing hijacked resources
}
// destructResourceValue calls the pool destructor on value and then marks one
// resource as done on destructWG.
func (p *Pool[T]) destructResourceValue(value T) {
	p.destructor(value)
	p.destructWG.Done()
}

View as plain text