1 // Package stream provides a concurrent, ordered stream implementation. 2 package stream 3 4 import ( 5 "sync" 6 7 "github.com/sourcegraph/conc" 8 "github.com/sourcegraph/conc/panics" 9 "github.com/sourcegraph/conc/pool" 10 ) 11 12 // New creates a new Stream with default settings. 13 func New() *Stream { 14 return &Stream{ 15 pool: *pool.New(), 16 } 17 } 18 19 // Stream is used to execute a stream of tasks concurrently while maintaining 20 // the order of the results. 21 // 22 // To use a stream, you submit some number of `Task`s, each of which 23 // return a callback. Each task will be executed concurrently in the stream's 24 // associated Pool, and the callbacks will be executed sequentially in the 25 // order the tasks were submitted. 26 // 27 // Once all your tasks have been submitted, Wait() must be called to clean up 28 // running goroutines and propagate any panics. 29 // 30 // In the case of panic during execution of a task or a callback, all other 31 // tasks and callbacks will still execute. The panic will be propagated to the 32 // caller when Wait() is called. 33 // 34 // A Stream is efficient, but not zero cost. It should not be used for very 35 // short tasks. Startup and teardown adds an overhead of a couple of 36 // microseconds, and the overhead for each task is roughly 500ns. It should be 37 // good enough for any task that requires a network call. 38 type Stream struct { 39 pool pool.Pool 40 callbackerHandle conc.WaitGroup 41 queue chan callbackCh 42 43 initOnce sync.Once 44 } 45 46 // Task is a task that is submitted to the stream. Submitted tasks will 47 // be executed concurrently. It returns a callback that will be called after 48 // the task has completed. 49 type Task func() Callback 50 51 // Callback is a function that is returned by a Task. Callbacks are 52 // called in the same order that tasks are submitted. 53 type Callback func() 54 55 // Go schedules a task to be run in the stream's pool. All submitted tasks 56 // will be executed concurrently in worker goroutines. Then, the callbacks 57 // returned by the tasks will be executed in the order that the tasks were 58 // submitted. All callbacks will be executed by the same goroutine, so no 59 // synchronization is necessary between callbacks. If all goroutines in the 60 // stream's pool are busy, a call to Go() will block until the task can be 61 // started. 62 func (s *Stream) Go(f Task) { 63 s.init() 64 65 // Get a channel from the cache. 66 ch := getCh() 67 68 // Queue the channel for the callbacker. 69 s.queue <- ch 70 71 // Submit the task for execution. 72 s.pool.Go(func() { 73 defer func() { 74 // In the case of a panic from f, we don't want the callbacker to 75 // starve waiting for a callback from this channel, so give it an 76 // empty callback. 77 if r := recover(); r != nil { 78 ch <- func() {} 79 panic(r) 80 } 81 }() 82 83 // Run the task, sending its callback down this task's channel. 84 callback := f() 85 ch <- callback 86 }) 87 } 88 89 // Wait signals to the stream that all tasks have been submitted. Wait will 90 // not return until all tasks and callbacks have been run. 91 func (s *Stream) Wait() { 92 s.init() 93 94 // Defer the callbacker cleanup so that it occurs even in the case 95 // that one of the tasks panics and is propagated up by s.pool.Wait(). 96 defer func() { 97 close(s.queue) 98 s.callbackerHandle.Wait() 99 }() 100 101 // Wait for all the workers to exit. 102 s.pool.Wait() 103 } 104 105 func (s *Stream) WithMaxGoroutines(n int) *Stream { 106 s.pool.WithMaxGoroutines(n) 107 return s 108 } 109 110 func (s *Stream) init() { 111 s.initOnce.Do(func() { 112 s.queue = make(chan callbackCh, s.pool.MaxGoroutines()+1) 113 114 // Start the callbacker. 115 s.callbackerHandle.Go(s.callbacker) 116 }) 117 } 118 119 // callbacker is responsible for calling the returned callbacks in the order 120 // they were submitted. There is only a single instance of callbacker running. 121 func (s *Stream) callbacker() { 122 var panicCatcher panics.Catcher 123 defer panicCatcher.Repanic() 124 125 // For every scheduled task, read that tasks channel from the queue. 126 for callbackCh := range s.queue { 127 // Wait for the task to complete and get its callback from the channel. 128 callback := <-callbackCh 129 130 // Execute the callback (with panic protection). 131 panicCatcher.Try(callback) 132 133 // Return the channel to the pool of unused channels. 134 putCh(callbackCh) 135 } 136 } 137 138 type callbackCh chan func() 139 140 var callbackChPool = sync.Pool{ 141 New: func() any { 142 return make(callbackCh, 1) 143 }, 144 } 145 146 func getCh() callbackCh { 147 return callbackChPool.Get().(callbackCh) 148 } 149 150 func putCh(ch callbackCh) { 151 callbackChPool.Put(ch) 152 } 153