package integration

import (
	"context"
	"sync"
	"testing"
	"time"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/event"
	"go.mongodb.org/mongo-driver/internal/assert"
	"go.mongodb.org/mongo-driver/internal/eventtest"
	"go.mongodb.org/mongo-driver/internal/require"
	"go.mongodb.org/mongo-driver/mongo/integration/mtest"
	"go.mongodb.org/mongo-driver/mongo/options"
)

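// TestRetryableReadsProse covers the retryable reads prose tests: a
// PoolClearedError encountered while checking out a connection is retryable,
// and a retried read against a sharded cluster is directed to a different
// mongos when one is available.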
func TestRetryableReadsProse(t *testing.T) {
	tpm := eventtest.NewTestPoolMonitor()

	// Client options with MaxPoolSize of 1 and RetryReads enabled, per the
	// prose test description. A lower HeartbeatInterval speeds up server
	// discovery during the initial connection.
	hosts := mtest.ClusterConnString().Hosts
	clientOpts := options.Client().SetMaxPoolSize(1).SetRetryReads(true).
		SetPoolMonitor(tpm.PoolMonitor).SetHeartbeatInterval(500 * time.Millisecond).
		SetHosts(hosts[:1])

	mtOpts := mtest.NewOptions().ClientOptions(clientOpts).MinServerVersion("4.3")
	mt := mtest.New(t, mtOpts)

	mt.Run("PoolClearedError retryability", func(mt *mtest.T) {
		if mtest.ClusterTopologyKind() == mtest.LoadBalanced {
			mt.Skip("skipping as load balanced topology has different pool clearing behavior")
		}

		// Insert a document so the Find operations below have data to query.
		_, err := mt.Coll.InsertOne(context.Background(), bson.D{{"x", 1}})
		assert.Nil(mt, err, "InsertOne error: %v", err)

		// Force the next "find" command to block for 1 second and then return
		// error code 91 (ShutdownInProgress), which clears the pool.
		mt.SetFailPoint(mtest.FailPoint{
			ConfigureFailPoint: "failCommand",
			Mode: mtest.FailPointMode{
				Times: 1,
			},
			Data: mtest.FailPointData{
				FailCommands:    []string{"find"},
				ErrorCode:       91,
				BlockConnection: true,
				BlockTimeMS:     1000,
			},
		})
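
		// For reference, the failpoint above corresponds roughly to running
		// this server command directly (a sketch; blockConnection needs a
		// server that supports it, and this test already requires 4.3+):
		//
		//   db.adminCommand({
		//       configureFailPoint: "failCommand",
		//       mode: { times: 1 },
		//       data: {
		//           failCommands: ["find"],
		//           errorCode: 91,
		//           blockConnection: true,
		//           blockTimeMS: 1000,
		//       },
		//   })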

		// Clear any pool and command events recorded during setup so the
		// assertions below only see events from the Find operations.
		tpm.ClearEvents()
		mt.ClearEvents()

		// Run two concurrent FindOne operations. With MaxPoolSize of 1, the
		// second operation's connection check-out fails when the pool is
		// cleared, and the resulting PoolClearedError is retried.
		var wg sync.WaitGroup
		for i := 0; i < 2; i++ {
			wg.Add(1)
			go func() {
				defer wg.Done()
				res := mt.Coll.FindOne(context.Background(), bson.D{})
				assert.Nil(mt, res.Err(), "FindOne error: %v", res.Err())
			}()
		}
		wg.Wait()

		// Gather the check-out and pool-clear events recorded during the Find
		// operations.
		events := tpm.Events(func(e *event.PoolEvent) bool {
			getSucceeded := e.Type == event.GetSucceeded
			getFailed := e.Type == event.GetFailed
			poolCleared := e.Type == event.PoolCleared
			return getSucceeded || getFailed || poolCleared
		})

		// Expect the first three events to be: a successful check-out by the
		// first Find, the pool clear triggered by the error code 91 response,
		// and the second Find's failed check-out caused by that pool clear.
		assert.True(mt, len(events) >= 3, "expected at least 3 events, got %v", len(events))
		assert.Equal(mt, event.GetSucceeded, events[0].Type,
			"expected ConnectionCheckedOut event, got %v", events[0].Type)
		assert.Equal(mt, event.PoolCleared, events[1].Type,
			"expected ConnectionPoolCleared event, got %v", events[1].Type)
		assert.Equal(mt, event.GetFailed, events[2].Type,
			"expected ConnectionCheckedOutFailed event, got %v", events[2].Type)
		assert.Equal(mt, event.ReasonConnectionErrored, events[2].Reason,
			"expected check out failure due to connection error, failed due to %q", events[2].Reason)

		// Expect three started "find" commands: the two original operations
		// plus one retry.
		for i := 0; i < 3; i++ {
			cmdEvt := mt.GetStartedEvent()
			assert.NotNil(mt, cmdEvt, "expected a find event, got nil")
			assert.Equal(mt, "find", cmdEvt.CommandName,
				"expected a find event, got a(n) %v event", cmdEvt.CommandName)
		}
	})

	mtOpts = mtest.NewOptions().Topologies(mtest.Sharded).MinServerVersion("4.2")
	mt.RunOpts("retrying in sharded cluster", mtOpts, func(mt *mtest.T) {
		tests := []struct {
			name string

			// Number of mongos hosts to target. With two hosts, the failed
			// read is expected to be retried on the other mongos, which also
			// fails; with one host, the retry must reuse the same mongos and
			// succeeds because the failpoint only fires once. Values greater
			// than 2 are not supported by this test.
			hostCount            int
			failpointErrorCode   int32
			expectedFailCount    int
			expectedSuccessCount int
		}{
			{
				name:                 "retry on different mongos",
				hostCount:            2,
				failpointErrorCode:   6, // HostUnreachable, a retryable error
				expectedFailCount:    2,
				expectedSuccessCount: 0,
			},
			{
				name:                 "retry on same mongos",
				hostCount:            1,
				failpointErrorCode:   6, // HostUnreachable, a retryable error
				expectedFailCount:    1,
				expectedSuccessCount: 1,
			},
		}

		for _, tc := range tests {
			mt.Run(tc.name, func(mt *mtest.T) {
				hosts := options.Client().ApplyURI(mtest.ClusterURI()).Hosts
				require.GreaterOrEqualf(mt, len(hosts), tc.hostCount,
					"test cluster must have at least %v mongos hosts", tc.hostCount)

				// Fail the next "find" command once on each host the failpoint
				// is set on, returning a retryable error without closing the
				// connection.
				failPoint := mtest.FailPoint{
					ConfigureFailPoint: "failCommand",
					Mode: mtest.FailPointMode{
						Times: 1,
					},
					Data: mtest.FailPointData{
						FailCommands:    []string{"find"},
						ErrorCode:       tc.failpointErrorCode,
						CloseConnection: false,
					},
				}

				// Set the failpoint on every mongos host by resetting the
				// client to connect to one host at a time. If the operation
				// then fails hostCount times, every host must have been tried,
				// showing that retries are deprioritized away from previously
				// used servers.
				for i := 0; i < tc.hostCount; i++ {
					mt.ResetClient(options.Client().SetHosts([]string{hosts[i]}))
					mt.SetFailPoint(failPoint)

					// The automatic failpoint clearing may not clear failpoints
					// set on a specific host, so manually reset the client to
					// each host and clear its failpoint when the test is done.
					defer mt.ResetClient(options.Client().SetHosts([]string{hosts[i]}))
					defer mt.ClearFailPoints()
				}

				failCount := 0
				successCount := 0

				// Count failed and succeeded commands observed by the client.
				commandMonitor := &event.CommandMonitor{
					Failed: func(context.Context, *event.CommandFailedEvent) {
						failCount++
					},
					Succeeded: func(context.Context, *event.CommandSucceededEvent) {
						successCount++
					},
				}

				// Reconnect to all hostCount hosts with retryable reads
				// enabled and the counting monitor attached, then run a single
				// FindOne, which triggers the failpoint and a retry.
				mt.ResetClient(options.Client().
					SetHosts(hosts[:tc.hostCount]).
					SetRetryReads(true).
					SetMonitor(commandMonitor))

				mt.Coll.FindOne(context.Background(), bson.D{})

				assert.Equal(mt, tc.expectedFailCount, failCount,
					"expected %v failed commands, got %v", tc.expectedFailCount, failCount)
				assert.Equal(mt, tc.expectedSuccessCount, successCount,
					"expected %v succeeded commands, got %v", tc.expectedSuccessCount, successCount)
			})
		}
	})
}