...

Source file src/github.com/go-kivik/kivik/v4/replicate.go

Documentation: github.com/go-kivik/kivik/v4

     1  // Licensed under the Apache License, Version 2.0 (the "License"); you may not
     2  // use this file except in compliance with the License. You may obtain a copy of
     3  // the License at
     4  //
     5  //  http://www.apache.org/licenses/LICENSE-2.0
     6  //
     7  // Unless required by applicable law or agreed to in writing, software
     8  // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
     9  // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
    10  // License for the specific language governing permissions and limitations under
    11  // the License.
    12  
    13  package kivik
    14  
    15  import (
    16  	"bytes"
    17  	"compress/gzip"
    18  	"context"
    19  	"fmt"
    20  	"io"
    21  	"net/http"
    22  	"sync/atomic"
    23  	"time"
    24  
    25  	"golang.org/x/sync/errgroup"
    26  )
    27  
// ReplicationResult represents the result of a replication performed by
// [Replicate].
//
// StartTime is when the replicator was created and EndTime is when the result
// was assembled. The counters are tallied while the replication runs:
//
//   - DocsRead: document revisions successfully read from the source.
//   - DocsWritten: documents successfully written to the target.
//   - DocWriteFailures: documents whose write to the target failed. The
//     replication stops at the first failed write, so this is currently 0 or 1.
//   - MissingChecked: revisions, reported missing by the target, that were
//     requested from the source.
//   - MissingFound: revisions returned by the source for those requests.
//
// Replicate returns a result even when it also returns an error, reflecting
// the work done up to the point of failure.
type ReplicationResult struct {
	DocWriteFailures int       `json:"doc_write_failures"`
	DocsRead         int       `json:"docs_read"`
	DocsWritten      int       `json:"docs_written"`
	EndTime          time.Time `json:"end_time"`
	MissingChecked   int       `json:"missing_checked"`
	MissingFound     int       `json:"missing_found"`
	StartTime        time.Time `json:"start_time"`
}
    38  
// Values for ReplicationEvent.Type.
const (
	eventSecurity = "security" // reading or writing the _security document
	eventChanges  = "changes"  // opening or reading the source changes feed
	eventChange   = "change"   // a single entry read from the changes feed
	eventRevsDiff = "revsdiff" // querying the target's revs diff
	eventDocument = "document" // reading or writing a single document
)
    46  
