1
2
3
4
5
6
7
8
9
10
11
12
13 package client
14
15 import (
16 "context"
17 "net/http"
18 "net/url"
19 "path/filepath"
20 "strings"
21 "testing"
22 "time"
23
24 "github.com/go-kivik/kivik/v4"
25 "github.com/go-kivik/kivik/v4/kiviktest/kt"
26 )
27
28 func init() {
29 kt.Register("Replicate", replicate)
30 }
31
32 func replicate(ctx *kt.Context) {
33 defer lockReplication(ctx)()
34 ctx.RunRW(func(ctx *kt.Context) {
35 ctx.RunAdmin(func(ctx *kt.Context) {
36 testReplication(ctx, ctx.Admin)
37 })
38 ctx.RunNoAuth(func(ctx *kt.Context) {
39 testReplication(ctx, ctx.NoAuth)
40 })
41 })
42 }
43
44 func callReplicate(ctx *kt.Context, client *kivik.Client, target, source, repID string, options kivik.Option) (*kivik.Replication, error) {
45 options = replicationOptions(ctx, client, target, source, repID, options)
46 var rep *kivik.Replication
47 err := kt.Retry(func() error {
48 var err error
49 rep, err = client.Replicate(context.Background(), target, source, options)
50 return err
51 })
52 return rep, err
53 }
54
55 func testReplication(ctx *kt.Context, client *kivik.Client) {
56 prefix := ctx.String("prefix")
57 switch prefix {
58 case "":
59 prefix = strings.TrimSuffix(client.DSN(), "/") + "/"
60 case "none":
61 prefix = ""
62 }
63 targetDB, sourceDB := ctx.TestDB(), ctx.TestDB()
64 dbtarget := prefix + targetDB
65 dbsource := prefix + sourceDB
66
67 db := ctx.Admin.DB(sourceDB)
68 if err := db.Err(); err != nil {
69 ctx.Fatalf("Failed to open db: %s", err)
70 }
71
72
73 for i := 0; i < 10; i++ {
74 id := ctx.TestDBName()
75 doc := struct {
76 ID string `json:"id"`
77 }{
78 ID: id,
79 }
80 if _, err := db.Put(context.Background(), doc.ID, doc); err != nil {
81 ctx.Fatalf("Failed to create doc: %s", err)
82 }
83 }
84
85 ctx.Run("group", func(ctx *kt.Context) {
86 ctx.Run("ValidReplication", func(ctx *kt.Context) {
87 ctx.Parallel()
88 tries := 3
89 success := false
90 for i := 0; i < tries; i++ {
91 success = doReplicationTest(ctx, client, dbtarget, dbsource)
92 if success {
93 break
94 }
95 }
96 if !success {
97 ctx.Errorf("Replication failed after %d tries", tries)
98 }
99 })
100 ctx.Run("MissingSource", func(ctx *kt.Context) {
101 ctx.Parallel()
102 doReplicationTest(ctx, client, dbtarget, ctx.MustString("NotFoundDB"))
103 })
104 ctx.Run("MissingTarget", func(ctx *kt.Context) {
105 ctx.Parallel()
106 doReplicationTest(ctx, client, ctx.MustString("NotFoundDB"), dbsource)
107 })
108 ctx.Run("Cancel", func(ctx *kt.Context) {
109 ctx.Parallel()
110 replID := ctx.TestDBName()
111 rep, err := callReplicate(ctx, client, dbtarget, "http://foo:foo@192.168.2.254/foo", replID, kivik.Param("continuous", true))
112 if !ctx.IsExpectedSuccess(err) {
113 return
114 }
115 ctx.T.Cleanup(func() { _ = rep.Delete(context.Background()) })
116 timeout := time.Duration(ctx.MustInt("timeoutSeconds")) * time.Second
117 cx, cancel := context.WithTimeout(context.Background(), timeout)
118 defer cancel()
119 ctx.CheckError(rep.Delete(context.Background()))
120 loop:
121 for rep.IsActive() {
122 if rep.State() == kivik.ReplicationStarted {
123 return
124 }
125 select {
126 case <-cx.Done():
127 break loop
128 default:
129 }
130 if err := rep.Update(cx); err != nil {
131 if kivik.HTTPStatus(err) == http.StatusNotFound {
132
133 break
134 }
135 ctx.Fatalf("Failed to read update: %s", err)
136 break
137 }
138 }
139 if err := cx.Err(); err != nil {
140 ctx.Fatalf("context was cancelled: %s", err)
141 }
142 if err := rep.Err(); err != nil {
143 ctx.Fatalf("Replication cancellation failed: %s", err)
144 }
145 })
146 })
147 }
148
149 func doReplicationTest(ctx *kt.Context, client *kivik.Client, dbtarget, dbsource string) (success bool) {
150 success = true
151 replID := ctx.TestDBName()
152 rep, err := callReplicate(ctx, client, dbtarget, dbsource, replID, nil)
153 if !ctx.IsExpectedSuccess(err) {
154 return success
155 }
156 ctx.T.Cleanup(func() { _ = rep.Delete(context.Background()) })
157 timeout := time.Duration(ctx.MustInt("timeoutSeconds")) * time.Second
158 cx, cancel := context.WithTimeout(context.Background(), timeout)
159 defer cancel()
160 var updateErr error
161 for rep.IsActive() {
162 select {
163 case <-cx.Done():
164 ctx.Fatalf("context cancelled waiting for replication: %s", cx.Err())
165 return success
166 default:
167 }
168 if updateErr = rep.Update(cx); updateErr != nil {
169 break
170 }
171 if rep.State() == "crashing" {
172
173
174 break
175 }
176 const delay = 100 * time.Millisecond
177 time.Sleep(delay)
178 }
179 if updateErr != nil {
180 ctx.Fatalf("Replication update failed: %s", updateErr)
181 }
182 ctx.Run("Results", func(ctx *kt.Context) {
183 err := rep.Err()
184 if kivik.HTTPStatus(err) == http.StatusRequestTimeout {
185 success = false
186 return
187 }
188 if !ctx.IsExpectedSuccess(err) {
189 return
190 }
191 switch ctx.String("mode") {
192 case "pouchdb":
193 if rep.ReplicationID() != "" {
194 ctx.Errorf("Did not expect replication ID")
195 }
196 default:
197 if rep.State() != "completed" && rep.State() != "failed" &&
198 rep.ReplicationID() == "" {
199 ctx.Errorf("Expected a replication ID")
200 }
201 }
202 checkReplicationURL(ctx.T, "source", dbsource, rep.Source)
203 checkReplicationURL(ctx.T, "target", dbtarget, rep.Target)
204 if rep.State() != kivik.ReplicationComplete {
205 ctx.Errorf("Replication failed to complete. Final state: %s\n", rep.State())
206 }
207 const (
208 pct100 = float64(100)
209 minProg = 0.0001
210 )
211 if (rep.Progress() - pct100) > minProg {
212 ctx.Errorf("Expected 100%% completion, got %%%02.2f", rep.Progress())
213 }
214 })
215 return success
216 }
217
218 func checkReplicationURL(t *testing.T, name, want, got string) {
219 t.Helper()
220 wantURL, err := url.Parse(want)
221 if err != nil {
222 t.Fatal(err)
223 }
224 gotURL, err := url.Parse(got)
225 if err != nil {
226 t.Fatal(err)
227 }
228 if !replicationURLsEqual(wantURL, gotURL) {
229 t.Errorf("Unexpected %s URL. Want: %s, got %s", name, want, got)
230 }
231 }
232
233 func replicationURLsEqual(want, got *url.URL) bool {
234 if want.User != nil && got.User != nil {
235 wantUser := want.User.Username()
236 gotUser := got.User.Username()
237 if wantUser != "" && gotUser != "" && wantUser != gotUser {
238 return false
239 }
240 }
241 want.User = nil
242 got.User = nil
243 want.Path = filepath.Join(want.Path, "")
244 got.Path = filepath.Join(got.Path, "")
245 return want.String() == got.String()
246 }
247
View as plain text