1
2
3
4
5
6
7 package integration
8
9 import (
10 "context"
11 "strings"
12 "sync"
13 "testing"
14 "time"
15
16 "go.mongodb.org/mongo-driver/bson"
17 "go.mongodb.org/mongo-driver/bson/primitive"
18 "go.mongodb.org/mongo-driver/event"
19 "go.mongodb.org/mongo-driver/internal/assert"
20 "go.mongodb.org/mongo-driver/internal/eventtest"
21 "go.mongodb.org/mongo-driver/internal/require"
22 "go.mongodb.org/mongo-driver/mongo"
23 "go.mongodb.org/mongo-driver/mongo/integration/mtest"
24 "go.mongodb.org/mongo-driver/mongo/options"
25 )
26
27 type resumeType int
28 type streamType int
29
30 const (
31 minChangeStreamVersion = "3.6.0"
32 minPbrtVersion = "4.0.7"
33 minStartAfterVersion = "4.1.1"
34
35 startAfter resumeType = iota
36 resumeAfter
37 operationTime
38
39 client streamType = iota
40 database
41 collection
42
43 errorInterrupted int32 = 11601
44 errorHostUnreachable int32 = 6
45
46 resumableChangeStreamError = "ResumableChangeStreamError"
47 )
48
49 func TestChangeStream_Standalone(t *testing.T) {
50 mtOpts := mtest.NewOptions().MinServerVersion(minChangeStreamVersion).CreateClient(false).Topologies(mtest.Single)
51 mt := mtest.New(t, mtOpts)
52
53 mt.Run("no custom standalone error", func(mt *mtest.T) {
54 _, err := mt.Coll.Watch(context.Background(), mongo.Pipeline{})
55 _, ok := err.(mongo.CommandError)
56 assert.True(mt, ok, "expected error type %T, got %T", mongo.CommandError{}, err)
57 })
58 }
59
60 func TestChangeStream_ReplicaSet(t *testing.T) {
61 mtOpts := mtest.NewOptions().MinServerVersion(minChangeStreamVersion).CreateClient(false).Topologies(mtest.ReplicaSet)
62 mt := mtest.New(t, mtOpts)
63
64 mt.Run("first stage is $changeStream", func(mt *mtest.T) {
65
66
67 cs, err := mt.Coll.Watch(context.Background(), mongo.Pipeline{})
68 assert.Nil(mt, err, "Watch error: %v", err)
69 defer closeStream(cs)
70 started := mt.GetStartedEvent()
71 assert.NotNil(mt, started, "expected started event for aggregate, got nil")
72
73
74 firstStage := started.Command.Lookup("pipeline").Array().Index(0).Value().Document()
75 elems, _ := firstStage.Elements()
76 assert.Equal(mt, 1, len(elems), "expected first stage document to have 1 element, got %v", len(elems))
77 firstKey := elems[0].Key()
78 want := "$changeStream"
79 assert.Equal(mt, want, firstKey, "expected first stage to be %v, got %v", want, firstKey)
80 })
81 mt.Run("track resume token", func(mt *mtest.T) {
82
83
84 cs, err := mt.Coll.Watch(context.Background(), mongo.Pipeline{})
85 assert.Nil(mt, err, "Watch error: %v", err)
86 defer closeStream(cs)
87
88 generateEvents(mt, 1)
89 assert.True(mt, cs.Next(context.Background()), "expected next to return true, got false")
90 assert.NotNil(mt, cs.ResumeToken(), "expected resume token, got nil")
91 })
92 mt.RunOpts("resume token updated on empty batch", mtest.NewOptions().MinServerVersion("4.0.7"), func(mt *mtest.T) {
93
94
95 cs, err := mt.Coll.Watch(context.Background(), mongo.Pipeline{})
96 assert.Nil(mt, err, "Watch error: %v", err)
97 defer closeStream(cs)
98
99
100 generateEvents(mt, 1)
101 assert.True(mt, cs.Next(context.Background()), "expected next to return true, got false")
102 firstToken := cs.ResumeToken()
103
104
105 diffColl := mt.CreateCollection(mtest.Collection{Name: "diffCollUpdatePbrt"}, false)
106 _, err = diffColl.InsertOne(context.Background(), bson.D{{"x", 1}})
107 assert.Nil(mt, err, "InsertOne error: %v", err)
108
109
110 mt.ClearEvents()
111 assert.False(mt, cs.TryNext(context.Background()), "unexpected event document: %v", cs.Current)
112 assert.Nil(mt, cs.Err(), "change stream error getting new batch: %v", cs.Err())
113 newToken := cs.ResumeToken()
114 assert.NotEqual(mt, newToken, firstToken, "resume token was not updated after an empty batch was returned")
115
116 evt := mt.GetSucceededEvent()
117 assert.Equal(mt, "getMore", evt.CommandName, "expected event for 'getMore', got '%v'", evt.CommandName)
118 getMorePbrt := evt.Reply.Lookup("cursor", "postBatchResumeToken").Document()
119 assert.Equal(mt, newToken, getMorePbrt, "expected resume token %v, got %v", getMorePbrt, newToken)
120 })
121 mt.Run("missing resume token", func(mt *mtest.T) {
122
123
124 projectDoc := bson.D{
125 {"$project", bson.D{
126 {"_id", 0},
127 }},
128 }
129 cs, err := mt.Coll.Watch(context.Background(), mongo.Pipeline{projectDoc})
130 assert.Nil(mt, err, "Watch error: %v", err)
131 defer closeStream(cs)
132
133 generateEvents(mt, 2)
134 assert.False(mt, cs.Next(context.Background()), "expected Next to return false, got true")
135 assert.NotNil(mt, cs.Err(), "expected error, got nil")
136 })
137 mt.RunOpts("resume once", mtest.NewOptions().ClientType(mtest.Mock), func(mt *mtest.T) {
138
139
140
141
142
143
144
145 ns := mt.Coll.Database().Name() + "." + mt.Coll.Name()
146 aggregateRes := mtest.CreateCursorResponse(1, ns, mtest.FirstBatch, bson.D{
147 {"_id", bson.D{{"first", "resume token"}}},
148 })
149 failureGetMoreRes := mtest.CreateCommandErrorResponse(mtest.CommandError{
150 Code: errorHostUnreachable,
151 Name: "foo",
152 Message: "bar",
153 Labels: []string{resumableChangeStreamError},
154 })
155 killCursorsRes := mtest.CreateSuccessResponse()
156 newResumeToken := bson.D{{"second", "resume token"}}
157 resumedAggregateRes := mtest.CreateCursorResponse(2, ns, mtest.FirstBatch, bson.D{
158 {"_id", newResumeToken},
159 })
160 mt.AddMockResponses(
161 aggregateRes,
162 failureGetMoreRes,
163 killCursorsRes,
164 resumedAggregateRes,
165 )
166
167 cs, err := mt.Coll.Watch(context.Background(), mongo.Pipeline{})
168 assert.Nil(mt, err, "Watch error: %v", err)
169 defer closeStream(cs)
170
171 assert.True(mt, cs.Next(context.Background()), "expected Next to return true, got false")
172
173
174 mt.ClearEvents()
175 assert.True(mt, cs.Next(context.Background()), "expected Next to return true, got false")
176
177
178 assert.NotNil(mt, mt.GetStartedEvent(), "expected getMore event, got nil")
179 assert.NotNil(mt, mt.GetStartedEvent(), "expected killCursors event, got nil")
180 aggEvent := mt.GetStartedEvent()
181 assert.NotNil(mt, aggEvent, "expected aggregate event, got nil")
182 assert.Equal(mt, "aggregate", aggEvent.CommandName, "expected command name 'aggregate', got '%v'", aggEvent.CommandName)
183
184
185 assert.Equal(mt, cs.ID(), int64(2), "expected change stream ID to be 2, got %d", cs.ID())
186 newResumeTokenRaw, err := bson.Marshal(newResumeToken)
187 assert.Nil(mt, err, "Marshal error: %v", err)
188 comparisonErr := compareDocs(mt, newResumeTokenRaw, cs.ResumeToken())
189 assert.Nil(mt, comparisonErr, "expected resume token %s, got %s", newResumeTokenRaw, cs.ResumeToken())
190 })
191 mt.RunOpts("no resume for aggregate errors", mtest.NewOptions().ClientType(mtest.Mock), func(mt *mtest.T) {
192
193
194
195
196
197
198 ns := mt.Coll.Database().Name() + "." + mt.Coll.Name()
199 aggRes := mtest.CreateCursorResponse(1, ns, mtest.FirstBatch)
200 getMoreRes := mtest.CreateCommandErrorResponse(mtest.CommandError{
201 Code: errorHostUnreachable,
202 Name: "foo",
203 Message: "bar",
204 Labels: []string{resumableChangeStreamError},
205 })
206 killCursorsRes := mtest.CreateSuccessResponse()
207 resumedAggRes := mtest.CreateCommandErrorResponse(mtest.CommandError{
208 Code: errorHostUnreachable,
209 Name: "foo",
210 Message: "bar",
211 Labels: []string{resumableChangeStreamError},
212 })
213 mt.AddMockResponses(aggRes, getMoreRes, killCursorsRes, resumedAggRes)
214
215 cs, err := mt.Coll.Watch(context.Background(), mongo.Pipeline{})
216 assert.Nil(mt, err, "Watch error: %v", err)
217 defer closeStream(cs)
218
219 assert.False(mt, cs.Next(context.Background()), "expected Next to return false, got true")
220 })
221 mt.RunOpts("server selection before resume", mtest.NewOptions().CreateClient(false), func(mt *mtest.T) {
222
223 mt.Skip("skipping for lack of SDAM monitoring")
224 })
225 mt.Run("empty batch cursor not closed", func(mt *mtest.T) {
226
227
228 cs, err := mt.Coll.Watch(context.Background(), mongo.Pipeline{})
229 assert.Nil(mt, err, "Watch error: %v", err)
230 defer closeStream(cs)
231 assert.True(mt, cs.ID() > 0, "expected non-zero ID, got 0")
232 })
233 mt.RunOpts("ignore errors from killCursors", mtest.NewOptions().ClientType(mtest.Mock), func(mt *mtest.T) {
234
235
236 ns := mt.Coll.Database().Name() + "." + mt.Coll.Name()
237 aggRes := mtest.CreateCursorResponse(1, ns, mtest.FirstBatch)
238 getMoreRes := mtest.CreateCommandErrorResponse(mtest.CommandError{
239 Code: errorHostUnreachable,
240 Name: "foo",
241 Message: "bar",
242 Labels: []string{"ResumableChangeStreamError"},
243 })
244 killCursorsRes := mtest.CreateCommandErrorResponse(mtest.CommandError{
245 Code: errorInterrupted,
246 Name: "foo",
247 Message: "bar",
248 })
249 changeDoc := bson.D{{"_id", bson.D{{"x", 1}}}}
250 resumedAggRes := mtest.CreateCursorResponse(1, ns, mtest.FirstBatch, changeDoc)
251 mt.AddMockResponses(aggRes, getMoreRes, killCursorsRes, resumedAggRes)
252
253 cs, err := mt.Coll.Watch(context.Background(), mongo.Pipeline{})
254 assert.Nil(mt, err, "Watch error: %v", err)
255 defer closeStream(cs)
256
257 assert.True(mt, cs.Next(context.Background()), "expected Next to return true, got false")
258 assert.Nil(mt, cs.Err(), "change stream error: %v", cs.Err())
259 })
260
261 startAtOpTimeOpts := mtest.NewOptions().MinServerVersion("4.0").MaxServerVersion("4.0.6")
262 mt.RunOpts("include startAtOperationTime", startAtOpTimeOpts, func(mt *mtest.T) {
263
264
265
266 cs, err := mt.Coll.Watch(context.Background(), mongo.Pipeline{})
267 assert.Nil(mt, err, "Watch error: %v", err)
268 defer closeStream(cs)
269
270 generateEvents(mt, 1)
271
272 killChangeStreamCursor(mt, cs)
273
274 mt.ClearEvents()
275
276 assert.True(mt, cs.Next(context.Background()), "expected Next to return true, got false")
277
278 assert.NotNil(mt, mt.GetStartedEvent(), "expected getMore event, got nil")
279 assert.NotNil(mt, mt.GetStartedEvent(), "expected killCursors event, got nil")
280 aggEvent := mt.GetStartedEvent()
281 assert.NotNil(mt, aggEvent, "expected aggregate event, got nil")
282 assert.Equal(mt, "aggregate", aggEvent.CommandName, "expected command name 'aggregate', got '%v'", aggEvent.CommandName)
283
284
285 csStage := aggEvent.Command.Lookup("pipeline").Array().Index(0).Value().Document()
286 _, err = csStage.Lookup("$changeStream").Document().LookupErr("startAtOperationTime")
287 assert.Nil(mt, err, "startAtOperationTime not included in aggregate command")
288 })
289 mt.RunOpts("decode does not panic", noClientOpts, func(mt *mtest.T) {
290 testCases := []struct {
291 name string
292 st streamType
293 minServerVersion string
294 }{
295 {"client", client, "4.0"},
296 {"database", database, "4.0"},
297 {"collection", collection, ""},
298 }
299 for _, tc := range testCases {
300 tcOpts := mtest.NewOptions()
301 if tc.minServerVersion != "" {
302 tcOpts.MinServerVersion(tc.minServerVersion)
303 }
304 mt.RunOpts(tc.name, tcOpts, func(mt *mtest.T) {
305 var cs *mongo.ChangeStream
306 var err error
307 switch tc.st {
308 case client:
309 cs, err = mt.Client.Watch(context.Background(), mongo.Pipeline{})
310 case database:
311 cs, err = mt.DB.Watch(context.Background(), mongo.Pipeline{})
312 case collection:
313 cs, err = mt.Coll.Watch(context.Background(), mongo.Pipeline{})
314 }
315 assert.Nil(mt, err, "Watch error: %v", err)
316 defer closeStream(cs)
317
318 generateEvents(mt, 1)
319 assert.True(mt, cs.Next(context.Background()), "expected Next true, got false")
320 var res bson.D
321 err = cs.Decode(&res)
322 assert.Nil(mt, err, "Decode error: %v", err)
323 assert.True(mt, len(res) > 0, "expected non-empty document, got empty")
324 })
325 }
326 })
327 mt.Run("maxAwaitTimeMS", func(mt *mtest.T) {
328
329
330 opts := options.ChangeStream().SetMaxAwaitTime(100 * time.Millisecond)
331 cs, err := mt.Coll.Watch(context.Background(), mongo.Pipeline{}, opts)
332 assert.Nil(mt, err, "Watch error: %v", err)
333 defer closeStream(cs)
334
335 _, err = mt.Coll.InsertOne(context.Background(), bson.D{{"x", 1}})
336 assert.Nil(mt, err, "InsertOne error: %v", err)
337 mt.ClearEvents()
338 assert.True(mt, cs.Next(context.Background()), "expected Next true, got false")
339
340 e := mt.GetStartedEvent()
341 assert.NotNil(mt, e, "expected getMore event, got nil")
342 _, err = e.Command.LookupErr("maxTimeMS")
343 assert.Nil(mt, err, "field maxTimeMS not found in command %v", e.Command)
344 })
345 mt.RunOpts("resume token", noClientOpts, func(mt *mtest.T) {
346
347 mt.RunOpts("no getMore", noClientOpts, func(mt *mtest.T) {
348 pbrtOpts := mtest.NewOptions().MinServerVersion(minPbrtVersion).CreateClient(false)
349 mt.RunOpts("with PBRT support", pbrtOpts, func(mt *mtest.T) {
350 testCases := []struct {
351 name string
352 rt resumeType
353 minServerVersion string
354 }{
355 {"startAfter", startAfter, minStartAfterVersion},
356 {"resumeAfter", resumeAfter, minPbrtVersion},
357 {"neither", operationTime, minPbrtVersion},
358 }
359
360 for _, tc := range testCases {
361 tcOpts := mtest.NewOptions().MinServerVersion(tc.minServerVersion)
362 mt.RunOpts(tc.name, tcOpts, func(mt *mtest.T) {
363
364 mt.ClearEvents()
365 cs, err := mt.Coll.Watch(context.Background(), mongo.Pipeline{})
366 assert.Nil(mt, err, "Watch error: %v", err)
367
368
369 pbrt, opTime := getAggregateResponseInfo(mt)
370 compareResumeTokens(mt, cs, pbrt)
371
372 numEvents := 5
373 generateEvents(mt, numEvents)
374
375
376 assert.True(mt, cs.Next(context.Background()), "expected Next to return true, got false")
377 token := cs.ResumeToken()
378 closeStream(cs)
379
380 var numExpectedEvents int
381 var initialToken bson.Raw
382 var opts *options.ChangeStreamOptions
383 switch tc.rt {
384 case startAfter:
385 numExpectedEvents = numEvents - 1
386 initialToken = token
387 opts = options.ChangeStream().SetStartAfter(token)
388 case resumeAfter:
389 numExpectedEvents = numEvents - 1
390 initialToken = token
391 opts = options.ChangeStream().SetResumeAfter(token)
392 case operationTime:
393 numExpectedEvents = numEvents
394 opts = options.ChangeStream().SetStartAtOperationTime(&opTime)
395 }
396
397
398 mt.ClearEvents()
399 cs, err = mt.Coll.Watch(context.Background(), mongo.Pipeline{}, opts)
400 assert.Nil(mt, err, "Watch error: %v", err)
401 defer closeStream(cs)
402
403 aggPbrt, _ := getAggregateResponseInfo(mt)
404 compareResumeTokens(mt, cs, initialToken)
405
406 for i := 0; i < numExpectedEvents; i++ {
407 assert.True(mt, cs.Next(context.Background()), "expected Next to return true, got false")
408
409
410 if i != numExpectedEvents-1 {
411 compareResumeTokens(mt, cs, cs.Current.Lookup("_id").Document())
412 }
413 }
414
415 compareResumeTokens(mt, cs, aggPbrt)
416 })
417 }
418 })
419
420 noPbrtOpts := mtest.NewOptions().MaxServerVersion("4.0.6")
421 mt.RunOpts("without PBRT support", noPbrtOpts, func(mt *mtest.T) {
422 collName := mt.Coll.Name()
423 dbName := mt.Coll.Database().Name()
424 cs, err := mt.Coll.Watch(context.Background(), mongo.Pipeline{})
425 assert.Nil(mt, err, "Watch error: %v", err)
426 defer closeStream(cs)
427
428 compareResumeTokens(mt, cs, nil)
429 numEvents := 5
430 generateEvents(mt, numEvents)
431
432 assert.True(mt, cs.Next(context.Background()), "expected Next to return true, got false")
433 token := cs.ResumeToken()
434 assert.NotNil(mt, token, "expected resume token, got nil")
435
436 testCases := []struct {
437 name string
438 opts *options.ChangeStreamOptions
439 iterateStream bool
440 initialToken bson.Raw
441 numDocsExpected int
442 }{
443 {"resumeAfter", options.ChangeStream().SetResumeAfter(token), true, token, numEvents - 1},
444 {"no options", nil, false, nil, 0},
445 }
446 for _, tc := range testCases {
447 mt.Run(tc.name, func(mt *mtest.T) {
448 coll := mt.Client.Database(dbName).Collection(collName)
449 cs, err := coll.Watch(context.Background(), mongo.Pipeline{}, tc.opts)
450 assert.Nil(mt, err, "Watch error: %v", err)
451 defer closeStream(cs)
452
453 compareResumeTokens(mt, cs, tc.initialToken)
454 if !tc.iterateStream {
455 return
456 }
457
458 for i := 0; i < tc.numDocsExpected; i++ {
459 assert.True(mt, cs.Next(context.Background()), "expected Next to return true, got false")
460
461 compareResumeTokens(mt, cs, cs.Current.Lookup("_id").Document())
462 }
463 })
464 }
465 })
466 })
467 })
468 mt.RunOpts("try next", noClientOpts, func(mt *mtest.T) {
469 mt.Run("existing non-empty batch", func(mt *mtest.T) {
470
471
472 cs, err := mt.Coll.Watch(context.Background(), mongo.Pipeline{})
473 assert.Nil(mt, err, "Watch error: %v", err)
474 defer closeStream(cs)
475 generateEvents(mt, 5)
476
477 assert.True(mt, cs.Next(context.Background()), "expected Next to return true, got false")
478 tryNextExistingBatchTest(mt, cs)
479 })
480 mt.Run("one getMore sent", func(mt *mtest.T) {
481
482
483 cs, err := mt.Coll.Watch(context.Background(), mongo.Pipeline{})
484 assert.Nil(mt, err, "Watch error: %v", err)
485 defer closeStream(cs)
486
487 mt.ClearEvents()
488
489
490
491 for i := 0; i < 2; i++ {
492 assert.False(mt, cs.TryNext(context.Background()), "TryNext returned true on iteration %v", i)
493 }
494 verifyOneGetmoreSent(mt)
495 })
496 mt.RunOpts("getMore error", mtest.NewOptions().ClientType(mtest.Mock), func(mt *mtest.T) {
497
498
499 aggRes := mtest.CreateCursorResponse(50, "foo.bar", mtest.FirstBatch)
500 mt.AddMockResponses(aggRes)
501 cs, err := mt.Coll.Watch(context.Background(), mongo.Pipeline{})
502 assert.Nil(mt, err, "Watch error: %v", err)
503 defer closeStream(cs)
504 tryNextGetmoreError(mt, cs)
505 })
506 })
507
508 customDeploymentOpts := mtest.NewOptions().
509 Topologies(mtest.ReplicaSet).
510 MinServerVersion("4.0").
511 CreateClient(false)
512 mt.RunOpts("custom deployment", customDeploymentOpts, func(mt *mtest.T) {
513
514
515
516
517 mt.Run("errors are processed for SDAM on initial aggregate", func(mt *mtest.T) {
518 tpm := eventtest.NewTestPoolMonitor()
519 mt.ResetClient(options.Client().
520 SetPoolMonitor(tpm.PoolMonitor).
521 SetWriteConcern(mtest.MajorityWc).
522 SetReadConcern(mtest.MajorityRc).
523 SetRetryReads(false))
524
525 mt.SetFailPoint(mtest.FailPoint{
526 ConfigureFailPoint: "failCommand",
527 Mode: mtest.FailPointMode{
528 Times: 1,
529 },
530 Data: mtest.FailPointData{
531 FailCommands: []string{"aggregate"},
532 CloseConnection: true,
533 },
534 })
535
536 _, err := mt.Coll.Watch(context.Background(), mongo.Pipeline{})
537 assert.NotNil(mt, err, "expected Watch error, got nil")
538 assert.True(mt, tpm.IsPoolCleared(), "expected pool to be cleared after non-timeout network error but was not")
539 })
540 mt.Run("errors are processed for SDAM on getMore", func(mt *mtest.T) {
541 tpm := eventtest.NewTestPoolMonitor()
542 mt.ResetClient(options.Client().
543 SetPoolMonitor(tpm.PoolMonitor).
544 SetWriteConcern(mtest.MajorityWc).
545 SetReadConcern(mtest.MajorityRc).
546 SetRetryReads(false))
547
548 mt.SetFailPoint(mtest.FailPoint{
549 ConfigureFailPoint: "failCommand",
550 Mode: mtest.FailPointMode{
551 Times: 1,
552 },
553 Data: mtest.FailPointData{
554 FailCommands: []string{"getMore"},
555 CloseConnection: true,
556 },
557 })
558
559 cs, err := mt.Coll.Watch(context.Background(), mongo.Pipeline{})
560 assert.Nil(mt, err, "Watch error: %v", err)
561 defer closeStream(cs)
562
563 _, err = mt.Coll.InsertOne(context.Background(), bson.D{{"x", 1}})
564 assert.Nil(mt, err, "InsertOne error: %v", err)
565
566 assert.True(mt, cs.Next(context.Background()), "expected Next to return true, got false (iteration error %v)",
567 cs.Err())
568 assert.True(mt, tpm.IsPoolCleared(), "expected pool to be cleared after non-timeout network error but was not")
569 })
570 mt.Run("errors are processed for SDAM on retried aggregate", func(mt *mtest.T) {
571 tpm := eventtest.NewTestPoolMonitor()
572 mt.ResetClient(options.Client().
573 SetPoolMonitor(tpm.PoolMonitor).
574 SetRetryReads(true))
575
576 mt.SetFailPoint(mtest.FailPoint{
577 ConfigureFailPoint: "failCommand",
578 Mode: mtest.FailPointMode{
579 Times: 2,
580 },
581 Data: mtest.FailPointData{
582 FailCommands: []string{"aggregate"},
583 CloseConnection: true,
584 },
585 })
586
587 _, err := mt.Coll.Watch(context.Background(), mongo.Pipeline{})
588 assert.NotNil(mt, err, "expected Watch error, got nil")
589
590 clearedEvents := tpm.Events(func(evt *event.PoolEvent) bool {
591 return evt.Type == event.PoolCleared
592 })
593 assert.Equal(mt, 2, len(clearedEvents), "expected two PoolCleared events, got %d", len(clearedEvents))
594 })
595 })
596
597 mt.RunOpts("call to cursor.Next after cursor closed", mtest.NewOptions().MinServerVersion("4.0"), func(mt *mtest.T) {
598 cs, err := mt.Coll.Watch(context.Background(), mongo.Pipeline{})
599 assert.Nil(mt, err, "Watch error: %v", err)
600 defer closeStream(cs)
601
602
603 generateEvents(mt, 5)
604
605 err = mt.Coll.Drop(context.Background())
606 assert.Nil(mt, err, "Drop error: %v", err)
607
608
609 for i := 0; i < 7; i++ {
610 assert.True(mt, cs.Next(context.Background()), "Next returned false at index %d; iteration error: %v", i, cs.Err())
611 }
612
613 operationType := cs.Current.Lookup("operationType").StringValue()
614 assert.Equal(mt, operationType, "invalidate", "expected invalidate event but returned %q event", operationType)
615
616 assert.False(mt, cs.Next(context.Background()), "expected to return false, but returned true")
617 })
618 mt.Run("getMore commands are monitored", func(mt *mtest.T) {
619 cs, err := mt.Coll.Watch(context.Background(), mongo.Pipeline{})
620 assert.Nil(mt, err, "Watch error: %v", err)
621 defer closeStream(cs)
622
623 _, err = mt.Coll.InsertOne(context.Background(), bson.M{"x": 1})
624 assert.Nil(mt, err, "InsertOne error: %v", err)
625
626 mt.ClearEvents()
627 assert.True(mt, cs.Next(context.Background()), "Next returned false with error %v", cs.Err())
628 evt := mt.GetStartedEvent()
629 assert.Equal(mt, "getMore", evt.CommandName, "expected command 'getMore', got %q", evt.CommandName)
630 })
631 mt.Run("killCursors commands are monitored", func(mt *mtest.T) {
632 cs, err := mt.Coll.Watch(context.Background(), mongo.Pipeline{})
633 assert.Nil(mt, err, "Watch error: %v", err)
634 defer closeStream(cs)
635
636 mt.ClearEvents()
637 err = cs.Close(context.Background())
638 assert.Nil(mt, err, "Close error: %v", err)
639 evt := mt.GetStartedEvent()
640 assert.Equal(mt, "killCursors", evt.CommandName, "expected command 'killCursors', got %q", evt.CommandName)
641 })
642 mt.Run("Custom", func(mt *mtest.T) {
643
644
645 customOpts := bson.M{"allowDiskUse": true}
646 opts := options.ChangeStream().SetCustom(customOpts)
647
648
649 mt.ClearEvents()
650 cs, err := mt.Coll.Watch(context.Background(), mongo.Pipeline{}, opts)
651 assert.Nil(mt, err, "Watch error: %v", err)
652 defer closeStream(cs)
653
654
655 evt := mt.GetStartedEvent()
656 assert.Equal(mt, "aggregate", evt.CommandName, "expected command 'aggregate' got, %q", evt.CommandName)
657
658 aduVal, err := evt.Command.LookupErr("allowDiskUse")
659 assert.Nil(mt, err, "expected field 'allowDiskUse' in started command not found")
660 adu, ok := aduVal.BooleanOK()
661 assert.True(mt, ok, "expected field 'allowDiskUse' to be boolean, got %v", aduVal.Type.String())
662 assert.True(mt, adu, "expected field 'allowDiskUse' to be true, got false")
663 })
664 mt.RunOpts("CustomPipeline", mtest.NewOptions().MinServerVersion("4.0"), func(mt *mtest.T) {
665
666
667 customPipelineOpts := bson.M{"allChangesForCluster": false}
668 opts := options.ChangeStream().SetCustomPipeline(customPipelineOpts)
669
670
671 mt.ClearEvents()
672 cs, err := mt.Coll.Watch(context.Background(), mongo.Pipeline{}, opts)
673 assert.Nil(mt, err, "Watch error: %v", err)
674 defer closeStream(cs)
675
676
677 evt := mt.GetStartedEvent()
678 assert.Equal(mt, "aggregate", evt.CommandName, "expected command 'aggregate' got, %q", evt.CommandName)
679
680 acfcVal, err := evt.Command.LookupErr("pipeline", "0", "$changeStream", "allChangesForCluster")
681 assert.Nil(mt, err, "expected field 'allChangesForCluster' in $changeStream stage not found")
682 acfc, ok := acfcVal.BooleanOK()
683 assert.True(mt, ok, "expected field 'allChangesForCluster' to be boolean, got %v", acfcVal.Type.String())
684 assert.False(mt, acfc, "expected field 'allChangesForCluster' to be false, got %v", acfc)
685 })
686
687 withBSONOpts := mtest.NewOptions().ClientOptions(
688 options.Client().SetBSONOptions(&options.BSONOptions{
689 UseJSONStructTags: true,
690 }))
691 mt.RunOpts("with BSONOptions", withBSONOpts, func(mt *mtest.T) {
692 cs, err := mt.Coll.Watch(context.Background(), mongo.Pipeline{})
693 require.NoError(mt, err, "Watch error")
694 defer closeStream(cs)
695
696 type myDocument struct {
697 A string `json:"x"`
698 }
699
700 var wg sync.WaitGroup
701 wg.Add(1)
702 go func() {
703 defer wg.Done()
704 _, err := mt.Coll.InsertOne(context.Background(), myDocument{A: "foo"})
705 require.NoError(mt, err, "InsertOne error")
706 }()
707
708 cs.Next(context.Background())
709
710 var got struct {
711 FullDocument myDocument `bson:"fullDocument"`
712 }
713 err = cs.Decode(&got)
714 require.NoError(mt, err, "Decode error")
715
716 want := myDocument{
717 A: "foo",
718 }
719 assert.Equal(mt, want, got.FullDocument, "expected and actual Decode results are different")
720
721 wg.Wait()
722 })
723
724 splitLargeChangesCollOpts := options.
725 CreateCollection().
726 SetChangeStreamPreAndPostImages(bson.M{"enabled": true})
727
728 splitLargeChangesOpts := mtOpts.
729 MinServerVersion("6.0.9").
730 CreateClient(true).
731 CollectionCreateOptions(splitLargeChangesCollOpts)
732
733 mt.RunOpts("split large changes", splitLargeChangesOpts, func(mt *mtest.T) {
734 type idValue struct {
735 ID int32 `bson:"_id"`
736 Value string `bson:"value"`
737 }
738
739 doc := idValue{
740 ID: 1,
741 Value: "q" + strings.Repeat("q", 10*1024*1024),
742 }
743
744
745 _, err := mt.Coll.InsertOne(context.Background(), doc)
746 require.NoError(t, err, "failed to insert idValue")
747
748
749 pipeline := mongo.Pipeline{
750 {{"$changeStreamSplitLargeEvent", bson.D{}}},
751 }
752
753 opts := options.ChangeStream().SetFullDocument(options.Required)
754
755 cs, err := mt.Coll.Watch(context.Background(), pipeline, opts)
756 require.NoError(t, err, "failed to watch collection")
757
758 defer closeStream(cs)
759
760 var wg sync.WaitGroup
761 wg.Add(1)
762
763 go func() {
764 defer wg.Done()
765
766 filter := bson.D{{"_id", int32(1)}}
767 update := bson.D{{"$set", bson.D{{"value", "z" + strings.Repeat("q", 10*1024*1024)}}}}
768
769 _, err := mt.Coll.UpdateOne(context.Background(), filter, update)
770 require.NoError(mt, err, "failed to update idValue")
771 }()
772
773 nextCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
774 t.Cleanup(cancel)
775
776 type splitEvent struct {
777 Fragment int32 `bson:"fragment"`
778 Of int32 `bson:"of"`
779 }
780
781 got := struct {
782 SplitEvent splitEvent `bson:"splitEvent"`
783 }{}
784
785 cs.Next(nextCtx)
786
787 err = cs.Decode(&got)
788 require.NoError(mt, err, "failed to decode first iteration")
789
790 want := splitEvent{
791 Fragment: 1,
792 Of: 2,
793 }
794
795 assert.Equal(mt, want, got.SplitEvent, "expected and actual Decode results are different")
796
797 cs.Next(nextCtx)
798
799 err = cs.Decode(&got)
800 require.NoError(mt, err, "failed to decoded second iteration")
801
802 want = splitEvent{
803 Fragment: 2,
804 Of: 2,
805 }
806
807 assert.Equal(mt, want, got.SplitEvent, "expected and actual decode results are different")
808
809 wg.Wait()
810 })
811 }
812
813 func closeStream(cs *mongo.ChangeStream) {
814 _ = cs.Close(context.Background())
815 }
816
817 func generateEvents(mt *mtest.T, numEvents int) {
818 mt.Helper()
819
820 for i := 0; i < numEvents; i++ {
821 doc := bson.D{{"x", i}}
822 _, err := mt.Coll.InsertOne(context.Background(), doc)
823 assert.Nil(mt, err, "InsertOne error on document %v: %v", doc, err)
824 }
825 }
826
827 func killChangeStreamCursor(mt *mtest.T, cs *mongo.ChangeStream) {
828 mt.Helper()
829
830 db := mt.Coll.Database().Client().Database("admin")
831 err := db.RunCommand(context.Background(), bson.D{
832 {"killCursors", mt.Coll.Name()},
833 {"cursors", bson.A{cs.ID()}},
834 }).Err()
835 assert.Nil(mt, err, "killCursors error: %v", err)
836 }
837
838
839 func getAggregateResponseInfo(mt *mtest.T) (bson.Raw, primitive.Timestamp) {
840 mt.Helper()
841
842 succeeded := mt.GetSucceededEvent()
843 assert.NotNil(mt, succeeded, "expected success event for aggregate, got nil")
844 assert.Equal(mt, "aggregate", succeeded.CommandName, "expected command name 'aggregate', got '%v'", succeeded.CommandName)
845
846 pbrt := succeeded.Reply.Lookup("cursor", "postBatchResumeToken").Document()
847 optimeT, optimeI := succeeded.Reply.Lookup("operationTime").Timestamp()
848 return pbrt, primitive.Timestamp{T: optimeT, I: optimeI}
849 }
850
851 func compareResumeTokens(mt *mtest.T, cs *mongo.ChangeStream, expected bson.Raw) {
852 mt.Helper()
853 assert.Equal(mt, expected, cs.ResumeToken(), "expected resume token %v, got %v", expected, cs.ResumeToken())
854 }
855
View as plain text