...
1
2
3
4
5
6
7
8
9
10
11
12
13 package couchdb
14
15 import (
16 "context"
17 "encoding/json"
18 "errors"
19 "io"
20 "net/http"
21
22 "github.com/go-kivik/kivik/v4/couchdb/chttp"
23 "github.com/go-kivik/kivik/v4/driver"
24 internal "github.com/go-kivik/kivik/v4/int/errors"
25 )
26
27
28 func (d *db) Changes(ctx context.Context, options driver.Options) (driver.Changes, error) {
29 opts := map[string]interface{}{}
30 options.Apply(opts)
31 key := "results"
32 if f, ok := opts["feed"]; ok {
33 if f == "eventsource" {
34 return nil, &internal.Error{Status: http.StatusBadRequest, Err: errors.New("kivik: eventsource feed not supported, use 'continuous'")}
35 }
36 if f == "continuous" {
37 key = ""
38 }
39 }
40 chttpOpts := new(chttp.Options)
41 if ids := opts["doc_ids"]; ids != nil {
42 delete(opts, "doc_ids")
43 chttpOpts.GetBody = chttp.BodyEncoder(map[string]interface{}{
44 "doc_ids": ids,
45 })
46 }
47 var err error
48 chttpOpts.Query, err = optionsToParams(opts)
49 if err != nil {
50 return nil, err
51 }
52
53 resp, err := d.Client.DoReq(ctx, http.MethodPost, d.path("_changes"), chttpOpts)
54 if err != nil {
55 return nil, err
56 }
57 if err = chttp.ResponseError(resp); err != nil {
58 return nil, err
59 }
60 etag, _ := chttp.ETag(resp)
61 return newChangesRows(ctx, key, resp.Body, etag), nil
62 }
63
64 type continuousChangesParser struct{}
65
66 func (p *continuousChangesParser) parseMeta(i interface{}, dec *json.Decoder, key string) error {
67 meta := i.(*changesMeta)
68 return meta.parseMeta(key, dec)
69 }
70
71 func (p *continuousChangesParser) decodeItem(i interface{}, dec *json.Decoder) error {
72 row := i.(*driver.Change)
73 ch := &change{Change: row}
74 if err := dec.Decode(ch); err != nil {
75 return &internal.Error{Status: http.StatusBadGateway, Err: err}
76 }
77 ch.Change.Seq = string(ch.Seq)
78 return nil
79 }
80
81 type changesMeta struct {
82 lastSeq sequenceID
83 pending int64
84 }
85
86
87 func (m *changesMeta) parseMeta(key string, dec *json.Decoder) error {
88 switch key {
89 case "last_seq":
90 return dec.Decode(&m.lastSeq)
91 case "pending":
92 return dec.Decode(&m.pending)
93 default:
94
95 var discard json.RawMessage
96 return dec.Decode(&discard)
97 }
98 }
99
100 type changesRows struct {
101 *iter
102 etag string
103 }
104
105 func newChangesRows(ctx context.Context, key string, r io.ReadCloser, etag string) *changesRows {
106 var meta *changesMeta
107 if key != "" {
108 meta = &changesMeta{}
109 }
110 return &changesRows{
111 iter: newIter(ctx, meta, key, r, &continuousChangesParser{}),
112 etag: etag,
113 }
114 }
115
116 var _ driver.Changes = &changesRows{}
117
118 type change struct {
119 *driver.Change
120 Seq sequenceID `json:"seq"`
121 }
122
123 func (r *changesRows) Next(row *driver.Change) error {
124 row.Deleted = false
125 return r.iter.next(row)
126 }
127
128
129 func (r *changesRows) LastSeq() string {
130 return string(r.iter.meta.(*changesMeta).lastSeq)
131 }
132
133
134 func (r *changesRows) Pending() int64 {
135 return r.iter.meta.(*changesMeta).pending
136 }
137
138
139 func (r *changesRows) ETag() string {
140 return r.etag
141 }
142
View as plain text