...

Source file src/go.mongodb.org/mongo-driver/mongo/change_stream.go

Documentation: go.mongodb.org/mongo-driver/mongo

     1  // Copyright (C) MongoDB, Inc. 2017-present.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License"); you may
     4  // not use this file except in compliance with the License. You may obtain
     5  // a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
     6  
     7  package mongo
     8  
     9  import (
    10  	"context"
    11  	"errors"
    12  	"fmt"
    13  	"reflect"
    14  	"strconv"
    15  	"time"
    16  
    17  	"go.mongodb.org/mongo-driver/bson"
    18  	"go.mongodb.org/mongo-driver/bson/bsoncodec"
    19  	"go.mongodb.org/mongo-driver/bson/primitive"
    20  	"go.mongodb.org/mongo-driver/internal/csot"
    21  	"go.mongodb.org/mongo-driver/mongo/description"
    22  	"go.mongodb.org/mongo-driver/mongo/options"
    23  	"go.mongodb.org/mongo-driver/mongo/readconcern"
    24  	"go.mongodb.org/mongo-driver/mongo/readpref"
    25  	"go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
    26  	"go.mongodb.org/mongo-driver/x/mongo/driver"
    27  	"go.mongodb.org/mongo-driver/x/mongo/driver/operation"
    28  	"go.mongodb.org/mongo-driver/x/mongo/driver/session"
    29  )
    30  
    31  var (
    32  	// ErrMissingResumeToken indicates that a change stream notification from the server did not contain a resume token.
    33  	ErrMissingResumeToken = errors.New("cannot provide resume functionality when the resume token is missing")
    34  	// ErrNilCursor indicates that the underlying cursor for the change stream is nil.
    35  	ErrNilCursor = errors.New("cursor is nil")
    36  
    37  	minResumableLabelWireVersion int32 = 9 // Wire version at which the server includes the resumable error label
    38  	networkErrorLabel                  = "NetworkError"
    39  	resumableErrorLabel                = "ResumableChangeStreamError"
    40  	errorCursorNotFound          int32 = 43 // CursorNotFound error code
    41  
    42  	// Allowlist of error codes that are considered resumable.
    43  	resumableChangeStreamErrors = map[int32]struct{}{
    44  		6:     {}, // HostUnreachable
    45  		7:     {}, // HostNotFound
    46  		89:    {}, // NetworkTimeout
    47  		91:    {}, // ShutdownInProgress
    48  		189:   {}, // PrimarySteppedDown
    49  		262:   {}, // ExceededTimeLimit
    50  		9001:  {}, // SocketException
    51  		10107: {}, // NotPrimary
    52  		11600: {}, // InterruptedAtShutdown
    53  		11602: {}, // InterruptedDueToReplStateChange
    54  		13435: {}, // NotPrimaryNoSecondaryOK
    55  		13436: {}, // NotPrimaryOrSecondary
    56  		63:    {}, // StaleShardVersion
    57  		150:   {}, // StaleEpoch
    58  		13388: {}, // StaleConfig
    59  		234:   {}, // RetryChangeStream
    60  		133:   {}, // FailedToSatisfyReadPreference
    61  	}
    62  )
    63  
    64  // ChangeStream is used to iterate over a stream of events. Each event can be decoded into a Go type via the Decode
    65  // method or accessed as raw BSON via the Current field. This type is not goroutine safe and must not be used
    66  // concurrently by multiple goroutines. For more information about change streams, see
    67  // https://www.mongodb.com/docs/manual/changeStreams/.
    68  type ChangeStream struct {
    69  	// Current is the BSON bytes of the current event. This property is only valid until the next call to Next or
    70  	// TryNext. If continued access is required, a copy must be made.
    71  	Current bson.Raw
    72  
    73  	aggregate       *operation.Aggregate
    74  	pipelineSlice   []bsoncore.Document
    75  	pipelineOptions map[string]bsoncore.Value
    76  	cursor          changeStreamCursor
    77  	cursorOptions   driver.CursorOptions
    78  	batch           []bsoncore.Document
    79  	resumeToken     bson.Raw
    80  	err             error
    81  	sess            *session.Client
    82  	client          *Client
    83  	bsonOpts        *options.BSONOptions
    84  	registry        *bsoncodec.Registry
    85  	streamType      StreamType
    86  	options         *options.ChangeStreamOptions
    87  	selector        description.ServerSelector
    88  	operationTime   *primitive.Timestamp
    89  	wireVersion     *description.VersionRange
    90  }
    91  
    92  type changeStreamConfig struct {
    93  	readConcern    *readconcern.ReadConcern
    94  	readPreference *readpref.ReadPref
    95  	client         *Client
    96  	bsonOpts       *options.BSONOptions
    97  	registry       *bsoncodec.Registry
    98  	streamType     StreamType
    99  	collectionName string
   100  	databaseName   string
   101  	crypt          driver.Crypt
   102  }
   103  
   104  func newChangeStream(ctx context.Context, config changeStreamConfig, pipeline interface{},
   105  	opts ...*options.ChangeStreamOptions) (*ChangeStream, error) {
   106  	if ctx == nil {
   107  		ctx = context.Background()
   108  	}
   109  
   110  	cursorOpts := config.client.createBaseCursorOptions()
   111  
   112  	cursorOpts.MarshalValueEncoderFn = newEncoderFn(config.bsonOpts, config.registry)
   113  
   114  	cs := &ChangeStream{
   115  		client:     config.client,
   116  		bsonOpts:   config.bsonOpts,
   117  		registry:   config.registry,
   118  		streamType: config.streamType,
   119  		options:    options.MergeChangeStreamOptions(opts...),
   120  		selector: description.CompositeSelector([]description.ServerSelector{
   121  			description.ReadPrefSelector(config.readPreference),
   122  			description.LatencySelector(config.client.localThreshold),
   123  		}),
   124  		cursorOptions: cursorOpts,
   125  	}
   126  
   127  	cs.sess = sessionFromContext(ctx)
   128  	if cs.sess == nil && cs.client.sessionPool != nil {
   129  		cs.sess = session.NewImplicitClientSession(cs.client.sessionPool, cs.client.id)
   130  	}
   131  	if cs.err = cs.client.validSession(cs.sess); cs.err != nil {
   132  		closeImplicitSession(cs.sess)
   133  		return nil, cs.Err()
   134  	}
   135  
   136  	cs.aggregate = operation.NewAggregate(nil).
   137  		ReadPreference(config.readPreference).ReadConcern(config.readConcern).
   138  		Deployment(cs.client.deployment).ClusterClock(cs.client.clock).
   139  		CommandMonitor(cs.client.monitor).Session(cs.sess).ServerSelector(cs.selector).Retry(driver.RetryNone).
   140  		ServerAPI(cs.client.serverAPI).Crypt(config.crypt).Timeout(cs.client.timeout)
   141  
   142  	if cs.options.Collation != nil {
   143  		cs.aggregate.Collation(bsoncore.Document(cs.options.Collation.ToDocument()))
   144  	}
   145  	if comment := cs.options.Comment; comment != nil {
   146  		cs.aggregate.Comment(*comment)
   147  
   148  		commentVal, err := marshalValue(comment, cs.bsonOpts, cs.registry)
   149  		if err != nil {
   150  			return nil, err
   151  		}
   152  		cs.cursorOptions.Comment = commentVal
   153  	}
   154  	if cs.options.BatchSize != nil {
   155  		cs.aggregate.BatchSize(*cs.options.BatchSize)
   156  		cs.cursorOptions.BatchSize = *cs.options.BatchSize
   157  	}
   158  	if cs.options.MaxAwaitTime != nil {
   159  		cs.cursorOptions.MaxTimeMS = int64(*cs.options.MaxAwaitTime / time.Millisecond)
   160  	}
   161  	if cs.options.Custom != nil {
   162  		// Marshal all custom options before passing to the initial aggregate. Return
   163  		// any errors from Marshaling.
   164  		customOptions := make(map[string]bsoncore.Value)
   165  		for optionName, optionValue := range cs.options.Custom {
   166  			bsonType, bsonData, err := bson.MarshalValueWithRegistry(cs.registry, optionValue)
   167  			if err != nil {
   168  				cs.err = err
   169  				closeImplicitSession(cs.sess)
   170  				return nil, cs.Err()
   171  			}
   172  			optionValueBSON := bsoncore.Value{Type: bsonType, Data: bsonData}
   173  			customOptions[optionName] = optionValueBSON
   174  		}
   175  		cs.aggregate.CustomOptions(customOptions)
   176  	}
   177  	if cs.options.CustomPipeline != nil {
   178  		// Marshal all custom pipeline options before building pipeline slice. Return
   179  		// any errors from Marshaling.
   180  		cs.pipelineOptions = make(map[string]bsoncore.Value)
   181  		for optionName, optionValue := range cs.options.CustomPipeline {
   182  			bsonType, bsonData, err := bson.MarshalValueWithRegistry(cs.registry, optionValue)
   183  			if err != nil {
   184  				cs.err = err
   185  				closeImplicitSession(cs.sess)
   186  				return nil, cs.Err()
   187  			}
   188  			optionValueBSON := bsoncore.Value{Type: bsonType, Data: bsonData}
   189  			cs.pipelineOptions[optionName] = optionValueBSON
   190  		}
   191  	}
   192  
   193  	switch cs.streamType {
   194  	case ClientStream:
   195  		cs.aggregate.Database("admin")
   196  	case DatabaseStream:
   197  		cs.aggregate.Database(config.databaseName)
   198  	case CollectionStream:
   199  		cs.aggregate.Collection(config.collectionName).Database(config.databaseName)
   200  	default:
   201  		closeImplicitSession(cs.sess)
   202  		return nil, fmt.Errorf("must supply a valid StreamType in config, instead of %v", cs.streamType)
   203  	}
   204  
   205  	// When starting a change stream, cache startAfter as the first resume token if it is set. If not, cache
   206  	// resumeAfter. If neither is set, do not cache a resume token.
   207  	resumeToken := cs.options.StartAfter
   208  	if resumeToken == nil {
   209  		resumeToken = cs.options.ResumeAfter
   210  	}
   211  	var marshaledToken bson.Raw
   212  	if resumeToken != nil {
   213  		if marshaledToken, cs.err = bson.Marshal(resumeToken); cs.err != nil {
   214  			closeImplicitSession(cs.sess)
   215  			return nil, cs.Err()
   216  		}
   217  	}
   218  	cs.resumeToken = marshaledToken
   219  
   220  	if cs.err = cs.buildPipelineSlice(pipeline); cs.err != nil {
   221  		closeImplicitSession(cs.sess)
   222  		return nil, cs.Err()
   223  	}
   224  	var pipelineArr bsoncore.Document
   225  	pipelineArr, cs.err = cs.pipelineToBSON()
   226  	cs.aggregate.Pipeline(pipelineArr)
   227  
   228  	if cs.err = cs.executeOperation(ctx, false); cs.err != nil {
   229  		closeImplicitSession(cs.sess)
   230  		return nil, cs.Err()
   231  	}
   232  
   233  	return cs, cs.Err()
   234  }
   235  
   236  func (cs *ChangeStream) createOperationDeployment(server driver.Server, connection driver.Connection) driver.Deployment {
   237  	return &changeStreamDeployment{
   238  		topologyKind: cs.client.deployment.Kind(),
   239  		server:       server,
   240  		conn:         connection,
   241  	}
   242  }
   243  
   244  func (cs *ChangeStream) executeOperation(ctx context.Context, resuming bool) error {
   245  	var server driver.Server
   246  	var conn driver.Connection
   247  
   248  	if server, cs.err = cs.client.deployment.SelectServer(ctx, cs.selector); cs.err != nil {
   249  		return cs.Err()
   250  	}
   251  	if conn, cs.err = server.Connection(ctx); cs.err != nil {
   252  		return cs.Err()
   253  	}
   254  	defer conn.Close()
   255  	cs.wireVersion = conn.Description().WireVersion
   256  
   257  	cs.aggregate.Deployment(cs.createOperationDeployment(server, conn))
   258  
   259  	if resuming {
   260  		cs.replaceOptions(cs.wireVersion)
   261  
   262  		csOptDoc, err := cs.createPipelineOptionsDoc()
   263  		if err != nil {
   264  			return err
   265  		}
   266  		pipIdx, pipDoc := bsoncore.AppendDocumentStart(nil)
   267  		pipDoc = bsoncore.AppendDocumentElement(pipDoc, "$changeStream", csOptDoc)
   268  		if pipDoc, cs.err = bsoncore.AppendDocumentEnd(pipDoc, pipIdx); cs.err != nil {
   269  			return cs.Err()
   270  		}
   271  		cs.pipelineSlice[0] = pipDoc
   272  
   273  		var plArr bsoncore.Document
   274  		if plArr, cs.err = cs.pipelineToBSON(); cs.err != nil {
   275  			return cs.Err()
   276  		}
   277  		cs.aggregate.Pipeline(plArr)
   278  	}
   279  
   280  	// If cs.client.timeout is set and context is not already a Timeout context,
   281  	// honor cs.client.timeout in new Timeout context for change stream
   282  	// operation execution and potential retry.
   283  	if cs.client.timeout != nil && !csot.IsTimeoutContext(ctx) {
   284  		newCtx, cancelFunc := csot.MakeTimeoutContext(ctx, *cs.client.timeout)
   285  		// Redefine ctx to be the new timeout-derived context.
   286  		ctx = newCtx
   287  		// Cancel the timeout-derived context at the end of executeOperation to avoid a context leak.
   288  		defer cancelFunc()
   289  	}
   290  
   291  	// Execute the aggregate, retrying on retryable errors once (1) if retryable reads are enabled and
   292  	// infinitely (-1) if context is a Timeout context.
   293  	var retries int
   294  	if cs.client.retryReads {
   295  		retries = 1
   296  	}
   297  	if csot.IsTimeoutContext(ctx) {
   298  		retries = -1
   299  	}
   300  
   301  	var err error
   302  AggregateExecuteLoop:
   303  	for {
   304  		err = cs.aggregate.Execute(ctx)
   305  		// If no error or no retries remain, do not retry.
   306  		if err == nil || retries == 0 {
   307  			break AggregateExecuteLoop
   308  		}
   309  
   310  		switch tt := err.(type) {
   311  		case driver.Error:
   312  			// If error is not retryable, do not retry.
   313  			if !tt.RetryableRead() {
   314  				break AggregateExecuteLoop
   315  			}
   316  
   317  			// If error is retryable: subtract 1 from retries, redo server selection, checkout
   318  			// a connection, and restart loop.
   319  			retries--
   320  			server, err = cs.client.deployment.SelectServer(ctx, cs.selector)
   321  			if err != nil {
   322  				break AggregateExecuteLoop
   323  			}
   324  
   325  			conn.Close()
   326  			conn, err = server.Connection(ctx)
   327  			if err != nil {
   328  				break AggregateExecuteLoop
   329  			}
   330  			defer conn.Close()
   331  
   332  			// Update the wire version with data from the new connection.
   333  			cs.wireVersion = conn.Description().WireVersion
   334  
   335  			// Reset deployment.
   336  			cs.aggregate.Deployment(cs.createOperationDeployment(server, conn))
   337  		default:
   338  			// Do not retry if error is not a driver error.
   339  			break AggregateExecuteLoop
   340  		}
   341  	}
   342  	if err != nil {
   343  		cs.err = replaceErrors(err)
   344  		return cs.err
   345  	}
   346  
   347  	cr := cs.aggregate.ResultCursorResponse()
   348  	cr.Server = server
   349  
   350  	cs.cursor, cs.err = driver.NewBatchCursor(cr, cs.sess, cs.client.clock, cs.cursorOptions)
   351  	if cs.err = replaceErrors(cs.err); cs.err != nil {
   352  		return cs.Err()
   353  	}
   354  
   355  	cs.updatePbrtFromCommand()
   356  	if cs.options.StartAtOperationTime == nil && cs.options.ResumeAfter == nil &&
   357  		cs.options.StartAfter == nil && cs.wireVersion.Max >= 7 &&
   358  		cs.emptyBatch() && cs.resumeToken == nil {
   359  		cs.operationTime = cs.sess.OperationTime
   360  	}
   361  
   362  	return cs.Err()
   363  }
   364  
   365  // Updates the post batch resume token after a successful aggregate or getMore operation.
   366  func (cs *ChangeStream) updatePbrtFromCommand() {
   367  	// Only cache the pbrt if an empty batch was returned and a pbrt was included
   368  	if pbrt := cs.cursor.PostBatchResumeToken(); cs.emptyBatch() && pbrt != nil {
   369  		cs.resumeToken = bson.Raw(pbrt)
   370  	}
   371  }
   372  
   373  func (cs *ChangeStream) storeResumeToken() error {
   374  	// If cs.Current is the last document in the batch and a pbrt is included, cache the pbrt
   375  	// Otherwise, cache the _id of the document
   376  	var tokenDoc bson.Raw
   377  	if len(cs.batch) == 0 {
   378  		if pbrt := cs.cursor.PostBatchResumeToken(); pbrt != nil {
   379  			tokenDoc = bson.Raw(pbrt)
   380  		}
   381  	}
   382  
   383  	if tokenDoc == nil {
   384  		var ok bool
   385  		tokenDoc, ok = cs.Current.Lookup("_id").DocumentOK()
   386  		if !ok {
   387  			_ = cs.Close(context.Background())
   388  			return ErrMissingResumeToken
   389  		}
   390  	}
   391  
   392  	cs.resumeToken = tokenDoc
   393  	return nil
   394  }
   395  
   396  func (cs *ChangeStream) buildPipelineSlice(pipeline interface{}) error {
   397  	val := reflect.ValueOf(pipeline)
   398  	if !val.IsValid() || !(val.Kind() == reflect.Slice) {
   399  		cs.err = errors.New("can only marshal slices and arrays into aggregation pipelines, but got invalid")
   400  		return cs.err
   401  	}
   402  
   403  	cs.pipelineSlice = make([]bsoncore.Document, 0, val.Len()+1)
   404  
   405  	csIdx, csDoc := bsoncore.AppendDocumentStart(nil)
   406  
   407  	csDocTemp, err := cs.createPipelineOptionsDoc()
   408  	if err != nil {
   409  		return err
   410  	}
   411  	csDoc = bsoncore.AppendDocumentElement(csDoc, "$changeStream", csDocTemp)
   412  	csDoc, cs.err = bsoncore.AppendDocumentEnd(csDoc, csIdx)
   413  	if cs.err != nil {
   414  		return cs.err
   415  	}
   416  	cs.pipelineSlice = append(cs.pipelineSlice, csDoc)
   417  
   418  	for i := 0; i < val.Len(); i++ {
   419  		var elem []byte
   420  		elem, cs.err = marshal(val.Index(i).Interface(), cs.bsonOpts, cs.registry)
   421  		if cs.err != nil {
   422  			return cs.err
   423  		}
   424  
   425  		cs.pipelineSlice = append(cs.pipelineSlice, elem)
   426  	}
   427  
   428  	return cs.err
   429  }
   430  
   431  func (cs *ChangeStream) createPipelineOptionsDoc() (bsoncore.Document, error) {
   432  	plDocIdx, plDoc := bsoncore.AppendDocumentStart(nil)
   433  
   434  	if cs.streamType == ClientStream {
   435  		plDoc = bsoncore.AppendBooleanElement(plDoc, "allChangesForCluster", true)
   436  	}
   437  
   438  	if cs.options.FullDocument != nil && *cs.options.FullDocument != options.Default {
   439  		plDoc = bsoncore.AppendStringElement(plDoc, "fullDocument", string(*cs.options.FullDocument))
   440  	}
   441  
   442  	if cs.options.FullDocumentBeforeChange != nil {
   443  		plDoc = bsoncore.AppendStringElement(plDoc, "fullDocumentBeforeChange", string(*cs.options.FullDocumentBeforeChange))
   444  	}
   445  
   446  	if cs.options.ResumeAfter != nil {
   447  		var raDoc bsoncore.Document
   448  		raDoc, cs.err = marshal(cs.options.ResumeAfter, cs.bsonOpts, cs.registry)
   449  		if cs.err != nil {
   450  			return nil, cs.err
   451  		}
   452  
   453  		plDoc = bsoncore.AppendDocumentElement(plDoc, "resumeAfter", raDoc)
   454  	}
   455  
   456  	if cs.options.ShowExpandedEvents != nil {
   457  		plDoc = bsoncore.AppendBooleanElement(plDoc, "showExpandedEvents", *cs.options.ShowExpandedEvents)
   458  	}
   459  
   460  	if cs.options.StartAfter != nil {
   461  		var saDoc bsoncore.Document
   462  		saDoc, cs.err = marshal(cs.options.StartAfter, cs.bsonOpts, cs.registry)
   463  		if cs.err != nil {
   464  			return nil, cs.err
   465  		}
   466  
   467  		plDoc = bsoncore.AppendDocumentElement(plDoc, "startAfter", saDoc)
   468  	}
   469  
   470  	if cs.options.StartAtOperationTime != nil {
   471  		plDoc = bsoncore.AppendTimestampElement(plDoc, "startAtOperationTime", cs.options.StartAtOperationTime.T, cs.options.StartAtOperationTime.I)
   472  	}
   473  
   474  	// Append custom pipeline options.
   475  	for optionName, optionValue := range cs.pipelineOptions {
   476  		plDoc = bsoncore.AppendValueElement(plDoc, optionName, optionValue)
   477  	}
   478  
   479  	if plDoc, cs.err = bsoncore.AppendDocumentEnd(plDoc, plDocIdx); cs.err != nil {
   480  		return nil, cs.err
   481  	}
   482  
   483  	return plDoc, nil
   484  }
   485  
   486  func (cs *ChangeStream) pipelineToBSON() (bsoncore.Document, error) {
   487  	pipelineDocIdx, pipelineArr := bsoncore.AppendArrayStart(nil)
   488  	for i, doc := range cs.pipelineSlice {
   489  		pipelineArr = bsoncore.AppendDocumentElement(pipelineArr, strconv.Itoa(i), doc)
   490  	}
   491  	if pipelineArr, cs.err = bsoncore.AppendArrayEnd(pipelineArr, pipelineDocIdx); cs.err != nil {
   492  		return nil, cs.err
   493  	}
   494  	return pipelineArr, cs.err
   495  }
   496  
   497  func (cs *ChangeStream) replaceOptions(wireVersion *description.VersionRange) {
   498  	// Cached resume token: use the resume token as the resumeAfter option and set no other resume options
   499  	if cs.resumeToken != nil {
   500  		cs.options.SetResumeAfter(cs.resumeToken)
   501  		cs.options.SetStartAfter(nil)
   502  		cs.options.SetStartAtOperationTime(nil)
   503  		return
   504  	}
   505  
   506  	// No cached resume token but cached operation time: use the operation time as the startAtOperationTime option and
   507  	// set no other resume options
   508  	if (cs.sess.OperationTime != nil || cs.options.StartAtOperationTime != nil) && wireVersion.Max >= 7 {
   509  		opTime := cs.options.StartAtOperationTime
   510  		if cs.operationTime != nil {
   511  			opTime = cs.sess.OperationTime
   512  		}
   513  
   514  		cs.options.SetStartAtOperationTime(opTime)
   515  		cs.options.SetResumeAfter(nil)
   516  		cs.options.SetStartAfter(nil)
   517  		return
   518  	}
   519  
   520  	// No cached resume token or operation time: set none of the resume options
   521  	cs.options.SetResumeAfter(nil)
   522  	cs.options.SetStartAfter(nil)
   523  	cs.options.SetStartAtOperationTime(nil)
   524  }
   525  
   526  // ID returns the ID for this change stream, or 0 if the cursor has been closed or exhausted.
   527  func (cs *ChangeStream) ID() int64 {
   528  	if cs.cursor == nil {
   529  		return 0
   530  	}
   531  	return cs.cursor.ID()
   532  }
   533  
   534  // SetBatchSize sets the number of documents to fetch from the database with
   535  // each iteration of the ChangeStream's "Next" or "TryNext" method. This setting
   536  // only affects subsequent document batches fetched from the database.
   537  func (cs *ChangeStream) SetBatchSize(size int32) {
   538  	// Set batch size on the cursor options also so any "resumed" change stream
   539  	// cursors will pick up the latest batch size setting.
   540  	cs.cursorOptions.BatchSize = size
   541  	cs.cursor.SetBatchSize(size)
   542  }
   543  
   544  // Decode will unmarshal the current event document into val and return any errors from the unmarshalling process
   545  // without any modification. If val is nil or is a typed nil, an error will be returned.
   546  func (cs *ChangeStream) Decode(val interface{}) error {
   547  	if cs.cursor == nil {
   548  		return ErrNilCursor
   549  	}
   550  
   551  	dec, err := getDecoder(cs.Current, cs.bsonOpts, cs.registry)
   552  	if err != nil {
   553  		return fmt.Errorf("error configuring BSON decoder: %w", err)
   554  	}
   555  	return dec.Decode(val)
   556  }
   557  
   558  // Err returns the last error seen by the change stream, or nil if no errors has occurred.
   559  func (cs *ChangeStream) Err() error {
   560  	if cs.err != nil {
   561  		return replaceErrors(cs.err)
   562  	}
   563  	if cs.cursor == nil {
   564  		return nil
   565  	}
   566  
   567  	return replaceErrors(cs.cursor.Err())
   568  }
   569  
   570  // Close closes this change stream and the underlying cursor. Next and TryNext must not be called after Close has been
   571  // called. Close is idempotent. After the first call, any subsequent calls will not change the state.
   572  func (cs *ChangeStream) Close(ctx context.Context) error {
   573  	if ctx == nil {
   574  		ctx = context.Background()
   575  	}
   576  
   577  	defer closeImplicitSession(cs.sess)
   578  
   579  	if cs.cursor == nil {
   580  		return nil // cursor is already closed
   581  	}
   582  
   583  	cs.err = replaceErrors(cs.cursor.Close(ctx))
   584  	cs.cursor = nil
   585  	return cs.Err()
   586  }
   587  
   588  // ResumeToken returns the last cached resume token for this change stream, or nil if a resume token has not been
   589  // stored.
   590  func (cs *ChangeStream) ResumeToken() bson.Raw {
   591  	return cs.resumeToken
   592  }
   593  
   594  // Next gets the next event for this change stream. It returns true if there were no errors and the next event document
   595  // is available.
   596  //
   597  // Next blocks until an event is available, an error occurs, or ctx expires. If ctx expires, the error
   598  // will be set to ctx.Err(). In an error case, Next will return false.
   599  //
   600  // If Next returns false, subsequent calls will also return false.
   601  func (cs *ChangeStream) Next(ctx context.Context) bool {
   602  	return cs.next(ctx, false)
   603  }
   604  
   605  // TryNext attempts to get the next event for this change stream. It returns true if there were no errors and the next
   606  // event document is available.
   607  //
   608  // TryNext returns false if the change stream is closed by the server, an error occurs when getting changes from the
   609  // server, the next change is not yet available, or ctx expires. If ctx expires, the error will be set to ctx.Err().
   610  //
   611  // If TryNext returns false and an error occurred or the change stream was closed
   612  // (i.e. cs.Err() != nil || cs.ID() == 0), subsequent attempts will also return false. Otherwise, it is safe to call
   613  // TryNext again until a change is available.
   614  //
   615  // This method requires driver version >= 1.2.0.
   616  func (cs *ChangeStream) TryNext(ctx context.Context) bool {
   617  	return cs.next(ctx, true)
   618  }
   619  
   620  func (cs *ChangeStream) next(ctx context.Context, nonBlocking bool) bool {
   621  	// return false right away if the change stream has already errored or if cursor is closed.
   622  	if cs.err != nil {
   623  		return false
   624  	}
   625  
   626  	if ctx == nil {
   627  		ctx = context.Background()
   628  	}
   629  
   630  	if len(cs.batch) == 0 {
   631  		cs.loopNext(ctx, nonBlocking)
   632  		if cs.err != nil {
   633  			cs.err = replaceErrors(cs.err)
   634  			return false
   635  		}
   636  		if len(cs.batch) == 0 {
   637  			return false
   638  		}
   639  	}
   640  
   641  	// successfully got non-empty batch
   642  	cs.Current = bson.Raw(cs.batch[0])
   643  	cs.batch = cs.batch[1:]
   644  	if cs.err = cs.storeResumeToken(); cs.err != nil {
   645  		return false
   646  	}
   647  	return true
   648  }
   649  
   650  func (cs *ChangeStream) loopNext(ctx context.Context, nonBlocking bool) {
   651  	for {
   652  		if cs.cursor == nil {
   653  			return
   654  		}
   655  
   656  		if cs.cursor.Next(ctx) {
   657  			// non-empty batch returned
   658  			cs.batch, cs.err = cs.cursor.Batch().Documents()
   659  			return
   660  		}
   661  
   662  		cs.err = replaceErrors(cs.cursor.Err())
   663  		if cs.err == nil {
   664  			// Check if cursor is alive
   665  			if cs.ID() == 0 {
   666  				return
   667  			}
   668  
   669  			// If a getMore was done but the batch was empty, the batch cursor will return false with no error.
   670  			// Update the tracked resume token to catch the post batch resume token from the server response.
   671  			cs.updatePbrtFromCommand()
   672  			if nonBlocking {
   673  				// stop after a successful getMore, even though the batch was empty
   674  				return
   675  			}
   676  			continue // loop getMore until a non-empty batch is returned or an error occurs
   677  		}
   678  
   679  		if !cs.isResumableError() {
   680  			return
   681  		}
   682  
   683  		// ignore error from cursor close because if the cursor is deleted or errors we tried to close it and will remake and try to get next batch
   684  		_ = cs.cursor.Close(ctx)
   685  		if cs.err = cs.executeOperation(ctx, true); cs.err != nil {
   686  			return
   687  		}
   688  	}
   689  }
   690  
   691  func (cs *ChangeStream) isResumableError() bool {
   692  	var commandErr CommandError
   693  	if !errors.As(cs.err, &commandErr) || commandErr.HasErrorLabel(networkErrorLabel) {
   694  		// All non-server errors or network errors are resumable.
   695  		return true
   696  	}
   697  
   698  	if commandErr.Code == errorCursorNotFound {
   699  		return true
   700  	}
   701  
   702  	// For wire versions 9 and above, a server error is resumable if it has the ResumableChangeStreamError label.
   703  	if cs.wireVersion != nil && cs.wireVersion.Includes(minResumableLabelWireVersion) {
   704  		return commandErr.HasErrorLabel(resumableErrorLabel)
   705  	}
   706  
   707  	// For wire versions below 9, a server error is resumable if its code is on the allowlist.
   708  	_, resumable := resumableChangeStreamErrors[commandErr.Code]
   709  	return resumable
   710  }
   711  
   712  // Returns true if the underlying cursor's batch is empty
   713  func (cs *ChangeStream) emptyBatch() bool {
   714  	return cs.cursor.Batch().Empty()
   715  }
   716  
   717  // StreamType represents the cluster type against which a ChangeStream was created.
   718  type StreamType uint8
   719  
   720  // These constants represent valid change stream types. A change stream can be initialized over a collection, all
   721  // collections in a database, or over a cluster.
   722  const (
   723  	CollectionStream StreamType = iota
   724  	DatabaseStream
   725  	ClientStream
   726  )
   727  

View as plain text