...

Source file src/github.com/sourcegraph/conc/pool/result_error_pool.go

Documentation: github.com/sourcegraph/conc/pool

     1  package pool
     2  
     3  import (
     4  	"context"
     5  )
     6  
     7  // ResultErrorPool is a pool that executes tasks that return a generic result
     8  // type and an error. Tasks are executed in the pool with Go(), then the
     9  // results of the tasks are returned by Wait().
    10  //
    11  // The order of the results is not guaranteed to be the same as the order the
    12  // tasks were submitted. If your use case requires consistent ordering,
    13  // consider using the `stream` package or `Map` from the `iter` package.
    14  //
    15  // The configuration methods (With*) will panic if they are used after calling
    16  // Go() for the first time.
    17  type ResultErrorPool[T any] struct {
    18  	errorPool      ErrorPool
    19  	agg            resultAggregator[T]
    20  	collectErrored bool
    21  }
    22  
    23  // Go submits a task to the pool. If all goroutines in the pool
    24  // are busy, a call to Go() will block until the task can be started.
    25  func (p *ResultErrorPool[T]) Go(f func() (T, error)) {
    26  	p.errorPool.Go(func() error {
    27  		res, err := f()
    28  		if err == nil || p.collectErrored {
    29  			p.agg.add(res)
    30  		}
    31  		return err
    32  	})
    33  }
    34  
    35  // Wait cleans up any spawned goroutines, propagating any panics and
    36  // returning the results and any errors from tasks.
    37  func (p *ResultErrorPool[T]) Wait() ([]T, error) {
    38  	err := p.errorPool.Wait()
    39  	return p.agg.results, err
    40  }
    41  
    42  // WithCollectErrored configures the pool to still collect the result of a task
    43  // even if the task returned an error. By default, the result of tasks that errored
    44  // are ignored and only the error is collected.
    45  func (p *ResultErrorPool[T]) WithCollectErrored() *ResultErrorPool[T] {
    46  	p.panicIfInitialized()
    47  	p.collectErrored = true
    48  	return p
    49  }
    50  
    51  // WithContext converts the pool to a ResultContextPool for tasks that should
    52  // run under the same context, such that they each respect shared cancellation.
    53  // For example, WithCancelOnError can be configured on the returned pool to
    54  // signal that all goroutines should be cancelled upon the first error.
    55  func (p *ResultErrorPool[T]) WithContext(ctx context.Context) *ResultContextPool[T] {
    56  	p.panicIfInitialized()
    57  	return &ResultContextPool[T]{
    58  		contextPool: *p.errorPool.WithContext(ctx),
    59  	}
    60  }
    61  
    62  // WithFirstError configures the pool to only return the first error
    63  // returned by a task. By default, Wait() will return a combined error.
    64  func (p *ResultErrorPool[T]) WithFirstError() *ResultErrorPool[T] {
    65  	p.panicIfInitialized()
    66  	p.errorPool.WithFirstError()
    67  	return p
    68  }
    69  
    70  // WithMaxGoroutines limits the number of goroutines in a pool.
    71  // Defaults to unlimited. Panics if n < 1.
    72  func (p *ResultErrorPool[T]) WithMaxGoroutines(n int) *ResultErrorPool[T] {
    73  	p.panicIfInitialized()
    74  	p.errorPool.WithMaxGoroutines(n)
    75  	return p
    76  }
    77  
    78  func (p *ResultErrorPool[T]) panicIfInitialized() {
    79  	p.errorPool.panicIfInitialized()
    80  }
    81  

View as plain text