...
1
2 package bgreader
3
4 import (
5 "io"
6 "sync"
7
8 "github.com/jackc/pgx/v5/internal/iobufpool"
9 )
10
// Background reader states, as reported by (*BGReader).Status.
const (
	// StatusStopped means no background read loop is active. It is the zero
	// value, so a freshly constructed BGReader starts out stopped.
	StatusStopped = 0

	// StatusRunning means a background goroutine is reading from the
	// underlying reader and queueing the results.
	StatusRunning = 1

	// StatusStopping means Stop was called while running; the background
	// goroutine exits once its in-progress read returns.
	StatusStopping = 2
)
16
17
type (
	// BGReader is an io.Reader wrapper that can optionally read from the
	// underlying reader on a background goroutine and buffer what it gets
	// until Read is called.
	BGReader struct {
		// r is the underlying reader.
		r io.Reader

		// cond.L guards status and readResults; cond is broadcast every time
		// the background goroutine queues a new result.
		cond *sync.Cond
		// status holds one of StatusStopped, StatusRunning or StatusStopping.
		status int32
		// readResults is the FIFO queue of background reads not yet consumed
		// by Read.
		readResults []readResult
	}

	// readResult is the outcome of a single background read: the bytes that
	// were read (possibly none) and the error returned alongside them.
	readResult struct {
		buf *[]byte
		err error
	}
)
30
31
32
33 func (r *BGReader) Start() {
34 r.cond.L.Lock()
35 defer r.cond.L.Unlock()
36
37 switch r.status {
38 case StatusStopped:
39 r.status = StatusRunning
40 go r.bgRead()
41 case StatusRunning:
42
43 case StatusStopping:
44 r.status = StatusRunning
45 }
46 }
47
48
49
50 func (r *BGReader) Stop() {
51 r.cond.L.Lock()
52 defer r.cond.L.Unlock()
53
54 switch r.status {
55 case StatusStopped:
56
57 case StatusRunning:
58 r.status = StatusStopping
59 case StatusStopping:
60
61 }
62 }
63
64
65 func (r *BGReader) Status() int32 {
66 r.cond.L.Lock()
67 defer r.cond.L.Unlock()
68 return r.status
69 }
70
71 func (r *BGReader) bgRead() {
72 keepReading := true
73 for keepReading {
74 buf := iobufpool.Get(8192)
75 n, err := r.r.Read(*buf)
76 *buf = (*buf)[:n]
77
78 r.cond.L.Lock()
79 r.readResults = append(r.readResults, readResult{buf: buf, err: err})
80 if r.status == StatusStopping || err != nil {
81 r.status = StatusStopped
82 keepReading = false
83 }
84 r.cond.L.Unlock()
85 r.cond.Broadcast()
86 }
87 }
88
89
90 func (r *BGReader) Read(p []byte) (int, error) {
91 r.cond.L.Lock()
92 defer r.cond.L.Unlock()
93
94 if len(r.readResults) > 0 {
95 return r.readFromReadResults(p)
96 }
97
98
99 if r.status == StatusStopped {
100 return r.r.Read(p)
101 }
102
103
104 for len(r.readResults) == 0 {
105 r.cond.Wait()
106 }
107 return r.readFromReadResults(p)
108 }
109
110
111 func (r *BGReader) readFromReadResults(p []byte) (int, error) {
112 buf := r.readResults[0].buf
113 var err error
114
115 n := copy(p, *buf)
116 if n == len(*buf) {
117 err = r.readResults[0].err
118 iobufpool.Put(buf)
119 if len(r.readResults) == 1 {
120 r.readResults = nil
121 } else {
122 r.readResults = r.readResults[1:]
123 }
124 } else {
125 *buf = (*buf)[n:]
126 r.readResults[0].buf = buf
127 }
128
129 return n, err
130 }
131
132 func New(r io.Reader) *BGReader {
133 return &BGReader{
134 r: r,
135 cond: &sync.Cond{
136 L: &sync.Mutex{},
137 },
138 }
139 }
140
View as plain text