1 package pool 2 3 import ( 4 "context" 5 "sync" 6 ) 7 8 // NewWithResults creates a new ResultPool for tasks with a result of type T. 9 // 10 // The configuration methods (With*) will panic if they are used after calling 11 // Go() for the first time. 12 func NewWithResults[T any]() *ResultPool[T] { 13 return &ResultPool[T]{ 14 pool: *New(), 15 } 16 } 17 18 // ResultPool is a pool that executes tasks that return a generic result type. 19 // Tasks are executed in the pool with Go(), then the results of the tasks are 20 // returned by Wait(). 21 // 22 // The order of the results is not guaranteed to be the same as the order the 23 // tasks were submitted. If your use case requires consistent ordering, 24 // consider using the `stream` package or `Map` from the `iter` package. 25 type ResultPool[T any] struct { 26 pool Pool 27 agg resultAggregator[T] 28 } 29 30 // Go submits a task to the pool. If all goroutines in the pool 31 // are busy, a call to Go() will block until the task can be started. 32 func (p *ResultPool[T]) Go(f func() T) { 33 p.pool.Go(func() { 34 p.agg.add(f()) 35 }) 36 } 37 38 // Wait cleans up all spawned goroutines, propagating any panics, and returning 39 // a slice of results from tasks that did not panic. 40 func (p *ResultPool[T]) Wait() []T { 41 p.pool.Wait() 42 return p.agg.results 43 } 44 45 // MaxGoroutines returns the maximum size of the pool. 46 func (p *ResultPool[T]) MaxGoroutines() int { 47 return p.pool.MaxGoroutines() 48 } 49 50 // WithErrors converts the pool to an ResultErrorPool so the submitted tasks 51 // can return errors. 52 func (p *ResultPool[T]) WithErrors() *ResultErrorPool[T] { 53 p.panicIfInitialized() 54 return &ResultErrorPool[T]{ 55 errorPool: *p.pool.WithErrors(), 56 } 57 } 58 59 // WithContext converts the pool to a ResultContextPool for tasks that should 60 // run under the same context, such that they each respect shared cancellation. 61 // For example, WithCancelOnError can be configured on the returned pool to 62 // signal that all goroutines should be cancelled upon the first error. 63 func (p *ResultPool[T]) WithContext(ctx context.Context) *ResultContextPool[T] { 64 p.panicIfInitialized() 65 return &ResultContextPool[T]{ 66 contextPool: *p.pool.WithContext(ctx), 67 } 68 } 69 70 // WithMaxGoroutines limits the number of goroutines in a pool. 71 // Defaults to unlimited. Panics if n < 1. 72 func (p *ResultPool[T]) WithMaxGoroutines(n int) *ResultPool[T] { 73 p.panicIfInitialized() 74 p.pool.WithMaxGoroutines(n) 75 return p 76 } 77 78 func (p *ResultPool[T]) panicIfInitialized() { 79 p.pool.panicIfInitialized() 80 } 81 82 // resultAggregator is a utility type that lets us safely append from multiple 83 // goroutines. The zero value is valid and ready to use. 84 type resultAggregator[T any] struct { 85 mu sync.Mutex 86 results []T 87 } 88 89 func (r *resultAggregator[T]) add(res T) { 90 r.mu.Lock() 91 r.results = append(r.results, res) 92 r.mu.Unlock() 93 } 94