1
2
3
4
5
6
7
8
9
10 package integration
11
12 import (
13 "context"
14 "errors"
15 "fmt"
16 "testing"
17 "time"
18
19 "go.mongodb.org/mongo-driver/bson"
20 "go.mongodb.org/mongo-driver/internal/assert"
21 "go.mongodb.org/mongo-driver/internal/eventtest"
22 "go.mongodb.org/mongo-driver/mongo"
23 "go.mongodb.org/mongo-driver/mongo/integration/mtest"
24 "go.mongodb.org/mongo-driver/mongo/options"
25 )
26
27 func TestSDAMErrorHandling(t *testing.T) {
28 mt := mtest.New(t, noClientOpts)
29 baseClientOpts := func() *options.ClientOptions {
30 return options.Client().
31 ApplyURI(mtest.ClusterURI()).
32 SetRetryWrites(false).
33 SetWriteConcern(mtest.MajorityWc)
34 }
35 baseMtOpts := func() *mtest.Options {
36 mtOpts := mtest.NewOptions().
37 Topologies(mtest.ReplicaSet, mtest.Single).
38 MinServerVersion("4.0").
39 ClientOptions(baseClientOpts())
40
41 if mtest.ClusterTopologyKind() == mtest.Sharded {
42
43 mtOpts.ClientType(mtest.Pinned)
44 }
45 return mtOpts
46 }
47
48
49
50 mt.RunOpts("before handshake completes", baseMtOpts().Auth(true).MinServerVersion("4.4"), func(mt *mtest.T) {
51 mt.RunOpts("network errors", noClientOpts, func(mt *mtest.T) {
52 mt.Run("pool cleared on network timeout", func(mt *mtest.T) {
53
54
55
56
57 appName := "authConnectTimeoutTest"
58
59
60 mt.SetFailPoint(mtest.FailPoint{
61 ConfigureFailPoint: "failCommand",
62 Mode: mtest.FailPointMode{
63 Times: 1,
64 },
65 Data: mtest.FailPointData{
66 FailCommands: []string{"saslContinue"},
67 BlockConnection: true,
68 BlockTimeMS: 150,
69 AppName: appName,
70 },
71 })
72
73
74 tpm := eventtest.NewTestPoolMonitor()
75 mt.ResetClient(baseClientOpts().
76 SetAppName(appName).
77 SetPoolMonitor(tpm.PoolMonitor).
78
79
80 SetSocketTimeout(100 * time.Millisecond))
81
82
83
84 _, err := mt.Coll.InsertOne(context.Background(), bson.D{{"test", 1}})
85 assert.NotNil(mt, err, "expected InsertOne error, got nil")
86 assert.True(mt, mongo.IsTimeout(err), "expected timeout error, got %v", err)
87 assert.True(mt, mongo.IsNetworkError(err), "expected network error, got %v", err)
88
89 assert.Soon(mt, func(ctx context.Context) {
90 ticker := time.NewTicker(100 * time.Millisecond)
91 defer ticker.Stop()
92
93 for {
94 select {
95 case <-ticker.C:
96 case <-ctx.Done():
97 return
98 }
99
100 if tpm.IsPoolCleared() {
101 return
102 }
103 }
104 }, 2*time.Second)
105 })
106
107 mt.RunOpts("pool cleared on non-timeout network error", noClientOpts, func(mt *mtest.T) {
108 mt.Run("background", func(mt *mtest.T) {
109
110
111 appName := "authNetworkErrorTestBackground"
112
113 mt.SetFailPoint(mtest.FailPoint{
114 ConfigureFailPoint: "failCommand",
115 Mode: mtest.FailPointMode{
116 Times: 1,
117 },
118 Data: mtest.FailPointData{
119 FailCommands: []string{"saslContinue"},
120 CloseConnection: true,
121 AppName: appName,
122 },
123 })
124
125
126 tpm := eventtest.NewTestPoolMonitor()
127 mt.ResetClient(baseClientOpts().
128 SetAppName(appName).
129 SetPoolMonitor(tpm.PoolMonitor).
130
131 SetMinPoolSize(5))
132
133
134 assert.Soon(mt, func(ctx context.Context) {
135 ticker := time.NewTicker(100 * time.Millisecond)
136 defer ticker.Stop()
137
138 for {
139 select {
140 case <-ticker.C:
141 case <-ctx.Done():
142 return
143 }
144
145 if tpm.IsPoolCleared() {
146 return
147 }
148 }
149 }, 2*time.Second)
150 })
151
152 mt.Run("foreground", func(mt *mtest.T) {
153
154
155 appName := "authNetworkErrorTestForeground"
156
157 mt.SetFailPoint(mtest.FailPoint{
158 ConfigureFailPoint: "failCommand",
159 Mode: mtest.FailPointMode{
160 Times: 1,
161 },
162 Data: mtest.FailPointData{
163 FailCommands: []string{"saslContinue"},
164 CloseConnection: true,
165 AppName: appName,
166 },
167 })
168
169
170 tpm := eventtest.NewTestPoolMonitor()
171 mt.ResetClient(baseClientOpts().SetAppName(appName).SetPoolMonitor(tpm.PoolMonitor))
172
173 _, err := mt.Coll.InsertOne(context.Background(), bson.D{{"x", 1}})
174 assert.NotNil(mt, err, "expected InsertOne error, got nil")
175 assert.False(mt, mongo.IsTimeout(err), "expected non-timeout error, got %v", err)
176
177
178 assert.Soon(mt, func(ctx context.Context) {
179 ticker := time.NewTicker(100 * time.Millisecond)
180 defer ticker.Stop()
181
182 for {
183 select {
184 case <-ticker.C:
185 case <-ctx.Done():
186 return
187 }
188
189 if tpm.IsPoolCleared() {
190 return
191 }
192 }
193 }, 2*time.Second)
194 })
195 })
196 })
197 })
198 mt.RunOpts("after handshake completes", baseMtOpts(), func(mt *mtest.T) {
199 mt.RunOpts("network errors", noClientOpts, func(mt *mtest.T) {
200 mt.Run("pool cleared on non-timeout network error", func(mt *mtest.T) {
201 appName := "afterHandshakeNetworkError"
202
203 mt.SetFailPoint(mtest.FailPoint{
204 ConfigureFailPoint: "failCommand",
205 Mode: mtest.FailPointMode{
206 Times: 1,
207 },
208 Data: mtest.FailPointData{
209 FailCommands: []string{"insert"},
210 CloseConnection: true,
211 AppName: appName,
212 },
213 })
214
215
216 tpm := eventtest.NewTestPoolMonitor()
217 mt.ResetClient(baseClientOpts().SetAppName(appName).SetPoolMonitor(tpm.PoolMonitor))
218
219 _, err := mt.Coll.InsertOne(context.Background(), bson.D{{"test", 1}})
220 assert.NotNil(mt, err, "expected InsertOne error, got nil")
221 assert.False(mt, mongo.IsTimeout(err), "expected non-timeout error, got %v", err)
222 assert.True(mt, tpm.IsPoolCleared(), "expected pool to be cleared but was not")
223 })
224 mt.Run("pool not cleared on timeout network error", func(mt *mtest.T) {
225 tpm := eventtest.NewTestPoolMonitor()
226 mt.ResetClient(baseClientOpts().SetPoolMonitor(tpm.PoolMonitor))
227
228 _, err := mt.Coll.InsertOne(context.Background(), bson.D{{"x", 1}})
229 assert.Nil(mt, err, "InsertOne error: %v", err)
230
231 filter := bson.M{
232 "$where": "function() { sleep(1000); return false; }",
233 }
234 timeoutCtx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
235 defer cancel()
236 _, err = mt.Coll.Find(timeoutCtx, filter)
237 assert.NotNil(mt, err, "expected Find error, got %v", err)
238 assert.True(mt, mongo.IsTimeout(err), "expected timeout error, got %v", err)
239 assert.False(mt, tpm.IsPoolCleared(), "expected pool to not be cleared but was")
240 })
241 mt.Run("pool not cleared on context cancellation", func(mt *mtest.T) {
242 tpm := eventtest.NewTestPoolMonitor()
243 mt.ResetClient(baseClientOpts().SetPoolMonitor(tpm.PoolMonitor))
244
245 _, err := mt.Coll.InsertOne(context.Background(), bson.D{{"x", 1}})
246 assert.Nil(mt, err, "InsertOne error: %v", err)
247
248 findCtx, cancel := context.WithCancel(context.Background())
249 go func() {
250 time.Sleep(100 * time.Millisecond)
251 cancel()
252 }()
253
254 filter := bson.M{
255 "$where": "function() { sleep(1000); return false; }",
256 }
257 _, err = mt.Coll.Find(findCtx, filter)
258 assert.NotNil(mt, err, "expected Find error, got nil")
259 assert.False(mt, mongo.IsTimeout(err), "expected non-timeout error, got %v", err)
260 assert.True(mt, mongo.IsNetworkError(err), "expected network error, got %v", err)
261 assert.True(mt, errors.Is(err, context.Canceled), "expected error %v to be context.Canceled", err)
262 assert.False(mt, tpm.IsPoolCleared(), "expected pool to not be cleared but was")
263 })
264 })
265 mt.RunOpts("server errors", noClientOpts, func(mt *mtest.T) {
266
267
268
269
270
271
272
273 serverErrorsMtOpts := baseMtOpts().
274 MinServerVersion("4.0").
275 MaxServerVersion("4.2").
276 ClientOptions(baseClientOpts().SetRetryWrites(false))
277
278 testCases := []struct {
279 name string
280 errorCode int32
281
282
283
284 isShutdownError bool
285 }{
286
287 {"InterruptedAtShutdown", 11600, true},
288 {"InterruptedDueToReplStateChange, not shutdown", 11602, false},
289 {"NotPrimaryOrSecondary", 13436, false},
290 {"PrimarySteppedDown", 189, false},
291 {"ShutdownInProgress", 91, true},
292
293
294 {"NotPrimary", 10107, false},
295 {"NotPrimaryNoSecondaryOk", 13435, false},
296 }
297 for _, tc := range testCases {
298 mt.RunOpts(fmt.Sprintf("command error - %s", tc.name), serverErrorsMtOpts, func(mt *mtest.T) {
299 appName := fmt.Sprintf("command_error_%s", tc.name)
300
301
302 mt.SetFailPoint(mtest.FailPoint{
303 ConfigureFailPoint: "failCommand",
304 Mode: mtest.FailPointMode{
305 Times: 1,
306 },
307 Data: mtest.FailPointData{
308 FailCommands: []string{"insert"},
309 ErrorCode: tc.errorCode,
310 AppName: appName,
311 },
312 })
313
314
315 tpm := eventtest.NewTestPoolMonitor()
316 mt.ResetClient(baseClientOpts().SetAppName(appName).SetPoolMonitor(tpm.PoolMonitor))
317
318 runServerErrorsTest(mt, tc.isShutdownError, tpm)
319 })
320 mt.RunOpts(fmt.Sprintf("write concern error - %s", tc.name), serverErrorsMtOpts, func(mt *mtest.T) {
321 appName := fmt.Sprintf("write_concern_error_%s", tc.name)
322
323
324 mt.SetFailPoint(mtest.FailPoint{
325 ConfigureFailPoint: "failCommand",
326 Mode: mtest.FailPointMode{
327 Times: 1,
328 },
329 Data: mtest.FailPointData{
330 FailCommands: []string{"insert"},
331 WriteConcernError: &mtest.WriteConcernErrorData{
332 Code: tc.errorCode,
333 },
334 AppName: appName,
335 },
336 })
337
338
339 tpm := eventtest.NewTestPoolMonitor()
340 mt.ResetClient(baseClientOpts().SetAppName(appName).SetPoolMonitor(tpm.PoolMonitor))
341
342 runServerErrorsTest(mt, tc.isShutdownError, tpm)
343 })
344 }
345 })
346 })
347 }
348
349 func runServerErrorsTest(mt *mtest.T, isShutdownError bool, tpm *eventtest.TestPoolMonitor) {
350 mt.Helper()
351
352 _, err := mt.Coll.InsertOne(context.Background(), bson.D{{"x", 1}})
353 assert.NotNil(mt, err, "expected InsertOne error, got nil")
354
355
356 if isShutdownError {
357 assert.True(mt, tpm.IsPoolCleared(), "expected pool to be cleared, but was not")
358 return
359 }
360
361
362 wantCleared := mtest.CompareServerVersions(mtest.ServerVersion(), "4.2") < 0
363 gotCleared := tpm.IsPoolCleared()
364 assert.Equal(mt, wantCleared, gotCleared, "expected pool to be cleared: %t; pool was cleared: %t",
365 wantCleared, gotCleared)
366 }
367
View as plain text