...

Source file src/go.mongodb.org/mongo-driver/mongo/integration/change_stream_test.go

Documentation: go.mongodb.org/mongo-driver/mongo/integration

     1  // Copyright (C) MongoDB, Inc. 2017-present.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License"); you may
     4  // not use this file except in compliance with the License. You may obtain
     5  // a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
     6  
     7  package integration
     8  
     9  import (
    10  	"context"
    11  	"strings"
    12  	"sync"
    13  	"testing"
    14  	"time"
    15  
    16  	"go.mongodb.org/mongo-driver/bson"
    17  	"go.mongodb.org/mongo-driver/bson/primitive"
    18  	"go.mongodb.org/mongo-driver/event"
    19  	"go.mongodb.org/mongo-driver/internal/assert"
    20  	"go.mongodb.org/mongo-driver/internal/eventtest"
    21  	"go.mongodb.org/mongo-driver/internal/require"
    22  	"go.mongodb.org/mongo-driver/mongo"
    23  	"go.mongodb.org/mongo-driver/mongo/integration/mtest"
    24  	"go.mongodb.org/mongo-driver/mongo/options"
    25  )
    26  
    27  type resumeType int
    28  type streamType int
    29  
    30  const (
    31  	minChangeStreamVersion = "3.6.0"
    32  	minPbrtVersion         = "4.0.7"
    33  	minStartAfterVersion   = "4.1.1"
    34  
    35  	startAfter resumeType = iota
    36  	resumeAfter
    37  	operationTime
    38  
    39  	client streamType = iota
    40  	database
    41  	collection
    42  
    43  	errorInterrupted     int32 = 11601
    44  	errorHostUnreachable int32 = 6
    45  
    46  	resumableChangeStreamError = "ResumableChangeStreamError"
    47  )
    48  
    49  func TestChangeStream_Standalone(t *testing.T) {
    50  	mtOpts := mtest.NewOptions().MinServerVersion(minChangeStreamVersion).CreateClient(false).Topologies(mtest.Single)
    51  	mt := mtest.New(t, mtOpts)
    52  
    53  	mt.Run("no custom standalone error", func(mt *mtest.T) {
    54  		_, err := mt.Coll.Watch(context.Background(), mongo.Pipeline{})
    55  		_, ok := err.(mongo.CommandError)
    56  		assert.True(mt, ok, "expected error type %T, got %T", mongo.CommandError{}, err)
    57  	})
    58  }
    59  
    60  func TestChangeStream_ReplicaSet(t *testing.T) {
    61  	mtOpts := mtest.NewOptions().MinServerVersion(minChangeStreamVersion).CreateClient(false).Topologies(mtest.ReplicaSet)
    62  	mt := mtest.New(t, mtOpts)
    63  
    64  	mt.Run("first stage is $changeStream", func(mt *mtest.T) {
    65  		// first stage in the aggregate pipeline must be $changeStream
    66  
    67  		cs, err := mt.Coll.Watch(context.Background(), mongo.Pipeline{})
    68  		assert.Nil(mt, err, "Watch error: %v", err)
    69  		defer closeStream(cs)
    70  		started := mt.GetStartedEvent()
    71  		assert.NotNil(mt, started, "expected started event for aggregate, got nil")
    72  
    73  		// pipeline is array of documents. first value of first element in array is the first stage document
    74  		firstStage := started.Command.Lookup("pipeline").Array().Index(0).Value().Document()
    75  		elems, _ := firstStage.Elements()
    76  		assert.Equal(mt, 1, len(elems), "expected first stage document to have 1 element, got %v", len(elems))
    77  		firstKey := elems[0].Key()
    78  		want := "$changeStream"
    79  		assert.Equal(mt, want, firstKey, "expected first stage to be %v, got %v", want, firstKey)
    80  	})
    81  	mt.Run("track resume token", func(mt *mtest.T) {
    82  		// ChangeStream must continuously track the last seen resumeToken
    83  
    84  		cs, err := mt.Coll.Watch(context.Background(), mongo.Pipeline{})
    85  		assert.Nil(mt, err, "Watch error: %v", err)
    86  		defer closeStream(cs)
    87  
    88  		generateEvents(mt, 1)
    89  		assert.True(mt, cs.Next(context.Background()), "expected next to return true, got false")
    90  		assert.NotNil(mt, cs.ResumeToken(), "expected resume token, got nil")
    91  	})
    92  	mt.RunOpts("resume token updated on empty batch", mtest.NewOptions().MinServerVersion("4.0.7"), func(mt *mtest.T) {
    93  		// The resume token is updated when an empty batch is returned using the server's post batch resume token.
    94  
    95  		cs, err := mt.Coll.Watch(context.Background(), mongo.Pipeline{})
    96  		assert.Nil(mt, err, "Watch error: %v", err)
    97  		defer closeStream(cs)
    98  
    99  		// cause an event to occur so the resume token is updated
   100  		generateEvents(mt, 1)
   101  		assert.True(mt, cs.Next(context.Background()), "expected next to return true, got false")
   102  		firstToken := cs.ResumeToken()
   103  
   104  		// cause an event on a different collection than the one being watched so the server's PBRT is updated
   105  		diffColl := mt.CreateCollection(mtest.Collection{Name: "diffCollUpdatePbrt"}, false)
   106  		_, err = diffColl.InsertOne(context.Background(), bson.D{{"x", 1}})
   107  		assert.Nil(mt, err, "InsertOne error: %v", err)
   108  
   109  		// verify that the resume token is updated using the PBRT from an empty batch
   110  		mt.ClearEvents()
   111  		assert.False(mt, cs.TryNext(context.Background()), "unexpected event document: %v", cs.Current)
   112  		assert.Nil(mt, cs.Err(), "change stream error getting new batch: %v", cs.Err())
   113  		newToken := cs.ResumeToken()
   114  		assert.NotEqual(mt, newToken, firstToken, "resume token was not updated after an empty batch was returned")
   115  
   116  		evt := mt.GetSucceededEvent()
   117  		assert.Equal(mt, "getMore", evt.CommandName, "expected event for 'getMore', got '%v'", evt.CommandName)
   118  		getMorePbrt := evt.Reply.Lookup("cursor", "postBatchResumeToken").Document()
   119  		assert.Equal(mt, newToken, getMorePbrt, "expected resume token %v, got %v", getMorePbrt, newToken)
   120  	})
   121  	mt.Run("missing resume token", func(mt *mtest.T) {
   122  		// ChangeStream will throw an exception if the server response is missing the resume token
   123  
   124  		projectDoc := bson.D{
   125  			{"$project", bson.D{
   126  				{"_id", 0},
   127  			}},
   128  		}
   129  		cs, err := mt.Coll.Watch(context.Background(), mongo.Pipeline{projectDoc})
   130  		assert.Nil(mt, err, "Watch error: %v", err)
   131  		defer closeStream(cs)
   132  
   133  		generateEvents(mt, 2)
   134  		assert.False(mt, cs.Next(context.Background()), "expected Next to return false, got true")
   135  		assert.NotNil(mt, cs.Err(), "expected error, got nil")
   136  	})
   137  	mt.RunOpts("resume once", mtest.NewOptions().ClientType(mtest.Mock), func(mt *mtest.T) {
   138  		// ChangeStream will automatically resume one time on a resumable error
   139  
   140  		// aggregateRes: create change stream with ID 1 and a batch of size 1 so the resume token will be recorded
   141  		// failureGetMoreRes: resumable error
   142  		// killCursorsRes: success
   143  		// resumedAggregateRes: create new change stream with ID 2 and a batch of size 1 so the resume token will be
   144  		// updated
   145  		ns := mt.Coll.Database().Name() + "." + mt.Coll.Name()
   146  		aggregateRes := mtest.CreateCursorResponse(1, ns, mtest.FirstBatch, bson.D{
   147  			{"_id", bson.D{{"first", "resume token"}}},
   148  		})
   149  		failureGetMoreRes := mtest.CreateCommandErrorResponse(mtest.CommandError{
   150  			Code:    errorHostUnreachable,
   151  			Name:    "foo",
   152  			Message: "bar",
   153  			Labels:  []string{resumableChangeStreamError},
   154  		})
   155  		killCursorsRes := mtest.CreateSuccessResponse()
   156  		newResumeToken := bson.D{{"second", "resume token"}}
   157  		resumedAggregateRes := mtest.CreateCursorResponse(2, ns, mtest.FirstBatch, bson.D{
   158  			{"_id", newResumeToken},
   159  		})
   160  		mt.AddMockResponses(
   161  			aggregateRes,
   162  			failureGetMoreRes,
   163  			killCursorsRes,
   164  			resumedAggregateRes,
   165  		)
   166  
   167  		cs, err := mt.Coll.Watch(context.Background(), mongo.Pipeline{})
   168  		assert.Nil(mt, err, "Watch error: %v", err)
   169  		defer closeStream(cs)
   170  		// Consume the first document
   171  		assert.True(mt, cs.Next(context.Background()), "expected Next to return true, got false")
   172  
   173  		// Clear existing events and expect a resume attempt to happen.
   174  		mt.ClearEvents()
   175  		assert.True(mt, cs.Next(context.Background()), "expected Next to return true, got false")
   176  
   177  		// Next should cause getMore, killCursors, and aggregate to run
   178  		assert.NotNil(mt, mt.GetStartedEvent(), "expected getMore event, got nil")
   179  		assert.NotNil(mt, mt.GetStartedEvent(), "expected killCursors event, got nil")
   180  		aggEvent := mt.GetStartedEvent()
   181  		assert.NotNil(mt, aggEvent, "expected aggregate event, got nil")
   182  		assert.Equal(mt, "aggregate", aggEvent.CommandName, "expected command name 'aggregate', got '%v'", aggEvent.CommandName)
   183  
   184  		// Assert that the change stream has updated its ID and resume token.
   185  		assert.Equal(mt, cs.ID(), int64(2), "expected change stream ID to be 2, got %d", cs.ID())
   186  		newResumeTokenRaw, err := bson.Marshal(newResumeToken)
   187  		assert.Nil(mt, err, "Marshal error: %v", err)
   188  		comparisonErr := compareDocs(mt, newResumeTokenRaw, cs.ResumeToken())
   189  		assert.Nil(mt, comparisonErr, "expected resume token %s, got %s", newResumeTokenRaw, cs.ResumeToken())
   190  	})
   191  	mt.RunOpts("no resume for aggregate errors", mtest.NewOptions().ClientType(mtest.Mock), func(mt *mtest.T) {
   192  		// ChangeStream will not attempt to resume on any error encountered while executing an aggregate command
   193  
   194  		// aggregate response: empty batch but valid cursor ID
   195  		// getMore response: resumable error
   196  		// killCursors response: success
   197  		// resumed aggregate response: resumable error
   198  		ns := mt.Coll.Database().Name() + "." + mt.Coll.Name()
   199  		aggRes := mtest.CreateCursorResponse(1, ns, mtest.FirstBatch)
   200  		getMoreRes := mtest.CreateCommandErrorResponse(mtest.CommandError{
   201  			Code:    errorHostUnreachable,
   202  			Name:    "foo",
   203  			Message: "bar",
   204  			Labels:  []string{resumableChangeStreamError},
   205  		})
   206  		killCursorsRes := mtest.CreateSuccessResponse()
   207  		resumedAggRes := mtest.CreateCommandErrorResponse(mtest.CommandError{
   208  			Code:    errorHostUnreachable,
   209  			Name:    "foo",
   210  			Message: "bar",
   211  			Labels:  []string{resumableChangeStreamError},
   212  		})
   213  		mt.AddMockResponses(aggRes, getMoreRes, killCursorsRes, resumedAggRes)
   214  
   215  		cs, err := mt.Coll.Watch(context.Background(), mongo.Pipeline{})
   216  		assert.Nil(mt, err, "Watch error: %v", err)
   217  		defer closeStream(cs)
   218  
   219  		assert.False(mt, cs.Next(context.Background()), "expected Next to return false, got true")
   220  	})
   221  	mt.RunOpts("server selection before resume", mtest.NewOptions().CreateClient(false), func(mt *mtest.T) {
   222  		// ChangeStream will perform server selection before attempting to resume, using initial readPreference
   223  		mt.Skip("skipping for lack of SDAM monitoring")
   224  	})
   225  	mt.Run("empty batch cursor not closed", func(mt *mtest.T) {
   226  		// Ensure that a cursor returned from an aggregate command with a cursor id and an initial empty batch is not closed
   227  
   228  		cs, err := mt.Coll.Watch(context.Background(), mongo.Pipeline{})
   229  		assert.Nil(mt, err, "Watch error: %v", err)
   230  		defer closeStream(cs)
   231  		assert.True(mt, cs.ID() > 0, "expected non-zero ID, got 0")
   232  	})
   233  	mt.RunOpts("ignore errors from killCursors", mtest.NewOptions().ClientType(mtest.Mock), func(mt *mtest.T) {
   234  		// The killCursors command sent during the "Resume Process" must not be allowed to throw an exception.
   235  
   236  		ns := mt.Coll.Database().Name() + "." + mt.Coll.Name()
   237  		aggRes := mtest.CreateCursorResponse(1, ns, mtest.FirstBatch)
   238  		getMoreRes := mtest.CreateCommandErrorResponse(mtest.CommandError{
   239  			Code:    errorHostUnreachable,
   240  			Name:    "foo",
   241  			Message: "bar",
   242  			Labels:  []string{"ResumableChangeStreamError"},
   243  		})
   244  		killCursorsRes := mtest.CreateCommandErrorResponse(mtest.CommandError{
   245  			Code:    errorInterrupted,
   246  			Name:    "foo",
   247  			Message: "bar",
   248  		})
   249  		changeDoc := bson.D{{"_id", bson.D{{"x", 1}}}}
   250  		resumedAggRes := mtest.CreateCursorResponse(1, ns, mtest.FirstBatch, changeDoc)
   251  		mt.AddMockResponses(aggRes, getMoreRes, killCursorsRes, resumedAggRes)
   252  
   253  		cs, err := mt.Coll.Watch(context.Background(), mongo.Pipeline{})
   254  		assert.Nil(mt, err, "Watch error: %v", err)
   255  		defer closeStream(cs)
   256  
   257  		assert.True(mt, cs.Next(context.Background()), "expected Next to return true, got false")
   258  		assert.Nil(mt, cs.Err(), "change stream error: %v", cs.Err())
   259  	})
   260  
   261  	startAtOpTimeOpts := mtest.NewOptions().MinServerVersion("4.0").MaxServerVersion("4.0.6")
   262  	mt.RunOpts("include startAtOperationTime", startAtOpTimeOpts, func(mt *mtest.T) {
   263  		// $changeStream stage for ChangeStream against a server >=4.0 and <4.0.7 that has not received any results yet
   264  		// MUST include a startAtOperationTime option when resuming a changestream.
   265  
   266  		cs, err := mt.Coll.Watch(context.Background(), mongo.Pipeline{})
   267  		assert.Nil(mt, err, "Watch error: %v", err)
   268  		defer closeStream(cs)
   269  
   270  		generateEvents(mt, 1)
   271  		// kill cursor to force resumable error
   272  		killChangeStreamCursor(mt, cs)
   273  
   274  		mt.ClearEvents()
   275  		// change stream should resume once and get new change
   276  		assert.True(mt, cs.Next(context.Background()), "expected Next to return true, got false")
   277  		// Next should cause getMore, killCursors, and aggregate to run
   278  		assert.NotNil(mt, mt.GetStartedEvent(), "expected getMore event, got nil")
   279  		assert.NotNil(mt, mt.GetStartedEvent(), "expected killCursors event, got nil")
   280  		aggEvent := mt.GetStartedEvent()
   281  		assert.NotNil(mt, aggEvent, "expected aggregate event, got nil")
   282  		assert.Equal(mt, "aggregate", aggEvent.CommandName, "expected command name 'aggregate', got '%v'", aggEvent.CommandName)
   283  
   284  		// check for startAtOperationTime in pipeline
   285  		csStage := aggEvent.Command.Lookup("pipeline").Array().Index(0).Value().Document() // $changeStream stage
   286  		_, err = csStage.Lookup("$changeStream").Document().LookupErr("startAtOperationTime")
   287  		assert.Nil(mt, err, "startAtOperationTime not included in aggregate command")
   288  	})
   289  	mt.RunOpts("decode does not panic", noClientOpts, func(mt *mtest.T) {
   290  		testCases := []struct {
   291  			name             string
   292  			st               streamType
   293  			minServerVersion string
   294  		}{
   295  			{"client", client, "4.0"},
   296  			{"database", database, "4.0"},
   297  			{"collection", collection, ""},
   298  		}
   299  		for _, tc := range testCases {
   300  			tcOpts := mtest.NewOptions()
   301  			if tc.minServerVersion != "" {
   302  				tcOpts.MinServerVersion(tc.minServerVersion)
   303  			}
   304  			mt.RunOpts(tc.name, tcOpts, func(mt *mtest.T) {
   305  				var cs *mongo.ChangeStream
   306  				var err error
   307  				switch tc.st {
   308  				case client:
   309  					cs, err = mt.Client.Watch(context.Background(), mongo.Pipeline{})
   310  				case database:
   311  					cs, err = mt.DB.Watch(context.Background(), mongo.Pipeline{})
   312  				case collection:
   313  					cs, err = mt.Coll.Watch(context.Background(), mongo.Pipeline{})
   314  				}
   315  				assert.Nil(mt, err, "Watch error: %v", err)
   316  				defer closeStream(cs)
   317  
   318  				generateEvents(mt, 1)
   319  				assert.True(mt, cs.Next(context.Background()), "expected Next true, got false")
   320  				var res bson.D
   321  				err = cs.Decode(&res)
   322  				assert.Nil(mt, err, "Decode error: %v", err)
   323  				assert.True(mt, len(res) > 0, "expected non-empty document, got empty")
   324  			})
   325  		}
   326  	})
   327  	mt.Run("maxAwaitTimeMS", func(mt *mtest.T) {
   328  		// maxAwaitTimeMS option should be sent as maxTimeMS on getMore
   329  
   330  		opts := options.ChangeStream().SetMaxAwaitTime(100 * time.Millisecond)
   331  		cs, err := mt.Coll.Watch(context.Background(), mongo.Pipeline{}, opts)
   332  		assert.Nil(mt, err, "Watch error: %v", err)
   333  		defer closeStream(cs)
   334  
   335  		_, err = mt.Coll.InsertOne(context.Background(), bson.D{{"x", 1}})
   336  		assert.Nil(mt, err, "InsertOne error: %v", err)
   337  		mt.ClearEvents()
   338  		assert.True(mt, cs.Next(context.Background()), "expected Next true, got false")
   339  
   340  		e := mt.GetStartedEvent()
   341  		assert.NotNil(mt, e, "expected getMore event, got nil")
   342  		_, err = e.Command.LookupErr("maxTimeMS")
   343  		assert.Nil(mt, err, "field maxTimeMS not found in command %v", e.Command)
   344  	})
   345  	mt.RunOpts("resume token", noClientOpts, func(mt *mtest.T) {
   346  		// Prose tests to make assertions on resume tokens for change streams that have not done a getMore yet
   347  		mt.RunOpts("no getMore", noClientOpts, func(mt *mtest.T) {
   348  			pbrtOpts := mtest.NewOptions().MinServerVersion(minPbrtVersion).CreateClient(false)
   349  			mt.RunOpts("with PBRT support", pbrtOpts, func(mt *mtest.T) {
   350  				testCases := []struct {
   351  					name             string
   352  					rt               resumeType
   353  					minServerVersion string
   354  				}{
   355  					{"startAfter", startAfter, minStartAfterVersion},
   356  					{"resumeAfter", resumeAfter, minPbrtVersion},
   357  					{"neither", operationTime, minPbrtVersion},
   358  				}
   359  
   360  				for _, tc := range testCases {
   361  					tcOpts := mtest.NewOptions().MinServerVersion(tc.minServerVersion)
   362  					mt.RunOpts(tc.name, tcOpts, func(mt *mtest.T) {
   363  						// create temp stream to get a resume token
   364  						mt.ClearEvents()
   365  						cs, err := mt.Coll.Watch(context.Background(), mongo.Pipeline{})
   366  						assert.Nil(mt, err, "Watch error: %v", err)
   367  
   368  						// Initial resume token should equal the PBRT in the aggregate command
   369  						pbrt, opTime := getAggregateResponseInfo(mt)
   370  						compareResumeTokens(mt, cs, pbrt)
   371  
   372  						numEvents := 5
   373  						generateEvents(mt, numEvents)
   374  
   375  						// Iterate over one event to get resume token
   376  						assert.True(mt, cs.Next(context.Background()), "expected Next to return true, got false")
   377  						token := cs.ResumeToken()
   378  						closeStream(cs)
   379  
   380  						var numExpectedEvents int
   381  						var initialToken bson.Raw
   382  						var opts *options.ChangeStreamOptions
   383  						switch tc.rt {
   384  						case startAfter:
   385  							numExpectedEvents = numEvents - 1
   386  							initialToken = token
   387  							opts = options.ChangeStream().SetStartAfter(token)
   388  						case resumeAfter:
   389  							numExpectedEvents = numEvents - 1
   390  							initialToken = token
   391  							opts = options.ChangeStream().SetResumeAfter(token)
   392  						case operationTime:
   393  							numExpectedEvents = numEvents
   394  							opts = options.ChangeStream().SetStartAtOperationTime(&opTime)
   395  						}
   396  
   397  						// clear slate and create new change stream
   398  						mt.ClearEvents()
   399  						cs, err = mt.Coll.Watch(context.Background(), mongo.Pipeline{}, opts)
   400  						assert.Nil(mt, err, "Watch error: %v", err)
   401  						defer closeStream(cs)
   402  
   403  						aggPbrt, _ := getAggregateResponseInfo(mt)
   404  						compareResumeTokens(mt, cs, initialToken)
   405  
   406  						for i := 0; i < numExpectedEvents; i++ {
   407  							assert.True(mt, cs.Next(context.Background()), "expected Next to return true, got false")
   408  							// while we're not at the last doc in the batch, the resume token should be the _id of the
   409  							// document
   410  							if i != numExpectedEvents-1 {
   411  								compareResumeTokens(mt, cs, cs.Current.Lookup("_id").Document())
   412  							}
   413  						}
   414  						// at end of batch, the resume token should equal the PBRT of the aggregate
   415  						compareResumeTokens(mt, cs, aggPbrt)
   416  					})
   417  				}
   418  			})
   419  
   420  			noPbrtOpts := mtest.NewOptions().MaxServerVersion("4.0.6")
   421  			mt.RunOpts("without PBRT support", noPbrtOpts, func(mt *mtest.T) {
   422  				collName := mt.Coll.Name()
   423  				dbName := mt.Coll.Database().Name()
   424  				cs, err := mt.Coll.Watch(context.Background(), mongo.Pipeline{})
   425  				assert.Nil(mt, err, "Watch error: %v", err)
   426  				defer closeStream(cs)
   427  
   428  				compareResumeTokens(mt, cs, nil) // should be no resume token because no PBRT
   429  				numEvents := 5
   430  				generateEvents(mt, numEvents)
   431  				// iterate once to get a resume token
   432  				assert.True(mt, cs.Next(context.Background()), "expected Next to return true, got false")
   433  				token := cs.ResumeToken()
   434  				assert.NotNil(mt, token, "expected resume token, got nil")
   435  
   436  				testCases := []struct {
   437  					name            string
   438  					opts            *options.ChangeStreamOptions
   439  					iterateStream   bool // whether or not resulting change stream should be iterated
   440  					initialToken    bson.Raw
   441  					numDocsExpected int
   442  				}{
   443  					{"resumeAfter", options.ChangeStream().SetResumeAfter(token), true, token, numEvents - 1},
   444  					{"no options", nil, false, nil, 0},
   445  				}
   446  				for _, tc := range testCases {
   447  					mt.Run(tc.name, func(mt *mtest.T) {
   448  						coll := mt.Client.Database(dbName).Collection(collName)
   449  						cs, err := coll.Watch(context.Background(), mongo.Pipeline{}, tc.opts)
   450  						assert.Nil(mt, err, "Watch error: %v", err)
   451  						defer closeStream(cs)
   452  
   453  						compareResumeTokens(mt, cs, tc.initialToken)
   454  						if !tc.iterateStream {
   455  							return
   456  						}
   457  
   458  						for i := 0; i < tc.numDocsExpected; i++ {
   459  							assert.True(mt, cs.Next(context.Background()), "expected Next to return true, got false")
   460  							// current resume token should always equal _id of current document
   461  							compareResumeTokens(mt, cs, cs.Current.Lookup("_id").Document())
   462  						}
   463  					})
   464  				}
   465  			})
   466  		})
   467  	})
   468  	mt.RunOpts("try next", noClientOpts, func(mt *mtest.T) {
   469  		mt.Run("existing non-empty batch", func(mt *mtest.T) {
   470  			// If there's already documents in the current batch, TryNext should return true without doing a getMore
   471  
   472  			cs, err := mt.Coll.Watch(context.Background(), mongo.Pipeline{})
   473  			assert.Nil(mt, err, "Watch error: %v", err)
   474  			defer closeStream(cs)
   475  			generateEvents(mt, 5)
   476  			// call Next to make sure a batch is retrieved
   477  			assert.True(mt, cs.Next(context.Background()), "expected Next to return true, got false")
   478  			tryNextExistingBatchTest(mt, cs)
   479  		})
   480  		mt.Run("one getMore sent", func(mt *mtest.T) {
   481  			// If the current batch is empty, TryNext should send one getMore and return.
   482  
   483  			cs, err := mt.Coll.Watch(context.Background(), mongo.Pipeline{})
   484  			assert.Nil(mt, err, "Watch error: %v", err)
   485  			defer closeStream(cs)
   486  
   487  			mt.ClearEvents()
   488  			// first call to TryNext should return false because first batch was empty so batch cursor returns false
   489  			// without doing a getMore
   490  			// next call to TryNext should attempt a getMore
   491  			for i := 0; i < 2; i++ {
   492  				assert.False(mt, cs.TryNext(context.Background()), "TryNext returned true on iteration %v", i)
   493  			}
   494  			verifyOneGetmoreSent(mt)
   495  		})
   496  		mt.RunOpts("getMore error", mtest.NewOptions().ClientType(mtest.Mock), func(mt *mtest.T) {
   497  			// If the getMore attempt errors with a non-resumable error, TryNext returns false
   498  
   499  			aggRes := mtest.CreateCursorResponse(50, "foo.bar", mtest.FirstBatch)
   500  			mt.AddMockResponses(aggRes)
   501  			cs, err := mt.Coll.Watch(context.Background(), mongo.Pipeline{})
   502  			assert.Nil(mt, err, "Watch error: %v", err)
   503  			defer closeStream(cs)
   504  			tryNextGetmoreError(mt, cs)
   505  		})
   506  	})
   507  
   508  	customDeploymentOpts := mtest.NewOptions().
   509  		Topologies(mtest.ReplicaSet). // Avoid complexity of sharded fail points.
   510  		MinServerVersion("4.0").      // 4.0 is needed to use replica set fail points.
   511  		CreateClient(false)
   512  	mt.RunOpts("custom deployment", customDeploymentOpts, func(mt *mtest.T) {
   513  		// Tests for the changeStreamDeployment type. These are written as integration tests for ChangeStream rather
   514  		// than unit/integration tests for changeStreamDeployment to ensure that the deployment is correctly wired
   515  		// by ChangeStream when executing an aggregate.
   516  
   517  		mt.Run("errors are processed for SDAM on initial aggregate", func(mt *mtest.T) {
   518  			tpm := eventtest.NewTestPoolMonitor()
   519  			mt.ResetClient(options.Client().
   520  				SetPoolMonitor(tpm.PoolMonitor).
   521  				SetWriteConcern(mtest.MajorityWc).
   522  				SetReadConcern(mtest.MajorityRc).
   523  				SetRetryReads(false))
   524  
   525  			mt.SetFailPoint(mtest.FailPoint{
   526  				ConfigureFailPoint: "failCommand",
   527  				Mode: mtest.FailPointMode{
   528  					Times: 1,
   529  				},
   530  				Data: mtest.FailPointData{
   531  					FailCommands:    []string{"aggregate"},
   532  					CloseConnection: true,
   533  				},
   534  			})
   535  
   536  			_, err := mt.Coll.Watch(context.Background(), mongo.Pipeline{})
   537  			assert.NotNil(mt, err, "expected Watch error, got nil")
   538  			assert.True(mt, tpm.IsPoolCleared(), "expected pool to be cleared after non-timeout network error but was not")
   539  		})
   540  		mt.Run("errors are processed for SDAM on getMore", func(mt *mtest.T) {
   541  			tpm := eventtest.NewTestPoolMonitor()
   542  			mt.ResetClient(options.Client().
   543  				SetPoolMonitor(tpm.PoolMonitor).
   544  				SetWriteConcern(mtest.MajorityWc).
   545  				SetReadConcern(mtest.MajorityRc).
   546  				SetRetryReads(false))
   547  
   548  			mt.SetFailPoint(mtest.FailPoint{
   549  				ConfigureFailPoint: "failCommand",
   550  				Mode: mtest.FailPointMode{
   551  					Times: 1,
   552  				},
   553  				Data: mtest.FailPointData{
   554  					FailCommands:    []string{"getMore"},
   555  					CloseConnection: true,
   556  				},
   557  			})
   558  
   559  			cs, err := mt.Coll.Watch(context.Background(), mongo.Pipeline{})
   560  			assert.Nil(mt, err, "Watch error: %v", err)
   561  			defer closeStream(cs)
   562  
   563  			_, err = mt.Coll.InsertOne(context.Background(), bson.D{{"x", 1}})
   564  			assert.Nil(mt, err, "InsertOne error: %v", err)
   565  
   566  			assert.True(mt, cs.Next(context.Background()), "expected Next to return true, got false (iteration error %v)",
   567  				cs.Err())
   568  			assert.True(mt, tpm.IsPoolCleared(), "expected pool to be cleared after non-timeout network error but was not")
   569  		})
   570  		mt.Run("errors are processed for SDAM on retried aggregate", func(mt *mtest.T) {
   571  			tpm := eventtest.NewTestPoolMonitor()
   572  			mt.ResetClient(options.Client().
   573  				SetPoolMonitor(tpm.PoolMonitor).
   574  				SetRetryReads(true))
   575  
   576  			mt.SetFailPoint(mtest.FailPoint{
   577  				ConfigureFailPoint: "failCommand",
   578  				Mode: mtest.FailPointMode{
   579  					Times: 2,
   580  				},
   581  				Data: mtest.FailPointData{
   582  					FailCommands:    []string{"aggregate"},
   583  					CloseConnection: true,
   584  				},
   585  			})
   586  
   587  			_, err := mt.Coll.Watch(context.Background(), mongo.Pipeline{})
   588  			assert.NotNil(mt, err, "expected Watch error, got nil")
   589  
   590  			clearedEvents := tpm.Events(func(evt *event.PoolEvent) bool {
   591  				return evt.Type == event.PoolCleared
   592  			})
   593  			assert.Equal(mt, 2, len(clearedEvents), "expected two PoolCleared events, got %d", len(clearedEvents))
   594  		})
   595  	})
   596  	// Setting min server version as 4.0 since v3.6 does not send a "dropEvent"
   597  	mt.RunOpts("call to cursor.Next after cursor closed", mtest.NewOptions().MinServerVersion("4.0"), func(mt *mtest.T) {
   598  		cs, err := mt.Coll.Watch(context.Background(), mongo.Pipeline{})
   599  		assert.Nil(mt, err, "Watch error: %v", err)
   600  		defer closeStream(cs)
   601  
   602  		// Generate insert events
   603  		generateEvents(mt, 5)
   604  		// Call Coll.Drop to generate drop and invalidate event
   605  		err = mt.Coll.Drop(context.Background())
   606  		assert.Nil(mt, err, "Drop error: %v", err)
   607  
   608  		// Test that all events were successful
   609  		for i := 0; i < 7; i++ {
   610  			assert.True(mt, cs.Next(context.Background()), "Next returned false at index %d; iteration error: %v", i, cs.Err())
   611  		}
   612  
   613  		operationType := cs.Current.Lookup("operationType").StringValue()
   614  		assert.Equal(mt, operationType, "invalidate", "expected invalidate event but returned %q event", operationType)
   615  		// next call to cs.Next should return False since cursor is closed
   616  		assert.False(mt, cs.Next(context.Background()), "expected to return false, but returned true")
   617  	})
   618  	mt.Run("getMore commands are monitored", func(mt *mtest.T) {
   619  		cs, err := mt.Coll.Watch(context.Background(), mongo.Pipeline{})
   620  		assert.Nil(mt, err, "Watch error: %v", err)
   621  		defer closeStream(cs)
   622  
   623  		_, err = mt.Coll.InsertOne(context.Background(), bson.M{"x": 1})
   624  		assert.Nil(mt, err, "InsertOne error: %v", err)
   625  
   626  		mt.ClearEvents()
   627  		assert.True(mt, cs.Next(context.Background()), "Next returned false with error %v", cs.Err())
   628  		evt := mt.GetStartedEvent()
   629  		assert.Equal(mt, "getMore", evt.CommandName, "expected command 'getMore', got %q", evt.CommandName)
   630  	})
   631  	mt.Run("killCursors commands are monitored", func(mt *mtest.T) {
   632  		cs, err := mt.Coll.Watch(context.Background(), mongo.Pipeline{})
   633  		assert.Nil(mt, err, "Watch error: %v", err)
   634  		defer closeStream(cs)
   635  
   636  		mt.ClearEvents()
   637  		err = cs.Close(context.Background())
   638  		assert.Nil(mt, err, "Close error: %v", err)
   639  		evt := mt.GetStartedEvent()
   640  		assert.Equal(mt, "killCursors", evt.CommandName, "expected command 'killCursors', got %q", evt.CommandName)
   641  	})
   642  	mt.Run("Custom", func(mt *mtest.T) {
   643  		// Custom options should be a BSON map of option names to Marshalable option values.
   644  		// We use "allowDiskUse" as an example.
   645  		customOpts := bson.M{"allowDiskUse": true}
   646  		opts := options.ChangeStream().SetCustom(customOpts)
   647  
   648  		// Create change stream with custom options set.
   649  		mt.ClearEvents()
   650  		cs, err := mt.Coll.Watch(context.Background(), mongo.Pipeline{}, opts)
   651  		assert.Nil(mt, err, "Watch error: %v", err)
   652  		defer closeStream(cs)
   653  
   654  		// Assert that custom option is passed to the initial aggregate.
   655  		evt := mt.GetStartedEvent()
   656  		assert.Equal(mt, "aggregate", evt.CommandName, "expected command 'aggregate' got, %q", evt.CommandName)
   657  
   658  		aduVal, err := evt.Command.LookupErr("allowDiskUse")
   659  		assert.Nil(mt, err, "expected field 'allowDiskUse' in started command not found")
   660  		adu, ok := aduVal.BooleanOK()
   661  		assert.True(mt, ok, "expected field 'allowDiskUse' to be boolean, got %v", aduVal.Type.String())
   662  		assert.True(mt, adu, "expected field 'allowDiskUse' to be true, got false")
   663  	})
   664  	mt.RunOpts("CustomPipeline", mtest.NewOptions().MinServerVersion("4.0"), func(mt *mtest.T) {
   665  		// Custom pipeline options should be a BSON map of option names to Marshalable option values.
   666  		// We use "allChangesForCluster" as an example.
   667  		customPipelineOpts := bson.M{"allChangesForCluster": false}
   668  		opts := options.ChangeStream().SetCustomPipeline(customPipelineOpts)
   669  
   670  		// Create change stream with custom pipeline options set.
   671  		mt.ClearEvents()
   672  		cs, err := mt.Coll.Watch(context.Background(), mongo.Pipeline{}, opts)
   673  		assert.Nil(mt, err, "Watch error: %v", err)
   674  		defer closeStream(cs)
   675  
   676  		// Assert that custom pipeline option is included in the $changeStream stage.
   677  		evt := mt.GetStartedEvent()
   678  		assert.Equal(mt, "aggregate", evt.CommandName, "expected command 'aggregate' got, %q", evt.CommandName)
   679  
   680  		acfcVal, err := evt.Command.LookupErr("pipeline", "0", "$changeStream", "allChangesForCluster")
   681  		assert.Nil(mt, err, "expected field 'allChangesForCluster' in $changeStream stage not found")
   682  		acfc, ok := acfcVal.BooleanOK()
   683  		assert.True(mt, ok, "expected field 'allChangesForCluster' to be boolean, got %v", acfcVal.Type.String())
   684  		assert.False(mt, acfc, "expected field 'allChangesForCluster' to be false, got %v", acfc)
   685  	})
   686  
   687  	withBSONOpts := mtest.NewOptions().ClientOptions(
   688  		options.Client().SetBSONOptions(&options.BSONOptions{
   689  			UseJSONStructTags: true,
   690  		}))
   691  	mt.RunOpts("with BSONOptions", withBSONOpts, func(mt *mtest.T) {
   692  		cs, err := mt.Coll.Watch(context.Background(), mongo.Pipeline{})
   693  		require.NoError(mt, err, "Watch error")
   694  		defer closeStream(cs)
   695  
   696  		type myDocument struct {
   697  			A string `json:"x"`
   698  		}
   699  
   700  		var wg sync.WaitGroup
   701  		wg.Add(1)
   702  		go func() {
   703  			defer wg.Done()
   704  			_, err := mt.Coll.InsertOne(context.Background(), myDocument{A: "foo"})
   705  			require.NoError(mt, err, "InsertOne error")
   706  		}()
   707  
   708  		cs.Next(context.Background())
   709  
   710  		var got struct {
   711  			FullDocument myDocument `bson:"fullDocument"`
   712  		}
   713  		err = cs.Decode(&got)
   714  		require.NoError(mt, err, "Decode error")
   715  
   716  		want := myDocument{
   717  			A: "foo",
   718  		}
   719  		assert.Equal(mt, want, got.FullDocument, "expected and actual Decode results are different")
   720  
   721  		wg.Wait()
   722  	})
   723  
   724  	splitLargeChangesCollOpts := options.
   725  		CreateCollection().
   726  		SetChangeStreamPreAndPostImages(bson.M{"enabled": true})
   727  
   728  	splitLargeChangesOpts := mtOpts.
   729  		MinServerVersion("6.0.9").
   730  		CreateClient(true).
   731  		CollectionCreateOptions(splitLargeChangesCollOpts)
   732  
   733  	mt.RunOpts("split large changes", splitLargeChangesOpts, func(mt *mtest.T) {
   734  		type idValue struct {
   735  			ID    int32  `bson:"_id"`
   736  			Value string `bson:"value"`
   737  		}
   738  
   739  		doc := idValue{
   740  			ID:    1,
   741  			Value: "q" + strings.Repeat("q", 10*1024*1024),
   742  		}
   743  
   744  		// Insert the document
   745  		_, err := mt.Coll.InsertOne(context.Background(), doc)
   746  		require.NoError(t, err, "failed to insert idValue")
   747  
   748  		// Watch for change events
   749  		pipeline := mongo.Pipeline{
   750  			{{"$changeStreamSplitLargeEvent", bson.D{}}},
   751  		}
   752  
   753  		opts := options.ChangeStream().SetFullDocument(options.Required)
   754  
   755  		cs, err := mt.Coll.Watch(context.Background(), pipeline, opts)
   756  		require.NoError(t, err, "failed to watch collection")
   757  
   758  		defer closeStream(cs)
   759  
   760  		var wg sync.WaitGroup
   761  		wg.Add(1)
   762  
   763  		go func() {
   764  			defer wg.Done()
   765  
   766  			filter := bson.D{{"_id", int32(1)}}
   767  			update := bson.D{{"$set", bson.D{{"value", "z" + strings.Repeat("q", 10*1024*1024)}}}}
   768  
   769  			_, err := mt.Coll.UpdateOne(context.Background(), filter, update)
   770  			require.NoError(mt, err, "failed to update idValue")
   771  		}()
   772  
   773  		nextCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
   774  		t.Cleanup(cancel)
   775  
   776  		type splitEvent struct {
   777  			Fragment int32 `bson:"fragment"`
   778  			Of       int32 `bson:"of"`
   779  		}
   780  
   781  		got := struct {
   782  			SplitEvent splitEvent `bson:"splitEvent"`
   783  		}{}
   784  
   785  		cs.Next(nextCtx)
   786  
   787  		err = cs.Decode(&got)
   788  		require.NoError(mt, err, "failed to decode first iteration")
   789  
   790  		want := splitEvent{
   791  			Fragment: 1,
   792  			Of:       2,
   793  		}
   794  
   795  		assert.Equal(mt, want, got.SplitEvent, "expected and actual Decode results are different")
   796  
   797  		cs.Next(nextCtx)
   798  
   799  		err = cs.Decode(&got)
   800  		require.NoError(mt, err, "failed to decoded second iteration")
   801  
   802  		want = splitEvent{
   803  			Fragment: 2,
   804  			Of:       2,
   805  		}
   806  
   807  		assert.Equal(mt, want, got.SplitEvent, "expected and actual decode results are different")
   808  
   809  		wg.Wait()
   810  	})
   811  }
   812  
   813  func closeStream(cs *mongo.ChangeStream) {
   814  	_ = cs.Close(context.Background())
   815  }
   816  
   817  func generateEvents(mt *mtest.T, numEvents int) {
   818  	mt.Helper()
   819  
   820  	for i := 0; i < numEvents; i++ {
   821  		doc := bson.D{{"x", i}}
   822  		_, err := mt.Coll.InsertOne(context.Background(), doc)
   823  		assert.Nil(mt, err, "InsertOne error on document %v: %v", doc, err)
   824  	}
   825  }
   826  
   827  func killChangeStreamCursor(mt *mtest.T, cs *mongo.ChangeStream) {
   828  	mt.Helper()
   829  
   830  	db := mt.Coll.Database().Client().Database("admin")
   831  	err := db.RunCommand(context.Background(), bson.D{
   832  		{"killCursors", mt.Coll.Name()},
   833  		{"cursors", bson.A{cs.ID()}},
   834  	}).Err()
   835  	assert.Nil(mt, err, "killCursors error: %v", err)
   836  }
   837  
   838  // returns pbrt, operationTime from aggregate command response
   839  func getAggregateResponseInfo(mt *mtest.T) (bson.Raw, primitive.Timestamp) {
   840  	mt.Helper()
   841  
   842  	succeeded := mt.GetSucceededEvent()
   843  	assert.NotNil(mt, succeeded, "expected success event for aggregate, got nil")
   844  	assert.Equal(mt, "aggregate", succeeded.CommandName, "expected command name 'aggregate', got '%v'", succeeded.CommandName)
   845  
   846  	pbrt := succeeded.Reply.Lookup("cursor", "postBatchResumeToken").Document()
   847  	optimeT, optimeI := succeeded.Reply.Lookup("operationTime").Timestamp()
   848  	return pbrt, primitive.Timestamp{T: optimeT, I: optimeI}
   849  }
   850  
   851  func compareResumeTokens(mt *mtest.T, cs *mongo.ChangeStream, expected bson.Raw) {
   852  	mt.Helper()
   853  	assert.Equal(mt, expected, cs.ResumeToken(), "expected resume token %v, got %v", expected, cs.ResumeToken())
   854  }
   855  

View as plain text