...

Source file src/github.com/sourcegraph/conc/pool/pool.go

Documentation: github.com/sourcegraph/conc/pool

package pool

import (
	"context"
	"sync"

	"github.com/sourcegraph/conc"
)

// New creates a new Pool.
func New() *Pool {
	return &Pool{}
}

// Pool is a pool of goroutines used to execute tasks concurrently.
//
// Tasks are submitted with Go(). Once all your tasks have been submitted, you
// must call Wait() to clean up any spawned goroutines and propagate any
// panics.
//
// Goroutines are started lazily, so creating a new pool is cheap. There will
// never be more goroutines spawned than there are tasks submitted.
//
// The configuration methods (With*) will panic if they are used after calling
// Go() for the first time.
//
// Pool is efficient, but not zero cost. It should not be used for very short
// tasks. Startup and teardown come with an overhead of around 1µs, and each
// task has an overhead of around 300ns.
type Pool struct {
	handle   conc.WaitGroup
	limiter  limiter
	tasks    chan func()
	initOnce sync.Once
}

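// Illustrative sketch, not part of pool.go: the lifecycle described in the
// doc comment above, written as client code. Assumes a separate package that
// imports "fmt" and "github.com/sourcegraph/conc/pool".
//
//	func basicUsage() {
//		p := pool.New().WithMaxGoroutines(3)
//		for i := 0; i < 10; i++ {
//			i := i // capture the loop variable (pre-Go 1.22 semantics)
//			p.Go(func() {
//				fmt.Println("task", i)
//			})
//		}
//		p.Wait() // blocks until all ten tasks have finished
//	}
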
// Go submits a task to be run in the pool. If all goroutines in the pool
// are busy, a call to Go() will block until the task can be started.
func (p *Pool) Go(f func()) {
	p.init()

	if p.limiter == nil {
		// No limit on the number of goroutines.
		select {
		case p.tasks <- f:
			// A goroutine was available to handle the task.
		default:
			// No goroutine was available to handle the task.
			// Spawn a new one and send it the task.
			p.handle.Go(p.worker)
			p.tasks <- f
		}
	} else {
		select {
		case p.limiter <- struct{}{}:
			// If we are below our limit, spawn a new worker rather
			// than waiting for one to become available.
			p.handle.Go(p.worker)

			// We know there is at least one worker running, so wait
			// for it to become available. This ensures we never spawn
			// more workers than the number of tasks.
			p.tasks <- f
		case p.tasks <- f:
			// A worker is available and has accepted the task.
			return
		}
	}
}

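// Illustrative sketch, not part of pool.go: with a limit of one goroutine,
// the second Go() below blocks until the lone worker finishes the first
// task, matching the contract documented above. Assumes client code that
// imports "time" and this package.
//
//	p := pool.New().WithMaxGoroutines(1)
//	p.Go(func() { time.Sleep(100 * time.Millisecond) })
//	p.Go(func() {}) // blocks ~100ms until the worker is idle again
//	p.Wait()
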
// Wait cleans up spawned goroutines, propagating any panics that were
// raised by the tasks.
func (p *Pool) Wait() {
	p.init()

	close(p.tasks)

	p.handle.Wait()
}

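// Illustrative sketch, not part of pool.go: because Wait() re-raises task
// panics (via conc.WaitGroup), a caller can recover them at the Wait call
// site. The recovered value derives from the task's panic value.
//
//	func recoverTaskPanic() {
//		p := pool.New()
//		p.Go(func() { panic("boom") })
//		defer func() {
//			if r := recover(); r != nil {
//				fmt.Println("recovered:", r)
//			}
//		}()
//		p.Wait() // re-raises the panic from the task
//	}
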
// MaxGoroutines returns the maximum size of the pool.
func (p *Pool) MaxGoroutines() int {
	return p.limiter.limit()
}

// WithMaxGoroutines limits the number of goroutines in a pool.
// Defaults to unlimited. Panics if n < 1.
func (p *Pool) WithMaxGoroutines(n int) *Pool {
	p.panicIfInitialized()
	if n < 1 {
		panic("max goroutines in a pool must be greater than zero")
	}
	p.limiter = make(limiter, n)
	return p
}

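// Illustrative sketch, not part of pool.go: observing the limit from client
// code with an atomic high-water mark (assumes "sync/atomic" and "time";
// atomic.Int64 needs Go 1.19+). The in-flight count can never exceed the
// configured maximum.
//
//	var inFlight, high atomic.Int64
//	p := pool.New().WithMaxGoroutines(2)
//	for i := 0; i < 20; i++ {
//		p.Go(func() {
//			n := inFlight.Add(1)
//			if m := high.Load(); n > m {
//				high.CompareAndSwap(m, n) // best-effort high-water mark
//			}
//			time.Sleep(time.Millisecond)
//			inFlight.Add(-1)
//		})
//	}
//	p.Wait()
//	fmt.Println(high.Load() <= 2) // always true
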
// init ensures that the pool is initialized before use. This makes the
// zero value of the pool usable.
func (p *Pool) init() {
	p.initOnce.Do(func() {
		p.tasks = make(chan func())
	})
}

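// Illustrative sketch, not part of pool.go: because init() runs under
// sync.Once on first use, the zero value of Pool is usable without New().
//
//	var p pool.Pool // zero value, no constructor needed
//	p.Go(func() { fmt.Println("hello") })
//	p.Wait()
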
// panicIfInitialized will trigger a panic if a configuration method is called
// after the pool has started any goroutines for the first time. If new
// settings are needed, a new pool should be created.
func (p *Pool) panicIfInitialized() {
	if p.tasks != nil {
		panic("pool can not be reconfigured after calling Go() for the first time")
	}
}

// WithErrors converts the pool to an ErrorPool so the submitted tasks can
// return errors.
func (p *Pool) WithErrors() *ErrorPool {
	p.panicIfInitialized()
	return &ErrorPool{
		pool: p.deref(),
	}
}

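// Illustrative sketch, not part of pool.go: using the ErrorPool returned by
// WithErrors(). Assumes the ErrorPool API defined elsewhere in this package,
// Go(func() error) and Wait() error, where Wait combines the errors returned
// by the tasks; fetch is a hypothetical helper.
//
//	func fetchAll(urls []string) error {
//		p := pool.New().WithErrors()
//		for _, url := range urls {
//			url := url
//			p.Go(func() error {
//				return fetch(url) // hypothetical helper
//			})
//		}
//		return p.Wait() // non-nil if any task returned an error
//	}
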
// deref is a helper that creates a shallow copy of the pool with the same
// settings. We don't want to just dereference the pointer because that makes
// the copylock lint angry.
func (p *Pool) deref() Pool {
	p.panicIfInitialized()
	return Pool{
		limiter: p.limiter,
	}
}

// WithContext converts the pool to a ContextPool for tasks that should
// run under the same context, such that they each respect shared cancellation.
// For example, WithCancelOnError can be configured on the returned pool to
// signal that all goroutines should be cancelled upon the first error.
func (p *Pool) WithContext(ctx context.Context) *ContextPool {
	p.panicIfInitialized()
	ctx, cancel := context.WithCancel(ctx)
	return &ContextPool{
		errorPool: p.WithErrors().deref(),
		ctx:       ctx,
		cancel:    cancel,
	}
}

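// Illustrative sketch, not part of pool.go: a ContextPool configured with
// WithCancelOnError (mentioned above) so that the first error cancels the
// shared context and the remaining tasks can bail out early. Assumes the
// ContextPool API defined elsewhere in this package:
// Go(func(context.Context) error) and Wait() error.
//
//	func cancelOnFirstError(ctx context.Context) error {
//		p := pool.New().WithContext(ctx).WithCancelOnError()
//		for i := 0; i < 3; i++ {
//			i := i
//			p.Go(func(ctx context.Context) error {
//				if i == 0 {
//					return errors.New("first task failed")
//				}
//				<-ctx.Done() // other tasks observe the cancellation
//				return ctx.Err()
//			})
//		}
//		return p.Wait()
//	}
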
func (p *Pool) worker() {
	// Release our slot in the limiter (if any) when this worker exits. The
	// only time deferring this matters is if a task panics: it makes it
	// possible to spin up new workers in that case.
	defer p.limiter.release()

	for f := range p.tasks {
		f()
	}
}

type limiter chan struct{}

func (l limiter) limit() int {
	return cap(l)
}

func (l limiter) release() {
	if l != nil {
		<-l
	}
}

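// The limiter above is the classic buffered-channel semaphore: a send
// acquires one of cap(l) slots and a receive releases it. An illustrative
// standalone sketch of the same idiom (assumes "sync"):
//
//	sem := make(chan struct{}, 2) // at most two concurrent holders
//	var wg sync.WaitGroup
//	for i := 0; i < 5; i++ {
//		wg.Add(1)
//		go func() {
//			defer wg.Done()
//			sem <- struct{}{}        // acquire a slot
//			defer func() { <-sem }() // release it
//			// ... bounded concurrent work ...
//		}()
//	}
//	wg.Wait()
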