// ReplicationEvent is an event emitted by the Replicate function, which
// represents a single read or write event, and its status.
//
// Events are delivered to the function registered with [ReplicateCallback].
// The replication stages run in separate goroutines, so that function may be
// called concurrently and should be safe for concurrent use.
type ReplicationEvent struct {
	// Type is the event type. Options are:
	//
	// - "security" -- Relates to the _security document.
	// - "changes"  -- Relates to the changes feed.
	// - "change"   -- Relates to a single change.
	// - "revsdiff" -- Relates to reading the revs diff.
	// - "document" -- Relates to a specific document.
	Type string
	// Read is true if the event relates to a read operation, and false for a
	// write to the target (storing the security object or a document).
	Read bool
	// DocID is the relevant document ID, if any.
	DocID string
	// Error is the error associated with the event, if any.
	Error error
	// Changes is the list of changed revs, for a "change" event.
	Changes []string
}
    67  
    68  // eventCallback is a function that receives replication events.
    69  type eventCallback func(ReplicationEvent)
    70  
    71  func (c eventCallback) Apply(target interface{}) {
    72  	if r, ok := target.(*replicator); ok {
    73  		r.cb = c
    74  	}
    75  }
    76  
    77  // ReplicateCallback sets a callback function to be called on every replication
    78  // event that takes place.
    79  func ReplicateCallback(callback func(ReplicationEvent)) Option {
    80  	return eventCallback(callback)
    81  }
    82  
    83  type replicateCopySecurityOption struct{}
    84  
    85  func (r replicateCopySecurityOption) Apply(target interface{}) {
    86  	if r, ok := target.(*replicator); ok {
    87  		r.withSecurity = true
    88  	}
    89  }
    90  
    91  // ReplicateCopySecurity will read the security object from source, and copy it
    92  // to the target, before the replication. Use with caution! The security object
    93  // is not versioned, and it will be unconditionally overwritten on the target!
    94  func ReplicateCopySecurity() Option {
    95  	return replicateCopySecurityOption{}
    96  }
    97  
    98  // Replicate performs a replication from source to target, using a limited
    99  // version of the CouchDB replication protocol.
   100  //
   101  // This function supports the [ReplicateCopySecurity] and [ReplicateCallback]
   102  // options. Additionally, the following standard options are passed along to
   103  // the source when querying the changes feed, for server-side filtering, where
   104  // supported:
   105  //
   106  //	filter (string)           - The name of a filter function.
   107  //	doc_ids (array of string) - Array of document IDs to be synchronized.
   108  func Replicate(ctx context.Context, target, source *DB, options ...Option) (*ReplicationResult, error) {
   109  	opts := multiOptions(options)
   110  
   111  	r := newReplicator(target, source)
   112  	opts.Apply(r)
   113  	err := r.replicate(ctx, opts)
   114  	return r.result(), err
   115  }
   116  
   117  func (r *replicator) replicate(ctx context.Context, options Option) error {
   118  	if err := r.copySecurity(ctx); err != nil {
   119  		return err
   120  	}
   121  
   122  	group, ctx := errgroup.WithContext(ctx)
   123  	changes := make(chan *change)
   124  	group.Go(func() error {
   125  		defer close(changes)
   126  		return r.readChanges(ctx, changes, options)
   127  	})
   128  
   129  	diffs := make(chan *revDiff)
   130  	group.Go(func() error {
   131  		defer close(diffs)
   132  		return r.readDiffs(ctx, changes, diffs)
   133  	})
   134  
   135  	docs := make(chan *document)
   136  	group.Go(func() error {
   137  		defer close(docs)
   138  		return r.readDocs(ctx, diffs, docs)
   139  	})
   140  
   141  	group.Go(func() error {
   142  		return r.storeDocs(ctx, docs)
   143  	})
   144  
   145  	return group.Wait()
   146  }
   147  
// replicator manages a single replication from source to target. It is
// created by newReplicator, configured by the options passed to Replicate,
// and driven by replicate.
type replicator struct {
	// target receives the replicated documents; source is read from.
	target, source *DB
	// cb, if non-nil, receives every ReplicationEvent (see ReplicateCallback).
	cb eventCallback
	// withSecurity indicates that the security object should be read from
	// source, and copied to the target, before the replication. Use with
	// caution! The security object is not versioned, and will be
	// unconditionally overwritten!
	withSecurity bool
	// noOpenRevs is set once a call to OpenRevs fails with HTTP 501 Not
	// Implemented; later revisions are then fetched one at a time. It is only
	// accessed from the readDocs stage (via readDoc).
	noOpenRevs bool
	// start is when the replicator was created; reported as StartTime.
	start time.Time
	// replication stats counters, reported by result. The pipeline
	// goroutines update them with sync/atomic.
	writeFailures, reads, writes, missingChecks, missingFound int32
}
   163  
   164  func newReplicator(target, source *DB) *replicator {
   165  	return &replicator{
   166  		target: target,
   167  		source: source,
   168  		start:  time.Now(),
   169  	}
   170  }
   171  
   172  func (r *replicator) callback(e ReplicationEvent) {
   173  	if r.cb == nil {
   174  		return
   175  	}
   176  	r.cb(e)
   177  }
   178  
   179  func (r *replicator) result() *ReplicationResult {
   180  	return &ReplicationResult{
   181  		StartTime:        r.start,
   182  		EndTime:          time.Now(),
   183  		DocWriteFailures: int(r.writeFailures),
   184  		DocsRead:         int(r.reads),
   185  		DocsWritten:      int(r.writes),
   186  		MissingChecked:   int(r.missingChecks),
   187  		MissingFound:     int(r.missingFound),
   188  	}
   189  }
   190  
   191  func (r *replicator) copySecurity(ctx context.Context) error {
   192  	if !r.withSecurity {
   193  		return nil
   194  	}
   195  	sec, err := r.source.Security(ctx)
   196  	r.callback(ReplicationEvent{
   197  		Type:  eventSecurity,
   198  		Read:  true,
   199  		Error: err,
   200  	})
   201  	if err != nil {
   202  		return fmt.Errorf("read security: %w", err)
   203  	}
   204  	err = r.target.SetSecurity(ctx, sec)
   205  	r.callback(ReplicationEvent{
   206  		Type:  eventSecurity,
   207  		Read:  false,
   208  		Error: err,
   209  	})
   210  	if err != nil {
   211  		return fmt.Errorf("set security: %w", err)
   212  	}
   213  	return nil
   214  }
   215  
