...

Source file src/github.com/go-kivik/kivik/v4/kiviktest/client/replicate.go

Documentation: github.com/go-kivik/kivik/v4/kiviktest/client

     1  // Licensed under the Apache License, Version 2.0 (the "License"); you may not
     2  // use this file except in compliance with the License. You may obtain a copy of
     3  // the License at
     4  //
     5  //  http://www.apache.org/licenses/LICENSE-2.0
     6  //
     7  // Unless required by applicable law or agreed to in writing, software
     8  // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
     9  // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
    10  // License for the specific language governing permissions and limitations under
    11  // the License.
    12  
    13  package client
    14  
    15  import (
    16  	"context"
    17  	"net/http"
    18  	"net/url"
    19  	"path/filepath"
    20  	"strings"
    21  	"testing"
    22  	"time"
    23  
    24  	"github.com/go-kivik/kivik/v4"
    25  	"github.com/go-kivik/kivik/v4/kiviktest/kt"
    26  )
    27  
    28  func init() {
    29  	kt.Register("Replicate", replicate)
    30  }
    31  
    32  func replicate(ctx *kt.Context) {
    33  	defer lockReplication(ctx)()
    34  	ctx.RunRW(func(ctx *kt.Context) {
    35  		ctx.RunAdmin(func(ctx *kt.Context) {
    36  			testReplication(ctx, ctx.Admin)
    37  		})
    38  		ctx.RunNoAuth(func(ctx *kt.Context) {
    39  			testReplication(ctx, ctx.NoAuth)
    40  		})
    41  	})
    42  }
    43  
    44  func callReplicate(ctx *kt.Context, client *kivik.Client, target, source, repID string, options kivik.Option) (*kivik.Replication, error) {
    45  	options = replicationOptions(ctx, client, target, source, repID, options)
    46  	var rep *kivik.Replication
    47  	err := kt.Retry(func() error {
    48  		var err error
    49  		rep, err = client.Replicate(context.Background(), target, source, options)
    50  		return err
    51  	})
    52  	return rep, err
    53  }
    54  
    55  func testReplication(ctx *kt.Context, client *kivik.Client) {
    56  	prefix := ctx.String("prefix")
    57  	switch prefix {
    58  	case "":
    59  		prefix = strings.TrimSuffix(client.DSN(), "/") + "/"
    60  	case "none":
    61  		prefix = ""
    62  	}
    63  	targetDB, sourceDB := ctx.TestDB(), ctx.TestDB()
    64  	dbtarget := prefix + targetDB
    65  	dbsource := prefix + sourceDB
    66  
    67  	db := ctx.Admin.DB(sourceDB)
    68  	if err := db.Err(); err != nil {
    69  		ctx.Fatalf("Failed to open db: %s", err)
    70  	}
    71  
    72  	// Create 10 docs for testing sync
    73  	for i := 0; i < 10; i++ {
    74  		id := ctx.TestDBName()
    75  		doc := struct {
    76  			ID string `json:"id"`
    77  		}{
    78  			ID: id,
    79  		}
    80  		if _, err := db.Put(context.Background(), doc.ID, doc); err != nil {
    81  			ctx.Fatalf("Failed to create doc: %s", err)
    82  		}
    83  	}
    84  
    85  	ctx.Run("group", func(ctx *kt.Context) {
    86  		ctx.Run("ValidReplication", func(ctx *kt.Context) {
    87  			ctx.Parallel()
    88  			tries := 3
    89  			success := false
    90  			for i := 0; i < tries; i++ {
    91  				success = doReplicationTest(ctx, client, dbtarget, dbsource)
    92  				if success {
    93  					break
    94  				}
    95  			}
    96  			if !success {
    97  				ctx.Errorf("Replication failed after %d tries", tries)
    98  			}
    99  		})
   100  		ctx.Run("MissingSource", func(ctx *kt.Context) {
   101  			ctx.Parallel()
   102  			doReplicationTest(ctx, client, dbtarget, ctx.MustString("NotFoundDB"))
   103  		})
   104  		ctx.Run("MissingTarget", func(ctx *kt.Context) {
   105  			ctx.Parallel()
   106  			doReplicationTest(ctx, client, ctx.MustString("NotFoundDB"), dbsource)
   107  		})
   108  		ctx.Run("Cancel", func(ctx *kt.Context) {
   109  			ctx.Parallel()
   110  			replID := ctx.TestDBName()
   111  			rep, err := callReplicate(ctx, client, dbtarget, "http://foo:foo@192.168.2.254/foo", replID, kivik.Param("continuous", true))
   112  			if !ctx.IsExpectedSuccess(err) {
   113  				return
   114  			}
   115  			ctx.T.Cleanup(func() { _ = rep.Delete(context.Background()) })
   116  			timeout := time.Duration(ctx.MustInt("timeoutSeconds")) * time.Second
   117  			cx, cancel := context.WithTimeout(context.Background(), timeout)
   118  			defer cancel()
   119  			ctx.CheckError(rep.Delete(context.Background()))
   120  		loop:
   121  			for rep.IsActive() {
   122  				if rep.State() == kivik.ReplicationStarted {
   123  					return
   124  				}
   125  				select {
   126  				case <-cx.Done():
   127  					break loop
   128  				default:
   129  				}
   130  				if err := rep.Update(cx); err != nil {
   131  					if kivik.HTTPStatus(err) == http.StatusNotFound {
   132  						// NotFound expected after the replication is cancelled
   133  						break
   134  					}
   135  					ctx.Fatalf("Failed to read update: %s", err)
   136  					break
   137  				}
   138  			}
   139  			if err := cx.Err(); err != nil {
   140  				ctx.Fatalf("context was cancelled: %s", err)
   141  			}
   142  			if err := rep.Err(); err != nil {
   143  				ctx.Fatalf("Replication cancellation failed: %s", err)
   144  			}
   145  		})
   146  	})
   147  }
   148  
   149  func doReplicationTest(ctx *kt.Context, client *kivik.Client, dbtarget, dbsource string) (success bool) {
   150  	success = true
   151  	replID := ctx.TestDBName()
   152  	rep, err := callReplicate(ctx, client, dbtarget, dbsource, replID, nil)
   153  	if !ctx.IsExpectedSuccess(err) {
   154  		return success
   155  	}
   156  	ctx.T.Cleanup(func() { _ = rep.Delete(context.Background()) })
   157  	timeout := time.Duration(ctx.MustInt("timeoutSeconds")) * time.Second
   158  	cx, cancel := context.WithTimeout(context.Background(), timeout)
   159  	defer cancel()
   160  	var updateErr error
   161  	for rep.IsActive() {
   162  		select {
   163  		case <-cx.Done():
   164  			ctx.Fatalf("context cancelled waiting for replication: %s", cx.Err())
   165  			return success
   166  		default:
   167  		}
   168  		if updateErr = rep.Update(cx); updateErr != nil {
   169  			break
   170  		}
   171  		if rep.State() == "crashing" {
   172  			// 2.1 treats Not Found as a temporary error (on the theory the missing
   173  			// db could be created), so this short-circuits.
   174  			break
   175  		}
   176  		const delay = 100 * time.Millisecond
   177  		time.Sleep(delay)
   178  	}
   179  	if updateErr != nil {
   180  		ctx.Fatalf("Replication update failed: %s", updateErr)
   181  	}
   182  	ctx.Run("Results", func(ctx *kt.Context) {
   183  		err := rep.Err()
   184  		if kivik.HTTPStatus(err) == http.StatusRequestTimeout {
   185  			success = false // Allow retrying
   186  			return
   187  		}
   188  		if !ctx.IsExpectedSuccess(err) {
   189  			return
   190  		}
   191  		switch ctx.String("mode") {
   192  		case "pouchdb":
   193  			if rep.ReplicationID() != "" {
   194  				ctx.Errorf("Did not expect replication ID")
   195  			}
   196  		default:
   197  			if rep.State() != "completed" && rep.State() != "failed" && // 2.1.x
   198  				rep.ReplicationID() == "" {
   199  				ctx.Errorf("Expected a replication ID")
   200  			}
   201  		}
   202  		checkReplicationURL(ctx.T, "source", dbsource, rep.Source)
   203  		checkReplicationURL(ctx.T, "target", dbtarget, rep.Target)
   204  		if rep.State() != kivik.ReplicationComplete {
   205  			ctx.Errorf("Replication failed to complete. Final state: %s\n", rep.State())
   206  		}
   207  		const (
   208  			pct100  = float64(100)
   209  			minProg = 0.0001
   210  		)
   211  		if (rep.Progress() - pct100) > minProg {
   212  			ctx.Errorf("Expected 100%% completion, got %%%02.2f", rep.Progress())
   213  		}
   214  	})
   215  	return success
   216  }
   217  
   218  func checkReplicationURL(t *testing.T, name, want, got string) {
   219  	t.Helper()
   220  	wantURL, err := url.Parse(want)
   221  	if err != nil {
   222  		t.Fatal(err)
   223  	}
   224  	gotURL, err := url.Parse(got)
   225  	if err != nil {
   226  		t.Fatal(err)
   227  	}
   228  	if !replicationURLsEqual(wantURL, gotURL) {
   229  		t.Errorf("Unexpected %s URL. Want: %s, got %s", name, want, got)
   230  	}
   231  }
   232  
   233  func replicationURLsEqual(want, got *url.URL) bool {
   234  	if want.User != nil && got.User != nil {
   235  		wantUser := want.User.Username()
   236  		gotUser := got.User.Username()
   237  		if wantUser != "" && gotUser != "" && wantUser != gotUser {
   238  			return false
   239  		}
   240  	}
   241  	want.User = nil
   242  	got.User = nil
   243  	want.Path = filepath.Join(want.Path, "")
   244  	got.Path = filepath.Join(got.Path, "")
   245  	return want.String() == got.String()
   246  }
   247  

View as plain text