1
2
3
4
5
6
7
8
9
10
11
12
13 package db
14
15 import (
16 "context"
17 "sort"
18 "time"
19
20 "gitlab.com/flimzy/testy"
21
22 "github.com/go-kivik/kivik/v4"
23 "github.com/go-kivik/kivik/v4/kiviktest/kt"
24 )
25
26 func init() {
27 kt.Register("Changes", changes)
28 }
29
30 func changes(ctx *kt.Context) {
31 ctx.Run("Normal", func(ctx *kt.Context) {
32 ctx.RunRW(func(ctx *kt.Context) {
33 ctx.Run("group", func(ctx *kt.Context) {
34 ctx.RunAdmin(func(ctx *kt.Context) {
35 testNormalChanges(ctx, ctx.Admin)
36 })
37 ctx.RunNoAuth(func(ctx *kt.Context) {
38 testNormalChanges(ctx, ctx.NoAuth)
39 })
40 })
41 })
42 })
43 ctx.Run("Continuous", func(ctx *kt.Context) {
44 ctx.RunRW(func(ctx *kt.Context) {
45 ctx.Run("group", func(ctx *kt.Context) {
46 ctx.RunAdmin(func(ctx *kt.Context) {
47 testContinuousChanges(ctx, ctx.Admin)
48 })
49 ctx.RunNoAuth(func(ctx *kt.Context) {
50 testContinuousChanges(ctx, ctx.NoAuth)
51 })
52 })
53 })
54 })
55 }
56
57 const maxWait = 5 * time.Second
58
59 type cDoc struct {
60 ID string `json:"_id"`
61 Rev string `json:"_rev,omitempty"`
62 Value string `json:"value"`
63 }
64
65 func testContinuousChanges(ctx *kt.Context, client *kivik.Client) {
66 ctx.Parallel()
67 dbname := ctx.TestDB()
68 db := client.DB(dbname, ctx.Options("db"))
69 if err := db.Err(); err != nil {
70 ctx.Fatalf("failed to connect to db: %s", err)
71 }
72 changes := db.Changes(context.Background(), ctx.Options("options"))
73 if !ctx.IsExpectedSuccess(changes.Err()) {
74 return
75 }
76 const maxChanges = 3
77 expected := make([]string, 0, maxChanges)
78 doc := cDoc{
79 ID: ctx.TestDBName(),
80 Value: "foo",
81 }
82 rev, err := ctx.Admin.DB(dbname).Put(context.Background(), doc.ID, doc)
83 if err != nil {
84 ctx.Fatalf("Failed to create doc: %s", err)
85 }
86 expected = append(expected, rev)
87 doc.Rev = rev
88 doc.Value = "bar"
89 rev, err = db.Put(context.Background(), doc.ID, doc)
90 if err != nil {
91 ctx.Fatalf("Failed to update doc: %s", err)
92 }
93 expected = append(expected, rev)
94 doc.Rev = rev
95 const delay = 10 * time.Millisecond
96 time.Sleep(delay)
97 rev, err = db.Delete(context.Background(), doc.ID, doc.Rev)
98 if err != nil {
99 ctx.Fatalf("Failed to delete doc: %s", err)
100 }
101 expected = append(expected, rev)
102 const maxRevs = 3
103 revs := make([]string, 0, maxRevs)
104 errChan := make(chan error)
105 go func() {
106 for changes.Next() {
107 revs = append(revs, changes.Changes()...)
108 if len(revs) >= len(expected) {
109 _ = changes.Close()
110 }
111 }
112 if err = changes.Err(); err != nil {
113 errChan <- err
114 }
115 close(errChan)
116 }()
117 timer := time.NewTimer(maxWait)
118 select {
119 case chErr, ok := <-errChan:
120 if ok {
121 ctx.Errorf("Error reading changes: %s", chErr)
122 }
123 case <-timer.C:
124 _ = changes.Close()
125 ctx.Errorf("Failed to read changes in %s", maxWait)
126 }
127 if err = changes.Err(); err != nil {
128 ctx.Errorf("iteration failed: %s", err)
129 }
130 expectedRevs := make(map[string]struct{})
131 for _, rev := range expected {
132 expectedRevs[rev] = struct{}{}
133 }
134 for _, rev := range revs {
135 if _, ok := expectedRevs[rev]; !ok {
136 ctx.Errorf("Unexpected rev in changes feed: %s", rev)
137 }
138 }
139 if d := testy.DiffTextSlices(expected, revs); d != nil {
140 ctx.Errorf("Unexpected revisions:\n%s", d)
141 }
142 if err = changes.Close(); err != nil {
143 ctx.Errorf("Error closing changes feed: %s", err)
144 }
145 }
146
147 func testNormalChanges(ctx *kt.Context, client *kivik.Client) {
148 ctx.Parallel()
149 dbname := ctx.TestDB()
150 db := client.DB(dbname, ctx.Options("db"))
151 if err := db.Err(); err != nil {
152 ctx.Fatalf("failed to connect to db: %s", err)
153 }
154 adb := ctx.Admin.DB(dbname)
155 const maxChanges = 3
156 expected := make([]string, 0, maxChanges)
157
158
159 doc := cDoc{
160 ID: ctx.TestDBName(),
161 Value: "foo",
162 }
163 rev, err := adb.Put(context.Background(), doc.ID, doc)
164 if err != nil {
165 ctx.Fatalf("Failed to create doc: %s", err)
166 }
167 expected = append(expected, rev)
168
169
170 doc = cDoc{
171 ID: ctx.TestDBName(),
172 Value: "bar",
173 }
174 rev, err = adb.Put(context.Background(), doc.ID, doc)
175 if err != nil {
176 ctx.Fatalf("Failed to create doc: %s", err)
177 }
178 doc.Rev = rev
179 doc.Value = "baz"
180 rev, err = adb.Put(context.Background(), doc.ID, doc)
181 if err != nil {
182 ctx.Fatalf("Failed to update doc: %s", err)
183 }
184 expected = append(expected, rev)
185
186
187 doc = cDoc{
188 ID: ctx.TestDBName(),
189 Value: "bar",
190 }
191 rev, err = adb.Put(context.Background(), doc.ID, doc)
192 if err != nil {
193 ctx.Fatalf("Failed to create doc: %s", err)
194 }
195 doc.Rev = rev
196 rev, err = adb.Delete(context.Background(), doc.ID, doc.Rev)
197 if err != nil {
198 ctx.Fatalf("Failed to delete doc: %s", err)
199 }
200 expected = append(expected, rev)
201
202 changes := db.Changes(context.Background(), ctx.Options("options"))
203 if !ctx.IsExpectedSuccess(changes.Err()) {
204 return
205 }
206
207 const maxRevs = 3
208 revs := make([]string, 0, maxRevs)
209 for changes.Next() {
210 revs = append(revs, changes.Changes()...)
211 if len(revs) >= len(expected) {
212 _ = changes.Close()
213 }
214 }
215 if err = changes.Err(); err != nil {
216 ctx.Errorf("iteration failed: %s", err)
217 }
218 expectedRevs := make(map[string]struct{})
219 for _, rev := range expected {
220 expectedRevs[rev] = struct{}{}
221 }
222 for _, rev := range revs {
223 if _, ok := expectedRevs[rev]; !ok {
224 ctx.Errorf("Unexpected rev in changes feed: %s", rev)
225 }
226 }
227 sort.Strings(expected)
228 sort.Strings(revs)
229 if d := testy.DiffTextSlices(expected, revs); d != nil {
230 ctx.Errorf("Unexpected revisions:\n%s", d)
231 }
232 if err = changes.Close(); err != nil {
233 ctx.Errorf("Error closing changes feed: %s", err)
234 }
235 }
236
View as plain text