...

Source file src/github.com/go-kivik/kivik/v4/updates.go

Documentation: github.com/go-kivik/kivik/v4

     1  // Licensed under the Apache License, Version 2.0 (the "License"); you may not
     2  // use this file except in compliance with the License. You may obtain a copy of
     3  // the License at
     4  //
     5  //  http://www.apache.org/licenses/LICENSE-2.0
     6  //
     7  // Unless required by applicable law or agreed to in writing, software
     8  // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
     9  // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
    10  // License for the specific language governing permissions and limitations under
    11  // the License.
    12  
    13  package kivik
    14  
    15  import (
    16  	"context"
    17  	"errors"
    18  	"net/http"
    19  
    20  	"github.com/go-kivik/kivik/v4/driver"
    21  	internal "github.com/go-kivik/kivik/v4/int/errors"
    22  )
    23  
// DBUpdates is an iterator for database updates.
//
// It embeds the package's generic *iter, which supplies the iteration state
// that the methods on this type read and forward to.
type DBUpdates struct {
	*iter
}
    28  
    29  type updatesIterator struct{ driver.DBUpdates }
    30  
    31  var _ iterator = &updatesIterator{}
    32  
    33  func (r *updatesIterator) Next(i interface{}) error {
    34  	update := i.(*driver.DBUpdate)
    35  	update.DBName = ""
    36  	update.Seq = ""
    37  	update.Type = ""
    38  	return r.DBUpdates.Next(update)
    39  }
    40  
    41  func newDBUpdates(ctx context.Context, onClose func(), updatesi driver.DBUpdates) *DBUpdates {
    42  	return &DBUpdates{
    43  		iter: newIterator(ctx, onClose, &updatesIterator{updatesi}, &driver.DBUpdate{}),
    44  	}
    45  }
    46  
// Close closes the iterator, preventing further enumeration, and freeing any
// resources (such as the http request body) of the underlying feed. If
// [DBUpdates.Next] is called and there are no further results, the iterator is
// closed automatically and it will suffice to check the result of
// [DBUpdates.Err]. Close is idempotent and does not affect the result of
// [DBUpdates.Err].
//
// The call is forwarded unchanged to the embedded iterator's Close method.
func (f *DBUpdates) Close() error {
	return f.iter.Close()
}
    56  
// Err returns the error, if any, that was encountered during iteration. Err may
// be called after an explicit or implicit [DBUpdates.Close].
//
// The result comes directly from the embedded iterator's Err method.
func (f *DBUpdates) Err() error {
	return f.iter.Err()
}
    62  
// Next prepares the next iterator result value for reading. It returns true on
// success, or false if there is no next result or an error occurs while
// preparing it. [DBUpdates.Err] should be consulted to distinguish between the
// two.
//
// After Next returns true, the current update can be read with
// [DBUpdates.DBName], [DBUpdates.Type] and [DBUpdates.Seq].
func (f *DBUpdates) Next() bool {
	return f.iter.Next()
}
    70  
    71  // DBName returns the database name for the current update.
    72  func (f *DBUpdates) DBName() string {
    73  	err := f.isReady()
    74  	if err != nil {
    75  		return ""
    76  	}
    77  	return f.curVal.(*driver.DBUpdate).DBName
    78  }
    79  
    80  // Type returns the type of the current update.
    81  func (f *DBUpdates) Type() string {
    82  	err := f.isReady()
    83  	if err != nil {
    84  		return ""
    85  	}
    86  	return f.curVal.(*driver.DBUpdate).Type
    87  }
    88  
    89  // Seq returns the update sequence of the current update.
    90  func (f *DBUpdates) Seq() string {
    91  	err := f.isReady()
    92  	if err != nil {
    93  		return ""
    94  	}
    95  	return f.curVal.(*driver.DBUpdate).Seq
    96  }
    97  
    98  // LastSeq returns the last sequence ID reported, or in the case no results
    99  // were returned due to `since`	being set to `now`, or some other value that
   100  // excludes all results, the current sequence ID. It must be called after
   101  // [DBUpdates.Next] returns false or [DBUpdates.Iterator] has been completely
   102  // and successfully iterated. Otherwise it will return an error.
   103  func (f *DBUpdates) LastSeq() (string, error) {
   104  	for f.iter == nil || f.state != stateEOQ && f.state != stateClosed {
   105  		return "", &internal.Error{Status: http.StatusBadRequest, Err: errors.New("LastSeq must not be called until results iteration is complete")}
   106  	}
   107  	driverUpdates := f.feed.(*updatesIterator).DBUpdates
   108  	if lastSeqer, ok := driverUpdates.(driver.LastSeqer); ok {
   109  		return lastSeqer.LastSeq()
   110  	}
   111  	return "", nil
   112  }
   113  
   114  // DBUpdates begins polling for database updates. Canceling the context will
   115  // close the iterator. The iterator will also close automatically if there are
   116  // no more updates, when an error occurs, or when the [DBUpdates.Close] method
   117  // is called. The [DBUpdates.Err] method should be consulted to determine if
   118  // there was an error during iteration.
   119  //
   120  // For historical reasons, the CouchDB driver's implementation of this function
   121  // defaults to feed=continuous and since=now. To use the default CouchDB
   122  // behavior, set feed to either the empty string or "normal", and since to the
   123  // empty string. In kivik/v5, the default behavior will be to use feed=normal
   124  // as CouchDB does by default.
   125  func (c *Client) DBUpdates(ctx context.Context, options ...Option) *DBUpdates {
   126  	updater, ok := c.driverClient.(driver.DBUpdater)
   127  	if !ok {
   128  		return &DBUpdates{errIterator(&internal.Error{Status: http.StatusNotImplemented, Message: "kivik: driver does not implement DBUpdater"})}
   129  	}
   130  
   131  	endQuery, err := c.startQuery()
   132  	if err != nil {
   133  		return &DBUpdates{errIterator(err)}
   134  	}
   135  
   136  	updatesi, err := updater.DBUpdates(ctx, multiOptions(options))
   137  	if err != nil {
   138  		endQuery()
   139  		return &DBUpdates{errIterator(err)}
   140  	}
   141  	return newDBUpdates(context.Background(), endQuery, updatesi)
   142  }
   143  
// DBUpdate represents a database update as returned by [DBUpdates.Iterator].
//
// !!NOTICE!! This struct is considered experimental, and may change without
// notice.
type DBUpdate struct {
	// DBName is the database name for the update (JSON key "db_name").
	DBName string `json:"db_name"`
	// Type is the type of the update (JSON key "type").
	Type string `json:"type"`
	// Seq is the update sequence of the update (JSON key "seq").
	Seq string `json:"seq"`
}
   153  
   154  // Iterator returns a function that can be used to iterate over the DB updates
   155  // feed. This function works with Go 1.23's range functions, and is an
   156  // alternative to using [DBUpdates.Next] directly.
   157  //
   158  // !!NOTICE!! This function is considered experimental, and may change without
   159  // notice.
   160  func (f *DBUpdates) Iterator() func(yield func(*DBUpdate, error) bool) {
   161  	return func(yield func(*DBUpdate, error) bool) {
   162  		for f.Next() {
   163  			update := f.curVal.(*driver.DBUpdate)
   164  			if !yield((*DBUpdate)(update), nil) {
   165  				_ = f.Close()
   166  				break
   167  			}
   168  		}
   169  		if err := f.Err(); err != nil {
   170  			yield(nil, err)
   171  		}
   172  	}
   173  }
   174  

View as plain text