...

Source file src/go.mongodb.org/mongo-driver/x/mongo/driver/batch_cursor.go

Documentation: go.mongodb.org/mongo-driver/x/mongo/driver

     1  // Copyright (C) MongoDB, Inc. 2022-present.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License"); you may
     4  // not use this file except in compliance with the License. You may obtain
     5  // a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
     6  
     7  package driver
     8  
     9  import (
    10  	"context"
    11  	"errors"
    12  	"fmt"
    13  	"io"
    14  	"strings"
    15  	"time"
    16  
    17  	"go.mongodb.org/mongo-driver/bson"
    18  	"go.mongodb.org/mongo-driver/bson/bsontype"
    19  	"go.mongodb.org/mongo-driver/event"
    20  	"go.mongodb.org/mongo-driver/internal/codecutil"
    21  	"go.mongodb.org/mongo-driver/internal/csot"
    22  	"go.mongodb.org/mongo-driver/mongo/description"
    23  	"go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
    24  	"go.mongodb.org/mongo-driver/x/mongo/driver/session"
    25  )
    26  
// ErrNoCursor is returned by NewCursorResponse when the database response does
// not contain a "cursor" element.
var ErrNoCursor = errors.New("database response does not contain a cursor")
    30  
// BatchCursor is a batch implementation of a cursor. It returns documents in entire batches instead
// of one at a time. An individual document cursor can be built on top of this batch cursor.
//
// The first batch is taken from the CursorResponse passed to NewBatchCursor. Later batches are
// fetched by Next, which runs a getMore command against either the stored server or, when a
// connection has been pinned, that connection.
type BatchCursor struct {
	clientSession        *session.Client
	clock                *session.ClusterClock
	comment              interface{}
	encoderFn            codecutil.EncoderFn
	database             string
	collection           string
	id                   int64
	err                  error
	server               Server
	serverDescription    description.Server
	errorProcessor       ErrorProcessor // This will only be set when pinning to a connection.
	connection           PinnedConnection
	batchSize            int32
	maxTimeMS            int64
	currentBatch         *bsoncore.DocumentSequence
	firstBatch           bool
	cmdMonitor           *event.CommandMonitor
	postBatchResumeToken bsoncore.Document
	crypt                Crypt
	serverAPI            *ServerAPIOptions

	// legacy server (< 3.2) fields
	limit       int32
	numReturned int32 // number of docs returned by server
}
    59  
