...

Source file src/github.com/sourcegraph/conc/pool/result_context_pool.go

Documentation: github.com/sourcegraph/conc/pool

     1  package pool
     2  
     3  import (
     4  	"context"
     5  )
     6  
     7  // ResultContextPool is a pool that runs tasks that take a context and return a
     8  // result. The context passed to the task will be canceled if any of the tasks
     9  // return an error, which makes its functionality different than just capturing
    10  // a context with the task closure.
    11  //
    12  // The configuration methods (With*) will panic if they are used after calling
    13  // Go() for the first time.
    14  type ResultContextPool[T any] struct {
    15  	contextPool    ContextPool
    16  	agg            resultAggregator[T]
    17  	collectErrored bool
    18  }
    19  
    20  // Go submits a task to the pool. If all goroutines in the pool
    21  // are busy, a call to Go() will block until the task can be started.
    22  func (p *ResultContextPool[T]) Go(f func(context.Context) (T, error)) {
    23  	p.contextPool.Go(func(ctx context.Context) error {
    24  		res, err := f(ctx)
    25  		if err == nil || p.collectErrored {
    26  			p.agg.add(res)
    27  		}
    28  		return err
    29  	})
    30  }
    31  
    32  // Wait cleans up all spawned goroutines, propagates any panics, and
    33  // returns an error if any of the tasks errored.
    34  func (p *ResultContextPool[T]) Wait() ([]T, error) {
    35  	err := p.contextPool.Wait()
    36  	return p.agg.results, err
    37  }
    38  
    39  // WithCollectErrored configures the pool to still collect the result of a task
    40  // even if the task returned an error. By default, the result of tasks that errored
    41  // are ignored and only the error is collected.
    42  func (p *ResultContextPool[T]) WithCollectErrored() *ResultContextPool[T] {
    43  	p.panicIfInitialized()
    44  	p.collectErrored = true
    45  	return p
    46  }
    47  
    48  // WithFirstError configures the pool to only return the first error
    49  // returned by a task. By default, Wait() will return a combined error.
    50  func (p *ResultContextPool[T]) WithFirstError() *ResultContextPool[T] {
    51  	p.panicIfInitialized()
    52  	p.contextPool.WithFirstError()
    53  	return p
    54  }
    55  
    56  // WithCancelOnError configures the pool to cancel its context as soon as
    57  // any task returns an error. By default, the pool's context is not
    58  // canceled until the parent context is canceled.
    59  func (p *ResultContextPool[T]) WithCancelOnError() *ResultContextPool[T] {
    60  	p.panicIfInitialized()
    61  	p.contextPool.WithCancelOnError()
    62  	return p
    63  }
    64  
    65  // WithMaxGoroutines limits the number of goroutines in a pool.
    66  // Defaults to unlimited. Panics if n < 1.
    67  func (p *ResultContextPool[T]) WithMaxGoroutines(n int) *ResultContextPool[T] {
    68  	p.panicIfInitialized()
    69  	p.contextPool.WithMaxGoroutines(n)
    70  	return p
    71  }
    72  
    73  func (p *ResultContextPool[T]) panicIfInitialized() {
    74  	p.contextPool.panicIfInitialized()
    75  }
    76  

View as plain text