1 package pool 2 3 import ( 4 "context" 5 ) 6 7 // ResultErrorPool is a pool that executes tasks that return a generic result 8 // type and an error. Tasks are executed in the pool with Go(), then the 9 // results of the tasks are returned by Wait(). 10 // 11 // The order of the results is not guaranteed to be the same as the order the 12 // tasks were submitted. If your use case requires consistent ordering, 13 // consider using the `stream` package or `Map` from the `iter` package. 14 // 15 // The configuration methods (With*) will panic if they are used after calling 16 // Go() for the first time. 17 type ResultErrorPool[T any] struct { 18 errorPool ErrorPool 19 agg resultAggregator[T] 20 collectErrored bool 21 } 22 23 // Go submits a task to the pool. If all goroutines in the pool 24 // are busy, a call to Go() will block until the task can be started. 25 func (p *ResultErrorPool[T]) Go(f func() (T, error)) { 26 p.errorPool.Go(func() error { 27 res, err := f() 28 if err == nil || p.collectErrored { 29 p.agg.add(res) 30 } 31 return err 32 }) 33 } 34 35 // Wait cleans up any spawned goroutines, propagating any panics and 36 // returning the results and any errors from tasks. 37 func (p *ResultErrorPool[T]) Wait() ([]T, error) { 38 err := p.errorPool.Wait() 39 return p.agg.results, err 40 } 41 42 // WithCollectErrored configures the pool to still collect the result of a task 43 // even if the task returned an error. By default, the result of tasks that errored 44 // are ignored and only the error is collected. 45 func (p *ResultErrorPool[T]) WithCollectErrored() *ResultErrorPool[T] { 46 p.panicIfInitialized() 47 p.collectErrored = true 48 return p 49 } 50 51 // WithContext converts the pool to a ResultContextPool for tasks that should 52 // run under the same context, such that they each respect shared cancellation. 53 // For example, WithCancelOnError can be configured on the returned pool to 54 // signal that all goroutines should be cancelled upon the first error. 55 func (p *ResultErrorPool[T]) WithContext(ctx context.Context) *ResultContextPool[T] { 56 p.panicIfInitialized() 57 return &ResultContextPool[T]{ 58 contextPool: *p.errorPool.WithContext(ctx), 59 } 60 } 61 62 // WithFirstError configures the pool to only return the first error 63 // returned by a task. By default, Wait() will return a combined error. 64 func (p *ResultErrorPool[T]) WithFirstError() *ResultErrorPool[T] { 65 p.panicIfInitialized() 66 p.errorPool.WithFirstError() 67 return p 68 } 69 70 // WithMaxGoroutines limits the number of goroutines in a pool. 71 // Defaults to unlimited. Panics if n < 1. 72 func (p *ResultErrorPool[T]) WithMaxGoroutines(n int) *ResultErrorPool[T] { 73 p.panicIfInitialized() 74 p.errorPool.WithMaxGoroutines(n) 75 return p 76 } 77 78 func (p *ResultErrorPool[T]) panicIfInitialized() { 79 p.errorPool.panicIfInitialized() 80 } 81