1
2
3
4
5
6
7 package integration
8
9 import (
10 "context"
11 "errors"
12 "testing"
13 "time"
14
15 "go.mongodb.org/mongo-driver/bson"
16 "go.mongodb.org/mongo-driver/event"
17 "go.mongodb.org/mongo-driver/internal/assert"
18 "go.mongodb.org/mongo-driver/internal/eventtest"
19 "go.mongodb.org/mongo-driver/internal/require"
20 "go.mongodb.org/mongo-driver/mongo"
21 "go.mongodb.org/mongo-driver/mongo/integration/mtest"
22 "go.mongodb.org/mongo-driver/mongo/options"
23 "go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
24 "go.mongodb.org/mongo-driver/x/mongo/driver"
25 )
26
27
28
29 func TestCSOT(t *testing.T) {
30 mt := mtest.New(t, mtest.NewOptions().CreateClient(false))
31
32 testCases := []struct {
33 desc string
34 commandName string
35 setup func(coll *mongo.Collection) error
36 operation func(ctx context.Context, coll *mongo.Collection) error
37 topologies []mtest.TopologyKind
38
39
40
41
42 sendsMaxTimeMSWithTimeoutMS bool
43
44
45
46
47 sendsMaxTimeMSWithContextDeadline bool
48
49
50
51
52 preventsConnClosureWithTimeoutMS bool
53 }{
54 {
55 desc: "FindOne",
56 commandName: "find",
57 setup: func(coll *mongo.Collection) error {
58 _, err := coll.InsertOne(context.Background(), bson.D{})
59 return err
60 },
61 operation: func(ctx context.Context, coll *mongo.Collection) error {
62 return coll.FindOne(ctx, bson.D{}).Err()
63 },
64 sendsMaxTimeMSWithTimeoutMS: true,
65 sendsMaxTimeMSWithContextDeadline: true,
66 preventsConnClosureWithTimeoutMS: true,
67 },
68 {
69 desc: "Find",
70 commandName: "find",
71 setup: func(coll *mongo.Collection) error {
72 _, err := coll.InsertOne(context.Background(), bson.D{})
73 return err
74 },
75 operation: func(ctx context.Context, coll *mongo.Collection) error {
76 _, err := coll.Find(ctx, bson.D{})
77 return err
78 },
79 sendsMaxTimeMSWithTimeoutMS: true,
80 sendsMaxTimeMSWithContextDeadline: false,
81 preventsConnClosureWithTimeoutMS: true,
82 },
83 {
84 desc: "FindOneAndDelete",
85 commandName: "findAndModify",
86 setup: func(coll *mongo.Collection) error {
87 _, err := coll.InsertOne(context.Background(), bson.D{})
88 return err
89 },
90 operation: func(ctx context.Context, coll *mongo.Collection) error {
91 return coll.FindOneAndDelete(ctx, bson.D{}).Err()
92 },
93 sendsMaxTimeMSWithTimeoutMS: true,
94 sendsMaxTimeMSWithContextDeadline: true,
95 preventsConnClosureWithTimeoutMS: true,
96 },
97 {
98 desc: "FindOneAndUpdate",
99 commandName: "findAndModify",
100 setup: func(coll *mongo.Collection) error {
101 _, err := coll.InsertOne(context.Background(), bson.D{})
102 return err
103 },
104 operation: func(ctx context.Context, coll *mongo.Collection) error {
105 return coll.FindOneAndUpdate(ctx, bson.D{}, bson.M{"$set": bson.M{"key": "value"}}).Err()
106 },
107 sendsMaxTimeMSWithTimeoutMS: true,
108 sendsMaxTimeMSWithContextDeadline: true,
109 preventsConnClosureWithTimeoutMS: true,
110 },
111 {
112 desc: "FindOneAndReplace",
113 commandName: "findAndModify",
114 setup: func(coll *mongo.Collection) error {
115 _, err := coll.InsertOne(context.Background(), bson.D{})
116 return err
117 },
118 operation: func(ctx context.Context, coll *mongo.Collection) error {
119 return coll.FindOneAndReplace(ctx, bson.D{}, bson.D{}).Err()
120 },
121 sendsMaxTimeMSWithTimeoutMS: true,
122 sendsMaxTimeMSWithContextDeadline: true,
123 preventsConnClosureWithTimeoutMS: true,
124 },
125 {
126 desc: "InsertOne",
127 commandName: "insert",
128 operation: func(ctx context.Context, coll *mongo.Collection) error {
129 _, err := coll.InsertOne(ctx, bson.D{})
130 return err
131 },
132 sendsMaxTimeMSWithTimeoutMS: true,
133 sendsMaxTimeMSWithContextDeadline: true,
134 preventsConnClosureWithTimeoutMS: true,
135 },
136 {
137 desc: "InsertMany",
138 commandName: "insert",
139 operation: func(ctx context.Context, coll *mongo.Collection) error {
140 _, err := coll.InsertMany(ctx, []interface{}{bson.D{}})
141 return err
142 },
143 sendsMaxTimeMSWithTimeoutMS: true,
144 sendsMaxTimeMSWithContextDeadline: true,
145 preventsConnClosureWithTimeoutMS: true,
146 },
147 {
148 desc: "UpdateOne",
149 commandName: "update",
150 operation: func(ctx context.Context, coll *mongo.Collection) error {
151 _, err := coll.UpdateOne(ctx, bson.D{}, bson.M{"$set": bson.M{"key": "value"}})
152 return err
153 },
154 sendsMaxTimeMSWithTimeoutMS: true,
155 sendsMaxTimeMSWithContextDeadline: true,
156 preventsConnClosureWithTimeoutMS: true,
157 },
158 {
159 desc: "UpdateMany",
160 commandName: "update",
161 operation: func(ctx context.Context, coll *mongo.Collection) error {
162 _, err := coll.UpdateMany(ctx, bson.D{}, bson.M{"$set": bson.M{"key": "value"}})
163 return err
164 },
165 sendsMaxTimeMSWithTimeoutMS: true,
166 sendsMaxTimeMSWithContextDeadline: true,
167 preventsConnClosureWithTimeoutMS: true,
168 },
169 {
170 desc: "ReplaceOne",
171 commandName: "update",
172 operation: func(ctx context.Context, coll *mongo.Collection) error {
173 _, err := coll.ReplaceOne(ctx, bson.D{}, bson.D{})
174 return err
175 },
176 sendsMaxTimeMSWithTimeoutMS: true,
177 sendsMaxTimeMSWithContextDeadline: true,
178 preventsConnClosureWithTimeoutMS: true,
179 },
180 {
181 desc: "DeleteOne",
182 commandName: "delete",
183 operation: func(ctx context.Context, coll *mongo.Collection) error {
184 _, err := coll.DeleteOne(ctx, bson.D{})
185 return err
186 },
187 sendsMaxTimeMSWithTimeoutMS: true,
188 sendsMaxTimeMSWithContextDeadline: true,
189 preventsConnClosureWithTimeoutMS: true,
190 },
191 {
192 desc: "DeleteMany",
193 commandName: "delete",
194 operation: func(ctx context.Context, coll *mongo.Collection) error {
195 _, err := coll.DeleteMany(ctx, bson.D{})
196 return err
197 },
198 sendsMaxTimeMSWithTimeoutMS: true,
199 sendsMaxTimeMSWithContextDeadline: true,
200 preventsConnClosureWithTimeoutMS: true,
201 },
202 {
203 desc: "Distinct",
204 commandName: "distinct",
205 operation: func(ctx context.Context, coll *mongo.Collection) error {
206 _, err := coll.Distinct(ctx, "name", bson.D{})
207 return err
208 },
209 sendsMaxTimeMSWithTimeoutMS: true,
210 sendsMaxTimeMSWithContextDeadline: true,
211 preventsConnClosureWithTimeoutMS: true,
212 },
213 {
214 desc: "Aggregate",
215 commandName: "aggregate",
216 operation: func(ctx context.Context, coll *mongo.Collection) error {
217 _, err := coll.Aggregate(ctx, mongo.Pipeline{})
218 return err
219 },
220 sendsMaxTimeMSWithTimeoutMS: true,
221 sendsMaxTimeMSWithContextDeadline: false,
222 preventsConnClosureWithTimeoutMS: true,
223 },
224 {
225 desc: "Watch",
226 commandName: "aggregate",
227 operation: func(ctx context.Context, coll *mongo.Collection) error {
228 cs, err := coll.Watch(ctx, mongo.Pipeline{})
229 if cs != nil {
230 cs.Close(context.Background())
231 }
232 return err
233 },
234 sendsMaxTimeMSWithTimeoutMS: true,
235 sendsMaxTimeMSWithContextDeadline: true,
236 preventsConnClosureWithTimeoutMS: true,
237
238 topologies: []mtest.TopologyKind{
239 mtest.ReplicaSet,
240 mtest.Sharded,
241 },
242 },
243 {
244 desc: "Cursor getMore",
245 commandName: "getMore",
246 setup: func(coll *mongo.Collection) error {
247 _, err := coll.InsertMany(context.Background(), []interface{}{bson.D{}, bson.D{}})
248 return err
249 },
250 operation: func(ctx context.Context, coll *mongo.Collection) error {
251 cursor, err := coll.Find(ctx, bson.D{}, options.Find().SetBatchSize(1))
252 if err != nil {
253 return err
254 }
255 var res []bson.D
256 return cursor.All(ctx, &res)
257 },
258 sendsMaxTimeMSWithTimeoutMS: false,
259 sendsMaxTimeMSWithContextDeadline: false,
260 preventsConnClosureWithTimeoutMS: false,
261 },
262 }
263
264
265
266 getStartedEvent := func(mt *mtest.T, command string) *event.CommandStartedEvent {
267 for {
268 evt := mt.GetStartedEvent()
269 if evt == nil {
270 break
271 }
272 _, err := evt.Command.LookupErr(command)
273 if errors.Is(err, bsoncore.ErrElementNotFound) {
274 continue
275 }
276 return evt
277 }
278
279 mt.Errorf("could not find command started event for command %q", command)
280 mt.FailNow()
281 return nil
282 }
283
284
285
286 assertMaxTimeMSIsSet := func(mt *mtest.T, command bson.Raw) {
287 mt.Helper()
288
289 maxTimeVal := command.Lookup("maxTimeMS")
290
291 require.Greater(mt,
292 len(maxTimeVal.Value),
293 0,
294 "expected maxTimeMS BSON value to be non-empty")
295 require.Equal(mt,
296 maxTimeVal.Type,
297 bson.TypeInt64,
298 "expected maxTimeMS BSON value to be type Int64")
299 assert.Greater(mt,
300 maxTimeVal.Int64(),
301 int64(0),
302 "expected maxTimeMS value to be greater than 0")
303 }
304
305
306
307 assertMaxTimeMSNotSet := func(mt *mtest.T, command bson.Raw) {
308 mt.Helper()
309
310 _, err := command.LookupErr("maxTimeMS")
311 assert.ErrorIs(mt,
312 err,
313 bsoncore.ErrElementNotFound,
314 "expected maxTimeMS BSON value to be missing, but is present")
315 }
316
317 for _, tc := range testCases {
318 mt.RunOpts(tc.desc, mtest.NewOptions().Topologies(tc.topologies...), func(mt *mtest.T) {
319 mt.Run("maxTimeMS", func(mt *mtest.T) {
320 mt.Run("timeoutMS not set", func(mt *mtest.T) {
321 if tc.setup != nil {
322 err := tc.setup(mt.Coll)
323 require.NoError(mt, err)
324 }
325
326 err := tc.operation(context.Background(), mt.Coll)
327 require.NoError(mt, err)
328
329 evt := getStartedEvent(mt, tc.commandName)
330 assertMaxTimeMSNotSet(mt, evt.Command)
331 })
332
333 csotOpts := mtest.NewOptions().ClientOptions(options.Client().SetTimeout(10 * time.Second))
334 mt.RunOpts("timeoutMS and context.Background", csotOpts, func(mt *mtest.T) {
335 if tc.setup != nil {
336 err := tc.setup(mt.Coll)
337 require.NoError(mt, err)
338 }
339
340 err := tc.operation(context.Background(), mt.Coll)
341 require.NoError(mt, err)
342
343 evt := getStartedEvent(mt, tc.commandName)
344 if tc.sendsMaxTimeMSWithTimeoutMS {
345 assertMaxTimeMSIsSet(mt, evt.Command)
346 } else {
347 assertMaxTimeMSNotSet(mt, evt.Command)
348 }
349 })
350
351 mt.RunOpts("timeoutMS and Context with deadline", csotOpts, func(mt *mtest.T) {
352 if tc.setup != nil {
353 err := tc.setup(mt.Coll)
354 require.NoError(mt, err)
355 }
356
357 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
358 defer cancel()
359
360 err := tc.operation(ctx, mt.Coll)
361 require.NoError(mt, err)
362
363 evt := getStartedEvent(mt, tc.commandName)
364 if tc.sendsMaxTimeMSWithContextDeadline {
365 assertMaxTimeMSIsSet(mt, evt.Command)
366 } else {
367 assertMaxTimeMSNotSet(mt, evt.Command)
368 }
369 })
370 })
371
372 if tc.preventsConnClosureWithTimeoutMS {
373 opts := mtest.NewOptions().
374
375 Topologies(mtest.Single, mtest.ReplicaSet).
376 MinServerVersion("4.2")
377 mt.RunOpts("prevents connection closure with timeoutMS", opts, func(mt *mtest.T) {
378 if tc.setup != nil {
379 err := tc.setup(mt.Coll)
380 require.NoError(mt, err)
381 }
382
383 mt.SetFailPoint(mtest.FailPoint{
384 ConfigureFailPoint: "failCommand",
385 Mode: "alwaysOn",
386 Data: mtest.FailPointData{
387 FailCommands: []string{tc.commandName},
388 BlockConnection: true,
389 BlockTimeMS: 500,
390 },
391 })
392
393 tpm := eventtest.NewTestPoolMonitor()
394 mt.ResetClient(options.Client().
395 SetPoolMonitor(tpm.PoolMonitor))
396
397
398
399
400 for i := 0; i < 5; i++ {
401 ctx, cancel := context.WithTimeout(context.Background(), 15*time.Millisecond)
402 err := tc.operation(ctx, mt.Coll)
403 cancel()
404
405 if !mongo.IsTimeout(err) {
406 t.Logf("CSOT-disabled operation %d returned a non-timeout error: %v", i, err)
407 }
408 }
409
410 closedEvents := tpm.Events(func(pe *event.PoolEvent) bool {
411 return pe.Type == event.ConnectionClosed
412 })
413 assert.Greater(mt,
414 len(closedEvents),
415 0,
416 "expected more than 0 connection closed events")
417
418 tpm = eventtest.NewTestPoolMonitor()
419 mt.ResetClient(options.Client().
420 SetPoolMonitor(tpm.PoolMonitor).
421 SetTimeout(10 * time.Second))
422
423
424
425 for i := 0; i < 5; i++ {
426 ctx, cancel := context.WithTimeout(context.Background(), 15*time.Millisecond)
427 err := tc.operation(ctx, mt.Coll)
428 cancel()
429
430 if !mongo.IsTimeout(err) {
431 t.Logf("CSOT-enabled operation %d returned a non-timeout error: %v", i, err)
432 }
433 }
434
435 closedEvents = tpm.Events(func(pe *event.PoolEvent) bool {
436 return pe.Type == event.ConnectionClosed
437 })
438 assert.Len(mt, closedEvents, 0, "expected no connection closed event")
439 })
440 }
441 })
442 }
443
444 csotOpts := mtest.NewOptions().ClientOptions(options.Client().SetTimeout(10 * time.Second))
445 mt.RunOpts("maxTimeMS is omitted for values greater than 2147483647ms", csotOpts, func(mt *mtest.T) {
446 ctx, cancel := context.WithTimeout(context.Background(), (2147483647+1000)*time.Millisecond)
447 defer cancel()
448 _, err := mt.Coll.InsertOne(ctx, bson.D{})
449 require.NoError(t, err)
450
451 evt := mt.GetStartedEvent()
452 _, err = evt.Command.LookupErr("maxTimeMS")
453 assert.ErrorIs(mt,
454 err,
455 bsoncore.ErrElementNotFound,
456 "expected maxTimeMS BSON value to be missing, but is present")
457 })
458 }
459
460 func TestCSOT_errors(t *testing.T) {
461 mt := mtest.New(t, mtest.NewOptions().
462 CreateClient(false).
463
464 Topologies(mtest.Single, mtest.ReplicaSet).
465 MinServerVersion("4.2").
466
467 ClientOptions(options.Client().SetTimeout(10*time.Second)))
468
469
470
471
472 mt.Run("MaxTimeMSExceeded wraps context.DeadlineExceeded", func(mt *mtest.T) {
473 _, err := mt.Coll.InsertOne(context.Background(), bson.D{})
474 require.NoError(mt, err, "InsertOne error")
475
476 mt.SetFailPoint(mtest.FailPoint{
477 ConfigureFailPoint: "failCommand",
478 Mode: mtest.FailPointMode{
479 Times: 1,
480 },
481 Data: mtest.FailPointData{
482 FailCommands: []string{"find"},
483 ErrorCode: 50,
484 },
485 })
486
487 err = mt.Coll.FindOne(context.Background(), bson.D{}).Err()
488
489 assert.True(mt,
490 errors.Is(err, context.DeadlineExceeded),
491 "expected error %[1]T(%[1]q) to wrap context.DeadlineExceeded",
492 err)
493 assert.True(mt,
494 mongo.IsTimeout(err),
495 "expected error %[1]T(%[1]q) to be a timeout error",
496 err)
497 })
498
499
500
501
502 mt.Run("Context timeout wraps context.DeadlineExceeded", func(mt *mtest.T) {
503 _, err := mt.Coll.InsertOne(context.Background(), bson.D{})
504 require.NoError(mt, err, "InsertOne error")
505
506 mt.SetFailPoint(mtest.FailPoint{
507 ConfigureFailPoint: "failCommand",
508 Mode: mtest.FailPointMode{
509 Times: 1,
510 },
511 Data: mtest.FailPointData{
512 FailCommands: []string{"find"},
513 BlockConnection: true,
514 BlockTimeMS: 500,
515 },
516 })
517
518 ctx, cancel := context.WithTimeout(context.Background(), 15*time.Millisecond)
519 defer cancel()
520 err = mt.Coll.FindOne(ctx, bson.D{}).Err()
521
522 assert.False(mt,
523 errors.Is(err, driver.ErrDeadlineWouldBeExceeded),
524 "expected error %[1]T(%[1]q) to not wrap driver.ErrDeadlineWouldBeExceeded",
525 err)
526 assert.True(mt,
527 errors.Is(err, context.DeadlineExceeded),
528 "expected error %[1]T(%[1]q) to wrap context.DeadlineExceeded",
529 err)
530 assert.True(mt,
531 mongo.IsTimeout(err),
532 "expected error %[1]T(%[1]q) to be a timeout error",
533 err)
534 })
535
536 mt.Run("timeoutMS timeout wraps context.DeadlineExceeded", func(mt *mtest.T) {
537 _, err := mt.Coll.InsertOne(context.Background(), bson.D{})
538 require.NoError(mt, err, "InsertOne error")
539
540 mt.SetFailPoint(mtest.FailPoint{
541 ConfigureFailPoint: "failCommand",
542 Mode: mtest.FailPointMode{
543 Times: 1,
544 },
545 Data: mtest.FailPointData{
546 FailCommands: []string{"find"},
547 BlockConnection: true,
548 BlockTimeMS: 100,
549 },
550 })
551
552
553
554 mt.ResetClient(options.Client().SetTimeout(10 * time.Millisecond))
555 defer mt.ResetClient(options.Client())
556 err = mt.Coll.FindOne(context.Background(), bson.D{}).Err()
557
558 assert.False(mt,
559 errors.Is(err, driver.ErrDeadlineWouldBeExceeded),
560 "expected error %[1]T(%[1]q) to not wrap driver.ErrDeadlineWouldBeExceeded",
561 err)
562 assert.True(mt,
563 errors.Is(err, context.DeadlineExceeded),
564 "expected error %[1]T(%[1]q) to wrap context.DeadlineExceeded",
565 err)
566 assert.True(mt,
567 mongo.IsTimeout(err),
568 "expected error %[1]T(%[1]q) to be a timeout error",
569 err)
570 })
571 }
572
View as plain text