...

Source file src/go.mongodb.org/mongo-driver/mongo/gridfs/bucket.go

Documentation: go.mongodb.org/mongo-driver/mongo/gridfs

// Copyright (C) MongoDB, Inc. 2017-present.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0

package gridfs // import "go.mongodb.org/mongo-driver/mongo/gridfs"

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"io"
	"time"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/bson/primitive"
	"go.mongodb.org/mongo-driver/internal/csot"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
	"go.mongodb.org/mongo-driver/mongo/readconcern"
	"go.mongodb.org/mongo-driver/mongo/readpref"
	"go.mongodb.org/mongo-driver/mongo/writeconcern"
	"go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
)

// TODO: add session options

// DefaultChunkSize is the default size of each file chunk.
const DefaultChunkSize int32 = 255 * 1024 // 255 KiB

// ErrFileNotFound occurs if a user asks to download a file with a file ID that isn't found in the files collection.
var ErrFileNotFound = errors.New("file with given parameters not found")

// ErrMissingChunkSize occurs when downloading a file if the files collection document is missing the "chunkSize" field.
var ErrMissingChunkSize = errors.New("files collection document does not contain a 'chunkSize' field")

// Bucket represents a GridFS bucket.
type Bucket struct {
	db         *mongo.Database
	chunksColl *mongo.Collection // collection to store file chunks
	filesColl  *mongo.Collection // collection to store file metadata

	name      string
	chunkSize int32
	wc        *writeconcern.WriteConcern
	rc        *readconcern.ReadConcern
	rp        *readpref.ReadPref

	firstWriteDone bool
	readBuf        []byte
	writeBuf       []byte

	readDeadline  time.Time
	writeDeadline time.Time
}

// Upload contains options to upload a file to a bucket.
type Upload struct {
	chunkSize int32
	metadata  bson.D
}

// NewBucket creates a GridFS bucket.
func NewBucket(db *mongo.Database, opts ...*options.BucketOptions) (*Bucket, error) {
	b := &Bucket{
		name:      "fs",
		chunkSize: DefaultChunkSize,
		db:        db,
		wc:        db.WriteConcern(),
		rc:        db.ReadConcern(),
		rp:        db.ReadPreference(),
	}

	bo := options.MergeBucketOptions(opts...)
	if bo.Name != nil {
		b.name = *bo.Name
	}
	if bo.ChunkSizeBytes != nil {
		b.chunkSize = *bo.ChunkSizeBytes
	}
	if bo.WriteConcern != nil {
		b.wc = bo.WriteConcern
	}
	if bo.ReadConcern != nil {
		b.rc = bo.ReadConcern
	}
	if bo.ReadPreference != nil {
		b.rp = bo.ReadPreference
	}

	collOpts := options.Collection().SetWriteConcern(b.wc).SetReadConcern(b.rc).SetReadPreference(b.rp)

	b.chunksColl = db.Collection(b.name+".chunks", collOpts)
	b.filesColl = db.Collection(b.name+".files", collOpts)
	b.readBuf = make([]byte, b.chunkSize)
	b.writeBuf = make([]byte, b.chunkSize)

	return b, nil
}
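
// exampleNewBucket is an editor-added usage sketch, not part of the driver: it
// shows NewBucket with explicit options. The bucket name "custom" and the
// 1 MiB chunk size are illustrative values.
func exampleNewBucket(db *mongo.Database) (*Bucket, error) {
	opts := options.GridFSBucket().
		SetName("custom").             // store files in "custom.files" and "custom.chunks"
		SetChunkSizeBytes(1024 * 1024) // override the 255 KiB default
	return NewBucket(db, opts)
}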

// SetWriteDeadline sets the write deadline for this bucket.
func (b *Bucket) SetWriteDeadline(t time.Time) error {
	b.writeDeadline = t
	return nil
}

// SetReadDeadline sets the read deadline for this bucket.
func (b *Bucket) SetReadDeadline(t time.Time) error {
	b.readDeadline = t
	return nil
}
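
// exampleDeadlines is an editor-added sketch, not part of the driver: methods
// such as Delete, Rename, and Drop take no context, so the deadlines set here
// bound the contexts those methods derive internally.
func exampleDeadlines(b *Bucket) error {
	if err := b.SetWriteDeadline(time.Now().Add(10 * time.Second)); err != nil {
		return err
	}
	return b.SetReadDeadline(time.Now().Add(10 * time.Second))
}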

// OpenUploadStream creates a new upload stream for a file given the filename.
// A new file ID is generated automatically.
func (b *Bucket) OpenUploadStream(filename string, opts ...*options.UploadOptions) (*UploadStream, error) {
	return b.OpenUploadStreamWithID(primitive.NewObjectID(), filename, opts...)
}

// OpenUploadStreamWithID creates a new upload stream for a file given the file ID and filename.
func (b *Bucket) OpenUploadStreamWithID(fileID interface{}, filename string, opts ...*options.UploadOptions) (*UploadStream, error) {
	ctx, cancel := deadlineContext(b.writeDeadline)
	if cancel != nil {
		defer cancel()
	}

	if err := b.checkFirstWrite(ctx); err != nil {
		return nil, err
	}

	upload, err := b.parseUploadOptions(opts...)
	if err != nil {
		return nil, err
	}

	return newUploadStream(upload, fileID, filename, b.chunksColl, b.filesColl), nil
}

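// exampleOpenUploadStream is an editor-added sketch, not part of the driver: it
// streams an arbitrary io.Reader into GridFS through the returned UploadStream,
// which implements io.WriteCloser. The filename "streamed.bin" is illustrative.
func exampleOpenUploadStream(b *Bucket, src io.Reader) error {
	us, err := b.OpenUploadStream("streamed.bin")
	if err != nil {
		return err
	}
	if _, err := io.Copy(us, src); err != nil {
		_ = us.Close()
		return err
	}
	// Close flushes any buffered chunks and writes the files collection document.
	return us.Close()
}
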
// UploadFromStream creates a file ID and uploads a file given a source stream.
//
// If this upload requires a custom write deadline to be set on the bucket, it cannot be done concurrently with other
// write operations on this bucket that also require a custom deadline.
func (b *Bucket) UploadFromStream(filename string, source io.Reader, opts ...*options.UploadOptions) (primitive.ObjectID, error) {
	fileID := primitive.NewObjectID()
	err := b.UploadFromStreamWithID(fileID, filename, source, opts...)
	return fileID, err
}

// UploadFromStreamWithID uploads a file given a source stream.
//
// If this upload requires a custom write deadline to be set on the bucket, it cannot be done concurrently with other
// write operations on this bucket that also require a custom deadline.
func (b *Bucket) UploadFromStreamWithID(fileID interface{}, filename string, source io.Reader, opts ...*options.UploadOptions) error {
	us, err := b.OpenUploadStreamWithID(fileID, filename, opts...)
	if err != nil {
		return err
	}

	err = us.SetWriteDeadline(b.writeDeadline)
	if err != nil {
		_ = us.Close()
		return err
	}

	for {
		n, err := source.Read(b.readBuf)
		if err != nil && err != io.EOF {
			_ = us.Abort() // upload considered aborted if source stream returns an error
			return err
		}

		if n > 0 {
			_, err := us.Write(b.readBuf[:n])
			if err != nil {
				return err
			}
		}

		if n == 0 || err == io.EOF {
			break
		}
	}

	return us.Close()
}
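
// exampleUpload is an editor-added sketch, not part of the driver: it uploads an
// in-memory byte slice under an illustrative filename and returns the generated
// file ID.
func exampleUpload(b *Bucket, data []byte) (primitive.ObjectID, error) {
	// UploadFromStream reads the source until io.EOF and stores it in chunks.
	return b.UploadFromStream("example.txt", bytes.NewReader(data))
}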

// OpenDownloadStream creates a stream from which the contents of the file can be read.
func (b *Bucket) OpenDownloadStream(fileID interface{}) (*DownloadStream, error) {
	return b.openDownloadStream(bson.D{
		{"_id", fileID},
	})
}

// DownloadToStream downloads the file with the specified fileID and writes it to the provided io.Writer.
// It returns the number of bytes written to the stream and an error, if any.
//
// If this download requires a custom read deadline to be set on the bucket, it cannot be done concurrently with other
// read operations on this bucket that also require a custom deadline.
func (b *Bucket) DownloadToStream(fileID interface{}, stream io.Writer) (int64, error) {
	ds, err := b.OpenDownloadStream(fileID)
	if err != nil {
		return 0, err
	}

	return b.downloadToStream(ds, stream)
}
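
// exampleDownload is an editor-added sketch, not part of the driver: it
// downloads the file with the given ID into an in-memory buffer.
func exampleDownload(b *Bucket, fileID interface{}) ([]byte, error) {
	var buf bytes.Buffer
	if _, err := b.DownloadToStream(fileID, &buf); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}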

// OpenDownloadStreamByName opens a download stream for the file with the given filename. By default, the most recent
// revision of the file is downloaded; use options.GridFSName().SetRevision to select another revision. Revision 0 is
// the original stored file, positive revisions count forward from it, and negative revisions count back from the most
// recent (-1).
func (b *Bucket) OpenDownloadStreamByName(filename string, opts ...*options.NameOptions) (*DownloadStream, error) {
	var numSkip int32 = -1
	var sortOrder int32 = 1

	nameOpts := options.MergeNameOptions(opts...)
	if nameOpts.Revision != nil {
		numSkip = *nameOpts.Revision
	}

	if numSkip < 0 {
		sortOrder = -1
		numSkip = (-1 * numSkip) - 1
	}

	findOpts := options.Find().SetSkip(int64(numSkip)).SetSort(bson.D{{"uploadDate", sortOrder}})

	return b.openDownloadStream(bson.D{{"filename", filename}}, findOpts)
}

// DownloadToStreamByName downloads the file with the given name to the given io.Writer.
//
// If this download requires a custom read deadline to be set on the bucket, it cannot be done concurrently with other
// read operations on this bucket that also require a custom deadline.
func (b *Bucket) DownloadToStreamByName(filename string, stream io.Writer, opts ...*options.NameOptions) (int64, error) {
	ds, err := b.OpenDownloadStreamByName(filename, opts...)
	if err != nil {
		return 0, err
	}

	return b.downloadToStream(ds, stream)
}
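
// exampleDownloadRevision is an editor-added sketch, not part of the driver: it
// downloads the original revision (0) of the named file; without SetRevision,
// the most recent revision (-1) is downloaded.
func exampleDownloadRevision(b *Bucket, filename string, dst io.Writer) (int64, error) {
	return b.DownloadToStreamByName(filename, dst, options.GridFSName().SetRevision(0))
}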

// Delete deletes all chunks and metadata associated with the file with the given file ID.
//
// If this operation requires a custom write deadline to be set on the bucket, it cannot be done concurrently with other
// write operations on this bucket that also require a custom deadline.
//
// Use SetWriteDeadline to set a deadline for the delete operation.
func (b *Bucket) Delete(fileID interface{}) error {
	ctx, cancel := deadlineContext(b.writeDeadline)
	if cancel != nil {
		defer cancel()
	}
	return b.DeleteContext(ctx, fileID)
}

// DeleteContext deletes all chunks and metadata associated with the file with the given file ID and runs the underlying
// delete operations with the provided context.
//
// Use the context parameter to time-out or cancel the delete operation. The deadline set by SetWriteDeadline is ignored.
func (b *Bucket) DeleteContext(ctx context.Context, fileID interface{}) error {
	// If Timeout is set on the Client and context is not already a Timeout
	// context, honor Timeout in new Timeout context for operation execution to
	// be shared by both delete operations.
	if b.db.Client().Timeout() != nil && !csot.IsTimeoutContext(ctx) {
		newCtx, cancelFunc := csot.MakeTimeoutContext(ctx, *b.db.Client().Timeout())
		// Redefine ctx to be the new timeout-derived context.
		ctx = newCtx
		// Cancel the timeout-derived context when the delete operations complete to avoid a context leak.
		defer cancelFunc()
	}

	// Delete document in files collection and then chunks to minimize race conditions.
	res, err := b.filesColl.DeleteOne(ctx, bson.D{{"_id", fileID}})
	if err == nil && res.DeletedCount == 0 {
		err = ErrFileNotFound
	}
	if err != nil {
		_ = b.deleteChunks(ctx, fileID) // Can attempt to delete chunks even if no docs in files collection matched.
		return err
	}

	return b.deleteChunks(ctx, fileID)
}
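
// exampleDelete is an editor-added sketch, not part of the driver: it deletes a
// file under a caller-supplied timeout, which DeleteContext uses instead of the
// bucket's write deadline.
func exampleDelete(b *Bucket, fileID interface{}) error {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	return b.DeleteContext(ctx, fileID)
}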

// Find returns the files collection documents that match the given filter.
//
// If this operation requires a custom read deadline to be set on the bucket, it cannot be done concurrently with other
// read operations on this bucket that also require a custom deadline.
//
// Use SetReadDeadline to set a deadline for the find operation.
func (b *Bucket) Find(filter interface{}, opts ...*options.GridFSFindOptions) (*mongo.Cursor, error) {
	ctx, cancel := deadlineContext(b.readDeadline)
	if cancel != nil {
		defer cancel()
	}

	return b.FindContext(ctx, filter, opts...)
}

// FindContext returns the files collection documents that match the given filter and runs the underlying
// find query with the provided context.
//
// Use the context parameter to time-out or cancel the find operation. The deadline set by SetReadDeadline
// is ignored.
func (b *Bucket) FindContext(ctx context.Context, filter interface{}, opts ...*options.GridFSFindOptions) (*mongo.Cursor, error) {
	gfsOpts := options.MergeGridFSFindOptions(opts...)
	find := options.Find()
	if gfsOpts.AllowDiskUse != nil {
		find.SetAllowDiskUse(*gfsOpts.AllowDiskUse)
	}
	if gfsOpts.BatchSize != nil {
		find.SetBatchSize(*gfsOpts.BatchSize)
	}
	if gfsOpts.Limit != nil {
		find.SetLimit(int64(*gfsOpts.Limit))
	}
	if gfsOpts.MaxTime != nil {
		find.SetMaxTime(*gfsOpts.MaxTime)
	}
	if gfsOpts.NoCursorTimeout != nil {
		find.SetNoCursorTimeout(*gfsOpts.NoCursorTimeout)
	}
	if gfsOpts.Skip != nil {
		find.SetSkip(int64(*gfsOpts.Skip))
	}
	if gfsOpts.Sort != nil {
		find.SetSort(gfsOpts.Sort)
	}

	return b.filesColl.Find(ctx, filter, find)
}
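
// exampleFind is an editor-added sketch, not part of the driver: it lists up to
// ten files collection documents with the given filename, decoding them into
// this package's File type.
func exampleFind(ctx context.Context, b *Bucket, filename string) ([]File, error) {
	cursor, err := b.FindContext(ctx, bson.D{{"filename", filename}}, options.GridFSFind().SetLimit(10))
	if err != nil {
		return nil, err
	}
	var files []File
	if err := cursor.All(ctx, &files); err != nil {
		return nil, err
	}
	return files, nil
}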

// Rename renames the stored file with the specified file ID.
//
// If this operation requires a custom write deadline to be set on the bucket, it cannot be done concurrently with other
// write operations on this bucket that also require a custom deadline.
//
// Use SetWriteDeadline to set a deadline for the rename operation.
func (b *Bucket) Rename(fileID interface{}, newFilename string) error {
	ctx, cancel := deadlineContext(b.writeDeadline)
	if cancel != nil {
		defer cancel()
	}

	return b.RenameContext(ctx, fileID, newFilename)
}

// RenameContext renames the stored file with the specified file ID and runs the underlying update with the provided
// context.
//
// Use the context parameter to time-out or cancel the rename operation. The deadline set by SetWriteDeadline is ignored.
func (b *Bucket) RenameContext(ctx context.Context, fileID interface{}, newFilename string) error {
	res, err := b.filesColl.UpdateOne(ctx,
		bson.D{{"_id", fileID}},
		bson.D{{"$set", bson.D{{"filename", newFilename}}}},
	)
	if err != nil {
		return err
	}

	if res.MatchedCount == 0 {
		return ErrFileNotFound
	}

	return nil
}

// Drop drops the files and chunks collections associated with this bucket.
//
// If this operation requires a custom write deadline to be set on the bucket, it cannot be done concurrently with other
// write operations on this bucket that also require a custom deadline.
//
// Use SetWriteDeadline to set a deadline for the drop operation.
func (b *Bucket) Drop() error {
	ctx, cancel := deadlineContext(b.writeDeadline)
	if cancel != nil {
		defer cancel()
	}

	return b.DropContext(ctx)
}

// DropContext drops the files and chunks collections associated with this bucket and runs the drop operations with
// the provided context.
//
// Use the context parameter to time-out or cancel the drop operation. The deadline set by SetWriteDeadline is ignored.
func (b *Bucket) DropContext(ctx context.Context) error {
	// If Timeout is set on the Client and context is not already a Timeout
	// context, honor Timeout in new Timeout context for operation execution to
	// be shared by both drop operations.
	if b.db.Client().Timeout() != nil && !csot.IsTimeoutContext(ctx) {
		newCtx, cancelFunc := csot.MakeTimeoutContext(ctx, *b.db.Client().Timeout())
		// Redefine ctx to be the new timeout-derived context.
		ctx = newCtx
		// Cancel the timeout-derived context when the drop operations complete to avoid a context leak.
		defer cancelFunc()
	}

	err := b.filesColl.Drop(ctx)
	if err != nil {
		return err
	}

	return b.chunksColl.Drop(ctx)
}

// GetFilesCollection returns a handle to the collection that stores the file documents for this bucket.
func (b *Bucket) GetFilesCollection() *mongo.Collection {
	return b.filesColl
}

// GetChunksCollection returns a handle to the collection that stores the file chunks for this bucket.
func (b *Bucket) GetChunksCollection() *mongo.Collection {
	return b.chunksColl
}

func (b *Bucket) openDownloadStream(filter interface{}, opts ...*options.FindOptions) (*DownloadStream, error) {
	ctx, cancel := deadlineContext(b.readDeadline)
	if cancel != nil {
		defer cancel()
	}

	cursor, err := b.findFile(ctx, filter, opts...)
	if err != nil {
		return nil, err
	}

	// Unmarshal the data into a File instance, which can be passed to newDownloadStream. The _id value has to be
	// parsed out separately because "_id" will not match the File.ID field and we want to avoid exposing BSON tags
	// in the File type. After parsing it, use RawValue.Unmarshal to ensure File.ID is set to the appropriate value.
	var foundFile File
	if err = cursor.Decode(&foundFile); err != nil {
		return nil, fmt.Errorf("error decoding files collection document: %w", err)
	}

	if foundFile.Length == 0 {
		return newDownloadStream(nil, foundFile.ChunkSize, &foundFile), nil
	}

	// For a file with non-zero length, chunkSize must exist so we know what size to expect when downloading chunks.
	if _, err := cursor.Current.LookupErr("chunkSize"); err != nil {
		return nil, ErrMissingChunkSize
	}

	chunksCursor, err := b.findChunks(ctx, foundFile.ID)
	if err != nil {
		return nil, err
	}
	// The chunk size can be overridden for individual files, so the expected chunk size should be the "chunkSize"
	// field from the files collection document, not the bucket's chunk size.
	return newDownloadStream(chunksCursor, foundFile.ChunkSize, &foundFile), nil
}

func deadlineContext(deadline time.Time) (context.Context, context.CancelFunc) {
	if deadline.IsZero() {
		return context.Background(), nil
	}

	return context.WithDeadline(context.Background(), deadline)
}

func (b *Bucket) downloadToStream(ds *DownloadStream, stream io.Writer) (int64, error) {
	err := ds.SetReadDeadline(b.readDeadline)
	if err != nil {
		_ = ds.Close()
		return 0, err
	}

	copied, err := io.Copy(stream, ds)
	if err != nil {
		_ = ds.Close()
		return 0, err
	}

	return copied, ds.Close()
}

func (b *Bucket) deleteChunks(ctx context.Context, fileID interface{}) error {
	_, err := b.chunksColl.DeleteMany(ctx, bson.D{{"files_id", fileID}})
	return err
}

func (b *Bucket) findFile(ctx context.Context, filter interface{}, opts ...*options.FindOptions) (*mongo.Cursor, error) {
	cursor, err := b.filesColl.Find(ctx, filter, opts...)
	if err != nil {
		return nil, err
	}

	if !cursor.Next(ctx) {
		_ = cursor.Close(ctx)
		return nil, ErrFileNotFound
	}

	return cursor, nil
}

func (b *Bucket) findChunks(ctx context.Context, fileID interface{}) (*mongo.Cursor, error) {
	chunksCursor, err := b.chunksColl.Find(ctx,
		bson.D{{"files_id", fileID}},
		options.Find().SetSort(bson.D{{"n", 1}})) // sort by chunk index
	if err != nil {
		return nil, err
	}

	return chunksCursor, nil
}

// numericalIndexDocsEqual returns true if the two index documents are equal, treating numeric values of different
// BSON types (e.g. int32 and int64) as equal when they represent the same number.
func numericalIndexDocsEqual(expected, actual bsoncore.Document) (bool, error) {
	if bytes.Equal(expected, actual) {
		return true, nil
	}

	actualElems, err := actual.Elements()
	if err != nil {
		return false, err
	}
	expectedElems, err := expected.Elements()
	if err != nil {
		return false, err
	}

	if len(actualElems) != len(expectedElems) {
		return false, nil
	}

	for idx, expectedElem := range expectedElems {
		actualElem := actualElems[idx]
		if actualElem.Key() != expectedElem.Key() {
			return false, nil
		}

		actualVal := actualElem.Value()
		expectedVal := expectedElem.Value()
		actualInt, actualOK := actualVal.AsInt64OK()
		expectedInt, expectedOK := expectedVal.AsInt64OK()

		// GridFS indexes always have numeric values.
		if !actualOK || !expectedOK {
			return false, nil
		}

		if actualInt != expectedInt {
			return false, nil
		}
	}
	return true, nil
}

// createNumericalIndexIfNotExists creates the index described by model unless an index with numerically equal keys
// already exists.
func createNumericalIndexIfNotExists(ctx context.Context, iv mongo.IndexView, model mongo.IndexModel) error {
	c, err := iv.List(ctx)
	if err != nil {
		return err
	}
	defer func() {
		_ = c.Close(ctx)
	}()

	modelKeysBytes, err := bson.Marshal(model.Keys)
	if err != nil {
		return err
	}
	modelKeysDoc := bsoncore.Document(modelKeysBytes)

	for c.Next(ctx) {
		keyElem, err := c.Current.LookupErr("key")
		if err != nil {
			return err
		}

		keyElemDoc := keyElem.Document()

		found, err := numericalIndexDocsEqual(modelKeysDoc, bsoncore.Document(keyElemDoc))
		if err != nil {
			return err
		}
		if found {
			return nil
		}
	}

	_, err = iv.CreateOne(ctx, model)
	return err
}

// createIndexes creates the required indexes on the files and chunks collections if they do not already exist.
func (b *Bucket) createIndexes(ctx context.Context) error {
	// Must use primary read preference to check whether the files collection is empty.
	cloned, err := b.filesColl.Clone(options.Collection().SetReadPreference(readpref.Primary()))
	if err != nil {
		return err
	}

	docRes := cloned.FindOne(ctx, bson.D{}, options.FindOne().SetProjection(bson.D{{"_id", 1}}))

	_, err = docRes.Raw()
	if !errors.Is(err, mongo.ErrNoDocuments) {
		// nil, or error that occurred during the FindOne operation
		return err
	}

	filesIv := b.filesColl.Indexes()
	chunksIv := b.chunksColl.Indexes()

	filesModel := mongo.IndexModel{
		Keys: bson.D{
			{"filename", int32(1)},
			{"uploadDate", int32(1)},
		},
	}

	chunksModel := mongo.IndexModel{
		Keys: bson.D{
			{"files_id", int32(1)},
			{"n", int32(1)},
		},
		Options: options.Index().SetUnique(true),
	}

	if err = createNumericalIndexIfNotExists(ctx, filesIv, filesModel); err != nil {
		return err
	}
	return createNumericalIndexIfNotExists(ctx, chunksIv, chunksModel)
}

func (b *Bucket) checkFirstWrite(ctx context.Context) error {
	if !b.firstWriteDone {
		// Before the first write operation, determine whether the files collection is empty;
		// if so, create the indexes if they do not already exist.

		if err := b.createIndexes(ctx); err != nil {
			return err
		}
		b.firstWriteDone = true
	}

	return nil
}

func (b *Bucket) parseUploadOptions(opts ...*options.UploadOptions) (*Upload, error) {
	upload := &Upload{
		chunkSize: b.chunkSize, // upload chunk size defaults to bucket's value
	}

	uo := options.MergeUploadOptions(opts...)
	if uo.ChunkSizeBytes != nil {
		upload.chunkSize = *uo.ChunkSizeBytes
	}
	if uo.Registry == nil {
		uo.Registry = bson.DefaultRegistry
	}
	if uo.Metadata != nil {
		// TODO(GODRIVER-2726): Replace with marshal() and unmarshal() once the
		// gridfs package is merged into the mongo package.
		raw, err := bson.MarshalWithRegistry(uo.Registry, uo.Metadata)
		if err != nil {
			return nil, err
		}
		var doc bson.D
		unMarErr := bson.UnmarshalWithRegistry(uo.Registry, raw, &doc)
		if unMarErr != nil {
			return nil, unMarErr
		}
		upload.metadata = doc
	}

	return upload, nil
}
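
// exampleUploadWithMetadata is an editor-added sketch, not part of the driver:
// it attaches a metadata document and a per-file chunk size to a single upload;
// the filename, chunk size, and metadata fields are illustrative.
func exampleUploadWithMetadata(b *Bucket, data []byte) (primitive.ObjectID, error) {
	opts := options.GridFSUpload().
		SetChunkSizeBytes(64 * 1024). // applies to this file only
		SetMetadata(bson.D{{"contentType", "text/plain"}})
	return b.UploadFromStream("notes.txt", bytes.NewReader(data), opts)
}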
