...

Source file src/github.com/sourcegraph/conc/pool/result_pool.go

Documentation: github.com/sourcegraph/conc/pool

     1  package pool
     2  
     3  import (
     4  	"context"
     5  	"sync"
     6  )
     7  
     8  // NewWithResults creates a new ResultPool for tasks with a result of type T.
     9  //
    10  // The configuration methods (With*) will panic if they are used after calling
    11  // Go() for the first time.
    12  func NewWithResults[T any]() *ResultPool[T] {
    13  	return &ResultPool[T]{
    14  		pool: *New(),
    15  	}
    16  }
    17  
    18  // ResultPool is a pool that executes tasks that return a generic result type.
    19  // Tasks are executed in the pool with Go(), then the results of the tasks are
    20  // returned by Wait().
    21  //
    22  // The order of the results is not guaranteed to be the same as the order the
    23  // tasks were submitted. If your use case requires consistent ordering,
    24  // consider using the `stream` package or `Map` from the `iter` package.
    25  type ResultPool[T any] struct {
    26  	pool Pool
    27  	agg  resultAggregator[T]
    28  }
    29  
    30  // Go submits a task to the pool. If all goroutines in the pool
    31  // are busy, a call to Go() will block until the task can be started.
    32  func (p *ResultPool[T]) Go(f func() T) {
    33  	p.pool.Go(func() {
    34  		p.agg.add(f())
    35  	})
    36  }
    37  
    38  // Wait cleans up all spawned goroutines, propagating any panics, and returning
    39  // a slice of results from tasks that did not panic.
    40  func (p *ResultPool[T]) Wait() []T {
    41  	p.pool.Wait()
    42  	return p.agg.results
    43  }
    44  
    45  // MaxGoroutines returns the maximum size of the pool.
    46  func (p *ResultPool[T]) MaxGoroutines() int {
    47  	return p.pool.MaxGoroutines()
    48  }
    49  
    50  // WithErrors converts the pool to an ResultErrorPool so the submitted tasks
    51  // can return errors.
    52  func (p *ResultPool[T]) WithErrors() *ResultErrorPool[T] {
    53  	p.panicIfInitialized()
    54  	return &ResultErrorPool[T]{
    55  		errorPool: *p.pool.WithErrors(),
    56  	}
    57  }
    58  
    59  // WithContext converts the pool to a ResultContextPool for tasks that should
    60  // run under the same context, such that they each respect shared cancellation.
    61  // For example, WithCancelOnError can be configured on the returned pool to
    62  // signal that all goroutines should be cancelled upon the first error.
    63  func (p *ResultPool[T]) WithContext(ctx context.Context) *ResultContextPool[T] {
    64  	p.panicIfInitialized()
    65  	return &ResultContextPool[T]{
    66  		contextPool: *p.pool.WithContext(ctx),
    67  	}
    68  }
    69  
    70  // WithMaxGoroutines limits the number of goroutines in a pool.
    71  // Defaults to unlimited. Panics if n < 1.
    72  func (p *ResultPool[T]) WithMaxGoroutines(n int) *ResultPool[T] {
    73  	p.panicIfInitialized()
    74  	p.pool.WithMaxGoroutines(n)
    75  	return p
    76  }
    77  
    78  func (p *ResultPool[T]) panicIfInitialized() {
    79  	p.pool.panicIfInitialized()
    80  }
    81  
    82  // resultAggregator is a utility type that lets us safely append from multiple
    83  // goroutines. The zero value is valid and ready to use.
    84  type resultAggregator[T any] struct {
    85  	mu      sync.Mutex
    86  	results []T
    87  }
    88  
    89  func (r *resultAggregator[T]) add(res T) {
    90  	r.mu.Lock()
    91  	r.results = append(r.results, res)
    92  	r.mu.Unlock()
    93  }
    94  

View as plain text