1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package pouchdb
16
17 import (
18 "context"
19 "net/http"
20 "sync"
21 "time"
22
23 "github.com/gopherjs/gopherjs/js"
24
25 kivik "github.com/go-kivik/kivik/v4"
26 "github.com/go-kivik/kivik/v4/driver"
27 internal "github.com/go-kivik/kivik/v4/int/errors"
28 "github.com/go-kivik/kivik/v4/pouchdb/bindings"
29 )
30
31 type replication struct {
32 source string
33 target string
34 startTime time.Time
35 endTime time.Time
36 state kivik.ReplicationState
37 err error
38
39
40 mu sync.RWMutex
41
42 client *client
43 rh *replicationHandler
44 }
45
46 var _ driver.Replication = &replication{}
47
48 func (c *client) newReplication(target, source string, rep *js.Object) *replication {
49 r := &replication{
50 target: target,
51 source: source,
52 rh: newReplicationHandler(rep),
53 client: c,
54 }
55 c.replicationsMU.Lock()
56 defer c.replicationsMU.Unlock()
57 c.replications = append(c.replications, r)
58 return r
59 }
60
61 func (r *replication) readLock() func() {
62 r.mu.RLock()
63 return r.mu.RUnlock
64 }
65
66 func (r *replication) ReplicationID() string { return "" }
67 func (r *replication) Source() string { defer r.readLock()(); return r.source }
68 func (r *replication) Target() string { defer r.readLock()(); return r.target }
69 func (r *replication) StartTime() time.Time { defer r.readLock()(); return r.startTime }
70 func (r *replication) EndTime() time.Time { defer r.readLock()(); return r.endTime }
71 func (r *replication) State() string { defer r.readLock()(); return string(r.state) }
72 func (r *replication) Err() error { defer r.readLock()(); return r.err }
73
74 func (r *replication) Update(_ context.Context, state *driver.ReplicationInfo) (err error) {
75 defer bindings.RecoverError(&err)
76 r.mu.Lock()
77 defer r.mu.Unlock()
78 event, info, err := r.rh.Status()
79 if err != nil {
80 return err
81 }
82 switch event {
83 case bindings.ReplicationEventDenied, bindings.ReplicationEventError:
84 r.state = kivik.ReplicationError
85 r.err = bindings.NewPouchError(info.Object)
86 case bindings.ReplicationEventComplete:
87 r.state = kivik.ReplicationComplete
88 case bindings.ReplicationEventPaused, bindings.ReplicationEventChange, bindings.ReplicationEventActive:
89 r.state = kivik.ReplicationStarted
90 }
91 if info != nil {
92 startTime, endTime := info.StartTime(), info.EndTime()
93 if r.startTime.IsZero() && !startTime.IsZero() {
94 r.startTime = startTime
95 }
96 if r.endTime.IsZero() && !endTime.IsZero() {
97 r.endTime = endTime
98 }
99 if r.rh.state != nil {
100 state.DocWriteFailures = r.rh.state.DocWriteFailures
101 state.DocsRead = r.rh.state.DocsRead
102 state.DocsWritten = r.rh.state.DocsWritten
103 }
104 }
105 return nil
106 }
107
108 func (r *replication) Delete(context.Context) (err error) {
109 defer bindings.RecoverError(&err)
110 r.rh.Cancel()
111 r.client.replicationsMU.Lock()
112 defer r.client.replicationsMU.Unlock()
113 for i, rep := range r.client.replications {
114 if rep == r {
115 last := len(r.client.replications) - 1
116 r.client.replications[i] = r.client.replications[last]
117 r.client.replications[last] = nil
118 r.client.replications = r.client.replications[:last]
119 return nil
120 }
121 }
122 return &internal.Error{Status: http.StatusNotFound, Message: "replication not found"}
123 }
124
125 func replicationEndpoint(dsn string, object interface{}) (name string, obj interface{}, err error) {
126 defer bindings.RecoverError(&err)
127 if object == nil {
128 return dsn, dsn, nil
129 }
130 switch t := object.(type) {
131 case *js.Object:
132 tx := object.(*js.Object)
133
134 return tx.Get("name").String(), tx, nil
135 case *bindings.DB:
136
137 return t.Object.Get("name").String(), t.Object, nil
138 }
139
140 return "<unknown>", obj, nil
141 }
142
143 func (c *client) Replicate(_ context.Context, targetDSN, sourceDSN string, options driver.Options) (driver.Replication, error) {
144 pouchOpts := map[string]interface{}{}
145 options.Apply(pouchOpts)
146
147 sourceName, sourceObj, err := replicationEndpoint(sourceDSN, pouchOpts["source"])
148 if err != nil {
149 return nil, err
150 }
151 targetName, targetObj, err := replicationEndpoint(targetDSN, pouchOpts["target"])
152 if err != nil {
153 return nil, err
154 }
155 delete(pouchOpts, "source")
156 delete(pouchOpts, "target")
157 rep, err := c.pouch.Replicate(sourceObj, targetObj, pouchOpts)
158 if err != nil {
159 return nil, err
160 }
161 return c.newReplication(targetName, sourceName, rep), nil
162 }
163
164 func (c *client) GetReplications(context.Context, driver.Options) ([]driver.Replication, error) {
165 c.replicationsMU.RLock()
166 defer c.replicationsMU.RUnlock()
167 reps := make([]driver.Replication, len(c.replications))
168 for i, rep := range c.replications {
169 reps[i] = rep
170 }
171 return reps, nil
172 }
173
View as plain text