...
1
2
3
4
5
6
7
8
9
10
11
12
13 package kivik
14
15 import (
16 "context"
17 "errors"
18 "net/http"
19
20 "github.com/go-kivik/kivik/v4/driver"
21 internal "github.com/go-kivik/kivik/v4/int/errors"
22 )
23
24
25 type DBUpdates struct {
26 *iter
27 }
28
29 type updatesIterator struct{ driver.DBUpdates }
30
31 var _ iterator = &updatesIterator{}
32
33 func (r *updatesIterator) Next(i interface{}) error {
34 update := i.(*driver.DBUpdate)
35 update.DBName = ""
36 update.Seq = ""
37 update.Type = ""
38 return r.DBUpdates.Next(update)
39 }
40
41 func newDBUpdates(ctx context.Context, onClose func(), updatesi driver.DBUpdates) *DBUpdates {
42 return &DBUpdates{
43 iter: newIterator(ctx, onClose, &updatesIterator{updatesi}, &driver.DBUpdate{}),
44 }
45 }
46
47
48
49
50
51
52
53 func (f *DBUpdates) Close() error {
54 return f.iter.Close()
55 }
56
57
58
59 func (f *DBUpdates) Err() error {
60 return f.iter.Err()
61 }
62
63
64
65
66
67 func (f *DBUpdates) Next() bool {
68 return f.iter.Next()
69 }
70
71
72 func (f *DBUpdates) DBName() string {
73 err := f.isReady()
74 if err != nil {
75 return ""
76 }
77 return f.curVal.(*driver.DBUpdate).DBName
78 }
79
80
81 func (f *DBUpdates) Type() string {
82 err := f.isReady()
83 if err != nil {
84 return ""
85 }
86 return f.curVal.(*driver.DBUpdate).Type
87 }
88
89
90 func (f *DBUpdates) Seq() string {
91 err := f.isReady()
92 if err != nil {
93 return ""
94 }
95 return f.curVal.(*driver.DBUpdate).Seq
96 }
97
98
99
100
101
102
103 func (f *DBUpdates) LastSeq() (string, error) {
104 for f.iter == nil || f.state != stateEOQ && f.state != stateClosed {
105 return "", &internal.Error{Status: http.StatusBadRequest, Err: errors.New("LastSeq must not be called until results iteration is complete")}
106 }
107 driverUpdates := f.feed.(*updatesIterator).DBUpdates
108 if lastSeqer, ok := driverUpdates.(driver.LastSeqer); ok {
109 return lastSeqer.LastSeq()
110 }
111 return "", nil
112 }
113
114
115
116
117
118
119
120
121
122
123
124
125 func (c *Client) DBUpdates(ctx context.Context, options ...Option) *DBUpdates {
126 updater, ok := c.driverClient.(driver.DBUpdater)
127 if !ok {
128 return &DBUpdates{errIterator(&internal.Error{Status: http.StatusNotImplemented, Message: "kivik: driver does not implement DBUpdater"})}
129 }
130
131 endQuery, err := c.startQuery()
132 if err != nil {
133 return &DBUpdates{errIterator(err)}
134 }
135
136 updatesi, err := updater.DBUpdates(ctx, multiOptions(options))
137 if err != nil {
138 endQuery()
139 return &DBUpdates{errIterator(err)}
140 }
141 return newDBUpdates(context.Background(), endQuery, updatesi)
142 }
143
144
145
146
147
148 type DBUpdate struct {
149 DBName string `json:"db_name"`
150 Type string `json:"type"`
151 Seq string `json:"seq"`
152 }
153
154
155
156
157
158
159
160 func (f *DBUpdates) Iterator() func(yield func(*DBUpdate, error) bool) {
161 return func(yield func(*DBUpdate, error) bool) {
162 for f.Next() {
163 update := f.curVal.(*driver.DBUpdate)
164 if !yield((*DBUpdate)(update), nil) {
165 _ = f.Close()
166 break
167 }
168 }
169 if err := f.Err(); err != nil {
170 yield(nil, err)
171 }
172 }
173 }
174
View as plain text