// CursorResponse represents the response from a command that results in a cursor. A BatchCursor can
// be constructed from a CursorResponse.
//
// NewCursorResponse fills Database and Collection from the two halves of the reply's "ns" field
// (split at the first '.'), and FirstBatch, ID and the post batch resume token from the
// corresponding fields of the reply's "cursor" document.
type CursorResponse struct {
	Server               Server
	ErrorProcessor       ErrorProcessor // This will only be set when pinning to a connection.
	Connection           PinnedConnection
	Desc                 description.Server
	FirstBatch           *bsoncore.DocumentSequence
	Database             string
	Collection           string
	ID                   int64
	postBatchResumeToken bsoncore.Document
}
    73  
    74  // NewCursorResponse constructs a cursor response from the given response and
    75  // server. If the provided database response does not contain a cursor, it
    76  // returns ErrNoCursor.
    77  //
    78  // NewCursorResponse can be used within the ProcessResponse method for an operation.
    79  func NewCursorResponse(info ResponseInfo) (CursorResponse, error) {
    80  	response := info.ServerResponse
    81  	cur, err := response.LookupErr("cursor")
    82  	if errors.Is(err, bsoncore.ErrElementNotFound) {
    83  		return CursorResponse{}, ErrNoCursor
    84  	}
    85  	if err != nil {
    86  		return CursorResponse{}, fmt.Errorf("error getting cursor from database response: %w", err)
    87  	}
    88  	curDoc, ok := cur.DocumentOK()
    89  	if !ok {
    90  		return CursorResponse{}, fmt.Errorf("cursor should be an embedded document but is BSON type %s", cur.Type)
    91  	}
    92  	elems, err := curDoc.Elements()
    93  	if err != nil {
    94  		return CursorResponse{}, fmt.Errorf("error getting elements from cursor: %w", err)
    95  	}
    96  	curresp := CursorResponse{Server: info.Server, Desc: info.ConnectionDescription}
    97  
    98  	for _, elem := range elems {
    99  		switch elem.Key() {
   100  		case "firstBatch":
   101  			arr, ok := elem.Value().ArrayOK()
   102  			if !ok {
   103  				return CursorResponse{}, fmt.Errorf("firstBatch should be an array but is a BSON %s", elem.Value().Type)
   104  			}
   105  			curresp.FirstBatch = &bsoncore.DocumentSequence{Style: bsoncore.ArrayStyle, Data: arr}
   106  		case "ns":
   107  			ns, ok := elem.Value().StringValueOK()
   108  			if !ok {
   109  				return CursorResponse{}, fmt.Errorf("ns should be a string but is a BSON %s", elem.Value().Type)
   110  			}
   111  			index := strings.Index(ns, ".")
   112  			if index == -1 {
   113  				return CursorResponse{}, errors.New("ns field must contain a valid namespace, but is missing '.'")
   114  			}
   115  			curresp.Database = ns[:index]
   116  			curresp.Collection = ns[index+1:]
   117  		case "id":
   118  			curresp.ID, ok = elem.Value().Int64OK()
   119  			if !ok {
   120  				return CursorResponse{}, fmt.Errorf("id should be an int64 but it is a BSON %s", elem.Value().Type)
   121  			}
   122  		case "postBatchResumeToken":
   123  			curresp.postBatchResumeToken, ok = elem.Value().DocumentOK()
   124  			if !ok {
   125  				return CursorResponse{}, fmt.Errorf("post batch resume token should be a document but it is a BSON %s", elem.Value().Type)
   126  			}
   127  		}
   128  	}
   129  
   130  	// If the deployment is behind a load balancer and the cursor has a non-zero ID, pin the cursor to a connection and
   131  	// use the same connection to execute getMore and killCursors commands.
   132  	if curresp.Desc.LoadBalanced() && curresp.ID != 0 {
   133  		// Cache the server as an ErrorProcessor to use when constructing deployments for cursor commands.
   134  		ep, ok := curresp.Server.(ErrorProcessor)
   135  		if !ok {
   136  			return CursorResponse{}, fmt.Errorf("expected Server used to establish a cursor to implement ErrorProcessor, but got %T", curresp.Server)
   137  		}
   138  		curresp.ErrorProcessor = ep
   139  
   140  		refConn, ok := info.Connection.(PinnedConnection)
   141  		if !ok {
   142  			return CursorResponse{}, fmt.Errorf("expected Connection used to establish a cursor to implement PinnedConnection, but got %T", info.Connection)
   143  		}
   144  		if err := refConn.PinToCursor(); err != nil {
   145  			return CursorResponse{}, fmt.Errorf("error incrementing connection reference count when creating a cursor: %w", err)
   146  		}
   147  		curresp.Connection = refConn
   148  	}
   149  
   150  	return curresp, nil
   151  }
   152  
