...

Source file src/github.com/go-kivik/kivik/v4/pouchdb/replication.go

Documentation: github.com/go-kivik/kivik/v4/pouchdb

     1  // Licensed under the Apache License, Version 2.0 (the "License"); you may not
     2  // use this file except in compliance with the License. You may obtain a copy of
     3  // the License at
     4  //
     5  //  http://www.apache.org/licenses/LICENSE-2.0
     6  //
     7  // Unless required by applicable law or agreed to in writing, software
     8  // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
     9  // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
    10  // License for the specific language governing permissions and limitations under
    11  // the License.
    12  
    13  //go:build js
    14  
    15  package pouchdb
    16  
    17  import (
    18  	"context"
    19  	"net/http"
    20  	"sync"
    21  	"time"
    22  
    23  	"github.com/gopherjs/gopherjs/js"
    24  
    25  	kivik "github.com/go-kivik/kivik/v4"
    26  	"github.com/go-kivik/kivik/v4/driver"
    27  	internal "github.com/go-kivik/kivik/v4/int/errors"
    28  	"github.com/go-kivik/kivik/v4/pouchdb/bindings"
    29  )
    30  
    31  type replication struct {
    32  	source    string
    33  	target    string
    34  	startTime time.Time
    35  	endTime   time.Time
    36  	state     kivik.ReplicationState
    37  	err       error
    38  
    39  	// mu protects the above values
    40  	mu sync.RWMutex
    41  
    42  	client *client
    43  	rh     *replicationHandler
    44  }
    45  
    46  var _ driver.Replication = &replication{}
    47  
    48  func (c *client) newReplication(target, source string, rep *js.Object) *replication {
    49  	r := &replication{
    50  		target: target,
    51  		source: source,
    52  		rh:     newReplicationHandler(rep),
    53  		client: c,
    54  	}
    55  	c.replicationsMU.Lock()
    56  	defer c.replicationsMU.Unlock()
    57  	c.replications = append(c.replications, r)
    58  	return r
    59  }
    60  
    61  func (r *replication) readLock() func() {
    62  	r.mu.RLock()
    63  	return r.mu.RUnlock
    64  }
    65  
    66  func (r *replication) ReplicationID() string { return "" }
    67  func (r *replication) Source() string        { defer r.readLock()(); return r.source }
    68  func (r *replication) Target() string        { defer r.readLock()(); return r.target }
    69  func (r *replication) StartTime() time.Time  { defer r.readLock()(); return r.startTime }
    70  func (r *replication) EndTime() time.Time    { defer r.readLock()(); return r.endTime }
    71  func (r *replication) State() string         { defer r.readLock()(); return string(r.state) }
    72  func (r *replication) Err() error            { defer r.readLock()(); return r.err }
    73  
    74  func (r *replication) Update(_ context.Context, state *driver.ReplicationInfo) (err error) {
    75  	defer bindings.RecoverError(&err)
    76  	r.mu.Lock()
    77  	defer r.mu.Unlock()
    78  	event, info, err := r.rh.Status()
    79  	if err != nil {
    80  		return err
    81  	}
    82  	switch event {
    83  	case bindings.ReplicationEventDenied, bindings.ReplicationEventError:
    84  		r.state = kivik.ReplicationError
    85  		r.err = bindings.NewPouchError(info.Object)
    86  	case bindings.ReplicationEventComplete:
    87  		r.state = kivik.ReplicationComplete
    88  	case bindings.ReplicationEventPaused, bindings.ReplicationEventChange, bindings.ReplicationEventActive:
    89  		r.state = kivik.ReplicationStarted
    90  	}
    91  	if info != nil {
    92  		startTime, endTime := info.StartTime(), info.EndTime()
    93  		if r.startTime.IsZero() && !startTime.IsZero() {
    94  			r.startTime = startTime
    95  		}
    96  		if r.endTime.IsZero() && !endTime.IsZero() {
    97  			r.endTime = endTime
    98  		}
    99  		if r.rh.state != nil {
   100  			state.DocWriteFailures = r.rh.state.DocWriteFailures
   101  			state.DocsRead = r.rh.state.DocsRead
   102  			state.DocsWritten = r.rh.state.DocsWritten
   103  		}
   104  	}
   105  	return nil
   106  }
   107  
   108  func (r *replication) Delete(context.Context) (err error) {
   109  	defer bindings.RecoverError(&err)
   110  	r.rh.Cancel()
   111  	r.client.replicationsMU.Lock()
   112  	defer r.client.replicationsMU.Unlock()
   113  	for i, rep := range r.client.replications {
   114  		if rep == r {
   115  			last := len(r.client.replications) - 1
   116  			r.client.replications[i] = r.client.replications[last]
   117  			r.client.replications[last] = nil
   118  			r.client.replications = r.client.replications[:last]
   119  			return nil
   120  		}
   121  	}
   122  	return &internal.Error{Status: http.StatusNotFound, Message: "replication not found"}
   123  }
   124  
   125  func replicationEndpoint(dsn string, object interface{}) (name string, obj interface{}, err error) {
   126  	defer bindings.RecoverError(&err)
   127  	if object == nil {
   128  		return dsn, dsn, nil
   129  	}
   130  	switch t := object.(type) {
   131  	case *js.Object:
   132  		tx := object.(*js.Object) // https://github.com/gopherjs/gopherjs/issues/682
   133  		// Assume it's a raw PouchDB object
   134  		return tx.Get("name").String(), tx, nil
   135  	case *bindings.DB:
   136  		// Unwrap the bare object
   137  		return t.Object.Get("name").String(), t.Object, nil
   138  	}
   139  	// Just let it pass through
   140  	return "<unknown>", obj, nil
   141  }
   142  
   143  func (c *client) Replicate(_ context.Context, targetDSN, sourceDSN string, options driver.Options) (driver.Replication, error) {
   144  	pouchOpts := map[string]interface{}{}
   145  	options.Apply(pouchOpts)
   146  	// Allow overriding source and target with options, i.e. for PouchDB objects
   147  	sourceName, sourceObj, err := replicationEndpoint(sourceDSN, pouchOpts["source"])
   148  	if err != nil {
   149  		return nil, err
   150  	}
   151  	targetName, targetObj, err := replicationEndpoint(targetDSN, pouchOpts["target"])
   152  	if err != nil {
   153  		return nil, err
   154  	}
   155  	delete(pouchOpts, "source")
   156  	delete(pouchOpts, "target")
   157  	rep, err := c.pouch.Replicate(sourceObj, targetObj, pouchOpts)
   158  	if err != nil {
   159  		return nil, err
   160  	}
   161  	return c.newReplication(targetName, sourceName, rep), nil
   162  }
   163  
   164  func (c *client) GetReplications(context.Context, driver.Options) ([]driver.Replication, error) {
   165  	c.replicationsMU.RLock()
   166  	defer c.replicationsMU.RUnlock()
   167  	reps := make([]driver.Replication, len(c.replications))
   168  	for i, rep := range c.replications {
   169  		reps[i] = rep
   170  	}
   171  	return reps, nil
   172  }
   173  

View as plain text