1 package pool 2 3 import ( 4 "context" 5 "sync" 6 7 "github.com/sourcegraph/conc" 8 ) 9 10 // New creates a new Pool. 11 func New() *Pool { 12 return &Pool{} 13 } 14 15 // Pool is a pool of goroutines used to execute tasks concurrently. 16 // 17 // Tasks are submitted with Go(). Once all your tasks have been submitted, you 18 // must call Wait() to clean up any spawned goroutines and propagate any 19 // panics. 20 // 21 // Goroutines are started lazily, so creating a new pool is cheap. There will 22 // never be more goroutines spawned than there are tasks submitted. 23 // 24 // The configuration methods (With*) will panic if they are used after calling 25 // Go() for the first time. 26 // 27 // Pool is efficient, but not zero cost. It should not be used for very short 28 // tasks. Startup and teardown come with an overhead of around 1µs, and each 29 // task has an overhead of around 300ns. 30 type Pool struct { 31 handle conc.WaitGroup 32 limiter limiter 33 tasks chan func() 34 initOnce sync.Once 35 } 36 37 // Go submits a task to be run in the pool. If all goroutines in the pool 38 // are busy, a call to Go() will block until the task can be started. 39 func (p *Pool) Go(f func()) { 40 p.init() 41 42 if p.limiter == nil { 43 // No limit on the number of goroutines. 44 select { 45 case p.tasks <- f: 46 // A goroutine was available to handle the task. 47 default: 48 // No goroutine was available to handle the task. 49 // Spawn a new one and send it the task. 50 p.handle.Go(p.worker) 51 p.tasks <- f 52 } 53 } else { 54 select { 55 case p.limiter <- struct{}{}: 56 // If we are below our limit, spawn a new worker rather 57 // than waiting for one to become available. 58 p.handle.Go(p.worker) 59 60 // We know there is at least one worker running, so wait 61 // for it to become available. This ensures we never spawn 62 // more workers than the number of tasks. 63 p.tasks <- f 64 case p.tasks <- f: 65 // A worker is available and has accepted the task. 66 return 67 } 68 } 69 70 } 71 72 // Wait cleans up spawned goroutines, propagating any panics that were 73 // raised by a tasks. 74 func (p *Pool) Wait() { 75 p.init() 76 77 close(p.tasks) 78 79 p.handle.Wait() 80 } 81 82 // MaxGoroutines returns the maximum size of the pool. 83 func (p *Pool) MaxGoroutines() int { 84 return p.limiter.limit() 85 } 86 87 // WithMaxGoroutines limits the number of goroutines in a pool. 88 // Defaults to unlimited. Panics if n < 1. 89 func (p *Pool) WithMaxGoroutines(n int) *Pool { 90 p.panicIfInitialized() 91 if n < 1 { 92 panic("max goroutines in a pool must be greater than zero") 93 } 94 p.limiter = make(limiter, n) 95 return p 96 } 97 98 // init ensures that the pool is initialized before use. This makes the 99 // zero value of the pool usable. 100 func (p *Pool) init() { 101 p.initOnce.Do(func() { 102 p.tasks = make(chan func()) 103 }) 104 } 105 106 // panicIfInitialized will trigger a panic if a configuration method is called 107 // after the pool has started any goroutines for the first time. In the case that 108 // new settings are needed, a new pool should be created. 109 func (p *Pool) panicIfInitialized() { 110 if p.tasks != nil { 111 panic("pool can not be reconfigured after calling Go() for the first time") 112 } 113 } 114 115 // WithErrors converts the pool to an ErrorPool so the submitted tasks can 116 // return errors. 117 func (p *Pool) WithErrors() *ErrorPool { 118 p.panicIfInitialized() 119 return &ErrorPool{ 120 pool: p.deref(), 121 } 122 } 123 124 // deref is a helper that creates a shallow copy of the pool with the same 125 // settings. We don't want to just dereference the pointer because that makes 126 // the copylock lint angry. 127 func (p *Pool) deref() Pool { 128 p.panicIfInitialized() 129 return Pool{ 130 limiter: p.limiter, 131 } 132 } 133 134 // WithContext converts the pool to a ContextPool for tasks that should 135 // run under the same context, such that they each respect shared cancellation. 136 // For example, WithCancelOnError can be configured on the returned pool to 137 // signal that all goroutines should be cancelled upon the first error. 138 func (p *Pool) WithContext(ctx context.Context) *ContextPool { 139 p.panicIfInitialized() 140 ctx, cancel := context.WithCancel(ctx) 141 return &ContextPool{ 142 errorPool: p.WithErrors().deref(), 143 ctx: ctx, 144 cancel: cancel, 145 } 146 } 147 148 func (p *Pool) worker() { 149 // The only time this matters is if the task panics. 150 // This makes it possible to spin up new workers in that case. 151 defer p.limiter.release() 152 153 for f := range p.tasks { 154 f() 155 } 156 } 157 158 type limiter chan struct{} 159 160 func (l limiter) limit() int { 161 return cap(l) 162 } 163 164 func (l limiter) release() { 165 if l != nil { 166 <-l 167 } 168 } 169