...

Source file src/github.com/go-kivik/kivik/v4/replication.go

Documentation: github.com/go-kivik/kivik/v4

     1  // Licensed under the Apache License, Version 2.0 (the "License"); you may not
     2  // use this file except in compliance with the License. You may obtain a copy of
     3  // the License at
     4  //
     5  //  http://www.apache.org/licenses/LICENSE-2.0
     6  //
     7  // Unless required by applicable law or agreed to in writing, software
     8  // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
     9  // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
    10  // License for the specific language governing permissions and limitations under
    11  // the License.
    12  
    13  package kivik
    14  
    15  import (
    16  	"context"
    17  	"sync"
    18  	"time"
    19  
    20  	"github.com/go-kivik/kivik/v4/driver"
    21  )
    22  
// ReplicationState represents a replication's state. It is the string type
// returned by [Replication.State], which converts the state string reported by
// the underlying driver replication.
type ReplicationState string
    25  
// The possible values for the _replication_state field in _replicator documents
// plus a blank value for unstarted replications.
const (
	// ReplicationNotStarted is the blank state of an unstarted replication.
	ReplicationNotStarted ReplicationState = ""
	// ReplicationStarted is the "triggered" state.
	ReplicationStarted ReplicationState = "triggered"
	// ReplicationError is the "error" state. IsActive reports false for it.
	ReplicationError ReplicationState = "error"
	// ReplicationComplete is the "completed" state. IsActive reports false
	// for it.
	ReplicationComplete ReplicationState = "completed"
)
    34  
// The additional possible values for the state field in the _scheduler docs.
const (
	// ReplicationInitializing is the "initializing" state.
	ReplicationInitializing ReplicationState = "initializing"
	// ReplicationRunning is the "running" state.
	ReplicationRunning ReplicationState = "running"
	// ReplicationPending is the "pending" state.
	ReplicationPending ReplicationState = "pending"
	// ReplicationCrashing is the "crashing" state. IsActive reports false
	// for it.
	ReplicationCrashing ReplicationState = "crashing"
	// ReplicationFailed is the "failed" state. IsActive reports false for it.
	ReplicationFailed ReplicationState = "failed"
)
    43  
