# `conc`: better structured concurrency for go

[Go Reference](https://pkg.go.dev/github.com/sourcegraph/conc)
[Sourcegraph](https://sourcegraph.com/github.com/sourcegraph/conc)
[Go Report Card](https://goreportcard.com/report/github.com/sourcegraph/conc)
[codecov](https://codecov.io/gh/sourcegraph/conc)
[Discord](https://discord.gg/bvXQXmtRjN)

`conc` is your toolbelt for structured concurrency in go, making common tasks
easier and safer.

```sh
go get github.com/sourcegraph/conc
```

# At a glance

- Use [`conc.WaitGroup`](https://pkg.go.dev/github.com/sourcegraph/conc#WaitGroup) if you just want a safer version of `sync.WaitGroup`
- Use [`pool.Pool`](https://pkg.go.dev/github.com/sourcegraph/conc/pool#Pool) if you want a concurrency-limited task runner
- Use [`pool.ResultPool`](https://pkg.go.dev/github.com/sourcegraph/conc/pool#ResultPool) if you want a concurrent task runner that collects task results
- Use [`pool.(Result)?ErrorPool`](https://pkg.go.dev/github.com/sourcegraph/conc/pool#ErrorPool) if your tasks are fallible
- Use [`pool.(Result)?ContextPool`](https://pkg.go.dev/github.com/sourcegraph/conc/pool#ContextPool) if your tasks should be canceled on failure
- Use [`stream.Stream`](https://pkg.go.dev/github.com/sourcegraph/conc/stream#Stream) if you want to process an ordered stream of tasks in parallel with serial callbacks
- Use [`iter.Map`](https://pkg.go.dev/github.com/sourcegraph/conc/iter#Map) if you want to concurrently map a slice
- Use [`iter.ForEach`](https://pkg.go.dev/github.com/sourcegraph/conc/iter#ForEach) if you want to concurrently iterate over a slice
- Use [`panics.Catcher`](https://pkg.go.dev/github.com/sourcegraph/conc/panics#Catcher) if you want to catch panics in your own goroutines

All pools are created with
[`pool.New()`](https://pkg.go.dev/github.com/sourcegraph/conc/pool#New)
or
[`pool.NewWithResults[T]()`](https://pkg.go.dev/github.com/sourcegraph/conc/pool#NewWithResults),
then configured with methods:

- [`p.WithMaxGoroutines()`](https://pkg.go.dev/github.com/sourcegraph/conc/pool#Pool.MaxGoroutines) configures the maximum number of goroutines in the pool
- [`p.WithErrors()`](https://pkg.go.dev/github.com/sourcegraph/conc/pool#Pool.WithErrors) configures the pool to run tasks that return errors
- [`p.WithContext(ctx)`](https://pkg.go.dev/github.com/sourcegraph/conc/pool#Pool.WithContext) configures the pool to run tasks that should be canceled on first error
- [`p.WithFirstError()`](https://pkg.go.dev/github.com/sourcegraph/conc/pool#ErrorPool.WithFirstError) configures error pools to only keep the first returned error rather than an aggregated error
- [`p.WithCollectErrored()`](https://pkg.go.dev/github.com/sourcegraph/conc/pool#ResultContextPool.WithCollectErrored) configures result pools to collect results even when the task errored

# Goals

The main goals of the package are:
1) Make it harder to leak goroutines
2) Handle panics gracefully
3) Make concurrent code easier to read

## Goal #1: Make it harder to leak goroutines

A common pain point when working with goroutines is cleaning them up. It's
really easy to fire off a `go` statement and fail to properly wait for it to
complete.

`conc` takes the opinionated stance that all concurrency should be scoped.
That is, goroutines should have an owner and that owner should always
ensure that its owned goroutines exit properly.

In `conc`, the owner of a goroutine is always a `conc.WaitGroup`. Goroutines
are spawned in a `WaitGroup` with `(*WaitGroup).Go()`, and
`(*WaitGroup).Wait()` should always be called before the `WaitGroup` goes out
of scope.

In some cases, you might want a spawned goroutine to outlast the scope of the
caller. In that case, you could pass a `WaitGroup` into the spawning function.

```go
func main() {
    var wg conc.WaitGroup
    defer wg.Wait()

    startTheThing(&wg)
}

func startTheThing(wg *conc.WaitGroup) {
    wg.Go(func() { ... })
}
```

For some more discussion on why scoped concurrency is nice, check out [this
blog post](https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/).

## Goal #2: Handle panics gracefully

A frequent problem with goroutines in long-running applications is handling
panics. A goroutine spawned without a panic handler will crash the whole
process on panic. This is usually undesirable.

However, if you do add a panic handler to a goroutine, what do you do with the
panic once you catch it? Some options:
1) Ignore it
2) Log it
3) Turn it into an error and return that to the goroutine spawner
4) Propagate the panic to the goroutine spawner

Ignoring panics is a bad idea since panics usually mean there is actually
something wrong and someone should fix it.

Just logging panics isn't great either because then there is no indication to
the spawner that something bad happened, and it might just continue on as
normal even though your program is in a really bad state.

Both (3) and (4) are reasonable options, but both require the goroutine to have
an owner that can actually receive the message that something went wrong. This
is generally not true with a goroutine spawned with `go`, but in the `conc`
package, all goroutines have an owner that must collect the spawned goroutine.
Any call to `Wait()` will panic if any of the spawned goroutines panicked.
Additionally, it decorates the panic value with a stacktrace from the child
goroutine so that you don't lose information about what caused the panic.

Doing this all correctly every time you spawn something with `go` is not
trivial and it requires a lot of boilerplate that makes the important parts of
the code more difficult to read, so `conc` does this for you.

<table>
<tr>
<th><code>stdlib</code></th>
<th><code>conc</code></th>
</tr>
<tr>
<td>

```go
type caughtPanicError struct {
    val   any
    stack []byte
}

func (e *caughtPanicError) Error() string {
    return fmt.Sprintf(
        "panic: %q\n%s",
        e.val,
        string(e.stack),
    )
}

func main() {
    done := make(chan error)
    go func() {
        defer func() {
            if v := recover(); v != nil {
                done <- &caughtPanicError{
                    val:   v,
                    stack: debug.Stack(),
                }
            } else {
                done <- nil
            }
        }()
        doSomethingThatMightPanic()
    }()
    err := <-done
    if err != nil {
        panic(err)
    }
}
```
</td>
<td>

```go
func main() {
    var wg conc.WaitGroup
    wg.Go(doSomethingThatMightPanic)
    // panics with a nice stacktrace
    wg.Wait()
}
```
</td>
</tr>
</table>

## Goal #3: Make concurrent code easier to read

Doing concurrency correctly is difficult. Doing it in a way that doesn't
obfuscate what the code is actually doing is more difficult. The `conc` package
attempts to make common operations easier by abstracting as much boilerplate
complexity as possible.

Want to run a set of concurrent tasks with a bounded set of goroutines? Use
`pool.New()`. Want to process an ordered stream of results concurrently, but
still maintain order? Try `stream.New()`. What about a concurrent map over
a slice? Take a peek at `iter.Map()`.

Browse some examples below for some comparisons with doing these by hand.

# Examples

Each of these examples forgoes propagating panics for simplicity. To see
what kind of complexity that would add, check out the "Goal #2" header above.

Spawning a set of goroutines and waiting for them to finish:

<table>
<tr>
<th><code>stdlib</code></th>
<th><code>conc</code></th>
</tr>
<tr>
<td>

```go
func main() {
    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            // crashes on panic!
            doSomething()
        }()
    }
    wg.Wait()
}
```
</td>
<td>

```go
func main() {
    var wg conc.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Go(doSomething)
    }
    wg.Wait()
}
```
</td>
</tr>
</table>

Process each element of a stream in a static pool of goroutines:

<table>
<tr>
<th><code>stdlib</code></th>
<th><code>conc</code></th>
</tr>
<tr>
<td>

```go
func process(stream chan int) {
    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for elem := range stream {
                handle(elem)
            }
        }()
    }
    wg.Wait()
}
```
</td>
<td>

```go
func process(stream chan int) {
    p := pool.New().WithMaxGoroutines(10)
    for elem := range stream {
        elem := elem
        p.Go(func() {
            handle(elem)
        })
    }
    p.Wait()
}
```
</td>
</tr>
</table>

Process each element of a slice in a static pool of goroutines:

<table>
<tr>
<th><code>stdlib</code></th>
<th><code>conc</code></th>
</tr>
<tr>
<td>

```go
func process(values []int) {
    feeder := make(chan int, 8)

    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for elem := range feeder {
                handle(elem)
            }
        }()
    }

    for _, value := range values {
        feeder <- value
    }
    close(feeder)
    wg.Wait()
}
```
</td>
<td>

```go
func process(values []int) {
    iter.ForEach(values, handle)
}
```
</td>
</tr>
</table>

Concurrently map a slice:

<table>
<tr>
<th><code>stdlib</code></th>
<th><code>conc</code></th>
</tr>
<tr>
<td>

```go
func concMap(
    input []int,
    f func(int) int,
) []int {
    res := make([]int, len(input))
    var idx atomic.Int64

    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()

            for {
                i := int(idx.Add(1) - 1)
                if i >= len(input) {
                    return
                }

                res[i] = f(input[i])
            }
        }()
    }
    wg.Wait()
    return res
}
```
</td>
<td>

```go
func concMap(
    input []int,
    f func(*int) int,
) []int {
    return iter.Map(input, f)
}
```
</td>
</tr>
</table>

Process an ordered stream concurrently:

<table>
<tr>
<th><code>stdlib</code></th>
<th><code>conc</code></th>
</tr>
<tr>
<td>

```go
func mapStream(
    in chan int,
    out chan int,
    f func(int) int,
) {
    tasks := make(chan func())
    taskResults := make(chan chan int)

    // Worker goroutines
    var workerWg sync.WaitGroup
    for i := 0; i < 10; i++ {
        workerWg.Add(1)
        go func() {
            defer workerWg.Done()
            for task := range tasks {
                task()
            }
        }()
    }

    // Ordered reader goroutines
    var readerWg sync.WaitGroup
    readerWg.Add(1)
    go func() {
        defer readerWg.Done()
        for result := range taskResults {
            item := <-result
            out <- item
        }
    }()

    // Feed the workers with tasks
    for elem := range in {
        resultCh := make(chan int, 1)
        taskResults <- resultCh
        tasks <- func() {
            resultCh <- f(elem)
        }
    }

    // We've exhausted input.
    // Wait for everything to finish
    close(tasks)
    workerWg.Wait()
    close(taskResults)
    readerWg.Wait()
}
```
</td>
<td>

```go
func mapStream(
    in chan int,
    out chan int,
    f func(int) int,
) {
    s := stream.New().WithMaxGoroutines(10)
    for elem := range in {
        elem := elem
        s.Go(func() stream.Callback {
            res := f(elem)
            return func() { out <- res }
        })
    }
    s.Wait()
}
```
</td>
</tr>
</table>

# Status

This package is currently pre-1.0. There are likely to be minor breaking
changes before a 1.0 release as we stabilize the APIs and tweak defaults.
Please open an issue if you have questions, concerns, or requests that you'd
like addressed before the 1.0 release. Currently, a 1.0 is targeted for
March 2023.