1 package pgx
2
3 import (
4 "context"
5 "errors"
6 "fmt"
7
8 "github.com/jackc/pgconn"
9 )
10
// batchItem is one queued entry of a Batch: a SQL string paired with the
// arguments that were supplied alongside it.
type batchItem struct {
	// query is the SQL text exactly as it was handed to Batch.Queue.
	query string
	// arguments holds the variadic arguments handed to Batch.Queue; it is nil
	// when none were given.
	arguments []interface{}
}
15
16
17
18 type Batch struct {
19 items []*batchItem
20 }
21
22
23 func (b *Batch) Queue(query string, arguments ...interface{}) {
24 b.items = append(b.items, &batchItem{
25 query: query,
26 arguments: arguments,
27 })
28 }
29
30
31 func (b *Batch) Len() int {
32 return len(b.items)
33 }
34
35 type BatchResults interface {
36
37 Exec() (pgconn.CommandTag, error)
38
39
40 Query() (Rows, error)
41
42
43 QueryRow() Row
44
45
46 QueryFunc(scans []interface{}, f func(QueryFuncRow) error) (pgconn.CommandTag, error)
47
48
49
50
51 Close() error
52 }
53
54 type batchResults struct {
55 ctx context.Context
56 conn *Conn
57 mrr *pgconn.MultiResultReader
58 err error
59 b *Batch
60 ix int
61 closed bool
62 }
63
64
65 func (br *batchResults) Exec() (pgconn.CommandTag, error) {
66 if br.err != nil {
67 return nil, br.err
68 }
69 if br.closed {
70 return nil, fmt.Errorf("batch already closed")
71 }
72
73 query, arguments, _ := br.nextQueryAndArgs()
74
75 if !br.mrr.NextResult() {
76 err := br.mrr.Close()
77 if err == nil {
78 err = errors.New("no result")
79 }
80 if br.conn.shouldLog(LogLevelError) {
81 br.conn.log(br.ctx, LogLevelError, "BatchResult.Exec", map[string]interface{}{
82 "sql": query,
83 "args": logQueryArgs(arguments),
84 "err": err,
85 })
86 }
87 return nil, err
88 }
89
90 commandTag, err := br.mrr.ResultReader().Close()
91
92 if err != nil {
93 if br.conn.shouldLog(LogLevelError) {
94 br.conn.log(br.ctx, LogLevelError, "BatchResult.Exec", map[string]interface{}{
95 "sql": query,
96 "args": logQueryArgs(arguments),
97 "err": err,
98 })
99 }
100 } else if br.conn.shouldLog(LogLevelInfo) {
101 br.conn.log(br.ctx, LogLevelInfo, "BatchResult.Exec", map[string]interface{}{
102 "sql": query,
103 "args": logQueryArgs(arguments),
104 "commandTag": commandTag,
105 })
106 }
107
108 return commandTag, err
109 }
110
111
112 func (br *batchResults) Query() (Rows, error) {
113 query, arguments, ok := br.nextQueryAndArgs()
114 if !ok {
115 query = "batch query"
116 }
117
118 if br.err != nil {
119 return &connRows{err: br.err, closed: true}, br.err
120 }
121
122 if br.closed {
123 alreadyClosedErr := fmt.Errorf("batch already closed")
124 return &connRows{err: alreadyClosedErr, closed: true}, alreadyClosedErr
125 }
126
127 rows := br.conn.getRows(br.ctx, query, arguments)
128
129 if !br.mrr.NextResult() {
130 rows.err = br.mrr.Close()
131 if rows.err == nil {
132 rows.err = errors.New("no result")
133 }
134 rows.closed = true
135
136 if br.conn.shouldLog(LogLevelError) {
137 br.conn.log(br.ctx, LogLevelError, "BatchResult.Query", map[string]interface{}{
138 "sql": query,
139 "args": logQueryArgs(arguments),
140 "err": rows.err,
141 })
142 }
143
144 return rows, rows.err
145 }
146
147 rows.resultReader = br.mrr.ResultReader()
148 return rows, nil
149 }
150
151
152 func (br *batchResults) QueryFunc(scans []interface{}, f func(QueryFuncRow) error) (pgconn.CommandTag, error) {
153 if br.closed {
154 return nil, fmt.Errorf("batch already closed")
155 }
156
157 rows, err := br.Query()
158 if err != nil {
159 return nil, err
160 }
161 defer rows.Close()
162
163 for rows.Next() {
164 err = rows.Scan(scans...)
165 if err != nil {
166 return nil, err
167 }
168
169 err = f(rows)
170 if err != nil {
171 return nil, err
172 }
173 }
174
175 if err := rows.Err(); err != nil {
176 return nil, err
177 }
178
179 return rows.CommandTag(), nil
180 }
181
182
183 func (br *batchResults) QueryRow() Row {
184 rows, _ := br.Query()
185 return (*connRow)(rows.(*connRows))
186
187 }
188
189
190
191 func (br *batchResults) Close() error {
192 if br.err != nil {
193 return br.err
194 }
195
196 if br.closed {
197 return nil
198 }
199 br.closed = true
200
201
202 for {
203 query, args, ok := br.nextQueryAndArgs()
204 if !ok {
205 break
206 }
207
208 if br.conn.shouldLog(LogLevelInfo) {
209 br.conn.log(br.ctx, LogLevelInfo, "BatchResult.Close", map[string]interface{}{
210 "sql": query,
211 "args": logQueryArgs(args),
212 })
213 }
214 }
215
216 return br.mrr.Close()
217 }
218
219 func (br *batchResults) nextQueryAndArgs() (query string, args []interface{}, ok bool) {
220 if br.b != nil && br.ix < len(br.b.items) {
221 bi := br.b.items[br.ix]
222 query = bi.query
223 args = bi.arguments
224 ok = true
225 br.ix++
226 }
227 return
228 }
229
View as plain text