![conch](https://user-images.githubusercontent.com/12631702/210295964-785cc63d-d697-420c-99ff-f492eb81dec9.svg)

# `conc`: better structured concurrency for Go

[![Go Reference](https://pkg.go.dev/badge/github.com/sourcegraph/conc.svg)](https://pkg.go.dev/github.com/sourcegraph/conc)
[![Sourcegraph](https://img.shields.io/badge/view%20on-sourcegraph-A112FE?logo=)](https://sourcegraph.com/github.com/sourcegraph/conc)
[![Go Report Card](https://goreportcard.com/badge/github.com/sourcegraph/conc)](https://goreportcard.com/report/github.com/sourcegraph/conc)
[![codecov](https://codecov.io/gh/sourcegraph/conc/branch/main/graph/badge.svg?token=MQZTEA1QWT)](https://codecov.io/gh/sourcegraph/conc)
[![Discord](https://img.shields.io/badge/discord-chat-%235765F2)](https://discord.gg/bvXQXmtRjN)

`conc` is your toolbelt for structured concurrency in Go, making common tasks
easier and safer.

```sh
go get github.com/sourcegraph/conc
```

# At a glance

- Use [`conc.WaitGroup`](https://pkg.go.dev/github.com/sourcegraph/conc#WaitGroup) if you just want a safer version of `sync.WaitGroup`
- Use [`pool.Pool`](https://pkg.go.dev/github.com/sourcegraph/conc/pool#Pool) if you want a concurrency-limited task runner
- Use [`pool.ResultPool`](https://pkg.go.dev/github.com/sourcegraph/conc/pool#ResultPool) if you want a concurrent task runner that collects task results
- Use [`pool.(Result)?ErrorPool`](https://pkg.go.dev/github.com/sourcegraph/conc/pool#ErrorPool) if your tasks are fallible
- Use [`pool.(Result)?ContextPool`](https://pkg.go.dev/github.com/sourcegraph/conc/pool#ContextPool) if your tasks should be canceled on failure
- Use [`stream.Stream`](https://pkg.go.dev/github.com/sourcegraph/conc/stream#Stream) if you want to process an ordered stream of tasks in parallel with serial callbacks
- Use [`iter.Map`](https://pkg.go.dev/github.com/sourcegraph/conc/iter#Map) if you want to concurrently map a slice
- Use [`iter.ForEach`](https://pkg.go.dev/github.com/sourcegraph/conc/iter#ForEach) if you want to concurrently iterate over a slice
- Use [`panics.Catcher`](https://pkg.go.dev/github.com/sourcegraph/conc/panics#Catcher) if you want to catch panics in your own goroutines

All pools are created with
[`pool.New()`](https://pkg.go.dev/github.com/sourcegraph/conc/pool#New)
or
[`pool.NewWithResults[T]()`](https://pkg.go.dev/github.com/sourcegraph/conc/pool#NewWithResults),
then configured with methods:

- [`p.WithMaxGoroutines()`](https://pkg.go.dev/github.com/sourcegraph/conc/pool#Pool.WithMaxGoroutines) configures the maximum number of goroutines in the pool
- [`p.WithErrors()`](https://pkg.go.dev/github.com/sourcegraph/conc/pool#Pool.WithErrors) configures the pool to run tasks that return errors
- [`p.WithContext(ctx)`](https://pkg.go.dev/github.com/sourcegraph/conc/pool#Pool.WithContext) configures the pool to run tasks that should be canceled on first error
- [`p.WithFirstError()`](https://pkg.go.dev/github.com/sourcegraph/conc/pool#ErrorPool.WithFirstError) configures error pools to only keep the first returned error rather than an aggregated error
- [`p.WithCollectErrored()`](https://pkg.go.dev/github.com/sourcegraph/conc/pool#ResultContextPool.WithCollectErrored) configures result pools to collect results even when the task errored

# Goals

The main goals of the package are:
1) Make it harder to leak goroutines
2) Handle panics gracefully
3) Make concurrent code easier to read

## Goal #1: Make it harder to leak goroutines

A common pain point when working with goroutines is cleaning them up. It's
really easy to fire off a `go` statement and fail to properly wait for it to
complete.

`conc` takes the opinionated stance that all concurrency should be scoped.
That is, goroutines should have an owner and that owner should always
ensure that its owned goroutines exit properly.

In `conc`, the owner of a goroutine is always a `conc.WaitGroup`. Goroutines
are spawned in a `WaitGroup` with `(*WaitGroup).Go()`, and
`(*WaitGroup).Wait()` should always be called before the `WaitGroup` goes out
of scope.

In some cases, you might want a spawned goroutine to outlast the scope of the
caller. In that case, you can pass a `WaitGroup` into the spawning function.

```go
func main() {
    var wg conc.WaitGroup
    defer wg.Wait()

    startTheThing(&wg)
}

func startTheThing(wg *conc.WaitGroup) {
    wg.Go(func() {
        // This goroutine may outlive startTheThing,
        // but main's deferred wg.Wait() still waits for it.
    })
}
```

For some more discussion on why scoped concurrency is nice, check out [this
blog post](https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/).

## Goal #2: Handle panics gracefully

A frequent problem with goroutines in long-running applications is handling
panics. A goroutine spawned without a panic handler will crash the whole process
on panic. This is usually undesirable.

However, if you do add a panic handler to a goroutine, what do you do with the
panic once you catch it? Some options:
1) Ignore it
2) Log it
3) Turn it into an error and return that to the goroutine spawner
4) Propagate the panic to the goroutine spawner

Ignoring panics is a bad idea since panics usually mean there is actually
something wrong and someone should fix it.

Just logging panics isn't great either, because then there is no indication to the spawner
that something bad happened, and it might just continue on as normal even though your
program is in a really bad state.

Both (3) and (4) are reasonable options, but both require the goroutine to have
an owner that can actually receive the message that something went wrong. This
is generally not true of a goroutine spawned with `go`, but in the `conc`
package, every goroutine has an owner that must collect it. Any call to
`Wait()` will panic if any of the spawned goroutines panicked, and the panic
value is decorated with a stack trace from the child goroutine so that you
don't lose information about what caused the panic.
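
To make that mechanism concrete, here is a toy, stdlib-only sketch of the idea, not `conc`'s actual implementation: each child goroutine recovers its own panic, records the value together with its stack trace, and `Wait()` re-panics in the owner. The `scopedGroup` and `childPanic` names are hypothetical.

```go
package main

import (
    "fmt"
    "runtime/debug"
    "sync"
)

// childPanic is a hypothetical type that keeps a recovered panic value
// together with the stack trace of the goroutine that panicked.
type childPanic struct {
    val   any
    stack []byte
}

func (p *childPanic) Error() string {
    return fmt.Sprintf("panic: %v\n%s", p.val, p.stack)
}

// scopedGroup is a hypothetical, minimal owner for goroutines.
type scopedGroup struct {
    wg    sync.WaitGroup
    once  sync.Once
    first *childPanic // first panic observed in any child
}

// Go runs f in a new goroutine owned by g, recovering any panic.
func (g *scopedGroup) Go(f func()) {
    g.wg.Add(1)
    go func() {
        defer g.wg.Done() // runs last, after the recover below
        defer func() {
            if v := recover(); v != nil {
                // Called during the panic, debug.Stack still shows the panicking frames.
                cp := &childPanic{val: v, stack: debug.Stack()}
                g.once.Do(func() { g.first = cp })
            }
        }()
        f()
    }()
}

// Wait blocks until every child has exited, then re-panics in the
// owner's goroutine if any child panicked.
func (g *scopedGroup) Wait() {
    g.wg.Wait()
    if g.first != nil {
        panic(g.first)
    }
}

func main() {
    defer func() {
        if r := recover(); r != nil {
            fmt.Println("owner caught:", r.(*childPanic).val) // owner caught: boom
        }
    }()
    var g scopedGroup
    g.Go(func() {})
    g.Go(func() { panic("boom") })
    g.Wait()
}
```

Because the child records the panic before calling `Done()`, the write to `first` happens before `Wait()` returns, so the owner can read it without extra locking.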

Doing this all correctly every time you spawn something with `go` is not
trivial, and it requires a lot of boilerplate that makes the important parts of
the code more difficult to read, so `conc` does this for you.

<table>
<tr>
<th><code>stdlib</code></th>
<th><code>conc</code></th>
</tr>
<tr>
<td>

```go
type caughtPanicError struct {
    val   any
    stack []byte
}

func (e *caughtPanicError) Error() string {
    return fmt.Sprintf(
        "panic: %v\n%s",
        e.val,
        string(e.stack),
    )
}

func main() {
    done := make(chan error)
    go func() {
        defer func() {
            if v := recover(); v != nil {
                done <- &caughtPanicError{
                    val:   v,
                    stack: debug.Stack(),
                }
            } else {
                done <- nil
            }
        }()
        doSomethingThatMightPanic()
    }()
    err := <-done
    if err != nil {
        panic(err)
    }
}
```
</td>
<td>

```go
func main() {
    var wg conc.WaitGroup
    wg.Go(doSomethingThatMightPanic)
    // panics with a nice stacktrace
    wg.Wait()
}
```
</td>
</tr>
</table>

## Goal #3: Make concurrent code easier to read

Doing concurrency correctly is difficult. Doing it in a way that doesn't
obfuscate what the code is actually doing is more difficult. The `conc` package
attempts to make common operations easier by abstracting away as much
boilerplate as possible.

Want to run a set of concurrent tasks with a bounded set of goroutines? Use
`pool.New()`. Want to process an ordered stream concurrently while still
preserving the order of its results? Try `stream.New()`. What about a
concurrent map over a slice? Take a peek at `iter.Map()`.

Browse the examples below for comparisons with doing these by hand.

# Examples

Each of the stdlib examples below forgoes propagating panics for simplicity. To see
what kind of complexity that would add, check out the "Goal #2" section above.

Spawn a set of goroutines and wait for them to finish:

<table>
<tr>
<th><code>stdlib</code></th>
<th><code>conc</code></th>
</tr>
<tr>
<td>

```go
func main() {
    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            // crashes on panic!
            doSomething()
        }()
    }
    wg.Wait()
}
```
</td>
<td>

```go
func main() {
    var wg conc.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Go(doSomething)
    }
    wg.Wait()
}
```
</td>
</tr>
</table>

Process each element of a stream in a static pool of goroutines:

<table>
<tr>
<th><code>stdlib</code></th>
<th><code>conc</code></th>
</tr>
<tr>
<td>

```go
func process(stream chan int) {
    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for elem := range stream {
                handle(elem)
            }
        }()
    }
    wg.Wait()
}
```
</td>
<td>

```go
func process(stream chan int) {
    p := pool.New().WithMaxGoroutines(10)
    for elem := range stream {
        elem := elem
        p.Go(func() {
            handle(elem)
        })
    }
    p.Wait()
}
```
</td>
</tr>
</table>

Process each element of a slice in a static pool of goroutines:

<table>
<tr>
<th><code>stdlib</code></th>
<th><code>conc</code></th>
</tr>
<tr>
<td>

```go
func process(values []int) {
    feeder := make(chan int, 8)

    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for elem := range feeder {
                handle(elem)
            }
        }()
    }

    for _, value := range values {
        feeder <- value
    }
    close(feeder)
    wg.Wait()
}
```
</td>
<td>

```go
func process(values []int) {
    // iter.ForEach passes each element by pointer
    iter.ForEach(values, func(v *int) { handle(*v) })
}
```
</td>
</tr>
</table>

Concurrently map a slice:

<table>
<tr>
<th><code>stdlib</code></th>
<th><code>conc</code></th>
</tr>
<tr>
<td>

```go
func concMap(
    input []int,
    f func(int) int,
) []int {
    res := make([]int, len(input))
    var idx atomic.Int64

    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()

            for {
                i := int(idx.Add(1) - 1)
                if i >= len(input) {
                    return
                }

                res[i] = f(input[i])
            }
        }()
    }
    wg.Wait()
    return res
}
```
</td>
<td>

```go
func concMap(
    input []int,
    f func(*int) int,
) []int {
    return iter.Map(input, f)
}
```
</td>
</tr>
</table>

Process an ordered stream concurrently:

<table>
<tr>
<th><code>stdlib</code></th>
<th><code>conc</code></th>
</tr>
<tr>
<td>

```go
func mapStream(
    in chan int,
    out chan int,
    f func(int) int,
) {
    tasks := make(chan func())
    // Buffered so the feeder can run ahead of the
    // ordered reader; unbuffered, only one task
    // would be in flight at a time.
    taskResults := make(chan chan int, 10)

    // Worker goroutines
    var workerWg sync.WaitGroup
    for i := 0; i < 10; i++ {
        workerWg.Add(1)
        go func() {
            defer workerWg.Done()
            for task := range tasks {
                task()
            }
        }()
    }

    // Ordered reader goroutine
    var readerWg sync.WaitGroup
    readerWg.Add(1)
    go func() {
        defer readerWg.Done()
        for result := range taskResults {
            item := <-result
            out <- item
        }
    }()

    // Feed the workers with tasks
    for elem := range in {
        elem := elem // the task runs later on a worker
        resultCh := make(chan int, 1)
        taskResults <- resultCh
        tasks <- func() {
            resultCh <- f(elem)
        }
    }

    // We've exhausted input.
    // Wait for everything to finish
    close(tasks)
    workerWg.Wait()
    close(taskResults)
    readerWg.Wait()
}
```
</td>
<td>

```go
func mapStream(
    in chan int,
    out chan int,
    f func(int) int,
) {
    s := stream.New().WithMaxGoroutines(10)
    for elem := range in {
        elem := elem
        s.Go(func() stream.Callback {
            res := f(elem)
            return func() { out <- res }
        })
    }
    s.Wait()
}
```
</td>
</tr>
</table>

# Status

This package is currently pre-1.0. There are likely to be minor breaking
changes before a 1.0 release as we stabilize the APIs and tweak defaults.
Please open an issue if you have questions, concerns, or requests that you'd
like addressed before the 1.0 release. Currently, a 1.0 release is targeted for
March 2023.