...

Source file src/github.com/go-kivik/kivik/v4/kiviktest/db/changes.go

Documentation: github.com/go-kivik/kivik/v4/kiviktest/db

     1  // Licensed under the Apache License, Version 2.0 (the "License"); you may not
     2  // use this file except in compliance with the License. You may obtain a copy of
     3  // the License at
     4  //
     5  //  http://www.apache.org/licenses/LICENSE-2.0
     6  //
     7  // Unless required by applicable law or agreed to in writing, software
     8  // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
     9  // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
    10  // License for the specific language governing permissions and limitations under
    11  // the License.
    12  
    13  package db
    14  
    15  import (
    16  	"context"
    17  	"sort"
    18  	"time"
    19  
    20  	"gitlab.com/flimzy/testy"
    21  
    22  	"github.com/go-kivik/kivik/v4"
    23  	"github.com/go-kivik/kivik/v4/kiviktest/kt"
    24  )
    25  
    26  func init() {
    27  	kt.Register("Changes", changes)
    28  }
    29  
    30  func changes(ctx *kt.Context) {
    31  	ctx.Run("Normal", func(ctx *kt.Context) {
    32  		ctx.RunRW(func(ctx *kt.Context) {
    33  			ctx.Run("group", func(ctx *kt.Context) {
    34  				ctx.RunAdmin(func(ctx *kt.Context) {
    35  					testNormalChanges(ctx, ctx.Admin)
    36  				})
    37  				ctx.RunNoAuth(func(ctx *kt.Context) {
    38  					testNormalChanges(ctx, ctx.NoAuth)
    39  				})
    40  			})
    41  		})
    42  	})
    43  	ctx.Run("Continuous", func(ctx *kt.Context) {
    44  		ctx.RunRW(func(ctx *kt.Context) {
    45  			ctx.Run("group", func(ctx *kt.Context) {
    46  				ctx.RunAdmin(func(ctx *kt.Context) {
    47  					testContinuousChanges(ctx, ctx.Admin)
    48  				})
    49  				ctx.RunNoAuth(func(ctx *kt.Context) {
    50  					testContinuousChanges(ctx, ctx.NoAuth)
    51  				})
    52  			})
    53  		})
    54  	})
    55  }
    56  
    57  const maxWait = 5 * time.Second
    58  
    59  type cDoc struct {
    60  	ID    string `json:"_id"`
    61  	Rev   string `json:"_rev,omitempty"`
    62  	Value string `json:"value"`
    63  }
    64  
    65  func testContinuousChanges(ctx *kt.Context, client *kivik.Client) {
    66  	ctx.Parallel()
    67  	dbname := ctx.TestDB()
    68  	db := client.DB(dbname, ctx.Options("db"))
    69  	if err := db.Err(); err != nil {
    70  		ctx.Fatalf("failed to connect to db: %s", err)
    71  	}
    72  	changes := db.Changes(context.Background(), ctx.Options("options"))
    73  	if !ctx.IsExpectedSuccess(changes.Err()) {
    74  		return
    75  	}
    76  	const maxChanges = 3
    77  	expected := make([]string, 0, maxChanges)
    78  	doc := cDoc{
    79  		ID:    ctx.TestDBName(),
    80  		Value: "foo",
    81  	}
    82  	rev, err := ctx.Admin.DB(dbname).Put(context.Background(), doc.ID, doc)
    83  	if err != nil {
    84  		ctx.Fatalf("Failed to create doc: %s", err)
    85  	}
    86  	expected = append(expected, rev)
    87  	doc.Rev = rev
    88  	doc.Value = "bar"
    89  	rev, err = db.Put(context.Background(), doc.ID, doc)
    90  	if err != nil {
    91  		ctx.Fatalf("Failed to update doc: %s", err)
    92  	}
    93  	expected = append(expected, rev)
    94  	doc.Rev = rev
    95  	const delay = 10 * time.Millisecond
    96  	time.Sleep(delay) // Pause to ensure that the update counts as a separate rev; especially problematic on PouchDB
    97  	rev, err = db.Delete(context.Background(), doc.ID, doc.Rev)
    98  	if err != nil {
    99  		ctx.Fatalf("Failed to delete doc: %s", err)
   100  	}
   101  	expected = append(expected, rev)
   102  	const maxRevs = 3
   103  	revs := make([]string, 0, maxRevs)
   104  	errChan := make(chan error)
   105  	go func() {
   106  		for changes.Next() {
   107  			revs = append(revs, changes.Changes()...)
   108  			if len(revs) >= len(expected) {
   109  				_ = changes.Close()
   110  			}
   111  		}
   112  		if err = changes.Err(); err != nil {
   113  			errChan <- err
   114  		}
   115  		close(errChan)
   116  	}()
   117  	timer := time.NewTimer(maxWait)
   118  	select {
   119  	case chErr, ok := <-errChan:
   120  		if ok {
   121  			ctx.Errorf("Error reading changes: %s", chErr)
   122  		}
   123  	case <-timer.C:
   124  		_ = changes.Close()
   125  		ctx.Errorf("Failed to read changes in %s", maxWait)
   126  	}
   127  	if err = changes.Err(); err != nil {
   128  		ctx.Errorf("iteration failed: %s", err)
   129  	}
   130  	expectedRevs := make(map[string]struct{})
   131  	for _, rev := range expected {
   132  		expectedRevs[rev] = struct{}{}
   133  	}
   134  	for _, rev := range revs {
   135  		if _, ok := expectedRevs[rev]; !ok {
   136  			ctx.Errorf("Unexpected rev in changes feed: %s", rev)
   137  		}
   138  	}
   139  	if d := testy.DiffTextSlices(expected, revs); d != nil {
   140  		ctx.Errorf("Unexpected revisions:\n%s", d)
   141  	}
   142  	if err = changes.Close(); err != nil {
   143  		ctx.Errorf("Error closing changes feed: %s", err)
   144  	}
   145  }
   146  
   147  func testNormalChanges(ctx *kt.Context, client *kivik.Client) {
   148  	ctx.Parallel()
   149  	dbname := ctx.TestDB()
   150  	db := client.DB(dbname, ctx.Options("db"))
   151  	if err := db.Err(); err != nil {
   152  		ctx.Fatalf("failed to connect to db: %s", err)
   153  	}
   154  	adb := ctx.Admin.DB(dbname)
   155  	const maxChanges = 3
   156  	expected := make([]string, 0, maxChanges)
   157  
   158  	// Doc: foo
   159  	doc := cDoc{
   160  		ID:    ctx.TestDBName(),
   161  		Value: "foo",
   162  	}
   163  	rev, err := adb.Put(context.Background(), doc.ID, doc)
   164  	if err != nil {
   165  		ctx.Fatalf("Failed to create doc: %s", err)
   166  	}
   167  	expected = append(expected, rev)
   168  
   169  	// Doc: bar
   170  	doc = cDoc{
   171  		ID:    ctx.TestDBName(),
   172  		Value: "bar",
   173  	}
   174  	rev, err = adb.Put(context.Background(), doc.ID, doc)
   175  	if err != nil {
   176  		ctx.Fatalf("Failed to create doc: %s", err)
   177  	}
   178  	doc.Rev = rev
   179  	doc.Value = "baz"
   180  	rev, err = adb.Put(context.Background(), doc.ID, doc)
   181  	if err != nil {
   182  		ctx.Fatalf("Failed to update doc: %s", err)
   183  	}
   184  	expected = append(expected, rev)
   185  
   186  	// Doc: baz
   187  	doc = cDoc{
   188  		ID:    ctx.TestDBName(),
   189  		Value: "bar",
   190  	}
   191  	rev, err = adb.Put(context.Background(), doc.ID, doc)
   192  	if err != nil {
   193  		ctx.Fatalf("Failed to create doc: %s", err)
   194  	}
   195  	doc.Rev = rev
   196  	rev, err = adb.Delete(context.Background(), doc.ID, doc.Rev)
   197  	if err != nil {
   198  		ctx.Fatalf("Failed to delete doc: %s", err)
   199  	}
   200  	expected = append(expected, rev)
   201  
   202  	changes := db.Changes(context.Background(), ctx.Options("options"))
   203  	if !ctx.IsExpectedSuccess(changes.Err()) {
   204  		return
   205  	}
   206  
   207  	const maxRevs = 3
   208  	revs := make([]string, 0, maxRevs)
   209  	for changes.Next() {
   210  		revs = append(revs, changes.Changes()...)
   211  		if len(revs) >= len(expected) {
   212  			_ = changes.Close()
   213  		}
   214  	}
   215  	if err = changes.Err(); err != nil {
   216  		ctx.Errorf("iteration failed: %s", err)
   217  	}
   218  	expectedRevs := make(map[string]struct{})
   219  	for _, rev := range expected {
   220  		expectedRevs[rev] = struct{}{}
   221  	}
   222  	for _, rev := range revs {
   223  		if _, ok := expectedRevs[rev]; !ok {
   224  			ctx.Errorf("Unexpected rev in changes feed: %s", rev)
   225  		}
   226  	}
   227  	sort.Strings(expected)
   228  	sort.Strings(revs)
   229  	if d := testy.DiffTextSlices(expected, revs); d != nil {
   230  		ctx.Errorf("Unexpected revisions:\n%s", d)
   231  	}
   232  	if err = changes.Close(); err != nil {
   233  		ctx.Errorf("Error closing changes feed: %s", err)
   234  	}
   235  }
   236  

View as plain text