...
1
2
3
4
5
6
7
8
9
10
11
12
13 package kivik
14
15 import (
16 "context"
17 "encoding/json"
18 "errors"
19 "io"
20 "net/http"
21
22 "github.com/go-kivik/kivik/v4/driver"
23 internal "github.com/go-kivik/kivik/v4/int/errors"
24 )
25
26
27 type Changes struct {
28 *iter
29 changesi driver.Changes
30 }
31
32 type changesIterator struct {
33 driver.Changes
34 *ChangesMetadata
35 }
36
37 var _ iterator = &changesIterator{}
38
39 func (c *changesIterator) Next(i interface{}) error {
40 change := i.(*driver.Change)
41 change.ID = ""
42 change.Seq = ""
43 change.Deleted = false
44 change.Changes = change.Changes[:0]
45 change.Doc = change.Doc[:0]
46 err := c.Changes.Next(change)
47 if err == io.EOF || err == driver.EOQ {
48 c.ChangesMetadata = &ChangesMetadata{
49 LastSeq: c.Changes.LastSeq(),
50 Pending: c.Changes.Pending(),
51 }
52 }
53 return err
54 }
55
56 func newChanges(ctx context.Context, onClose func(), changesi driver.Changes) *Changes {
57 return &Changes{
58 iter: newIterator(ctx, onClose, &changesIterator{Changes: changesi}, &driver.Change{}),
59 changesi: changesi,
60 }
61 }
62
63
64
65
66
67
68
69 func (c *Changes) Close() error {
70 return c.iter.Close()
71 }
72
73
74
75 func (c *Changes) Err() error {
76 return c.iter.Err()
77 }
78
79
80
81
82
83 func (c *Changes) Next() bool {
84 return c.iter.Next()
85 }
86
87
88 func (c *Changes) Changes() []string {
89 return c.curVal.(*driver.Change).Changes
90 }
91
92
93 func (c *Changes) Deleted() bool {
94 return c.curVal.(*driver.Change).Deleted
95 }
96
97
98 func (c *Changes) ID() string {
99 return c.curVal.(*driver.Change).ID
100 }
101
102
103
104 func (c *Changes) ScanDoc(dest interface{}) error {
105 err := c.isReady()
106 if err != nil {
107 return err
108 }
109 return json.Unmarshal(c.curVal.(*driver.Change).Doc, dest)
110 }
111
112
113
114
115
116 func (db *DB) Changes(ctx context.Context, options ...Option) *Changes {
117 if db.err != nil {
118 return &Changes{iter: errIterator(db.err)}
119 }
120 endQuery, err := db.startQuery()
121 if err != nil {
122 return &Changes{iter: errIterator(err)}
123 }
124 changesi, err := db.driverDB.Changes(ctx, multiOptions(options))
125 if err != nil {
126 endQuery()
127 return &Changes{iter: errIterator(err)}
128 }
129 return newChanges(ctx, endQuery, changesi)
130 }
131
132
133 func (c *Changes) Seq() string {
134 return c.curVal.(*driver.Change).Seq
135 }
136
137
138 type ChangesMetadata struct {
139
140
141 LastSeq string
142
143 Pending int64
144 }
145
146
147
148
149 func (c *Changes) Metadata() (*ChangesMetadata, error) {
150 if c.iter == nil || (c.state != stateEOQ && c.state != stateClosed) {
151 return nil, &internal.Error{Status: http.StatusBadRequest, Err: errors.New("Metadata must not be called until result set iteration is complete")}
152 }
153 return c.feed.(*changesIterator).ChangesMetadata, nil
154 }
155
156
157 func (c *Changes) ETag() string {
158 if c.changesi == nil {
159 return ""
160 }
161 return c.changesi.ETag()
162 }
163
164
165
166
167
168
169 type Change struct {
170
171 ID string `json:"id"`
172
173 Seq string `json:"seq"`
174
175
176 Deleted bool `json:"deleted"`
177
178
179 Changes []string `json:"-"`
180
181
182 doc json.RawMessage
183 }
184
185
186
187 func (c *Change) ScanDoc(dest interface{}) error {
188 return json.Unmarshal(c.doc, dest)
189 }
190
191
192
193
194
195
196
197 func (c *Changes) Iterator() func(yield func(*Change, error) bool) {
198 return func(yield func(*Change, error) bool) {
199 for c.Next() {
200 dChange := c.curVal.(*driver.Change)
201 change := &Change{
202 ID: dChange.ID,
203 Seq: dChange.Seq,
204 Deleted: dChange.Deleted,
205 Changes: dChange.Changes,
206 doc: dChange.Doc,
207 }
208 if !yield(change, nil) {
209 _ = c.Close()
210 return
211 }
212 }
213 if err := c.Err(); err != nil {
214 yield(nil, err)
215 }
216 }
217 }
218
View as plain text