1
2
3
4
5
6
7
8
9
10
11
12
13 package kivik
14
15 import (
16 "context"
17 "encoding/json"
18 "errors"
19 "io"
20 "net/http"
21 "reflect"
22 "sync"
23
24 "github.com/go-kivik/kivik/v4/driver"
25 internal "github.com/go-kivik/kivik/v4/int/errors"
26 )
27
28
// ResultMetadata carries the summary values that accompany a result set. A
// value of this type is built from the driver's rows once they report the end
// of a query or the end of the feed.
type ResultMetadata struct {
	// Offset is the value returned by the driver rows' Offset method.
	// NOTE(review): presumably the position at which the result set
	// started; confirm against the driver documentation.
	Offset int64

	// TotalRows is the value returned by the driver rows' TotalRows method.
	TotalRows int64

	// UpdateSeq is the value returned by the driver rows' UpdateSeq method.
	UpdateSeq string

	// Warning is the warning reported by the driver. It is empty when the
	// driver rows do not implement driver.RowsWarner.
	Warning string

	// Bookmark is the bookmark reported by the driver. It is empty when the
	// driver rows do not implement driver.Bookmarker.
	Bookmark string
}
52
53
54
55
56
57
58
59
60
61 type ResultSet struct {
62 *iter
63 rowsi driver.Rows
64 }
65
66 func newResultSet(ctx context.Context, onClose func(), rowsi driver.Rows) *ResultSet {
67 return &ResultSet{
68 iter: newIterator(ctx, onClose, &rowsIterator{Rows: rowsi}, &driver.Row{}),
69 rowsi: rowsi,
70 }
71 }
72
73
74
75
76
77
78
79
80 func (r *ResultSet) Next() bool {
81 r.iter.mu.Lock()
82 defer r.iter.mu.Unlock()
83 if r.err != nil {
84 return false
85 }
86 if atts := r.curVal.(*driver.Row).Attachments; atts != nil {
87 _ = atts.Close()
88 }
89
90 return r.iter.next()
91 }
92
93
94
95
96
97
98
99 func (r *ResultSet) NextResultSet() bool {
100 r.mu.Lock()
101 defer r.mu.Unlock()
102 if r.err != nil {
103 return false
104 }
105 if r.state == stateClosed {
106 return false
107 }
108 if r.state == stateRowReady {
109 r.err = errors.New("must call NextResultSet before Next")
110 return false
111 }
112 r.state = stateResultSetReady
113 return true
114 }
115
116
117
118
119 func (r *ResultSet) Err() error {
120 return r.iter.Err()
121 }
122
123
124
125
126
127
128 func (r *ResultSet) Close() error {
129 return r.iter.Close()
130 }
131
132
133
134
135 func (r *ResultSet) Metadata() (*ResultMetadata, error) {
136 for r.iter == nil || (r.state != stateEOQ && r.state != stateClosed) {
137 return nil, &internal.Error{Status: http.StatusBadRequest, Err: errors.New("Metadata must not be called until result set iteration is complete")}
138 }
139 return r.feed.(*rowsIterator).ResultMetadata, nil
140 }
141
142
143
144
145
146
147
148
149
150 func (r *ResultSet) ScanValue(dest interface{}) error {
151 runlock, err := r.makeReady()
152 if err != nil {
153 return err
154 }
155 defer runlock()
156 row := r.curVal.(*driver.Row)
157 if row.Error != nil {
158 return row.Error
159 }
160 if row.Value != nil {
161 return json.NewDecoder(row.Value).Decode(dest)
162 }
163 return nil
164 }
165
166
167
168
169
170
171
172 func (r *ResultSet) ScanDoc(dest interface{}) error {
173 runlock, err := r.makeReady()
174 if err != nil {
175 return err
176 }
177 defer runlock()
178 row := r.curVal.(*driver.Row)
179 if err := row.Error; err != nil {
180 return err
181 }
182 if row.Doc != nil {
183 return json.NewDecoder(row.Doc).Decode(dest)
184 }
185 return &internal.Error{Status: http.StatusBadRequest, Message: "kivik: doc is nil; does the query include docs?"}
186 }
187
188
189
190
191
192
193
194 func (r *ResultSet) ScanKey(dest interface{}) error {
195 runlock, err := r.makeReady()
196 if err != nil {
197 return err
198 }
199 defer runlock()
200 row := r.curVal.(*driver.Row)
201 if err := json.Unmarshal(row.Key, dest); err != nil {
202 return err
203 }
204 return row.Error
205 }
206
207
208 func (r *ResultSet) ID() (string, error) {
209 runlock, err := r.makeReady()
210 if err != nil {
211 return "", err
212 }
213 defer runlock()
214 row := r.curVal.(*driver.Row)
215 return row.ID, row.Error
216 }
217
218
219
220
221 func (r *ResultSet) Rev() (string, error) {
222 runlock, err := r.makeReady()
223 if err != nil {
224 return "", err
225 }
226 defer runlock()
227 row := r.curVal.(*driver.Row)
228 return row.Rev, row.Error
229 }
230
231
232
233 func (r *ResultSet) Key() (string, error) {
234 runlock, err := r.makeReady()
235 if err != nil {
236 return "", err
237 }
238 defer runlock()
239 row := r.curVal.(*driver.Row)
240 return string(row.Key), row.Error
241 }
242
243
244
245 func (r *ResultSet) Attachments() (*AttachmentsIterator, error) {
246 runlock, err := r.makeReady()
247 if err != nil {
248 return nil, err
249 }
250 row := r.curVal.(*driver.Row)
251 if row.Error != nil {
252 runlock()
253 return nil, row.Error
254 }
255 if row.Attachments == nil {
256 runlock()
257 return nil, errNoAttachments
258 }
259 return &AttachmentsIterator{
260 onClose: runlock,
261 atti: row.Attachments,
262 }, nil
263 }
264
265
266
267 func (r *ResultSet) makeReady() (unlock func(), err error) {
268 r.mu.Lock()
269 if r.err != nil {
270 r.mu.Unlock()
271 return nil, r.err
272 }
273 if r.state == stateClosed {
274 r.mu.Unlock()
275 return nil, &internal.Error{Status: http.StatusBadRequest, Message: "kivik: Iterator is closed"}
276 }
277 if !stateIsReady(r.state) {
278 r.mu.Unlock()
279 return nil, &internal.Error{Status: http.StatusBadRequest, Message: "kivik: Iterator access before calling Next"}
280 }
281 var once sync.Once
282 r.wg.Add(1)
283 return func() {
284 once.Do(func() {
285 r.wg.Done()
286 r.mu.Unlock()
287 })
288 }, nil
289 }
290
291 type rowsIterator struct {
292 driver.Rows
293 *ResultMetadata
294 }
295
296 var _ iterator = &rowsIterator{}
297
298 func (r *rowsIterator) Next(i interface{}) error {
299 row := i.(*driver.Row)
300 row.ID = ""
301 row.Rev = ""
302 row.Key = row.Key[:0]
303 row.Value = nil
304 row.Doc = nil
305 row.Attachments = nil
306 row.Error = nil
307 err := r.Rows.Next(row)
308 if err == io.EOF || err == driver.EOQ {
309 var warning, bookmark string
310 if w, ok := r.Rows.(driver.RowsWarner); ok {
311 warning = w.Warning()
312 }
313 if b, ok := r.Rows.(driver.Bookmarker); ok {
314 bookmark = b.Bookmark()
315 }
316 r.ResultMetadata = &ResultMetadata{
317 Offset: r.Rows.Offset(),
318 TotalRows: r.Rows.TotalRows(),
319 UpdateSeq: r.Rows.UpdateSeq(),
320 Warning: warning,
321 Bookmark: bookmark,
322 }
323 }
324 return err
325 }
326
327
328
329
330
331
332
333 func ScanAllDocs(r *ResultSet, dest interface{}) error {
334 return scanAll(r, dest, r.ScanDoc)
335 }
336
337
338 func ScanAllValues(r *ResultSet, dest interface{}) error {
339 return scanAll(r, dest, r.ScanValue)
340 }
341
342 func scanAll(r *ResultSet, dest interface{}, scan func(interface{}) error) (err error) {
343 defer func() {
344 closeErr := r.Close()
345 if err == nil {
346 err = closeErr
347 }
348 }()
349 if err := r.Err(); err != nil {
350 return err
351 }
352
353 value := reflect.ValueOf(dest)
354 if value.Kind() != reflect.Ptr {
355 return errors.New("must pass a pointer to ScanAllDocs")
356 }
357 if value.IsNil() {
358 return errors.New("nil pointer passed to ScanAllDocs")
359 }
360
361 direct := reflect.Indirect(value)
362 var limit int
363
364 switch direct.Kind() {
365 case reflect.Array:
366 limit = direct.Len()
367 if limit == 0 {
368 return errors.New("0-length array passed to ScanAllDocs")
369 }
370 case reflect.Slice:
371 default:
372 return errors.New("dest must be a pointer to a slice or array")
373 }
374
375 base := value.Type()
376 if base.Kind() == reflect.Ptr {
377 base = base.Elem()
378 }
379 base = base.Elem()
380
381 for i := 0; r.Next(); i++ {
382 if limit > 0 && i >= limit {
383 return nil
384 }
385 vp := reflect.New(base)
386 err = scan(vp.Interface())
387 if limit > 0 {
388 direct.Index(i).Set(reflect.Indirect(vp))
389 } else {
390 direct.Set(reflect.Append(direct, reflect.Indirect(vp)))
391 }
392 }
393 return nil
394 }
395
396
397
398
399
400
401 type Row struct {
402 dRow *driver.Row
403 }
404
405
406 func (r *Row) ID() (string, error) {
407 return r.dRow.ID, r.dRow.Error
408 }
409
410
411 func (r *Row) Rev() (string, error) {
412 return r.dRow.Rev, r.dRow.Error
413 }
414
415
416 func (r *Row) Key() (json.RawMessage, error) {
417 return r.dRow.Key, r.dRow.Error
418 }
419
420
421
422
423
424
425
426
427
428 func (r *Row) ScanValue(dest interface{}) error {
429 if err := r.dRow.Error; err != nil {
430 return err
431 }
432 return json.NewDecoder(r.dRow.Value).Decode(dest)
433 }
434
435
436
437
438
439
440
441 func (r *Row) ScanKey(dest interface{}) error {
442 if err := r.dRow.Error; err != nil {
443 return err
444 }
445 return json.Unmarshal(r.dRow.Key, dest)
446 }
447
448
449
450
451
452
453 func (r *Row) ScanDoc(dest interface{}) error {
454 if err := r.dRow.Error; err != nil {
455 return err
456 }
457 return json.NewDecoder(r.dRow.Doc).Decode(dest)
458 }
459
460
461
462
463
464
465
466
467
468
469
470
471 func (r *ResultSet) Iterator() func(yield func(*Row, error) bool) {
472 return func(yield func(*Row, error) bool) {
473 for r.Next() {
474 row := r.iter.curVal.(*driver.Row)
475 if !yield(&Row{dRow: row}, nil) {
476 _ = r.Close()
477 break
478 }
479 }
480 if err := r.Err(); err != nil {
481 yield(nil, err)
482 }
483 }
484 }
485
486
487
488
489
490
491
492
493 func (r *ResultSet) NextIterator() func(yield func() bool) {
494 return func(yield func() bool) {
495 for r.NextResultSet() {
496 if !yield() {
497 _ = r.Close()
498 break
499 }
500 }
501 }
502 }
503
View as plain text