...

Source file src/github.com/go-kivik/kivik/v4/changes.go

Documentation: github.com/go-kivik/kivik/v4

     1  // Licensed under the Apache License, Version 2.0 (the "License"); you may not
     2  // use this file except in compliance with the License. You may obtain a copy of
     3  // the License at
     4  //
     5  //  http://www.apache.org/licenses/LICENSE-2.0
     6  //
     7  // Unless required by applicable law or agreed to in writing, software
     8  // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
     9  // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
    10  // License for the specific language governing permissions and limitations under
    11  // the License.
    12  
    13  package kivik
    14  
    15  import (
    16  	"context"
    17  	"encoding/json"
    18  	"errors"
    19  	"io"
    20  	"net/http"
    21  
    22  	"github.com/go-kivik/kivik/v4/driver"
    23  	internal "github.com/go-kivik/kivik/v4/int/errors"
    24  )
    25  
    26  // Changes is an iterator over the database changes feed.
    27  type Changes struct {
    28  	*iter
    29  	changesi driver.Changes
    30  }
    31  
    32  type changesIterator struct {
    33  	driver.Changes
    34  	*ChangesMetadata
    35  }
    36  
    37  var _ iterator = &changesIterator{}
    38  
    39  func (c *changesIterator) Next(i interface{}) error {
    40  	change := i.(*driver.Change)
    41  	change.ID = ""
    42  	change.Seq = ""
    43  	change.Deleted = false
    44  	change.Changes = change.Changes[:0]
    45  	change.Doc = change.Doc[:0]
    46  	err := c.Changes.Next(change)
    47  	if err == io.EOF || err == driver.EOQ {
    48  		c.ChangesMetadata = &ChangesMetadata{
    49  			LastSeq: c.Changes.LastSeq(),
    50  			Pending: c.Changes.Pending(),
    51  		}
    52  	}
    53  	return err
    54  }
    55  
    56  func newChanges(ctx context.Context, onClose func(), changesi driver.Changes) *Changes {
    57  	return &Changes{
    58  		iter:     newIterator(ctx, onClose, &changesIterator{Changes: changesi}, &driver.Change{}),
    59  		changesi: changesi,
    60  	}
    61  }
    62  
    63  // Close closes the iterator, preventing further enumeration, and freeing any
    64  // resources (such as the http request body) of the underlying feed. If
    65  // [Changes.Next] is called and there are no further results, the iterator is
    66  // closed automatically and it will suffice to check the result of
    67  // [Changes.Err]. Close is idempotent and does not affect the result of
    68  // [Changes.Err].
    69  func (c *Changes) Close() error {
    70  	return c.iter.Close()
    71  }
    72  
    73  // Err returns the error, if any, that was encountered during iteration. Err may
    74  // be called after an explicit or implicit [Changes.Close].
    75  func (c *Changes) Err() error {
    76  	return c.iter.Err()
    77  }
    78  
    79  // Next prepares the next iterator result value for reading. It returns true on
    80  // success, or false if there is no next result or an error occurs while
    81  // preparing it. [Changes.Err] should be consulted to distinguish between the
    82  // two.
    83  func (c *Changes) Next() bool {
    84  	return c.iter.Next()
    85  }
    86  
    87  // Changes returns a list of changed revs.
    88  func (c *Changes) Changes() []string {
    89  	return c.curVal.(*driver.Change).Changes
    90  }
    91  
    92  // Deleted returns true if the change relates to a deleted document.
    93  func (c *Changes) Deleted() bool {
    94  	return c.curVal.(*driver.Change).Deleted
    95  }
    96  
    97  // ID returns the ID of the current result.
    98  func (c *Changes) ID() string {
    99  	return c.curVal.(*driver.Change).ID
   100  }
   101  
   102  // ScanDoc copies the data from the result into dest.  See [ResultSet.ScanValue]
   103  // for additional details.
   104  func (c *Changes) ScanDoc(dest interface{}) error {
   105  	err := c.isReady()
   106  	if err != nil {
   107  		return err
   108  	}
   109  	return json.Unmarshal(c.curVal.(*driver.Change).Doc, dest)
   110  }
   111  
   112  // Changes returns an iterator over the real-time [changes feed]. The feed remains
   113  // open until explicitly closed, or an error is encountered.
   114  //
   115  // [changes feed]: http://couchdb.readthedocs.io/en/latest/api/database/changes.html#get--db-_changes
   116  func (db *DB) Changes(ctx context.Context, options ...Option) *Changes {
   117  	if db.err != nil {
   118  		return &Changes{iter: errIterator(db.err)}
   119  	}
   120  	endQuery, err := db.startQuery()
   121  	if err != nil {
   122  		return &Changes{iter: errIterator(err)}
   123  	}
   124  	changesi, err := db.driverDB.Changes(ctx, multiOptions(options))
   125  	if err != nil {
   126  		endQuery()
   127  		return &Changes{iter: errIterator(err)}
   128  	}
   129  	return newChanges(ctx, endQuery, changesi)
   130  }
   131  
   132  // Seq returns the Seq of the current result.
   133  func (c *Changes) Seq() string {
   134  	return c.curVal.(*driver.Change).Seq
   135  }
   136  
   137  // ChangesMetadata contains metadata about a changes feed.
   138  type ChangesMetadata struct {
   139  	// LastSeq is the last update sequence id present in the change set, if
   140  	// returned by the server.
   141  	LastSeq string
   142  	// Pending is the count of remaining items in the change feed.
   143  	Pending int64
   144  }
   145  
   146  // Metadata returns the result metadata for the changes feed. It must be called
   147  // after [Changes.Next] returns false or [Changes.Iterator] has been completely
   148  // and successfully iterated. Otherwise it will return an error.
   149  func (c *Changes) Metadata() (*ChangesMetadata, error) {
   150  	if c.iter == nil || (c.state != stateEOQ && c.state != stateClosed) {
   151  		return nil, &internal.Error{Status: http.StatusBadRequest, Err: errors.New("Metadata must not be called until result set iteration is complete")}
   152  	}
   153  	return c.feed.(*changesIterator).ChangesMetadata, nil
   154  }
   155  
   156  // ETag returns the unquoted ETag header, if any.
   157  func (c *Changes) ETag() string {
   158  	if c.changesi == nil {
   159  		return ""
   160  	}
   161  	return c.changesi.ETag()
   162  }
   163  
   164  // Change represents a single change in the changes feed, as returned by
   165  // [Changes.Iterator].
   166  //
   167  // !!NOTICE!! This struct is considered experimental, and may change without
   168  // notice.
   169  type Change struct {
   170  	// ID is the document ID to which the change relates.
   171  	ID string `json:"id"`
   172  	// Seq is the update sequence for the changes feed.
   173  	Seq string `json:"seq"`
   174  	// Deleted is set to true for the changes feed, if the document has been
   175  	// deleted.
   176  	Deleted bool `json:"deleted"`
   177  	// Changes represents a list of document leaf revisions for the /_changes
   178  	// endpoint.
   179  	Changes []string `json:"-"`
   180  	// Doc is the raw, un-decoded JSON document. This is only populated when
   181  	// include_docs=true is set.
   182  	doc json.RawMessage
   183  }
   184  
   185  // ScanDoc copies the data from the result into dest.  See [Row.ScanValue]
   186  // for additional details.
   187  func (c *Change) ScanDoc(dest interface{}) error {
   188  	return json.Unmarshal(c.doc, dest)
   189  }
   190  
   191  // Iterator returns a function that can be used to iterate over the changes
   192  // feed. This function works with Go 1.23's range functions, and is an
   193  // alternative to using [Changes.Next] directly.
   194  //
   195  // !!NOTICE!! This function is considered experimental, and may change without
   196  // notice.
   197  func (c *Changes) Iterator() func(yield func(*Change, error) bool) {
   198  	return func(yield func(*Change, error) bool) {
   199  		for c.Next() {
   200  			dChange := c.curVal.(*driver.Change)
   201  			change := &Change{
   202  				ID:      dChange.ID,
   203  				Seq:     dChange.Seq,
   204  				Deleted: dChange.Deleted,
   205  				Changes: dChange.Changes,
   206  				doc:     dChange.Doc,
   207  			}
   208  			if !yield(change, nil) {
   209  				_ = c.Close()
   210  				return
   211  			}
   212  		}
   213  		if err := c.Err(); err != nil {
   214  			yield(nil, err)
   215  		}
   216  	}
   217  }
   218  

View as plain text