1
2
3
4
5
6
7 package mongo
8
9 import (
10 "context"
11 "errors"
12 "fmt"
13 "reflect"
14 "strconv"
15 "time"
16
17 "go.mongodb.org/mongo-driver/bson"
18 "go.mongodb.org/mongo-driver/bson/bsoncodec"
19 "go.mongodb.org/mongo-driver/bson/primitive"
20 "go.mongodb.org/mongo-driver/internal/csot"
21 "go.mongodb.org/mongo-driver/mongo/description"
22 "go.mongodb.org/mongo-driver/mongo/options"
23 "go.mongodb.org/mongo-driver/mongo/readconcern"
24 "go.mongodb.org/mongo-driver/mongo/readpref"
25 "go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
26 "go.mongodb.org/mongo-driver/x/mongo/driver"
27 "go.mongodb.org/mongo-driver/x/mongo/driver/operation"
28 "go.mongodb.org/mongo-driver/x/mongo/driver/session"
29 )
30
var (
	// ErrMissingResumeToken is returned when a change event has no document-typed "_id" field to use as a
	// resume token and no post-batch resume token is available.
	ErrMissingResumeToken = errors.New("cannot provide resume functionality when the resume token is missing")

	// ErrNilCursor is returned by Decode when the change stream has no underlying cursor.
	ErrNilCursor = errors.New("cursor is nil")

	// minResumableLabelWireVersion is the wire version at or above which resumability is decided from the
	// resumableErrorLabel error label instead of the resumableChangeStreamErrors code list.
	minResumableLabelWireVersion int32 = 9
	networkErrorLabel                  = "NetworkError"
	resumableErrorLabel                = "ResumableChangeStreamError"
	errorCursorNotFound          int32 = 43 // always treated as resumable by isResumableError

	// resumableChangeStreamErrors is the set of server error codes treated as resumable when the server's wire
	// version does not include minResumableLabelWireVersion. The code names below follow the MongoDB server
	// error code list.
	resumableChangeStreamErrors = map[int32]struct{}{
		6:     {}, // HostUnreachable
		7:     {}, // HostNotFound
		89:    {}, // NetworkTimeout
		91:    {}, // ShutdownInProgress
		189:   {}, // PrimarySteppedDown
		262:   {}, // ExceededTimeLimit
		9001:  {}, // SocketException
		10107: {}, // NotPrimary
		11600: {}, // InterruptedAtShutdown
		11602: {}, // InterruptedDueToReplStateChange
		13435: {}, // NotPrimaryNoSecondaryOK
		13436: {}, // NotPrimaryOrSecondary
		63:    {}, // StaleShardVersion
		150:   {}, // StaleEpoch
		13388: {}, // StaleConfig
		234:   {}, // RetryChangeStream
		133:   {}, // FailedToSatisfyReadPreference
	}
)
63
64
65
66
67
// ChangeStream iterates over a stream of change events. Each event can be decoded into a Go value with
// Decode or read as raw BSON through the Current field.
type ChangeStream struct {
	// Current is the raw BSON of the event most recently made current by Next or TryNext. It is overwritten
	// every time the stream advances.
	Current bson.Raw

	aggregate       *operation.Aggregate         // aggregate operation used to open (and re-open) the stream
	pipelineSlice   []bsoncore.Document          // pipeline stages; index 0 is always the $changeStream stage
	pipelineOptions map[string]bsoncore.Value    // extra $changeStream stage options from CustomPipeline
	cursor          changeStreamCursor           // underlying cursor; nil once Close has run
	cursorOptions   driver.CursorOptions         // options applied to every cursor built for this stream
	batch           []bsoncore.Document          // events fetched from the cursor but not yet returned
	resumeToken     bson.Raw                     // latest resume token seen (cached)
	err             error                        // sticky error; once set, next refuses to advance
	sess            *session.Client              // session the stream runs in (explicit or implicit)
	client          *Client                      // owning client
	bsonOpts        *options.BSONOptions         // BSON encode/decode options
	registry        *bsoncodec.Registry          // codec registry used for marshalling and decoding
	streamType      StreamType                   // scope of the stream (collection, database or client)
	options         *options.ChangeStreamOptions // merged user options; rewritten by replaceOptions on resume
	selector        description.ServerSelector   // server selector (read preference plus latency window)
	operationTime   *primitive.Timestamp         // session operation time captured when the first batch was empty
	wireVersion     *description.VersionRange    // wire version range of the most recently used connection
}
91
// changeStreamConfig bundles the settings that newChangeStream needs from the entity (client, database or
// collection) the stream is opened on.
type changeStreamConfig struct {
	readConcern    *readconcern.ReadConcern // read concern applied to the aggregate
	readPreference *readpref.ReadPref       // read preference used for the aggregate and for server selection
	client         *Client                  // client that owns the stream
	bsonOpts       *options.BSONOptions     // BSON options copied onto the stream
	registry       *bsoncodec.Registry      // codec registry copied onto the stream
	streamType     StreamType               // which namespace scope the stream targets
	collectionName string                   // collection name; used only for CollectionStream
	databaseName   string                   // database name; used for DatabaseStream and CollectionStream
	crypt          driver.Crypt             // passed through to the aggregate operation's Crypt setting
}
103
104 func newChangeStream(ctx context.Context, config changeStreamConfig, pipeline interface{},
105 opts ...*options.ChangeStreamOptions) (*ChangeStream, error) {
106 if ctx == nil {
107 ctx = context.Background()
108 }
109
110 cursorOpts := config.client.createBaseCursorOptions()
111
112 cursorOpts.MarshalValueEncoderFn = newEncoderFn(config.bsonOpts, config.registry)
113
114 cs := &ChangeStream{
115 client: config.client,
116 bsonOpts: config.bsonOpts,
117 registry: config.registry,
118 streamType: config.streamType,
119 options: options.MergeChangeStreamOptions(opts...),
120 selector: description.CompositeSelector([]description.ServerSelector{
121 description.ReadPrefSelector(config.readPreference),
122 description.LatencySelector(config.client.localThreshold),
123 }),
124 cursorOptions: cursorOpts,
125 }
126
127 cs.sess = sessionFromContext(ctx)
128 if cs.sess == nil && cs.client.sessionPool != nil {
129 cs.sess = session.NewImplicitClientSession(cs.client.sessionPool, cs.client.id)
130 }
131 if cs.err = cs.client.validSession(cs.sess); cs.err != nil {
132 closeImplicitSession(cs.sess)
133 return nil, cs.Err()
134 }
135
136 cs.aggregate = operation.NewAggregate(nil).
137 ReadPreference(config.readPreference).ReadConcern(config.readConcern).
138 Deployment(cs.client.deployment).ClusterClock(cs.client.clock).
139 CommandMonitor(cs.client.monitor).Session(cs.sess).ServerSelector(cs.selector).Retry(driver.RetryNone).
140 ServerAPI(cs.client.serverAPI).Crypt(config.crypt).Timeout(cs.client.timeout)
141
142 if cs.options.Collation != nil {
143 cs.aggregate.Collation(bsoncore.Document(cs.options.Collation.ToDocument()))
144 }
145 if comment := cs.options.Comment; comment != nil {
146 cs.aggregate.Comment(*comment)
147
148 commentVal, err := marshalValue(comment, cs.bsonOpts, cs.registry)
149 if err != nil {
150 return nil, err
151 }
152 cs.cursorOptions.Comment = commentVal
153 }
154 if cs.options.BatchSize != nil {
155 cs.aggregate.BatchSize(*cs.options.BatchSize)
156 cs.cursorOptions.BatchSize = *cs.options.BatchSize
157 }
158 if cs.options.MaxAwaitTime != nil {
159 cs.cursorOptions.MaxTimeMS = int64(*cs.options.MaxAwaitTime / time.Millisecond)
160 }
161 if cs.options.Custom != nil {
162
163
164 customOptions := make(map[string]bsoncore.Value)
165 for optionName, optionValue := range cs.options.Custom {
166 bsonType, bsonData, err := bson.MarshalValueWithRegistry(cs.registry, optionValue)
167 if err != nil {
168 cs.err = err
169 closeImplicitSession(cs.sess)
170 return nil, cs.Err()
171 }
172 optionValueBSON := bsoncore.Value{Type: bsonType, Data: bsonData}
173 customOptions[optionName] = optionValueBSON
174 }
175 cs.aggregate.CustomOptions(customOptions)
176 }
177 if cs.options.CustomPipeline != nil {
178
179
180 cs.pipelineOptions = make(map[string]bsoncore.Value)
181 for optionName, optionValue := range cs.options.CustomPipeline {
182 bsonType, bsonData, err := bson.MarshalValueWithRegistry(cs.registry, optionValue)
183 if err != nil {
184 cs.err = err
185 closeImplicitSession(cs.sess)
186 return nil, cs.Err()
187 }
188 optionValueBSON := bsoncore.Value{Type: bsonType, Data: bsonData}
189 cs.pipelineOptions[optionName] = optionValueBSON
190 }
191 }
192
193 switch cs.streamType {
194 case ClientStream:
195 cs.aggregate.Database("admin")
196 case DatabaseStream:
197 cs.aggregate.Database(config.databaseName)
198 case CollectionStream:
199 cs.aggregate.Collection(config.collectionName).Database(config.databaseName)
200 default:
201 closeImplicitSession(cs.sess)
202 return nil, fmt.Errorf("must supply a valid StreamType in config, instead of %v", cs.streamType)
203 }
204
205
206
207 resumeToken := cs.options.StartAfter
208 if resumeToken == nil {
209 resumeToken = cs.options.ResumeAfter
210 }
211 var marshaledToken bson.Raw
212 if resumeToken != nil {
213 if marshaledToken, cs.err = bson.Marshal(resumeToken); cs.err != nil {
214 closeImplicitSession(cs.sess)
215 return nil, cs.Err()
216 }
217 }
218 cs.resumeToken = marshaledToken
219
220 if cs.err = cs.buildPipelineSlice(pipeline); cs.err != nil {
221 closeImplicitSession(cs.sess)
222 return nil, cs.Err()
223 }
224 var pipelineArr bsoncore.Document
225 pipelineArr, cs.err = cs.pipelineToBSON()
226 cs.aggregate.Pipeline(pipelineArr)
227
228 if cs.err = cs.executeOperation(ctx, false); cs.err != nil {
229 closeImplicitSession(cs.sess)
230 return nil, cs.Err()
231 }
232
233 return cs, cs.Err()
234 }
235
236 func (cs *ChangeStream) createOperationDeployment(server driver.Server, connection driver.Connection) driver.Deployment {
237 return &changeStreamDeployment{
238 topologyKind: cs.client.deployment.Kind(),
239 server: server,
240 conn: connection,
241 }
242 }
243
// executeOperation runs the aggregate command that opens the change stream and installs the resulting cursor
// on cs. When resuming is true, the $changeStream stage is rebuilt from the options rewritten by
// replaceOptions before the command is sent.
//
// The error is stored in cs.err and also returned.
func (cs *ChangeStream) executeOperation(ctx context.Context, resuming bool) error {
	var server driver.Server
	var conn driver.Connection

	// Select a server and check out a connection up front so its wire version is known before the command
	// is built.
	if server, cs.err = cs.client.deployment.SelectServer(ctx, cs.selector); cs.err != nil {
		return cs.Err()
	}
	if conn, cs.err = server.Connection(ctx); cs.err != nil {
		return cs.Err()
	}
	defer conn.Close()
	cs.wireVersion = conn.Description().WireVersion

	// Point the aggregate at a deployment built from exactly this server and connection.
	cs.aggregate.Deployment(cs.createOperationDeployment(server, conn))

	if resuming {
		cs.replaceOptions(cs.wireVersion)

		// Rebuild the $changeStream stage (pipeline index 0) from the updated options.
		csOptDoc, err := cs.createPipelineOptionsDoc()
		if err != nil {
			return err
		}
		pipIdx, pipDoc := bsoncore.AppendDocumentStart(nil)
		pipDoc = bsoncore.AppendDocumentElement(pipDoc, "$changeStream", csOptDoc)
		if pipDoc, cs.err = bsoncore.AppendDocumentEnd(pipDoc, pipIdx); cs.err != nil {
			return cs.Err()
		}
		cs.pipelineSlice[0] = pipDoc

		var plArr bsoncore.Document
		if plArr, cs.err = cs.pipelineToBSON(); cs.err != nil {
			return cs.Err()
		}
		cs.aggregate.Pipeline(plArr)
	}

	// If the client has a timeout configured and ctx is not already a timeout context (per
	// csot.IsTimeoutContext), derive one from the client timeout so it covers the attempts below.
	if cs.client.timeout != nil && !csot.IsTimeoutContext(ctx) {
		newCtx, cancelFunc := csot.MakeTimeoutContext(ctx, *cs.client.timeout)

		ctx = newCtx

		defer cancelFunc()
	}

	// Retry budget: none by default, one retry when retryReads is enabled. With a timeout context the counter
	// starts at -1 and therefore never reaches 0, so the attempt count is not what ends the loop.
	var retries int
	if cs.client.retryReads {
		retries = 1
	}
	if csot.IsTimeoutContext(ctx) {
		retries = -1
	}

	var err error
AggregateExecuteLoop:
	for {
		err = cs.aggregate.Execute(ctx)

		// Stop on success or when the retry budget is exhausted.
		if err == nil || retries == 0 {
			break AggregateExecuteLoop
		}

		switch tt := err.(type) {
		case driver.Error:
			// Only errors the driver reports as retryable reads are retried.
			if !tt.RetryableRead() {
				break AggregateExecuteLoop
			}

			// Retry on a freshly selected server with a new connection.
			retries--
			server, err = cs.client.deployment.SelectServer(ctx, cs.selector)
			if err != nil {
				break AggregateExecuteLoop
			}

			conn.Close()
			conn, err = server.Connection(ctx)
			if err != nil {
				break AggregateExecuteLoop
			}
			// NOTE(review): this defer sits inside the loop, so each retry adds one more deferred Close that
			// only runs when the function returns.
			defer conn.Close()

			// Refresh the wire version for the new connection.
			cs.wireVersion = conn.Description().WireVersion

			// Re-point the aggregate at the new server and connection.
			cs.aggregate.Deployment(cs.createOperationDeployment(server, conn))
		default:
			// Any other error type is not retried.
			break AggregateExecuteLoop
		}
	}
	if err != nil {
		cs.err = replaceErrors(err)
		return cs.err
	}

	cr := cs.aggregate.ResultCursorResponse()
	cr.Server = server

	cs.cursor, cs.err = driver.NewBatchCursor(cr, cs.sess, cs.client.clock, cs.cursorOptions)
	if cs.err = replaceErrors(cs.err); cs.err != nil {
		return cs.Err()
	}

	cs.updatePbrtFromCommand()
	// If the user gave no start point, the first batch is empty and no resume token is cached, remember the
	// session's operation time so a later resume has a starting point (only when wireVersion.Max >= 7).
	if cs.options.StartAtOperationTime == nil && cs.options.ResumeAfter == nil &&
		cs.options.StartAfter == nil && cs.wireVersion.Max >= 7 &&
		cs.emptyBatch() && cs.resumeToken == nil {
		cs.operationTime = cs.sess.OperationTime
	}

	return cs.Err()
}
364
365
366 func (cs *ChangeStream) updatePbrtFromCommand() {
367
368 if pbrt := cs.cursor.PostBatchResumeToken(); cs.emptyBatch() && pbrt != nil {
369 cs.resumeToken = bson.Raw(pbrt)
370 }
371 }
372
373 func (cs *ChangeStream) storeResumeToken() error {
374
375
376 var tokenDoc bson.Raw
377 if len(cs.batch) == 0 {
378 if pbrt := cs.cursor.PostBatchResumeToken(); pbrt != nil {
379 tokenDoc = bson.Raw(pbrt)
380 }
381 }
382
383 if tokenDoc == nil {
384 var ok bool
385 tokenDoc, ok = cs.Current.Lookup("_id").DocumentOK()
386 if !ok {
387 _ = cs.Close(context.Background())
388 return ErrMissingResumeToken
389 }
390 }
391
392 cs.resumeToken = tokenDoc
393 return nil
394 }
395
396 func (cs *ChangeStream) buildPipelineSlice(pipeline interface{}) error {
397 val := reflect.ValueOf(pipeline)
398 if !val.IsValid() || !(val.Kind() == reflect.Slice) {
399 cs.err = errors.New("can only marshal slices and arrays into aggregation pipelines, but got invalid")
400 return cs.err
401 }
402
403 cs.pipelineSlice = make([]bsoncore.Document, 0, val.Len()+1)
404
405 csIdx, csDoc := bsoncore.AppendDocumentStart(nil)
406
407 csDocTemp, err := cs.createPipelineOptionsDoc()
408 if err != nil {
409 return err
410 }
411 csDoc = bsoncore.AppendDocumentElement(csDoc, "$changeStream", csDocTemp)
412 csDoc, cs.err = bsoncore.AppendDocumentEnd(csDoc, csIdx)
413 if cs.err != nil {
414 return cs.err
415 }
416 cs.pipelineSlice = append(cs.pipelineSlice, csDoc)
417
418 for i := 0; i < val.Len(); i++ {
419 var elem []byte
420 elem, cs.err = marshal(val.Index(i).Interface(), cs.bsonOpts, cs.registry)
421 if cs.err != nil {
422 return cs.err
423 }
424
425 cs.pipelineSlice = append(cs.pipelineSlice, elem)
426 }
427
428 return cs.err
429 }
430
431 func (cs *ChangeStream) createPipelineOptionsDoc() (bsoncore.Document, error) {
432 plDocIdx, plDoc := bsoncore.AppendDocumentStart(nil)
433
434 if cs.streamType == ClientStream {
435 plDoc = bsoncore.AppendBooleanElement(plDoc, "allChangesForCluster", true)
436 }
437
438 if cs.options.FullDocument != nil && *cs.options.FullDocument != options.Default {
439 plDoc = bsoncore.AppendStringElement(plDoc, "fullDocument", string(*cs.options.FullDocument))
440 }
441
442 if cs.options.FullDocumentBeforeChange != nil {
443 plDoc = bsoncore.AppendStringElement(plDoc, "fullDocumentBeforeChange", string(*cs.options.FullDocumentBeforeChange))
444 }
445
446 if cs.options.ResumeAfter != nil {
447 var raDoc bsoncore.Document
448 raDoc, cs.err = marshal(cs.options.ResumeAfter, cs.bsonOpts, cs.registry)
449 if cs.err != nil {
450 return nil, cs.err
451 }
452
453 plDoc = bsoncore.AppendDocumentElement(plDoc, "resumeAfter", raDoc)
454 }
455
456 if cs.options.ShowExpandedEvents != nil {
457 plDoc = bsoncore.AppendBooleanElement(plDoc, "showExpandedEvents", *cs.options.ShowExpandedEvents)
458 }
459
460 if cs.options.StartAfter != nil {
461 var saDoc bsoncore.Document
462 saDoc, cs.err = marshal(cs.options.StartAfter, cs.bsonOpts, cs.registry)
463 if cs.err != nil {
464 return nil, cs.err
465 }
466
467 plDoc = bsoncore.AppendDocumentElement(plDoc, "startAfter", saDoc)
468 }
469
470 if cs.options.StartAtOperationTime != nil {
471 plDoc = bsoncore.AppendTimestampElement(plDoc, "startAtOperationTime", cs.options.StartAtOperationTime.T, cs.options.StartAtOperationTime.I)
472 }
473
474
475 for optionName, optionValue := range cs.pipelineOptions {
476 plDoc = bsoncore.AppendValueElement(plDoc, optionName, optionValue)
477 }
478
479 if plDoc, cs.err = bsoncore.AppendDocumentEnd(plDoc, plDocIdx); cs.err != nil {
480 return nil, cs.err
481 }
482
483 return plDoc, nil
484 }
485
486 func (cs *ChangeStream) pipelineToBSON() (bsoncore.Document, error) {
487 pipelineDocIdx, pipelineArr := bsoncore.AppendArrayStart(nil)
488 for i, doc := range cs.pipelineSlice {
489 pipelineArr = bsoncore.AppendDocumentElement(pipelineArr, strconv.Itoa(i), doc)
490 }
491 if pipelineArr, cs.err = bsoncore.AppendArrayEnd(pipelineArr, pipelineDocIdx); cs.err != nil {
492 return nil, cs.err
493 }
494 return pipelineArr, cs.err
495 }
496
497 func (cs *ChangeStream) replaceOptions(wireVersion *description.VersionRange) {
498
499 if cs.resumeToken != nil {
500 cs.options.SetResumeAfter(cs.resumeToken)
501 cs.options.SetStartAfter(nil)
502 cs.options.SetStartAtOperationTime(nil)
503 return
504 }
505
506
507
508 if (cs.sess.OperationTime != nil || cs.options.StartAtOperationTime != nil) && wireVersion.Max >= 7 {
509 opTime := cs.options.StartAtOperationTime
510 if cs.operationTime != nil {
511 opTime = cs.sess.OperationTime
512 }
513
514 cs.options.SetStartAtOperationTime(opTime)
515 cs.options.SetResumeAfter(nil)
516 cs.options.SetStartAfter(nil)
517 return
518 }
519
520
521 cs.options.SetResumeAfter(nil)
522 cs.options.SetStartAfter(nil)
523 cs.options.SetStartAtOperationTime(nil)
524 }
525
526
527 func (cs *ChangeStream) ID() int64 {
528 if cs.cursor == nil {
529 return 0
530 }
531 return cs.cursor.ID()
532 }
533
534
535
536
537 func (cs *ChangeStream) SetBatchSize(size int32) {
538
539
540 cs.cursorOptions.BatchSize = size
541 cs.cursor.SetBatchSize(size)
542 }
543
544
545
546 func (cs *ChangeStream) Decode(val interface{}) error {
547 if cs.cursor == nil {
548 return ErrNilCursor
549 }
550
551 dec, err := getDecoder(cs.Current, cs.bsonOpts, cs.registry)
552 if err != nil {
553 return fmt.Errorf("error configuring BSON decoder: %w", err)
554 }
555 return dec.Decode(val)
556 }
557
558
559 func (cs *ChangeStream) Err() error {
560 if cs.err != nil {
561 return replaceErrors(cs.err)
562 }
563 if cs.cursor == nil {
564 return nil
565 }
566
567 return replaceErrors(cs.cursor.Err())
568 }
569
570
571
572 func (cs *ChangeStream) Close(ctx context.Context) error {
573 if ctx == nil {
574 ctx = context.Background()
575 }
576
577 defer closeImplicitSession(cs.sess)
578
579 if cs.cursor == nil {
580 return nil
581 }
582
583 cs.err = replaceErrors(cs.cursor.Close(ctx))
584 cs.cursor = nil
585 return cs.Err()
586 }
587
588
589
590 func (cs *ChangeStream) ResumeToken() bson.Raw {
591 return cs.resumeToken
592 }
593
594
595
596
597
598
599
600
601 func (cs *ChangeStream) Next(ctx context.Context) bool {
602 return cs.next(ctx, false)
603 }
604
605
606
607
608
609
610
611
612
613
614
615
616 func (cs *ChangeStream) TryNext(ctx context.Context) bool {
617 return cs.next(ctx, true)
618 }
619
620 func (cs *ChangeStream) next(ctx context.Context, nonBlocking bool) bool {
621
622 if cs.err != nil {
623 return false
624 }
625
626 if ctx == nil {
627 ctx = context.Background()
628 }
629
630 if len(cs.batch) == 0 {
631 cs.loopNext(ctx, nonBlocking)
632 if cs.err != nil {
633 cs.err = replaceErrors(cs.err)
634 return false
635 }
636 if len(cs.batch) == 0 {
637 return false
638 }
639 }
640
641
642 cs.Current = bson.Raw(cs.batch[0])
643 cs.batch = cs.batch[1:]
644 if cs.err = cs.storeResumeToken(); cs.err != nil {
645 return false
646 }
647 return true
648 }
649
// loopNext fetches the next batch of events from the cursor into cs.batch, transparently re-opening the
// change stream when the cursor fails with a resumable error. Results are reported through cs.batch and
// cs.err.
//
// With nonBlocking set, it returns after the first empty batch; otherwise it keeps polling until a
// non-empty batch arrives, the cursor is exhausted, or a non-resumable error occurs.
func (cs *ChangeStream) loopNext(ctx context.Context, nonBlocking bool) {
	for {
		// Nothing to iterate once the stream has been closed.
		if cs.cursor == nil {
			return
		}

		if cs.cursor.Next(ctx) {
			// A batch is available: unpack its documents and hand control back to next.
			cs.batch, cs.err = cs.cursor.Batch().Documents()
			return
		}

		cs.err = replaceErrors(cs.cursor.Err())
		if cs.err == nil {
			// No error and no batch. A cursor ID of 0 means there is nothing more to fetch, so stop.
			if cs.ID() == 0 {
				return
			}

			// Empty batch from a live cursor: pick up the post-batch resume token, then either give up
			// (non-blocking) or poll again.
			cs.updatePbrtFromCommand()
			if nonBlocking {
				// TryNext semantics: stop after a single attempt.
				return
			}
			continue
		}

		if !cs.isResumableError() {
			return
		}

		// Resumable error: discard the old cursor (its close error is deliberately ignored) and re-run the
		// aggregate in resume mode.
		_ = cs.cursor.Close(ctx)
		if cs.err = cs.executeOperation(ctx, true); cs.err != nil {
			return
		}
	}
}
690
691 func (cs *ChangeStream) isResumableError() bool {
692 var commandErr CommandError
693 if !errors.As(cs.err, &commandErr) || commandErr.HasErrorLabel(networkErrorLabel) {
694
695 return true
696 }
697
698 if commandErr.Code == errorCursorNotFound {
699 return true
700 }
701
702
703 if cs.wireVersion != nil && cs.wireVersion.Includes(minResumableLabelWireVersion) {
704 return commandErr.HasErrorLabel(resumableErrorLabel)
705 }
706
707
708 _, resumable := resumableChangeStreamErrors[commandErr.Code]
709 return resumable
710 }
711
712
713 func (cs *ChangeStream) emptyBatch() bool {
714 return cs.cursor.Batch().Empty()
715 }
716
717
// StreamType identifies the scope a change stream is opened on.
type StreamType uint8

// The stream types recognised by newChangeStream. The numeric values follow declaration order, so the order
// must not change.
const (
	// CollectionStream targets a single collection in a database.
	CollectionStream StreamType = iota
	// DatabaseStream targets a whole database.
	DatabaseStream
	// ClientStream runs against the "admin" database with allChangesForCluster set.
	ClientStream
)
727
View as plain text