...

Source file src/github.com/jackc/pgx/v5/pgconn/internal/bgreader/bgreader_test.go

Documentation: github.com/jackc/pgx/v5/pgconn/internal/bgreader

     1  package bgreader_test
     2  
     3  import (
     4  	"bytes"
     5  	"errors"
     6  	"io"
     7  	"math/rand"
     8  	"testing"
     9  	"time"
    10  
    11  	"github.com/jackc/pgx/v5/pgconn/internal/bgreader"
    12  	"github.com/stretchr/testify/require"
    13  )
    14  
    15  func TestBGReaderReadWhenStopped(t *testing.T) {
    16  	r := bytes.NewReader([]byte("foo bar baz"))
    17  	bgr := bgreader.New(r)
    18  	buf, err := io.ReadAll(bgr)
    19  	require.NoError(t, err)
    20  	require.Equal(t, []byte("foo bar baz"), buf)
    21  }
    22  
    23  func TestBGReaderReadWhenStarted(t *testing.T) {
    24  	r := bytes.NewReader([]byte("foo bar baz"))
    25  	bgr := bgreader.New(r)
    26  	bgr.Start()
    27  	buf, err := io.ReadAll(bgr)
    28  	require.NoError(t, err)
    29  	require.Equal(t, []byte("foo bar baz"), buf)
    30  }
    31  
    32  type mockReadFunc func(p []byte) (int, error)
    33  
    34  type mockReader struct {
    35  	readFuncs []mockReadFunc
    36  }
    37  
    38  func (r *mockReader) Read(p []byte) (int, error) {
    39  	if len(r.readFuncs) == 0 {
    40  		return 0, io.EOF
    41  	}
    42  
    43  	fn := r.readFuncs[0]
    44  	r.readFuncs = r.readFuncs[1:]
    45  
    46  	return fn(p)
    47  }
    48  
    49  func TestBGReaderReadWaitsForBackgroundRead(t *testing.T) {
    50  	rr := &mockReader{
    51  		readFuncs: []mockReadFunc{
    52  			func(p []byte) (int, error) { time.Sleep(1 * time.Second); return copy(p, []byte("foo")), nil },
    53  			func(p []byte) (int, error) { return copy(p, []byte("bar")), nil },
    54  			func(p []byte) (int, error) { return copy(p, []byte("baz")), nil },
    55  		},
    56  	}
    57  	bgr := bgreader.New(rr)
    58  	bgr.Start()
    59  	buf := make([]byte, 3)
    60  	n, err := bgr.Read(buf)
    61  	require.NoError(t, err)
    62  	require.EqualValues(t, 3, n)
    63  	require.Equal(t, []byte("foo"), buf)
    64  }
    65  
    66  func TestBGReaderErrorWhenStarted(t *testing.T) {
    67  	rr := &mockReader{
    68  		readFuncs: []mockReadFunc{
    69  			func(p []byte) (int, error) { return copy(p, []byte("foo")), nil },
    70  			func(p []byte) (int, error) { return copy(p, []byte("bar")), nil },
    71  			func(p []byte) (int, error) { return copy(p, []byte("baz")), errors.New("oops") },
    72  		},
    73  	}
    74  
    75  	bgr := bgreader.New(rr)
    76  	bgr.Start()
    77  	buf, err := io.ReadAll(bgr)
    78  	require.Equal(t, []byte("foobarbaz"), buf)
    79  	require.EqualError(t, err, "oops")
    80  }
    81  
    82  func TestBGReaderErrorWhenStopped(t *testing.T) {
    83  	rr := &mockReader{
    84  		readFuncs: []mockReadFunc{
    85  			func(p []byte) (int, error) { return copy(p, []byte("foo")), nil },
    86  			func(p []byte) (int, error) { return copy(p, []byte("bar")), nil },
    87  			func(p []byte) (int, error) { return copy(p, []byte("baz")), errors.New("oops") },
    88  		},
    89  	}
    90  
    91  	bgr := bgreader.New(rr)
    92  	buf, err := io.ReadAll(bgr)
    93  	require.Equal(t, []byte("foobarbaz"), buf)
    94  	require.EqualError(t, err, "oops")
    95  }
    96  
    97  type numberReader struct {
    98  	v   uint8
    99  	rng *rand.Rand
   100  }
   101  
   102  func (nr *numberReader) Read(p []byte) (int, error) {
   103  	n := nr.rng.Intn(len(p))
   104  	for i := 0; i < n; i++ {
   105  		p[i] = nr.v
   106  		nr.v++
   107  	}
   108  
   109  	return n, nil
   110  }
   111  
   112  // TestBGReaderStress stress tests BGReader by reading a lot of bytes in random sizes while randomly starting and
   113  // stopping the background worker from other goroutines.
   114  func TestBGReaderStress(t *testing.T) {
   115  	nr := &numberReader{rng: rand.New(rand.NewSource(0))}
   116  	bgr := bgreader.New(nr)
   117  
   118  	bytesRead := 0
   119  	var expected uint8
   120  	buf := make([]byte, 10_000)
   121  	rng := rand.New(rand.NewSource(0))
   122  
   123  	for bytesRead < 1_000_000 {
   124  		randomNumber := rng.Intn(100)
   125  		switch {
   126  		case randomNumber < 10:
   127  			go bgr.Start()
   128  		case randomNumber < 20:
   129  			go bgr.Stop()
   130  		default:
   131  			n, err := bgr.Read(buf)
   132  			require.NoError(t, err)
   133  			for i := 0; i < n; i++ {
   134  				require.Equal(t, expected, buf[i])
   135  				expected++
   136  			}
   137  			bytesRead += n
   138  		}
   139  	}
   140  }
   141  

View as plain text