1 package bgreader_test
2
3 import (
4 "bytes"
5 "errors"
6 "io"
7 "math/rand"
8 "testing"
9 "time"
10
11 "github.com/jackc/pgx/v5/pgconn/internal/bgreader"
12 "github.com/stretchr/testify/require"
13 )
14
15 func TestBGReaderReadWhenStopped(t *testing.T) {
16 r := bytes.NewReader([]byte("foo bar baz"))
17 bgr := bgreader.New(r)
18 buf, err := io.ReadAll(bgr)
19 require.NoError(t, err)
20 require.Equal(t, []byte("foo bar baz"), buf)
21 }
22
23 func TestBGReaderReadWhenStarted(t *testing.T) {
24 r := bytes.NewReader([]byte("foo bar baz"))
25 bgr := bgreader.New(r)
26 bgr.Start()
27 buf, err := io.ReadAll(bgr)
28 require.NoError(t, err)
29 require.Equal(t, []byte("foo bar baz"), buf)
30 }
31
type (
	// mockReadFunc is the scripted behavior of a single Read call on a mockReader.
	mockReadFunc func(p []byte) (int, error)

	// mockReader is an io.Reader whose successive Read calls are answered by the functions queued in readFuncs,
	// in order.
	mockReader struct {
		readFuncs []mockReadFunc
	}
)

// Read pops the function at the head of the queue and returns whatever it returns for p. Once the queue is empty
// every call reports 0 bytes and io.EOF.
func (r *mockReader) Read(p []byte) (int, error) {
	if len(r.readFuncs) > 0 {
		next, rest := r.readFuncs[0], r.readFuncs[1:]
		r.readFuncs = rest
		return next(p)
	}

	return 0, io.EOF
}
48
49 func TestBGReaderReadWaitsForBackgroundRead(t *testing.T) {
50 rr := &mockReader{
51 readFuncs: []mockReadFunc{
52 func(p []byte) (int, error) { time.Sleep(1 * time.Second); return copy(p, []byte("foo")), nil },
53 func(p []byte) (int, error) { return copy(p, []byte("bar")), nil },
54 func(p []byte) (int, error) { return copy(p, []byte("baz")), nil },
55 },
56 }
57 bgr := bgreader.New(rr)
58 bgr.Start()
59 buf := make([]byte, 3)
60 n, err := bgr.Read(buf)
61 require.NoError(t, err)
62 require.EqualValues(t, 3, n)
63 require.Equal(t, []byte("foo"), buf)
64 }
65
66 func TestBGReaderErrorWhenStarted(t *testing.T) {
67 rr := &mockReader{
68 readFuncs: []mockReadFunc{
69 func(p []byte) (int, error) { return copy(p, []byte("foo")), nil },
70 func(p []byte) (int, error) { return copy(p, []byte("bar")), nil },
71 func(p []byte) (int, error) { return copy(p, []byte("baz")), errors.New("oops") },
72 },
73 }
74
75 bgr := bgreader.New(rr)
76 bgr.Start()
77 buf, err := io.ReadAll(bgr)
78 require.Equal(t, []byte("foobarbaz"), buf)
79 require.EqualError(t, err, "oops")
80 }
81
82 func TestBGReaderErrorWhenStopped(t *testing.T) {
83 rr := &mockReader{
84 readFuncs: []mockReadFunc{
85 func(p []byte) (int, error) { return copy(p, []byte("foo")), nil },
86 func(p []byte) (int, error) { return copy(p, []byte("bar")), nil },
87 func(p []byte) (int, error) { return copy(p, []byte("baz")), errors.New("oops") },
88 },
89 }
90
91 bgr := bgreader.New(rr)
92 buf, err := io.ReadAll(bgr)
93 require.Equal(t, []byte("foobarbaz"), buf)
94 require.EqualError(t, err, "oops")
95 }
96
// numberReader is an endless io.Reader that emits a running byte counter: v is the next value to write (it wraps
// around after 255 because it is a uint8) and rng decides how many bytes each Read produces.
type numberReader struct {
	v   uint8
	rng *rand.Rand
}

// Read writes the next counter values into a randomly sized prefix of p and returns the number of bytes written.
// For a non-empty p that count lies in [0, len(p)); for an empty p it is always 0. The returned error is always
// nil.
func (nr *numberReader) Read(p []byte) (int, error) {
	// rand.Intn panics when its argument is not positive, so an empty buffer has to be answered before a length
	// is drawn. No random number is consumed in that case.
	if len(p) == 0 {
		return 0, nil
	}

	n := nr.rng.Intn(len(p))
	for i := range p[:n] {
		p[i] = nr.v
		nr.v++
	}

	return n, nil
}
111
112
113
114 func TestBGReaderStress(t *testing.T) {
115 nr := &numberReader{rng: rand.New(rand.NewSource(0))}
116 bgr := bgreader.New(nr)
117
118 bytesRead := 0
119 var expected uint8
120 buf := make([]byte, 10_000)
121 rng := rand.New(rand.NewSource(0))
122
123 for bytesRead < 1_000_000 {
124 randomNumber := rng.Intn(100)
125 switch {
126 case randomNumber < 10:
127 go bgr.Start()
128 case randomNumber < 20:
129 go bgr.Stop()
130 default:
131 n, err := bgr.Read(buf)
132 require.NoError(t, err)
133 for i := 0; i < n; i++ {
134 require.Equal(t, expected, buf[i])
135 expected++
136 }
137 bytesRead += n
138 }
139 }
140 }
141
View as plain text