...

Source file src/github.com/go-kivik/kivik/v4/couchdb/replication.go

Documentation: github.com/go-kivik/kivik/v4/couchdb

     1  // Licensed under the Apache License, Version 2.0 (the "License"); you may not
     2  // use this file except in compliance with the License. You may obtain a copy of
     3  // the License at
     4  //
     5  //  http://www.apache.org/licenses/LICENSE-2.0
     6  //
     7  // Unless required by applicable law or agreed to in writing, software
     8  // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
     9  // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
    10  // License for the specific language governing permissions and limitations under
    11  // the License.
    12  
    13  package couchdb
    14  
    15  import (
    16  	"bytes"
    17  	"context"
    18  	"encoding/json"
    19  	"errors"
    20  	"fmt"
    21  	"net/http"
    22  	"strconv"
    23  	"strings"
    24  	"sync"
    25  	"time"
    26  
    27  	kivik "github.com/go-kivik/kivik/v4"
    28  	"github.com/go-kivik/kivik/v4/couchdb/chttp"
    29  	"github.com/go-kivik/kivik/v4/driver"
    30  	internal "github.com/go-kivik/kivik/v4/int/errors"
    31  )
    32  
    33  type replicationError struct {
    34  	status int
    35  	reason string
    36  }
    37  
    38  func (re *replicationError) Error() string {
    39  	return re.reason
    40  }
    41  
    42  func (re *replicationError) HTTPStatus() int {
    43  	return re.status
    44  }
    45  
    46  func (re *replicationError) UnmarshalJSON(data []byte) error {
    47  	if err := json.Unmarshal(data, &re.reason); err != nil {
    48  		return err
    49  	}
    50  	switch (strings.SplitN(re.reason, ":", 2))[0] { // nolint:gomnd
    51  	case "db_not_found":
    52  		re.status = http.StatusNotFound
    53  	case "timeout":
    54  		re.status = http.StatusRequestTimeout
    55  	case "unauthorized":
    56  		re.status = http.StatusUnauthorized
    57  	default:
    58  		re.status = http.StatusInternalServerError
    59  	}
    60  	return nil
    61  }
    62  
    63  type replicationStateTime time.Time
    64  
    65  func (t *replicationStateTime) UnmarshalJSON(data []byte) error {
    66  	input := string(bytes.Trim(data, `"`))
    67  	if ts, err := time.Parse(time.RFC3339, input); err == nil {
    68  		*t = replicationStateTime(ts)
    69  		return nil
    70  	}
    71  	// Fallback for really old versions of CouchDB
    72  	if seconds, err := strconv.ParseInt(input, 10, 64); err == nil { // nolint:gomnd
    73  		epochTime := replicationStateTime(time.Unix(seconds, 0).UTC())
    74  		*t = epochTime
    75  		return nil
    76  	}
    77  	return &internal.Error{Status: http.StatusBadGateway, Err: fmt.Errorf("kivik: '%s' does not appear to be a valid timestamp", string(data))}
    78  }
    79  
    80  type replication struct {
    81  	docID         string
    82  	replicationID string
    83  	source        string
    84  	target        string
    85  	startTime     time.Time
    86  	endTime       time.Time
    87  	state         string
    88  	err           error
    89  
    90  	// mu protects the above values
    91  	mu sync.RWMutex
    92  
    93  	*db
    94  }
    95  
    96  var _ driver.Replication = &replication{}
    97  
    98  func (c *client) fetchReplication(ctx context.Context, docID string) *replication {
    99  	rep := c.newReplication(docID)
   100  	rep.db = &db{client: c, dbName: "_replicator"}
   101  	// Do an update to get the initial state, but don't fail if there's an error
   102  	// at this stage, because we successfully created the replication doc.
   103  	_ = rep.updateMain(ctx)
   104  	return rep
   105  }
   106  
   107  func (c *client) newReplication(docID string) *replication {
   108  	return &replication{
   109  		docID: docID,
   110  		db: &db{
   111  			client: c,
   112  			dbName: "_replicator",
   113  		},
   114  	}
   115  }
   116  
   117  func (r *replication) readLock() func() {
   118  	r.mu.RLock()
   119  	return r.mu.RUnlock
   120  }
   121  
   122  func (r *replication) ReplicationID() string { defer r.readLock()(); return r.replicationID }
   123  func (r *replication) Source() string        { defer r.readLock()(); return r.source }
   124  func (r *replication) Target() string        { defer r.readLock()(); return r.target }
   125  func (r *replication) StartTime() time.Time  { defer r.readLock()(); return r.startTime }
   126  func (r *replication) EndTime() time.Time    { defer r.readLock()(); return r.endTime }
   127  func (r *replication) State() string         { defer r.readLock()(); return r.state }
   128  func (r *replication) Err() error            { defer r.readLock()(); return r.err }
   129  
   130  func (r *replication) Update(ctx context.Context, state *driver.ReplicationInfo) error {
   131  	if err := r.updateMain(ctx); err != nil {
   132  		return err
   133  	}
   134  	if r.State() == "complete" {
   135  		state.Progress = 100
   136  		return nil
   137  	}
   138  	info, err := r.updateActiveTasks(ctx)
   139  	if err != nil {
   140  		if kivik.HTTPStatus(err) == http.StatusNotFound {
   141  			// not listed in _active_tasks (because the replication is done, or
   142  			// hasn't yet started), but this isn't an error
   143  			return nil
   144  		}
   145  		return err
   146  	}
   147  	state.DocWriteFailures = info.DocWriteFailures
   148  	state.DocsRead = info.DocsRead
   149  	state.DocsWritten = info.DocsWritten
   150  	// state.progress = info.Progress
   151  	return nil
   152  }
   153  
   154  type activeTask struct {
   155  	Type             string `json:"type"`
   156  	ReplicationID    string `json:"replication_id"`
   157  	DocsWritten      int64  `json:"docs_written"`
   158  	DocsRead         int64  `json:"docs_read"`
   159  	DocWriteFailures int64  `json:"doc_write_failures"`
   160  }
   161  
   162  func (r *replication) updateActiveTasks(ctx context.Context) (*activeTask, error) {
   163  	resp, err := r.client.DoReq(ctx, http.MethodGet, "/_active_tasks", nil)
   164  	if err != nil {
   165  		return nil, err
   166  	}
   167  	if err = chttp.ResponseError(resp); err != nil {
   168  		return nil, err
   169  	}
   170  	defer chttp.CloseBody(resp.Body)
   171  	var tasks []*activeTask
   172  	if err = json.NewDecoder(resp.Body).Decode(&tasks); err != nil {
   173  		return nil, &internal.Error{Status: http.StatusBadGateway, Err: err}
   174  	}
   175  	for _, task := range tasks {
   176  		if task.Type != "replication" {
   177  			continue
   178  		}
   179  		repIDparts := strings.SplitN(task.ReplicationID, "+", 2) // nolint:gomnd
   180  		if repIDparts[0] != r.replicationID {
   181  			continue
   182  		}
   183  		return task, nil
   184  	}
   185  	return nil, &internal.Error{Status: http.StatusNotFound, Err: errors.New("task not found")}
   186  }
   187  
   188  // updateMain updates the "main" fields: those stored directly in r.
   189  func (r *replication) updateMain(ctx context.Context) error {
   190  	doc, err := r.getReplicatorDoc(ctx)
   191  	if err != nil {
   192  		return err
   193  	}
   194  	r.setFromReplicatorDoc(doc)
   195  	return nil
   196  }
   197  
   198  func (r *replication) getReplicatorDoc(ctx context.Context) (*replicatorDoc, error) {
   199  	result, err := r.db.Get(ctx, r.docID, kivik.Params(nil))
   200  	if err != nil {
   201  		return nil, err
   202  	}
   203  	var doc replicatorDoc
   204  	err = json.NewDecoder(result.Body).Decode(&doc)
   205  	return &doc, err
   206  }
   207  
   208  func (r *replication) setFromReplicatorDoc(doc *replicatorDoc) {
   209  	r.mu.Lock()
   210  	defer r.mu.Unlock()
   211  	switch kivik.ReplicationState(doc.State) {
   212  	case kivik.ReplicationStarted:
   213  		r.startTime = time.Time(doc.StateTime)
   214  	case kivik.ReplicationError, kivik.ReplicationComplete:
   215  		r.endTime = time.Time(doc.StateTime)
   216  	}
   217  	r.state = doc.State
   218  	if doc.Error != nil {
   219  		r.err = doc.Error
   220  	} else {
   221  		r.err = nil
   222  	}
   223  	if r.source == "" {
   224  		r.source = doc.Source
   225  	}
   226  	if r.target == "" {
   227  		r.target = doc.Target
   228  	}
   229  	if r.replicationID == "" {
   230  		r.replicationID = doc.ReplicationID
   231  	}
   232  }
   233  
   234  func (r *replication) Delete(ctx context.Context) error {
   235  	rev, err := r.GetRev(ctx, r.docID, kivik.Params(nil))
   236  	if err != nil {
   237  		return err
   238  	}
   239  	_, err = r.db.Delete(ctx, r.docID, kivik.Rev(rev))
   240  	return err
   241  }
   242  
   243  type replicatorDoc struct {
   244  	DocID         string               `json:"_id"`
   245  	ReplicationID string               `json:"_replication_id"`
   246  	Source        string               `json:"source"`
   247  	Target        string               `json:"target"`
   248  	State         string               `json:"_replication_state"`
   249  	StateTime     replicationStateTime `json:"_replication_state_time"`
   250  	Error         *replicationError    `json:"_replication_state_reason,omitempty"`
   251  }
   252  
   253  func (c *client) GetReplications(ctx context.Context, options driver.Options) ([]driver.Replication, error) {
   254  	scheduler, err := c.schedulerSupported(ctx)
   255  	if err != nil {
   256  		return nil, err
   257  	}
   258  	opts := map[string]interface{}{}
   259  	options.Apply(opts)
   260  	if scheduler {
   261  		return c.getReplicationsFromScheduler(ctx, opts)
   262  	}
   263  	return c.legacyGetReplications(ctx, opts)
   264  }
   265  
   266  func (c *client) legacyGetReplications(ctx context.Context, opts map[string]interface{}) ([]driver.Replication, error) {
   267  	if opts == nil {
   268  		opts = map[string]interface{}{}
   269  	}
   270  	delete(opts, "conflicts")
   271  	delete(opts, "update_seq")
   272  	opts["include_docs"] = true
   273  	params, err := optionsToParams(opts)
   274  	if err != nil {
   275  		return nil, err
   276  	}
   277  	var result struct {
   278  		Rows []struct {
   279  			Doc replicatorDoc `json:"doc"`
   280  		} `json:"rows"`
   281  	}
   282  	path := "/_replicator/_all_docs?" + params.Encode()
   283  	if err = c.DoJSON(ctx, http.MethodGet, path, nil, &result); err != nil {
   284  		return nil, err
   285  	}
   286  	reps := make([]driver.Replication, 0, len(result.Rows))
   287  	for _, row := range result.Rows {
   288  		if row.Doc.DocID == "_design/_replicator" {
   289  			continue
   290  		}
   291  		rep := c.newReplication(row.Doc.DocID)
   292  		rep.setFromReplicatorDoc(&row.Doc)
   293  		reps = append(reps, rep)
   294  	}
   295  	return reps, nil
   296  }
   297  
   298  func (c *client) Replicate(ctx context.Context, targetDSN, sourceDSN string, options driver.Options) (driver.Replication, error) {
   299  	opts := map[string]interface{}{}
   300  	options.Apply(opts)
   301  	// Allow overriding source and target with options, i.e. for auth options
   302  	if _, ok := opts["source"]; !ok {
   303  		opts["source"] = sourceDSN
   304  	}
   305  	if _, ok := opts["target"]; !ok {
   306  		opts["target"] = targetDSN
   307  	}
   308  	if t := opts["target"]; t == "" {
   309  		return nil, missingArg("targetDSN")
   310  	}
   311  	if s := opts["source"]; s == "" {
   312  		return nil, missingArg("sourceDSN")
   313  	}
   314  
   315  	scheduler, err := c.schedulerSupported(ctx)
   316  	if err != nil {
   317  		return nil, err
   318  	}
   319  	chttpOpts := &chttp.Options{
   320  		Body: chttp.EncodeBody(opts),
   321  	}
   322  
   323  	var repStub struct {
   324  		ID string `json:"id"`
   325  	}
   326  	if e := c.Client.DoJSON(ctx, http.MethodPost, "/_replicator", chttpOpts, &repStub); e != nil {
   327  		return nil, e
   328  	}
   329  	if scheduler {
   330  		return c.fetchSchedulerReplication(ctx, repStub.ID)
   331  	}
   332  	return c.fetchReplication(ctx, repStub.ID), nil
   333  }
   334  

View as plain text