1
2
3
4
5
6
7 package driver
8
9 import (
10 "context"
11 "errors"
12 "fmt"
13 "io"
14 "strings"
15 "time"
16
17 "go.mongodb.org/mongo-driver/bson"
18 "go.mongodb.org/mongo-driver/bson/bsontype"
19 "go.mongodb.org/mongo-driver/event"
20 "go.mongodb.org/mongo-driver/internal/codecutil"
21 "go.mongodb.org/mongo-driver/internal/csot"
22 "go.mongodb.org/mongo-driver/mongo/description"
23 "go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
24 "go.mongodb.org/mongo-driver/x/mongo/driver/session"
25 )
26
27
28
29 var ErrNoCursor = errors.New("database response does not contain a cursor")
30
31
32
33 type BatchCursor struct {
34 clientSession *session.Client
35 clock *session.ClusterClock
36 comment interface{}
37 encoderFn codecutil.EncoderFn
38 database string
39 collection string
40 id int64
41 err error
42 server Server
43 serverDescription description.Server
44 errorProcessor ErrorProcessor
45 connection PinnedConnection
46 batchSize int32
47 maxTimeMS int64
48 currentBatch *bsoncore.DocumentSequence
49 firstBatch bool
50 cmdMonitor *event.CommandMonitor
51 postBatchResumeToken bsoncore.Document
52 crypt Crypt
53 serverAPI *ServerAPIOptions
54
55
56 limit int32
57 numReturned int32
58 }
59
60
61
62 type CursorResponse struct {
63 Server Server
64 ErrorProcessor ErrorProcessor
65 Connection PinnedConnection
66 Desc description.Server
67 FirstBatch *bsoncore.DocumentSequence
68 Database string
69 Collection string
70 ID int64
71 postBatchResumeToken bsoncore.Document
72 }
73
74
75
76
77
78
79 func NewCursorResponse(info ResponseInfo) (CursorResponse, error) {
80 response := info.ServerResponse
81 cur, err := response.LookupErr("cursor")
82 if errors.Is(err, bsoncore.ErrElementNotFound) {
83 return CursorResponse{}, ErrNoCursor
84 }
85 if err != nil {
86 return CursorResponse{}, fmt.Errorf("error getting cursor from database response: %w", err)
87 }
88 curDoc, ok := cur.DocumentOK()
89 if !ok {
90 return CursorResponse{}, fmt.Errorf("cursor should be an embedded document but is BSON type %s", cur.Type)
91 }
92 elems, err := curDoc.Elements()
93 if err != nil {
94 return CursorResponse{}, fmt.Errorf("error getting elements from cursor: %w", err)
95 }
96 curresp := CursorResponse{Server: info.Server, Desc: info.ConnectionDescription}
97
98 for _, elem := range elems {
99 switch elem.Key() {
100 case "firstBatch":
101 arr, ok := elem.Value().ArrayOK()
102 if !ok {
103 return CursorResponse{}, fmt.Errorf("firstBatch should be an array but is a BSON %s", elem.Value().Type)
104 }
105 curresp.FirstBatch = &bsoncore.DocumentSequence{Style: bsoncore.ArrayStyle, Data: arr}
106 case "ns":
107 ns, ok := elem.Value().StringValueOK()
108 if !ok {
109 return CursorResponse{}, fmt.Errorf("ns should be a string but is a BSON %s", elem.Value().Type)
110 }
111 index := strings.Index(ns, ".")
112 if index == -1 {
113 return CursorResponse{}, errors.New("ns field must contain a valid namespace, but is missing '.'")
114 }
115 curresp.Database = ns[:index]
116 curresp.Collection = ns[index+1:]
117 case "id":
118 curresp.ID, ok = elem.Value().Int64OK()
119 if !ok {
120 return CursorResponse{}, fmt.Errorf("id should be an int64 but it is a BSON %s", elem.Value().Type)
121 }
122 case "postBatchResumeToken":
123 curresp.postBatchResumeToken, ok = elem.Value().DocumentOK()
124 if !ok {
125 return CursorResponse{}, fmt.Errorf("post batch resume token should be a document but it is a BSON %s", elem.Value().Type)
126 }
127 }
128 }
129
130
131
132 if curresp.Desc.LoadBalanced() && curresp.ID != 0 {
133
134 ep, ok := curresp.Server.(ErrorProcessor)
135 if !ok {
136 return CursorResponse{}, fmt.Errorf("expected Server used to establish a cursor to implement ErrorProcessor, but got %T", curresp.Server)
137 }
138 curresp.ErrorProcessor = ep
139
140 refConn, ok := info.Connection.(PinnedConnection)
141 if !ok {
142 return CursorResponse{}, fmt.Errorf("expected Connection used to establish a cursor to implement PinnedConnection, but got %T", info.Connection)
143 }
144 if err := refConn.PinToCursor(); err != nil {
145 return CursorResponse{}, fmt.Errorf("error incrementing connection reference count when creating a cursor: %w", err)
146 }
147 curresp.Connection = refConn
148 }
149
150 return curresp, nil
151 }
152
153
154 type CursorOptions struct {
155 BatchSize int32
156 Comment bsoncore.Value
157 MaxTimeMS int64
158 Limit int32
159 CommandMonitor *event.CommandMonitor
160 Crypt Crypt
161 ServerAPI *ServerAPIOptions
162 MarshalValueEncoderFn func(io.Writer) (*bson.Encoder, error)
163 }
164
165
166 func NewBatchCursor(cr CursorResponse, clientSession *session.Client, clock *session.ClusterClock, opts CursorOptions) (*BatchCursor, error) {
167 ds := cr.FirstBatch
168 bc := &BatchCursor{
169 clientSession: clientSession,
170 clock: clock,
171 comment: opts.Comment,
172 database: cr.Database,
173 collection: cr.Collection,
174 id: cr.ID,
175 server: cr.Server,
176 connection: cr.Connection,
177 errorProcessor: cr.ErrorProcessor,
178 batchSize: opts.BatchSize,
179 maxTimeMS: opts.MaxTimeMS,
180 cmdMonitor: opts.CommandMonitor,
181 firstBatch: true,
182 postBatchResumeToken: cr.postBatchResumeToken,
183 crypt: opts.Crypt,
184 serverAPI: opts.ServerAPI,
185 serverDescription: cr.Desc,
186 encoderFn: opts.MarshalValueEncoderFn,
187 }
188
189 if ds != nil {
190 bc.numReturned = int32(ds.DocumentCount())
191 }
192 if cr.Desc.WireVersion == nil {
193 bc.limit = opts.Limit
194
195
196 if bc.limit != 0 && bc.limit < bc.numReturned {
197 for i := int32(0); i < bc.limit; i++ {
198 _, err := ds.Next()
199 if err != nil {
200 return nil, err
201 }
202 }
203 ds.Data = ds.Data[:ds.Pos]
204 ds.ResetIterator()
205 }
206 }
207
208 bc.currentBatch = ds
209 return bc, nil
210 }
211
212
213 func NewEmptyBatchCursor() *BatchCursor {
214 return &BatchCursor{currentBatch: new(bsoncore.DocumentSequence)}
215 }
216
217
218
219 func NewBatchCursorFromDocuments(documents []byte) *BatchCursor {
220 return &BatchCursor{
221 currentBatch: &bsoncore.DocumentSequence{
222 Data: documents,
223 Style: bsoncore.SequenceStyle,
224 },
225
226
227 id: 0,
228 server: nil,
229 }
230 }
231
232
233 func (bc *BatchCursor) ID() int64 {
234 return bc.id
235 }
236
237
238
239
240
241
242 func (bc *BatchCursor) Next(ctx context.Context) bool {
243 if ctx == nil {
244 ctx = context.Background()
245 }
246
247 if bc.firstBatch {
248 bc.firstBatch = false
249 return !bc.currentBatch.Empty()
250 }
251
252 if bc.id == 0 || bc.server == nil {
253 return false
254 }
255
256 bc.getMore(ctx)
257
258 return !bc.currentBatch.Empty()
259 }
260
261
262
263 func (bc *BatchCursor) Batch() *bsoncore.DocumentSequence { return bc.currentBatch }
264
265
266 func (bc *BatchCursor) Err() error { return bc.err }
267
268
269 func (bc *BatchCursor) Close(ctx context.Context) error {
270 if ctx == nil {
271 ctx = context.Background()
272 }
273
274 err := bc.KillCursor(ctx)
275 bc.id = 0
276 bc.currentBatch.Data = nil
277 bc.currentBatch.Style = 0
278 bc.currentBatch.ResetIterator()
279
280 connErr := bc.unpinConnection()
281 if err == nil {
282 err = connErr
283 }
284 return err
285 }
286
287 func (bc *BatchCursor) unpinConnection() error {
288 if bc.connection == nil {
289 return nil
290 }
291
292 err := bc.connection.UnpinFromCursor()
293 closeErr := bc.connection.Close()
294 if err == nil && closeErr != nil {
295 err = closeErr
296 }
297 bc.connection = nil
298 return err
299 }
300
301
302 func (bc *BatchCursor) Server() Server {
303 return bc.server
304 }
305
306 func (bc *BatchCursor) clearBatch() {
307 bc.currentBatch.Data = bc.currentBatch.Data[:0]
308 }
309
310
311 func (bc *BatchCursor) KillCursor(ctx context.Context) error {
312 if bc.server == nil || bc.id == 0 {
313 return nil
314 }
315
316 return Operation{
317 CommandFn: func(dst []byte, desc description.SelectedServer) ([]byte, error) {
318 dst = bsoncore.AppendStringElement(dst, "killCursors", bc.collection)
319 dst = bsoncore.BuildArrayElement(dst, "cursors", bsoncore.Value{Type: bsontype.Int64, Data: bsoncore.AppendInt64(nil, bc.id)})
320 return dst, nil
321 },
322 Database: bc.database,
323 Deployment: bc.getOperationDeployment(),
324 Client: bc.clientSession,
325 Clock: bc.clock,
326 Legacy: LegacyKillCursors,
327 CommandMonitor: bc.cmdMonitor,
328 ServerAPI: bc.serverAPI,
329
330
331
332
333
334 omitReadPreference: true,
335 }.Execute(ctx)
336 }
337
338
339
340
341
342 func calcGetMoreBatchSize(bc BatchCursor) (int32, bool) {
343 gmBatchSize := bc.batchSize
344
345
346 if bc.limit != 0 && bc.numReturned+bc.batchSize >= bc.limit {
347 gmBatchSize = bc.limit - bc.numReturned
348 if gmBatchSize <= 0 {
349 return gmBatchSize, false
350 }
351 }
352
353 return gmBatchSize, true
354 }
355
356 func (bc *BatchCursor) getMore(ctx context.Context) {
357 bc.clearBatch()
358 if bc.id == 0 {
359 return
360 }
361
362 numToReturn, ok := calcGetMoreBatchSize(*bc)
363 if !ok {
364 if err := bc.Close(ctx); err != nil {
365 bc.err = err
366 }
367
368 return
369 }
370
371 bc.err = Operation{
372 CommandFn: func(dst []byte, desc description.SelectedServer) ([]byte, error) {
373 dst = bsoncore.AppendInt64Element(dst, "getMore", bc.id)
374 dst = bsoncore.AppendStringElement(dst, "collection", bc.collection)
375 if numToReturn > 0 {
376 dst = bsoncore.AppendInt32Element(dst, "batchSize", numToReturn)
377 }
378 if bc.maxTimeMS > 0 {
379 dst = bsoncore.AppendInt64Element(dst, "maxTimeMS", bc.maxTimeMS)
380 }
381
382 comment, err := codecutil.MarshalValue(bc.comment, bc.encoderFn)
383 if err != nil {
384 return nil, fmt.Errorf("error marshaling comment as a BSON value: %w", err)
385 }
386
387
388 if comment.Type != bsontype.Type(0) && bc.serverDescription.WireVersion.Max >= 9 {
389 dst = bsoncore.AppendValueElement(dst, "comment", comment)
390 }
391
392 return dst, nil
393 },
394 Database: bc.database,
395 Deployment: bc.getOperationDeployment(),
396 ProcessResponseFn: func(info ResponseInfo) error {
397 response := info.ServerResponse
398 id, ok := response.Lookup("cursor", "id").Int64OK()
399 if !ok {
400 return fmt.Errorf("cursor.id should be an int64 but is a BSON %s", response.Lookup("cursor", "id").Type)
401 }
402 bc.id = id
403
404 batch, ok := response.Lookup("cursor", "nextBatch").ArrayOK()
405 if !ok {
406 return fmt.Errorf("cursor.nextBatch should be an array but is a BSON %s", response.Lookup("cursor", "nextBatch").Type)
407 }
408 bc.currentBatch.Style = bsoncore.ArrayStyle
409 bc.currentBatch.Data = batch
410 bc.currentBatch.ResetIterator()
411 bc.numReturned += int32(bc.currentBatch.DocumentCount())
412
413 pbrt, err := response.LookupErr("cursor", "postBatchResumeToken")
414 if err != nil {
415
416 return nil
417 }
418
419 pbrtDoc, ok := pbrt.DocumentOK()
420 if !ok {
421 bc.err = fmt.Errorf("expected BSON type for post batch resume token to be EmbeddedDocument but got %s", pbrt.Type)
422 return nil
423 }
424
425 bc.postBatchResumeToken = pbrtDoc
426
427 return nil
428 },
429 Client: bc.clientSession,
430 Clock: bc.clock,
431 Legacy: LegacyGetMore,
432 CommandMonitor: bc.cmdMonitor,
433 Crypt: bc.crypt,
434 ServerAPI: bc.serverAPI,
435
436
437
438
439
440 omitReadPreference: true,
441 }.Execute(ctx)
442
443
444 if bc.id == 0 {
445 err := bc.unpinConnection()
446 if err != nil && bc.err == nil {
447 bc.err = err
448 }
449 }
450
451
452
453
454 if driverErr, ok := bc.err.(Error); ok && driverErr.NetworkError() && bc.connection != nil {
455 bc.id = 0
456 }
457
458
459 if bc.limit != 0 && bc.numReturned >= bc.limit {
460
461 err := bc.KillCursor(ctx)
462 if err != nil && bc.err == nil {
463 bc.err = err
464 }
465 }
466 }
467
468
469 func (bc *BatchCursor) PostBatchResumeToken() bsoncore.Document {
470 return bc.postBatchResumeToken
471 }
472
473
474 func (bc *BatchCursor) SetBatchSize(size int32) {
475 bc.batchSize = size
476 }
477
478
479
480
481
482
483
484 func (bc *BatchCursor) SetMaxTime(dur time.Duration) {
485 bc.maxTimeMS = int64(dur / time.Millisecond)
486 }
487
488
489 func (bc *BatchCursor) SetComment(comment interface{}) {
490 bc.comment = comment
491 }
492
493 func (bc *BatchCursor) getOperationDeployment() Deployment {
494 if bc.connection != nil {
495 return &loadBalancedCursorDeployment{
496 errorProcessor: bc.errorProcessor,
497 conn: bc.connection,
498 }
499 }
500 return SingleServerDeployment{bc.server}
501 }
502
503
504
505
506 type loadBalancedCursorDeployment struct {
507 errorProcessor ErrorProcessor
508 conn PinnedConnection
509 }
510
511 var _ Deployment = (*loadBalancedCursorDeployment)(nil)
512 var _ Server = (*loadBalancedCursorDeployment)(nil)
513 var _ ErrorProcessor = (*loadBalancedCursorDeployment)(nil)
514
515 func (lbcd *loadBalancedCursorDeployment) SelectServer(_ context.Context, _ description.ServerSelector) (Server, error) {
516 return lbcd, nil
517 }
518
519 func (lbcd *loadBalancedCursorDeployment) Kind() description.TopologyKind {
520 return description.LoadBalanced
521 }
522
523 func (lbcd *loadBalancedCursorDeployment) Connection(_ context.Context) (Connection, error) {
524 return lbcd.conn, nil
525 }
526
527
528 func (lbcd *loadBalancedCursorDeployment) RTTMonitor() RTTMonitor {
529 return &csot.ZeroRTTMonitor{}
530 }
531
532 func (lbcd *loadBalancedCursorDeployment) ProcessError(err error, conn Connection) ProcessErrorResult {
533 return lbcd.errorProcessor.ProcessError(err, conn)
534 }
535
View as plain text