...

Source file src/github.com/go-kivik/kivik/v4/iterator.go

Documentation: github.com/go-kivik/kivik/v4

     1  // Licensed under the Apache License, Version 2.0 (the "License"); you may not
     2  // use this file except in compliance with the License. You may obtain a copy of
     3  // the License at
     4  //
     5  //  http://www.apache.org/licenses/LICENSE-2.0
     6  //
     7  // Unless required by applicable law or agreed to in writing, software
     8  // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
     9  // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
    10  // License for the specific language governing permissions and limitations under
    11  // the License.
    12  
    13  package kivik
    14  
    15  import (
    16  	"context"
    17  	"io"
    18  	"net/http"
    19  	"sync"
    20  
    21  	"github.com/go-kivik/kivik/v4/driver"
    22  	internal "github.com/go-kivik/kivik/v4/int/errors"
    23  )
    24  
    25  type iterator interface {
    26  	Next(interface{}) error
    27  	Close() error
    28  }
    29  
    30  // possible states of the result set iterator
    31  const (
    32  	// stateReady is the initial state before [ResultSet.Next] or
    33  	// [ResultSet.NextResultSet] is called.
    34  	stateReady = iota
    35  	// stateResultSetReady is the state after calling [ResultSet.NextResultSet]
    36  	stateResultSetReady
    37  	// stateResultSetRowReady is the state after calling [ResultSet.Next] within
    38  	// a result set.
    39  	stateResultSetRowReady
    40  	// stateEOQ is the state after having reached the final row in a result set.
    41  	// [ResultSet.ResultSetNext] should be called next.
    42  	stateEOQ
    43  	// stateRowReady is the state when not iterating resultsets, after
    44  	// [ResultSet.Next] has been called.
    45  	stateRowReady
    46  	// stateClosed means the last row has been retrieved. The iterator is no
    47  	// longer usable.
    48  	stateClosed
    49  )
    50  
    51  type iter struct {
    52  	feed    iterator
    53  	onClose func()
    54  
    55  	mu    sync.Mutex
    56  	state int   // Set to true once Next() has been called
    57  	err   error // non-nil only if state == stateClosed
    58  	wg    sync.WaitGroup
    59  
    60  	cancel func() // cancel function to exit context goroutine when iterator is closed
    61  
    62  	curVal interface{}
    63  }
    64  
    65  // isReady returns an error if the iterator is not ready, because it has been
    66  // closed, or has not been made ready yet.
    67  func (i *iter) isReady() error {
    68  	if i.state == stateClosed {
    69  		return &internal.Error{Status: http.StatusBadRequest, Message: "kivik: Iterator is closed"}
    70  	}
    71  	if !stateIsReady(i.state) {
    72  		return &internal.Error{Status: http.StatusBadRequest, Message: "kivik: Iterator access before calling Next"}
    73  	}
    74  	return nil
    75  }
    76  
    77  func stateIsReady(state int) bool {
    78  	switch state {
    79  	case stateRowReady, stateResultSetReady, stateResultSetRowReady, stateClosed:
    80  		return true
    81  	}
    82  	return false
    83  }
    84  
    85  // newIterator instantiates a new iterator.
    86  //
    87  // ctx is a possibly-cancellable context.  zeroValue is an empty instance of
    88  // the data type this iterator iterates over feed is the iterator interface,
    89  // which typically wraps a driver.X iterator
    90  func newIterator(ctx context.Context, onClose func(), feed iterator, zeroValue interface{}) *iter {
    91  	i := &iter{
    92  		onClose: onClose,
    93  		feed:    feed,
    94  		curVal:  zeroValue,
    95  	}
    96  	ctx, i.cancel = context.WithCancel(ctx)
    97  	go i.awaitDone(ctx)
    98  	return i
    99  }
   100  
   101  // errIterator instantiates a new iterator that is already closed, and only
   102  // returns an error.
   103  func errIterator(err error) *iter {
   104  	return &iter{
   105  		state: stateClosed,
   106  		err:   err,
   107  	}
   108  }
   109  
   110  // awaitDone blocks until the rows are closed or the context is cancelled, then
   111  // closes the iterator if it's still open.
   112  func (i *iter) awaitDone(ctx context.Context) {
   113  	<-ctx.Done()
   114  	i.mu.Lock()
   115  	_ = i.closeErr(ctx.Err())
   116  	i.mu.Unlock()
   117  }
   118  
   119  // Next prepares the next iterator result value for reading. It returns true on
   120  // success, or false if there is no next result or an error occurs while
   121  // preparing it. [Err] should be consulted to distinguish between the two.
   122  func (i *iter) Next() bool {
   123  	i.mu.Lock()
   124  	defer i.mu.Unlock()
   125  	return i.next()
   126  }
   127  
   128  // next is the same as Next but doesn't do its own locking.
   129  func (i *iter) next() bool {
   130  	if i.state == stateClosed {
   131  		return false
   132  	}
   133  	for {
   134  		err := i.feed.Next(i.curVal)
   135  		if err == driver.EOQ {
   136  			if i.state == stateResultSetReady || i.state == stateResultSetRowReady {
   137  				i.state = stateEOQ
   138  				i.err = nil
   139  				return false
   140  			}
   141  			continue
   142  		}
   143  		switch i.state {
   144  		case stateResultSetReady, stateResultSetRowReady:
   145  			i.state = stateResultSetRowReady
   146  		default:
   147  			i.state = stateRowReady
   148  		}
   149  		i.err = err
   150  		if i.err != nil {
   151  			_ = i.closeErr(nil)
   152  			return false
   153  		}
   154  		return true
   155  	}
   156  }
   157  
   158  // Close closes the iterator, preventing further enumeration, and freeing any
   159  // resources (such as the http request body) of the underlying feed. If [Next]
   160  // is called and there are no further results, the iterator is closed
   161  // automatically and it will suffice to check the result of [Err]. Close is
   162  // idempotent and does not affect the result of [Err].
   163  func (i *iter) Close() error {
   164  	i.mu.Lock()
   165  	defer i.mu.Unlock()
   166  	i.wg.Wait()
   167  	return i.closeErr(nil)
   168  }
   169  
   170  func (i *iter) closeErr(err error) error {
   171  	if i.state == stateClosed {
   172  		return nil
   173  	}
   174  	i.state = stateClosed
   175  
   176  	if i.err == nil {
   177  		i.err = err
   178  	}
   179  
   180  	err = i.feed.Close()
   181  
   182  	if i.cancel != nil {
   183  		i.cancel()
   184  	}
   185  
   186  	if i.onClose != nil {
   187  		i.onClose()
   188  	}
   189  
   190  	return err
   191  }
   192  
   193  // Err returns the error, if any, that was encountered during iteration. Err may
   194  // be called after an explicit or implicit [Close].
   195  func (i *iter) Err() error {
   196  	i.mu.Lock()
   197  	defer i.mu.Unlock()
   198  	if i.err == io.EOF {
   199  		return nil
   200  	}
   201  	return i.err
   202  }
   203  

View as plain text