1
2
3
4
5
6
7
8
9
10
11
12
13 package couchdb
14
15 import (
16 "bytes"
17 "context"
18 "encoding/json"
19 "errors"
20 "fmt"
21 "net/http"
22 "strconv"
23 "strings"
24 "sync"
25 "time"
26
27 kivik "github.com/go-kivik/kivik/v4"
28 "github.com/go-kivik/kivik/v4/couchdb/chttp"
29 "github.com/go-kivik/kivik/v4/driver"
30 internal "github.com/go-kivik/kivik/v4/int/errors"
31 )
32
33 type replicationError struct {
34 status int
35 reason string
36 }
37
38 func (re *replicationError) Error() string {
39 return re.reason
40 }
41
42 func (re *replicationError) HTTPStatus() int {
43 return re.status
44 }
45
46 func (re *replicationError) UnmarshalJSON(data []byte) error {
47 if err := json.Unmarshal(data, &re.reason); err != nil {
48 return err
49 }
50 switch (strings.SplitN(re.reason, ":", 2))[0] {
51 case "db_not_found":
52 re.status = http.StatusNotFound
53 case "timeout":
54 re.status = http.StatusRequestTimeout
55 case "unauthorized":
56 re.status = http.StatusUnauthorized
57 default:
58 re.status = http.StatusInternalServerError
59 }
60 return nil
61 }
62
63 type replicationStateTime time.Time
64
65 func (t *replicationStateTime) UnmarshalJSON(data []byte) error {
66 input := string(bytes.Trim(data, `"`))
67 if ts, err := time.Parse(time.RFC3339, input); err == nil {
68 *t = replicationStateTime(ts)
69 return nil
70 }
71
72 if seconds, err := strconv.ParseInt(input, 10, 64); err == nil {
73 epochTime := replicationStateTime(time.Unix(seconds, 0).UTC())
74 *t = epochTime
75 return nil
76 }
77 return &internal.Error{Status: http.StatusBadGateway, Err: fmt.Errorf("kivik: '%s' does not appear to be a valid timestamp", string(data))}
78 }
79
80 type replication struct {
81 docID string
82 replicationID string
83 source string
84 target string
85 startTime time.Time
86 endTime time.Time
87 state string
88 err error
89
90
91 mu sync.RWMutex
92
93 *db
94 }
95
96 var _ driver.Replication = &replication{}
97
98 func (c *client) fetchReplication(ctx context.Context, docID string) *replication {
99 rep := c.newReplication(docID)
100 rep.db = &db{client: c, dbName: "_replicator"}
101
102
103 _ = rep.updateMain(ctx)
104 return rep
105 }
106
107 func (c *client) newReplication(docID string) *replication {
108 return &replication{
109 docID: docID,
110 db: &db{
111 client: c,
112 dbName: "_replicator",
113 },
114 }
115 }
116
117 func (r *replication) readLock() func() {
118 r.mu.RLock()
119 return r.mu.RUnlock
120 }
121
122 func (r *replication) ReplicationID() string { defer r.readLock()(); return r.replicationID }
123 func (r *replication) Source() string { defer r.readLock()(); return r.source }
124 func (r *replication) Target() string { defer r.readLock()(); return r.target }
125 func (r *replication) StartTime() time.Time { defer r.readLock()(); return r.startTime }
126 func (r *replication) EndTime() time.Time { defer r.readLock()(); return r.endTime }
127 func (r *replication) State() string { defer r.readLock()(); return r.state }
128 func (r *replication) Err() error { defer r.readLock()(); return r.err }
129
130 func (r *replication) Update(ctx context.Context, state *driver.ReplicationInfo) error {
131 if err := r.updateMain(ctx); err != nil {
132 return err
133 }
134 if r.State() == "complete" {
135 state.Progress = 100
136 return nil
137 }
138 info, err := r.updateActiveTasks(ctx)
139 if err != nil {
140 if kivik.HTTPStatus(err) == http.StatusNotFound {
141
142
143 return nil
144 }
145 return err
146 }
147 state.DocWriteFailures = info.DocWriteFailures
148 state.DocsRead = info.DocsRead
149 state.DocsWritten = info.DocsWritten
150
151 return nil
152 }
153
154 type activeTask struct {
155 Type string `json:"type"`
156 ReplicationID string `json:"replication_id"`
157 DocsWritten int64 `json:"docs_written"`
158 DocsRead int64 `json:"docs_read"`
159 DocWriteFailures int64 `json:"doc_write_failures"`
160 }
161
162 func (r *replication) updateActiveTasks(ctx context.Context) (*activeTask, error) {
163 resp, err := r.client.DoReq(ctx, http.MethodGet, "/_active_tasks", nil)
164 if err != nil {
165 return nil, err
166 }
167 if err = chttp.ResponseError(resp); err != nil {
168 return nil, err
169 }
170 defer chttp.CloseBody(resp.Body)
171 var tasks []*activeTask
172 if err = json.NewDecoder(resp.Body).Decode(&tasks); err != nil {
173 return nil, &internal.Error{Status: http.StatusBadGateway, Err: err}
174 }
175 for _, task := range tasks {
176 if task.Type != "replication" {
177 continue
178 }
179 repIDparts := strings.SplitN(task.ReplicationID, "+", 2)
180 if repIDparts[0] != r.replicationID {
181 continue
182 }
183 return task, nil
184 }
185 return nil, &internal.Error{Status: http.StatusNotFound, Err: errors.New("task not found")}
186 }
187
188
189 func (r *replication) updateMain(ctx context.Context) error {
190 doc, err := r.getReplicatorDoc(ctx)
191 if err != nil {
192 return err
193 }
194 r.setFromReplicatorDoc(doc)
195 return nil
196 }
197
198 func (r *replication) getReplicatorDoc(ctx context.Context) (*replicatorDoc, error) {
199 result, err := r.db.Get(ctx, r.docID, kivik.Params(nil))
200 if err != nil {
201 return nil, err
202 }
203 var doc replicatorDoc
204 err = json.NewDecoder(result.Body).Decode(&doc)
205 return &doc, err
206 }
207
208 func (r *replication) setFromReplicatorDoc(doc *replicatorDoc) {
209 r.mu.Lock()
210 defer r.mu.Unlock()
211 switch kivik.ReplicationState(doc.State) {
212 case kivik.ReplicationStarted:
213 r.startTime = time.Time(doc.StateTime)
214 case kivik.ReplicationError, kivik.ReplicationComplete:
215 r.endTime = time.Time(doc.StateTime)
216 }
217 r.state = doc.State
218 if doc.Error != nil {
219 r.err = doc.Error
220 } else {
221 r.err = nil
222 }
223 if r.source == "" {
224 r.source = doc.Source
225 }
226 if r.target == "" {
227 r.target = doc.Target
228 }
229 if r.replicationID == "" {
230 r.replicationID = doc.ReplicationID
231 }
232 }
233
234 func (r *replication) Delete(ctx context.Context) error {
235 rev, err := r.GetRev(ctx, r.docID, kivik.Params(nil))
236 if err != nil {
237 return err
238 }
239 _, err = r.db.Delete(ctx, r.docID, kivik.Rev(rev))
240 return err
241 }
242
243 type replicatorDoc struct {
244 DocID string `json:"_id"`
245 ReplicationID string `json:"_replication_id"`
246 Source string `json:"source"`
247 Target string `json:"target"`
248 State string `json:"_replication_state"`
249 StateTime replicationStateTime `json:"_replication_state_time"`
250 Error *replicationError `json:"_replication_state_reason,omitempty"`
251 }
252
253 func (c *client) GetReplications(ctx context.Context, options driver.Options) ([]driver.Replication, error) {
254 scheduler, err := c.schedulerSupported(ctx)
255 if err != nil {
256 return nil, err
257 }
258 opts := map[string]interface{}{}
259 options.Apply(opts)
260 if scheduler {
261 return c.getReplicationsFromScheduler(ctx, opts)
262 }
263 return c.legacyGetReplications(ctx, opts)
264 }
265
266 func (c *client) legacyGetReplications(ctx context.Context, opts map[string]interface{}) ([]driver.Replication, error) {
267 if opts == nil {
268 opts = map[string]interface{}{}
269 }
270 delete(opts, "conflicts")
271 delete(opts, "update_seq")
272 opts["include_docs"] = true
273 params, err := optionsToParams(opts)
274 if err != nil {
275 return nil, err
276 }
277 var result struct {
278 Rows []struct {
279 Doc replicatorDoc `json:"doc"`
280 } `json:"rows"`
281 }
282 path := "/_replicator/_all_docs?" + params.Encode()
283 if err = c.DoJSON(ctx, http.MethodGet, path, nil, &result); err != nil {
284 return nil, err
285 }
286 reps := make([]driver.Replication, 0, len(result.Rows))
287 for _, row := range result.Rows {
288 if row.Doc.DocID == "_design/_replicator" {
289 continue
290 }
291 rep := c.newReplication(row.Doc.DocID)
292 rep.setFromReplicatorDoc(&row.Doc)
293 reps = append(reps, rep)
294 }
295 return reps, nil
296 }
297
298 func (c *client) Replicate(ctx context.Context, targetDSN, sourceDSN string, options driver.Options) (driver.Replication, error) {
299 opts := map[string]interface{}{}
300 options.Apply(opts)
301
302 if _, ok := opts["source"]; !ok {
303 opts["source"] = sourceDSN
304 }
305 if _, ok := opts["target"]; !ok {
306 opts["target"] = targetDSN
307 }
308 if t := opts["target"]; t == "" {
309 return nil, missingArg("targetDSN")
310 }
311 if s := opts["source"]; s == "" {
312 return nil, missingArg("sourceDSN")
313 }
314
315 scheduler, err := c.schedulerSupported(ctx)
316 if err != nil {
317 return nil, err
318 }
319 chttpOpts := &chttp.Options{
320 Body: chttp.EncodeBody(opts),
321 }
322
323 var repStub struct {
324 ID string `json:"id"`
325 }
326 if e := c.Client.DoJSON(ctx, http.MethodPost, "/_replicator", chttpOpts, &repStub); e != nil {
327 return nil, e
328 }
329 if scheduler {
330 return c.fetchSchedulerReplication(ctx, repStub.ID)
331 }
332 return c.fetchReplication(ctx, repStub.ID), nil
333 }
334
View as plain text