// change is a single entry read from the source's changes feed and passed
// from readChanges to readDiffs.
type change struct {
	// ID is the document ID.
	ID string
	// Changes is the list of revisions reported for the document by the
	// changes feed (requested with style=all_docs).
	Changes []string
}
   220  
   221  // readChanges reads the changes feed.
   222  //
   223  // https://docs.couchdb.org/en/stable/replication/protocol.html#listen-to-changes-feed
   224  func (r *replicator) readChanges(ctx context.Context, results chan<- *change, options Option) error {
   225  	changes := r.source.Changes(ctx, options, Param("feed", "normal"), Param("style", "all_docs"))
   226  	r.callback(ReplicationEvent{
   227  		Type: eventChanges,
   228  		Read: true,
   229  	})
   230  
   231  	defer changes.Close() // nolint: errcheck
   232  	for changes.Next() {
   233  		ch := &change{
   234  			ID:      changes.ID(),
   235  			Changes: changes.Changes(),
   236  		}
   237  		r.callback(ReplicationEvent{
   238  			Type:    eventChange,
   239  			DocID:   ch.ID,
   240  			Read:    true,
   241  			Changes: ch.Changes,
   242  		})
   243  		select {
   244  		case <-ctx.Done():
   245  			return ctx.Err()
   246  		case results <- ch:
   247  		}
   248  	}
   249  	if err := changes.Err(); err != nil {
   250  		r.callback(ReplicationEvent{
   251  			Type:  eventChanges,
   252  			Read:  true,
   253  			Error: err,
   254  		})
   255  		return fmt.Errorf("read changes feed: %w", err)
   256  	}
   257  	return nil
   258  }
   259  
// revDiff is the target's revs-diff response for a single document, passed
// from readDiffs to readDocs.
type revDiff struct {
	// ID is the document ID. It is not part of the JSON value; readDiffs sets
	// it from the result row.
	ID string `json:"-"`
	// Missing lists the revisions the target reported as missing; readDocs
	// fetches these from the source.
	Missing []string `json:"missing"`
	// PossibleAncestors is decoded from the response but not used in this
	// file.
	PossibleAncestors []string `json:"possible_ancestors"`
}