// CursorOptions are extra options that are required to construct a BatchCursor.
//
// NewBatchCursor copies these values into the cursor. Limit is only applied when
// the server description carries no wire version, and MarshalValueEncoderFn is
// the encoder factory used to marshal the comment for getMore commands.
type CursorOptions struct {
	BatchSize             int32
	Comment               bsoncore.Value
	MaxTimeMS             int64
	Limit                 int32
	CommandMonitor        *event.CommandMonitor
	Crypt                 Crypt
	ServerAPI             *ServerAPIOptions
	MarshalValueEncoderFn func(io.Writer) (*bson.Encoder, error)
}
   164  
   165  // NewBatchCursor creates a new BatchCursor from the provided parameters.
   166  func NewBatchCursor(cr CursorResponse, clientSession *session.Client, clock *session.ClusterClock, opts CursorOptions) (*BatchCursor, error) {
   167  	ds := cr.FirstBatch
   168  	bc := &BatchCursor{
   169  		clientSession:        clientSession,
   170  		clock:                clock,
   171  		comment:              opts.Comment,
   172  		database:             cr.Database,
   173  		collection:           cr.Collection,
   174  		id:                   cr.ID,
   175  		server:               cr.Server,
   176  		connection:           cr.Connection,
   177  		errorProcessor:       cr.ErrorProcessor,
   178  		batchSize:            opts.BatchSize,
   179  		maxTimeMS:            opts.MaxTimeMS,
   180  		cmdMonitor:           opts.CommandMonitor,
   181  		firstBatch:           true,
   182  		postBatchResumeToken: cr.postBatchResumeToken,
   183  		crypt:                opts.Crypt,
   184  		serverAPI:            opts.ServerAPI,
   185  		serverDescription:    cr.Desc,
   186  		encoderFn:            opts.MarshalValueEncoderFn,
   187  	}
   188  
   189  	if ds != nil {
   190  		bc.numReturned = int32(ds.DocumentCount())
   191  	}
   192  	if cr.Desc.WireVersion == nil {
   193  		bc.limit = opts.Limit
   194  
   195  		// Take as many documents from the batch as needed.
   196  		if bc.limit != 0 && bc.limit < bc.numReturned {
   197  			for i := int32(0); i < bc.limit; i++ {
   198  				_, err := ds.Next()
   199  				if err != nil {
   200  					return nil, err
   201  				}
   202  			}
   203  			ds.Data = ds.Data[:ds.Pos]
   204  			ds.ResetIterator()
   205  		}
   206  	}
   207  
   208  	bc.currentBatch = ds
   209  	return bc, nil
   210  }
   211  
   212  // NewEmptyBatchCursor returns a batch cursor that is empty.
   213  func NewEmptyBatchCursor() *BatchCursor {
   214  	return &BatchCursor{currentBatch: new(bsoncore.DocumentSequence)}
   215  }
   216  
   217  // NewBatchCursorFromDocuments returns a batch cursor with current batch set to a sequence-style
   218  // DocumentSequence containing the provided documents.
   219  func NewBatchCursorFromDocuments(documents []byte) *BatchCursor {
   220  	return &BatchCursor{
   221  		currentBatch: &bsoncore.DocumentSequence{
   222  			Data:  documents,
   223  			Style: bsoncore.SequenceStyle,
   224  		},
   225  		// BatchCursors created with this function have no associated ID nor server, so no getMore
   226  		// calls will be made.
   227  		id:     0,
   228  		server: nil,
   229  	}
   230  }
   231  
// ID returns the cursor ID for this batch cursor. The ID is reset to 0 by Close
// and is replaced by the ID reported in each getMore response.
func (bc *BatchCursor) ID() int64 {
	return bc.id
}
   236  
   237  // Next indicates if there is another batch available. Returning false does not necessarily indicate
   238  // that the cursor is closed. This method will return false when an empty batch is returned.
   239  //
   240  // If Next returns true, there is a valid batch of documents available. If Next returns false, there
   241  // is not a valid batch of documents available.
   242  func (bc *BatchCursor) Next(ctx context.Context) bool {
   243  	if ctx == nil {
   244  		ctx = context.Background()
   245  	}
   246  
   247  	if bc.firstBatch {
   248  		bc.firstBatch = false
   249  		return !bc.currentBatch.Empty()
   250  	}
   251  
   252  	if bc.id == 0 || bc.server == nil {
   253  		return false
   254  	}
   255  
   256  	bc.getMore(ctx)
   257  
   258  	return !bc.currentBatch.Empty()
   259  }
   260  
// Batch returns the DocumentSequence holding the current batch of documents. The
// returned DocumentSequence is only valid until the next call to Next or Close,
// both of which modify it in place.
func (bc *BatchCursor) Batch() *bsoncore.DocumentSequence { return bc.currentBatch }
   264  
