1
2
3
4
5
6
7
8
9
10
11
12
13 package kivik
14
15 import (
16 "bytes"
17 "compress/gzip"
18 "context"
19 "fmt"
20 "io"
21 "net/http"
22 "sync/atomic"
23 "time"
24
25 "golang.org/x/sync/errgroup"
26 )
27
28
// ReplicationResult summarizes a replication run: when it started and ended,
// plus the counters the replicator accumulated along the way.
type ReplicationResult struct {
	// DocWriteFailures counts documents whose write to the target failed.
	DocWriteFailures int `json:"doc_write_failures"`

	// DocsRead counts document revisions read from the source.
	DocsRead int `json:"docs_read"`

	// DocsWritten counts documents successfully written to the target.
	DocsWritten int `json:"docs_written"`

	// EndTime is the moment the result was assembled.
	EndTime time.Time `json:"end_time"`

	// MissingChecked counts the revisions that were looked up on the source
	// because the target reported them missing.
	MissingChecked int `json:"missing_checked"`

	// MissingFound counts the looked-up revisions that were actually read.
	MissingFound int `json:"missing_found"`

	// StartTime is the moment the replicator was created.
	StartTime time.Time `json:"start_time"`
}
38
// Values used for ReplicationEvent.Type by the replicator.
const (
	// eventSecurity marks reading or writing the security object.
	eventSecurity = "security"
	// eventChanges marks opening (or failing to read) the changes feed.
	eventChanges = "changes"
	// eventChange marks a single entry read from the changes feed.
	eventChange = "change"
	// eventRevsDiff marks a revs-diff request or one of its result rows.
	eventRevsDiff = "revsdiff"
	// eventDocument marks reading or writing a single document.
	eventDocument = "document"
)
46
47
48
// ReplicationEvent describes one step of a replication. Events are delivered
// to the function registered with ReplicateCallback.
type ReplicationEvent struct {
	// Type names the step. The replicator emits "security", "changes",
	// "change", "revsdiff" and "document".
	Type string

	// Read is true when the step read data and false when it wrote to the
	// target (setting security, storing a document).
	Read bool

	// DocID is the document the step concerns. It is set for "change",
	// per-row "revsdiff" and "document" events and empty otherwise.
	DocID string

	// Error is the error produced by the step, or nil on success.
	Error error

	// Changes lists the revisions reported for DocID. It is populated only
	// for "change" events.
	Changes []string
}
67
68
69 type eventCallback func(ReplicationEvent)
70
71 func (c eventCallback) Apply(target interface{}) {
72 if r, ok := target.(*replicator); ok {
73 r.cb = c
74 }
75 }
76
77
78
79 func ReplicateCallback(callback func(ReplicationEvent)) Option {
80 return eventCallback(callback)
81 }
82
83 type replicateCopySecurityOption struct{}
84
85 func (r replicateCopySecurityOption) Apply(target interface{}) {
86 if r, ok := target.(*replicator); ok {
87 r.withSecurity = true
88 }
89 }
90
91
92
93
94 func ReplicateCopySecurity() Option {
95 return replicateCopySecurityOption{}
96 }
97
98
99
100
101
102
103
104
105
106
107
108 func Replicate(ctx context.Context, target, source *DB, options ...Option) (*ReplicationResult, error) {
109 opts := multiOptions(options)
110
111 r := newReplicator(target, source)
112 opts.Apply(r)
113 err := r.replicate(ctx, opts)
114 return r.result(), err
115 }
116
117 func (r *replicator) replicate(ctx context.Context, options Option) error {
118 if err := r.copySecurity(ctx); err != nil {
119 return err
120 }
121
122 group, ctx := errgroup.WithContext(ctx)
123 changes := make(chan *change)
124 group.Go(func() error {
125 defer close(changes)
126 return r.readChanges(ctx, changes, options)
127 })
128
129 diffs := make(chan *revDiff)
130 group.Go(func() error {
131 defer close(diffs)
132 return r.readDiffs(ctx, changes, diffs)
133 })
134
135 docs := make(chan *document)
136 group.Go(func() error {
137 defer close(docs)
138 return r.readDocs(ctx, diffs, docs)
139 })
140
141 group.Go(func() error {
142 return r.storeDocs(ctx, docs)
143 })
144
145 return group.Wait()
146 }
147
148
149 type replicator struct {
150 target, source *DB
151 cb eventCallback
152
153
154
155
156 withSecurity bool
157
158 noOpenRevs bool
159 start time.Time
160
161 writeFailures, reads, writes, missingChecks, missingFound int32
162 }
163
164 func newReplicator(target, source *DB) *replicator {
165 return &replicator{
166 target: target,
167 source: source,
168 start: time.Now(),
169 }
170 }
171
172 func (r *replicator) callback(e ReplicationEvent) {
173 if r.cb == nil {
174 return
175 }
176 r.cb(e)
177 }
178
179 func (r *replicator) result() *ReplicationResult {
180 return &ReplicationResult{
181 StartTime: r.start,
182 EndTime: time.Now(),
183 DocWriteFailures: int(r.writeFailures),
184 DocsRead: int(r.reads),
185 DocsWritten: int(r.writes),
186 MissingChecked: int(r.missingChecks),
187 MissingFound: int(r.missingFound),
188 }
189 }
190
191 func (r *replicator) copySecurity(ctx context.Context) error {
192 if !r.withSecurity {
193 return nil
194 }
195 sec, err := r.source.Security(ctx)
196 r.callback(ReplicationEvent{
197 Type: eventSecurity,
198 Read: true,
199 Error: err,
200 })
201 if err != nil {
202 return fmt.Errorf("read security: %w", err)
203 }
204 err = r.target.SetSecurity(ctx, sec)
205 r.callback(ReplicationEvent{
206 Type: eventSecurity,
207 Read: false,
208 Error: err,
209 })
210 if err != nil {
211 return fmt.Errorf("set security: %w", err)
212 }
213 return nil
214 }
215
// change is one entry read from the source's changes feed.
type change struct {
	// ID is the document ID the entry refers to.
	ID string

	// Changes holds the revisions the feed reported for the document.
	Changes []string
}
220
221
222
223
224 func (r *replicator) readChanges(ctx context.Context, results chan<- *change, options Option) error {
225 changes := r.source.Changes(ctx, options, Param("feed", "normal"), Param("style", "all_docs"))
226 r.callback(ReplicationEvent{
227 Type: eventChanges,
228 Read: true,
229 })
230
231 defer changes.Close()
232 for changes.Next() {
233 ch := &change{
234 ID: changes.ID(),
235 Changes: changes.Changes(),
236 }
237 r.callback(ReplicationEvent{
238 Type: eventChange,
239 DocID: ch.ID,
240 Read: true,
241 Changes: ch.Changes,
242 })
243 select {
244 case <-ctx.Done():
245 return ctx.Err()
246 case results <- ch:
247 }
248 }
249 if err := changes.Err(); err != nil {
250 r.callback(ReplicationEvent{
251 Type: eventChanges,
252 Read: true,
253 Error: err,
254 })
255 return fmt.Errorf("read changes feed: %w", err)
256 }
257 return nil
258 }
259
// revDiff is one row of a revs-diff response.
type revDiff struct {
	// ID is the document ID. It is excluded from JSON and filled in from the
	// result row's ID instead.
	ID string `json:"-"`

	// Missing is decoded from the row's "missing" field; these are the
	// revisions the replicator subsequently reads from the source.
	Missing []string `json:"missing"`

	// PossibleAncestors is decoded from the row's "possible_ancestors" field.
	PossibleAncestors []string `json:"possible_ancestors"`
}
265
// rdBatchSize is the number of distinct document IDs collected from the
// changes feed before a revs-diff request is sent to the target.
const rdBatchSize = 10
267
268
269
270
271 func (r *replicator) readDiffs(ctx context.Context, ch <-chan *change, results chan<- *revDiff) error {
272 for {
273 revMap := map[string][]string{}
274 var change *change
275 var ok bool
276 loop:
277 for {
278 select {
279 case <-ctx.Done():
280 return ctx.Err()
281 case change, ok = <-ch:
282 if !ok {
283 break loop
284 }
285 revMap[change.ID] = change.Changes
286 if len(revMap) >= rdBatchSize {
287 break loop
288 }
289 }
290 }
291
292 if len(revMap) == 0 {
293 return nil
294 }
295 diffs := r.target.RevsDiff(ctx, revMap)
296 err := diffs.Err()
297 r.callback(ReplicationEvent{
298 Type: eventRevsDiff,
299 Read: true,
300 Error: err,
301 })
302 if err != nil {
303 return err
304 }
305 defer diffs.Close()
306 for diffs.Next() {
307 var val revDiff
308 if err := diffs.ScanValue(&val); err != nil {
309 r.callback(ReplicationEvent{
310 Type: eventRevsDiff,
311 Read: true,
312 Error: err,
313 })
314 return err
315 }
316 val.ID, _ = diffs.ID()
317 r.callback(ReplicationEvent{
318 Type: eventRevsDiff,
319 Read: true,
320 DocID: val.ID,
321 })
322 select {
323 case <-ctx.Done():
324 return ctx.Err()
325 case results <- &val:
326 }
327 }
328 if err := diffs.Err(); err != nil {
329 r.callback(ReplicationEvent{
330 Type: eventRevsDiff,
331 Read: true,
332 Error: err,
333 })
334 return fmt.Errorf("read revs diffs: %w", err)
335 }
336 }
337 }
338
339
340
341
342
343 func (r *replicator) readDocs(ctx context.Context, diffs <-chan *revDiff, results chan<- *document) error {
344 for {
345 var rd *revDiff
346 var ok bool
347 select {
348 case <-ctx.Done():
349 return ctx.Err()
350 case rd, ok = <-diffs:
351 if !ok {
352 return nil
353 }
354 if err := r.readDoc(ctx, rd.ID, rd.Missing, results); err != nil {
355 return err
356 }
357 }
358 }
359 }
360
361 func (r *replicator) readDoc(ctx context.Context, id string, revs []string, results chan<- *document) error {
362 if !r.noOpenRevs {
363 err := r.readOpenRevs(ctx, id, revs, results)
364 if HTTPStatus(err) == http.StatusNotImplemented {
365 r.noOpenRevs = true
366 } else {
367 return err
368 }
369 }
370 return r.readIndividualDocs(ctx, id, revs, results)
371 }
372
373 func (r *replicator) readOpenRevs(ctx context.Context, id string, revs []string, results chan<- *document) error {
374 rs := r.source.OpenRevs(ctx, id, revs, Params(map[string]interface{}{
375 "revs": true,
376 "latest": true,
377 }))
378 defer rs.Close()
379 for rs.Next() {
380 atomic.AddInt32(&r.reads, 1)
381 atomic.AddInt32(&r.missingFound, 1)
382 doc := new(document)
383 err := rs.ScanDoc(&doc)
384 if err != nil {
385 return err
386 }
387 r.callback(ReplicationEvent{
388 Type: eventDocument,
389 Read: true,
390 DocID: id,
391 Error: err,
392 })
393 atts, _ := rs.Attachments()
394 if err := prepareAttachments(doc, atts); err != nil {
395 return err
396 }
397 select {
398 case <-ctx.Done():
399 return ctx.Err()
400 case results <- doc:
401 }
402 }
403 err := rs.Err()
404 if err == nil {
405 atomic.AddInt32(&r.missingChecks, int32(len(revs)))
406 }
407 return err
408 }
409
410 func (r *replicator) readIndividualDocs(ctx context.Context, id string, revs []string, results chan<- *document) error {
411 for _, rev := range revs {
412 atomic.AddInt32(&r.missingChecks, 1)
413 d, err := readDoc(ctx, r.source, id, rev)
414 r.callback(ReplicationEvent{
415 Type: eventDocument,
416 Read: true,
417 DocID: id,
418 Error: err,
419 })
420 if err != nil {
421 return fmt.Errorf("read doc %s: %w", id, err)
422 }
423 atomic.AddInt32(&r.reads, 1)
424 atomic.AddInt32(&r.missingFound, 1)
425 select {
426 case <-ctx.Done():
427 return ctx.Err()
428 case results <- d:
429 }
430 }
431 return nil
432 }
433
434
435
436 func prepareAttachments(doc *document, atts *AttachmentsIterator) error {
437 if atts == nil {
438 return nil
439 }
440
441
442 for {
443 att, err := atts.Next()
444 if err != nil {
445 if err == io.EOF {
446 return nil
447 }
448 return err
449 }
450 var content []byte
451 switch att.ContentEncoding {
452 case "":
453 var err error
454 content, err = io.ReadAll(att.Content)
455 if err != nil {
456 return err
457 }
458 if err := att.Content.Close(); err != nil {
459 return err
460 }
461 case "gzip":
462 zr, err := gzip.NewReader(att.Content)
463 if err != nil {
464 return err
465 }
466 content, err = io.ReadAll(zr)
467 if err != nil {
468 return err
469 }
470 if err := zr.Close(); err != nil {
471 return err
472 }
473 if err := att.Content.Close(); err != nil {
474 return err
475 }
476 default:
477 return fmt.Errorf("Unknown encoding '%s' for attachment '%s'", att.ContentEncoding, att.Filename)
478 }
479 att.Stub = false
480 att.Follows = false
481 att.Content = io.NopCloser(bytes.NewReader(content))
482 doc.Attachments.Set(att.Filename, att)
483 }
484 }
485
486 func readDoc(ctx context.Context, db *DB, docID, rev string) (*document, error) {
487 doc := new(document)
488 row := db.Get(ctx, docID, Params(map[string]interface{}{
489 "rev": rev,
490 "revs": true,
491 "attachments": true,
492 }))
493 if err := row.ScanDoc(&doc); err != nil {
494 return nil, err
495 }
496 atts, _ := row.Attachments()
497 if err := prepareAttachments(doc, atts); err != nil {
498 return nil, err
499 }
500
501 return doc, nil
502 }
503
504
505
506
507 func (r *replicator) storeDocs(ctx context.Context, docs <-chan *document) error {
508 for doc := range docs {
509 _, err := r.target.Put(ctx, doc.ID, doc, Param("new_edits", false))
510 r.callback(ReplicationEvent{
511 Type: "document",
512 Read: false,
513 DocID: doc.ID,
514 Error: err,
515 })
516 if err != nil {
517 atomic.AddInt32(&r.writeFailures, 1)
518 return fmt.Errorf("store doc %s: %w", doc.ID, err)
519 }
520 atomic.AddInt32(&r.writes, 1)
521 }
522 return nil
523 }
524
View as plain text