1
2
3
4
5
6
7 package integration
8
9 import (
10 "context"
11 "fmt"
12 "testing"
13 "time"
14
15 "go.mongodb.org/mongo-driver/bson"
16 "go.mongodb.org/mongo-driver/internal/assert"
17 "go.mongodb.org/mongo-driver/mongo"
18 "go.mongodb.org/mongo-driver/mongo/integration/mtest"
19 "go.mongodb.org/mongo-driver/mongo/options"
20 )
21
22 func TestMongosPinning(t *testing.T) {
23 clientOpts := options.Client().SetLocalThreshold(1 * time.Second).SetWriteConcern(mtest.MajorityWc)
24 mtOpts := mtest.NewOptions().Topologies(mtest.Sharded).MinServerVersion("4.1").CreateClient(false).
25 ClientOptions(clientOpts)
26 mt := mtest.New(t, mtOpts)
27
28 if len(options.Client().ApplyURI(mtest.ClusterURI()).Hosts) < 2 {
29 mt.Skip("skipping because at least 2 mongoses are required")
30 }
31
32 mt.Run("unpin for next transaction", func(mt *mtest.T) {
33 addresses := map[string]struct{}{}
34 _ = mt.Client.UseSession(context.Background(), func(sc mongo.SessionContext) error {
35
36 err := sc.StartTransaction()
37 assert.Nil(mt, err, "StartTransaction error: %v", err)
38 _, err = mt.Coll.InsertOne(sc, bson.D{{"x", 1}})
39 assert.Nil(mt, err, "InsertOne error: %v", err)
40 err = sc.CommitTransaction(sc)
41 assert.Nil(mt, err, "CommitTransaction error: %v", err)
42
43 for i := 0; i < 50; i++ {
44
45 err = sc.StartTransaction()
46 assert.Nil(mt, err, iterationErrmsg("StartTransaction", i, err))
47
48 cursor, err := mt.Coll.Find(sc, bson.D{})
49 assert.Nil(mt, err, iterationErrmsg("Find", i, err))
50 assert.True(mt, cursor.Next(context.Background()), "Next returned false on iteration %v", i)
51
52 descConn, err := mongo.BatchCursorFromCursor(cursor).Server().Connection(context.Background())
53 assert.Nil(mt, err, iterationErrmsg("Connection", i, err))
54 addresses[descConn.Description().Addr.String()] = struct{}{}
55 err = descConn.Close()
56 assert.Nil(mt, err, iterationErrmsg("connection Close", i, err))
57
58 err = sc.CommitTransaction(sc)
59 assert.Nil(mt, err, iterationErrmsg("CommitTransaction", i, err))
60 }
61 return nil
62 })
63 assert.True(mt, len(addresses) > 1, "expected more than 1 address, got %v", addresses)
64 })
65 mt.Run("unpin for non transaction operation", func(mt *mtest.T) {
66 addresses := map[string]struct{}{}
67 _ = mt.Client.UseSession(context.Background(), func(sc mongo.SessionContext) error {
68
69 err := sc.StartTransaction()
70 assert.Nil(mt, err, "StartTransaction error: %v", err)
71 _, err = mt.Coll.InsertOne(sc, bson.D{{"x", 1}})
72 assert.Nil(mt, err, "InsertOne error: %v", err)
73 err = sc.CommitTransaction(sc)
74 assert.Nil(mt, err, "CommitTransaction error: %v", err)
75
76 for i := 0; i < 50; i++ {
77
78 cursor, err := mt.Coll.Find(sc, bson.D{})
79 assert.Nil(mt, err, iterationErrmsg("Find", i, err))
80 assert.True(mt, cursor.Next(context.Background()), "Next returned false on iteration %v", i)
81
82 descConn, err := mongo.BatchCursorFromCursor(cursor).Server().Connection(context.Background())
83 assert.Nil(mt, err, iterationErrmsg("Connection", i, err))
84 addresses[descConn.Description().Addr.String()] = struct{}{}
85 err = descConn.Close()
86 assert.Nil(mt, err, iterationErrmsg("connection Close", i, err))
87 }
88 return nil
89 })
90 assert.True(mt, len(addresses) > 1, "expected more than 1 address, got %v", addresses)
91 })
92 }
93
// iterationErrmsg builds the assertion message for an operation that was
// attempted inside a numbered loop iteration. op names the operation, i is the
// iteration index, and wrapped is the error being reported (a nil error is
// rendered by fmt as "<nil>").
func iterationErrmsg(op string, i int, wrapped error) string {
	const format = "%v error on iteration %v: %v"

	msg := fmt.Sprintf(format, op, i, wrapped)
	return msg
}
97
View as plain text