...

Source file src/github.com/sourcegraph/conc/pool/error_pool_test.go

Documentation: github.com/sourcegraph/conc/pool

     1  package pool
     2  
     3  import (
     4  	"errors"
     5  	"fmt"
     6  	"strconv"
     7  	"sync/atomic"
     8  	"testing"
     9  	"time"
    10  
    11  	"github.com/stretchr/testify/require"
    12  )
    13  
    14  func ExampleErrorPool() {
    15  	p := New().WithErrors()
    16  	for i := 0; i < 3; i++ {
    17  		i := i
    18  		p.Go(func() error {
    19  			if i == 2 {
    20  				return errors.New("oh no!")
    21  			}
    22  			return nil
    23  		})
    24  	}
    25  	err := p.Wait()
    26  	fmt.Println(err)
    27  	// Output:
    28  	// oh no!
    29  }
    30  
    31  func TestErrorPool(t *testing.T) {
    32  	t.Parallel()
    33  
    34  	err1 := errors.New("err1")
    35  	err2 := errors.New("err2")
    36  
    37  	t.Run("panics on configuration after init", func(t *testing.T) {
    38  		t.Run("before wait", func(t *testing.T) {
    39  			t.Parallel()
    40  			g := New().WithErrors()
    41  			g.Go(func() error { return nil })
    42  			require.Panics(t, func() { g.WithMaxGoroutines(10) })
    43  		})
    44  
    45  		t.Run("after wait", func(t *testing.T) {
    46  			t.Parallel()
    47  			g := New().WithErrors()
    48  			g.Go(func() error { return nil })
    49  			_ = g.Wait()
    50  			require.Panics(t, func() { g.WithMaxGoroutines(10) })
    51  		})
    52  	})
    53  
    54  	t.Run("wait returns no error if no errors", func(t *testing.T) {
    55  		t.Parallel()
    56  		g := New().WithErrors()
    57  		g.Go(func() error { return nil })
    58  		require.NoError(t, g.Wait())
    59  	})
    60  
    61  	t.Run("wait error if func returns error", func(t *testing.T) {
    62  		t.Parallel()
    63  		g := New().WithErrors()
    64  		g.Go(func() error { return err1 })
    65  		require.ErrorIs(t, g.Wait(), err1)
    66  	})
    67  
    68  	t.Run("wait error is all returned errors", func(t *testing.T) {
    69  		t.Parallel()
    70  		g := New().WithErrors()
    71  		g.Go(func() error { return err1 })
    72  		g.Go(func() error { return nil })
    73  		g.Go(func() error { return err2 })
    74  		err := g.Wait()
    75  		require.ErrorIs(t, err, err1)
    76  		require.ErrorIs(t, err, err2)
    77  	})
    78  
    79  	t.Run("propagates panics", func(t *testing.T) {
    80  		t.Parallel()
    81  		g := New().WithErrors()
    82  		for i := 0; i < 10; i++ {
    83  			i := i
    84  			g.Go(func() error {
    85  				if i == 5 {
    86  					panic("fatal")
    87  				}
    88  				return nil
    89  			})
    90  		}
    91  		require.Panics(t, func() { _ = g.Wait() })
    92  	})
    93  
    94  	t.Run("limit", func(t *testing.T) {
    95  		t.Parallel()
    96  		for _, maxGoroutines := range []int{1, 10, 100} {
    97  			t.Run(strconv.Itoa(maxGoroutines), func(t *testing.T) {
    98  				g := New().WithErrors().WithMaxGoroutines(maxGoroutines)
    99  
   100  				var currentConcurrent atomic.Int64
   101  				taskCount := maxGoroutines * 10
   102  				for i := 0; i < taskCount; i++ {
   103  					g.Go(func() error {
   104  						cur := currentConcurrent.Add(1)
   105  						if cur > int64(maxGoroutines) {
   106  							return fmt.Errorf("expected no more than %d concurrent goroutine", maxGoroutines)
   107  						}
   108  						time.Sleep(time.Millisecond)
   109  						currentConcurrent.Add(-1)
   110  						return nil
   111  					})
   112  				}
   113  				require.NoError(t, g.Wait())
   114  				require.Equal(t, int64(0), currentConcurrent.Load())
   115  			})
   116  		}
   117  	})
   118  }
   119  

View as plain text