1
2
3
4
5
6
7
8
9
10
11
12
13 package couchdb
14
15 import (
16 "context"
17 "encoding/json"
18 "errors"
19 "fmt"
20 "io"
21 "net/http"
22 "sync"
23 "sync/atomic"
24
25 internal "github.com/go-kivik/kivik/v4/int/errors"
26 )
27
// parser decodes one item from the JSON stream into a destination value.
type parser interface {
	// decodeItem reads the next item from dec and stores it in dst.
	decodeItem(dst interface{}, dec *json.Decoder) error
}
31
// metaParser is an optional extension a parser may implement to receive the
// non-row keys of the response object.
type metaParser interface {
	// parseMeta is handed the meta destination, the decoder, and the key
	// that was just read from the stream.
	parseMeta(meta interface{}, dec *json.Decoder, key string) error
}
35
// cancelableReadCloser wraps an io.ReadCloser so that a pending Read returns
// once the associated context is done.
type cancelableReadCloser struct {
	ctx    context.Context // derived, cancelable context watched by Read
	rc     io.ReadCloser   // the wrapped reader
	cancel func()          // cancels ctx; invoked when the reader is closed

	// mu guards closed and err.
	mu     sync.RWMutex
	closed bool  // true once close has run
	err    error // the error recorded when the reader was closed
}
45
46 var _ io.ReadCloser = &cancelableReadCloser{}
47
48 func newCancelableReadCloser(ctx context.Context, rc io.ReadCloser) io.ReadCloser {
49 ctx, cancel := context.WithCancel(ctx)
50 return &cancelableReadCloser{
51 ctx: ctx,
52 rc: rc,
53 cancel: cancel,
54 }
55 }
56
57 func (r *cancelableReadCloser) readErr() error {
58 r.mu.RLock()
59 if !r.closed {
60 r.mu.RUnlock()
61 return nil
62 }
63 err := r.err
64 r.mu.RUnlock()
65 if err == nil {
66 err = errors.New("iterator closed")
67 }
68 return err
69 }
70
71 func (r *cancelableReadCloser) Read(p []byte) (int, error) {
72 if err := r.readErr(); err != nil {
73 return 0, err
74 }
75 var c int
76 var err error
77 done := make(chan struct{})
78 go func() {
79 c, err = r.rc.Read(p)
80 close(done)
81 }()
82 select {
83 case <-r.ctx.Done():
84 var err error
85 if err = r.readErr(); err == nil {
86 err = r.ctx.Err()
87 }
88 return 0, r.close(err)
89 case <-done:
90 if err != nil {
91 e := r.close(err)
92 return c, e
93 }
94 return c, nil
95 }
96 }
97
98 func (r *cancelableReadCloser) close(err error) error {
99 r.mu.Lock()
100 defer r.mu.Unlock()
101 if !r.closed {
102 r.cancel()
103 r.closed = true
104 e := r.rc.Close()
105 if err == nil {
106 err = e
107 }
108 r.err = err
109 }
110 return r.err
111 }
112
113 func (r *cancelableReadCloser) Close() error {
114 err := r.close(nil)
115 if err == io.EOF {
116 return nil
117 }
118 return err
119 }
120
121 type iter struct {
122 meta interface{}
123 expectedKey string
124 body io.ReadCloser
125 parser parser
126
127
128
129 objMode bool
130
131 dec *json.Decoder
132 closed int32
133 }
134
135 func newIter(ctx context.Context, meta interface{}, expectedKey string, body io.ReadCloser, parser parser) *iter {
136 return &iter{
137 meta: meta,
138 expectedKey: expectedKey,
139 body: newCancelableReadCloser(ctx, body),
140 parser: parser,
141 }
142 }
143
144 func (i *iter) next(row interface{}) error {
145 if atomic.LoadInt32(&i.closed) == 1 {
146 return io.EOF
147 }
148 if i.dec == nil {
149
150 i.dec = json.NewDecoder(i.body)
151 if err := i.begin(); err != nil {
152 return &internal.Error{Status: http.StatusBadGateway, Err: err}
153 }
154 }
155
156 err := i.nextRow(row)
157 if err != nil {
158 if err == io.EOF {
159 if e := i.finish(); e != nil {
160 err = e
161 }
162 return err
163 }
164 }
165 return err
166 }
167
168
169 func (i *iter) begin() error {
170 if i.expectedKey == "" && !i.objMode {
171 return nil
172 }
173
174 if err := consumeDelim(i.dec, json.Delim('{')); err != nil {
175 return err
176 }
177 if i.objMode {
178 return nil
179 }
180 for {
181 key, err := nextKey(i.dec)
182 if err != nil {
183 return err
184 }
185 if key == i.expectedKey {
186
187 return consumeDelim(i.dec, json.Delim('['))
188 }
189 if err := i.parseMeta(key); err != nil {
190 return err
191 }
192 }
193 }
194
// nextKey reads the next token from dec and returns it as a string. Decoder
// errors are returned unchanged; a non-string token yields an error
// describing the token.
func nextKey(dec *json.Decoder) (string, error) {
	tok, err := dec.Token()
	if err != nil {
		return "", err
	}
	switch key := tok.(type) {
	case string:
		return key, nil
	default:
		return "", fmt.Errorf("Unexpected token: (%T) %v", tok, tok)
	}
}
208
209 func (i *iter) parseMeta(key string) error {
210 if i.meta == nil {
211 return nil
212 }
213 if mp, ok := i.parser.(metaParser); ok {
214 return mp.parseMeta(i.meta, i.dec, key)
215 }
216 return nil
217 }
218
219 func (i *iter) finish() (err error) {
220 defer func() {
221 e2 := i.Close()
222 if err == nil {
223 err = e2
224 }
225 }()
226 if i.expectedKey == "" && !i.objMode {
227 _, err := i.dec.Token()
228 if err != nil && err != io.EOF {
229 return &internal.Error{Status: http.StatusBadGateway, Err: err}
230 }
231 return nil
232 }
233 if i.objMode {
234 err := consumeDelim(i.dec, json.Delim('}'))
235 if err != nil && err != io.EOF {
236 return &internal.Error{Status: http.StatusBadGateway, Err: err}
237 }
238 return nil
239 }
240 if err := consumeDelim(i.dec, json.Delim(']')); err != nil {
241 return err
242 }
243 for i.dec.More() {
244 t, err := i.dec.Token()
245 if err != nil {
246 return err
247 }
248 switch v := t.(type) {
249 case json.Delim:
250 if v != json.Delim('}') {
251
252 return fmt.Errorf("Unexpected JSON delimiter: %c", v)
253 }
254 case string:
255 if err := i.parseMeta(v); err != nil {
256 return err
257 }
258 default:
259
260
261 return fmt.Errorf("Unexpected JSON token: (%T) '%s'", t, t)
262 }
263 }
264 return consumeDelim(i.dec, json.Delim('}'))
265
266 }
267
268 func (i *iter) nextRow(row interface{}) error {
269 if !i.dec.More() {
270 return io.EOF
271 }
272 return i.parser.decodeItem(row, i.dec)
273 }
274
275 func (i *iter) Close() error {
276 atomic.StoreInt32(&i.closed, 1)
277
278 if i.body == nil {
279 return nil
280 }
281 return i.body.Close()
282 }
283
284
285
286 func consumeDelim(dec *json.Decoder, expectedDelim json.Delim) error {
287 t, err := dec.Token()
288 if err != nil {
289 return &internal.Error{Status: http.StatusBadGateway, Err: err}
290 }
291 d, ok := t.(json.Delim)
292 if !ok {
293 return &internal.Error{Status: http.StatusBadGateway, Err: fmt.Errorf("Unexpected token %T: %v", t, t)}
294 }
295 if d != expectedDelim {
296 return unexpectedDelim(d)
297 }
298 return nil
299 }
300
301
302
303
// unexpectedDelim is the error returned when a JSON delimiter other than the
// expected one is read. Its value is the delimiter that was found.
type unexpectedDelim byte

// Error implements the error interface.
func (d unexpectedDelim) Error() string {
	found := byte(d)
	return fmt.Sprintf("Unexpected JSON delimiter: %c", found)
}

// HTTPStatus reports 502 Bad Gateway as the status for this error.
func (d unexpectedDelim) HTTPStatus() int {
	const status = http.StatusBadGateway
	return status
}
313
View as plain text