// Err returns the latest error recorded by the cursor while fetching batches.
func (bc *BatchCursor) Err() error { return bc.err }
   267  
   268  // Close closes this batch cursor.
   269  func (bc *BatchCursor) Close(ctx context.Context) error {
   270  	if ctx == nil {
   271  		ctx = context.Background()
   272  	}
   273  
   274  	err := bc.KillCursor(ctx)
   275  	bc.id = 0
   276  	bc.currentBatch.Data = nil
   277  	bc.currentBatch.Style = 0
   278  	bc.currentBatch.ResetIterator()
   279  
   280  	connErr := bc.unpinConnection()
   281  	if err == nil {
   282  		err = connErr
   283  	}
   284  	return err
   285  }
   286  
   287  func (bc *BatchCursor) unpinConnection() error {
   288  	if bc.connection == nil {
   289  		return nil
   290  	}
   291  
   292  	err := bc.connection.UnpinFromCursor()
   293  	closeErr := bc.connection.Close()
   294  	if err == nil && closeErr != nil {
   295  		err = closeErr
   296  	}
   297  	bc.connection = nil
   298  	return err
   299  }
   300  
// Server returns the server for this cursor. It is nil for cursors that were
// created without a server, e.g. by NewBatchCursorFromDocuments.
func (bc *BatchCursor) Server() Server {
	return bc.server
}
   305  
   306  func (bc *BatchCursor) clearBatch() {
   307  	bc.currentBatch.Data = bc.currentBatch.Data[:0]
   308  }
   309  
   310  // KillCursor kills cursor on server without closing batch cursor
   311  func (bc *BatchCursor) KillCursor(ctx context.Context) error {
   312  	if bc.server == nil || bc.id == 0 {
   313  		return nil
   314  	}
   315  
   316  	return Operation{
   317  		CommandFn: func(dst []byte, desc description.SelectedServer) ([]byte, error) {
   318  			dst = bsoncore.AppendStringElement(dst, "killCursors", bc.collection)
   319  			dst = bsoncore.BuildArrayElement(dst, "cursors", bsoncore.Value{Type: bsontype.Int64, Data: bsoncore.AppendInt64(nil, bc.id)})
   320  			return dst, nil
   321  		},
   322  		Database:       bc.database,
   323  		Deployment:     bc.getOperationDeployment(),
   324  		Client:         bc.clientSession,
   325  		Clock:          bc.clock,
   326  		Legacy:         LegacyKillCursors,
   327  		CommandMonitor: bc.cmdMonitor,
   328  		ServerAPI:      bc.serverAPI,
   329  
   330  		// No read preference is passed to the killCursor command,
   331  		// resulting in the default read preference: "primaryPreferred".
   332  		// Since this could be confusing, and there is no requirement
   333  		// to use a read preference here, we omit it.
   334  		omitReadPreference: true,
   335  	}.Execute(ctx)
   336  }
   337  
   338  // calcGetMoreBatchSize calculates the number of documents to return in the
   339  // response of a "getMore" operation based on the given limit, batchSize, and
   340  // number of documents already returned. Returns false if a non-trivial limit is
   341  // lower than or equal to the number of documents already returned.
   342  func calcGetMoreBatchSize(bc BatchCursor) (int32, bool) {
   343  	gmBatchSize := bc.batchSize
   344  
   345  	// Account for legacy operations that don't support setting a limit.
   346  	if bc.limit != 0 && bc.numReturned+bc.batchSize >= bc.limit {
   347  		gmBatchSize = bc.limit - bc.numReturned
   348  		if gmBatchSize <= 0 {
   349  			return gmBatchSize, false
   350  		}
   351  	}
   352  
   353  	return gmBatchSize, true
   354  }
   355  
   356  func (bc *BatchCursor) getMore(ctx context.Context) {
   357  	bc.clearBatch()
   358  	if bc.id == 0 {
   359  		return
   360  	}
   361  
   362  	numToReturn, ok := calcGetMoreBatchSize(*bc)
   363  	if !ok {
   364  		if err := bc.Close(ctx); err != nil {
   365  			bc.err = err
   366  		}
   367  
   368  		return
   369  	}
   370  
   371  	bc.err = Operation{
   372  		CommandFn: func(dst []byte, desc description.SelectedServer) ([]byte, error) {
   373  			dst = bsoncore.AppendInt64Element(dst, "getMore", bc.id)
   374  			dst = bsoncore.AppendStringElement(dst, "collection", bc.collection)
   375  			if numToReturn > 0 {
   376  				dst = bsoncore.AppendInt32Element(dst, "batchSize", numToReturn)
   377  			}
   378  			if bc.maxTimeMS > 0 {
   379  				dst = bsoncore.AppendInt64Element(dst, "maxTimeMS", bc.maxTimeMS)
   380  			}
   381  
   382  			comment, err := codecutil.MarshalValue(bc.comment, bc.encoderFn)
   383  			if err != nil {
   384  				return nil, fmt.Errorf("error marshaling comment as a BSON value: %w", err)
   385  			}
   386  
   387  			// The getMore command does not support commenting pre-4.4.
   388  			if comment.Type != bsontype.Type(0) && bc.serverDescription.WireVersion.Max >= 9 {
   389  				dst = bsoncore.AppendValueElement(dst, "comment", comment)
   390  			}
   391  
   392  			return dst, nil
   393  		},
   394  		Database:   bc.database,
   395  		Deployment: bc.getOperationDeployment(),
   396  		ProcessResponseFn: func(info ResponseInfo) error {
   397  			response := info.ServerResponse
   398  			id, ok := response.Lookup("cursor", "id").Int64OK()
   399  			if !ok {
   400  				return fmt.Errorf("cursor.id should be an int64 but is a BSON %s", response.Lookup("cursor", "id").Type)
   401  			}
   402  			bc.id = id
   403  
   404  			batch, ok := response.Lookup("cursor", "nextBatch").ArrayOK()
   405  			if !ok {
   406  				return fmt.Errorf("cursor.nextBatch should be an array but is a BSON %s", response.Lookup("cursor", "nextBatch").Type)
   407  			}
   408  			bc.currentBatch.Style = bsoncore.ArrayStyle
   409  			bc.currentBatch.Data = batch
   410  			bc.currentBatch.ResetIterator()
   411  			bc.numReturned += int32(bc.currentBatch.DocumentCount()) // Required for legacy operations which don't support limit.
   412  
   413  			pbrt, err := response.LookupErr("cursor", "postBatchResumeToken")
   414  			if err != nil {
   415  				// I don't really understand why we don't set bc.err here
   416  				return nil
   417  			}
   418  
   419  			pbrtDoc, ok := pbrt.DocumentOK()
   420  			if !ok {
   421  				bc.err = fmt.Errorf("expected BSON type for post batch resume token to be EmbeddedDocument but got %s", pbrt.Type)
   422  				return nil
   423  			}
   424  
   425  			bc.postBatchResumeToken = pbrtDoc
   426  
   427  			return nil
   428  		},
   429  		Client:         bc.clientSession,
   430  		Clock:          bc.clock,
   431  		Legacy:         LegacyGetMore,
   432  		CommandMonitor: bc.cmdMonitor,
   433  		Crypt:          bc.crypt,
   434  		ServerAPI:      bc.serverAPI,
   435  
   436  		// No read preference is passed to the getMore command,
   437  		// resulting in the default read preference: "primaryPreferred".
   438  		// Since this could be confusing, and there is no requirement
   439  		// to use a read preference here, we omit it.
   440  		omitReadPreference: true,
   441  	}.Execute(ctx)
   442  
   443  	// Once the cursor has been drained, we can unpin the connection if one is currently pinned.
   444  	if bc.id == 0 {
   445  		err := bc.unpinConnection()
   446  		if err != nil && bc.err == nil {
   447  			bc.err = err
   448  		}
   449  	}
   450  
   451  	// If we're in load balanced mode and the pinned connection encounters a network error, we should not use it for
   452  	// future commands. Per the spec, the connection will not be unpinned until the cursor is actually closed, but
   453  	// we set the cursor ID to 0 to ensure the Close() call will not execute a killCursors command.
   454  	if driverErr, ok := bc.err.(Error); ok && driverErr.NetworkError() && bc.connection != nil {
   455  		bc.id = 0
   456  	}
   457  
   458  	// Required for legacy operations which don't support limit.
   459  	if bc.limit != 0 && bc.numReturned >= bc.limit {
   460  		// call KillCursor instead of Close because Close will clear out the data for the current batch.
   461  		err := bc.KillCursor(ctx)
   462  		if err != nil && bc.err == nil {
   463  			bc.err = err
   464  		}
   465  	}
   466  }
   467  