// rdBatchSize is the maximum number of distinct documents readDiffs sends in
// a single RevsDiff request.
const rdBatchSize = 10
   267  
   268  // readDiffs reads the diffs for the reported changes.
   269  //
   270  // https://docs.couchdb.org/en/stable/replication/protocol.html#calculate-revision-difference
   271  func (r *replicator) readDiffs(ctx context.Context, ch <-chan *change, results chan<- *revDiff) error {
   272  	for {
   273  		revMap := map[string][]string{}
   274  		var change *change
   275  		var ok bool
   276  	loop:
   277  		for {
   278  			select {
   279  			case <-ctx.Done():
   280  				return ctx.Err()
   281  			case change, ok = <-ch:
   282  				if !ok {
   283  					break loop
   284  				}
   285  				revMap[change.ID] = change.Changes
   286  				if len(revMap) >= rdBatchSize {
   287  					break loop
   288  				}
   289  			}
   290  		}
   291  
   292  		if len(revMap) == 0 {
   293  			return nil
   294  		}
   295  		diffs := r.target.RevsDiff(ctx, revMap)
   296  		err := diffs.Err()
   297  		r.callback(ReplicationEvent{
   298  			Type:  eventRevsDiff,
   299  			Read:  true,
   300  			Error: err,
   301  		})
   302  		if err != nil {
   303  			return err
   304  		}
   305  		defer diffs.Close() // nolint: errcheck
   306  		for diffs.Next() {
   307  			var val revDiff
   308  			if err := diffs.ScanValue(&val); err != nil {
   309  				r.callback(ReplicationEvent{
   310  					Type:  eventRevsDiff,
   311  					Read:  true,
   312  					Error: err,
   313  				})
   314  				return err
   315  			}
   316  			val.ID, _ = diffs.ID()
   317  			r.callback(ReplicationEvent{
   318  				Type:  eventRevsDiff,
   319  				Read:  true,
   320  				DocID: val.ID,
   321  			})
   322  			select {
   323  			case <-ctx.Done():
   324  				return ctx.Err()
   325  			case results <- &val:
   326  			}
   327  		}
   328  		if err := diffs.Err(); err != nil {
   329  			r.callback(ReplicationEvent{
   330  				Type:  eventRevsDiff,
   331  				Read:  true,
   332  				Error: err,
   333  			})
   334  			return fmt.Errorf("read revs diffs: %w", err)
   335  		}
   336  	}
   337  }
   338  
   339  // readDocs reads the document revisions that have changed between source and
   340  // target.
   341  //
   342  // https://docs.couchdb.org/en/stable/replication/protocol.html#fetch-changed-documents
   343  func (r *replicator) readDocs(ctx context.Context, diffs <-chan *revDiff, results chan<- *document) error {
   344  	for {
   345  		var rd *revDiff
   346  		var ok bool
   347  		select {
   348  		case <-ctx.Done():
   349  			return ctx.Err()
   350  		case rd, ok = <-diffs:
   351  			if !ok {
   352  				return nil
   353  			}
   354  			if err := r.readDoc(ctx, rd.ID, rd.Missing, results); err != nil {
   355  				return err
   356  			}
   357  		}
   358  	}
   359  }
   360  
   361  func (r *replicator) readDoc(ctx context.Context, id string, revs []string, results chan<- *document) error {
   362  	if !r.noOpenRevs {
   363  		err := r.readOpenRevs(ctx, id, revs, results)
   364  		if HTTPStatus(err) == http.StatusNotImplemented {
   365  			r.noOpenRevs = true
   366  		} else {
   367  			return err
   368  		}
   369  	}
   370  	return r.readIndividualDocs(ctx, id, revs, results)
   371  }
   372  
   373  func (r *replicator) readOpenRevs(ctx context.Context, id string, revs []string, results chan<- *document) error {
   374  	rs := r.source.OpenRevs(ctx, id, revs, Params(map[string]interface{}{
   375  		"revs":   true,
   376  		"latest": true,
   377  	}))
   378  	defer rs.Close()
   379  	for rs.Next() {
   380  		atomic.AddInt32(&r.reads, 1)
   381  		atomic.AddInt32(&r.missingFound, 1)
   382  		doc := new(document)
   383  		err := rs.ScanDoc(&doc)
   384  		if err != nil {
   385  			return err
   386  		}
   387  		r.callback(ReplicationEvent{
   388  			Type:  eventDocument,
   389  			Read:  true,
   390  			DocID: id,
   391  			Error: err,
   392  		})
   393  		atts, _ := rs.Attachments()
   394  		if err := prepareAttachments(doc, atts); err != nil {
   395  			return err
   396  		}
   397  		select {
   398  		case <-ctx.Done():
   399  			return ctx.Err()
   400  		case results <- doc:
   401  		}
   402  	}
   403  	err := rs.Err()
   404  	if err == nil {
   405  		atomic.AddInt32(&r.missingChecks, int32(len(revs)))
   406  	}
   407  	return err
   408  }
   409  
   410  func (r *replicator) readIndividualDocs(ctx context.Context, id string, revs []string, results chan<- *document) error {
   411  	for _, rev := range revs {
   412  		atomic.AddInt32(&r.missingChecks, 1)
   413  		d, err := readDoc(ctx, r.source, id, rev)
   414  		r.callback(ReplicationEvent{
   415  			Type:  eventDocument,
   416  			Read:  true,
   417  			DocID: id,
   418  			Error: err,
   419  		})
   420  		if err != nil {
   421  			return fmt.Errorf("read doc %s: %w", id, err)
   422  		}
   423  		atomic.AddInt32(&r.reads, 1)
   424  		atomic.AddInt32(&r.missingFound, 1)
   425  		select {
   426  		case <-ctx.Done():
   427  			return ctx.Err()
   428  		case results <- d:
   429  		}
   430  	}
   431  	return nil
   432  }
   433  
   434  // prepareAttachments reads attachments from atts, prepares them, and adds them
   435  // to doc.
   436  func prepareAttachments(doc *document, atts *AttachmentsIterator) error {
   437  	if atts == nil {
   438  		return nil
   439  	}
   440  	// TODO: It seems silly this is necessary... I need better attachment
   441  	// handling in kivik.
   442  	for {
   443  		att, err := atts.Next()
   444  		if err != nil {
   445  			if err == io.EOF {
   446  				return nil
   447  			}
   448  			return err
   449  		}
   450  		var content []byte
   451  		switch att.ContentEncoding {
   452  		case "":
   453  			var err error
   454  			content, err = io.ReadAll(att.Content)
   455  			if err != nil {
   456  				return err
   457  			}
   458  			if err := att.Content.Close(); err != nil {
   459  				return err
   460  			}
   461  		case "gzip":
   462  			zr, err := gzip.NewReader(att.Content)
   463  			if err != nil {
   464  				return err
   465  			}
   466  			content, err = io.ReadAll(zr)
   467  			if err != nil {
   468  				return err
   469  			}
   470  			if err := zr.Close(); err != nil {
   471  				return err
   472  			}
   473  			if err := att.Content.Close(); err != nil {
   474  				return err
   475  			}
   476  		default:
   477  			return fmt.Errorf("Unknown encoding '%s' for attachment '%s'", att.ContentEncoding, att.Filename)
   478  		}
   479  		att.Stub = false
   480  		att.Follows = false
   481  		att.Content = io.NopCloser(bytes.NewReader(content))
   482  		doc.Attachments.Set(att.Filename, att)
   483  	}
   484  }
   485  
   486  func readDoc(ctx context.Context, db *DB, docID, rev string) (*document, error) {
   487  	doc := new(document)
   488  	row := db.Get(ctx, docID, Params(map[string]interface{}{
   489  		"rev":         rev,
   490  		"revs":        true,
   491  		"attachments": true,
   492  	}))
   493  	if err := row.ScanDoc(&doc); err != nil {
   494  		return nil, err
   495  	}
   496  	atts, _ := row.Attachments()
   497  	if err := prepareAttachments(doc, atts); err != nil {
   498  		return nil, err
   499  	}
   500  
   501  	return doc, nil
   502  }
   503  
   504  // storeDocs updates the changed documents.
   505  //
   506  // https://docs.couchdb.org/en/stable/replication/protocol.html#upload-batch-of-changed-documents
   507  func (r *replicator) storeDocs(ctx context.Context, docs <-chan *document) error {
   508  	for doc := range docs {
   509  		_, err := r.target.Put(ctx, doc.ID, doc, Param("new_edits", false))
   510  		r.callback(ReplicationEvent{
   511  			Type:  "document",
   512  			Read:  false,
   513  			DocID: doc.ID,
   514  			Error: err,
   515  		})
   516  		if err != nil {
   517  			atomic.AddInt32(&r.writeFailures, 1)
   518  			return fmt.Errorf("store doc %s: %w", doc.ID, err)
   519  		}
   520  		atomic.AddInt32(&r.writes, 1)
   521  	}
   522  	return nil
   523  }
   524  

View as plain text