...

Source file src/github.com/sourcegraph/conc/stream/stream.go

Documentation: github.com/sourcegraph/conc/stream

     1  // Package stream provides a concurrent, ordered stream implementation.
     2  package stream
     3  
     4  import (
     5  	"sync"
     6  
     7  	"github.com/sourcegraph/conc"
     8  	"github.com/sourcegraph/conc/panics"
     9  	"github.com/sourcegraph/conc/pool"
    10  )
    11  
    12  // New creates a new Stream with default settings.
    13  func New() *Stream {
    14  	return &Stream{
    15  		pool: *pool.New(),
    16  	}
    17  }
    18  
// Stream is used to execute a stream of tasks concurrently while maintaining
// the order of the results.
//
// To use a stream, you submit some number of `Task`s, each of which
// returns a callback. Each task will be executed concurrently in the stream's
// associated Pool, and the callbacks will be executed sequentially in the
// order the tasks were submitted.
//
// Once all your tasks have been submitted, Wait() must be called to clean up
// running goroutines and propagate any panics.
//
// In the case of panic during execution of a task or a callback, all other
// tasks and callbacks will still execute. The panic will be propagated to the
// caller when Wait() is called.
//
// A Stream is efficient, but not zero cost. It should not be used for very
// short tasks. Startup and teardown adds an overhead of a couple of
// microseconds, and the overhead for each task is roughly 500ns. It should be
// good enough for any task that requires a network call.
type Stream struct {
	// pool runs the submitted tasks; Go hands each task to it and Wait
	// blocks on it.
	pool pool.Pool
	// callbackerHandle tracks the callbacker goroutine started by init so
	// that Wait can block until it has finished.
	callbackerHandle conc.WaitGroup
	// queue holds one callbackCh per submitted task, in submission order.
	// It is created by init with capacity pool.MaxGoroutines()+1 and is
	// closed by Wait.
	queue chan callbackCh

	// initOnce ensures the lazy setup in init runs exactly once.
	initOnce sync.Once
}
    45  
// Task is a unit of work submitted to a Stream via Go. Submitted tasks are
// executed concurrently. A Task returns a Callback, which the stream invokes
// after the task has completed.
type Task func() Callback
    50  
// Callback is the function returned by a Task. Callbacks are called one at a
// time, in the same order that their tasks were submitted.
type Callback func()
    54  
    55  // Go schedules a task to be run in the stream's pool. All submitted tasks
    56  // will be executed concurrently in worker goroutines. Then, the callbacks
    57  // returned by the tasks will be executed in the order that the tasks were
    58  // submitted. All callbacks will be executed by the same goroutine, so no
    59  // synchronization is necessary between callbacks. If all goroutines in the
    60  // stream's pool are busy, a call to Go() will block until the task can be
    61  // started.
    62  func (s *Stream) Go(f Task) {
    63  	s.init()
    64  
    65  	// Get a channel from the cache.
    66  	ch := getCh()
    67  
    68  	// Queue the channel for the callbacker.
    69  	s.queue <- ch
    70  
    71  	// Submit the task for execution.
    72  	s.pool.Go(func() {
    73  		defer func() {
    74  			// In the case of a panic from f, we don't want the callbacker to
    75  			// starve waiting for a callback from this channel, so give it an
    76  			// empty callback.
    77  			if r := recover(); r != nil {
    78  				ch <- func() {}
    79  				panic(r)
    80  			}
    81  		}()
    82  
    83  		// Run the task, sending its callback down this task's channel.
    84  		callback := f()
    85  		ch <- callback
    86  	})
    87  }
    88  
    89  // Wait signals to the stream that all tasks have been submitted. Wait will
    90  // not return until all tasks and callbacks have been run.
    91  func (s *Stream) Wait() {
    92  	s.init()
    93  
    94  	// Defer the callbacker cleanup so that it occurs even in the case
    95  	// that one of the tasks panics and is propagated up by s.pool.Wait().
    96  	defer func() {
    97  		close(s.queue)
    98  		s.callbackerHandle.Wait()
    99  	}()
   100  
   101  	// Wait for all the workers to exit.
   102  	s.pool.Wait()
   103  }
   104  
// WithMaxGoroutines forwards n to the underlying pool's WithMaxGoroutines
// and returns s so that calls can be chained.
//
// NOTE(review): init sizes the callback queue from s.pool.MaxGoroutines()
// exactly once, on the first call to Go or Wait, so this is presumably meant
// to be called before any tasks are submitted — confirm against the pool
// package.
func (s *Stream) WithMaxGoroutines(n int) *Stream {
	s.pool.WithMaxGoroutines(n)
	return s
}
   109  
   110  func (s *Stream) init() {
   111  	s.initOnce.Do(func() {
   112  		s.queue = make(chan callbackCh, s.pool.MaxGoroutines()+1)
   113  
   114  		// Start the callbacker.
   115  		s.callbackerHandle.Go(s.callbacker)
   116  	})
   117  }
   118  
   119  // callbacker is responsible for calling the returned callbacks in the order
   120  // they were submitted. There is only a single instance of callbacker running.
   121  func (s *Stream) callbacker() {
   122  	var panicCatcher panics.Catcher
   123  	defer panicCatcher.Repanic()
   124  
   125  	// For every scheduled task, read that tasks channel from the queue.
   126  	for callbackCh := range s.queue {
   127  		// Wait for the task to complete and get its callback from the channel.
   128  		callback := <-callbackCh
   129  
   130  		// Execute the callback (with panic protection).
   131  		panicCatcher.Try(callback)
   132  
   133  		// Return the channel to the pool of unused channels.
   134  		putCh(callbackCh)
   135  	}
   136  }
   137  
   138  type callbackCh chan func()
   139  
   140  var callbackChPool = sync.Pool{
   141  	New: func() any {
   142  		return make(callbackCh, 1)
   143  	},
   144  }
   145  
   146  func getCh() callbackCh {
   147  	return callbackChPool.Get().(callbackCh)
   148  }
   149  
   150  func putCh(ch callbackCh) {
   151  	callbackChPool.Put(ch)
   152  }
   153  

View as plain text