...

Source file src/go.mongodb.org/mongo-driver/mongo/integration/cursor_test.go

Documentation: go.mongodb.org/mongo-driver/mongo/integration

     1  // Copyright (C) MongoDB, Inc. 2017-present.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License"); you may
     4  // not use this file except in compliance with the License. You may obtain
     5  // a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
     6  
     7  package integration
     8  
     9  import (
    10  	"context"
    11  	"errors"
    12  	"os"
    13  	"testing"
    14  	"time"
    15  
    16  	"go.mongodb.org/mongo-driver/bson"
    17  	"go.mongodb.org/mongo-driver/internal/assert"
    18  	"go.mongodb.org/mongo-driver/mongo"
    19  	"go.mongodb.org/mongo-driver/mongo/integration/mtest"
    20  	"go.mongodb.org/mongo-driver/mongo/options"
    21  )
    22  
    23  const (
    24  	errorCursorNotFound = 43
    25  )
    26  
    27  func TestCursor(t *testing.T) {
    28  	mt := mtest.New(t, mtest.NewOptions().CreateClient(false))
    29  	cappedCollectionOpts := options.CreateCollection().SetCapped(true).SetSizeInBytes(64 * 1024)
    30  
    31  	// Server versions 2.6 and 3.0 use OP_GET_MORE so this works on >= 3.2 and when RequireAPIVersion is false;
    32  	// getMore cannot be sent with RunCommand as server API options will be attached when they should not be.
    33  	mt.RunOpts("cursor is killed on server", mtest.NewOptions().MinServerVersion("3.2").RequireAPIVersion(false), func(mt *mtest.T) {
    34  		initCollection(mt, mt.Coll)
    35  		c, err := mt.Coll.Find(context.Background(), bson.D{}, options.Find().SetBatchSize(2))
    36  		assert.Nil(mt, err, "Find error: %v", err)
    37  
    38  		id := c.ID()
    39  		assert.True(mt, c.Next(context.Background()), "expected Next true, got false")
    40  		err = c.Close(context.Background())
    41  		assert.Nil(mt, err, "Close error: %v", err)
    42  
    43  		err = mt.DB.RunCommand(context.Background(), bson.D{
    44  			{"getMore", id},
    45  			{"collection", mt.Coll.Name()},
    46  		}).Err()
    47  		ce := err.(mongo.CommandError)
    48  		assert.Equal(mt, int32(errorCursorNotFound), ce.Code, "expected error code %v, got %v", errorCursorNotFound, ce.Code)
    49  	})
    50  	mt.RunOpts("try next", noClientOpts, func(mt *mtest.T) {
    51  		// Skip tests if running against serverless, as capped collections are banned.
    52  		if os.Getenv("SERVERLESS") == "serverless" {
    53  			mt.Skip("skipping as serverless forbids capped collections")
    54  		}
    55  
    56  		mt.Run("existing non-empty batch", func(mt *mtest.T) {
    57  			// If there's already documents in the current batch, TryNext should return true without doing a getMore
    58  
    59  			initCollection(mt, mt.Coll)
    60  			cursor, err := mt.Coll.Find(context.Background(), bson.D{})
    61  			assert.Nil(mt, err, "Find error: %v", err)
    62  			defer cursor.Close(context.Background())
    63  			tryNextExistingBatchTest(mt, cursor)
    64  		})
    65  		mt.RunOpts("one getMore sent", mtest.NewOptions().CollectionCreateOptions(cappedCollectionOpts), func(mt *mtest.T) {
    66  			// If the current batch is empty, TryNext should send one getMore and return.
    67  
    68  			// insert a document because a tailable cursor will only have a non-zero ID if the initial Find matches
    69  			// at least one document
    70  			_, err := mt.Coll.InsertOne(context.Background(), bson.D{{"x", 1}})
    71  			assert.Nil(mt, err, "InsertOne error: %v", err)
    72  
    73  			cursor, err := mt.Coll.Find(context.Background(), bson.D{}, options.Find().SetCursorType(options.Tailable))
    74  			assert.Nil(mt, err, "Find error: %v", err)
    75  			defer cursor.Close(context.Background())
    76  
    77  			// first call to TryNext should return 1 document
    78  			assert.True(mt, cursor.TryNext(context.Background()), "expected Next to return true, got false")
    79  			// TryNext should attempt one getMore
    80  			mt.ClearEvents()
    81  			assert.False(mt, cursor.TryNext(context.Background()), "unexpected document %v", cursor.Current)
    82  			verifyOneGetmoreSent(mt)
    83  		})
    84  		mt.RunOpts("getMore error", mtest.NewOptions().ClientType(mtest.Mock), func(mt *mtest.T) {
    85  			findRes := mtest.CreateCursorResponse(50, "foo.bar", mtest.FirstBatch)
    86  			mt.AddMockResponses(findRes)
    87  			cursor, err := mt.Coll.Find(context.Background(), bson.D{})
    88  			assert.Nil(mt, err, "Find error: %v", err)
    89  			defer cursor.Close(context.Background())
    90  			tryNextGetmoreError(mt, cursor)
    91  		})
    92  	})
    93  	mt.RunOpts("RemainingBatchLength", noClientOpts, func(mt *mtest.T) {
    94  		cappedMtOpts := mtest.NewOptions().CollectionCreateOptions(cappedCollectionOpts)
    95  		// Skip tests if running against serverless, as capped collections are banned.
    96  		if os.Getenv("SERVERLESS") == "serverless" {
    97  			mt.Skip("skipping as serverless forbids capped collections")
    98  		}
    99  
   100  		mt.RunOpts("first batch is non empty", cappedMtOpts, func(mt *mtest.T) {
   101  			// Test that the cursor reports the correct value for RemainingBatchLength at various execution points if
   102  			// the first batch from the server is non-empty.
   103  
   104  			initCollection(mt, mt.Coll)
   105  
   106  			// Create a tailable await cursor with a low cursor timeout.
   107  			batchSize := 2
   108  			findOpts := options.Find().
   109  				SetBatchSize(int32(batchSize)).
   110  				SetCursorType(options.TailableAwait).
   111  				SetMaxAwaitTime(100 * time.Millisecond)
   112  			cursor, err := mt.Coll.Find(context.Background(), bson.D{}, findOpts)
   113  			assert.Nil(mt, err, "Find error: %v", err)
   114  			defer cursor.Close(context.Background())
   115  
   116  			mt.ClearEvents()
   117  
   118  			// The initial batch length should be equal to the batchSize. Do batchSize Next calls to exhaust the current
   119  			// batch and assert that no getMore was done.
   120  			assertCursorBatchLength(mt, cursor, batchSize)
   121  			for i := 0; i < batchSize; i++ {
   122  				prevLength := cursor.RemainingBatchLength()
   123  				if !cursor.Next(context.Background()) {
   124  					mt.Fatalf("expected Next to return true on index %d; cursor err: %v", i, cursor.Err())
   125  				}
   126  
   127  				// Each successful Next call should decrement batch length by 1.
   128  				assertCursorBatchLength(mt, cursor, prevLength-1)
   129  			}
   130  			evt := mt.GetStartedEvent()
   131  			assert.Nil(mt, evt, "expected no events, got %v", evt)
   132  
   133  			// The batch is exhausted, so the batch length should be 0. Do one Next call, which should do a getMore and
   134  			// fetch batchSize more documents. The batch length after the call should be (batchSize-1) because Next consumes
   135  			// one document.
   136  			assertCursorBatchLength(mt, cursor, 0)
   137  
   138  			assert.True(mt, cursor.Next(context.Background()), "expected Next to return true; cursor err: %v", cursor.Err())
   139  			evt = mt.GetStartedEvent()
   140  			assert.NotNil(mt, evt, "expected CommandStartedEvent, got nil")
   141  			assert.Equal(mt, "getMore", evt.CommandName, "expected command %q, got %q", "getMore", evt.CommandName)
   142  
   143  			assertCursorBatchLength(mt, cursor, batchSize-1)
   144  		})
   145  		mt.RunOpts("first batch is empty", mtest.NewOptions().ClientType(mtest.Mock), func(mt *mtest.T) {
   146  			// Test that the cursor reports the correct value for RemainingBatchLength if the first batch is empty.
   147  			// Using a mock deployment simplifies this test because the server won't create a valid cursor if the
   148  			// collection is empty when the find is run.
   149  
   150  			cursorID := int64(50)
   151  			ns := mt.DB.Name() + "." + mt.Coll.Name()
   152  			getMoreBatch := []bson.D{
   153  				{{"x", 1}},
   154  				{{"x", 2}},
   155  			}
   156  
   157  			// Create mock responses.
   158  			find := mtest.CreateCursorResponse(cursorID, ns, mtest.FirstBatch)
   159  			getMore := mtest.CreateCursorResponse(cursorID, ns, mtest.NextBatch, getMoreBatch...)
   160  			killCursors := mtest.CreateSuccessResponse()
   161  			mt.AddMockResponses(find, getMore, killCursors)
   162  
   163  			cursor, err := mt.Coll.Find(context.Background(), bson.D{})
   164  			assert.Nil(mt, err, "Find error: %v", err)
   165  			defer cursor.Close(context.Background())
   166  			mt.ClearEvents()
   167  
   168  			for {
   169  				if cursor.TryNext(context.Background()) {
   170  					break
   171  				}
   172  
   173  				assert.Nil(mt, cursor.Err(), "cursor error: %v", err)
   174  				assertCursorBatchLength(mt, cursor, 0)
   175  			}
   176  			// TryNext consumes one document so the remaining batch size should be len(getMoreBatch)-1.
   177  			assertCursorBatchLength(mt, cursor, len(getMoreBatch)-1)
   178  		})
   179  	})
   180  	mt.RunOpts("all", noClientOpts, func(mt *mtest.T) {
   181  		failpointOpts := mtest.NewOptions().Topologies(mtest.ReplicaSet).MinServerVersion("4.0")
   182  		mt.RunOpts("getMore error", failpointOpts, func(mt *mtest.T) {
   183  			failpointData := mtest.FailPointData{
   184  				FailCommands: []string{"getMore"},
   185  				ErrorCode:    100,
   186  			}
   187  			mt.SetFailPoint(mtest.FailPoint{
   188  				ConfigureFailPoint: "failCommand",
   189  				Mode:               "alwaysOn",
   190  				Data:               failpointData,
   191  			})
   192  			initCollection(mt, mt.Coll)
   193  			cursor, err := mt.Coll.Find(context.Background(), bson.D{}, options.Find().SetBatchSize(2))
   194  			assert.Nil(mt, err, "Find error: %v", err)
   195  			defer cursor.Close(context.Background())
   196  
   197  			var docs []bson.D
   198  			err = cursor.All(context.Background(), &docs)
   199  			assert.NotNil(mt, err, "expected change stream error, got nil")
   200  
   201  			// make sure that a mongo.CommandError is returned instead of a driver.Error
   202  			mongoErr, ok := err.(mongo.CommandError)
   203  			assert.True(mt, ok, "expected mongo.CommandError, got: %T", err)
   204  			assert.Equal(mt, failpointData.ErrorCode, mongoErr.Code, "expected code %v, got: %v", failpointData.ErrorCode, mongoErr.Code)
   205  		})
   206  
   207  		mt.Run("deferred Close uses context.Background", func(mt *mtest.T) {
   208  			initCollection(mt, mt.Coll)
   209  
   210  			// Find with batchSize 2 so All will run getMore for next 3 docs and error.
   211  			cur, err := mt.Coll.Find(context.Background(), bson.D{},
   212  				options.Find().SetBatchSize(2))
   213  			assert.Nil(mt, err, "Find error: %v", err)
   214  
   215  			// Create a context and immediately cancel it.
   216  			canceledCtx, cancel := context.WithCancel(context.Background())
   217  			cancel()
   218  
   219  			// Clear "insert" and "find" events.
   220  			mt.ClearEvents()
   221  
   222  			// Call All with the canceled context and expect context.Canceled.
   223  			var docs []bson.D
   224  			err = cur.All(canceledCtx, &docs)
   225  			assert.NotNil(mt, err, "expected error for All, got nil")
   226  			assert.True(mt, errors.Is(err, context.Canceled),
   227  				"expected context.Canceled error, got %v", err)
   228  
   229  			// Assert that a "getMore" command was sent and failed (Next used the
   230  			// canceled context).
   231  			stEvt := mt.GetStartedEvent()
   232  			assert.NotNil(mt, stEvt, `expected a "getMore" started event, got no event`)
   233  			assert.Equal(mt, stEvt.CommandName, "getMore",
   234  				`expected a "getMore" started event, got %q`, stEvt.CommandName)
   235  			fEvt := mt.GetFailedEvent()
   236  			assert.NotNil(mt, fEvt, `expected a failed "getMore" event, got no event`)
   237  			assert.Equal(mt, fEvt.CommandName, "getMore",
   238  				`expected a failed "getMore" event, got %q`, fEvt.CommandName)
   239  
   240  			// Assert that a "killCursors" command was sent and was successful (Close
   241  			// used the 2 second Client Timeout).
   242  			stEvt = mt.GetStartedEvent()
   243  			assert.NotNil(mt, stEvt, `expected a "killCursors" started event, got no event`)
   244  			assert.Equal(mt, stEvt.CommandName, "killCursors",
   245  				`expected a "killCursors" started event, got %q`, stEvt.CommandName)
   246  			suEvt := mt.GetSucceededEvent()
   247  			assert.NotNil(mt, suEvt, `expected a successful "killCursors" event, got no event`)
   248  			assert.Equal(mt, suEvt.CommandName, "killCursors",
   249  				`expected a successful "killCursors" event, got %q`, suEvt.CommandName)
   250  		})
   251  	})
   252  	mt.RunOpts("close", noClientOpts, func(mt *mtest.T) {
   253  		failpointOpts := mtest.NewOptions().Topologies(mtest.ReplicaSet).MinServerVersion("4.0")
   254  		mt.RunOpts("killCursors error", failpointOpts, func(mt *mtest.T) {
   255  			failpointData := mtest.FailPointData{
   256  				FailCommands: []string{"killCursors"},
   257  				ErrorCode:    100,
   258  			}
   259  			mt.SetFailPoint(mtest.FailPoint{
   260  				ConfigureFailPoint: "failCommand",
   261  				Mode:               "alwaysOn",
   262  				Data:               failpointData,
   263  			})
   264  			initCollection(mt, mt.Coll)
   265  			cursor, err := mt.Coll.Find(context.Background(), bson.D{}, options.Find().SetBatchSize(2))
   266  			assert.Nil(mt, err, "Find error: %v", err)
   267  
   268  			err = cursor.Close(context.Background())
   269  			assert.NotNil(mt, err, "expected change stream error, got nil")
   270  
   271  			// make sure that a mongo.CommandError is returned instead of a driver.Error
   272  			mongoErr, ok := err.(mongo.CommandError)
   273  			assert.True(mt, ok, "expected mongo.CommandError, got: %T", err)
   274  			assert.Equal(mt, failpointData.ErrorCode, mongoErr.Code, "expected code %v, got: %v", failpointData.ErrorCode, mongoErr.Code)
   275  		})
   276  	})
   277  	// For versions < 3.2, the first find will get all the documents
   278  	mt.RunOpts("set batchSize", mtest.NewOptions().MinServerVersion("3.2"), func(mt *mtest.T) {
   279  		initCollection(mt, mt.Coll)
   280  		mt.ClearEvents()
   281  
   282  		// create cursor with batchSize 0
   283  		cursor, err := mt.Coll.Find(context.Background(), bson.D{}, options.Find().SetBatchSize(0))
   284  		assert.Nil(mt, err, "Find error: %v", err)
   285  		defer cursor.Close(context.Background())
   286  		evt := mt.GetStartedEvent()
   287  		assert.Equal(mt, "find", evt.CommandName, "expected 'find' event, got '%v'", evt.CommandName)
   288  		sizeVal, err := evt.Command.LookupErr("batchSize")
   289  		assert.Nil(mt, err, "expected find command to have batchSize")
   290  		batchSize := sizeVal.Int32()
   291  		assert.Equal(mt, int32(0), batchSize, "expected batchSize 0, got %v", batchSize)
   292  
   293  		// make sure that the getMore sends the new batchSize
   294  		batchCursor := mongo.BatchCursorFromCursor(cursor)
   295  		batchCursor.SetBatchSize(4)
   296  		assert.True(mt, cursor.Next(context.Background()), "expected Next true, got false")
   297  		evt = mt.GetStartedEvent()
   298  		assert.NotNil(mt, evt, "expected getMore event, got nil")
   299  		assert.Equal(mt, "getMore", evt.CommandName, "expected 'getMore' event, got '%v'", evt.CommandName)
   300  		sizeVal, err = evt.Command.LookupErr("batchSize")
   301  		assert.Nil(mt, err, "expected getMore command to have batchSize")
   302  		batchSize = sizeVal.Int32()
   303  		assert.Equal(mt, int32(4), batchSize, "expected batchSize 4, got %v", batchSize)
   304  	})
   305  }
   306  
   307  type tryNextCursor interface {
   308  	TryNext(context.Context) bool
   309  	Err() error
   310  }
   311  
   312  func tryNextExistingBatchTest(mt *mtest.T, cursor tryNextCursor) {
   313  	mt.Helper()
   314  
   315  	mt.ClearEvents()
   316  	assert.True(mt, cursor.TryNext(context.Background()), "expected TryNext to return true, got false")
   317  	evt := mt.GetStartedEvent()
   318  	if evt != nil {
   319  		mt.Fatalf("unexpected event sent during TryNext: %v", evt.CommandName)
   320  	}
   321  }
   322  
   323  // use command monitoring to verify that a single getMore was sent
   324  func verifyOneGetmoreSent(mt *mtest.T) {
   325  	mt.Helper()
   326  
   327  	evt := mt.GetStartedEvent()
   328  	assert.NotNil(mt, evt, "expected getMore event, got nil")
   329  	assert.Equal(mt, "getMore", evt.CommandName, "expected 'getMore' event, got '%v'", evt.CommandName)
   330  	evt = mt.GetStartedEvent()
   331  	if evt != nil {
   332  		mt.Fatalf("unexpected event sent during TryNext: %v", evt.CommandName)
   333  	}
   334  }
   335  
   336  // should be called in a test run with a mock deployment
   337  func tryNextGetmoreError(mt *mtest.T, cursor tryNextCursor) {
   338  	testErr := mtest.CommandError{
   339  		Code:    100,
   340  		Message: "getMore error",
   341  		Name:    "CursorError",
   342  		Labels:  []string{"NonResumableChangeStreamError"},
   343  	}
   344  	getMoreRes := mtest.CreateCommandErrorResponse(testErr)
   345  	mt.AddMockResponses(getMoreRes)
   346  
   347  	// first call to TryNext should return false because first batch was empty so batch cursor returns false
   348  	// without doing a getMore
   349  	// next call to TryNext should attempt a getMore
   350  	for i := 0; i < 2; i++ {
   351  		assert.False(mt, cursor.TryNext(context.Background()), "TryNext returned true on iteration %v", i)
   352  	}
   353  
   354  	err := cursor.Err()
   355  	assert.NotNil(mt, err, "expected change stream error, got nil")
   356  
   357  	// make sure that a mongo.CommandError is returned instead of a driver.Error
   358  	mongoErr, ok := err.(mongo.CommandError)
   359  	assert.True(mt, ok, "expected mongo.CommandError, got: %T", err)
   360  	assert.Equal(mt, testErr.Code, mongoErr.Code, "expected code %v, got: %v", testErr.Code, mongoErr.Code)
   361  	assert.Equal(mt, testErr.Message, mongoErr.Message, "expected message %v, got: %v", testErr.Message, mongoErr.Message)
   362  	assert.Equal(mt, testErr.Name, mongoErr.Name, "expected name %v, got: %v", testErr.Name, mongoErr.Name)
   363  	assert.Equal(mt, testErr.Labels, mongoErr.Labels, "expected labels %v, got: %v", testErr.Labels, mongoErr.Labels)
   364  }
   365  
   366  func assertCursorBatchLength(mt *mtest.T, cursor *mongo.Cursor, expected int) {
   367  	batchLen := cursor.RemainingBatchLength()
   368  	assert.Equal(mt, expected, batchLen, "expected remaining batch length %d, got %d", expected, batchLen)
   369  }
   370  

View as plain text