1 package iter 2 3 import ( 4 "runtime" 5 "sync/atomic" 6 7 "github.com/sourcegraph/conc" 8 ) 9 10 // defaultMaxGoroutines returns the default maximum number of 11 // goroutines to use within this package. 12 func defaultMaxGoroutines() int { return runtime.GOMAXPROCS(0) } 13 14 // Iterator can be used to configure the behaviour of ForEach 15 // and ForEachIdx. The zero value is safe to use with reasonable 16 // defaults. 17 // 18 // Iterator is also safe for reuse and concurrent use. 19 type Iterator[T any] struct { 20 // MaxGoroutines controls the maximum number of goroutines 21 // to use on this Iterator's methods. 22 // 23 // If unset, MaxGoroutines defaults to runtime.GOMAXPROCS(0). 24 MaxGoroutines int 25 } 26 27 // ForEach executes f in parallel over each element in input. 28 // 29 // It is safe to mutate the input parameter, which makes it 30 // possible to map in place. 31 // 32 // ForEach always uses at most runtime.GOMAXPROCS goroutines. 33 // It takes roughly 2µs to start up the goroutines and adds 34 // an overhead of roughly 50ns per element of input. For 35 // a configurable goroutine limit, use a custom Iterator. 36 func ForEach[T any](input []T, f func(*T)) { Iterator[T]{}.ForEach(input, f) } 37 38 // ForEach executes f in parallel over each element in input, 39 // using up to the Iterator's configured maximum number of 40 // goroutines. 41 // 42 // It is safe to mutate the input parameter, which makes it 43 // possible to map in place. 44 // 45 // It takes roughly 2µs to start up the goroutines and adds 46 // an overhead of roughly 50ns per element of input. 47 func (iter Iterator[T]) ForEach(input []T, f func(*T)) { 48 iter.ForEachIdx(input, func(_ int, t *T) { 49 f(t) 50 }) 51 } 52 53 // ForEachIdx is the same as ForEach except it also provides the 54 // index of the element to the callback. 55 func ForEachIdx[T any](input []T, f func(int, *T)) { Iterator[T]{}.ForEachIdx(input, f) } 56 57 // ForEachIdx is the same as ForEach except it also provides the 58 // index of the element to the callback. 59 func (iter Iterator[T]) ForEachIdx(input []T, f func(int, *T)) { 60 if iter.MaxGoroutines == 0 { 61 // iter is a value receiver and is hence safe to mutate 62 iter.MaxGoroutines = defaultMaxGoroutines() 63 } 64 65 numInput := len(input) 66 if iter.MaxGoroutines > numInput { 67 // No more concurrent tasks than the number of input items. 68 iter.MaxGoroutines = numInput 69 } 70 71 var idx atomic.Int64 72 // Create the task outside the loop to avoid extra closure allocations. 73 task := func() { 74 i := int(idx.Add(1) - 1) 75 for ; i < numInput; i = int(idx.Add(1) - 1) { 76 f(i, &input[i]) 77 } 78 } 79 80 var wg conc.WaitGroup 81 for i := 0; i < iter.MaxGoroutines; i++ { 82 wg.Go(task) 83 } 84 wg.Wait() 85 } 86