// PostBatchResumeToken returns the latest seen post batch resume token: the one
// from the initial cursor response, or the one from the most recent getMore
// response that contained a token document.
func (bc *BatchCursor) PostBatchResumeToken() bsoncore.Document {
	return bc.postBatchResumeToken
}
   472  
// SetBatchSize sets the batchSize for future getMore operations. A getMore only
// includes a batchSize when the effective value is greater than zero.
func (bc *BatchCursor) SetBatchSize(size int32) {
	bc.batchSize = size
}
   477  
   478  // SetMaxTime will set the maximum amount of time the server will allow the
   479  // operations to execute. The server will error if this field is set but the
   480  // cursor is not configured with awaitData=true.
   481  //
   482  // The time.Duration value passed by this setter will be converted and rounded
   483  // down to the nearest millisecond.
   484  func (bc *BatchCursor) SetMaxTime(dur time.Duration) {
   485  	bc.maxTimeMS = int64(dur / time.Millisecond)
   486  }
   487  
// SetComment sets the comment for future getMore operations. The value is
// marshaled to BSON each time a getMore command is built.
func (bc *BatchCursor) SetComment(comment interface{}) {
	bc.comment = comment
}
   492  
   493  func (bc *BatchCursor) getOperationDeployment() Deployment {
   494  	if bc.connection != nil {
   495  		return &loadBalancedCursorDeployment{
   496  			errorProcessor: bc.errorProcessor,
   497  			conn:           bc.connection,
   498  		}
   499  	}
   500  	return SingleServerDeployment{bc.server}
   501  }
   502  
