...

Source file src/github.com/go-kivik/kivik/v4/couchdb/iter.go

Documentation: github.com/go-kivik/kivik/v4/couchdb

     1  // Licensed under the Apache License, Version 2.0 (the "License"); you may not
     2  // use this file except in compliance with the License. You may obtain a copy of
     3  // the License at
     4  //
     5  //  http://www.apache.org/licenses/LICENSE-2.0
     6  //
     7  // Unless required by applicable law or agreed to in writing, software
     8  // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
     9  // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
    10  // License for the specific language governing permissions and limitations under
    11  // the License.
    12  
    13  package couchdb
    14  
    15  import (
    16  	"context"
    17  	"encoding/json"
    18  	"errors"
    19  	"fmt"
    20  	"io"
    21  	"net/http"
    22  	"sync"
    23  	"sync/atomic"
    24  
    25  	internal "github.com/go-kivik/kivik/v4/int/errors"
    26  )
    27  
    28  type parser interface {
    29  	decodeItem(interface{}, *json.Decoder) error
    30  }
    31  
    32  type metaParser interface {
    33  	parseMeta(interface{}, *json.Decoder, string) error
    34  }
    35  
    36  type cancelableReadCloser struct {
    37  	ctx    context.Context
    38  	rc     io.ReadCloser
    39  	cancel func()
    40  
    41  	mu     sync.RWMutex
    42  	closed bool
    43  	err    error
    44  }
    45  
    46  var _ io.ReadCloser = &cancelableReadCloser{}
    47  
    48  func newCancelableReadCloser(ctx context.Context, rc io.ReadCloser) io.ReadCloser {
    49  	ctx, cancel := context.WithCancel(ctx)
    50  	return &cancelableReadCloser{
    51  		ctx:    ctx,
    52  		rc:     rc,
    53  		cancel: cancel,
    54  	}
    55  }
    56  
    57  func (r *cancelableReadCloser) readErr() error {
    58  	r.mu.RLock()
    59  	if !r.closed {
    60  		r.mu.RUnlock()
    61  		return nil
    62  	}
    63  	err := r.err
    64  	r.mu.RUnlock()
    65  	if err == nil {
    66  		err = errors.New("iterator closed")
    67  	}
    68  	return err
    69  }
    70  
    71  func (r *cancelableReadCloser) Read(p []byte) (int, error) {
    72  	if err := r.readErr(); err != nil {
    73  		return 0, err
    74  	}
    75  	var c int
    76  	var err error
    77  	done := make(chan struct{})
    78  	go func() {
    79  		c, err = r.rc.Read(p)
    80  		close(done)
    81  	}()
    82  	select {
    83  	case <-r.ctx.Done():
    84  		var err error
    85  		if err = r.readErr(); err == nil {
    86  			err = r.ctx.Err()
    87  		}
    88  		return 0, r.close(err)
    89  	case <-done:
    90  		if err != nil {
    91  			e := r.close(err)
    92  			return c, e
    93  		}
    94  		return c, nil
    95  	}
    96  }
    97  
    98  func (r *cancelableReadCloser) close(err error) error {
    99  	r.mu.Lock()
   100  	defer r.mu.Unlock()
   101  	if !r.closed {
   102  		r.cancel()
   103  		r.closed = true
   104  		e := r.rc.Close()
   105  		if err == nil {
   106  			err = e
   107  		}
   108  		r.err = err
   109  	}
   110  	return r.err
   111  }
   112  
   113  func (r *cancelableReadCloser) Close() error {
   114  	err := r.close(nil)
   115  	if err == io.EOF {
   116  		return nil
   117  	}
   118  	return err
   119  }
   120  
   121  type iter struct {
   122  	meta        interface{}
   123  	expectedKey string
   124  	body        io.ReadCloser
   125  	parser      parser
   126  
   127  	// objMode enables reading one object at a time, with the ID treated as the
   128  	// docid. This was added for the _revs_diff endpoint.
   129  	objMode bool
   130  
   131  	dec    *json.Decoder
   132  	closed int32
   133  }
   134  
   135  func newIter(ctx context.Context, meta interface{}, expectedKey string, body io.ReadCloser, parser parser) *iter {
   136  	return &iter{
   137  		meta:        meta,
   138  		expectedKey: expectedKey,
   139  		body:        newCancelableReadCloser(ctx, body),
   140  		parser:      parser,
   141  	}
   142  }
   143  
   144  func (i *iter) next(row interface{}) error {
   145  	if atomic.LoadInt32(&i.closed) == 1 {
   146  		return io.EOF
   147  	}
   148  	if i.dec == nil {
   149  		// We haven't begun yet
   150  		i.dec = json.NewDecoder(i.body)
   151  		if err := i.begin(); err != nil {
   152  			return &internal.Error{Status: http.StatusBadGateway, Err: err}
   153  		}
   154  	}
   155  
   156  	err := i.nextRow(row)
   157  	if err != nil {
   158  		if err == io.EOF {
   159  			if e := i.finish(); e != nil {
   160  				err = e
   161  			}
   162  			return err
   163  		}
   164  	}
   165  	return err
   166  }
   167  
   168  // begin parses the top-level of the result object; until rows
   169  func (i *iter) begin() error {
   170  	if i.expectedKey == "" && !i.objMode {
   171  		return nil
   172  	}
   173  	// consume the first '{'
   174  	if err := consumeDelim(i.dec, json.Delim('{')); err != nil {
   175  		return err
   176  	}
   177  	if i.objMode {
   178  		return nil
   179  	}
   180  	for {
   181  		key, err := nextKey(i.dec)
   182  		if err != nil {
   183  			return err
   184  		}
   185  		if key == i.expectedKey {
   186  			// Consume the first '['
   187  			return consumeDelim(i.dec, json.Delim('['))
   188  		}
   189  		if err := i.parseMeta(key); err != nil {
   190  			return err
   191  		}
   192  	}
   193  }
   194  
   195  func nextKey(dec *json.Decoder) (string, error) {
   196  	t, err := dec.Token()
   197  	if err != nil {
   198  		// I can't find a test case to trigger this, so it remains uncovered.
   199  		return "", err
   200  	}
   201  	key, ok := t.(string)
   202  	if !ok {
   203  		// The JSON parser should never permit this
   204  		return "", fmt.Errorf("Unexpected token: (%T) %v", t, t)
   205  	}
   206  	return key, nil
   207  }
   208  
   209  func (i *iter) parseMeta(key string) error {
   210  	if i.meta == nil {
   211  		return nil
   212  	}
   213  	if mp, ok := i.parser.(metaParser); ok {
   214  		return mp.parseMeta(i.meta, i.dec, key)
   215  	}
   216  	return nil
   217  }
   218  
   219  func (i *iter) finish() (err error) {
   220  	defer func() {
   221  		e2 := i.Close()
   222  		if err == nil {
   223  			err = e2
   224  		}
   225  	}()
   226  	if i.expectedKey == "" && !i.objMode {
   227  		_, err := i.dec.Token()
   228  		if err != nil && err != io.EOF {
   229  			return &internal.Error{Status: http.StatusBadGateway, Err: err}
   230  		}
   231  		return nil
   232  	}
   233  	if i.objMode {
   234  		err := consumeDelim(i.dec, json.Delim('}'))
   235  		if err != nil && err != io.EOF {
   236  			return &internal.Error{Status: http.StatusBadGateway, Err: err}
   237  		}
   238  		return nil
   239  	}
   240  	if err := consumeDelim(i.dec, json.Delim(']')); err != nil {
   241  		return err
   242  	}
   243  	for i.dec.More() {
   244  		t, err := i.dec.Token()
   245  		if err != nil {
   246  			return err
   247  		}
   248  		switch v := t.(type) {
   249  		case json.Delim:
   250  			if v != json.Delim('}') {
   251  				// This should never happen, as the JSON parser should prevent it.
   252  				return fmt.Errorf("Unexpected JSON delimiter: %c", v)
   253  			}
   254  		case string:
   255  			if err := i.parseMeta(v); err != nil {
   256  				return err
   257  			}
   258  		default:
   259  			// This should never happen, as the JSON parser would never get
   260  			// this far.
   261  			return fmt.Errorf("Unexpected JSON token: (%T) '%s'", t, t)
   262  		}
   263  	}
   264  	return consumeDelim(i.dec, json.Delim('}'))
   265  	// return nil
   266  }
   267  
   268  func (i *iter) nextRow(row interface{}) error {
   269  	if !i.dec.More() {
   270  		return io.EOF
   271  	}
   272  	return i.parser.decodeItem(row, i.dec)
   273  }
   274  
   275  func (i *iter) Close() error {
   276  	atomic.StoreInt32(&i.closed, 1)
   277  	// body will be nil if we're iterating over a multi-query resultset.
   278  	if i.body == nil {
   279  		return nil
   280  	}
   281  	return i.body.Close()
   282  }
   283  
   284  // consumeDelim consumes the expected delimiter from the stream, or returns an
   285  // error if an unexpected token was found.
   286  func consumeDelim(dec *json.Decoder, expectedDelim json.Delim) error {
   287  	t, err := dec.Token()
   288  	if err != nil {
   289  		return &internal.Error{Status: http.StatusBadGateway, Err: err}
   290  	}
   291  	d, ok := t.(json.Delim)
   292  	if !ok {
   293  		return &internal.Error{Status: http.StatusBadGateway, Err: fmt.Errorf("Unexpected token %T: %v", t, t)}
   294  	}
   295  	if d != expectedDelim {
   296  		return unexpectedDelim(d)
   297  	}
   298  	return nil
   299  }
   300  
   301  // unexpectedDelim is used to indicate to the multiQueriesRows type that the
   302  // end of input has been reached, while behaving as an unexpected delimiter
   303  // error to all other code.
   304  type unexpectedDelim byte
   305  
   306  func (d unexpectedDelim) Error() string {
   307  	return fmt.Sprintf("Unexpected JSON delimiter: %c", d)
   308  }
   309  
   310  func (d unexpectedDelim) HTTPStatus() int {
   311  	return http.StatusBadGateway
   312  }
   313  

View as plain text