...

Source file src/github.com/sourcegraph/conc/pool/context_pool.go

Documentation: github.com/sourcegraph/conc/pool

     1  package pool
     2  
     3  import (
     4  	"context"
     5  )
     6  
     7  // ContextPool is a pool that runs tasks that take a context.
     8  // A new ContextPool should be created with `New().WithContext(ctx)`.
     9  //
    10  // The configuration methods (With*) will panic if they are used after calling
    11  // Go() for the first time.
    12  type ContextPool struct {
    13  	errorPool ErrorPool
    14  
    15  	ctx    context.Context
    16  	cancel context.CancelFunc
    17  
    18  	cancelOnError bool
    19  }
    20  
    21  // Go submits a task. If it returns an error, the error will be
    22  // collected and returned by Wait(). If all goroutines in the pool
    23  // are busy, a call to Go() will block until the task can be started.
    24  func (p *ContextPool) Go(f func(ctx context.Context) error) {
    25  	p.errorPool.Go(func() error {
    26  		if p.cancelOnError {
    27  			// If we are cancelling on error, then we also want to cancel if a
    28  			// panic is raised. To do this, we need to recover, cancel, and then
    29  			// re-throw the caught panic.
    30  			defer func() {
    31  				if r := recover(); r != nil {
    32  					p.cancel()
    33  					panic(r)
    34  				}
    35  			}()
    36  		}
    37  
    38  		err := f(p.ctx)
    39  		if err != nil && p.cancelOnError {
    40  			// Leaky abstraction warning: We add the error directly because
    41  			// otherwise, canceling could cause another goroutine to exit and
    42  			// return an error before this error was added, which breaks the
    43  			// expectations of WithFirstError().
    44  			p.errorPool.addErr(err)
    45  			p.cancel()
    46  			return nil
    47  		}
    48  		return err
    49  	})
    50  }
    51  
    52  // Wait cleans up all spawned goroutines, propagates any panics, and
    53  // returns an error if any of the tasks errored.
    54  func (p *ContextPool) Wait() error {
    55  	// Make sure we call cancel after pool is done to avoid memory leakage.
    56  	defer p.cancel()
    57  	return p.errorPool.Wait()
    58  }
    59  
    60  // WithFirstError configures the pool to only return the first error
    61  // returned by a task. By default, Wait() will return a combined error.
    62  // This is particularly useful for (*ContextPool).WithCancelOnError(),
    63  // where all errors after the first are likely to be context.Canceled.
    64  func (p *ContextPool) WithFirstError() *ContextPool {
    65  	p.panicIfInitialized()
    66  	p.errorPool.WithFirstError()
    67  	return p
    68  }
    69  
    70  // WithCancelOnError configures the pool to cancel its context as soon as
    71  // any task returns an error or panics. By default, the pool's context is not
    72  // canceled until the parent context is canceled.
    73  //
    74  // In this case, all errors returned from the pool after the first will
    75  // likely be context.Canceled - you may want to also use
    76  // (*ContextPool).WithFirstError() to configure the pool to only return
    77  // the first error.
    78  func (p *ContextPool) WithCancelOnError() *ContextPool {
    79  	p.panicIfInitialized()
    80  	p.cancelOnError = true
    81  	return p
    82  }
    83  
    84  // WithMaxGoroutines limits the number of goroutines in a pool.
    85  // Defaults to unlimited. Panics if n < 1.
    86  func (p *ContextPool) WithMaxGoroutines(n int) *ContextPool {
    87  	p.panicIfInitialized()
    88  	p.errorPool.WithMaxGoroutines(n)
    89  	return p
    90  }
    91  
    92  func (p *ContextPool) panicIfInitialized() {
    93  	p.errorPool.panicIfInitialized()
    94  }
    95  

View as plain text