// loadBalancedCursorDeployment is used as a Deployment for getMore and killCursors commands when pinning to a
// connection in load balanced mode. This type also functions as an ErrorProcessor to ensure that SDAM errors are
// handled for these commands in this mode.
//
// errorProcessor receives every ProcessError call, and conn is the connection
// returned by every Connection call.
type loadBalancedCursorDeployment struct {
	errorProcessor ErrorProcessor
	conn           PinnedConnection
}
   510  
   511  var _ Deployment = (*loadBalancedCursorDeployment)(nil)
   512  var _ Server = (*loadBalancedCursorDeployment)(nil)
   513  var _ ErrorProcessor = (*loadBalancedCursorDeployment)(nil)
   514  
// SelectServer ignores the context and selector and always returns the receiver
// itself as the selected Server, with a nil error.
func (lbcd *loadBalancedCursorDeployment) SelectServer(_ context.Context, _ description.ServerSelector) (Server, error) {
	return lbcd, nil
}
   518  
// Kind always reports description.LoadBalanced as the topology kind.
func (lbcd *loadBalancedCursorDeployment) Kind() description.TopologyKind {
	return description.LoadBalanced
}
   522  
// Connection ignores the context and always returns the pinned connection held
// by the deployment, with a nil error.
func (lbcd *loadBalancedCursorDeployment) Connection(_ context.Context) (Connection, error) {
	return lbcd.conn, nil
}
   526  
// RTTMonitor implements the driver.Server interface. It returns a new
// csot.ZeroRTTMonitor on every call.
func (lbcd *loadBalancedCursorDeployment) RTTMonitor() RTTMonitor {
	return &csot.ZeroRTTMonitor{}
}
   531  
// ProcessError forwards the error and connection to the wrapped ErrorProcessor
// and returns its result unchanged.
func (lbcd *loadBalancedCursorDeployment) ProcessError(err error, conn Connection) ProcessErrorResult {
	return lbcd.errorProcessor.ProcessError(err, conn)
}
   535  

View as plain text