1 package mongodb
2
3 import (
4 "bytes"
5 "context"
6 "fmt"
7
8 "log"
9
10 "github.com/golang-migrate/migrate/v4"
11 "io"
12 "os"
13 "strconv"
14 "testing"
15 "time"
16 )
17
18 import (
19 "github.com/dhui/dktest"
20 "go.mongodb.org/mongo-driver/bson"
21 "go.mongodb.org/mongo-driver/mongo"
22 "go.mongodb.org/mongo-driver/mongo/options"
23 )
24
25 import (
26 dt "github.com/golang-migrate/migrate/v4/database/testing"
27 "github.com/golang-migrate/migrate/v4/dktesting"
28 _ "github.com/golang-migrate/migrate/v4/source/file"
29 )
30
var (
	// opts is the dktest configuration shared by every entry in specs: a
	// mapped port is required and isReady is used as the readiness probe.
	opts = dktest.Options{PortRequired: true, ReadyFunc: isReady}

	// specs lists the MongoDB images handed to dktesting.ParallelTest by the
	// tests in this file (TestTransaction builds its own spec list instead).
	specs = []dktesting.ContainerSpec{
		{ImageName: "mongo:3.4", Options: opts},
		{ImageName: "mongo:3.6", Options: opts},
		{ImageName: "mongo:4.0", Options: opts},
		{ImageName: "mongo:4.2", Options: opts},
	}
)
41
// mongoConnectionString returns the MongoDB URI used throughout these tests.
// The URI points at the testMigration database on host:port and carries the
// connect=direct option.
func mongoConnectionString(host, port string) string {
	const uriFormat = "mongodb://%s:%s/testMigration?connect=direct"
	return fmt.Sprintf(uriFormat, host, port)
}
47
48 func isReady(ctx context.Context, c dktest.ContainerInfo) bool {
49 ip, port, err := c.FirstPort()
50 if err != nil {
51 return false
52 }
53
54 client, err := mongo.Connect(ctx, options.Client().ApplyURI(mongoConnectionString(ip, port)))
55 if err != nil {
56 return false
57 }
58 defer func() {
59 if err := client.Disconnect(ctx); err != nil {
60 log.Println("close error:", err)
61 }
62 }()
63
64 if err = client.Ping(ctx, nil); err != nil {
65 switch err {
66 case io.EOF:
67 return false
68 default:
69 log.Println(err)
70 }
71 return false
72 }
73 return true
74 }
75
76 func Test(t *testing.T) {
77 dktesting.ParallelTest(t, specs, func(t *testing.T, c dktest.ContainerInfo) {
78 ip, port, err := c.FirstPort()
79 if err != nil {
80 t.Fatal(err)
81 }
82
83 addr := mongoConnectionString(ip, port)
84 p := &Mongo{}
85 d, err := p.Open(addr)
86 if err != nil {
87 t.Fatal(err)
88 }
89 defer func() {
90 if err := d.Close(); err != nil {
91 t.Error(err)
92 }
93 }()
94 dt.TestNilVersion(t, d)
95 dt.TestLockAndUnlock(t, d)
96 dt.TestRun(t, d, bytes.NewReader([]byte(`[{"insert":"hello","documents":[{"wild":"world"}]}]`)))
97 dt.TestSetVersion(t, d)
98 dt.TestDrop(t, d)
99 })
100 }
101
102 func TestMigrate(t *testing.T) {
103 dktesting.ParallelTest(t, specs, func(t *testing.T, c dktest.ContainerInfo) {
104 ip, port, err := c.FirstPort()
105 if err != nil {
106 t.Fatal(err)
107 }
108
109 addr := mongoConnectionString(ip, port)
110 p := &Mongo{}
111 d, err := p.Open(addr)
112 if err != nil {
113 t.Fatal(err)
114 }
115 defer func() {
116 if err := d.Close(); err != nil {
117 t.Error(err)
118 }
119 }()
120 m, err := migrate.NewWithDatabaseInstance("file://./examples/migrations", "", d)
121 if err != nil {
122 t.Fatal(err)
123 }
124 dt.TestMigrate(t, m)
125 })
126 }
127
128 func TestWithAuth(t *testing.T) {
129 dktesting.ParallelTest(t, specs, func(t *testing.T, c dktest.ContainerInfo) {
130 ip, port, err := c.FirstPort()
131 if err != nil {
132 t.Fatal(err)
133 }
134
135 addr := mongoConnectionString(ip, port)
136 p := &Mongo{}
137 d, err := p.Open(addr)
138 if err != nil {
139 t.Fatal(err)
140 }
141 defer func() {
142 if err := d.Close(); err != nil {
143 t.Error(err)
144 }
145 }()
146 createUserCMD := []byte(`[{"createUser":"deminem","pwd":"gogo","roles":[{"role":"readWrite","db":"testMigration"}]}]`)
147 err = d.Run(bytes.NewReader(createUserCMD))
148 if err != nil {
149 t.Fatal(err)
150 }
151 testcases := []struct {
152 name string
153 connectUri string
154 isErrorExpected bool
155 }{
156 {"right auth data", "mongodb://deminem:gogo@%s:%v/testMigration", false},
157 {"wrong auth data", "mongodb://wrong:auth@%s:%v/testMigration", true},
158 }
159
160 for _, tcase := range testcases {
161 t.Run(tcase.name, func(t *testing.T) {
162 mc := &Mongo{}
163 d, err := mc.Open(fmt.Sprintf(tcase.connectUri, ip, port))
164 if err == nil {
165 defer func() {
166 if err := d.Close(); err != nil {
167 t.Error(err)
168 }
169 }()
170 }
171
172 switch {
173 case tcase.isErrorExpected && err == nil:
174 t.Fatalf("no error when expected")
175 case !tcase.isErrorExpected && err != nil:
176 t.Fatalf("unexpected error: %v", err)
177 }
178 })
179 }
180 })
181 }
182
183 func TestLockWorks(t *testing.T) {
184 dktesting.ParallelTest(t, specs, func(t *testing.T, c dktest.ContainerInfo) {
185 ip, port, err := c.FirstPort()
186 if err != nil {
187 t.Fatal(err)
188 }
189
190 addr := mongoConnectionString(ip, port)
191 p := &Mongo{}
192 d, err := p.Open(addr)
193 if err != nil {
194 t.Fatal(err)
195 }
196 defer func() {
197 if err := d.Close(); err != nil {
198 t.Error(err)
199 }
200 }()
201
202 dt.TestRun(t, d, bytes.NewReader([]byte(`[{"insert":"hello","documents":[{"wild":"world"}]}]`)))
203
204 mc := d.(*Mongo)
205
206 err = mc.Lock()
207 if err != nil {
208 t.Fatal(err)
209 }
210 err = mc.Unlock()
211 if err != nil {
212 t.Fatal(err)
213 }
214
215 err = mc.Lock()
216 if err != nil {
217 t.Fatal(err)
218 }
219 err = mc.Unlock()
220 if err != nil {
221 t.Fatal(err)
222 }
223
224
225
226 mc.config.Locking.Enabled = true
227 mc.config.Locking.Timeout = 1
228 err = mc.Lock()
229 if err != nil {
230 t.Fatal(err)
231 }
232 err = mc.Lock()
233 if err == nil {
234 t.Fatal("should have failed, mongo should be locked already")
235 }
236 })
237 }
238
239 func TestTransaction(t *testing.T) {
240 transactionSpecs := []dktesting.ContainerSpec{
241 {ImageName: "mongo:4", Options: dktest.Options{PortRequired: true, ReadyFunc: isReady,
242 Cmd: []string{"mongod", "--bind_ip_all", "--replSet", "rs0"}}},
243 }
244 dktesting.ParallelTest(t, transactionSpecs, func(t *testing.T, c dktest.ContainerInfo) {
245 ip, port, err := c.FirstPort()
246 if err != nil {
247 t.Fatal(err)
248 }
249
250 client, err := mongo.Connect(context.TODO(), options.Client().ApplyURI(mongoConnectionString(ip, port)))
251 if err != nil {
252 t.Fatal(err)
253 }
254 err = client.Ping(context.TODO(), nil)
255 if err != nil {
256 t.Fatal(err)
257 }
258
259 err = client.Database("admin").RunCommand(context.TODO(), bson.D{bson.E{Key: "replSetInitiate", Value: bson.D{}}}).Err()
260 if err != nil {
261 t.Fatal(err)
262 }
263 err = waitForReplicaInit(client)
264 if err != nil {
265 t.Fatal(err)
266 }
267 d, err := WithInstance(client, &Config{
268 DatabaseName: "testMigration",
269 })
270 if err != nil {
271 t.Fatal(err)
272 }
273 defer func() {
274 if err := d.Close(); err != nil {
275 t.Error(err)
276 }
277 }()
278
279
280
281 insertCMD := []byte(`[
282 {"create":"hello"},
283 {"createIndexes": "hello",
284 "indexes": [{
285 "key": {
286 "wild": 1
287 },
288 "name": "unique_wild",
289 "unique": true,
290 "background": true
291 }]
292 }]`)
293 err = d.Run(bytes.NewReader(insertCMD))
294 if err != nil {
295 t.Fatal(err)
296 }
297 testcases := []struct {
298 name string
299 cmds []byte
300 documentsCount int64
301 isErrorExpected bool
302 }{
303 {
304 name: "success transaction",
305 cmds: []byte(`[{"insert":"hello","documents":[
306 {"wild":"world"},
307 {"wild":"west"},
308 {"wild":"natural"}
309 ]
310 }]`),
311 documentsCount: 3,
312 isErrorExpected: false,
313 },
314 {
315 name: "failure transaction",
316
317
318 cmds: []byte(`[{"insert":"hello","documents":[{"wild":"flower"}]},
319 {"insert":"hello","documents":[
320 {"wild":"cat"},
321 {"wild":"west"}
322 ]
323 }]`),
324 documentsCount: 3,
325 isErrorExpected: true,
326 },
327 }
328 for _, tcase := range testcases {
329 t.Run(tcase.name, func(t *testing.T) {
330 client, err := mongo.Connect(context.TODO(), options.Client().ApplyURI(mongoConnectionString(ip, port)))
331 if err != nil {
332 t.Fatal(err)
333 }
334 err = client.Ping(context.TODO(), nil)
335 if err != nil {
336 t.Fatal(err)
337 }
338 d, err := WithInstance(client, &Config{
339 DatabaseName: "testMigration",
340 TransactionMode: true,
341 })
342 if err != nil {
343 t.Fatal(err)
344 }
345 defer func() {
346 if err := d.Close(); err != nil {
347 t.Error(err)
348 }
349 }()
350 runErr := d.Run(bytes.NewReader(tcase.cmds))
351 if runErr != nil {
352 if !tcase.isErrorExpected {
353 t.Fatal(runErr)
354 }
355 }
356 documentsCount, err := client.Database("testMigration").Collection("hello").CountDocuments(context.TODO(), bson.M{})
357 if err != nil {
358 t.Fatal(err)
359 }
360 if tcase.documentsCount != documentsCount {
361 t.Fatalf("expected %d and actual %d documents count not equal. run migration error:%s", tcase.documentsCount, documentsCount, runErr)
362 }
363 })
364 }
365 })
366 }
367
// isMaster is the decode target for the reply to the "isMaster" admin
// command issued by waitForReplicaInit; only the one field read there is
// declared.
type isMaster struct {
	// IsMaster is filled from the reply's "ismaster" field.
	IsMaster bool `bson:"ismaster"`
}
371
372 func waitForReplicaInit(client *mongo.Client) error {
373 ticker := time.NewTicker(time.Second * 1)
374 defer ticker.Stop()
375 timeout, err := strconv.Atoi(os.Getenv("MIGRATE_TEST_MONGO_REPLICA_SET_INIT_TIMEOUT"))
376 if err != nil {
377 timeout = 30
378 }
379 timeoutTimer := time.NewTimer(time.Duration(timeout) * time.Second)
380 defer timeoutTimer.Stop()
381 for {
382 select {
383 case <-ticker.C:
384 var status isMaster
385
386
387
388 result := client.Database("admin").RunCommand(context.TODO(), bson.D{bson.E{Key: "isMaster", Value: 1}})
389 r, err := result.DecodeBytes()
390 if err != nil {
391 return err
392 }
393 err = bson.Unmarshal(r, &status)
394 if err != nil {
395 return err
396 }
397 if status.IsMaster {
398 return nil
399 }
400 case <-timeoutTimer.C:
401 return fmt.Errorf("replica init timeout")
402 }
403 }
404
405 }
406
View as plain text