...

Source file src/github.com/sourcegraph/conc/stream/stream_test.go

Documentation: github.com/sourcegraph/conc/stream
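
A minimal usage sketch, based only on the API exercised in this test file (New, WithMaxGoroutines, Go, Wait, and the Callback type): tasks run concurrently up to the goroutine limit, while the callbacks they return run sequentially in submission order, as ExampleStream below demonstrates. This is an illustrative example from an importing package's point of view, not part of the package's own documentation.

	package main

	import (
		"fmt"

		"github.com/sourcegraph/conc/stream"
	)

	func main() {
		// At most three tasks run at once; callbacks still run one at a
		// time, in the order the tasks were submitted.
		s := stream.New().WithMaxGoroutines(3)
		for i := 0; i < 5; i++ {
			i := i // capture the loop variable, as the tests below do
			s.Go(func() stream.Callback {
				square := i * i // concurrent work happens in the task
				return func() {
					fmt.Println(i, square) // serialized, ordered output
				}
			})
		}
		s.Wait()
	}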

package stream

import (
	"fmt"
	"sync/atomic"
	"testing"
	"time"

	"github.com/stretchr/testify/require"
)

func ExampleStream() {
	times := []int{20, 52, 16, 45, 4, 80}

	stream := New()
	for _, millis := range times {
		dur := time.Duration(millis) * time.Millisecond
		stream.Go(func() Callback {
			time.Sleep(dur)
			// This will print in the order the tasks were submitted
			return func() { fmt.Println(dur) }
		})
	}
	stream.Wait()

	// Output:
	// 20ms
	// 52ms
	// 16ms
	// 45ms
	// 4ms
	// 80ms
}

func TestStream(t *testing.T) {
	t.Parallel()

	t.Run("simple", func(t *testing.T) {
		t.Parallel()
		s := New()
		var res []int
		for i := 0; i < 5; i++ {
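			// Capture the loop variable so each task sees its own i
			// (required before Go 1.22's per-iteration loop variables).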
			i := i
			s.Go(func() Callback {
				i *= 2
				return func() {
					res = append(res, i)
				}
			})
		}
		s.Wait()
		require.Equal(t, []int{0, 2, 4, 6, 8}, res)
	})

	t.Run("max goroutines", func(t *testing.T) {
		t.Parallel()
		s := New().WithMaxGoroutines(5)
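		// Gauge counters: tasks may run in parallel up to the goroutine
		// limit, but callbacks must only ever execute one at a time.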
		var currentTaskCount atomic.Int64
		var currentCallbackCount atomic.Int64
		for i := 0; i < 50; i++ {
			s.Go(func() Callback {
				curr := currentTaskCount.Add(1)
				if curr > 5 {
					t.Fatal("too many concurrent tasks being executed")
				}
				defer currentTaskCount.Add(-1)

				time.Sleep(time.Millisecond)

				return func() {
					curr := currentCallbackCount.Add(1)
					if curr > 1 {
						t.Fatal("too many concurrent callbacks being executed")
					}

					time.Sleep(time.Millisecond)

					defer currentCallbackCount.Add(-1)
				}
			})
		}
		s.Wait()
	})

	t.Run("panic in task is propagated", func(t *testing.T) {
		t.Parallel()
		s := New().WithMaxGoroutines(5)
		s.Go(func() Callback {
			panic("something really bad happened in the task")
		})
		require.Panics(t, s.Wait)
	})

	t.Run("panic in callback is propagated", func(t *testing.T) {
		t.Parallel()
		s := New().WithMaxGoroutines(5)
		s.Go(func() Callback {
			return func() {
				panic("something really bad happened in the callback")
			}
		})
		require.Panics(t, s.Wait)
	})

	t.Run("panic in callback does not block producers", func(t *testing.T) {
		t.Parallel()
		s := New().WithMaxGoroutines(5)
		s.Go(func() Callback {
			return func() {
				panic("something really bad happened in the callback")
			}
		})
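		// Keep submitting tasks after the panicking callback; Go must not
		// deadlock, and Wait should still propagate the panic.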
		for i := 0; i < 100; i++ {
			s.Go(func() Callback {
				return func() {}
			})
		}
		require.Panics(t, s.Wait)
	})
}

func BenchmarkStream(b *testing.B) {
	b.Run("startup and teardown", func(b *testing.B) {
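		// Measures the fixed cost of creating a stream, running a single
		// no-op task, and waiting for it to finish.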
		for i := 0; i < b.N; i++ {
			s := New()
			s.Go(func() Callback { return func() {} })
			s.Wait()
		}
	})

	b.Run("per task", func(b *testing.B) {
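		// Measures the amortized per-task overhead on one long-lived stream.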
		n := 0
		s := New()
		for i := 0; i < b.N; i++ {
			s.Go(func() Callback {
				return func() {
					n += 1
				}
			})
		}
		s.Wait()
	})
}