1 package pool 2 3 import ( 4 "context" 5 ) 6 7 // ContextPool is a pool that runs tasks that take a context. 8 // A new ContextPool should be created with `New().WithContext(ctx)`. 9 // 10 // The configuration methods (With*) will panic if they are used after calling 11 // Go() for the first time. 12 type ContextPool struct { 13 errorPool ErrorPool 14 15 ctx context.Context 16 cancel context.CancelFunc 17 18 cancelOnError bool 19 } 20 21 // Go submits a task. If it returns an error, the error will be 22 // collected and returned by Wait(). If all goroutines in the pool 23 // are busy, a call to Go() will block until the task can be started. 24 func (p *ContextPool) Go(f func(ctx context.Context) error) { 25 p.errorPool.Go(func() error { 26 if p.cancelOnError { 27 // If we are cancelling on error, then we also want to cancel if a 28 // panic is raised. To do this, we need to recover, cancel, and then 29 // re-throw the caught panic. 30 defer func() { 31 if r := recover(); r != nil { 32 p.cancel() 33 panic(r) 34 } 35 }() 36 } 37 38 err := f(p.ctx) 39 if err != nil && p.cancelOnError { 40 // Leaky abstraction warning: We add the error directly because 41 // otherwise, canceling could cause another goroutine to exit and 42 // return an error before this error was added, which breaks the 43 // expectations of WithFirstError(). 44 p.errorPool.addErr(err) 45 p.cancel() 46 return nil 47 } 48 return err 49 }) 50 } 51 52 // Wait cleans up all spawned goroutines, propagates any panics, and 53 // returns an error if any of the tasks errored. 54 func (p *ContextPool) Wait() error { 55 // Make sure we call cancel after pool is done to avoid memory leakage. 56 defer p.cancel() 57 return p.errorPool.Wait() 58 } 59 60 // WithFirstError configures the pool to only return the first error 61 // returned by a task. By default, Wait() will return a combined error. 62 // This is particularly useful for (*ContextPool).WithCancelOnError(), 63 // where all errors after the first are likely to be context.Canceled. 64 func (p *ContextPool) WithFirstError() *ContextPool { 65 p.panicIfInitialized() 66 p.errorPool.WithFirstError() 67 return p 68 } 69 70 // WithCancelOnError configures the pool to cancel its context as soon as 71 // any task returns an error or panics. By default, the pool's context is not 72 // canceled until the parent context is canceled. 73 // 74 // In this case, all errors returned from the pool after the first will 75 // likely be context.Canceled - you may want to also use 76 // (*ContextPool).WithFirstError() to configure the pool to only return 77 // the first error. 78 func (p *ContextPool) WithCancelOnError() *ContextPool { 79 p.panicIfInitialized() 80 p.cancelOnError = true 81 return p 82 } 83 84 // WithMaxGoroutines limits the number of goroutines in a pool. 85 // Defaults to unlimited. Panics if n < 1. 86 func (p *ContextPool) WithMaxGoroutines(n int) *ContextPool { 87 p.panicIfInitialized() 88 p.errorPool.WithMaxGoroutines(n) 89 return p 90 } 91 92 func (p *ContextPool) panicIfInitialized() { 93 p.errorPool.panicIfInitialized() 94 } 95