...

Source file src/go.mongodb.org/mongo-driver/mongo/cursor.go

Documentation: go.mongodb.org/mongo-driver/mongo

     1  // Copyright (C) MongoDB, Inc. 2017-present.
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License"); you may
     4  // not use this file except in compliance with the License. You may obtain
     5  // a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
     6  
     7  package mongo
     8  
     9  import (
    10  	"context"
    11  	"errors"
    12  	"fmt"
    13  	"io"
    14  	"reflect"
    15  	"time"
    16  
    17  	"go.mongodb.org/mongo-driver/bson"
    18  	"go.mongodb.org/mongo-driver/bson/bsoncodec"
    19  	"go.mongodb.org/mongo-driver/bson/bsonrw"
    20  	"go.mongodb.org/mongo-driver/mongo/options"
    21  	"go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
    22  	"go.mongodb.org/mongo-driver/x/mongo/driver"
    23  	"go.mongodb.org/mongo-driver/x/mongo/driver/session"
    24  )
    25  
    26  // Cursor is used to iterate over a stream of documents. Each document can be decoded into a Go type via the Decode
    27  // method or accessed as raw BSON via the Current field. This type is not goroutine safe and must not be used
    28  // concurrently by multiple goroutines.
    29  type Cursor struct {
    30  	// Current contains the BSON bytes of the current change document. This property is only valid until the next call
    31  	// to Next or TryNext. If continued access is required, a copy must be made.
    32  	Current bson.Raw
    33  
    34  	bc            batchCursor
    35  	batch         *bsoncore.DocumentSequence
    36  	batchLength   int
    37  	bsonOpts      *options.BSONOptions
    38  	registry      *bsoncodec.Registry
    39  	clientSession *session.Client
    40  
    41  	err error
    42  }
    43  
    44  func newCursor(
    45  	bc batchCursor,
    46  	bsonOpts *options.BSONOptions,
    47  	registry *bsoncodec.Registry,
    48  ) (*Cursor, error) {
    49  	return newCursorWithSession(bc, bsonOpts, registry, nil)
    50  }
    51  
    52  func newCursorWithSession(
    53  	bc batchCursor,
    54  	bsonOpts *options.BSONOptions,
    55  	registry *bsoncodec.Registry,
    56  	clientSession *session.Client,
    57  ) (*Cursor, error) {
    58  	if registry == nil {
    59  		registry = bson.DefaultRegistry
    60  	}
    61  	if bc == nil {
    62  		return nil, errors.New("batch cursor must not be nil")
    63  	}
    64  	c := &Cursor{
    65  		bc:            bc,
    66  		bsonOpts:      bsonOpts,
    67  		registry:      registry,
    68  		clientSession: clientSession,
    69  	}
    70  	if bc.ID() == 0 {
    71  		c.closeImplicitSession()
    72  	}
    73  
    74  	// Initialize just the batchLength here so RemainingBatchLength will return an accurate result. The actual batch
    75  	// will be pulled up by the first Next/TryNext call.
    76  	c.batchLength = c.bc.Batch().DocumentCount()
    77  	return c, nil
    78  }
    79  
    80  func newEmptyCursor() *Cursor {
    81  	return &Cursor{bc: driver.NewEmptyBatchCursor()}
    82  }
    83  
    84  // NewCursorFromDocuments creates a new Cursor pre-loaded with the provided documents, error and registry. If no registry is provided,
    85  // bson.DefaultRegistry will be used.
    86  //
    87  // The documents parameter must be a slice of documents. The slice may be nil or empty, but all elements must be non-nil.
    88  func NewCursorFromDocuments(documents []interface{}, err error, registry *bsoncodec.Registry) (*Cursor, error) {
    89  	if registry == nil {
    90  		registry = bson.DefaultRegistry
    91  	}
    92  
    93  	// Convert documents slice to a sequence-style byte array.
    94  	var docsBytes []byte
    95  	for _, doc := range documents {
    96  		switch t := doc.(type) {
    97  		case nil:
    98  			return nil, ErrNilDocument
    99  		case []byte:
   100  			// Slight optimization so we'll just use MarshalBSON and not go through the codec machinery.
   101  			doc = bson.Raw(t)
   102  		}
   103  		var marshalErr error
   104  		docsBytes, marshalErr = bson.MarshalAppendWithRegistry(registry, docsBytes, doc)
   105  		if marshalErr != nil {
   106  			return nil, marshalErr
   107  		}
   108  	}
   109  
   110  	c := &Cursor{
   111  		bc:       driver.NewBatchCursorFromDocuments(docsBytes),
   112  		registry: registry,
   113  		err:      err,
   114  	}
   115  
   116  	// Initialize batch and batchLength here. The underlying batch cursor will be preloaded with the
   117  	// provided contents, and thus already has a batch before calls to Next/TryNext.
   118  	c.batch = c.bc.Batch()
   119  	c.batchLength = c.bc.Batch().DocumentCount()
   120  	return c, nil
   121  }
   122  
   123  // ID returns the ID of this cursor, or 0 if the cursor has been closed or exhausted.
   124  func (c *Cursor) ID() int64 { return c.bc.ID() }
   125  
   126  // Next gets the next document for this cursor. It returns true if there were no errors and the cursor has not been
   127  // exhausted.
   128  //
   129  // Next blocks until a document is available or an error occurs. If the context expires, the cursor's error will
   130  // be set to ctx.Err(). In case of an error, Next will return false.
   131  //
   132  // If Next returns false, subsequent calls will also return false.
   133  func (c *Cursor) Next(ctx context.Context) bool {
   134  	return c.next(ctx, false)
   135  }
   136  
   137  // TryNext attempts to get the next document for this cursor. It returns true if there were no errors and the next
   138  // document is available. This is only recommended for use with tailable cursors as a non-blocking alternative to
   139  // Next. See https://www.mongodb.com/docs/manual/core/tailable-cursors/ for more information about tailable cursors.
   140  //
   141  // TryNext returns false if the cursor is exhausted, an error occurs when getting results from the server, the next
   142  // document is not yet available, or ctx expires. If the context  expires, the cursor's error will be set to ctx.Err().
   143  //
   144  // If TryNext returns false and an error occurred or the cursor has been exhausted (i.e. c.Err() != nil || c.ID() == 0),
   145  // subsequent attempts will also return false. Otherwise, it is safe to call TryNext again until a document is
   146  // available.
   147  //
   148  // This method requires driver version >= 1.2.0.
   149  func (c *Cursor) TryNext(ctx context.Context) bool {
   150  	return c.next(ctx, true)
   151  }
   152  
   153  func (c *Cursor) next(ctx context.Context, nonBlocking bool) bool {
   154  	// return false right away if the cursor has already errored.
   155  	if c.err != nil {
   156  		return false
   157  	}
   158  
   159  	if ctx == nil {
   160  		ctx = context.Background()
   161  	}
   162  	doc, err := c.batch.Next()
   163  	switch {
   164  	case err == nil:
   165  		// Consume the next document in the current batch.
   166  		c.batchLength--
   167  		c.Current = bson.Raw(doc)
   168  		return true
   169  	case errors.Is(err, io.EOF): // Need to do a getMore
   170  	default:
   171  		c.err = err
   172  		return false
   173  	}
   174  
   175  	// call the Next method in a loop until at least one document is returned in the next batch or
   176  	// the context times out.
   177  	for {
   178  		// If we don't have a next batch
   179  		if !c.bc.Next(ctx) {
   180  			// Do we have an error? If so we return false.
   181  			c.err = replaceErrors(c.bc.Err())
   182  			if c.err != nil {
   183  				return false
   184  			}
   185  			// Is the cursor ID zero?
   186  			if c.bc.ID() == 0 {
   187  				c.closeImplicitSession()
   188  				return false
   189  			}
   190  			// empty batch, but cursor is still valid.
   191  			// use nonBlocking to determine if we should continue or return control to the caller.
   192  			if nonBlocking {
   193  				return false
   194  			}
   195  			continue
   196  		}
   197  
   198  		// close the implicit session if this was the last getMore
   199  		if c.bc.ID() == 0 {
   200  			c.closeImplicitSession()
   201  		}
   202  
   203  		// Use the new batch to update the batch and batchLength fields. Consume the first document in the batch.
   204  		c.batch = c.bc.Batch()
   205  		c.batchLength = c.batch.DocumentCount()
   206  		doc, err = c.batch.Next()
   207  		switch {
   208  		case err == nil:
   209  			c.batchLength--
   210  			c.Current = bson.Raw(doc)
   211  			return true
   212  		case errors.Is(err, io.EOF): // Empty batch so we continue
   213  		default:
   214  			c.err = err
   215  			return false
   216  		}
   217  	}
   218  }
   219  
   220  func getDecoder(
   221  	data []byte,
   222  	opts *options.BSONOptions,
   223  	reg *bsoncodec.Registry,
   224  ) (*bson.Decoder, error) {
   225  	dec, err := bson.NewDecoder(bsonrw.NewBSONDocumentReader(data))
   226  	if err != nil {
   227  		return nil, err
   228  	}
   229  
   230  	if opts != nil {
   231  		if opts.AllowTruncatingDoubles {
   232  			dec.AllowTruncatingDoubles()
   233  		}
   234  		if opts.BinaryAsSlice {
   235  			dec.BinaryAsSlice()
   236  		}
   237  		if opts.DefaultDocumentD {
   238  			dec.DefaultDocumentD()
   239  		}
   240  		if opts.DefaultDocumentM {
   241  			dec.DefaultDocumentM()
   242  		}
   243  		if opts.UseJSONStructTags {
   244  			dec.UseJSONStructTags()
   245  		}
   246  		if opts.UseLocalTimeZone {
   247  			dec.UseLocalTimeZone()
   248  		}
   249  		if opts.ZeroMaps {
   250  			dec.ZeroMaps()
   251  		}
   252  		if opts.ZeroStructs {
   253  			dec.ZeroStructs()
   254  		}
   255  	}
   256  
   257  	if reg != nil {
   258  		// TODO:(GODRIVER-2719): Remove error handling.
   259  		if err := dec.SetRegistry(reg); err != nil {
   260  			return nil, err
   261  		}
   262  	}
   263  
   264  	return dec, nil
   265  }
   266  
   267  // Decode will unmarshal the current document into val and return any errors from the unmarshalling process without any
   268  // modification. If val is nil or is a typed nil, an error will be returned.
   269  func (c *Cursor) Decode(val interface{}) error {
   270  	dec, err := getDecoder(c.Current, c.bsonOpts, c.registry)
   271  	if err != nil {
   272  		return fmt.Errorf("error configuring BSON decoder: %w", err)
   273  	}
   274  
   275  	return dec.Decode(val)
   276  }
   277  
   278  // Err returns the last error seen by the Cursor, or nil if no error has occurred.
   279  func (c *Cursor) Err() error { return c.err }
   280  
   281  // Close closes this cursor. Next and TryNext must not be called after Close has been called. Close is idempotent. After
   282  // the first call, any subsequent calls will not change the state.
   283  func (c *Cursor) Close(ctx context.Context) error {
   284  	defer c.closeImplicitSession()
   285  	return replaceErrors(c.bc.Close(ctx))
   286  }
   287  
   288  // All iterates the cursor and decodes each document into results. The results parameter must be a pointer to a slice.
   289  // The slice pointed to by results will be completely overwritten. This method will close the cursor after retrieving
   290  // all documents. If the cursor has been iterated, any previously iterated documents will not be included in results.
   291  //
   292  // This method requires driver version >= 1.1.0.
   293  func (c *Cursor) All(ctx context.Context, results interface{}) error {
   294  	resultsVal := reflect.ValueOf(results)
   295  	if resultsVal.Kind() != reflect.Ptr {
   296  		return fmt.Errorf("results argument must be a pointer to a slice, but was a %s", resultsVal.Kind())
   297  	}
   298  
   299  	sliceVal := resultsVal.Elem()
   300  	if sliceVal.Kind() == reflect.Interface {
   301  		sliceVal = sliceVal.Elem()
   302  	}
   303  
   304  	if sliceVal.Kind() != reflect.Slice {
   305  		return fmt.Errorf("results argument must be a pointer to a slice, but was a pointer to %s", sliceVal.Kind())
   306  	}
   307  
   308  	elementType := sliceVal.Type().Elem()
   309  	var index int
   310  	var err error
   311  
   312  	// Defer a call to Close to try to clean up the cursor server-side when all
   313  	// documents have not been exhausted. Use context.Background() to ensure Close
   314  	// completes even if the context passed to All has errored.
   315  	defer c.Close(context.Background())
   316  
   317  	batch := c.batch // exhaust the current batch before iterating the batch cursor
   318  	for {
   319  		sliceVal, index, err = c.addFromBatch(sliceVal, elementType, batch, index)
   320  		if err != nil {
   321  			return err
   322  		}
   323  
   324  		if !c.bc.Next(ctx) {
   325  			break
   326  		}
   327  
   328  		batch = c.bc.Batch()
   329  	}
   330  
   331  	if err = replaceErrors(c.bc.Err()); err != nil {
   332  		return err
   333  	}
   334  
   335  	resultsVal.Elem().Set(sliceVal.Slice(0, index))
   336  	return nil
   337  }
   338  
   339  // RemainingBatchLength returns the number of documents left in the current batch. If this returns zero, the subsequent
   340  // call to Next or TryNext will do a network request to fetch the next batch.
   341  func (c *Cursor) RemainingBatchLength() int {
   342  	return c.batchLength
   343  }
   344  
   345  // addFromBatch adds all documents from batch to sliceVal starting at the given index. It returns the new slice value,
   346  // the next empty index in the slice, and an error if one occurs.
   347  func (c *Cursor) addFromBatch(sliceVal reflect.Value, elemType reflect.Type, batch *bsoncore.DocumentSequence,
   348  	index int) (reflect.Value, int, error) {
   349  
   350  	docs, err := batch.Documents()
   351  	if err != nil {
   352  		return sliceVal, index, err
   353  	}
   354  
   355  	for _, doc := range docs {
   356  		if sliceVal.Len() == index {
   357  			// slice is full
   358  			newElem := reflect.New(elemType)
   359  			sliceVal = reflect.Append(sliceVal, newElem.Elem())
   360  			sliceVal = sliceVal.Slice(0, sliceVal.Cap())
   361  		}
   362  
   363  		currElem := sliceVal.Index(index).Addr().Interface()
   364  		dec, err := getDecoder(doc, c.bsonOpts, c.registry)
   365  		if err != nil {
   366  			return sliceVal, index, fmt.Errorf("error configuring BSON decoder: %w", err)
   367  		}
   368  		err = dec.Decode(currElem)
   369  		if err != nil {
   370  			return sliceVal, index, err
   371  		}
   372  
   373  		index++
   374  	}
   375  
   376  	return sliceVal, index, nil
   377  }
   378  
   379  func (c *Cursor) closeImplicitSession() {
   380  	if c.clientSession != nil && c.clientSession.IsImplicit {
   381  		c.clientSession.EndSession()
   382  	}
   383  }
   384  
   385  // SetBatchSize sets the number of documents to fetch from the database with
   386  // each iteration of the cursor's "Next" method. Note that some operations set
   387  // an initial cursor batch size, so this setting only affects subsequent
   388  // document batches fetched from the database.
   389  func (c *Cursor) SetBatchSize(batchSize int32) {
   390  	c.bc.SetBatchSize(batchSize)
   391  }
   392  
   393  // SetMaxTime will set the maximum amount of time the server will allow the
   394  // operations to execute. The server will error if this field is set but the
   395  // cursor is not configured with awaitData=true.
   396  //
   397  // The time.Duration value passed by this setter will be converted and rounded
   398  // down to the nearest millisecond.
   399  func (c *Cursor) SetMaxTime(dur time.Duration) {
   400  	c.bc.SetMaxTime(dur)
   401  }
   402  
   403  // SetComment will set a user-configurable comment that can be used to identify
   404  // the operation in server logs.
   405  func (c *Cursor) SetComment(comment interface{}) {
   406  	c.bc.SetComment(comment)
   407  }
   408  
   409  // BatchCursorFromCursor returns a driver.BatchCursor for the given Cursor. If there is no underlying
   410  // driver.BatchCursor, nil is returned.
   411  //
   412  // Deprecated: This is an unstable function because the driver.BatchCursor type exists in the "x" package. Neither this
   413  // function nor the driver.BatchCursor type should be used by applications and may be changed or removed in any release.
   414  func BatchCursorFromCursor(c *Cursor) *driver.BatchCursor {
   415  	bc, _ := c.bc.(*driver.BatchCursor)
   416  	return bc
   417  }
   418  

View as plain text