1
2
3
4
5
6
7 package integration
8
9 import (
10 "context"
11 "errors"
12 "os"
13 "testing"
14 "time"
15
16 "go.mongodb.org/mongo-driver/bson"
17 "go.mongodb.org/mongo-driver/internal/assert"
18 "go.mongodb.org/mongo-driver/mongo"
19 "go.mongodb.org/mongo-driver/mongo/integration/mtest"
20 "go.mongodb.org/mongo-driver/mongo/options"
21 )
22
23 const (
24 errorCursorNotFound = 43
25 )
26
27 func TestCursor(t *testing.T) {
28 mt := mtest.New(t, mtest.NewOptions().CreateClient(false))
29 cappedCollectionOpts := options.CreateCollection().SetCapped(true).SetSizeInBytes(64 * 1024)
30
31
32
33 mt.RunOpts("cursor is killed on server", mtest.NewOptions().MinServerVersion("3.2").RequireAPIVersion(false), func(mt *mtest.T) {
34 initCollection(mt, mt.Coll)
35 c, err := mt.Coll.Find(context.Background(), bson.D{}, options.Find().SetBatchSize(2))
36 assert.Nil(mt, err, "Find error: %v", err)
37
38 id := c.ID()
39 assert.True(mt, c.Next(context.Background()), "expected Next true, got false")
40 err = c.Close(context.Background())
41 assert.Nil(mt, err, "Close error: %v", err)
42
43 err = mt.DB.RunCommand(context.Background(), bson.D{
44 {"getMore", id},
45 {"collection", mt.Coll.Name()},
46 }).Err()
47 ce := err.(mongo.CommandError)
48 assert.Equal(mt, int32(errorCursorNotFound), ce.Code, "expected error code %v, got %v", errorCursorNotFound, ce.Code)
49 })
50 mt.RunOpts("try next", noClientOpts, func(mt *mtest.T) {
51
52 if os.Getenv("SERVERLESS") == "serverless" {
53 mt.Skip("skipping as serverless forbids capped collections")
54 }
55
56 mt.Run("existing non-empty batch", func(mt *mtest.T) {
57
58
59 initCollection(mt, mt.Coll)
60 cursor, err := mt.Coll.Find(context.Background(), bson.D{})
61 assert.Nil(mt, err, "Find error: %v", err)
62 defer cursor.Close(context.Background())
63 tryNextExistingBatchTest(mt, cursor)
64 })
65 mt.RunOpts("one getMore sent", mtest.NewOptions().CollectionCreateOptions(cappedCollectionOpts), func(mt *mtest.T) {
66
67
68
69
70 _, err := mt.Coll.InsertOne(context.Background(), bson.D{{"x", 1}})
71 assert.Nil(mt, err, "InsertOne error: %v", err)
72
73 cursor, err := mt.Coll.Find(context.Background(), bson.D{}, options.Find().SetCursorType(options.Tailable))
74 assert.Nil(mt, err, "Find error: %v", err)
75 defer cursor.Close(context.Background())
76
77
78 assert.True(mt, cursor.TryNext(context.Background()), "expected Next to return true, got false")
79
80 mt.ClearEvents()
81 assert.False(mt, cursor.TryNext(context.Background()), "unexpected document %v", cursor.Current)
82 verifyOneGetmoreSent(mt)
83 })
84 mt.RunOpts("getMore error", mtest.NewOptions().ClientType(mtest.Mock), func(mt *mtest.T) {
85 findRes := mtest.CreateCursorResponse(50, "foo.bar", mtest.FirstBatch)
86 mt.AddMockResponses(findRes)
87 cursor, err := mt.Coll.Find(context.Background(), bson.D{})
88 assert.Nil(mt, err, "Find error: %v", err)
89 defer cursor.Close(context.Background())
90 tryNextGetmoreError(mt, cursor)
91 })
92 })
93 mt.RunOpts("RemainingBatchLength", noClientOpts, func(mt *mtest.T) {
94 cappedMtOpts := mtest.NewOptions().CollectionCreateOptions(cappedCollectionOpts)
95
96 if os.Getenv("SERVERLESS") == "serverless" {
97 mt.Skip("skipping as serverless forbids capped collections")
98 }
99
100 mt.RunOpts("first batch is non empty", cappedMtOpts, func(mt *mtest.T) {
101
102
103
104 initCollection(mt, mt.Coll)
105
106
107 batchSize := 2
108 findOpts := options.Find().
109 SetBatchSize(int32(batchSize)).
110 SetCursorType(options.TailableAwait).
111 SetMaxAwaitTime(100 * time.Millisecond)
112 cursor, err := mt.Coll.Find(context.Background(), bson.D{}, findOpts)
113 assert.Nil(mt, err, "Find error: %v", err)
114 defer cursor.Close(context.Background())
115
116 mt.ClearEvents()
117
118
119
120 assertCursorBatchLength(mt, cursor, batchSize)
121 for i := 0; i < batchSize; i++ {
122 prevLength := cursor.RemainingBatchLength()
123 if !cursor.Next(context.Background()) {
124 mt.Fatalf("expected Next to return true on index %d; cursor err: %v", i, cursor.Err())
125 }
126
127
128 assertCursorBatchLength(mt, cursor, prevLength-1)
129 }
130 evt := mt.GetStartedEvent()
131 assert.Nil(mt, evt, "expected no events, got %v", evt)
132
133
134
135
136 assertCursorBatchLength(mt, cursor, 0)
137
138 assert.True(mt, cursor.Next(context.Background()), "expected Next to return true; cursor err: %v", cursor.Err())
139 evt = mt.GetStartedEvent()
140 assert.NotNil(mt, evt, "expected CommandStartedEvent, got nil")
141 assert.Equal(mt, "getMore", evt.CommandName, "expected command %q, got %q", "getMore", evt.CommandName)
142
143 assertCursorBatchLength(mt, cursor, batchSize-1)
144 })
145 mt.RunOpts("first batch is empty", mtest.NewOptions().ClientType(mtest.Mock), func(mt *mtest.T) {
146
147
148
149
150 cursorID := int64(50)
151 ns := mt.DB.Name() + "." + mt.Coll.Name()
152 getMoreBatch := []bson.D{
153 {{"x", 1}},
154 {{"x", 2}},
155 }
156
157
158 find := mtest.CreateCursorResponse(cursorID, ns, mtest.FirstBatch)
159 getMore := mtest.CreateCursorResponse(cursorID, ns, mtest.NextBatch, getMoreBatch...)
160 killCursors := mtest.CreateSuccessResponse()
161 mt.AddMockResponses(find, getMore, killCursors)
162
163 cursor, err := mt.Coll.Find(context.Background(), bson.D{})
164 assert.Nil(mt, err, "Find error: %v", err)
165 defer cursor.Close(context.Background())
166 mt.ClearEvents()
167
168 for {
169 if cursor.TryNext(context.Background()) {
170 break
171 }
172
173 assert.Nil(mt, cursor.Err(), "cursor error: %v", err)
174 assertCursorBatchLength(mt, cursor, 0)
175 }
176
177 assertCursorBatchLength(mt, cursor, len(getMoreBatch)-1)
178 })
179 })
180 mt.RunOpts("all", noClientOpts, func(mt *mtest.T) {
181 failpointOpts := mtest.NewOptions().Topologies(mtest.ReplicaSet).MinServerVersion("4.0")
182 mt.RunOpts("getMore error", failpointOpts, func(mt *mtest.T) {
183 failpointData := mtest.FailPointData{
184 FailCommands: []string{"getMore"},
185 ErrorCode: 100,
186 }
187 mt.SetFailPoint(mtest.FailPoint{
188 ConfigureFailPoint: "failCommand",
189 Mode: "alwaysOn",
190 Data: failpointData,
191 })
192 initCollection(mt, mt.Coll)
193 cursor, err := mt.Coll.Find(context.Background(), bson.D{}, options.Find().SetBatchSize(2))
194 assert.Nil(mt, err, "Find error: %v", err)
195 defer cursor.Close(context.Background())
196
197 var docs []bson.D
198 err = cursor.All(context.Background(), &docs)
199 assert.NotNil(mt, err, "expected change stream error, got nil")
200
201
202 mongoErr, ok := err.(mongo.CommandError)
203 assert.True(mt, ok, "expected mongo.CommandError, got: %T", err)
204 assert.Equal(mt, failpointData.ErrorCode, mongoErr.Code, "expected code %v, got: %v", failpointData.ErrorCode, mongoErr.Code)
205 })
206
207 mt.Run("deferred Close uses context.Background", func(mt *mtest.T) {
208 initCollection(mt, mt.Coll)
209
210
211 cur, err := mt.Coll.Find(context.Background(), bson.D{},
212 options.Find().SetBatchSize(2))
213 assert.Nil(mt, err, "Find error: %v", err)
214
215
216 canceledCtx, cancel := context.WithCancel(context.Background())
217 cancel()
218
219
220 mt.ClearEvents()
221
222
223 var docs []bson.D
224 err = cur.All(canceledCtx, &docs)
225 assert.NotNil(mt, err, "expected error for All, got nil")
226 assert.True(mt, errors.Is(err, context.Canceled),
227 "expected context.Canceled error, got %v", err)
228
229
230
231 stEvt := mt.GetStartedEvent()
232 assert.NotNil(mt, stEvt, `expected a "getMore" started event, got no event`)
233 assert.Equal(mt, stEvt.CommandName, "getMore",
234 `expected a "getMore" started event, got %q`, stEvt.CommandName)
235 fEvt := mt.GetFailedEvent()
236 assert.NotNil(mt, fEvt, `expected a failed "getMore" event, got no event`)
237 assert.Equal(mt, fEvt.CommandName, "getMore",
238 `expected a failed "getMore" event, got %q`, fEvt.CommandName)
239
240
241
242 stEvt = mt.GetStartedEvent()
243 assert.NotNil(mt, stEvt, `expected a "killCursors" started event, got no event`)
244 assert.Equal(mt, stEvt.CommandName, "killCursors",
245 `expected a "killCursors" started event, got %q`, stEvt.CommandName)
246 suEvt := mt.GetSucceededEvent()
247 assert.NotNil(mt, suEvt, `expected a successful "killCursors" event, got no event`)
248 assert.Equal(mt, suEvt.CommandName, "killCursors",
249 `expected a successful "killCursors" event, got %q`, suEvt.CommandName)
250 })
251 })
252 mt.RunOpts("close", noClientOpts, func(mt *mtest.T) {
253 failpointOpts := mtest.NewOptions().Topologies(mtest.ReplicaSet).MinServerVersion("4.0")
254 mt.RunOpts("killCursors error", failpointOpts, func(mt *mtest.T) {
255 failpointData := mtest.FailPointData{
256 FailCommands: []string{"killCursors"},
257 ErrorCode: 100,
258 }
259 mt.SetFailPoint(mtest.FailPoint{
260 ConfigureFailPoint: "failCommand",
261 Mode: "alwaysOn",
262 Data: failpointData,
263 })
264 initCollection(mt, mt.Coll)
265 cursor, err := mt.Coll.Find(context.Background(), bson.D{}, options.Find().SetBatchSize(2))
266 assert.Nil(mt, err, "Find error: %v", err)
267
268 err = cursor.Close(context.Background())
269 assert.NotNil(mt, err, "expected change stream error, got nil")
270
271
272 mongoErr, ok := err.(mongo.CommandError)
273 assert.True(mt, ok, "expected mongo.CommandError, got: %T", err)
274 assert.Equal(mt, failpointData.ErrorCode, mongoErr.Code, "expected code %v, got: %v", failpointData.ErrorCode, mongoErr.Code)
275 })
276 })
277
278 mt.RunOpts("set batchSize", mtest.NewOptions().MinServerVersion("3.2"), func(mt *mtest.T) {
279 initCollection(mt, mt.Coll)
280 mt.ClearEvents()
281
282
283 cursor, err := mt.Coll.Find(context.Background(), bson.D{}, options.Find().SetBatchSize(0))
284 assert.Nil(mt, err, "Find error: %v", err)
285 defer cursor.Close(context.Background())
286 evt := mt.GetStartedEvent()
287 assert.Equal(mt, "find", evt.CommandName, "expected 'find' event, got '%v'", evt.CommandName)
288 sizeVal, err := evt.Command.LookupErr("batchSize")
289 assert.Nil(mt, err, "expected find command to have batchSize")
290 batchSize := sizeVal.Int32()
291 assert.Equal(mt, int32(0), batchSize, "expected batchSize 0, got %v", batchSize)
292
293
294 batchCursor := mongo.BatchCursorFromCursor(cursor)
295 batchCursor.SetBatchSize(4)
296 assert.True(mt, cursor.Next(context.Background()), "expected Next true, got false")
297 evt = mt.GetStartedEvent()
298 assert.NotNil(mt, evt, "expected getMore event, got nil")
299 assert.Equal(mt, "getMore", evt.CommandName, "expected 'getMore' event, got '%v'", evt.CommandName)
300 sizeVal, err = evt.Command.LookupErr("batchSize")
301 assert.Nil(mt, err, "expected getMore command to have batchSize")
302 batchSize = sizeVal.Int32()
303 assert.Equal(mt, int32(4), batchSize, "expected batchSize 4, got %v", batchSize)
304 })
305 }
306
307 type tryNextCursor interface {
308 TryNext(context.Context) bool
309 Err() error
310 }
311
312 func tryNextExistingBatchTest(mt *mtest.T, cursor tryNextCursor) {
313 mt.Helper()
314
315 mt.ClearEvents()
316 assert.True(mt, cursor.TryNext(context.Background()), "expected TryNext to return true, got false")
317 evt := mt.GetStartedEvent()
318 if evt != nil {
319 mt.Fatalf("unexpected event sent during TryNext: %v", evt.CommandName)
320 }
321 }
322
323
324 func verifyOneGetmoreSent(mt *mtest.T) {
325 mt.Helper()
326
327 evt := mt.GetStartedEvent()
328 assert.NotNil(mt, evt, "expected getMore event, got nil")
329 assert.Equal(mt, "getMore", evt.CommandName, "expected 'getMore' event, got '%v'", evt.CommandName)
330 evt = mt.GetStartedEvent()
331 if evt != nil {
332 mt.Fatalf("unexpected event sent during TryNext: %v", evt.CommandName)
333 }
334 }
335
336
337 func tryNextGetmoreError(mt *mtest.T, cursor tryNextCursor) {
338 testErr := mtest.CommandError{
339 Code: 100,
340 Message: "getMore error",
341 Name: "CursorError",
342 Labels: []string{"NonResumableChangeStreamError"},
343 }
344 getMoreRes := mtest.CreateCommandErrorResponse(testErr)
345 mt.AddMockResponses(getMoreRes)
346
347
348
349
350 for i := 0; i < 2; i++ {
351 assert.False(mt, cursor.TryNext(context.Background()), "TryNext returned true on iteration %v", i)
352 }
353
354 err := cursor.Err()
355 assert.NotNil(mt, err, "expected change stream error, got nil")
356
357
358 mongoErr, ok := err.(mongo.CommandError)
359 assert.True(mt, ok, "expected mongo.CommandError, got: %T", err)
360 assert.Equal(mt, testErr.Code, mongoErr.Code, "expected code %v, got: %v", testErr.Code, mongoErr.Code)
361 assert.Equal(mt, testErr.Message, mongoErr.Message, "expected message %v, got: %v", testErr.Message, mongoErr.Message)
362 assert.Equal(mt, testErr.Name, mongoErr.Name, "expected name %v, got: %v", testErr.Name, mongoErr.Name)
363 assert.Equal(mt, testErr.Labels, mongoErr.Labels, "expected labels %v, got: %v", testErr.Labels, mongoErr.Labels)
364 }
365
366 func assertCursorBatchLength(mt *mtest.T, cursor *mongo.Cursor, expected int) {
367 batchLen := cursor.RemainingBatchLength()
368 assert.Equal(mt, expected, batchLen, "expected remaining batch length %d, got %d", expected, batchLen)
369 }
370
View as plain text