...

Source file src/github.com/sourcegraph/conc/pool/error_pool.go

Documentation: github.com/sourcegraph/conc/pool

     1  package pool
     2  
     3  import (
     4  	"context"
     5  	"sync"
     6  
     7  	"github.com/sourcegraph/conc/internal/multierror"
     8  )
     9  
    10  // ErrorPool is a pool that runs tasks that may return an error.
    11  // Errors are collected and returned by Wait().
    12  //
    13  // The configuration methods (With*) will panic if they are used after calling
    14  // Go() for the first time.
    15  //
    16  // A new ErrorPool should be created using `New().WithErrors()`.
    17  type ErrorPool struct {
    18  	pool Pool
    19  
    20  	onlyFirstError bool
    21  
    22  	mu   sync.Mutex
    23  	errs error
    24  }
    25  
    26  // Go submits a task to the pool. If all goroutines in the pool
    27  // are busy, a call to Go() will block until the task can be started.
    28  func (p *ErrorPool) Go(f func() error) {
    29  	p.pool.Go(func() {
    30  		p.addErr(f())
    31  	})
    32  }
    33  
    34  // Wait cleans up any spawned goroutines, propagating any panics and
    35  // returning any errors from tasks.
    36  func (p *ErrorPool) Wait() error {
    37  	p.pool.Wait()
    38  	return p.errs
    39  }
    40  
    41  // WithContext converts the pool to a ContextPool for tasks that should
    42  // run under the same context, such that they each respect shared cancellation.
    43  // For example, WithCancelOnError can be configured on the returned pool to
    44  // signal that all goroutines should be cancelled upon the first error.
    45  func (p *ErrorPool) WithContext(ctx context.Context) *ContextPool {
    46  	p.panicIfInitialized()
    47  	ctx, cancel := context.WithCancel(ctx)
    48  	return &ContextPool{
    49  		errorPool: p.deref(),
    50  		ctx:       ctx,
    51  		cancel:    cancel,
    52  	}
    53  }
    54  
    55  // WithFirstError configures the pool to only return the first error
    56  // returned by a task. By default, Wait() will return a combined error.
    57  func (p *ErrorPool) WithFirstError() *ErrorPool {
    58  	p.panicIfInitialized()
    59  	p.onlyFirstError = true
    60  	return p
    61  }
    62  
    63  // WithMaxGoroutines limits the number of goroutines in a pool.
    64  // Defaults to unlimited. Panics if n < 1.
    65  func (p *ErrorPool) WithMaxGoroutines(n int) *ErrorPool {
    66  	p.panicIfInitialized()
    67  	p.pool.WithMaxGoroutines(n)
    68  	return p
    69  }
    70  
    71  // deref is a helper that creates a shallow copy of the pool with the same
    72  // settings. We don't want to just dereference the pointer because that makes
    73  // the copylock lint angry.
    74  func (p *ErrorPool) deref() ErrorPool {
    75  	return ErrorPool{
    76  		pool:           p.pool.deref(),
    77  		onlyFirstError: p.onlyFirstError,
    78  	}
    79  }
    80  
    81  func (p *ErrorPool) panicIfInitialized() {
    82  	p.pool.panicIfInitialized()
    83  }
    84  
    85  func (p *ErrorPool) addErr(err error) {
    86  	if err != nil {
    87  		p.mu.Lock()
    88  		if p.onlyFirstError {
    89  			if p.errs == nil {
    90  				p.errs = err
    91  			}
    92  		} else {
    93  			p.errs = multierror.Join(p.errs, err)
    94  		}
    95  		p.mu.Unlock()
    96  	}
    97  }
    98  

View as plain text