...

Source file src/github.com/go-kivik/kivik/v4/couchdb/changes.go

Documentation: github.com/go-kivik/kivik/v4/couchdb

     1  // Licensed under the Apache License, Version 2.0 (the "License"); you may not
     2  // use this file except in compliance with the License. You may obtain a copy of
     3  // the License at
     4  //
     5  //  http://www.apache.org/licenses/LICENSE-2.0
     6  //
     7  // Unless required by applicable law or agreed to in writing, software
     8  // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
     9  // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
    10  // License for the specific language governing permissions and limitations under
    11  // the License.
    12  
    13  package couchdb
    14  
    15  import (
    16  	"context"
    17  	"encoding/json"
    18  	"errors"
    19  	"io"
    20  	"net/http"
    21  
    22  	"github.com/go-kivik/kivik/v4/couchdb/chttp"
    23  	"github.com/go-kivik/kivik/v4/driver"
    24  	internal "github.com/go-kivik/kivik/v4/int/errors"
    25  )
    26  
    27  // Changes returns the changes stream for the database.
    28  func (d *db) Changes(ctx context.Context, options driver.Options) (driver.Changes, error) {
    29  	opts := map[string]interface{}{}
    30  	options.Apply(opts)
    31  	key := "results"
    32  	if f, ok := opts["feed"]; ok {
    33  		if f == "eventsource" {
    34  			return nil, &internal.Error{Status: http.StatusBadRequest, Err: errors.New("kivik: eventsource feed not supported, use 'continuous'")}
    35  		}
    36  		if f == "continuous" {
    37  			key = ""
    38  		}
    39  	}
    40  	chttpOpts := new(chttp.Options)
    41  	if ids := opts["doc_ids"]; ids != nil {
    42  		delete(opts, "doc_ids")
    43  		chttpOpts.GetBody = chttp.BodyEncoder(map[string]interface{}{
    44  			"doc_ids": ids,
    45  		})
    46  	}
    47  	var err error
    48  	chttpOpts.Query, err = optionsToParams(opts)
    49  	if err != nil {
    50  		return nil, err
    51  	}
    52  
    53  	resp, err := d.Client.DoReq(ctx, http.MethodPost, d.path("_changes"), chttpOpts)
    54  	if err != nil {
    55  		return nil, err
    56  	}
    57  	if err = chttp.ResponseError(resp); err != nil {
    58  		return nil, err
    59  	}
    60  	etag, _ := chttp.ETag(resp)
    61  	return newChangesRows(ctx, key, resp.Body, etag), nil
    62  }
    63  
// continuousChangesParser is the stateless parser that newChangesRows hands to
// newIter. Its methods decode metadata into a *changesMeta and rows into a
// *driver.Change. Despite the name, newChangesRows uses it for every feed
// type, not only continuous ones.
type continuousChangesParser struct{}
    65  
    66  func (p *continuousChangesParser) parseMeta(i interface{}, dec *json.Decoder, key string) error {
    67  	meta := i.(*changesMeta)
    68  	return meta.parseMeta(key, dec)
    69  }
    70  
    71  func (p *continuousChangesParser) decodeItem(i interface{}, dec *json.Decoder) error {
    72  	row := i.(*driver.Change)
    73  	ch := &change{Change: row}
    74  	if err := dec.Decode(ch); err != nil {
    75  		return &internal.Error{Status: http.StatusBadGateway, Err: err}
    76  	}
    77  	ch.Change.Seq = string(ch.Seq)
    78  	return nil
    79  }
    80  
// changesMeta holds the response metadata that parseMeta recognizes.
type changesMeta struct {
	// lastSeq is decoded from the "last_seq" key.
	lastSeq sequenceID
	// pending is decoded from the "pending" key.
	pending int64
}
    85  
    86  // parseMeta parses result metadata
    87  func (m *changesMeta) parseMeta(key string, dec *json.Decoder) error {
    88  	switch key {
    89  	case "last_seq":
    90  		return dec.Decode(&m.lastSeq)
    91  	case "pending":
    92  		return dec.Decode(&m.pending)
    93  	default:
    94  		// Just consume the value, since we don't know what it means.
    95  		var discard json.RawMessage
    96  		return dec.Decode(&discard)
    97  	}
    98  }
    99  
// changesRows is the driver.Changes implementation for this package. It embeds
// the shared iter and additionally carries the ETag passed to newChangesRows.
type changesRows struct {
	*iter
	// etag is returned verbatim by ETag.
	etag string
}
   104  
   105  func newChangesRows(ctx context.Context, key string, r io.ReadCloser, etag string) *changesRows {
   106  	var meta *changesMeta
   107  	if key != "" {
   108  		meta = &changesMeta{}
   109  	}
   110  	return &changesRows{
   111  		iter: newIter(ctx, meta, key, r, &continuousChangesParser{}),
   112  		etag: etag,
   113  	}
   114  }
   115  
   116  var _ driver.Changes = &changesRows{}
   117  
// change is the decoding target used by decodeItem. It embeds the caller's
// *driver.Change and declares its own Seq field for the "seq" JSON key, typed
// as sequenceID; decodeItem copies that value, converted to a string, into
// the embedded Change.Seq after decoding.
type change struct {
	*driver.Change
	Seq sequenceID `json:"seq"`
}
   122  
// Next reads the next change from the underlying iterator into row and
// returns the iterator's error, if any.
func (r *changesRows) Next(row *driver.Change) error {
	// Clear Deleted before decoding. Presumably a reused row would otherwise
	// keep a stale true value when the next entry omits the field — TODO
	// confirm against the decoder's behavior.
	row.Deleted = false
	return r.iter.next(row)
}
   127  
   128  // LastSeq returns the last sequence ID.
   129  func (r *changesRows) LastSeq() string {
   130  	return string(r.iter.meta.(*changesMeta).lastSeq)
   131  }
   132  
   133  // Pending returns the pending count.
   134  func (r *changesRows) Pending() int64 {
   135  	return r.iter.meta.(*changesMeta).pending
   136  }
   137  
// ETag returns the unquoted ETag header for the CouchDB response, if any.
// The value is the one supplied to newChangesRows and is returned unchanged.
func (r *changesRows) ETag() string {
	return r.etag
}
   142  

View as plain text