1
2
3
4
5
6
7 package integration
8
9 import (
10 "context"
11 "fmt"
12 "strings"
13 "testing"
14 "time"
15
16 "go.mongodb.org/mongo-driver/bson"
17 "go.mongodb.org/mongo-driver/internal/assert"
18 "go.mongodb.org/mongo-driver/internal/require"
19 "go.mongodb.org/mongo-driver/mongo"
20 "go.mongodb.org/mongo-driver/mongo/integration/mtest"
21 "go.mongodb.org/mongo-driver/mongo/options"
22 )
23
24 func TestLoadBalancerSupport(t *testing.T) {
25 mt := mtest.New(t, mtest.NewOptions().Topologies(mtest.LoadBalanced).CreateClient(false))
26
27 mt.Run("RunCommandCursor pins to a connection", func(mt *mtest.T) {
28
29
30
31 initCollection(mt, mt.Coll)
32 findCmd := bson.D{
33 {"find", mt.Coll.Name()},
34 {"filter", bson.D{}},
35 {"batchSize", 2},
36 }
37 cursor, err := mt.DB.RunCommandCursor(context.Background(), findCmd)
38 assert.Nil(mt, err, "RunCommandCursor error: %v", err)
39 defer func() {
40 _ = cursor.Close(context.Background())
41 }()
42
43 assert.True(mt, cursor.ID() > 0, "expected cursor ID to be non-zero")
44 assert.Equal(mt, 1, mt.NumberConnectionsCheckedOut(),
45 "expected one connection to be checked out, got %d", mt.NumberConnectionsCheckedOut())
46 })
47
48 mt.RunOpts("wait queue timeout errors include extra information", noClientOpts, func(mt *mtest.T) {
49
50
51
52
53 assertErrorHasInfo := func(mt *mtest.T, err error, numCursorConns, numTxnConns, numOtherConns int) {
54 mt.Helper()
55
56 assert.NotNil(mt, err, "expected wait queue timeout error, got nil")
57 expectedMsg := fmt.Sprintf("maxPoolSize: 1, "+
58 "connections in use by cursors: %d, "+
59 "connections in use by transactions: %d, "+
60 "connections in use by other operations: %d",
61 numCursorConns, numTxnConns, numOtherConns,
62 )
63 assert.True(mt, strings.Contains(err.Error(), expectedMsg),
64 "expected error %q to contain substring %q", err, expectedMsg)
65 }
66 maxPoolSizeMtOpts := mtest.NewOptions().
67 ClientOptions(options.Client().SetMaxPoolSize(1))
68
69 mt.RunOpts("cursors", maxPoolSizeMtOpts, func(mt *mtest.T) {
70 initCollection(mt, mt.Coll)
71 findOpts := options.Find().SetBatchSize(2)
72 cursor, err := mt.Coll.Find(context.Background(), bson.M{}, findOpts)
73 assert.Nil(mt, err, "Find error: %v", err)
74 defer func() {
75 _ = cursor.Close(context.Background())
76 }()
77
78 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Millisecond)
79 defer cancel()
80 _, err = mt.Coll.InsertOne(ctx, bson.M{"x": 1})
81 assertErrorHasInfo(mt, err, 1, 0, 0)
82 })
83 mt.RunOpts("transactions", maxPoolSizeMtOpts, func(mt *mtest.T) {
84 {
85 sess, err := mt.Client.StartSession()
86 assert.Nil(mt, err, "StartSession error: %v", err)
87 defer sess.EndSession(context.Background())
88 ctx := mongo.NewSessionContext(context.Background(), sess)
89
90
91 err = sess.StartTransaction()
92 assert.Nil(mt, err, "StartTransaction error: %v", err)
93 _, err = mt.Coll.InsertOne(ctx, bson.M{"x": 1})
94 assert.Nil(mt, err, "InsertOne error: %v", err)
95 }
96
97 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Millisecond)
98 defer cancel()
99 _, err := mt.Coll.InsertOne(ctx, bson.M{"x": 1})
100 assertErrorHasInfo(mt, err, 0, 1, 0)
101 })
102
103
104
105
106
107 mt.RunOpts("transaction connections are unpinned", maxPoolSizeMtOpts, func(mt *mtest.T) {
108 {
109 ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
110 defer cancel()
111
112 for i := 0; i < 5; i++ {
113 sess, err := mt.Client.StartSession()
114 require.NoError(mt, err, "StartSession error")
115
116 err = sess.StartTransaction()
117 require.NoError(mt, err, "StartTransaction error")
118
119 ctx := mongo.NewSessionContext(ctx, sess)
120 _, err = mt.Coll.InsertOne(ctx, bson.M{"x": 1})
121 assert.NoError(mt, err, "InsertOne error")
122
123 err = sess.CommitTransaction(ctx)
124 assert.NoError(mt, err, "CommitTransaction error")
125
126 sess.EndSession(ctx)
127 }
128 }
129 })
130 })
131 }
132
View as plain text