...

Source file src/github.com/sourcegraph/conc/iter/iter.go

Documentation: github.com/sourcegraph/conc/iter

     1  package iter
     2  
     3  import (
     4  	"runtime"
     5  	"sync/atomic"
     6  
     7  	"github.com/sourcegraph/conc"
     8  )
     9  
    10  // defaultMaxGoroutines returns the default maximum number of
    11  // goroutines to use within this package.
    12  func defaultMaxGoroutines() int { return runtime.GOMAXPROCS(0) }
    13  
    14  // Iterator can be used to configure the behaviour of ForEach
    15  // and ForEachIdx. The zero value is safe to use with reasonable
    16  // defaults.
    17  //
    18  // Iterator is also safe for reuse and concurrent use.
    19  type Iterator[T any] struct {
    20  	// MaxGoroutines controls the maximum number of goroutines
    21  	// to use on this Iterator's methods.
    22  	//
    23  	// If unset, MaxGoroutines defaults to runtime.GOMAXPROCS(0).
    24  	MaxGoroutines int
    25  }
    26  
    27  // ForEach executes f in parallel over each element in input.
    28  //
    29  // It is safe to mutate the input parameter, which makes it
    30  // possible to map in place.
    31  //
    32  // ForEach always uses at most runtime.GOMAXPROCS goroutines.
    33  // It takes roughly 2µs to start up the goroutines and adds
    34  // an overhead of roughly 50ns per element of input. For
    35  // a configurable goroutine limit, use a custom Iterator.
    36  func ForEach[T any](input []T, f func(*T)) { Iterator[T]{}.ForEach(input, f) }
    37  
    38  // ForEach executes f in parallel over each element in input,
    39  // using up to the Iterator's configured maximum number of
    40  // goroutines.
    41  //
    42  // It is safe to mutate the input parameter, which makes it
    43  // possible to map in place.
    44  //
    45  // It takes roughly 2µs to start up the goroutines and adds
    46  // an overhead of roughly 50ns per element of input.
    47  func (iter Iterator[T]) ForEach(input []T, f func(*T)) {
    48  	iter.ForEachIdx(input, func(_ int, t *T) {
    49  		f(t)
    50  	})
    51  }
    52  
    53  // ForEachIdx is the same as ForEach except it also provides the
    54  // index of the element to the callback.
    55  func ForEachIdx[T any](input []T, f func(int, *T)) { Iterator[T]{}.ForEachIdx(input, f) }
    56  
    57  // ForEachIdx is the same as ForEach except it also provides the
    58  // index of the element to the callback.
    59  func (iter Iterator[T]) ForEachIdx(input []T, f func(int, *T)) {
    60  	if iter.MaxGoroutines == 0 {
    61  		// iter is a value receiver and is hence safe to mutate
    62  		iter.MaxGoroutines = defaultMaxGoroutines()
    63  	}
    64  
    65  	numInput := len(input)
    66  	if iter.MaxGoroutines > numInput {
    67  		// No more concurrent tasks than the number of input items.
    68  		iter.MaxGoroutines = numInput
    69  	}
    70  
    71  	var idx atomic.Int64
    72  	// Create the task outside the loop to avoid extra closure allocations.
    73  	task := func() {
    74  		i := int(idx.Add(1) - 1)
    75  		for ; i < numInput; i = int(idx.Add(1) - 1) {
    76  			f(i, &input[i])
    77  		}
    78  	}
    79  
    80  	var wg conc.WaitGroup
    81  	for i := 0; i < iter.MaxGoroutines; i++ {
    82  		wg.Go(task)
    83  	}
    84  	wg.Wait()
    85  }
    86  

View as plain text