...

Source file src/github.com/sourcegraph/conc/pool/pool_test.go

Documentation: github.com/sourcegraph/conc/pool

     1  package pool
     2  
     3  import (
     4  	"fmt"
     5  	"strconv"
     6  	"sync/atomic"
     7  	"testing"
     8  	"time"
     9  
    10  	"github.com/stretchr/testify/require"
    11  )
    12  
    13  func ExamplePool() {
    14  	p := New().WithMaxGoroutines(3)
    15  	for i := 0; i < 5; i++ {
    16  		p.Go(func() {
    17  			fmt.Println("conc")
    18  		})
    19  	}
    20  	p.Wait()
    21  	// Output:
    22  	// conc
    23  	// conc
    24  	// conc
    25  	// conc
    26  	// conc
    27  }
    28  
    29  func TestPool(t *testing.T) {
    30  	t.Parallel()
    31  
    32  	t.Run("basic", func(t *testing.T) {
    33  		t.Parallel()
    34  
    35  		g := New()
    36  		var completed atomic.Int64
    37  		for i := 0; i < 100; i++ {
    38  			g.Go(func() {
    39  				time.Sleep(1 * time.Millisecond)
    40  				completed.Add(1)
    41  			})
    42  		}
    43  		g.Wait()
    44  		require.Equal(t, completed.Load(), int64(100))
    45  	})
    46  
    47  	t.Run("panics on configuration after init", func(t *testing.T) {
    48  		t.Run("before wait", func(t *testing.T) {
    49  			t.Parallel()
    50  			g := New()
    51  			g.Go(func() {})
    52  			require.Panics(t, func() { g.WithMaxGoroutines(10) })
    53  		})
    54  
    55  		t.Run("after wait", func(t *testing.T) {
    56  			t.Parallel()
    57  			g := New()
    58  			g.Go(func() {})
    59  			g.Wait()
    60  			require.Panics(t, func() { g.WithMaxGoroutines(10) })
    61  		})
    62  	})
    63  
    64  	t.Run("limit", func(t *testing.T) {
    65  		t.Parallel()
    66  		for _, maxConcurrent := range []int{1, 10, 100} {
    67  			t.Run(strconv.Itoa(maxConcurrent), func(t *testing.T) {
    68  				g := New().WithMaxGoroutines(maxConcurrent)
    69  
    70  				var currentConcurrent atomic.Int64
    71  				var errCount atomic.Int64
    72  				taskCount := maxConcurrent * 10
    73  				for i := 0; i < taskCount; i++ {
    74  					g.Go(func() {
    75  						cur := currentConcurrent.Add(1)
    76  						if cur > int64(maxConcurrent) {
    77  							errCount.Add(1)
    78  						}
    79  						time.Sleep(time.Millisecond)
    80  						currentConcurrent.Add(-1)
    81  					})
    82  				}
    83  				g.Wait()
    84  				require.Equal(t, int64(0), errCount.Load())
    85  				require.Equal(t, int64(0), currentConcurrent.Load())
    86  			})
    87  		}
    88  	})
    89  
    90  	t.Run("propagate panic", func(t *testing.T) {
    91  		t.Parallel()
    92  		g := New()
    93  		for i := 0; i < 10; i++ {
    94  			i := i
    95  			g.Go(func() {
    96  				if i == 5 {
    97  					panic(i)
    98  				}
    99  			})
   100  		}
   101  		require.Panics(t, g.Wait)
   102  	})
   103  
   104  	t.Run("panics do not exhaust goroutines", func(t *testing.T) {
   105  		t.Parallel()
   106  		g := New().WithMaxGoroutines(2)
   107  		for i := 0; i < 10; i++ {
   108  			g.Go(func() {
   109  				panic(42)
   110  			})
   111  		}
   112  		require.Panics(t, g.Wait)
   113  	})
   114  
   115  	t.Run("panics on invalid WithMaxGoroutines", func(t *testing.T) {
   116  		t.Parallel()
   117  		require.Panics(t, func() { New().WithMaxGoroutines(0) })
   118  	})
   119  
   120  	t.Run("returns correct MaxGoroutines", func(t *testing.T) {
   121  		t.Parallel()
   122  		p := New().WithMaxGoroutines(42)
   123  		require.Equal(t, 42, p.MaxGoroutines())
   124  	})
   125  }
   126  
   127  func BenchmarkPool(b *testing.B) {
   128  	b.Run("startup and teardown", func(b *testing.B) {
   129  		for i := 0; i < b.N; i++ {
   130  			p := New()
   131  			p.Go(func() {})
   132  			p.Wait()
   133  		}
   134  	})
   135  
   136  	b.Run("per task", func(b *testing.B) {
   137  		p := New()
   138  		f := func() {}
   139  		for i := 0; i < b.N; i++ {
   140  			p.Go(f)
   141  		}
   142  		p.Wait()
   143  	})
   144  }
   145  

View as plain text