// Replication represents a CouchDB replication process.
//
// A Replication wraps a driver.Replication. Most methods delegate directly to
// the driver value, while the progress counters (DocsWritten, DocsRead,
// DocWriteFailures, Progress) are served from the info snapshot stored by the
// most recent successful call to Update.
type Replication struct {
	// Source and Target are populated from the driver replication's Source
	// and Target methods when the Replication is constructed.
	Source string
	Target string

	// infoMU guards info, which Update replaces under the write lock.
	infoMU sync.RWMutex
	// info is the snapshot stored by the last successful Update, or nil if
	// Update has not yet succeeded.
	info *driver.ReplicationInfo
	// statusErr holds the result (possibly nil) of the most recent call to
	// the driver's Update method.
	statusErr error
	// irep is the underlying driver replication that methods delegate to.
	irep driver.Replication
}
    54  
    55  // DocsWritten returns the number of documents written, if known.
    56  func (r *Replication) DocsWritten() int64 {
    57  	if r != nil && r.info != nil {
    58  		r.infoMU.RLock()
    59  		defer r.infoMU.RUnlock()
    60  		return r.info.DocsWritten
    61  	}
    62  	return 0
    63  }
    64  
    65  // DocsRead returns the number of documents read, if known.
    66  func (r *Replication) DocsRead() int64 {
    67  	if r != nil && r.info != nil {
    68  		r.infoMU.RLock()
    69  		defer r.infoMU.RUnlock()
    70  		return r.info.DocsRead
    71  	}
    72  	return 0
    73  }
    74  
    75  // DocWriteFailures returns the number of doc write failures, if known.
    76  func (r *Replication) DocWriteFailures() int64 {
    77  	if r != nil && r.info != nil {
    78  		r.infoMU.RLock()
    79  		defer r.infoMU.RUnlock()
    80  		return r.info.DocWriteFailures
    81  	}
    82  	return 0
    83  }
    84  
    85  // Progress returns the current replication progress, if known.
    86  func (r *Replication) Progress() float64 {
    87  	if r != nil && r.info != nil {
    88  		r.infoMU.RLock()
    89  		defer r.infoMU.RUnlock()
    90  		return r.info.Progress
    91  	}
    92  	return 0
    93  }
    94  
    95  func newReplication(rep driver.Replication) *Replication {
    96  	return &Replication{
    97  		Source: rep.Source(),
    98  		Target: rep.Target(),
    99  		irep:   rep,
   100  	}
   101  }
   102  
   103  // ReplicationID returns the _replication_id field of the replicator document.
   104  func (r *Replication) ReplicationID() string {
   105  	return r.irep.ReplicationID()
   106  }
   107  
   108  // StartTime returns the replication start time, once the replication has been
   109  // triggered.
   110  func (r *Replication) StartTime() time.Time {
   111  	return r.irep.StartTime()
   112  }
   113  
   114  // EndTime returns the replication end time, once the replication has terminated.
   115  func (r *Replication) EndTime() time.Time {
   116  	return r.irep.EndTime()
   117  }
   118  
   119  // State returns the current replication state
   120  func (r *Replication) State() ReplicationState {
   121  	return ReplicationState(r.irep.State())
   122  }
   123  
   124  // Err returns the error, if any, that caused the replication to abort.
   125  func (r *Replication) Err() error {
   126  	if r == nil {
   127  		return nil
   128  	}
   129  	return r.irep.Err()
   130  }
   131  
   132  // IsActive returns true if the replication has not yet completed or
   133  // errored.
   134  func (r *Replication) IsActive() bool {
   135  	if r == nil {
   136  		return false
   137  	}
   138  	switch r.State() {
   139  	case ReplicationError, ReplicationComplete, ReplicationCrashing, ReplicationFailed:
   140  		return false
   141  	default:
   142  		return true
   143  	}
   144  }
   145  
   146  // Delete deletes a replication. If it is currently running, it will be
   147  // cancelled.
   148  func (r *Replication) Delete(ctx context.Context) error {
   149  	return r.irep.Delete(ctx)
   150  }
   151  
   152  // Update requests a replication state update from the server. If there is an
   153  // error retrieving the update, it is returned and the replication state is
   154  // unaltered.
   155  func (r *Replication) Update(ctx context.Context) error {
   156  	var info driver.ReplicationInfo
   157  	r.statusErr = r.irep.Update(ctx, &info)
   158  	if r.statusErr != nil {
   159  		return r.statusErr
   160  	}
   161  	r.infoMU.Lock()
   162  	r.info = &info
   163  	r.infoMU.Unlock()
   164  	return nil
   165  }
   166  
   167  // GetReplications returns a list of defined replications in the _replicator
   168  // database. Options are in the same format as to [DB.AllDocs], except that
   169  // "conflicts" and "update_seq" are ignored.
   170  func (c *Client) GetReplications(ctx context.Context, options ...Option) ([]*Replication, error) {
   171  	endQuery, err := c.startQuery()
   172  	if err != nil {
   173  		return nil, err
   174  	}
   175  	defer endQuery()
   176  	replicator, ok := c.driverClient.(driver.ClientReplicator)
   177  	if !ok {
   178  		return nil, errReplicationNotImplemented
   179  	}
   180  	reps, err := replicator.GetReplications(ctx, multiOptions(options))
   181  	if err != nil {
   182  		return nil, err
   183  	}
   184  	replications := make([]*Replication, len(reps))
   185  	for i, rep := range reps {
   186  		replications[i] = newReplication(rep)
   187  	}
   188  	return replications, nil
   189  }
   190  
   191  // Replicate initiates a replication from source to target.
   192  //
   193  // To use an object for either "source" or "target", pass the desired object
   194  // in options. This will override targetDSN and sourceDSN function parameters.
   195  func (c *Client) Replicate(ctx context.Context, targetDSN, sourceDSN string, options ...Option) (*Replication, error) {
   196  	endQuery, err := c.startQuery()
   197  	if err != nil {
   198  		return nil, err
   199  	}
   200  	defer endQuery()
   201  	replicator, ok := c.driverClient.(driver.ClientReplicator)
   202  	if !ok {
   203  		return nil, errReplicationNotImplemented
   204  	}
   205  	rep, err := replicator.Replicate(ctx, targetDSN, sourceDSN, multiOptions(options))
   206  	if err != nil {
   207  		return nil, err
   208  	}
   209  	return newReplication(rep), nil
   210  }
   211  
// ReplicationInfo represents a snapshot of the status of a replication.
//
// NOTE(review): the Replication methods in this file read their counters from
// driver.ReplicationInfo rather than from this type; confirm where this type
// is used.
type ReplicationInfo struct {
	// DocWriteFailures is the number of doc write failures.
	DocWriteFailures int64
	// DocsRead is the number of documents read.
	DocsRead int64
	// DocsWritten is the number of documents written.
	DocsWritten int64
	// Progress is the replication progress.
	// NOTE(review): scale/units are not shown here — confirm.
	Progress float64
}
   219  

View as plain text