...
1
2
3
4
5
6
7
8
9
10
11
12
13 package kivik
14
15 import (
16 "context"
17 "sync"
18 "time"
19
20 "github.com/go-kivik/kivik/v4/driver"
21 )
22
23
// ReplicationState is the string-valued state of a replication, as returned
// by Replication.State.
type ReplicationState string

// Replication states, including the empty value used for a replication that
// has not started.
const (
	ReplicationNotStarted = ReplicationState("")
	ReplicationStarted    = ReplicationState("triggered")
	ReplicationError      = ReplicationState("error")
	ReplicationComplete   = ReplicationState("completed")
)

// Additional replication states.
// NOTE(review): presumably these are the values reported by the server's
// replication scheduler; confirm against the driver documentation.
const (
	ReplicationInitializing = ReplicationState("initializing")
	ReplicationRunning      = ReplicationState("running")
	ReplicationPending      = ReplicationState("pending")
	ReplicationCrashing     = ReplicationState("crashing")
	ReplicationFailed       = ReplicationState("failed")
)
43
44
45 type Replication struct {
46 Source string
47 Target string
48
49 infoMU sync.RWMutex
50 info *driver.ReplicationInfo
51 statusErr error
52 irep driver.Replication
53 }
54
55
56 func (r *Replication) DocsWritten() int64 {
57 if r != nil && r.info != nil {
58 r.infoMU.RLock()
59 defer r.infoMU.RUnlock()
60 return r.info.DocsWritten
61 }
62 return 0
63 }
64
65
66 func (r *Replication) DocsRead() int64 {
67 if r != nil && r.info != nil {
68 r.infoMU.RLock()
69 defer r.infoMU.RUnlock()
70 return r.info.DocsRead
71 }
72 return 0
73 }
74
75
76 func (r *Replication) DocWriteFailures() int64 {
77 if r != nil && r.info != nil {
78 r.infoMU.RLock()
79 defer r.infoMU.RUnlock()
80 return r.info.DocWriteFailures
81 }
82 return 0
83 }
84
85
86 func (r *Replication) Progress() float64 {
87 if r != nil && r.info != nil {
88 r.infoMU.RLock()
89 defer r.infoMU.RUnlock()
90 return r.info.Progress
91 }
92 return 0
93 }
94
95 func newReplication(rep driver.Replication) *Replication {
96 return &Replication{
97 Source: rep.Source(),
98 Target: rep.Target(),
99 irep: rep,
100 }
101 }
102
103
104 func (r *Replication) ReplicationID() string {
105 return r.irep.ReplicationID()
106 }
107
108
109
110 func (r *Replication) StartTime() time.Time {
111 return r.irep.StartTime()
112 }
113
114
115 func (r *Replication) EndTime() time.Time {
116 return r.irep.EndTime()
117 }
118
119
120 func (r *Replication) State() ReplicationState {
121 return ReplicationState(r.irep.State())
122 }
123
124
125 func (r *Replication) Err() error {
126 if r == nil {
127 return nil
128 }
129 return r.irep.Err()
130 }
131
132
133
134 func (r *Replication) IsActive() bool {
135 if r == nil {
136 return false
137 }
138 switch r.State() {
139 case ReplicationError, ReplicationComplete, ReplicationCrashing, ReplicationFailed:
140 return false
141 default:
142 return true
143 }
144 }
145
146
147
148 func (r *Replication) Delete(ctx context.Context) error {
149 return r.irep.Delete(ctx)
150 }
151
152
153
154
155 func (r *Replication) Update(ctx context.Context) error {
156 var info driver.ReplicationInfo
157 r.statusErr = r.irep.Update(ctx, &info)
158 if r.statusErr != nil {
159 return r.statusErr
160 }
161 r.infoMU.Lock()
162 r.info = &info
163 r.infoMU.Unlock()
164 return nil
165 }
166
167
168
169
170 func (c *Client) GetReplications(ctx context.Context, options ...Option) ([]*Replication, error) {
171 endQuery, err := c.startQuery()
172 if err != nil {
173 return nil, err
174 }
175 defer endQuery()
176 replicator, ok := c.driverClient.(driver.ClientReplicator)
177 if !ok {
178 return nil, errReplicationNotImplemented
179 }
180 reps, err := replicator.GetReplications(ctx, multiOptions(options))
181 if err != nil {
182 return nil, err
183 }
184 replications := make([]*Replication, len(reps))
185 for i, rep := range reps {
186 replications[i] = newReplication(rep)
187 }
188 return replications, nil
189 }
190
191
192
193
194
195 func (c *Client) Replicate(ctx context.Context, targetDSN, sourceDSN string, options ...Option) (*Replication, error) {
196 endQuery, err := c.startQuery()
197 if err != nil {
198 return nil, err
199 }
200 defer endQuery()
201 replicator, ok := c.driverClient.(driver.ClientReplicator)
202 if !ok {
203 return nil, errReplicationNotImplemented
204 }
205 rep, err := replicator.Replicate(ctx, targetDSN, sourceDSN, multiOptions(options))
206 if err != nil {
207 return nil, err
208 }
209 return newReplication(rep), nil
210 }
211
212
// ReplicationInfo holds document counters and a progress value for a
// replication.
type ReplicationInfo struct {
	// Counters for failed document writes, documents read and documents
	// written, in that field order.
	DocWriteFailures, DocsRead, DocsWritten int64
	// Progress is the replication's progress value.
	// NOTE(review): scale (fraction vs. percent) is not visible here; confirm.
	Progress float64
}
219
View as plain text